Beam YAML API
Beam YAML 是使用 YAML 文件描述 Apache Beam 管道的声明式语法。您可以使用 Beam YAML 来编写和运行 Beam 管道,而无需编写任何代码。
概述
Beam 为创建复杂的数据处理管道提供了一个强大的模型。但是,入门 Beam 编程可能具有挑战性,因为它需要在支持的 Beam SDK 语言之一中编写代码。您需要了解 API、设置项目、管理依赖项以及执行其他编程任务。
Beam YAML 使创建 Beam 管道变得更容易。您可以使用任何文本编辑器创建 YAML 文件,而不是编写代码。然后,您将 YAML 文件提交给运行器执行。
Beam YAML 语法旨在易于人类阅读,但也适合作为工具的中间表示。例如,管道编写 GUI 可以输出 YAML,或者谱系分析工具可以消费 YAML 管道规范。
Beam YAML 仍在开发中,但任何已包含的功能都被认为是稳定的。欢迎您在 dev@apache.beam.org 上提供反馈。
先决条件
Beam YAML 解析器当前作为 Apache Beam Python SDK 的一部分包含在内。您不需要编写 Python 代码来使用 Beam YAML,但您需要 SDK 在本地运行管道。
我们建议创建一个 虚拟环境,以便所有软件包都安装在隔离且独立的环境中。在设置 Python 环境后,按如下方式安装 SDK
pip install apache_beam[yaml,gcp]
此外,提供的转换中的几个,例如 SQL 转换,是在 Java 中实现的,并且需要一个有效的 Java 解释器。当您运行使用这些转换的管道时,所需的工件会自动从 Apache Maven 存储库下载。
入门
使用文本编辑器创建一个名为 pipeline.yaml
的文件。将以下文本粘贴到该文件并保存
pipeline:
transforms:
- type: Create
config:
elements: [1, 2, 3]
- type: LogForTesting
input: Create
此文件定义了一个包含两个转换的简单管道
Create
转换创建了一个集合。config
的值是配置设置的字典。在本例中,elements
指定集合的成员。其他转换类型具有其他配置设置。LogForTesting
转换记录每个输入元素。此转换不需要config
设置。input
键指定LogForTesting
从Create
转换接收输入。
运行管道
要执行管道,运行以下 Python 命令
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml
输出应包含类似于以下内容的日志语句
INFO:root:{"element": 1}
INFO:root:{"element": 2}
INFO:root:{"element": 3}
在 Dataflow 中运行管道
您可以使用 gcloud CLI 将 YAML 管道提交给 Dataflow。要从 YAML 文件创建 Dataflow 作业,请使用 gcloud dataflow yaml run
命令
gcloud dataflow yaml run $JOB_NAME \
--yaml-pipeline-file=pipeline.yaml \
--region=$REGION
当您使用 gcloud
CLI 时,您不需要在本地安装 Beam SDK。
可视化管道
您可以使用 apache_beam.runners.render
模块将管道执行图呈现为 PNG 文件,如下所示
安装 Graphviz.
运行以下命令
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml \ --runner=apache_beam.runners.render.RenderRunner \ --render_output=out.png
示例:读取 CSV 数据
以下管道从一组 CSV 文件读取数据,并将数据以 JSON 格式写入。此管道假定 CSV 文件具有标题行。列名称成为 JSON 字段名称。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: WriteToJson
config:
path: /path/to/output.json
input: ReadFromCsv
添加过滤器
Filter
转换过滤记录。它保留满足布尔谓词的输入记录,并丢弃不满足谓词的记录。以下示例保留 col3
值大于 100 的记录
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
input: ReadFromCsv
- type: WriteToJson
config:
path: /path/to/output.json
input: Filter
添加映射函数
Beam YAML 支持各种 映射函数。以下示例使用 Sql
转换按 col1
分组,并输出每个键的计数。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
input: ReadFromCsv
- type: Sql
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
input: Filter
- type: WriteToJson
config:
path: /path/to/output.json
input: Sql
模式
本节介绍 Beam YAML 中的一些常见模式。
命名转换
您可以命名管道中的转换,以帮助监视和调试。如果管道包含多个相同类型的转换,名称还用于区分转换。
pipeline:
transforms:
- type: ReadFromCsv
name: ReadMyData
config:
path: /path/to/input*.csv
- type: Filter
name: KeepBigRecords
config:
language: python
keep: "col3 > 100"
input: ReadMyData
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
input: KeepBigRecords
- type: WriteToJson
name: WriteTheOutput
config:
path: /path/to/output.json
input: MySqlTransform
链接转换
如果管道是线性的(没有分支或合并),您可以将管道指定为 chain
类型。在 chain
类型管道中,您不需要指定输入。输入从它们在 YAML 文件中出现的顺序隐式获得
pipeline:
type: chain
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
- type: WriteToJson
config:
path: /path/to/output.json
源和接收器转换
作为语法糖,您可以将管道中第一个和最后一个转换命名为 source
和 sink
。此约定不会更改生成的管道,但它会表明源和接收器转换的意图。
pipeline:
type: chain
source:
type: ReadFromCsv
config:
path: /path/to/input*.csv
transforms:
- type: Filter
config:
language: python
keep: "col3 > 100"
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
sink:
type: WriteToJson
config:
path: /path/to/output.json
非线性管道
Beam YAML 支持任意非线性管道。以下管道读取两个源,将它们连接起来,然后写入两个输出
pipeline:
transforms:
- type: ReadFromCsv
name: ReadLeft
config:
path: /path/to/left*.csv
- type: ReadFromCsv
name: ReadRight
config:
path: /path/to/right*.csv
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
- type: WriteToJson
name: WriteAll
input: Sql
config:
path: /path/to/all.json
- type: Filter
name: FilterToBig
input: Sql
config:
language: python
keep: "col2 > 100"
- type: WriteToCsv
name: WriteBig
input: FilterToBig
config:
path: /path/to/big.csv
由于管道不是线性的,因此您必须显式声明每个转换的输入。但是,您可以在非线性管道中嵌套一个 chain
。链是管道中的一条线性子路径。
以下示例创建一个名为 ExtraProcessingForBigRows
的链。该链从 Sql
转换获取输入,并应用几个额外的过滤器以及一个接收器。请注意,在链中,不需要指定输入。
pipeline:
transforms:
- type: ReadFromCsv
name: ReadLeft
config:
path: /path/to/left*.csv
- type: ReadFromCsv
name: ReadRight
config:
path: /path/to/right*.csv
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
- type: WriteToJson
name: WriteAll
input: Sql
config:
path: /path/to/all.json
- type: chain
name: ExtraProcessingForBigRows
input: Sql
transforms:
- type: Filter
config:
language: python
keep: "col2 > 100"
- type: Filter
config:
language: python
keep: "len(col1) > 10"
- type: Filter
config:
language: python
keep: "col1 > 'z'"
sink:
type: WriteToCsv
config:
path: /path/to/big.csv
窗口化
此 API 可用于定义流式管道和批处理管道。为了在流式管道中有意义地聚合元素,通常需要某种窗口化。Beam 的 窗口化 和 触发 可以使用所有其他 Beam SDK 中可用的相同 WindowInto
转换来声明。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: JSON
schema:
type: object
properties:
col1: {type: string}
col2: {type: integer}
col3: {type: number}
- type: WindowInto
windowing:
type: fixed
size: 60s
- type: SomeGroupingTransform
config:
arg: ...
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
您可以使用指定的窗口化标记转换,而不是使用显式的 WindowInto
操作,这会导致其输入(因此转换本身)应用该窗口化。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: SomeGroupingTransform
config:
arg: ...
windowing:
type: sliding
size: 60s
period: 10s
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
请注意,Sql
操作本身通常是一种聚合形式,应用窗口化(或使用已窗口化的输入)会导致所有分组都在每个窗口内完成。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
windowing:
type: sessions
gap: 60s
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
指定的窗口化将应用于所有输入,在本例中会导致每个窗口内进行连接。
pipeline:
transforms:
- type: ReadFromPubSub
name: ReadLeft
config:
topic: leftTopic
format: ...
schema: ...
- type: ReadFromPubSub
name: ReadRight
config:
topic: rightTopic
format: ...
schema: ...
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
windowing:
type: fixed
size: 60s
options:
streaming: true
对于没有输入的转换,指定的窗口化将应用于其输出。根据 Beam 模型,窗口化然后由所有使用操作继承。这对于像 Read 这样的根操作特别有用。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 60s
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
还可以指定管道的顶层(或复合)的窗口化,这等效于将相同的窗口化应用于所有根操作,这些操作本身没有指定自己的窗口化。这种方法是将窗口应用于管道中所有地方的有效方法。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
windowing:
type: fixed
size: 60
options:
streaming: true
请注意,所有这些窗口化规范都与 source
和 sink
语法兼容
pipeline:
type: chain
source:
type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 10s
transforms:
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
sink:
type: WriteToCsv
config:
path: /path/to/output.json
windowing:
type: fixed
size: 5m
options:
streaming: true
提供者
虽然我们旨在提供大量内置转换,但人们不可避免地想要编写自己的转换。这可以通过提供者概念实现,提供者利用扩展服务和模式转换。
例如,您可以构建一个 jar,它出售 跨语言转换 或 模式转换,然后在转换中使用它,如下所示
pipeline:
type: chain
source:
type: ReadFromCsv
config:
path: /path/to/input*.csv
transforms:
- type: MyCustomTransform
config:
arg: whatever
sink:
type: WriteToJson
config:
path: /path/to/output.json
providers:
- type: javaJar
config:
jar: /path/or/url/to/myExpansionService.jar
transforms:
MyCustomTransform: "urn:registered:in:expansion:service"
可以使用以下语法提供任意 Python 转换
providers:
- type: pythonPackage
config:
packages:
- my_pypi_package>=version
- /path/to/local/package.zip
transforms:
MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"
管道选项
管道选项 用于配置管道的不同方面,例如将执行管道的管道运行器以及所选运行器所需的任何运行器特定配置。要设置管道选项,请在 YAML 文件的末尾附加一个选项块。例如
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 60s
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true