执行模型

Beam 模型允许运行器以不同的方式执行您的管道。您可能会观察到运行器选择所导致的各种影响。本页描述了这些影响,以便您可以更好地了解 Beam 管道如何执行。

元素处理

元素在机器之间的序列化和通信是管道分布式执行中最昂贵的操作之一。避免这种序列化可能需要在故障后重新处理元素,或者可能限制将输出分发到其他机器。

序列化和通信

运行器可能会为了通信目的以及其他原因(如持久化)而对元素进行机器之间的序列化。

运行器可能会决定以多种方式在转换之间传输元素,例如

运行器可能会序列化和持久化元素的一些情况是

  1. 当用作有状态的DoFn的一部分时,运行器可能会将值持久化到某些状态机制。
  2. 在提交处理结果时,运行器可能会将输出持久化为检查点。

捆绑和持久化

Beam 管道通常侧重于“简单并行”问题。因此,API 强调并行处理元素,这使得表达诸如“为 PCollection 中的每个元素分配一个序列号”之类的操作变得很困难。这是故意的,因为此类算法更有可能遇到可扩展性问题。

并行处理所有元素也有一些缺点。具体来说,它使得无法对任何操作进行批处理,例如将元素写入接收器或在处理过程中记录进度。

PCollection 中的元素不是同时处理,而是以捆绑的方式处理。集合划分为捆绑的方式是任意的,由运行器选择。这允许运行器在每次元素后持久化结果以及在出现故障时必须重试所有操作之间选择适当的折衷方案。例如,流式运行器可能更喜欢处理和提交小的捆绑,而批处理运行器可能更喜欢处理更大的捆绑。

数据分区和跨阶段执行

Beam 管道中元素处理的分区和并行化取决于两件事

Beam 管道从源读取数据(例如KafkaIOBigQueryIOJdbcIO或您自己的源实现)。要在 Beam 中实现源,必须将其实现为可拆分的DoFn。可拆分的DoFn为运行器提供接口以促进工作的拆分。

在 Beam 中运行基于键的操作(例如GroupByKeyCombineReshuffle.perKey和有状态的DoFns)时,Beam 运行器会执行称为混洗1的数据序列化和传输。混洗允许对具有相同键的数据元素进行一起处理。

运行器混洗数据的方式对于批处理和流式执行模式可能略有不同。

1不要与某些运行器中的shuffle操作混淆。

管道执行中的数据排序

Beam 模型没有定义有关运行器处理元素或在PTransforms之间传输元素的顺序的严格准则。运行器可以自由地以不同的形式实现数据传输语义。

存在一些用例,其中用户管道可能需要依赖于管道执行中的特定排序语义。 功能矩阵文档运行器针对键排序交付的行为。

考虑一个 Beam 工作器,它处理来自同一 Beam 转换的一系列捆绑,并考虑一个将数据从此阶段输出到下游PCollectionPTransform。最后,考虑由该工作器按特定顺序(在同一个捆绑内或作为不同捆绑的一部分)发射的两个具有相同键的事件。

如果 Beam 运行器保证这两个事件将按相同顺序(与数据传输方法无关)由直接下游的 PTransform 观察到,那么我们说 Beam 运行器支持键排序交付

此特性将在具有键限定并行性的运行器和操作中成立。

转换内外的故障和并行性

在本节中,我们将讨论输入集合中的元素如何并行处理,以及在发生故障时如何重试转换。

一个转换中的数据并行性

在执行单个 ParDo 时,运行器可能会将包含九个元素的示例输入集合划分为两个捆绑,如图 1 所示。

Bundle A contains five elements. Bundle B contains four elements.

图 1:运行器将输入集合划分为两个捆绑。

ParDo 执行时,工作器可以并行处理这两个捆绑,如图 2 所示。

Two workers process the two bundles in parallel. Worker one processes bundle A. Worker two processes bundle B.

图 2:两个工作器并行处理两个捆绑。

由于元素不可分割,因此变换的最大并行度取决于集合中元素的数量。在图 3 中,输入集合包含九个元素,因此最大并行度为九。

