使用 Apache Samza 运行器
Apache Samza 运行器可用于使用 Apache Samza 执行 Beam 管道。Samza 运行器在 Samza 应用程序中执行 Beam 管道,并且可以在本地运行。应用程序可以进一步构建成 .tgz 文件,并部署到带有 Zookeeper 的 YARN 集群或 Samza 独立集群。
Samza 运行器和 Samza 适用于大规模、有状态的流式作业,并提供
- 对本地状态(使用 RocksDB 存储)的一流支持。这允许对高频流式作业进行快速状态访问。
- 容错,支持对状态进行增量检查点而不是完整快照。这使 Samza 能够扩展到具有非常大状态的应用程序。
- 一个完全异步的处理引擎,使远程调用效率更高。
- 灵活的部署模型,用于在任何带有 Zookeeper 的托管环境中运行应用程序。
- 金丝雀、升级和回滚等功能,支持以最少的停机时间进行超大规模部署。
The Beam Capability Matrix documents the currently supported capabilities of the Samza Runner.
Samza 运行器先决条件和设置
Samza 运行器基于 Samza 1.0 以上版本。
指定您的依赖项
您可以通过将以下内容添加到您的 pom.xml
中来指定您对 Samza 运行器的依赖项
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-samza</artifactId>
<version>2.60.0</version>
<scope>runtime</scope>
</dependency>
<!-- Samza dependencies -->
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-api</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kafka_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-rocksdb_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
使用 Samza 运行器执行管道
如果您在本地运行管道或将其部署到包含所有 jar 文件和资源文件的独立集群,则不需要打包。例如,以下命令运行 WordCount 示例
$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Psamza-runner \
-Dexec.args="--runner=SamzaRunner \
--inputFile=/path/to/input \
--output=/path/to/counts"
要将管道部署到 YARN 集群,请参阅 部署示例 Samza 作业的说明。首先,您需要将应用程序 jar 文件和资源文件打包到一个 .tgz
归档文件中,并使其可供 Yarn 容器下载。在您的配置中,您需要指定此 TGZ 文件位置的 URI
yarn.package.path=${your_job_tgz_URI}
job.name=${your_job_name}
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.coordinator.system=${job_coordinator_system}
job.default.system=${job_default_system}
有关配置的更多详细信息,请参阅 Samza 配置参考。
配置文件将通过设置命令行参数 --configFilePath=/path/to/config.properties
传递。有了它,您就可以在 Yarn 资源管理器中运行 Beam 管道的 main 类,Samza 运行器将在后台提交 Yarn 作业。
查看我们的 GitHub 上的 Samza Beam 示例
Samza 运行器的管道选项
使用 Samza 运行器执行管道时,可以使用以下管道选项。
字段 | 描述 | 默认值 |
---|---|---|
runner | 要使用的管道运行器。此选项允许您在运行时确定管道运行器。 | 设置为 SamzaRunner 以使用 Samza 运行。 |
configFilePath | 使用属性文件的 Samza 配置。 | empty ,即使用本地执行。 |
configFactory | 用于从配置文件路径读取配置文件的工厂。 | PropertiesConfigFactory ,将配置读取为属性文件。 |
configOverride | 以编程方式设置的配置覆盖。 | empty ,即使用配置文件或本地执行。 |
jobInstance | 作业的实例名称。 | 1 |
samzaExecutionEnvironment | Samza 应用程序执行环境。有关更多详细信息,请参阅 SamzaExecutionEnvironment 。 | LOCAL |
watermarkInterval | 检查水印的时间间隔(以毫秒为单位)。 | 1000 |
systemBufferSize | 为给定系统缓冲的最大消息数。 | 5000 |
eventTimerBufferSize | 为 PTransform 在内存中缓冲的事件时间计时器的最大数量。 | 5000 |
maxSourceParallelism | 任何数据源允许的最大并行度。 | 1 |
storeBatchGetSize | 状态存储的批次获取大小限制。 | 10000 |
enableMetrics | 在 Samza 运行器中启用/禁用 Beam 指标。 | true |
stateDurable | 状态持久化的配置。 | false |
maxBundleSize | 捆绑中元素的最大数量。 | 1 (默认情况下,自动捆绑已禁用) |
maxBundleTimeMs | 在完成捆绑之前等待的最大时间(以毫秒为单位)。 | 1000 |
监控您的作业
您可以使用 Beam 和 Samza 发出的指标来监控管道作业,例如 Beam 源指标(例如 elements_read
和 backlog_elements
),以及 Samza 作业指标(例如 job-healthy
和 process-envelopes
)。有关 Samza 指标的完整列表,请参阅 Samza 指标参考。您可以在开发中通过 JMX 查看作业的指标,并将指标发送到图形系统(例如 Graphite)。有关更多详细信息,请参阅 Samza 指标。
对于正在运行的 Samza YARN 作业,您可以使用 YARN Web UI 来监控作业状态并检查日志。
上次更新时间:2024/10/31
您是否找到了您要找的所有内容?
所有内容都有用且清晰吗?您想更改什么?请告诉我们!