Beam YAML API

Beam YAML 是使用 YAML 文件描述 Apache Beam 管道的声明式语法。您可以使用 Beam YAML 来编写和运行 Beam 管道,而无需编写任何代码。

概述

Beam 为创建复杂的数据处理管道提供了一个强大的模型。但是,入门 Beam 编程可能具有挑战性,因为它需要在支持的 Beam SDK 语言之一中编写代码。您需要了解 API、设置项目、管理依赖项以及执行其他编程任务。

Beam YAML 使创建 Beam 管道变得更容易。您可以使用任何文本编辑器创建 YAML 文件,而不是编写代码。然后,您将 YAML 文件提交给运行器执行。

Beam YAML 语法旨在易于人类阅读,但也适合作为工具的中间表示。例如,管道编写 GUI 可以输出 YAML,或者谱系分析工具可以消费 YAML 管道规范。

Beam YAML 仍在开发中,但任何已包含的功能都被认为是稳定的。欢迎您在 dev@apache.beam.org 上提供反馈。

先决条件

Beam YAML 解析器当前作为 Apache Beam Python SDK 的一部分包含在内。您不需要编写 Python 代码来使用 Beam YAML,但您需要 SDK 在本地运行管道。

我们建议创建一个 虚拟环境,以便所有软件包都安装在隔离且独立的环境中。在设置 Python 环境后,按如下方式安装 SDK

pip install apache_beam[yaml,gcp]

此外,提供的转换中的几个,例如 SQL 转换,是在 Java 中实现的,并且需要一个有效的 Java 解释器。当您运行使用这些转换的管道时,所需的工件会自动从 Apache Maven 存储库下载。

入门

使用文本编辑器创建一个名为 pipeline.yaml 的文件。将以下文本粘贴到该文件并保存

pipeline:
  transforms:
    - type: Create
      config:
        elements: [1, 2, 3]
    - type: LogForTesting
      input: Create

此文件定义了一个包含两个转换的简单管道

运行管道

要执行管道,运行以下 Python 命令

python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml

输出应包含类似于以下内容的日志语句

INFO:root:{"element": 1}
INFO:root:{"element": 2}
INFO:root:{"element": 3}

在 Dataflow 中运行管道

您可以使用 gcloud CLI 将 YAML 管道提交给 Dataflow。要从 YAML 文件创建 Dataflow 作业,请使用 gcloud dataflow yaml run 命令

gcloud dataflow yaml run $JOB_NAME \
  --yaml-pipeline-file=pipeline.yaml \
  --region=$REGION

当您使用 gcloud CLI 时,您不需要在本地安装 Beam SDK。

可视化管道

您可以使用 apache_beam.runners.render 模块将管道执行图呈现为 PNG 文件,如下所示

  1. 安装 Graphviz.

  2. 运行以下命令

    python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml \
      --runner=apache_beam.runners.render.RenderRunner \
      --render_output=out.png
    

示例:读取 CSV 数据

以下管道从一组 CSV 文件读取数据,并将数据以 JSON 格式写入。此管道假定 CSV 文件具有标题行。列名称成为 JSON 字段名称。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: ReadFromCsv

添加过滤器

Filter 转换过滤记录。它保留满足布尔谓词的输入记录,并丢弃不满足谓词的记录。以下示例保留 col3 值大于 100 的记录

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
      input: ReadFromCsv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: Filter

添加映射函数

Beam YAML 支持各种 映射函数。以下示例使用 Sql 转换按 col1 分组,并输出每个键的计数。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
      input: ReadFromCsv
    - type: Sql
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
      input: Filter
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: Sql

模式

本节介绍 Beam YAML 中的一些常见模式。

命名转换

您可以命名管道中的转换,以帮助监视和调试。如果管道包含多个相同类型的转换,名称还用于区分转换。

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadMyData
      config:
        path: /path/to/input*.csv
    - type: Filter
      name: KeepBigRecords
      config:
        language: python
        keep: "col3 > 100"
      input: ReadMyData
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
      input: KeepBigRecords
    - type: WriteToJson
      name: WriteTheOutput
      config:
        path: /path/to/output.json
      input: MySqlTransform

链接转换

如果管道是线性的(没有分支或合并),您可以将管道指定为 chain 类型。在 chain 类型管道中,您不需要指定输入。输入从它们在 YAML 文件中出现的顺序隐式获得

pipeline:
  type: chain

  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
    - type: WriteToJson
      config:
        path: /path/to/output.json

源和接收器转换

作为语法糖,您可以将管道中第一个和最后一个转换命名为 sourcesink。此约定不会更改生成的管道,但它会表明源和接收器转换的意图。

pipeline:
  type: chain

  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv

  transforms:
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"

    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"

  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json

非线性管道

