Apache Beam YAML 连接

Beam YAML 可以根据指定的列连接两个或多个输入。例如,以下管道在第一个输入的 col1 等于第二个输入的 col2 时连接第一个输入 pcollection 和第二个输入 pcollection。

- type: Join
  input:
    input1: First Input
    input2: Second Input
  config:
    equalities:
      - input1: col1
        input2: col2

当在所有输入中命名相同的列上连接多个输入时,可以使用以下简写语法

- type: Join
  input:
    input1: First Input
    input2: Second Input
    input3: Third Input
  config:
    equalities: col1

连接类型

当使用连接转换时,可以指定要对输入执行的连接类型。如果没有指定连接类型,则所有输入都将使用内部连接连接。支持的连接类型为

连接类型YAML 关键字
内部连接inner
完全外部连接left
右外部连接right

以下示例使用指定等式上的内部连接连接两个输入

- type: Join
  input:
    input1: First Input
    input2: Second Input
  config:
    type: inner
    equalities:
      - input1: col1
        input2: col1

以下示例使用指定等式上的左外部连接连接两个输入。在这种情况下,将保留输入 1 中的所有行,因为输入 1 是左输入。连接的顺序遵循等式中指定的顺序。

- type: Join
  input:
    input1: First Input
    input2: Second Input
  config:
    type: left
    equalities:
      - input1: col1
        input2: col1

以下示例使用指定等式上的完全外部连接连接三个输入

- type: Join
  input:
    input1: First Input
    input2: Second Input
    input3: Third Input
  config:
    type: outer
    equalities:
      - input1: col1
        input2: col1
      - input2: col2
        input3: col2

如果您想要连接类型的组合,您可以指定要外部连接的输入。以下示例使用右外部连接连接输入 1 和输入 2,因为输入 2 位于右侧,并将使用左外部连接连接输入 2 和输入 3,因为输入 2 位于左侧。

- type: Join
  input:
    input1: First Input
    input2: Second Input
    input3: Third Input
  config:
    type:
      outer:
        - input2
    equalities:
      - input1: col1
        input2: col1
      - input2: col2
        input3: col2

字段

默认情况下,连接转换将包含所有输入表中的所有列。如果列名冲突,最好明确地重命名它们。否则,系统将通过添加数字后缀来对名称进行重复数据删除

要选择要输出的列,或自定义输出列名,请使用“fields”配置。

要指定要从输入中输出的列,请使用输入引用作为配置键,并使用所需的列列表作为配置值。以下示例输出输入 1 中的 col1,输入 2 中的 col2 和 col3,以及输入 3 中的所有列。如果存在名称冲突,它会在名称末尾添加数字后缀以避免重复命名。

- type: Join
  input:
    input1: First Input
    input2: Second Input
    input3: Third Input
  config:
    equalities: col1
    fields:
      input1: [col1]
      input2: [col2, col3]

要重命名输出中的列,请为输入创建映射,其中键为新列名,值为原始列名。以下示例将输入 3 中的 col1 映射到列名“renamed_col1”

- type: Join
  input:
    input1: First Input
    input2: Second Input
    input3: Third Input
  config:
    equalities: col1
    fields:
      input1: [col1]
      input2: [col2, col3]
      input3:
        renamed_col1: col1