Beam YAML 变换索引
断言相等
断言输入包含提供的元素。
这主要用于测试;如果此变换的输入不完全是配置参数中给出的 elements
集合,它将导致整个管道失败。
与 Create 一样,YAML/JSON 风格的映射被解释为 Beam 行,例如
type: AssertEqual
input: SomeTransform
config:
elements:
- {a: 0, b: "foo"}
- {a: 1, b: "bar"}
将确保 SomeTransform
准确生成两个元素,其值分别为 (a=0, b="foo")
和 (a=1, b="bar")
。
配置
- elements
Array[?]
: 应该属于 PCollection 的元素集。YAML/JSON 风格的映射将被解释为 Beam 行。
用法
type: AssertEqual
input: ...
config:
elements:
- element
- element
- ...
分配时间戳
为其输入的每个元素分配一个新的时间戳。
当读取具有嵌入时间戳的记录时,这可能很有用,例如使用各种文件类型或其他默认将所有时间戳设置为无限过去的源。
请注意,时间戳只能向前设置,因为向后设置它可能不会导致它阻止已提前的水印,数据可能会变得可丢弃的延迟。
支持的语言:通用,javascript,python
配置
-
timestamp
?
(可选) : 提供新时间戳的字段、可调用对象或表达式。 -
language
string
(可选) : 时间戳表达式的语言。 -
error_handling
Row
: 是否以及如何处理时间戳计算期间的错误。行字段
- output
string
- output
用法
type: AssignTimestamps
input: ...
config:
timestamp: timestamp
language: "language"
error_handling:
output: "output"
组合
对共享公共字段的记录进行分组和组合。
内置组合函数为 sum
、max
、min
、all
、any
、mean
、count
、group
、concat
,但也可以使用自定义聚合函数。
另请参阅有关 YAML 聚合 的文档。
支持的语言:calcite、通用、javascript、python、sql
配置
-
group_by
Array[string]
-
combine
Map[string, Map[string, ?]]
-
language
string
(可选)
用法
type: Combine
input: ...
config:
group_by:
- "group_by"
- "group_by"
- ...
combine:
a:
a: combine_value_a_value_a
b: combine_value_a_value_b
c: ...
b:
a: combine_value_b_value_a
b: combine_value_b_value_b
c: ...
c: ...
language: "language"
创建
创建一个包含指定元素集的集合。
此变换始终生成带模式的数据。例如
type: Create
config:
elements: [1, 2, 3]
将导致一个输出,其中包含三个元素,其模式为 Row(element=int),而 YAML/JSON 风格的映射将直接解释为 Beam 行,例如
type: Create
config:
elements:
- {first: 0, second: {str: "foo", values: [1, 2, 3]}}
- {first: 1, second: {str: "bar", values: [4, 5, 6]}}
将导致一个模式,其形式为 (int, Row(string, List[int]))。
这也可以表示为 YAML
type: Create
config:
elements:
- first: 0
second:
str: "foo"
values: [1, 2, 3]
- first: 1
second:
str: "bar"
values: [4, 5, 6]
配置
-
elements
Array[?]
: 应该属于 PCollection 的元素集。YAML/JSON 风格的映射将被解释为 Beam 行。基元将被映射到带有单个“元素”字段的行。 -
reshuffle
boolean
(可选) : (可选) 如果集合中有多个元素,是否引入重新洗牌(以可能重新分配工作)。默认为 True。
用法
type: Create
config:
elements:
- element
- element
- ...
reshuffle: true|false
展开
展开(也称为取消嵌套/扁平化)一个或多个字段,生成多行。
给定一个或多个可迭代类型字段,产生多行,每行对应于该字段的每个值。例如,('a', [1, 2, 3])
形式的行将在第二个字段上展开为 ('a', 1)
、('a', 2')
和 ('a', 3)
。
这类似于将 FlatMap
与 MapToFields 变换配对时的 FlatMap
。
查看有关 YAML 映射函数 的更完整文档。
配置
-
fields
?
(可选) : 要展开的字段列表。 -
cross_product
boolean
(可选) : 如果指定了多个字段,则指示是否应产生组合的完整交叉积,或者第一个字段的第一个元素是否对应于第二个字段的第一个元素,依此类推。例如,行(['a', 'b'], [1, 2])
将扩展为四个行('a', 1)
、('a', 2)
、('b', 1)
和('b', 2)
,当cross_product
设置为true
时,但只有两行('a', 1)
和('b', 2)
,当它设置为false
时。仅在指定多行时有意义(且是必需的)。 -
error_handling
Map[string, ?]
(可选) : 是否以及如何处理迭代期间的错误。
用法
type: Explode
input: ...
config:
fields: fields
cross_product: true|false
error_handling:
a: error_handling_value_a
b: error_handling_value_b
c: ...
过滤
仅保留满足给定条件的记录。
查看有关 YAML 过滤 的更完整文档。
支持的语言:calcite、通用、java、javascript、python、sql
配置
-
keep
?
(可选) -
language
string
(可选) -
error_handling
Row
行字段
- output
string
- output
用法
type: Filter
input: ...
config:
keep: keep
language: "language"
error_handling:
output: "output"
扁平化
将多个 PCollection 扁平化为单个 PCollection。
结果 PCollection 的元素将是所有输入元素的(不相交)并集。
请注意,YAML 变换始终可以接受一个输入列表,该列表将被隐式扁平化。
配置
没有配置参数。
用法
type: Flatten
input: ...
config: ...
连接
使用指定的条件连接两个或多个输入。
例如
type: Join
input:
input1: SomeTransform
input2: AnotherTransform
input3: YetAnotherTransform
config:
type: inner
equalities:
- input1: colA
input2: colB
- input2: colX
input3: colY
fields:
input1: [colA, colB, colC]
input2: {new_name: colB}
将对三个输入执行内连接,满足约束条件 input1.colA = input2.colB
和 input2.colX = input3.colY
,发出包含 colA
、colB
和 colC
的行来自 input1
,input2.colB
的值作为名为 new_name
的字段,以及 input3
中的所有字段。
配置
-
equalities
?
(可选) : 要连接的条件。应该相等的列集的列表,以满足连接条件。对于跨所有输入在同一列上连接的简单场景,其中列名相同,可以将列名指定为等式,而无需为每个输入列出它。 -
type
?
(可选) : 连接类型。可以是 ["inner", "left", "right", "outer"] 中的字符串值,指定要执行的连接类型。对于多个输入连接的场景,其中需要不同的连接类型,指定要外连接的输入。例如,{outer: [input1, input2]}
表示input1
和input2
将使用指定的条件进行外连接,而其他输入将进行内连接。 -
fields
Map[string, ?]
(可选) : 要输出的字段。一个映射,其输入别名作为键,输入中要输出的字段列表作为值。映射中的值可以是字典,其中新字段名作为键,原始字段名作为值(例如 new_field_name: field_name),也可以是包含要输出的字段的列表以及它们的原始名称(例如[col1, col2, col3]
),或者是一个 '*',表示输入中的所有字段都将被输出。如果未指定,将输出所有输入中的所有字段。
用法
type: Join
input: ...
config:
equalities: equalities
type: type
fields:
a: fields_value_a
b: fields_value_b
c: ...
测试日志记录
记录其输入 PCollection 的每个元素。
此变换的输出是其输入的副本,便于在链式管道中使用。
配置
-
level
string
(可选) : ERROR、INFO 或 DEBUG 之一,映射到相应的特定于语言的日志记录级别 -
prefix
string
(可选) : 一个可选标识符,它将被附加到正在记录的元素之前
用法
type: LogForTesting
input: ...
config:
level: "level"
prefix: "prefix"
ML 变换
配置
-
write_artifact_location
string
(可选) -
read_artifact_location
string
(可选) -
transforms
Array[?]
(可选)
用法
type: MLTransform
input: ...
config:
write_artifact_location: "write_artifact_location"
read_artifact_location: "read_artifact_location"
transforms:
- transforms
- transforms
- ...
映射到字段
创建具有根据输入字段定义的新字段的记录。
查看有关 YAML 映射函数 的更完整文档。
支持的语言:calcite、通用、java、javascript、python、sql
配置
-
language
?
(可选) -
error_handling
Row
行字段
- output
string
- output
-
mapping_args
?
(可选)
用法
type: MapToFields
input: ...
config:
language: language
error_handling:
output: "output"
mapping_args: mapping_args
分区
将输入拆分为几个不同的输出。
每个输入元素将根据 by
配置参数中给出的字段或函数转到不同的输出。
支持的语言:通用,javascript,python
配置
-
by
?
(可选) : 提供此元素的目标输出的字段、可调用对象或表达式。应返回一个字符串,该字符串是outputs
参数的成员。如果unknown_output
也已设置,则其他返回值也将被接受,否则将引发错误。 -
outputs
Array[string]
: 此输入被分区的输出集。 -
unknown_output
string
(可选) : (可选) 如果设置,则表示任何未分配outputs
参数中列出的输出的元素的目标输出。 -
error_handling
Map[string, ?]
(可选) : (可选) 是否以及如何处理分区期间的错误。 -
language
string
(可选) : (可选)by
表达式的语言。
用法
type: Partition
input: ...
config:
by: by
outputs:
- "outputs"
- "outputs"
- ...
unknown_output: "unknown_output"
error_handling:
a: error_handling_value_a
b: error_handling_value_b
c: ...
language: "language"
PyTransform
由完全限定名称标识的 Python PTransform。
这允许导入、构造和应用任何 Beam Python 变换。这对于使用尚未通过 YAML 接口公开的变换很有用。但是请注意,如果此变换不接受或生成 Beam 行,则可能需要进行转换。
例如
type: PyTransform
config:
constructor: apache_beam.pkg.mod.SomeClass
args: [1, 'foo']
kwargs:
baz: 3
可用于访问变换 apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3)
。
另请参阅有关 内联 Python 的文档。
配置
-
constructor
string
: 用于构造变换的可调用对象的完全限定名称。通常,这将是一个类,例如apache_beam.pkg.mod.SomeClass
,但它也可以是一个函数或任何返回 PTransform 的其他可调用对象。 -
args
Array[?]
(可选) : 传递给可调用对象作为位置参数的参数列表。 -
kwargs
Map[string, ?]
(可选) : 传递给可调用对象作为关键字参数的参数列表。
用法
type: PyTransform
input: ...
config:
constructor: "constructor"
args:
- arg
- arg
- ...
kwargs:
a: kwargs_value_a
b: kwargs_value_b
c: ...
SQL
配置
用法
type: Sql
input: ...
config: ...
窗口化
一个窗口变换,将窗口分配给 PCollection 的每个元素。
分配的窗口将影响所有下游聚合操作,这些操作将根据窗口以及键进行聚合。
查看 有关窗口化的 Beam 文档 以了解更多详细信息。
大小、偏移量、周期和间隙(如果适用)必须使用时间单位后缀 'ms'、's'、'm'、'h' 或 'd' 分别定义毫秒、秒、分钟、小时或天。如果未指定时间单位,则默认为 's'。
例如
windowing:
type: fixed
size: 30s
请注意,任何 Yaml 变换都可以具有 窗口参数,该参数应用于其输入(如果有)或输出(如果没有输入),这意味着通常不需要显式 WindowInto 操作。
配置
- windowing
?
(可选) : 要执行的窗口化的类型和参数
用法
type: WindowInto
input: ...
config:
windowing: windowing
从 Avro 读取
一个用于从 avro 文件读取记录的 PTransform
。
结果 PCollection 的每个记录将包含从源读取的单个记录。类型简单的记录将被映射到具有包含记录值的单个 record
字段的 beam 行。类型为 Avro RECORD
的记录将被映射到符合包含这些记录的 Avro 文件中包含的模式的 Beam 行。
配置
- path
?
(可选)
用法
type: ReadFromAvro
config:
path: path
写入 Avro
一个用于写入 avro 文件的 PTransform
。
如果输入数据具有模式,则会自动生成相应的 Avro 模式,并用于写入输出记录。
配置
- path
?
(可选)
用法
type: WriteToAvro
input: ...
config:
path: path
从 BigQuery 读取
从 BigQuery 读取数据。
必须设置 table 或 query 中的其中一个。如果设置了 query,则不应该设置 row_restriction 或 fields。
配置
-
table
string
(可选) : 要读取的表,指定为DATASET.TABLE
或PROJECT:DATASET.TABLE
。 -
query
string
(可选) : 用于代替 table 参数的查询。 -
row_restriction
string
(可选) : 可选的 SQL 文本过滤语句,类似于查询中的 WHERE 子句。不支持聚合。长度限制为 1 MB。 -
fields
Array[string]
(可选)
用法
type: ReadFromBigQuery
config:
table: "table"
query: "query"
row_restriction: "row_restriction"
fields:
- "field"
- "field"
- ...
写入 BigQuery
配置
用法
type: WriteToBigQuery
input: ...
config: ...
从 CSV 读取
用于将逗号分隔值 (csv) 文件读取到 PCollection 的 PTransform。
配置
-
path
string
: 要读取的文件路径。路径可以包含通配符,例如*
和?
。 -
delimiter
?
(可选) -
comment
?
(可选)
用法
type: ReadFromCsv
config:
path: "path"
delimiter: delimiter
comment: comment
写入 CSV
用于将带模式的 PCollection 作为 (一组) 逗号分隔值 (csv) 文件写入的 PTransform。
配置
-
path
string
: 要写入的文件路径。写入的文件将以该前缀开头,后跟 shard 标识符 (参见num_shards
),具体取决于file_naming
参数。 -
delimiter
?
(可选)
用法
type: WriteToCsv
input: ...
config:
path: "path"
delimiter: delimiter
从 JDBC 读取
配置
用法
type: ReadFromJdbc
config: ...
写入 JDBC
配置
用法
type: WriteToJdbc
input: ...
config: ...
从 JSON 读取
用于将 JSON 值从文件读取到 PCollection 的 PTransform。
配置
- path
string
: 要读取的文件路径。路径可以包含通配符,例如*
和?
。
用法
type: ReadFromJson
config:
path: "path"
写入 JSON
用于将 PCollection 作为 JSON 值写入文件的 PTransform。
配置
- path
string
: 要写入的文件路径。写入的文件将以该前缀开头,后跟 shard 标识符 (参见num_shards
),具体取决于file_naming
参数。
用法
type: WriteToJson
input: ...
config:
path: "path"
从 Kafka 读取
配置
用法
type: ReadFromKafka
config: ...
写入 Kafka
配置
用法
type: WriteToKafka
input: ...
config: ...
从 MySQL 读取
配置
用法
type: ReadFromMySql
config: ...
写入 MySQL
配置
用法
type: WriteToMySql
input: ...
config: ...
从 Oracle 读取
配置
用法
type: ReadFromOracle
config: ...
写入 Oracle
配置
用法
type: WriteToOracle
input: ...
config: ...
从 Parquet 读取
用于读取 Parquet 文件的 PTransform
。
配置
- path
?
(可选)
用法
type: ReadFromParquet
config:
path: path
写入 Parquet
用于写入 Parquet 文件的 PTransform
。
配置
- path
?
(可选)
用法
type: WriteToParquet
input: ...
config:
path: path
从 PostgreSQL 读取
配置
用法
type: ReadFromPostgres
config: ...
写入 PostgreSQL
配置
用法
type: WriteToPostgres
input: ...
config: ...
从 Pub/Sub 读取
从 Cloud Pub/Sub 读取消息。
配置
-
topic
string
(可选) : Cloud Pub/Sub 主题,格式为 "projects//topics/ ”。如果提供,则 subscription 必须为 None。 -
subscription
string
(可选) : 要使用的现有 Cloud Pub/Sub 订阅,格式为 "projects//subscriptions/ ”。如果未指定,则会从指定的主题创建临时订阅。如果提供,则 topic 必须为 None。 -
format
string
: 消息有效负载的预期格式。当前支持的格式为- RAW: 生成具有单个
payload
字段的记录,其内容为 pubsub 消息的原始字节。 - AVRO: 使用给定的 Avro 模式解析记录。
- JSON: 使用给定的 JSON 模式解析记录。
- RAW: 生成具有单个
-
schema
?
(可选) : 给定格式的模式规范。 -
attributes
Array[string]
(可选) : 属性键列表,其值将被扁平化为输出消息中的附加字段。例如,如果格式为raw
,并且 attributes 为["a", "b"]
,则此读取操作将生成以下形式的元素:Row(payload=..., a=..., b=...)
。 -
attributes_map
string
(可选) -
id_attribute
string
(可选) : 用于传入 Pub/Sub 消息的属性,作为唯一的记录标识符。当指定时,此属性的值 (可以是唯一标识记录的任何字符串) 将用于对消息进行去重。如果未提供,则无法保证不会在 Pub/Sub 流中传递重复数据。在这种情况下,流的去重将严格地尽力而为。 -
timestamp_attribute
string
(可选) : 用作元素时间戳的消息值。如果为 None,则使用消息发布时间作为时间戳。时间戳值应采用以下两种格式之一
- 表示自 Unix 纪元开始以来的毫秒数的数值。
- RFC 3339 格式的字符串,UTC 时区。示例:
2015-10-29T23:41:41.123Z
。时间戳的亚秒组件是可选的,第一个三位数 (即小于毫秒的时间单位) 后的数字可能会被忽略。
-
error_handling
Row
行字段
- output
string
- output
用法
type: ReadFromPubSub
config:
topic: "topic"
subscription: "subscription"
format: "format"
schema: schema
attributes:
- "attribute"
- "attribute"
- ...
attributes_map: "attributes_map"
id_attribute: "id_attribute"
timestamp_attribute: "timestamp_attribute"
error_handling:
output: "output"
写入 Pub/Sub
将消息写入 Cloud Pub/Sub。
配置
-
topic
string
: Cloud Pub/Sub 主题,格式为 "/topics// ". -
format
string
: 如何格式化消息有效负载。当前支持的格式为- RAW: 预计一条消息,该消息具有单个字段 (不包括与属性相关的字段),其内容用作 pubsub 消息的原始字节。
- AVRO: 使用给定的 Avro 模式编码记录,该模式可以从输入 PCollection 模式中推断。
- JSON: 使用给定的 JSON 模式格式化记录,该模式可以从输入 PCollection 模式中推断。
-
schema
?
(可选) : 给定格式的模式规范。 -
attributes
Array[string]
(可选) : 属性键列表,其值将被提取为 PubSub 消息属性。例如,如果格式为raw
,并且 attributes 为["a", "b"]
,则以下形式的元素:Row(any_field=..., a=..., b=...)
将导致 PubSub 消息,其有效负载包含 any_field 的内容,并且其属性将填充a
和b
的值。 -
attributes_map
string
(可选) -
id_attribute
string
(可选) : 如果设置,将为每条 Cloud Pub/Sub 消息设置一个属性,该属性具有给定的名称和唯一值。然后,可以使用该属性在 ReadFromPubSub PTransform 中对消息进行去重。 -
timestamp_attribute
string
(可选) : 如果设置,将为每条 Cloud Pub/Sub 消息设置一个属性,该属性具有给定的名称,并且该属性的值为消息的发布时间。 -
error_handling
Row
行字段
- output
string
- output
用法
type: WriteToPubSub
input: ...
config:
topic: "topic"
format: "format"
schema: schema
attributes:
- "attribute"
- "attribute"
- ...
attributes_map: "attributes_map"
id_attribute: "id_attribute"
timestamp_attribute: "timestamp_attribute"
error_handling:
output: "output"
从 Pub/Sub Lite 读取
配置
用法
type: ReadFromPubSubLite
config: ...
写入 Pub/Sub Lite
配置
用法
type: WriteToPubSubLite
input: ...
config: ...
从 Spanner 读取
配置
用法
type: ReadFromSpanner
config: ...
写入 Spanner
配置
用法
type: WriteToSpanner
input: ...
config: ...
从 SQL Server 读取
配置
用法
type: ReadFromSqlServer
config: ...
写入 SQL Server
配置
用法
type: WriteToSqlServer
input: ...
config: ...
从文本读取
从文本文件读取行。
生成的 PCollection 包含具有单个字符串字段的行,名为 "line"。
配置
- path
string
: 要读取的文件路径。路径可以包含通配符,例如*
和?
。
用法
type: ReadFromText
config:
path: "path"
写入文本
将 PCollection 写入 (一组) 文本文件。
输入必须是 PCollection,其模式正好具有一个字段。
配置
- path
string
: 要写入的文件路径。写入的文件将以该前缀开头,后跟 shard 标识符。
用法
type: WriteToText
input: ...
config:
path: "path"