概述
Apache Flink 运行器可用于使用 Apache Flink 执行 Beam 管道。对于执行,您可以选择集群执行模式(例如 Yarn/Kubernetes/Mesos)或本地嵌入式执行模式,这对于测试管道很有用。
Flink 运行器和 Flink 适用于大规模、连续的作业,并提供
- 一个流优先的运行时,支持批处理和数据流式程序
- 一个支持非常高吞吐量和低事件延迟的运行时
- 具有完全一次处理保证的容错性
- 流式程序中的自然背压
- 用于在内存中和内存外数据处理算法之间进行高效且健壮的切换的自定义内存管理
- 与 YARN 和 Apache Hadoop 生态系统的其他组件集成
使用 Apache Flink 运行器
了解 Flink 运行器有两种类型很重要
- 原始的经典运行器,它只支持 Java(和其他基于 JVM 的语言)
- 较新的可移植运行器,它支持 Java/Python/Go
您可能想知道为什么有两个运行器?
Beam 及其运行器最初只支持基于 JVM 的语言(例如 Java/Scala/Kotlin)。Python 和 Go SDK 后来添加。运行器的架构必须进行重大更改才能支持执行用其他语言编写的管道。
如果您的应用程序仅使用 Java,则目前应使用经典运行器。最终,可移植运行器将取代经典运行器,因为它包含用于在将来执行 Java、Python、Go 和更多语言的通用框架。
如果您想在 Flink 上运行带有 Beam 的 Python 管道,您需要使用可移植运行器。有关可移植性的更多信息,请访问 可移植性页面。
因此,本指南分为几部分,以记录 Flink 运行器的经典和可移植功能。此外,Python 提供了便利的包装器来处理运行器的完整生命周期,因此根据是否自动管理可移植性组件(推荐)或手动管理可移植性组件,进一步进行了拆分。请使用下面的切换器选择适合运行器的模式
- 经典 (Java)
- 可移植 (Python)
- 可移植 (Java/Python/Go)
先决条件和设置
如果您想使用 Flink 运行器的本地执行模式,则无需完成任何集群设置。您只需运行您的 Beam 管道。确保将运行器设置为 FlinkRunnerPortableRunner。
要使用 Flink 运行器在集群上执行,您必须按照 Flink 快速入门设置 设置 Flink 集群。
依赖项
您必须在 pom.xml 或 build.gradle 中指定您对 Flink 运行器的依赖项。使用来自下方 兼容性表 的 Beam 版本和工件 ID。例如
您需要在您的执行环境中安装 Docker。要运行嵌入式 Flink 集群或对 Python < 3.6 使用 Flink 运行器,您还需要在您的执行环境中提供 java。
您需要在您的执行环境中安装 Docker。
在 Flink 集群上执行 Beam 管道
要在 Flink 集群上执行管道,您需要将您的程序与所有依赖项打包到所谓的胖 JAR 中。如何执行此操作取决于您的构建系统,但是如果您按照 Beam 快速入门 进行操作,则需要运行以下命令
在 target 文件夹中查找此命令的输出 JAR。
Beam 快速入门 Maven 项目设置为使用 Maven Shade 插件来创建胖 JAR,并且 -Pflink-runner 参数确保包含对 Flink 运行器的依赖项。
要运行管道,最简单的选择是使用 flink 命令,它是 Flink 的一部分
$ bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar –runner=FlinkRunner –other-parameters
或者,您也可以使用 Maven 的 exec 命令。例如,要执行 WordCount 示例
如果您的 Flink JobManager 在本地机器上运行,则可以为 flinkMaster 提供 localhost:8081。否则,将为作业启动一个嵌入式 Flink 集群。
要在 Flink 上运行管道,请将运行器设置为 FlinkRunner,并将 flink_master 设置为 Flink 集群的主机 URL。此外,可选地将 environment_type 设置为 LOOPBACK。例如,在启动 本地 Flink 集群 后,可以运行
要在嵌入式 Flink 集群上运行,只需省略 flink_master 选项,就会自动启动一个嵌入式 Flink 集群,并在作业结束后关闭。
对于较旧版本的 Python,可选的 flink_version 选项也可能需要。
从 Beam 2.18.0 开始,预构建的 Flink Job Service Docker 镜像可在 Docker Hub 获取:Flink 1.16。 Flink 1.17。 Flink 1.18.
要在嵌入式 Flink 集群上运行管道
(1) 启动 JobService 端点:docker run --net=host apache/beam_flink1.10_job_server:latest
JobService 是您将 Beam 管道提交到的中央实例。JobService 将为管道创建 Flink 作业并执行该作业。
(2) 使用 PortableRunner 将 Python 管道提交到上述端点,并将 job_endpoint 设置为 localhost:8099(这是 JobService 的默认地址)。可选地将 environment_type 设置为 LOOPBACK。例如
要在单独的 Flink 集群 上运行
(1) 启动一个公开 Rest 接口的 Flink 集群(例如,默认情况下为 localhost:8081)。
(2) 使用 Flink Rest 端点启动 JobService:docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081。
(3) 按上述方式提交管道。
请注意,environment_type=LOOPBACK 仅用于本地测试,在远程集群上将不起作用。有关详细信息,请参见 此处。
其他信息和注意事项
监控您的作业
您可以使用 Flink JobManager 仪表板或其 Rest 接口监控正在运行的 Flink 作业。默认情况下,这在 JobManager 节点的端口 8081 上可用。如果您的本地机器上安装了 Flink,则将是 https://:8081。注意:当您使用 [local] 模式时,将启动一个嵌入式 Flink 集群,它不会提供仪表板。
流式执行
如果您的管道使用无界数据源或接收器,Flink 运行器将自动切换到流式模式。您可以使用 --streaming 标志强制执行流式模式。
注意:当使用无界源且未启用检查点时,运行器将打印警告消息。许多源(如 PubSubIO)依赖于它们的检查点以进行确认,这只能在为 FlinkRunner 启用检查点时完成。要启用检查点,请将 checkpointingIntervalcheckpointing_interval 设置为所需的检查点间隔(以毫秒为单位)。
Flink 运行器的管道选项
使用 Flink 运行器执行管道时,您可以设置这些管道选项。
以下 Flink 特定管道选项列表是根据 FlinkPipelineOptions 引用类自动生成的
allowNonRestoredState | 标志指示是否允许未恢复的状态,如果保存点包含不再是管道一部分的运算符的状态。 | 默认值:false |
attachedMode | 指定管道是附加模式还是分离模式提交 | 默认值:true |
autoBalanceWriteFilesShardingEnabled | 标志指示是否应启用 WriteFiles 变换的自动平衡分片。这可能在流式用例中很有用,其中管道需要将相当多的事件写入文件,通常分为 N 个分片。Flink 上的默认行为是,一些工作器将比其他工作器接收更多分片来处理。这会导致工作器在处理积压和内存使用方面失去平衡。启用此功能将使分片均匀分布在可用工作器之间,从而提高吞吐量和内存使用稳定性。 | 默认值:false |
autoWatermarkInterval | 自动水印发射的间隔(以毫秒为单位)。 | |
checkpointTimeoutMillis | 检查点在被丢弃之前可以持续的最长时间(以毫秒为单位)。 | 默认值:-1 |
checkpointingInterval | 触发正在运行管道的检查点的间隔(以毫秒为单位)。默认值:没有检查点。 | 默认值:-1 |
checkpointingMode | 定义一致性保证的检查点模式。 | 默认值:EXACTLY_ONCE |
disableMetrics | 在 Flink 运行器中禁用 Beam 指标 | 默认值:false |
enableStableInputDrain | 允许对包含 RequiresStableInput 运算符的 Flink 管道进行清空操作。请注意,在清空时,如果 DoFn 运算符中存在任何与处理相关的故障,可能会违反 RequiresStableInput 契约。 | 默认值:false |
executionModeForBatch | 批处理管道数据交换的 Flink 模式。参考 {@link org.apache.flink.api.common.ExecutionMode}。如果管道被阻塞,请将其设置为 BATCH_FORCED,请参见 https://issues.apache.org/jira/browse/FLINK-10672 | 默认值:PIPELINED |
executionRetryDelay | 设置两次执行之间的延迟(以毫秒为单位)。值为 {@code -1} 表示应使用默认值。 | 默认值:-1 |
externalizedCheckpointsEnabled | 启用或禁用外部检查点。与 CheckpointingInterval 协同工作 | 默认值:false |
failOnCheckpointingErrors | 设置任务在遇到检查点过程中的错误时的预期行为。如果设置为 true,任务将在检查点错误时失败。如果设置为 false,任务只会拒绝检查点并继续运行。 | 默认值:true |
fasterCopy | 删除运算符之间不必要的深度复制。请参见 https://issues.apache.org/jira/browse/BEAM-11146 | 默认值:false |
fileInputSplitMaxSizeMB | 设置从文件系统读取数据时输入拆分的最大大小。0 表示没有最大大小。 | 默认值:0 |
finishBundleBeforeCheckpointing | 如果设置,则在检查点运算符状态之前完成当前捆绑并刷新所有输出。默认情况下,立即开始检查点并缓冲任何剩余的捆绑输出作为检查点的一部分。此设置可能会影响检查点对齐。 | 默认值:false |
flinkConfDir | 包含 Flink YAML 配置文件的目录。这些属性将设置为提交到 Flink 的所有作业,并优先于 FLINK_CONF_DIR 中的配置。 | |
flinkMaster | 要执行管道的 Flink 主节点地址。可以是“host:port”形式或以下特殊值之一:[local]、[collection] 或 [auto]。 | 默认值:[auto] |
forceUnalignedCheckpointEnabled | 强制执行未对齐的检查点,特别是在迭代作业中允许它们。 | 默认值:false |
jobCheckIntervalInSecs | 在分离模式下设置waitUntilFinish方法中的作业检查间隔(以秒为单位),默认值为 5 秒。 | 默认值:5 |
latencyTrackingInterval | 从源到接收器发送延迟跟踪标记的间隔(以毫秒为单位)。间隔值 <= 0 会禁用此功能。 | 默认值:0 |
maxBundleSize | 捆绑中的最大元素数。对于流式作业,默认值为 1000;对于批处理作业,默认值为 1,000,000。 | 默认值:MaxBundleSizeFactory |
maxBundleTimeMills | 在完成捆绑之前等待的最大时间(以毫秒为单位)。对于流式作业,默认值为 1000;对于批处理作业,默认值为 10,000。 | 默认值:MaxBundleTimeFactory |
maxParallelism | 要使用的管道范围内的最大并行度。最大并行度指定动态缩放的上限以及用于分区状态的键组数。 | 默认值:-1 |
minPauseBetweenCheckpoints | 触发下一个检查点之前的最小暂停时间(以毫秒为单位)。 | 默认值:-1 |
numConcurrentCheckpoints | 并发检查点的最大数量。默认值为 1(=无并发检查点)。 | 默认值:1 |
numberOfExecutionRetries | 设置失败任务重新执行的次数。值为零实际上会禁用容错。值为 -1 表示应使用系统默认值(如配置中定义)。 | 默认值:-1 |
objectReuse | 设置重用对象的机制。 | 默认值:false |
operatorChaining | 设置运算符链接的行为。 | 默认值:true |
parallelism | 将操作分配到工作进程时使用的并行度。如果未设置并行度,则使用配置的 Flink 默认值,或使用 1(如果找不到任何默认值)。 | 默认值:-1 |
reIterableGroupByKeyResult | 指示 GBK 结果是否需要可重新迭代的标志。可重新迭代的结果意味着对于单个键的所有值都必须适合内存,因为我们目前不支持溢出到磁盘。 | 默认值:false |
reportCheckpointDuration | 如果非空,则在提供的指标命名空间中报告每个 ParDo 阶段的检查点持续时间。 | |
retainExternalizedCheckpointsOnCancellation | 设置取消时外部化检查点行为。 | 默认值:false |
savepointPath | 保存点恢复路径。如果指定,则从提供的路径恢复流式管道。 | |
shutdownSourcesAfterIdleMs | 关闭空闲了指定毫秒时间的源。一旦源关闭,检查点就不再可能了。关闭源最终会导致管道关闭(= Flink 作业完成),前提是所有输入都已处理。除非明确设置,否则在启用检查点时,此值默认为 Long.MAX_VALUE;在禁用检查点时,此值默认为 0。有关此问题的进度,请参阅 https://issues.apache.org/jira/browse/FLINK-2491。 | 默认值:-1 |
stateBackend | 存储 Beam 状态的 State backend。使用“rocksdb”或“filesystem”。 | |
stateBackendFactory | 设置在流式模式下使用的 State backend 工厂。默认情况下使用 flink 集群的 state.backend 配置。 | |
stateBackendStoragePath | 持久化 State backend 数据的 State backend 路径。用于初始化 State backend。 | |
unalignedCheckpointEnabled | 如果设置,未对齐的检查点将包含正在传输的数据(即存储在缓冲区中的数据)作为检查点状态的一部分,从而允许检查点屏障超过这些缓冲区。因此,检查点持续时间变得与当前吞吐量无关,因为检查点屏障不再有效地嵌入到数据流中。 | 默认值:false |
allow_non_restored_state | 标志指示是否允许未恢复的状态,如果保存点包含不再是管道一部分的运算符的状态。 | 默认值:false |
attached_mode | 指定管道是附加模式还是分离模式提交 | 默认值:true |
auto_balance_write_files_sharding_enabled | 标志指示是否应启用 WriteFiles 变换的自动平衡分片。这可能在流式用例中很有用,其中管道需要将相当多的事件写入文件,通常分为 N 个分片。Flink 上的默认行为是,一些工作器将比其他工作器接收更多分片来处理。这会导致工作器在处理积压和内存使用方面失去平衡。启用此功能将使分片均匀分布在可用工作器之间,从而提高吞吐量和内存使用稳定性。 | 默认值:false |
auto_watermark_interval | 自动水印发射的间隔(以毫秒为单位)。 | |
checkpoint_timeout_millis | 检查点在被丢弃之前可以持续的最长时间(以毫秒为单位)。 | 默认值:-1 |
checkpointing_interval | 触发正在运行管道的检查点的间隔(以毫秒为单位)。默认值:没有检查点。 | 默认值:-1 |
checkpointing_mode | 定义一致性保证的检查点模式。 | 默认值:EXACTLY_ONCE |
disable_metrics | 在 Flink 运行器中禁用 Beam 指标 | 默认值:false |
enable_stable_input_drain | 允许对包含 RequiresStableInput 运算符的 Flink 管道进行清空操作。请注意,在清空时,如果 DoFn 运算符中存在任何与处理相关的故障,可能会违反 RequiresStableInput 契约。 | 默认值:false |
execution_mode_for_batch | 批处理管道数据交换的 Flink 模式。参考 {@link org.apache.flink.api.common.ExecutionMode}。如果管道被阻塞,请将其设置为 BATCH_FORCED,请参见 https://issues.apache.org/jira/browse/FLINK-10672 | 默认值:PIPELINED |
execution_retry_delay | 设置两次执行之间的延迟(以毫秒为单位)。值为 {@code -1} 表示应使用默认值。 | 默认值:-1 |
externalized_checkpoints_enabled | 启用或禁用外部检查点。与 CheckpointingInterval 协同工作 | 默认值:false |
fail_on_checkpointing_errors | 设置任务在遇到检查点过程中的错误时的预期行为。如果设置为 true,任务将在检查点错误时失败。如果设置为 false,任务只会拒绝检查点并继续运行。 | 默认值:true |
faster_copy | 删除运算符之间不必要的深度复制。请参见 https://issues.apache.org/jira/browse/BEAM-11146 | 默认值:false |
file_input_split_max_size_m_b | 设置从文件系统读取数据时输入拆分的最大大小。0 表示没有最大大小。 | 默认值:0 |
finish_bundle_before_checkpointing | 如果设置,则在检查点运算符状态之前完成当前捆绑并刷新所有输出。默认情况下,立即开始检查点并缓冲任何剩余的捆绑输出作为检查点的一部分。此设置可能会影响检查点对齐。 | 默认值:false |
flink_conf_dir | 包含 Flink YAML 配置文件的目录。这些属性将设置为提交到 Flink 的所有作业,并优先于 FLINK_CONF_DIR 中的配置。 | |
flink_master | 要执行管道的 Flink 主节点地址。可以是“host:port”形式或以下特殊值之一:[local]、[collection] 或 [auto]。 | 默认值:[auto] |
force_unaligned_checkpoint_enabled | 强制执行未对齐的检查点,特别是在迭代作业中允许它们。 | 默认值:false |
job_check_interval_in_secs | 在分离模式下设置waitUntilFinish方法中的作业检查间隔(以秒为单位),默认值为 5 秒。 | 默认值:5 |
latency_tracking_interval | 从源到接收器发送延迟跟踪标记的间隔(以毫秒为单位)。间隔值 <= 0 会禁用此功能。 | 默认值:0 |
max_bundle_size | 捆绑中的最大元素数。对于流式作业,默认值为 1000;对于批处理作业,默认值为 1,000,000。 | 默认值:MaxBundleSizeFactory |
max_bundle_time_mills | 在完成捆绑之前等待的最大时间(以毫秒为单位)。对于流式作业,默认值为 1000;对于批处理作业,默认值为 10,000。 | 默认值:MaxBundleTimeFactory |
max_parallelism | 要使用的管道范围内的最大并行度。最大并行度指定动态缩放的上限以及用于分区状态的键组数。 | 默认值:-1 |
min_pause_between_checkpoints | 触发下一个检查点之前的最小暂停时间(以毫秒为单位)。 | 默认值:-1 |
num_concurrent_checkpoints | 并发检查点的最大数量。默认值为 1(=无并发检查点)。 | 默认值:1 |
number_of_execution_retries | 设置失败任务重新执行的次数。值为零实际上会禁用容错。值为 -1 表示应使用系统默认值(如配置中定义)。 | 默认值:-1 |
object_reuse | 设置重用对象的机制。 | 默认值:false |
operator_chaining | 设置运算符链接的行为。 | 默认值:true |
parallelism | 将操作分配到工作进程时使用的并行度。如果未设置并行度,则使用配置的 Flink 默认值,或使用 1(如果找不到任何默认值)。 | 默认值:-1 |
re_iterable_group_by_key_result | 指示 GBK 结果是否需要可重新迭代的标志。可重新迭代的结果意味着对于单个键的所有值都必须适合内存,因为我们目前不支持溢出到磁盘。 | 默认值:false |
report_checkpoint_duration | 如果非空,则在提供的指标命名空间中报告每个 ParDo 阶段的检查点持续时间。 | |
retain_externalized_checkpoints_on_cancellation | 设置取消时外部化检查点行为。 | 默认值:false |
savepoint_path | 保存点恢复路径。如果指定,则从提供的路径恢复流式管道。 | |
shutdown_sources_after_idle_ms | 关闭空闲了指定毫秒时间的源。一旦源关闭,检查点就不再可能了。关闭源最终会导致管道关闭(= Flink 作业完成),前提是所有输入都已处理。除非明确设置,否则在启用检查点时,此值默认为 Long.MAX_VALUE;在禁用检查点时,此值默认为 0。有关此问题的进度,请参阅 https://issues.apache.org/jira/browse/FLINK-2491。 | 默认值:-1 |
state_backend | 存储 Beam 状态的 State backend。使用“rocksdb”或“filesystem”。 | |
state_backend_factory | 设置在流式模式下使用的 State backend 工厂。默认情况下使用 flink 集群的 state.backend 配置。 | |
state_backend_storage_path | 持久化 State backend 数据的 State backend 路径。用于初始化 State backend。 | |
unaligned_checkpoint_enabled | 如果设置,未对齐的检查点将包含正在传输的数据(即存储在缓冲区中的数据)作为检查点状态的一部分,从而允许检查点屏障超过这些缓冲区。因此,检查点持续时间变得与当前吞吐量无关,因为检查点屏障不再有效地嵌入到数据流中。 | 默认值:false |
有关一般 Beam 管道选项,请参阅 PipelineOptions 参考。
Flink 版本兼容性
Flink 集群版本必须与 FlinkRunner 使用的次版本匹配。次版本是版本字符串中的前两个数字,例如在 1.18.0 中,次版本是 1.18。
我们尝试跟踪 Beam 发布时 Apache Flink 的最新版本。Flink 版本在 Flink 社区支持的时间内受 Beam 支持。Flink 社区支持最近两个次版本。当 Flink 版本的支持被取消时,它可能会被弃用并从 Beam 中删除。要了解 Beam 与哪个版本的 Flink 兼容,请参阅下表。
| Flink 版本 | 工件 ID | 支持的 Beam 版本 |
|---|---|---|
| 1.19.x | beam-runners-flink-1.19 | ≥ 2.61.0 |
| 1.18.x | beam-runners-flink-1.18 | ≥ 2.57.0 |
| 1.17.x | beam-runners-flink-1.17 | ≥ 2.56.0 |
| 1.16.x | beam-runners-flink-1.16 | 2.47.0 - 2.60.0 |
| 1.15.x | beam-runners-flink-1.15 | 2.40.0 - 2.60.0 |
| 1.14.x | beam-runners-flink-1.14 | 2.38.0 - 2.56.0 |
| 1.13.x | beam-runners-flink-1.13 | 2.31.0 - 2.55.0 |
| 1.12.x | beam-runners-flink-1.12 | 2.27.0 - 2.55.0 |
| 1.11.x | beam-runners-flink-1.11 | 2.25.0 - 2.38.0 |
| 1.10.x | beam-runners-flink-1.10 | 2.21.0 - 2.30.0 |
| 1.9.x | beam-runners-flink-1.9 | 2.17.0 - 2.29.0 |
| 1.8.x | beam-runners-flink-1.8 | 2.13.0 - 2.29.0 |
| 1.7.x | beam-runners-flink-1.7 | 2.10.0 - 2.20.0 |
| 1.6.x | beam-runners-flink-1.6 | 2.10.0 - 2.16.0 |
| 1.5.x | beam-runners-flink_2.11 | 2.6.0 - 2.16.0 |
| 带有 Scala 2.11 的 1.4.x | beam-runners-flink_2.11 | 2.3.0 - 2.5.0 |
| 带有 Scala 2.10 的 1.3.x | beam-runners-flink_2.10 | 2.1.x - 2.2.0 |
| 带有 Scala 2.10 的 1.2.x | beam-runners-flink_2.10 | 2.0.0 |
要获取正确的 Flink 版本,请参阅 Flink 下载页面。
有关更多信息,Flink 文档 可能会有帮助。
Beam 功能
Beam 功能矩阵 记录了经典 Flink Runner 的功能。
可移植功能矩阵 记录了可移植 Flink Runner 的功能。
最后更新于 2024/10/31
您找到您要找的所有内容了吗?
所有内容都实用且清晰吗?您想更改任何内容吗?请告诉我们!

