使用 Apache Samza 运行器

Apache Samza 运行器可用于使用 Apache Samza 执行 Beam 管道。Samza 运行器在 Samza 应用程序中执行 Beam 管道,并且可以在本地运行。应用程序可以进一步构建成 .tgz 文件,并部署到带有 Zookeeper 的 YARN 集群或 Samza 独立集群。

Samza 运行器和 Samza 适用于大规模、有状态的流式作业,并提供

The Beam Capability Matrix documents the currently supported capabilities of the Samza Runner.

Samza 运行器先决条件和设置

Samza 运行器基于 Samza 1.0 以上版本。

指定您的依赖项

您可以通过将以下内容添加到您的 pom.xml 中来指定您对 Samza 运行器的依赖项

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-samza</artifactId>
  <version>2.60.0</version>
  <scope>runtime</scope>
</dependency>

<!-- Samza dependencies -->
<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-api</artifactId>
  <version>${samza.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-core_2.11</artifactId>
  <version>${samza.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-kafka_2.11</artifactId>
  <version>${samza.version}</version>
  <scope>runtime</scope>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-kv_2.11</artifactId>
  <version>${samza.version}</version>
  <scope>runtime</scope>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-kv-rocksdb_2.11</artifactId>
  <version>${samza.version}</version>
  <scope>runtime</scope>
</dependency>

使用 Samza 运行器执行管道

如果您在本地运行管道或将其部署到包含所有 jar 文件和资源文件的独立集群,则不需要打包。例如,以下命令运行 WordCount 示例

$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Psamza-runner \
    -Dexec.args="--runner=SamzaRunner \
      --inputFile=/path/to/input \
      --output=/path/to/counts"

要将管道部署到 YARN 集群,请参阅 部署示例 Samza 作业的说明。首先,您需要将应用程序 jar 文件和资源文件打包到一个 .tgz 归档文件中,并使其可供 Yarn 容器下载。在您的配置中,您需要指定此 TGZ 文件位置的 URI

yarn.package.path=${your_job_tgz_URI}

job.name=${your_job_name}
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.coordinator.system=${job_coordinator_system}
job.default.system=${job_default_system}

有关配置的更多详细信息,请参阅 Samza 配置参考

配置文件将通过设置命令行参数 --configFilePath=/path/to/config.properties 传递。有了它,您就可以在 Yarn 资源管理器中运行 Beam 管道的 main 类,Samza 运行器将在后台提交 Yarn 作业。

查看我们的 GitHub 上的 Samza Beam 示例

Samza 运行器的管道选项

使用 Samza 运行器执行管道时,可以使用以下管道选项。

字段描述默认值
runner要使用的管道运行器。此选项允许您在运行时确定管道运行器。设置为 SamzaRunner 以使用 Samza 运行。
configFilePath使用属性文件的 Samza 配置。empty,即使用本地执行。
configFactory用于从配置文件路径读取配置文件的工厂。PropertiesConfigFactory,将配置读取为属性文件。
configOverride以编程方式设置的配置覆盖。empty,即使用配置文件或本地执行。
jobInstance作业的实例名称。1
samzaExecutionEnvironmentSamza 应用程序执行环境。有关更多详细信息,请参阅 SamzaExecutionEnvironmentLOCAL
watermarkInterval检查水印的时间间隔(以毫秒为单位)。1000
systemBufferSize为给定系统缓冲的最大消息数。5000
eventTimerBufferSize为 PTransform 在内存中缓冲的事件时间计时器的最大数量。5000
maxSourceParallelism任何数据源允许的最大并行度。1
storeBatchGetSize状态存储的批次获取大小限制。10000
enableMetrics在 Samza 运行器中启用/禁用 Beam 指标。true
stateDurable状态持久化的配置。false
maxBundleSize捆绑中元素的最大数量。1(默认情况下,自动捆绑已禁用)
maxBundleTimeMs在完成捆绑之前等待的最大时间(以毫秒为单位)。1000

监控您的作业

您可以使用 Beam 和 Samza 发出的指标来监控管道作业,例如 Beam 源指标(例如 elements_readbacklog_elements),以及 Samza 作业指标(例如 job-healthyprocess-envelopes)。有关 Samza 指标的完整列表,请参阅 Samza 指标参考。您可以在开发中通过 JMX 查看作业的指标,并将指标发送到图形系统(例如 Graphite)。有关更多详细信息,请参阅 Samza 指标

对于正在运行的 Samza YARN 作业,您可以使用 YARN Web UI 来监控作业状态并检查日志。