概述

Hazelcast Jet 运行器可用于使用 Hazelcast Jet 执行 Beam 管道。

Jet 运行器和 Jet 适用于大规模持续作业,并提供

需要注意的是,Jet 运行器目前处于实验状态,无法利用 Jet 中存在的许多功能

The Beam Capability Matrix documents the supported capabilities of the Jet Runner.

使用 Hazelcast Jet 运行器运行 WordCount

生成 Beam 示例项目

只需按照 Java 快速入门页面 中的说明操作即可。

在本地 Jet 集群上运行 WordCount

在 Beam 示例项目中执行以下命令以启动新的 Jet 集群并在其上运行 WordCount 示例。

    $ mvn package exec:java \
        -DskipTests \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="\
            --runner=JetRunner \
            --jetLocalMode=3 \
            --inputFile=pom.xml \
            --output=counts" \
        -Pjet-runner

在远程 Jet 集群上运行 WordCount

Beam 示例项目(从原型生成时)来自特定发布的 Beam 版本(这就是 archetypeVersion 属性的含义)。每个包含 Jet 运行器的 Beam 版本(即从 2.14.0 开始)都使用特定版本的 Jet。因此,当我们启动独立的 Jet 集群并尝试在其上运行 Beam 示例时,我们需要确保两者兼容。请参见下表了解针对不同 Beam 版本推荐的 Jet 版本。

Beam 版本兼容的 Jet 版本
2.20.0 或更高版本4.x
2.14.0 - 2.19.03.x
2.13.0 或更低版本N/A

Hazelcast Jet 网站 下载与您使用的 Beam 版本兼容的最新 Hazelcast Jet 版本。

下载完成后,您需要启动 Jet 集群。最简单的方法是使用下载的 Jet 发行版附带的 jet-start 脚本启动 Jet 集群成员。成员使用 自动发现功能 自动发现功能 来形成集群。我们启动一个由两个成员组成的集群

$ cd hazelcast-jet
$ bin/jet-start.sh &
$ bin/jet-start.sh &
$ cd hazelcast-jet
$ bin/jet-start &
$ bin/jet-start &

检查集群是否已启动并运行

$ bin/jet.sh cluster
$ bin/jet cluster

您应该看到类似以下内容

State: ACTIVE
Version: 3.0
Size: 2

ADDRESS                  UUID
[192.168.0.117]:5701     76bea7ba-f032-4c25-ad04-bdef6782f481
[192.168.0.117]:5702     03ecfaa2-be16-41b6-b5cf-eea584d7fb86
State: ACTIVE
Version: 4.0
Size: 2

ADDRESS                  UUID
[192.168.0.117]:5701     b9937bba-32aa-48ba-8e32-423aafed763b
[192.168.0.117]:5702     dfeadfb2-3ba5-4d1c-95e7-71a1a3ca4937

将目录更改为 Beam Examples 项目,并执行以下命令以将您的管道提交到远程 Jet 集群并执行它。请确保将输入文件(包含要计算的词语的文件)分发到运行集群的所有机器上。否则,词语计数作业将无法读取数据。

    $ mvn package exec:java \
        -DskipTests \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="\
            --runner=JetRunner \
            --jetServers=192.168.0.117:5701,192.168.0.117:5702 \
            --codeJarPathname=target/word-count-beam-bundled-0.1.jar \
            --inputFile=<INPUT_FILE_AVAILABLE_ON_ALL_CLUSTER_MEMBERS> \
            --output=/tmp/counts" \
        -Pjet-runner

Jet 运行器的管道选项

字段描述默认值
runner要使用的管道运行器。此选项允许您在运行时确定管道运行器。设置为 JetRunner 以使用 Jet 运行。
jetGroupNamejetClusterName要加入的 Hazelcast 组的名称,本质上是运行器将使用的 Jet 集群的 ID。使用组可以创建多个集群,每个集群都有自己的组,并且不会干扰其他集群。 运行器将使用的 Hazelcast 集群的名称。jet
jetServersJet 集群成员地址列表,当运行器不启动自己的 Jet 集群,而是使用外部独立启动的集群时需要使用。采用逗号分隔的 ip/hostname-port 对列表形式,例如:192.168.0.117:5701,192.168.0.117:5702127.0.0.1:5701
codeJarPathname也是仅在使用外部 Jet 集群时才需要的属性,指定包含所有需要在集群上运行的代码(因此至少包含管道和运行器代码)的胖 jar 的位置。该值是 new java.io.File() 作为参数接受的任何字符串。没有默认值。
jetLocalMode运行器应在本地启动的 Jet 集群成员数。如果为 0,则运行器将使用外部集群。如果大于该值,则运行器将使用由自身启动的集群。0
jetDefaultParallelismJet 成员的本地并行度,即将在每个 Jet 集群成员上创建的 DAG 的每个顶点的处理器数量。2
jetProcessorsCooperative布尔标志,指定是否允许 DoFns 的 Jet 处理器协作(即使用绿色线程而不是专用的操作系统线程)。如果设置为 true,则所有此类处理器都将协作,除非它们没有输出(因此被认为是同步)。false