Beam YAML 映射

Beam YAML 能够进行简单的转换,这些转换可用于将数据转换为正确的形状。其中最简单的转换是 MapToFields,它根据输入字段创建具有新字段的记录。

字段重命名

要重命名字段,可以编写

- type: MapToFields
  config:
    fields:
      new_col1: col1
      new_col2: col2

将生成一个输出,其中每个记录都有两个字段,new_col1new_col2,它们的值分别与 col1col2 相同(它们是来自输入模式的两个字段的名称)。

可以指定 append 参数,该参数指示应保留原始字段,类似于在 SQL select 语句中使用 *。例如

- type: MapToFields
  config:
    append: true
    fields:
      new_col1: col1
      new_col2: col2

将输出具有 new_col1new_col2 作为附加字段的记录。当指定 append 字段时,也可以删除字段,例如

- type: MapToFields
  config:
    append: true
    drop:
      - col3
    fields:
      new_col1: col1
      new_col2: col2

除了输出两个新字段之外,还包括所有原始字段,除了 col3。

映射函数

当然,您可能希望进行超出简单删除和重命名字段的转换。Beam YAML 能够内联简单的 UDF。这需要语言规范。例如,我们可以提供一个引用输入字段的 Python 表达式

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "col1.upper()"
      another_col: "col2 + col3"

此外,您可以提供一个完整的 Python 可调用对象,该对象将行作为参数来执行更复杂的映射(有关可接受格式,请参见 PythonCallableSource)。因此,您可以编写

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        callable: |
          import re
          def my_mapping(row):
            if re.match("[0-9]+", row.col1) and row.col2 > 0:
              return "good"
            else:
              return "bad"

一旦达到一定的复杂程度,最好将其打包为依赖项,并简单地通过完全限定名称引用它,例如

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        callable: pkg.module.fn

也可以将函数逻辑存储在文件中,并指向函数名称,例如

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        path: /path/to/some/udf.py
        name: my_mapping

目前,除了 Python 之外,还支持 Java、SQL 和 JavaScript(实验性)表达式

Java

当使用 Java 映射时,必须声明 UDF 类型,即使对于简单的表达式也是如此,例如

- type: MapToFields
  config:
    language: java
    fields:
      new_col:
        expression: col1.toUpperCase()

对于可调用 UDF,Java 要求将该函数声明为一个实现 java.util.function.Function 的类,例如

- type: MapToFields
  config:
    language: java
    fields:
      new_col:
        callable: |
          import org.apache.beam.sdk.values.Row;
          import java.util.function.Function;
          public class MyFunction implements Function<Row, String> {
            public String apply(Row row) {
              return row.getString("col1").toUpperCase();
            }
          }

SQL

当 SQL 用于 MapToFields UDF 时,它本质上是 SQL SELECT 语句。

例如,查询 SELECT UPPER(col1) AS new_col, "col2 + col3" AS another_col FROM PCOLLECTION 将如下所示

- type: MapToFields
  config:
    language: sql
    fields:
      new_col: "UPPER(col1)"
      another_col: "col2 + col3"

请记住,任何不打算包含在输出中的字段都应添加到 drop 字段中。

如果您想选择一个与 保留的 SQL 关键字 冲突的字段,则必须将该字段(或字段)用反引号括起来。例如,假设传入的 PCollection 具有一个名为“timestamp”的字段,您将不得不编写

- type: MapToFields
  config:
    language: sql
    fields:
      new_col: "`timestamp`"

注意drop 中定义的字段映射标签和字段不需要转义。只有 UDF 本身需要是一个有效的 SQL 语句。

通用

如果没有指定语言,则表达式集将限于预先存在的字段和整数、浮点数或字符串字面量。例如

- type: MapToFields
  config:
    fields:
      new_col: col1
      int_literal: 389
      float_litera: 1.90216
      str_literal: '"example"'  # note the double quoting

FlatMap

有时,您可能希望为每个输入记录发出多个(或更少)记录。这可以通过映射到一个可迭代类型,并在映射之后使用 Explode 操作来完成,例如

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "col2 + col3"
- type: Explode
  config:
    fields: new_col

将为每个输入记录生成三个输出记录。

如果要爆炸多个记录,则必须指定是否应对所有字段取笛卡尔积。例如

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
  config:
    fields: [new_col, another_col]
    cross_product: true

将发出九条记录,而

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
  config:
    fields: [new_col, another_col]
    cross_product: false

将只发出三条记录。

如果要爆炸的字段已经是可迭代类型,则可以单独使用 Explode 操作。

- type: Explode
  config:
    fields: [col1]

过滤

有时,您可能希望只保留满足特定条件的记录。这可以通过使用 Filter 转换来完成,例如

