概述

Apache Flink 运行器可用于使用 Apache Flink 执行 Beam 管道。对于执行,您可以选择集群执行模式(例如 Yarn/Kubernetes/Mesos)或本地嵌入式执行模式,这对于测试管道很有用。

Flink 运行器和 Flink 适用于大规模、连续的作业,并提供

使用 Apache Flink 运行器

了解 Flink 运行器有两种类型很重要

  1. 原始的经典运行器,它只支持 Java(和其他基于 JVM 的语言)
  2. 较新的可移植运行器,它支持 Java/Python/Go

您可能想知道为什么有两个运行器?

Beam 及其运行器最初只支持基于 JVM 的语言(例如 Java/Scala/Kotlin)。Python 和 Go SDK 后来添加。运行器的架构必须进行重大更改才能支持执行用其他语言编写的管道。

如果您的应用程序仅使用 Java,则目前应使用经典运行器。最终,可移植运行器将取代经典运行器,因为它包含用于在将来执行 Java、Python、Go 和更多语言的通用框架。

如果您想在 Flink 上运行带有 Beam 的 Python 管道,您需要使用可移植运行器。有关可移植性的更多信息,请访问 可移植性页面

因此,本指南分为几部分,以记录 Flink 运行器的经典和可移植功能。此外,Python 提供了便利的包装器来处理运行器的完整生命周期,因此根据是否自动管理可移植性组件(推荐)或手动管理可移植性组件,进一步进行了拆分。请使用下面的切换器选择适合运行器的模式

先决条件和设置

如果您想使用 Flink 运行器的本地执行模式,则无需完成任何集群设置。您只需运行您的 Beam 管道。确保将运行器设置为 FlinkRunnerPortableRunner

要使用 Flink 运行器在集群上执行,您必须按照 Flink 快速入门设置 设置 Flink 集群。

依赖项

您必须在 pom.xmlbuild.gradle 中指定您对 Flink 运行器的依赖项。使用来自下方 兼容性表 的 Beam 版本和工件 ID。例如

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-flink-1.18</artifactId>
  <version>2.60.0</version>
</dependency>

您需要在您的执行环境中安装 Docker。要运行嵌入式 Flink 集群或对 Python < 3.6 使用 Flink 运行器,您还需要在您的执行环境中提供 java。

您需要在您的执行环境中安装 Docker。

要在 Flink 集群上执行管道,您需要将您的程序与所有依赖项打包到所谓的胖 JAR 中。如何执行此操作取决于您的构建系统,但是如果您按照 Beam 快速入门 进行操作,则需要运行以下命令

$ mvn package -Pflink-runner

target 文件夹中查找此命令的输出 JAR。

Beam 快速入门 Maven 项目设置为使用 Maven Shade 插件来创建胖 JAR,并且 -Pflink-runner 参数确保包含对 Flink 运行器的依赖项。

要运行管道,最简单的选择是使用 flink 命令,它是 Flink 的一部分

$ bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar –runner=FlinkRunner –other-parameters

或者,您也可以使用 Maven 的 exec 命令。例如,要执行 WordCount 示例

mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=/path/to/pom.xml \
      --output=/path/to/counts \
      --flinkMaster=<flink master url> \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"

如果您的 Flink JobManager 在本地机器上运行,则可以为 flinkMaster 提供 localhost:8081。否则,将为作业启动一个嵌入式 Flink 集群。

要在 Flink 上运行管道,请将运行器设置为 FlinkRunner,并将 flink_master 设置为 Flink 集群的主机 URL。此外,可选地将 environment_type 设置为 LOOPBACK。例如,在启动 本地 Flink 集群 后,可以运行

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options) as p:
    ...

要在嵌入式 Flink 集群上运行,只需省略 flink_master 选项,就会自动启动一个嵌入式 Flink 集群,并在作业结束后关闭。

对于较旧版本的 Python,可选的 flink_version 选项也可能需要。

从 Beam 2.18.0 开始,预构建的 Flink Job Service Docker 镜像可在 Docker Hub 获取:Flink 1.16Flink 1.17Flink 1.18.

要在嵌入式 Flink 集群上运行管道

(1) 启动 JobService 端点:docker run --net=host apache/beam_flink1.10_job_server:latest

JobService 是您将 Beam 管道提交到的中央实例。JobService 将为管道创建 Flink 作业并执行该作业。

