创建你的管道

你的 Beam 程序表示从头到尾的数据处理管道。本节介绍使用 Beam SDK 中的类来构建管道的机制。要使用 Beam SDK 中的类构建管道,你的程序需要执行以下一般步骤

创建你的管道对象

Beam 程序通常从创建一个 Pipeline 对象开始。

在 Beam SDK 中,每个管道都由一个类型的显式对象表示 Pipeline。每个 Pipeline 对象都是一个独立的实体,它封装了管道操作的数据以及应用于该数据的转换。

要创建一个管道,声明一个 Pipeline 对象,并传递一些 配置选项

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

将数据读取到你的管道中

要创建你的管道的初始 PCollection,将根转换应用于你的管道对象。根转换从外部数据源或你指定的本地数据创建 PCollection

Beam SDK 中有两种类型的根转换:ReadCreateRead 转换从外部源(如文本文件或数据库表)读取数据。Create 转换从内存中的 java.util.Collection 创建 PCollection

以下示例代码演示如何 apply TextIO.Read 根转换以从文本文件读取数据。该转换应用于 Pipeline 对象 p,并以 PCollection<String> 的形式返回一个管道数据集

PCollection<String> lines = p.apply(
  "ReadLines", TextIO.read().from("gs://some/inputData.txt"));

应用转换以处理管道数据

你可以使用 Beam SDK 中提供的各种 转换 来操作你的数据。为此,你将转换 应用 到管道的 PCollection,方法是调用每个要处理的 PCollection 上的 apply 方法,并将所需的转换对象作为参数传递。

以下代码演示如何 apply 转换到字符串的 PCollection。该转换是一个用户定义的自定义转换,它反转每个字符串的内容,并输出一个包含反转字符串的新 PCollection

输入是一个名为 wordsPCollection<String>;代码将 ReverseWords 称为 PTransform 对象的实例传递给 apply,并将返回值保存为名为 reversedWordsPCollection<String>

PCollection<String> words = ...;

PCollection<String> reversedWords = words.apply(new ReverseWords());

写入或输出你的最终管道数据

一旦你的管道应用了所有转换,你通常需要输出结果。要输出你的管道的最终 PCollection,将 Write 转换应用于该 PCollectionWrite 转换可以将 PCollection 的元素输出到外部数据接收器,例如数据库表。你可以在管道的任何时间使用 Write 输出 PCollection,尽管你通常会在管道的末尾写入数据。

以下示例代码演示如何 apply TextIO.Write 转换以将 StringPCollection 写入文本文件

PCollection<String> filteredWords = ...;

filteredWords.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt"));

运行你的管道

构建管道后,使用 run 方法执行管道。管道是异步执行的:你创建的程序将管道规范发送到 管道运行器,然后管道运行器构造并运行实际的管道操作序列。

p.run();

run 方法是异步的。如果你想要阻塞执行,请追加 waitUntilFinish 方法运行你的管道

p.run().waitUntilFinish();

下一步