Beam YAML 错误处理
随着管道规模的扩大,遇到“异常”数据(格式错误、未处理适当的先决条件或在处理过程中出现故障)的情况越来越普遍。通常,任何此类记录都会导致管道永久失败,但通常希望允许管道继续运行,将错误记录重定向到另一条路径以进行特殊处理,或者简单地记录它们以供以后脱机分析。这通常称为“死信队列”模式。
如果转换支持带有 error_handling
配置参数的 output
字段,则 Beam YAML 对这种模式有特殊支持。output
参数是一个名称,必须作为另一个将处理错误的转换的输入进行引用(例如,通过将其写入)。例如,以下代码将所有“正常”处理的记录写入一个文件,并将所有“错误”记录以及有关遇到什么错误的元数据写入另一个文件。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
input: ReadFromCsv
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: my_error_output
- type: WriteToJson
input: MapToFields
config:
path: /path/to/output.json
- type: WriteToJson
name: WriteErrorsToJson
input: MapToFields.my_error_output
config:
path: /path/to/errors.json
请注意,如果声明了 error_handling
,则 MapToFields.my_error_output
必须被使用;忽略它将是一个错误。任何使用方式都可以,例如,将错误记录记录到标准输出就足够了(尽管不建议在健壮的管道中这样做)。
还要注意,错误输出的确切格式仍在最终确定中。它们可以安全地打印和写入输出,但它们的确切模式可能会在 Beam 的未来版本中更改,并且目前不应依赖于它们。目前,它至少有一个 element
字段,该字段包含导致错误的元素。
某些转换允许在它们的 error_handling
配置中使用额外的参数,例如,对于 Python 函数,可以提供一个 threshold
,它限制可以为错误记录的相对数量,超过该数量将导致整个管道失败
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
input: ReadFromCsv
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: my_error_output
# If more than 10% of records throw an error, stop the pipeline.
threshold: 0.1
- type: WriteToJson
input: MapToFields
config:
path: /path/to/output.json
- type: WriteToJson
name: WriteErrorsToJson
input: MapToFields.my_error_output
config:
path: /path/to/errors.json
如果需要,可以对这些失败的记录进行任意进一步处理,例如:
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
name: ComputeRatio
input: ReadFromCsv
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: my_error_output
- type: MapToFields
name: ComputeRatioForBadRecords
input: ComputeRatio.my_error_output
config:
language: python
fields:
col1: col1
ratio: col2 / (col3 + 1)
error_handling:
output: still_bad
- type: WriteToJson
# Takes as input everything from the "success" path of both transforms.
input: [ComputeRatio, ComputeRatioForBadRecords]
config:
path: /path/to/output.json
- type: WriteToJson
name: WriteErrorsToJson
# These failed the first and the second transform.
input: ComputeRatioForBadRecords.still_bad
config:
path: /path/to/errors.json
当使用 chain
语法时,所需的错误使用可以在 extra_transforms
块中发生。
pipeline:
type: chain
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
name: SomeStep
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: errors
- type: MapToFields
name: AnotherStep
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
inverse_ratio: 1 / ratio
error_handling:
output: errors
- type: WriteToJson
config:
path: /path/to/output.json
extra_transforms:
- type: WriteToJson
name: WriteErrors
input: [SomeStep.errors, AnotherStep.errors]
config:
path: /path/to/errors.json