概述
Hazelcast Jet 运行器可用于使用 Hazelcast Jet 执行 Beam 管道。
Jet 运行器和 Jet 适用于大规模持续作业,并提供
- 对批处理(有界)和流式处理(无界)数据集的支持
- 同时支持极高吞吐量和低事件延迟的运行时
- 流式处理程序中的自然背压
- 带有内存存储的分布式大规模并行数据处理引擎
需要注意的是,Jet 运行器目前处于实验状态,无法利用 Jet 中存在的许多功能
- Jet 具有完整的容错支持,Jet 运行器没有;如果作业失败,则必须重新启动
- Jet 的内部性能非常高。目前运行器无法与之匹配,因为 Beam 管道优化/手术尚未完全实现。
The Beam Capability Matrix documents the supported capabilities of the Jet Runner.
使用 Hazelcast Jet 运行器运行 WordCount
生成 Beam 示例项目
只需按照 Java 快速入门页面 中的说明操作即可。
在本地 Jet 集群上运行 WordCount
在 Beam 示例项目中执行以下命令以启动新的 Jet 集群并在其上运行 WordCount 示例。
$ mvn package exec:java \
-DskipTests \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="\
--runner=JetRunner \
--jetLocalMode=3 \
--inputFile=pom.xml \
--output=counts" \
-Pjet-runner
在远程 Jet 集群上运行 WordCount
Beam 示例项目(从原型生成时)来自特定发布的 Beam 版本(这就是 archetypeVersion
属性的含义)。每个包含 Jet 运行器的 Beam 版本(即从 2.14.0 开始)都使用特定版本的 Jet。因此,当我们启动独立的 Jet 集群并尝试在其上运行 Beam 示例时,我们需要确保两者兼容。请参见下表了解针对不同 Beam 版本推荐的 Jet 版本。
Beam 版本 | 兼容的 Jet 版本 |
---|---|
2.20.0 或更高版本 | 4.x |
2.14.0 - 2.19.0 | 3.x |
2.13.0 或更低版本 | N/A |
从 Hazelcast Jet 网站 下载与您使用的 Beam 版本兼容的最新 Hazelcast Jet 版本。
- Hazelcast Jet 3.x
- Hazelcast Jet 4.x
下载完成后,您需要启动 Jet 集群。最简单的方法是使用下载的 Jet 发行版附带的 jet-start
脚本启动 Jet 集群成员。成员使用 自动发现功能 自动发现功能 来形成集群。我们启动一个由两个成员组成的集群
检查集群是否已启动并运行
您应该看到类似以下内容
将目录更改为 Beam Examples 项目,并执行以下命令以将您的管道提交到远程 Jet 集群并执行它。请确保将输入文件(包含要计算的词语的文件)分发到运行集群的所有机器上。否则,词语计数作业将无法读取数据。
$ mvn package exec:java \
-DskipTests \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="\
--runner=JetRunner \
--jetServers=192.168.0.117:5701,192.168.0.117:5702 \
--codeJarPathname=target/word-count-beam-bundled-0.1.jar \
--inputFile=<INPUT_FILE_AVAILABLE_ON_ALL_CLUSTER_MEMBERS> \
--output=/tmp/counts" \
-Pjet-runner
Jet 运行器的管道选项
字段 | 描述 | 默认值 |
---|---|---|
runner | 要使用的管道运行器。此选项允许您在运行时确定管道运行器。 | 设置为 JetRunner 以使用 Jet 运行。 |
jetGroupNamejetClusterName | 要加入的 Hazelcast 组的名称,本质上是运行器将使用的 Jet 集群的 ID。使用组可以创建多个集群,每个集群都有自己的组,并且不会干扰其他集群。 运行器将使用的 Hazelcast 集群的名称。 | jet |
jetServers | Jet 集群成员地址列表,当运行器不启动自己的 Jet 集群,而是使用外部独立启动的集群时需要使用。采用逗号分隔的 ip/hostname-port 对列表形式,例如:192.168.0.117:5701,192.168.0.117:5702 | 127.0.0.1:5701 |
codeJarPathname | 也是仅在使用外部 Jet 集群时才需要的属性,指定包含所有需要在集群上运行的代码(因此至少包含管道和运行器代码)的胖 jar 的位置。该值是 new java.io.File() 作为参数接受的任何字符串。 | 没有默认值。 |
jetLocalMode | 运行器应在本地启动的 Jet 集群成员数。如果为 0 ,则运行器将使用外部集群。如果大于该值,则运行器将使用由自身启动的集群。 | 0 |
jetDefaultParallelism | Jet 成员的本地并行度,即将在每个 Jet 集群成员上创建的 DAG 的每个顶点的处理器数量。 | 2 |
jetProcessorsCooperative | 布尔标志,指定是否允许 DoFns 的 Jet 处理器协作(即使用绿色线程而不是专用的操作系统线程)。如果设置为 true,则所有此类处理器都将协作,除非它们没有输出(因此被认为是同步)。 | false |
上次更新于 2024/10/31
您是否找到了您想要的一切?
所有内容是否有用且清晰?您想更改任何内容吗?请告诉我们!