Nine workers process a nine element input collection in parallel.

图 3:九个工作器并行处理包含九个元素的输入集合。

注意:可分割的 ParDo 允许将单个输入的处理拆分为多个捆绑。此功能正在开发中。

转换之间的依赖并行性

顺序中的 ParDo 变换可能是依赖并行的,如果运行器选择在生产变换的输出元素上执行消费变换而不会改变捆绑。在图 4 中,ParDo1ParDo2依赖并行的,如果给定元素的 ParDo1 输出必须在同一个工作器上处理。

ParDo1 processes an input collection that contains bundles A and B. ParDo2 then processes the output collection from ParDo1, which contains bundles C and D.

图 4:两个顺序变换及其对应的输入集合。

图 5 显示了这些依赖并行变换的执行方式。第一个工作器对捆绑 A 中的元素执行 ParDo1(产生捆绑 C),然后对捆绑 C 中的元素执行 ParDo2。类似地,第二个工作器对捆绑 B 中的元素执行 ParDo1(产生捆绑 D),然后对捆绑 D 中的元素执行 ParDo2

Worker one executes ParDo1 on bundle A and Pardo2 on bundle C. Worker two executes ParDo1 on bundle B and ParDo2 on bundle D.

图 5:两个工作器执行依赖并行的 ParDo 变换。

以这种方式执行变换允许运行器避免在工作器之间重新分配元素,从而节省通信成本。然而,最大并行度现在取决于依赖并行步骤中的第一步的最大并行度。

一个转换中的故障

如果捆绑内元素的处理失败,则整个捆绑将失败。捆绑中的元素必须重试(否则整个管道将失败),尽管它们不需要以相同的捆绑方式重试。

对于此示例,我们将使用图 1 中的 ParDo,它有一个包含九个元素的输入集合,并被划分为两个捆绑。

在图 6 中,第一个工作器成功地处理了捆绑 A 中的所有五个元素。第二个工作器处理了捆绑 B 中的四个元素:前两个元素处理成功,第三个元素的处理失败,还有一个元素正在等待处理。

我们看到运行器重新尝试了捆绑 B 中的所有元素,并且处理在第二次成功完成。请注意,重试不一定发生在与原始处理尝试相同的工​​作器上,如图所示。

Worker two fails to process an element in bundle B. Worker one finishes processing bundle A and then successfully retries to execute bundle B.

图 6:捆绑 B 中元素的处理失败,另一个工作器重新尝试整个捆绑。

由于我们在处理输入捆绑中的元素时遇到了故障,我们不得不重新处理所有输入捆绑中的元素。这意味着运行器必须丢弃捆绑的整个输出(包括任何状态突变和设置计时器),因为其中包含的所有结果都将被重新计算。

请注意,如果失败的变换是 ParDo,则 DoFn 实例将被拆卸并放弃。

耦合故障:转换之间的故障

如果处理 ParDo2 中的元素失败导致 ParDo1 重新执行,则这两个步骤被称为共同失败

对于此示例,我们将使用图 4 中的两个 ParDo

在图 7 中,工作器二成功地对捆绑 B 中的所有元素执行了 ParDo1。但是,工作器无法处理捆绑 D 中的元素,因此 ParDo2 失败(显示为红色 X)。结果,运行器必须丢弃并重新计算 ParDo2 的输出。因为运行器正在一起执行 ParDo1ParDo2,所以 ParDo1 的输出捆绑也必须被丢弃,并且输入捆绑中的所有元素都必须重新尝试。这两个 ParDo 是共同失败的。

Worker two fails to process en element in bundle D, so all elements in both bundle B and bundle D must be retried.

图 7:捆绑 D 中元素的处理失败,因此输入捆绑中的所有元素都将重新尝试。

请注意,重试并不一定与原始尝试具有相同的处理时间,如图所示。

所有遇到耦合故障的 DoFns 都将被终止并必须被拆卸,因为它们没有遵循正常的 DoFn 生命周期。

以这种方式执行变换允许运行器避免在变换之间持久化元素,从而节省持久化成本。