Beam YAML 变换索引

断言相等

断言输入包含提供的元素。

这主要用于测试;如果此变换的输入不完全是配置参数中给出的 elements 集合,它将导致整个管道失败。

与 Create 一样,YAML/JSON 风格的映射被解释为 Beam 行,例如

type: AssertEqual
input: SomeTransform
config:
  elements:
     - {a: 0, b: "foo"}
     - {a: 1, b: "bar"}

将确保 SomeTransform 准确生成两个元素,其值分别为 (a=0, b="foo")(a=1, b="bar")

配置

用法

type: AssertEqual
input: ...
config:
  elements:
  - element
  - element
  - ...

分配时间戳

为其输入的每个元素分配一个新的时间戳。

当读取具有嵌入时间戳的记录时,这可能很有用,例如使用各种文件类型或其他默认将所有时间戳设置为无限过去的源。

请注意,时间戳只能向前设置,因为向后设置它可能不会导致它阻止已提前的水印,数据可能会变得可丢弃的延迟。

支持的语言:通用,javascript,python

配置

用法

type: AssignTimestamps
input: ...
config:
  timestamp: timestamp
  language: "language"
  error_handling:
    output: "output"

组合

对共享公共字段的记录进行分组和组合。

内置组合函数为 summaxminallanymeancountgroupconcat,但也可以使用自定义聚合函数。

另请参阅有关 YAML 聚合 的文档。

支持的语言:calcite、通用、javascript、python、sql

配置

用法

type: Combine
input: ...
config:
  group_by:
  - "group_by"
  - "group_by"
  - ...
  combine:
    a:
      a: combine_value_a_value_a
      b: combine_value_a_value_b
      c: ...
    b:
      a: combine_value_b_value_a
      b: combine_value_b_value_b
      c: ...
    c: ...
  language: "language"

创建

创建一个包含指定元素集的集合。

此变换始终生成带模式的数据。例如

type: Create
config:
  elements: [1, 2, 3]

将导致一个输出,其中包含三个元素,其模式为 Row(element=int),而 YAML/JSON 风格的映射将直接解释为 Beam 行,例如

type: Create
config:
  elements:
     - {first: 0, second: {str: "foo", values: [1, 2, 3]}}
     - {first: 1, second: {str: "bar", values: [4, 5, 6]}}

将导致一个模式,其形式为 (int, Row(string, List[int]))。

这也可以表示为 YAML

type: Create
config:
  elements:
    - first: 0
      second:
        str: "foo"
         values: [1, 2, 3]
    - first: 1
      second:
        str: "bar"
         values: [4, 5, 6]

配置

用法

type: Create
config:
  elements:
  - element
  - element
  - ...
  reshuffle: true|false

展开

展开(也称为取消嵌套/扁平化)一个或多个字段,生成多行。

