Beam YAML 映射
Beam YAML 能够进行简单的转换,这些转换可用于将数据转换为正确的形状。其中最简单的转换是 MapToFields
,它根据输入字段创建具有新字段的记录。
字段重命名
要重命名字段,可以编写
- type: MapToFields
config:
fields:
new_col1: col1
new_col2: col2
将生成一个输出,其中每个记录都有两个字段,new_col1
和 new_col2
,它们的值分别与 col1
和 col2
相同(它们是来自输入模式的两个字段的名称)。
可以指定 append 参数,该参数指示应保留原始字段,类似于在 SQL select 语句中使用 *
。例如
- type: MapToFields
config:
append: true
fields:
new_col1: col1
new_col2: col2
将输出具有 new_col1
和 new_col2
作为附加字段的记录。当指定 append 字段时,也可以删除字段,例如
- type: MapToFields
config:
append: true
drop:
- col3
fields:
new_col1: col1
new_col2: col2
除了输出两个新字段之外,还包括所有原始字段,除了 col3。
映射函数
当然,您可能希望进行超出简单删除和重命名字段的转换。Beam YAML 能够内联简单的 UDF。这需要语言规范。例如,我们可以提供一个引用输入字段的 Python 表达式
- type: MapToFields
config:
language: python
fields:
new_col: "col1.upper()"
another_col: "col2 + col3"
此外,您可以提供一个完整的 Python 可调用对象,该对象将行作为参数来执行更复杂的映射(有关可接受格式,请参见 PythonCallableSource)。因此,您可以编写
- type: MapToFields
config:
language: python
fields:
new_col:
callable: |
import re
def my_mapping(row):
if re.match("[0-9]+", row.col1) and row.col2 > 0:
return "good"
else:
return "bad"
一旦达到一定的复杂程度,最好将其打包为依赖项,并简单地通过完全限定名称引用它,例如
- type: MapToFields
config:
language: python
fields:
new_col:
callable: pkg.module.fn
也可以将函数逻辑存储在文件中,并指向函数名称,例如
- type: MapToFields
config:
language: python
fields:
new_col:
path: /path/to/some/udf.py
name: my_mapping
目前,除了 Python 之外,还支持 Java、SQL 和 JavaScript(实验性)表达式
Java
当使用 Java 映射时,必须声明 UDF 类型,即使对于简单的表达式也是如此,例如
- type: MapToFields
config:
language: java
fields:
new_col:
expression: col1.toUpperCase()
对于可调用 UDF,Java 要求将该函数声明为一个实现 java.util.function.Function
的类,例如
- type: MapToFields
config:
language: java
fields:
new_col:
callable: |
import org.apache.beam.sdk.values.Row;
import java.util.function.Function;
public class MyFunction implements Function<Row, String> {
public String apply(Row row) {
return row.getString("col1").toUpperCase();
}
}
SQL
当 SQL 用于 MapToFields
UDF 时,它本质上是 SQL SELECT
语句。
例如,查询 SELECT UPPER(col1) AS new_col, "col2 + col3" AS another_col FROM PCOLLECTION
将如下所示
- type: MapToFields
config:
language: sql
fields:
new_col: "UPPER(col1)"
another_col: "col2 + col3"
请记住,任何不打算包含在输出中的字段都应添加到 drop
字段中。
如果您想选择一个与 保留的 SQL 关键字 冲突的字段,则必须将该字段(或字段)用反引号括起来。例如,假设传入的 PCollection 具有一个名为“timestamp”的字段,您将不得不编写
- type: MapToFields
config:
language: sql
fields:
new_col: "`timestamp`"
注意:drop
中定义的字段映射标签和字段不需要转义。只有 UDF 本身需要是一个有效的 SQL 语句。
通用
如果没有指定语言,则表达式集将限于预先存在的字段和整数、浮点数或字符串字面量。例如
- type: MapToFields
config:
fields:
new_col: col1
int_literal: 389
float_litera: 1.90216
str_literal: '"example"' # note the double quoting
FlatMap
有时,您可能希望为每个输入记录发出多个(或更少)记录。这可以通过映射到一个可迭代类型,并在映射之后使用 Explode
操作来完成,例如
- type: MapToFields
config:
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "col2 + col3"
- type: Explode
config:
fields: new_col
将为每个输入记录生成三个输出记录。
如果要爆炸多个记录,则必须指定是否应对所有字段取笛卡尔积。例如
- type: MapToFields
config:
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
config:
fields: [new_col, another_col]
cross_product: true
将发出九条记录,而
- type: MapToFields
config:
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
config:
fields: [new_col, another_col]
cross_product: false
将只发出三条记录。
如果要爆炸的字段已经是可迭代类型,则可以单独使用 Explode
操作。
- type: Explode
config:
fields: [col1]
过滤
有时,您可能希望只保留满足特定条件的记录。这可以通过使用 Filter
转换来完成,例如
- type: Filter
config:
keep: "col2 > 0"
对于比现有字段和数字字面量之间的简单比较更复杂的操作,必须提供 language
参数,例如
- type: Filter
config:
language: python
keep: "col2 + col3 > 0"
对于更复杂的过滤函数,您可以提供一个完整的 Python 可调用对象,该对象将行作为参数来执行更复杂的映射(有关可接受格式,请参见 PythonCallableSource)。因此,您可以编写
- type: Filter
config:
language: python
keep:
callable: |
import re
def my_filter(row):
return re.match("[0-9]+", row.col1) and row.col2 > 0
一旦达到一定的复杂程度,最好将其打包为依赖项,并简单地通过完全限定名称引用它,例如
- type: Filter
config:
language: python
keep:
callable: pkg.module.fn
也可以将函数逻辑存储在文件中,并指向函数名称,例如
- type: Filter
config:
language: python
keep:
path: /path/to/some/udf.py
name: my_filter
目前,除了 Python 之外,还支持 Java、SQL 和 JavaScript(实验性)表达式
Java
当使用 Java 过滤时,必须声明 UDF 类型,即使对于简单的表达式也是如此,例如
- type: Filter
config:
language: java
keep:
expression: col2 > 0
对于可调用 UDF,Java 要求将该函数声明为一个实现 java.util.function.Function
的类,例如
- type: Filter
config:
language: java
keep:
callable: |
import org.apache.beam.sdk.values.Row;
import java.util.function.Function;
import java.util.regex.Pattern;
public class MyFunction implements Function<Row, Boolean> {
public Boolean apply(Row row) {
Pattern pattern = Pattern.compile("[0-9]+");
return pattern.matcher(row.getString("col1")).matches() && row.getInt64("col2") > 0;
}
}
SQL
类似于 映射函数,当 SQL 用于 MapToFields
UDF 时,它本质上是 SQL WHERE
语句。
例如,查询 SELECT * FROM PCOLLECTION WHERE col2 > 0
将如下所示
- type: Filter
config:
language: sql
keep: "col2 > 0"
如果您想根据与 保留的 SQL 关键字 冲突的字段进行过滤,则必须将该字段(或字段)用反引号括起来。例如,假设传入的 PCollection 具有一个名为“timestamp”的字段,您将不得不编写
- type: Filter
config:
language: sql
keep: "`timestamp` > 0"
分区
将不同元素发送到不同位置(类似于其他 SDK 中使用边输出所做的事情)也很有用。虽然可以使用一组 Filter
操作来完成此操作,但如果每个元素都有一个唯一的目标,则使用 Partition
转换可能会更自然,它会将每个元素发送到唯一的输出。例如,这将发送所有 col1
等于 "a"
的元素到输出 Partition.a
。
- type: Partition
input: input
config:
by: col1
outputs: ['a', 'b', 'c']
- type: SomeTransform
input: Partition.a
config:
param: ...
- type: AnotherTransform
input: Partition.b
config:
param: ...
您还可以将目标指定为函数,例如
- type: Partition
input: input
config:
by: "'even' if col2 % 2 == 0 else 'odd'"
language: python
outputs: ['even', 'odd']
您可以选择提供一个 catch-all 输出,该输出将捕获所有不在命名输出中的元素(否则将是错误)
- type: Partition
input: input
config:
by: col1
outputs: ['a', 'b', 'c']
unknown_output: 'other'
有时,您希望将一个 PCollection 拆分为多个不一定是不相交的 PCollection。要将元素发送到多个(或无)输出,可以使用一个可迭代列,并在 Partition
之前使用 Explode
。
- type: Explode
input: input
config:
fields: col1
- type: Partition
input: Explode
config:
by: col1
outputs: ['a', 'b', 'c']
类型
Beam 将尝试推断映射中涉及的类型,但有时这不可能。在这些情况下,您可以明确指定预期的输出类型,例如
- type: MapToFields
config:
language: python
fields:
new_col:
expression: "col1.upper()"
output_type: string
预期类型以 json 模式表示法给出,此外,顶级基本类型可以作为文字字符串给出,而不是要求 {type: 'basic_type_name'}
嵌套。
- type: MapToFields
config:
language: python
fields:
new_col:
expression: "col1.upper()"
output_type: string
another_col:
expression: "beam.Row(a=col1, b=[col2])"
output_type:
type: 'object'
properties:
a:
type: 'string'
b:
type: 'array'
items:
type: 'number'
这在解决与无法处理 beam:logical:pythonsdk_any:v1
类型有关的错误时特别有用。