- type: Filter
  config:
    keep: "col2 > 0"

对于比现有字段和数字字面量之间的简单比较更复杂的操作,必须提供 language 参数,例如

- type: Filter
  config:
    language: python
    keep: "col2 + col3 > 0"

对于更复杂的过滤函数,您可以提供一个完整的 Python 可调用对象,该对象将行作为参数来执行更复杂的映射(有关可接受格式,请参见 PythonCallableSource)。因此,您可以编写

- type: Filter
  config:
    language: python
    keep:
      callable: |
        import re
        def my_filter(row):
          return re.match("[0-9]+", row.col1) and row.col2 > 0

一旦达到一定的复杂程度,最好将其打包为依赖项,并简单地通过完全限定名称引用它,例如

- type: Filter
  config:
    language: python
    keep:
      callable: pkg.module.fn

也可以将函数逻辑存储在文件中,并指向函数名称,例如

- type: Filter
  config:
    language: python
    keep:
      path: /path/to/some/udf.py
      name: my_filter

目前,除了 Python 之外,还支持 Java、SQL 和 JavaScript(实验性)表达式

Java

当使用 Java 过滤时,必须声明 UDF 类型,即使对于简单的表达式也是如此,例如

- type: Filter
  config:
    language: java
    keep:
      expression: col2 > 0

对于可调用 UDF,Java 要求将该函数声明为一个实现 java.util.function.Function 的类,例如

- type: Filter
  config:
    language: java
    keep:
      callable: |
        import org.apache.beam.sdk.values.Row;
        import java.util.function.Function;
        import java.util.regex.Pattern;
        public class MyFunction implements Function<Row, Boolean> {
          public Boolean apply(Row row) {
            Pattern pattern = Pattern.compile("[0-9]+");
            return pattern.matcher(row.getString("col1")).matches() && row.getInt64("col2") > 0;
          }
        }

SQL

类似于 映射函数,当 SQL 用于 MapToFields UDF 时,它本质上是 SQL WHERE 语句。

例如,查询 SELECT * FROM PCOLLECTION WHERE col2 > 0 将如下所示

- type: Filter
  config:
    language: sql
    keep: "col2 > 0"

如果您想根据与 保留的 SQL 关键字 冲突的字段进行过滤,则必须将该字段(或字段)用反引号括起来。例如,假设传入的 PCollection 具有一个名为“timestamp”的字段,您将不得不编写

- type: Filter
  config:
    language: sql
    keep: "`timestamp` > 0"

分区

将不同元素发送到不同位置(类似于其他 SDK 中使用边输出所做的事情)也很有用。虽然可以使用一组 Filter 操作来完成此操作,但如果每个元素都有一个唯一的目标,则使用 Partition 转换可能会更自然,它会将每个元素发送到唯一的输出。例如,这将发送所有 col1 等于 "a" 的元素到输出 Partition.a

- type: Partition
  input: input
  config:
    by: col1
    outputs: ['a', 'b', 'c']

- type: SomeTransform
  input: Partition.a
  config:
    param: ...

- type: AnotherTransform
  input: Partition.b
  config:
    param: ...

您还可以将目标指定为函数,例如

- type: Partition
  input: input
  config:
    by: "'even' if col2 % 2 == 0 else 'odd'"
    language: python
    outputs: ['even', 'odd']

您可以选择提供一个 catch-all 输出,该输出将捕获所有不在命名输出中的元素(否则将是错误)

- type: Partition
  input: input
  config:
    by: col1
    outputs: ['a', 'b', 'c']
    unknown_output: 'other'

有时,您希望将一个 PCollection 拆分为多个不一定是不相交的 PCollection。要将元素发送到多个(或无)输出,可以使用一个可迭代列,并在 Partition 之前使用 Explode

- type: Explode
  input: input
  config:
    fields: col1

- type: Partition
  input: Explode
  config:
    by: col1
    outputs: ['a', 'b', 'c']

类型

Beam 将尝试推断映射中涉及的类型,但有时这不可能。在这些情况下,您可以明确指定预期的输出类型,例如

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        expression: "col1.upper()"
        output_type: string

预期类型以 json 模式表示法给出,此外,顶级基本类型可以作为文字字符串给出,而不是要求 {type: 'basic_type_name'} 嵌套。

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        expression: "col1.upper()"
        output_type: string
      another_col:
        expression: "beam.Row(a=col1, b=[col2])"
        output_type:
          type: 'object'
          properties:
            a:
              type: 'string'
            b:
              type: 'array'
              items:
                type: 'number'

这在解决与无法处理 beam:logical:pythonsdk_any:v1 类型有关的错误时特别有用。