ParDo
Pydoc |
用于通用并行处理的转换。ParDo
转换会考虑输入PCollection
中的每个元素,对该元素执行一些处理函数(您的用户代码),并将零个或多个元素输出到输出PCollection
。
请参阅 Beam 编程指南 中的更多信息。
示例
在以下示例中,我们将探讨如何创建自定义DoFn
并访问时间戳和窗口信息。
示例 1:使用简单 DoFn 的 ParDo
以下示例定义了一个名为SplitWords
的简单DoFn
类,它将delimiter
存储为对象字段。process
方法对每个元素调用一次,它可以生成零个或多个输出元素。
示例 2:使用时间戳和窗口信息的 ParDo
在此示例中,我们在process
方法中添加了新参数以在运行时绑定参数值。
beam.DoFn.TimestampParam
将时间戳信息绑定为apache_beam.utils.timestamp.Timestamp
对象。beam.DoFn.WindowParam
将窗口信息绑定为适当的apache_beam.transforms.window.*Window
对象。
示例 3:使用 DoFn 方法的 ParDo
DoFn
可以使用多种方法进行自定义,这些方法可以帮助创建更复杂的行为。您可以使用setup
和teardown
自定义工作器启动和关闭时执行的操作。您还可以使用start_bundle
和finish_bundle
自定义在 元素捆绑包 启动和结束时执行的操作。
DoFn.setup()
:在工作器上反序列化DoFn
实例时调用。这意味着它可以多次调用,因为可能创建了给定DoFn
子类的多个实例(例如,由于并行化,或由于长时间未被使用后进行垃圾回收)。这是连接到数据库实例、打开网络连接或其他资源的好地方。另请参阅DoFn.SetupContextParam
,了解如何通过上下文管理器完成此操作。DoFn.start_bundle()
:在对捆绑包的第一个元素调用process
之前,对每个元素捆绑包调用一次。这是开始跟踪捆绑包元素的好地方。另请参阅DoFn.BundleContextParam
,了解如何通过上下文管理器完成此操作。DoFn.process(element, *args, **kwargs)
:对每个元素调用一次,可以生成零个或多个元素。附加的*args
或**kwargs
可以通过beam.ParDo()
传递。[必需]DoFn.finish_bundle()
:在对捆绑包的最后一个元素调用process
之后,对每个元素捆绑包调用一次,可以生成零个或多个元素。这是对元素捆绑包执行批量调用(例如运行数据库查询)的好地方。例如,您可以在
start_bundle
中初始化一个批处理,在process
中将元素添加到批处理中(而不是生成它们),然后在finish_bundle
上对这些元素运行批处理查询,并生成所有结果。请注意,从
finish_bundle
生成的元素必须是apache_beam.utils.windowed_value.WindowedValue
类型。您需要提供一个时间戳作为 Unix 时间戳,您可以从相关处理的元素中获取。您还需要提供一个窗口,您可以从相关处理的元素中获取,如下面的示例所示。DoFn.teardown()
:当DoFn
实例关闭时,对每个DoFn
实例调用一次(作为最佳努力)。这是关闭数据库实例、关闭网络连接或其他资源的好地方。请注意,
teardown
是作为最佳努力调用的,并且没有保证。例如,如果工作器崩溃,则可能不会调用teardown
。
已知问题
- [问题 19394]
DoFn.teardown()
指标丢失。
相关转换
Pydoc |
上次更新时间:2024/10/31
您是否找到了您要查找的所有内容?
这些内容是否有用且清晰?您想更改任何内容吗?请告诉我们!