设计您的管道
此页面帮助您设计 Apache Beam 管道。它包含有关如何确定管道结构、如何选择要应用于数据的转换以及如何确定输入和输出方法的信息。
在阅读本节之前,建议您熟悉 Beam 编程指南 中的信息。
设计管道时需要考虑的事项
设计 Beam 管道时,请考虑以下几个基本问题
- 您的输入数据存储在哪里?您有多少组输入数据?这将决定您在管道开头需要应用哪些类型的
Read
转换。 - 您的数据是什么样的?它可能是纯文本、格式化的日志文件或数据库表中的行。某些 Beam 转换仅适用于
PCollection
的键/值对;您需要确定您的数据是否以及如何进行键控,以及如何在管道的PCollection
中最佳地表示它。 - 您想对数据做什么?Beam SDK 中的核心转换是通用的。了解您需要如何更改或操作数据将决定您如何构建核心转换,如 ParDo,或者何时使用 Beam SDK 附带的预写转换。
- 您的输出数据是什么样的,它应该去哪里?这将决定您在管道末尾需要应用哪些类型的
Write
转换。
基本管道
最简单的管道表示操作的线性流程,如示意图 1 所示。
示意图 1:线性管道。
但是,您的管道可以复杂得多。管道表示步骤的 有向无环图。它可以具有多个输入源、多个输出接收器,并且其操作 (PTransform
) 可以读取和输出多个 PCollection
。以下示例展示了管道可以采用的几种不同形状。
分支 PCollection
重要的是要理解,转换不会消耗 PCollection
;相反,它们会考虑 PCollection
中的每个单独元素,并创建一个新的 PCollection
作为输出。这样,您可以对同一 PCollection
中的不同元素执行不同的操作。
多个转换处理相同的 PCollection
您可以将同一 PCollection
作为多个转换的输入,而不会消耗输入或改变它。
示意图 2 中的管道是分支管道。管道从数据库表中读取其输入(表示为字符串的首字母),并创建一个包含表行的 PCollection
。然后,管道对同一PCollection
应用多个转换。转换 A 从该 PCollection
中提取所有以字母“A”开头的名称,转换 B 从该 PCollection
中提取所有以字母“B”开头的名称。转换 A 和 B 具有相同的输入 PCollection
。
示意图 2:分支管道。将两个转换应用于单个包含数据库表行的 PCollection。
以下示例代码将两个转换应用于单个输入集合。
PCollection<String> dbRowCollection = ...;
PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){
c.output(c.element());
}
}
}));
PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){
c.output(c.element());
}
}
}));
产生多个输出的单个转换
分支管道的另一种方法是让单个转换使用 标记输出 输出到多个 PCollection
。产生多个输出的转换会处理输入中的每个元素一次,并输出到零个或多个 PCollection
。
示意图 3 说明了上面描述的相同示例,但使用一个产生多个输出的转换。以“A”开头的名称将添加到主输出 PCollection
中,而以“B”开头的名称将添加到附加输出 PCollection
中。
示意图 3:具有产生多个 PCollection 的转换的管道。
如果我们比较示意图 2 和示意图 3 中的管道,您会发现它们以不同的方式执行相同操作。示意图 2 中的管道包含两个处理同一输入 PCollection
中的元素的转换。一个转换使用以下逻辑
if (starts with 'A') { outputToPCollectionA }
而另一个转换使用
if (starts with 'B') { outputToPCollectionB }
由于每个转换都会读取整个输入 PCollection
,因此输入 PCollection
中的每个元素都会被处理两次。
示意图 3 中的管道以不同的方式执行相同操作 - 仅使用一个转换,该转换使用以下逻辑
if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }
其中输入 PCollection
中的每个元素都处理一次。
以下示例代码应用一个转换,该转换会处理每个元素一次并输出两个集合。
// Define two TupleTags, one for each output.
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// Emit to main output, which is the output with tag startsWithATag.
c.output(c.element());
} else if(c.element().startsWith("B")) {
// Emit to output with tag startsWithBTag.
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);
您可以使用任一机制来产生多个输出 PCollection
。如果在逻辑上没有意义将处理逻辑合并为一个 ParDo
,则建议使用第一种选项。但是,如果转换对每个元素的计算非常耗时,并且如果您计划在将来添加更多输出类型,则使用第二个选项(一个产生多个输出的转换)更有意义。
合并 PCollection
通常,在您通过多个转换将 PCollection
分支到多个 PCollection
之后,您可能希望将某些或所有这些生成的 PCollection
合并在一起。您可以通过以下方法之一来完成此操作
- 扁平化 - 您可以在 Beam SDK 中使用
Flatten
变换来合并多个类型相同的PCollection
。 - 连接 - 您可以在 Beam SDK 中使用
CoGroupByKey
变换来执行两个PCollection
之间的关联连接。PCollection
必须进行键控(即,它们必须是键值对的集合),并且它们必须使用相同的键类型。
图 4 中的示例是 上面部分 中图 2 示例的延续。在分支成两个 PCollection
之后,一个包含以 'A' 开头的名称,另一个包含以 'B' 开头的名称,管道将这两个 PCollection
合并到一个单独的 PCollection
中,现在包含所有以 'A' 或 'B' 开头的名称。在这里,使用 Flatten
有意义,因为要合并的 PCollection
都包含相同的类型。
图 4:一个使用 Flatten 变换将两个集合合并成一个集合的管道。
以下示例代码将 Flatten
应用于合并两个集合。
//merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
.apply(Flatten.<String>pCollections());
// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);
多个来源
您的管道可以从一个或多个来源读取输入。如果您的管道从多个来源读取数据,并且这些来源的数据相关,那么将这些输入连接起来可能很有用。在下面的图 5 中所示的示例中,管道从数据库表中读取姓名和地址,并从 Kafka 主题中读取姓名和订单号。然后,管道使用 CoGroupByKey
来连接这些信息,其中键是姓名;结果 PCollection
包含姓名、地址和订单的所有组合。
图 5:一个对两个输入集合进行关联连接的管道。
以下示例代码将 Join
应用于连接两个输入集合。
PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();
// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.<String>create());
joinedCollection.apply(...);
下一步
最后更新于 2024/10/31
您找到了您要找的所有东西了吗?
所有内容都实用且清晰吗?您想改变什么吗?请告诉我们!