(2) 使用 PortableRunner 将 Python 管道提交到上述端点,并将 job_endpoint 设置为 localhost:8099(这是 JobService 的默认地址)。可选地将 environment_type 设置为 LOOPBACK。例如

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options) as p:
    ...

要在单独的 Flink 集群 上运行

(1) 启动一个公开 Rest 接口的 Flink 集群(例如,默认情况下为 localhost:8081)。

(2) 使用 Flink Rest 端点启动 JobService:docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081

(3) 按上述方式提交管道。

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options=options) as p:
    ...

请注意,environment_type=LOOPBACK 仅用于本地测试,在远程集群上将不起作用。有关详细信息,请参见 此处

其他信息和注意事项

监控您的作业

您可以使用 Flink JobManager 仪表板或其 Rest 接口监控正在运行的 Flink 作业。默认情况下,这在 JobManager 节点的端口 8081 上可用。如果您的本地机器上安装了 Flink,则将是 https://127.0.0.1:8081。注意:当您使用 [local] 模式时,将启动一个嵌入式 Flink 集群,它不会提供仪表板。

流式执行

如果您的管道使用无界数据源或接收器,Flink 运行器将自动切换到流式模式。您可以使用 --streaming 标志强制执行流式模式。

注意:当使用无界源且未启用检查点时,运行器将打印警告消息。许多源(如 PubSubIO)依赖于它们的检查点以进行确认,这只能在为 FlinkRunner 启用检查点时完成。要启用检查点,请将 checkpointingIntervalcheckpointing_interval 设置为所需的检查点间隔(以毫秒为单位)。

使用 Flink 运行器执行管道时,您可以设置这些管道选项。

以下 Flink 特定管道选项列表是根据 FlinkPipelineOptions 引用类自动生成的

