Beam YAML 错误处理

随着管道规模的扩大,遇到“异常”数据(格式错误、未处理适当的先决条件或在处理过程中出现故障)的情况越来越普遍。通常,任何此类记录都会导致管道永久失败,但通常希望允许管道继续运行,将错误记录重定向到另一条路径以进行特殊处理,或者简单地记录它们以供以后脱机分析。这通常称为“死信队列”模式。

如果转换支持带有 error_handling 配置参数的 output 字段,则 Beam YAML 对这种模式有特殊支持。output 参数是一个名称,必须作为另一个将处理错误的转换的输入进行引用(例如,通过将其写入)。例如,以下代码将所有“正常”处理的记录写入一个文件,并将所有“错误”记录以及有关遇到什么错误的元数据写入另一个文件。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output

    - type: WriteToJson
      input: MapToFields
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      input: MapToFields.my_error_output
      config:
        path: /path/to/errors.json

请注意,如果声明了 error_handling,则 MapToFields.my_error_output 必须被使用;忽略它将是一个错误。任何使用方式都可以,例如,将错误记录记录到标准输出就足够了(尽管不建议在健壮的管道中这样做)。

还要注意,错误输出的确切格式仍在最终确定中。它们可以安全地打印和写入输出,但它们的确切模式可能会在 Beam 的未来版本中更改,并且目前不应依赖于它们。目前,它至少有一个 element 字段,该字段包含导致错误的元素。

某些转换允许在它们的 error_handling 配置中使用额外的参数,例如,对于 Python 函数,可以提供一个 threshold,它限制可以为错误记录的相对数量,超过该数量将导致整个管道失败

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output
          # If more than 10% of records throw an error, stop the pipeline.
          threshold: 0.1

    - type: WriteToJson
      input: MapToFields
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      input: MapToFields.my_error_output
      config:
        path: /path/to/errors.json

如果需要,可以对这些失败的记录进行任意进一步处理,例如:

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      name: ComputeRatio
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output

    - type: MapToFields
      name: ComputeRatioForBadRecords
      input: ComputeRatio.my_error_output
      config:
        language: python
        fields:
          col1: col1
          ratio: col2 / (col3 + 1)
        error_handling:
          output: still_bad

    - type: WriteToJson
      # Takes as input everything from the "success" path of both transforms.
      input: [ComputeRatio, ComputeRatioForBadRecords]
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      # These failed the first and the second transform.
      input: ComputeRatioForBadRecords.still_bad
      config:
        path: /path/to/errors.json

当使用 chain 语法时,所需的错误使用可以在 extra_transforms 块中发生。

pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      name: SomeStep
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: errors

    - type: MapToFields
      name: AnotherStep
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          inverse_ratio: 1 / ratio
        error_handling:
          output: errors

    - type: WriteToJson
      config:
        path: /path/to/output.json

  extra_transforms:
    - type: WriteToJson
      name: WriteErrors
      input: [SomeStep.errors, AnotherStep.errors]
      config:
        path: /path/to/errors.json