Beam YAML 支持任意非线性管道。以下管道读取两个源,将它们连接起来,然后写入两个输出

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv

    - type: ReadFromCsv
      name: ReadRight
      config:
        path: /path/to/right*.csv

    - type: Sql
      config:
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        A: ReadLeft
        B: ReadRight

    - type: WriteToJson
      name: WriteAll
      input: Sql
      config:
        path: /path/to/all.json

    - type: Filter
      name: FilterToBig
      input: Sql
      config:
        language: python
        keep: "col2 > 100"

    - type: WriteToCsv
      name: WriteBig
      input: FilterToBig
      config:
        path: /path/to/big.csv

由于管道不是线性的,因此您必须显式声明每个转换的输入。但是,您可以在非线性管道中嵌套一个 chain。链是管道中的一条线性子路径。

以下示例创建一个名为 ExtraProcessingForBigRows 的链。该链从 Sql 转换获取输入,并应用几个额外的过滤器以及一个接收器。请注意,在链中,不需要指定输入。

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv

    - type: ReadFromCsv
      name: ReadRight
      config:
        path: /path/to/right*.csv

    - type: Sql
      config:
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        A: ReadLeft
        B: ReadRight

    - type: WriteToJson
      name: WriteAll
      input: Sql
      config:
        path: /path/to/all.json

    - type: chain
      name: ExtraProcessingForBigRows
      input: Sql
      transforms:
        - type: Filter
          config:
            language: python
            keep: "col2 > 100"
        - type: Filter
          config:
            language: python
            keep: "len(col1) > 10"
        - type: Filter
          config:
            language: python
            keep: "col1 > 'z'"
      sink:
        type: WriteToCsv
        config:
          path: /path/to/big.csv

窗口化

此 API 可用于定义流式管道和批处理管道。为了在流式管道中有意义地聚合元素,通常需要某种窗口化。Beam 的 窗口化触发 可以使用所有其他 Beam SDK 中可用的相同 WindowInto 转换来声明。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: JSON
        schema:
          type: object
          properties:
            col1: {type: string}
            col2: {type: integer}
            col3: {type: number}
    - type: WindowInto
      windowing:
        type: fixed
        size: 60s
    - type: SomeGroupingTransform
      config:
        arg: ...
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

您可以使用指定的窗口化标记转换,而不是使用显式的 WindowInto 操作,这会导致其输入(因此转换本身)应用该窗口化。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: SomeGroupingTransform
      config:
        arg: ...
      windowing:
        type: sliding
        size: 60s
        period: 10s
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

请注意,Sql 操作本身通常是一种聚合形式,应用窗口化(或使用已窗口化的输入)会导致所有分组都在每个窗口内完成。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
      windowing:
        type: sessions
        gap: 60s
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

指定的窗口化将应用于所有输入,在本例中会导致每个窗口内进行连接。

pipeline:
  transforms:
    - type: ReadFromPubSub
      name: ReadLeft
      config:
        topic: leftTopic
        format: ...
        schema: ...

    - type: ReadFromPubSub
      name: ReadRight
      config:
        topic: rightTopic
        format: ...
        schema: ...

    - type: Sql
      config:
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        A: ReadLeft
        B: ReadRight
      windowing:
        type: fixed
        size: 60s
options:
  streaming: true

对于没有输入的转换,指定的窗口化将应用于其输出。根据 Beam 模型,窗口化然后由所有使用操作继承。这对于像 Read 这样的根操作特别有用。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
      windowing:
        type: fixed
        size: 60s
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

还可以指定管道的顶层(或复合)的窗口化,这等效于将相同的窗口化应用于所有根操作,这些操作本身没有指定自己的窗口化。这种方法是将窗口应用于管道中所有地方的有效方法。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
  windowing:
    type: fixed
    size: 60
options:
  streaming: true

请注意,所有这些窗口化规范都与 sourcesink 语法兼容

pipeline:
  type: chain

  source:
    type: ReadFromPubSub
    config:
      topic: myPubSubTopic
      format: ...
      schema: ...
    windowing:
      type: fixed
      size: 10s

  transforms:
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"

  sink:
    type: WriteToCsv
    config:
      path: /path/to/output.json
    windowing:
      type: fixed
      size: 5m

options:
  streaming: true

提供者

虽然我们旨在提供大量内置转换,但人们不可避免地想要编写自己的转换。这可以通过提供者概念实现,提供者利用扩展服务和模式转换。

例如,您可以构建一个 jar,它出售 跨语言转换模式转换,然后在转换中使用它,如下所示

pipeline:
  type: chain
  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv

  transforms:
    - type: MyCustomTransform
      config:
        arg: whatever

  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json

providers:
  - type: javaJar
    config:
       jar: /path/or/url/to/myExpansionService.jar
    transforms:
       MyCustomTransform: "urn:registered:in:expansion:service"

可以使用以下语法提供任意 Python 转换

providers:
  - type: pythonPackage
    config:
       packages:
           - my_pypi_package>=version
           - /path/to/local/package.zip
    transforms:
       MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"

管道选项

管道选项 用于配置管道的不同方面,例如将执行管道的管道运行器以及所选运行器所需的任何运行器特定配置。要设置管道选项,请在 YAML 文件的末尾附加一个选项块。例如

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
      windowing:
        type: fixed
        size: 60s
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

其他资源