allowNonRestoredState标志指示是否允许未恢复的状态,如果保存点包含不再是管道一部分的运算符的状态。默认值:false
attachedMode指定管道是附加模式还是分离模式提交默认值:true
autoBalanceWriteFilesShardingEnabled标志指示是否应启用 WriteFiles 变换的自动平衡分片。这可能在流式用例中很有用,其中管道需要将相当多的事件写入文件,通常分为 N 个分片。Flink 上的默认行为是,一些工作器将比其他工作器接收更多分片来处理。这会导致工作器在处理积压和内存使用方面失去平衡。启用此功能将使分片均匀分布在可用工作器之间,从而提高吞吐量和内存使用稳定性。默认值:false
autoWatermarkInterval自动水印发射的间隔(以毫秒为单位)。
checkpointTimeoutMillis检查点在被丢弃之前可以持续的最长时间(以毫秒为单位)。默认值:-1
checkpointingInterval触发正在运行管道的检查点的间隔(以毫秒为单位)。默认值:没有检查点。默认值:-1
checkpointingMode定义一致性保证的检查点模式。默认值:EXACTLY_ONCE
disableMetrics在 Flink 运行器中禁用 Beam 指标默认值:false
enableStableInputDrain允许对包含 RequiresStableInput 运算符的 Flink 管道进行清空操作。请注意,在清空时,如果 DoFn 运算符中存在任何与处理相关的故障,可能会违反 RequiresStableInput 契约。默认值:false
executionModeForBatch批处理管道数据交换的 Flink 模式。参考 {@link org.apache.flink.api.common.ExecutionMode}。如果管道被阻塞,请将其设置为 BATCH_FORCED,请参见 https://issues.apache.org/jira/browse/FLINK-10672默认值:PIPELINED
executionRetryDelay设置两次执行之间的延迟(以毫秒为单位)。值为 {@code -1} 表示应使用默认值。默认值:-1
externalizedCheckpointsEnabled启用或禁用外部检查点。与 CheckpointingInterval 协同工作默认值:false
failOnCheckpointingErrors设置任务在遇到检查点过程中的错误时的预期行为。如果设置为 true,任务将在检查点错误时失败。如果设置为 false,任务只会拒绝检查点并继续运行。默认值:true
fasterCopy删除运算符之间不必要的深度复制。请参见 https://issues.apache.org/jira/browse/BEAM-11146默认值:false
fileInputSplitMaxSizeMB设置从文件系统读取数据时输入拆分的最大大小。0 表示没有最大大小。默认值:0
finishBundleBeforeCheckpointing如果设置,则在检查点运算符状态之前完成当前捆绑并刷新所有输出。默认情况下,立即开始检查点并缓冲任何剩余的捆绑输出作为检查点的一部分。此设置可能会影响检查点对齐。默认值:false
flinkConfDir包含 Flink YAML 配置文件的目录。这些属性将设置为提交到 Flink 的所有作业,并优先于 FLINK_CONF_DIR 中的配置。
flinkMaster要执行管道的 Flink 主节点地址。可以是“host:port”形式或以下特殊值之一:[local]、[collection] 或 [auto]。默认值:[auto]
forceUnalignedCheckpointEnabled强制执行未对齐的检查点,特别是在迭代作业中允许它们。默认值:false
jobCheckIntervalInSecs在分离模式下设置waitUntilFinish方法中的作业检查间隔(以秒为单位),默认值为 5 秒。默认值:5
latencyTrackingInterval从源到接收器发送延迟跟踪标记的间隔(以毫秒为单位)。间隔值 <= 0 会禁用此功能。默认值:0
maxBundleSize捆绑中的最大元素数。对于流式作业,默认值为 1000;对于批处理作业,默认值为 1,000,000。默认值:MaxBundleSizeFactory
maxBundleTimeMills在完成捆绑之前等待的最大时间(以毫秒为单位)。对于流式作业,默认值为 1000;对于批处理作业,默认值为 10,000。默认值:MaxBundleTimeFactory
maxParallelism要使用的管道范围内的最大并行度。最大并行度指定动态缩放的上限以及用于分区状态的键组数。默认值:-1
minPauseBetweenCheckpoints触发下一个检查点之前的最小暂停时间(以毫秒为单位)。默认值:-1
numConcurrentCheckpoints并发检查点的最大数量。默认值为 1(=无并发检查点)。默认值:1
numberOfExecutionRetries设置失败任务重新执行的次数。值为零实际上会禁用容错。值为 -1 表示应使用系统默认值(如配置中定义)。默认值:-1
objectReuse设置重用对象的机制。默认值:false
operatorChaining设置运算符链接的行为。默认值:true
parallelism将操作分配到工作进程时使用的并行度。如果未设置并行度,则使用配置的 Flink 默认值,或使用 1(如果找不到任何默认值)。默认值:-1
reIterableGroupByKeyResult指示 GBK 结果是否需要可重新迭代的标志。可重新迭代的结果意味着对于单个键的所有值都必须适合内存,因为我们目前不支持溢出到磁盘。默认值:false
reportCheckpointDuration如果非空,则在提供的指标命名空间中报告每个 ParDo 阶段的检查点持续时间。
retainExternalizedCheckpointsOnCancellation设置取消时外部化检查点行为。默认值:false
savepointPath保存点恢复路径。如果指定,则从提供的路径恢复流式管道。
shutdownSourcesAfterIdleMs关闭空闲了指定毫秒时间的源。一旦源关闭,检查点就不再可能了。关闭源最终会导致管道关闭(= Flink 作业完成),前提是所有输入都已处理。除非明确设置,否则在启用检查点时,此值默认为 Long.MAX_VALUE;在禁用检查点时,此值默认为 0。有关此问题的进度,请参阅 https://issues.apache.org/jira/browse/FLINK-2491。默认值:-1
stateBackend存储 Beam 状态的 State backend。使用“rocksdb”或“filesystem”。
stateBackendFactory设置在流式模式下使用的 State backend 工厂。默认情况下使用 flink 集群的 state.backend 配置。
stateBackendStoragePath持久化 State backend 数据的 State backend 路径。用于初始化 State backend。
unalignedCheckpointEnabled如果设置,未对齐的检查点将包含正在传输的数据(即存储在缓冲区中的数据)作为检查点状态的一部分,从而允许检查点屏障超过这些缓冲区。因此,检查点持续时间变得与当前吞吐量无关,因为检查点屏障不再有效地嵌入到数据流中。默认值:false
allow_non_restored_state标志指示是否允许未恢复的状态,如果保存点包含不再是管道一部分的运算符的状态。默认值:false
attached_mode指定管道是附加模式还是分离模式提交默认值:true
auto_balance_write_files_sharding_enabled标志指示是否应启用 WriteFiles 变换的自动平衡分片。这可能在流式用例中很有用,其中管道需要将相当多的事件写入文件,通常分为 N 个分片。Flink 上的默认行为是,一些工作器将比其他工作器接收更多分片来处理。这会导致工作器在处理积压和内存使用方面失去平衡。启用此功能将使分片均匀分布在可用工作器之间,从而提高吞吐量和内存使用稳定性。默认值:false
auto_watermark_interval自动水印发射的间隔(以毫秒为单位)。
checkpoint_timeout_millis检查点在被丢弃之前可以持续的最长时间(以毫秒为单位)。默认值:-1
checkpointing_interval触发正在运行管道的检查点的间隔(以毫秒为单位)。默认值:没有检查点。默认值:-1
checkpointing_mode定义一致性保证的检查点模式。默认值:EXACTLY_ONCE
disable_metrics在 Flink 运行器中禁用 Beam 指标默认值:false
enable_stable_input_drain允许对包含 RequiresStableInput 运算符的 Flink 管道进行清空操作。请注意,在清空时,如果 DoFn 运算符中存在任何与处理相关的故障,可能会违反 RequiresStableInput 契约。默认值:false
execution_mode_for_batch批处理管道数据交换的 Flink 模式。参考 {@link org.apache.flink.api.common.ExecutionMode}。如果管道被阻塞,请将其设置为 BATCH_FORCED,请参见 https://issues.apache.org/jira/browse/FLINK-10672默认值:PIPELINED
execution_retry_delay设置两次执行之间的延迟(以毫秒为单位)。值为 {@code -1} 表示应使用默认值。默认值:-1
externalized_checkpoints_enabled启用或禁用外部检查点。与 CheckpointingInterval 协同工作默认值:false
fail_on_checkpointing_errors设置任务在遇到检查点过程中的错误时的预期行为。如果设置为 true,任务将在检查点错误时失败。如果设置为 false,任务只会拒绝检查点并继续运行。默认值:true
faster_copy删除运算符之间不必要的深度复制。请参见 https://issues.apache.org/jira/browse/BEAM-11146默认值:false
file_input_split_max_size_m_b设置从文件系统读取数据时输入拆分的最大大小。0 表示没有最大大小。默认值:0
finish_bundle_before_checkpointing如果设置,则在检查点运算符状态之前完成当前捆绑并刷新所有输出。默认情况下,立即开始检查点并缓冲任何剩余的捆绑输出作为检查点的一部分。此设置可能会影响检查点对齐。默认值:false
flink_conf_dir包含 Flink YAML 配置文件的目录。这些属性将设置为提交到 Flink 的所有作业,并优先于 FLINK_CONF_DIR 中的配置。
flink_master要执行管道的 Flink 主节点地址。可以是“host:port”形式或以下特殊值之一:[local]、[collection] 或 [auto]。默认值:[auto]
force_unaligned_checkpoint_enabled强制执行未对齐的检查点,特别是在迭代作业中允许它们。默认值:false
job_check_interval_in_secs在分离模式下设置waitUntilFinish方法中的作业检查间隔(以秒为单位),默认值为 5 秒。默认值:5
latency_tracking_interval从源到接收器发送延迟跟踪标记的间隔(以毫秒为单位)。间隔值 <= 0 会禁用此功能。默认值:0
max_bundle_size捆绑中的最大元素数。对于流式作业,默认值为 1000;对于批处理作业,默认值为 1,000,000。默认值:MaxBundleSizeFactory
max_bundle_time_mills在完成捆绑之前等待的最大时间(以毫秒为单位)。对于流式作业,默认值为 1000;对于批处理作业,默认值为 10,000。默认值:MaxBundleTimeFactory
max_parallelism要使用的管道范围内的最大并行度。最大并行度指定动态缩放的上限以及用于分区状态的键组数。默认值:-1
min_pause_between_checkpoints触发下一个检查点之前的最小暂停时间(以毫秒为单位)。默认值:-1
num_concurrent_checkpoints并发检查点的最大数量。默认值为 1(=无并发检查点)。默认值:1
number_of_execution_retries设置失败任务重新执行的次数。值为零实际上会禁用容错。值为 -1 表示应使用系统默认值(如配置中定义)。默认值:-1
object_reuse设置重用对象的机制。默认值:false
operator_chaining设置运算符链接的行为。默认值:true
parallelism将操作分配到工作进程时使用的并行度。如果未设置并行度,则使用配置的 Flink 默认值,或使用 1(如果找不到任何默认值)。默认值:-1
re_iterable_group_by_key_result指示 GBK 结果是否需要可重新迭代的标志。可重新迭代的结果意味着对于单个键的所有值都必须适合内存,因为我们目前不支持溢出到磁盘。默认值:false
report_checkpoint_duration如果非空,则在提供的指标命名空间中报告每个 ParDo 阶段的检查点持续时间。
retain_externalized_checkpoints_on_cancellation设置取消时外部化检查点行为。默认值:false
savepoint_path保存点恢复路径。如果指定,则从提供的路径恢复流式管道。
shutdown_sources_after_idle_ms关闭空闲了指定毫秒时间的源。一旦源关闭,检查点就不再可能了。关闭源最终会导致管道关闭(= Flink 作业完成),前提是所有输入都已处理。除非明确设置,否则在启用检查点时,此值默认为 Long.MAX_VALUE;在禁用检查点时,此值默认为 0。有关此问题的进度,请参阅 https://issues.apache.org/jira/browse/FLINK-2491。默认值:-1
state_backend存储 Beam 状态的 State backend。使用“rocksdb”或“filesystem”。
state_backend_factory设置在流式模式下使用的 State backend 工厂。默认情况下使用 flink 集群的 state.backend 配置。
state_backend_storage_path持久化 State backend 数据的 State backend 路径。用于初始化 State backend。
unaligned_checkpoint_enabled如果设置,未对齐的检查点将包含正在传输的数据(即存储在缓冲区中的数据)作为检查点状态的一部分,从而允许检查点屏障超过这些缓冲区。因此,检查点持续时间变得与当前吞吐量无关,因为检查点屏障不再有效地嵌入到数据流中。默认值:false