给定一个或多个可迭代类型字段,产生多行,每行对应于该字段的每个值。例如,('a', [1, 2, 3]) 形式的行将在第二个字段上展开为 ('a', 1)('a', 2')('a', 3)

这类似于将 FlatMap 与 MapToFields 变换配对时的 FlatMap

查看有关 YAML 映射函数 的更完整文档。

配置

用法

type: Explode
input: ...
config:
  fields: fields
  cross_product: true|false
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...

过滤

仅保留满足给定条件的记录。

查看有关 YAML 过滤 的更完整文档。

支持的语言:calcite、通用、java、javascript、python、sql

配置

用法

type: Filter
input: ...
config:
  keep: keep
  language: "language"
  error_handling:
    output: "output"

扁平化

将多个 PCollection 扁平化为单个 PCollection。

结果 PCollection 的元素将是所有输入元素的(不相交)并集。

请注意,YAML 变换始终可以接受一个输入列表,该列表将被隐式扁平化。

配置

没有配置参数。

用法

type: Flatten
input: ...
config: ...

连接

使用指定的条件连接两个或多个输入。

例如

type: Join
input:
  input1: SomeTransform
  input2: AnotherTransform
  input3: YetAnotherTransform
config:
  type: inner
  equalities:
    - input1: colA
      input2: colB
    - input2: colX
      input3: colY
  fields:
    input1: [colA, colB, colC]
    input2: {new_name: colB}

将对三个输入执行内连接,满足约束条件 input1.colA = input2.colBinput2.colX = input3.colY,发出包含 colAcolBcolC 的行来自 input1input2.colB 的值作为名为 new_name 的字段,以及 input3 中的所有字段。

配置

用法

type: Join
input: ...
config:
  equalities: equalities
  type: type
  fields:
    a: fields_value_a
    b: fields_value_b
    c: ...

测试日志记录

记录其输入 PCollection 的每个元素。

此变换的输出是其输入的副本,便于在链式管道中使用。

配置

用法

type: LogForTesting
input: ...
config:
  level: "level"
  prefix: "prefix"

ML 变换

配置

用法

type: MLTransform
input: ...
config:
  write_artifact_location: "write_artifact_location"
  read_artifact_location: "read_artifact_location"
  transforms:
  - transforms
  - transforms
  - ...

映射到字段

创建具有根据输入字段定义的新字段的记录。

查看有关 YAML 映射函数 的更完整文档。

支持的语言:calcite、通用、java、javascript、python、sql

配置

用法

type: MapToFields
input: ...
config:
  language: language
  error_handling:
    output: "output"
  mapping_args: mapping_args

分区

将输入拆分为几个不同的输出。

每个输入元素将根据 by 配置参数中给出的字段或函数转到不同的输出。

支持的语言:通用,javascript,python

配置

用法

type: Partition
input: ...
config:
  by: by
  outputs:
  - "outputs"
  - "outputs"
  - ...
  unknown_output: "unknown_output"
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...
  language: "language"

PyTransform

由完全限定名称标识的 Python PTransform。

这允许导入、构造和应用任何 Beam Python 变换。这对于使用尚未通过 YAML 接口公开的变换很有用。但是请注意,如果此变换不接受或生成 Beam 行,则可能需要进行转换。

例如

type: PyTransform
config:
   constructor: apache_beam.pkg.mod.SomeClass
   args: [1, 'foo']
   kwargs:
     baz: 3

可用于访问变换 apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3)

另请参阅有关 内联 Python 的文档。

配置

用法

type: PyTransform
input: ...
config:
  constructor: "constructor"
  args:
  - arg
  - arg
  - ...
  kwargs:
    a: kwargs_value_a
    b: kwargs_value_b
    c: ...

SQL

配置

用法

type: Sql
input: ...
config: ...

窗口化

一个窗口变换,将窗口分配给 PCollection 的每个元素。

分配的窗口将影响所有下游聚合操作,这些操作将根据窗口以及键进行聚合。

查看 有关窗口化的 Beam 文档 以了解更多详细信息。

大小、偏移量、周期和间隙(如果适用)必须使用时间单位后缀 'ms'、's'、'm'、'h' 或 'd' 分别定义毫秒、秒、分钟、小时或天。如果未指定时间单位,则默认为 's'。

例如

windowing:
   type: fixed
   size: 30s

请注意,任何 Yaml 变换都可以具有 窗口参数,该参数应用于其输入(如果有)或输出(如果没有输入),这意味着通常不需要显式 WindowInto 操作。

配置

用法

type: WindowInto
input: ...
config:
  windowing: windowing

从 Avro 读取

一个用于从 avro 文件读取记录的 PTransform

结果 PCollection 的每个记录将包含从源读取的单个记录。类型简单的记录将被映射到具有包含记录值的单个 record 字段的 beam 行。类型为 Avro RECORD 的记录将被映射到符合包含这些记录的 Avro 文件中包含的模式的 Beam 行。

配置

用法

type: ReadFromAvro
config:
  path: path

写入 Avro

一个用于写入 avro 文件的 PTransform

