Beam YAML 聚合
Beam YAML 能够对记录进行聚合以对值进行分组和合并。这可以通过 Combine
变换类型来实现。
例如,可以编写
- type: Combine
config:
group_by: col1
combine:
total:
value: col2
fn:
type: sum
如果函数没有配置要求,可以直接以字符串形式提供
- type: Combine
config:
group_by: col1
combine:
total:
value: col2
fn: sum
如果输出字段名称与输入字段名称相同,则可以进一步简化
- type: Combine
config:
group_by: col1
combine:
col2: sum
可以对多个字段同时进行聚合
- type: Combine
config:
group_by: col1
combine:
col2: sum
col3: max
或按多个字段分组
- type: Combine
config:
group_by: [col1, col2]
combine:
col3: sum
或根本不分组(这将导致全局合并,只有一个输出)
- type: Combine
config:
group_by: []
combine:
col2: sum
col3: max
窗口聚合
与所有变换一样,Combine
可以接受窗口参数
- type: Combine
windowing:
type: fixed
size: 60s
config:
group_by: col1
combine:
col2: sum
col3: max
如果没有提供窗口规范,它将继承上游的窗口参数,例如
- type: WindowInto
windowing:
type: fixed
size: 60s
- type: Combine
config:
group_by: col1
combine:
col2: sum
col3: max
等效于前面的示例。
自定义聚合函数
可以通过设置语言参数来使用 Python 中定义的聚合函数。
- type: Combine
config:
language: python
group_by: col1
combine:
biggest:
value: "col2 + col2"
fn:
type: 'apache_beam.transforms.combiners.TopCombineFn'
config:
n: 10
SQL 风格的聚合
通过将语言设置为 SQL,可以提供完整的 SQL 代码段作为合并函数。
- type: Combine
config:
language: sql
group_by: col1
combine:
num_values: "count(*)"
total: "sum(col2)"
当然,也可以使用 Sql
变换类型并直接提供查询。