从 YAML 使用 PyTransform

Beam YAML 提供了通过 PyTransform 类型轻松调用 Python 转换的能力,只需按完全限定名称引用它们。例如,

- type: PyTransform
  config:
    constructor: apache_beam.pkg.module.SomeTransform
    args: [1, 'foo']
    kwargs:
       baz: 3

将调用转换 apache_beam.pkg.mod.SomeTransform(1, 'foo', baz=3)。此完全限定名称可以是任何 PTransform 类或返回 PTransform 的其他可调用对象。但是请注意,不接受或不返回带模式数据的 PTransform 可能不适合从 YAML 使用。可以通过在 MapToFields 上使用 callable 选项来恢复非模式返回转换后的模式性,该选项将整个元素作为输入,例如

- type: PyTransform
  config:
    constructor: apache_beam.pkg.module.SomeTransform
    args: [1, 'foo']
    kwargs:
       baz: 3
- type: MapToFields
  config:
    language: python
    fields:
      col1:
        callable: 'lambda element: element.col1'
        output_type: string
      col2:
        callable: 'lambda element: element.col2'
        output_type: integer

这可以用来在 Beam SDK 中调用任意转换,例如

pipeline:
  transforms:
    - type: PyTransform
      name: ReadFromTsv
      input: {}
      config:
        constructor: apache_beam.io.ReadFromCsv
        kwargs:
           path: '/path/to/*.tsv'
           sep: '\t'
           skip_blank_lines: True
           true_values: ['yes']
           false_values: ['no']
           comment: '#'
           on_bad_lines: 'skip'
           binary: False
           splittable: False

使用 __constructor__ 在行内定义转换

如果所需的转换不存在,也可以在行内定义它。这是使用特殊的 __constructor__ 关键字完成的,类似于跨语言转换的方式。

使用 __constuctor__ 关键字,定义一个 Python 可调用对象,该对象在调用时 *返回* 所需的转换。第一个参数(如果存在位置参数,则为 source 关键字参数)被解释为 Python 代码。例如

- type: PyTransform
  config:
    constructor: __constructor__
    kwargs:
      source: |
        def create_my_transform(inc):
          return beam.Map(lambda x: beam.Row(a=x.col2 + inc))

      inc: 10

将对传入的 PCollection 应用 beam.Map(lambda x: beam.Row(a=x.col2 + 10))

由于类对象可以作为其自身的构造函数被调用,因此这允许在行内定义一个 beam.PTransform,例如

- type: PyTransform
  config:
    constructor: __constructor__
    kwargs:
      source: |
        class MyPTransform(beam.PTransform):
          def __init__(self, inc):
            self._inc = inc
          def expand(self, pcoll):
            return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + self._inc))

      inc: 10

这与预期的一样工作。

使用 __callable__ 在行内定义转换

__callable__ 关键字的工作方式类似,但不是定义一个返回适用的 PTransform 的可调用对象,而是简单地定义要执行的扩展作为可调用对象。这类似于 BeamPython 的 ptransform.ptransform_fn 装饰器。

在这种情况下,您可以简单地编写

- type: PyTransform
  config:
    constructor: __callable__
    kwargs:
      source: |
        def my_ptransform(pcoll, inc):
          return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + inc))

      inc: 10

外部转换

您还可以通过 python 提供程序调用在其他地方定义的 PTransform,例如

pipeline:
  transforms:
    - ...
    - type: MyTransform
      config:
        kwarg: whatever

providers:
  - ...
  - type: python
    input: ...
    config:
      packages:
        - 'some_pypi_package>=version'
    transforms:
      MyTransform: 'pkg.module.MyTransform'

这些也可以在行内定义,无论是否有依赖项,例如

pipeline:
  transforms:
    - ...
    - type: ToCase
      input: ...
      config:
        upper: True

providers:
  - type: python
    config: {}
    transforms:
      'ToCase': |
        @beam.ptransform_fn
        def ToCase(pcoll, upper):
          if upper:
            return pcoll | beam.Map(lambda x: str(x).upper())
          else:
            return pcoll | beam.Map(lambda x: str(x).lower())