如果输入数据具有模式,则会自动生成相应的 Avro 模式,并用于写入输出记录。

配置

用法

type: WriteToAvro
input: ...
config:
  path: path

从 BigQuery 读取

从 BigQuery 读取数据。

必须设置 table 或 query 中的其中一个。如果设置了 query,则不应该设置 row_restriction 或 fields。

配置

用法

type: ReadFromBigQuery
config:
  table: "table"
  query: "query"
  row_restriction: "row_restriction"
  fields:
  - "field"
  - "field"
  - ...

写入 BigQuery

配置

用法

type: WriteToBigQuery
input: ...
config: ...

从 CSV 读取

用于将逗号分隔值 (csv) 文件读取到 PCollection 的 PTransform。

配置

用法

type: ReadFromCsv
config:
  path: "path"
  delimiter: delimiter
  comment: comment

写入 CSV

用于将带模式的 PCollection 作为 (一组) 逗号分隔值 (csv) 文件写入的 PTransform。

配置

用法

type: WriteToCsv
input: ...
config:
  path: "path"
  delimiter: delimiter

从 JDBC 读取

配置

用法

type: ReadFromJdbc
config: ...

写入 JDBC

配置

用法

type: WriteToJdbc
input: ...
config: ...

从 JSON 读取

用于将 JSON 值从文件读取到 PCollection 的 PTransform。

配置

用法

type: ReadFromJson
config:
  path: "path"

写入 JSON

用于将 PCollection 作为 JSON 值写入文件的 PTransform。

配置

用法

type: WriteToJson
input: ...
config:
  path: "path"

从 Kafka 读取

配置

用法

type: ReadFromKafka
config: ...

写入 Kafka

配置

用法

type: WriteToKafka
input: ...
config: ...

从 MySQL 读取

配置

用法

type: ReadFromMySql
config: ...

写入 MySQL

配置

用法

type: WriteToMySql
input: ...
config: ...

从 Oracle 读取

配置

用法

type: ReadFromOracle
config: ...

写入 Oracle

配置

用法

type: WriteToOracle
input: ...
config: ...

从 Parquet 读取

用于读取 Parquet 文件的 PTransform

配置

用法

type: ReadFromParquet
config:
  path: path

写入 Parquet

用于写入 Parquet 文件的 PTransform

配置

用法

type: WriteToParquet
input: ...
config:
  path: path

从 PostgreSQL 读取

配置

用法

type: ReadFromPostgres
config: ...

写入 PostgreSQL

配置

用法

type: WriteToPostgres
input: ...
config: ...

从 Pub/Sub 读取

从 Cloud Pub/Sub 读取消息。

配置

用法

type: ReadFromPubSub
config:
  topic: "topic"
  subscription: "subscription"
  format: "format"
  schema: schema
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"

写入 Pub/Sub

将消息写入 Cloud Pub/Sub。

配置

用法

type: WriteToPubSub
input: ...
config:
  topic: "topic"
  format: "format"
  schema: schema
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"

从 Pub/Sub Lite 读取

配置

用法

type: ReadFromPubSubLite
config: ...

写入 Pub/Sub Lite

配置

用法

type: WriteToPubSubLite
input: ...
config: ...

从 Spanner 读取

配置

用法

type: ReadFromSpanner
config: ...

写入 Spanner

配置

用法

type: WriteToSpanner
input: ...
config: ...

从 SQL Server 读取

配置

用法

type: ReadFromSqlServer
config: ...

写入 SQL Server

配置

用法

type: WriteToSqlServer
input: ...
config: ...

从文本读取

从文本文件读取行。

生成的 PCollection 包含具有单个字符串字段的行,名为 "line"。

配置

用法

type: ReadFromText
config:
  path: "path"

写入文本

将 PCollection 写入 (一组) 文本文件。

输入必须是 PCollection,其模式正好具有一个字段。

配置

用法

type: WriteToText
input: ...
config:
  path: "path"