有关一般 Beam 管道选项,请参阅 PipelineOptions 参考。

Flink 集群版本必须与 FlinkRunner 使用的次版本匹配。次版本是版本字符串中的前两个数字,例如在 1.18.0 中,次版本是 1.18

我们尝试跟踪 Beam 发布时 Apache Flink 的最新版本。Flink 版本在 Flink 社区支持的时间内受 Beam 支持。Flink 社区支持最近两个次版本。当 Flink 版本的支持被取消时,它可能会被弃用并从 Beam 中删除。要了解 Beam 与哪个版本的 Flink 兼容,请参阅下表。

Flink 版本工件 ID支持的 Beam 版本
1.19.xbeam-runners-flink-1.19≥ 2.61.0
1.18.xbeam-runners-flink-1.18≥ 2.57.0
1.17.xbeam-runners-flink-1.17≥ 2.56.0
1.16.xbeam-runners-flink-1.162.47.0 - 2.60.0
1.15.xbeam-runners-flink-1.152.40.0 - 2.60.0
1.14.xbeam-runners-flink-1.142.38.0 - 2.56.0
1.13.xbeam-runners-flink-1.132.31.0 - 2.55.0
1.12.xbeam-runners-flink-1.122.27.0 - 2.55.0
1.11.xbeam-runners-flink-1.112.25.0 - 2.38.0
1.10.xbeam-runners-flink-1.102.21.0 - 2.30.0
1.9.xbeam-runners-flink-1.92.17.0 - 2.29.0
1.8.xbeam-runners-flink-1.82.13.0 - 2.29.0
1.7.xbeam-runners-flink-1.72.10.0 - 2.20.0
1.6.xbeam-runners-flink-1.62.10.0 - 2.16.0
1.5.xbeam-runners-flink_2.112.6.0 - 2.16.0
带有 Scala 2.11 的 1.4.xbeam-runners-flink_2.112.3.0 - 2.5.0
带有 Scala 2.10 的 1.3.xbeam-runners-flink_2.102.1.x - 2.2.0
带有 Scala 2.10 的 1.2.xbeam-runners-flink_2.102.0.0

要获取正确的 Flink 版本,请参阅 Flink 下载页面

有关更多信息,Flink 文档 可能会有帮助。

Beam 功能

Beam 功能矩阵 记录了经典 Flink Runner 的功能。

可移植功能矩阵 记录了可移植 Flink Runner 的功能。