设计您的管道

此页面帮助您设计 Apache Beam 管道。它包含有关如何确定管道结构、如何选择要应用于数据的转换以及如何确定输入和输出方法的信息。

在阅读本节之前,建议您熟悉 Beam 编程指南 中的信息。

设计管道时需要考虑的事项

设计 Beam 管道时,请考虑以下几个基本问题

基本管道

最简单的管道表示操作的线性流程,如示意图 1 所示。

A linear pipeline starts with one input collection, sequentially appliesthree transforms, and ends with one output collection.

示意图 1:线性管道。

但是,您的管道可以复杂得多。管道表示步骤的 有向无环图。它可以具有多个输入源、多个输出接收器,并且其操作 (PTransform) 可以读取和输出多个 PCollection。以下示例展示了管道可以采用的几种不同形状。

分支 PCollection

重要的是要理解,转换不会消耗 PCollection;相反,它们会考虑 PCollection 中的每个单独元素,并创建一个新的 PCollection 作为输出。这样,您可以对同一 PCollection 中的不同元素执行不同的操作。

多个转换处理相同的 PCollection

您可以将同一 PCollection 作为多个转换的输入,而不会消耗输入或改变它。

示意图 2 中的管道是分支管道。管道从数据库表中读取其输入(表示为字符串的首字母),并创建一个包含表行的 PCollection。然后,管道对同一PCollection 应用多个转换。转换 A 从该 PCollection 中提取所有以字母“A”开头的名称,转换 B 从该 PCollection 中提取所有以字母“B”开头的名称。转换 A 和 B 具有相同的输入 PCollection

The pipeline applies two transforms to a single input collection. Eachtransform produces an output collection.

示意图 2:分支管道。将两个转换应用于单个包含数据库表行的 PCollection。

以下示例代码将两个转换应用于单个输入集合。

PCollection<String> dbRowCollection = ...;

PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("A")){
      c.output(c.element());
    }
  }
}));

PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("B")){
      c.output(c.element());
    }
  }
}));

产生多个输出的单个转换

分支管道的另一种方法是让单个转换使用 标记输出 输出到多个 PCollection。产生多个输出的转换会处理输入中的每个元素一次,并输出到零个或多个 PCollection

示意图 3 说明了上面描述的相同示例,但使用一个产生多个输出的转换。以“A”开头的名称将添加到主输出 PCollection 中,而以“B”开头的名称将添加到附加输出 PCollection 中。

The pipeline applies one transform that produces multiple output collections.

示意图 3:具有产生多个 PCollection 的转换的管道。

如果我们比较示意图 2 和示意图 3 中的管道,您会发现它们以不同的方式执行相同操作。示意图 2 中的管道包含两个处理同一输入 PCollection 中的元素的转换。一个转换使用以下逻辑

if (starts with 'A') { outputToPCollectionA }

而另一个转换使用

if (starts with 'B') { outputToPCollectionB }

由于每个转换都会读取整个输入 PCollection,因此输入 PCollection 中的每个元素都会被处理两次。

示意图 3 中的管道以不同的方式执行相同操作 - 仅使用一个转换,该转换使用以下逻辑

if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }

其中输入 PCollection 中的每个元素都处理一次。

以下示例代码应用一个转换,该转换会处理每个元素一次并输出两个集合。

// Define two TupleTags, one for each output.
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};

PCollectionTuple mixedCollection =
    dbRowCollection.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.element().startsWith("A")) {
              // Emit to main output, which is the output with tag startsWithATag.
              c.output(c.element());
            } else if(c.element().startsWith("B")) {
              // Emit to output with tag startsWithBTag.
              c.output(startsWithBTag, c.element());
            }
          }
        })
        // Specify main output. In this example, it is the output
        // with tag startsWithATag.
        .withOutputTags(startsWithATag,
        // Specify the output with tag startsWithBTag, as a TupleTagList.
                        TupleTagList.of(startsWithBTag)));

// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);

// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);

您可以使用任一机制来产生多个输出 PCollection。如果在逻辑上没有意义将处理逻辑合并为一个 ParDo,则建议使用第一种选项。但是,如果转换对每个元素的计算非常耗时,并且如果您计划在将来添加更多输出类型,则使用第二个选项(一个产生多个输出的转换)更有意义。

合并 PCollection

通常,在您通过多个转换将 PCollection 分支到多个 PCollection 之后,您可能希望将某些或所有这些生成的 PCollection 合并在一起。您可以通过以下方法之一来完成此操作

图 4 中的示例是 上面部分 中图 2 示例的延续。在分支成两个 PCollection 之后,一个包含以 'A' 开头的名称,另一个包含以 'B' 开头的名称,管道将这两个 PCollection 合并到一个单独的 PCollection 中,现在包含所有以 'A' 或 'B' 开头的名称。在这里,使用 Flatten 有意义,因为要合并的 PCollection 都包含相同的类型。

The pipeline merges two collections into one collection with the Flatten transform.

图 4:一个使用 Flatten 变换将两个集合合并成一个集合的管道。

以下示例代码将 Flatten 应用于合并两个集合。

//merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());

// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);

多个来源

您的管道可以从一个或多个来源读取输入。如果您的管道从多个来源读取数据,并且这些来源的数据相关,那么将这些输入连接起来可能很有用。在下面的图 5 中所示的示例中,管道从数据库表中读取姓名和地址,并从 Kafka 主题中读取姓名和订单号。然后,管道使用 CoGroupByKey 来连接这些信息,其中键是姓名;结果 PCollection 包含姓名、地址和订单的所有组合。

The pipeline joins two input collections into one collection with the Join transform.

图 5:一个对两个输入集合进行关联连接的管道。

以下示例代码将 Join 应用于连接两个输入集合。

PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);

PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);

final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();

// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedCollection =
  KeyedPCollectionTuple.of(addressTag, userAddress)
                       .and(orderTag, userOrder)
                       .apply(CoGroupByKey.<String>create());

joinedCollection.apply(...);

下一步