Beam YAML 聚合

Beam YAML 能够对记录进行聚合以对值进行分组和合并。这可以通过 Combine 变换类型来实现。

例如,可以编写

- type: Combine
  config:
    group_by: col1
    combine:
      total:
        value: col2
        fn:
          type: sum

如果函数没有配置要求,可以直接以字符串形式提供

- type: Combine
  config:
    group_by: col1
    combine:
      total:
        value: col2
        fn: sum

如果输出字段名称与输入字段名称相同,则可以进一步简化

- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum

可以对多个字段同时进行聚合

- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max

或按多个字段分组

- type: Combine
  config:
    group_by: [col1, col2]
    combine:
      col3: sum

或根本不分组(这将导致全局合并,只有一个输出)

- type: Combine
  config:
    group_by: []
    combine:
      col2: sum
      col3: max

窗口聚合

与所有变换一样,Combine 可以接受窗口参数

- type: Combine
  windowing:
    type: fixed
    size: 60s
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max

如果没有提供窗口规范,它将继承上游的窗口参数,例如

- type: WindowInto
  windowing:
    type: fixed
    size: 60s
- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max

等效于前面的示例。

自定义聚合函数

可以通过设置语言参数来使用 Python 中定义的聚合函数。

- type: Combine
  config:
    language: python
    group_by: col1
    combine:
      biggest:
        value: "col2 + col2"
        fn:
          type: 'apache_beam.transforms.combiners.TopCombineFn'
          config:
            n: 10

SQL 风格的聚合

通过将语言设置为 SQL,可以提供完整的 SQL 代码段作为合并函数。

- type: Combine
  config:
    language: sql
    group_by: col1
    combine:
      num_values: "count(*)"
      total: "sum(col2)"

当然,也可以使用 Sql 变换类型并直接提供查询。