ParDo
Pydoc |
用于通用并行处理的转换。ParDo 转换会考虑输入PCollection 中的每个元素,对该元素执行一些处理函数(您的用户代码),并将零个或多个元素输出到输出PCollection。
请参阅 Beam 编程指南 中的更多信息。
示例
在以下示例中,我们将探讨如何创建自定义DoFn 并访问时间戳和窗口信息。
示例 1:使用简单 DoFn 的 ParDo
以下示例定义了一个名为SplitWords 的简单DoFn 类,它将delimiter 存储为对象字段。process 方法对每个元素调用一次,它可以生成零个或多个输出元素。
示例 2:使用时间戳和窗口信息的 ParDo
在此示例中,我们在process 方法中添加了新参数以在运行时绑定参数值。
beam.DoFn.TimestampParam将时间戳信息绑定为apache_beam.utils.timestamp.Timestamp对象。beam.DoFn.WindowParam将窗口信息绑定为适当的apache_beam.transforms.window.*Window对象。
示例 3:使用 DoFn 方法的 ParDo
DoFn 可以使用多种方法进行自定义,这些方法可以帮助创建更复杂的行为。您可以使用setup 和teardown 自定义工作器启动和关闭时执行的操作。您还可以使用start_bundle 和finish_bundle 自定义在 元素捆绑包 启动和结束时执行的操作。
DoFn.setup():在工作器上反序列化DoFn实例时调用。这意味着它可以多次调用,因为可能创建了给定DoFn子类的多个实例(例如,由于并行化,或由于长时间未被使用后进行垃圾回收)。这是连接到数据库实例、打开网络连接或其他资源的好地方。另请参阅DoFn.SetupContextParam,了解如何通过上下文管理器完成此操作。DoFn.start_bundle():在对捆绑包的第一个元素调用process之前,对每个元素捆绑包调用一次。这是开始跟踪捆绑包元素的好地方。另请参阅DoFn.BundleContextParam,了解如何通过上下文管理器完成此操作。DoFn.process(element, *args, **kwargs):对每个元素调用一次,可以生成零个或多个元素。附加的*args或**kwargs可以通过beam.ParDo()传递。[必需]DoFn.finish_bundle():在对捆绑包的最后一个元素调用process之后,对每个元素捆绑包调用一次,可以生成零个或多个元素。这是对元素捆绑包执行批量调用(例如运行数据库查询)的好地方。例如,您可以在
start_bundle中初始化一个批处理,在process中将元素添加到批处理中(而不是生成它们),然后在finish_bundle上对这些元素运行批处理查询,并生成所有结果。请注意,从
finish_bundle生成的元素必须是apache_beam.utils.windowed_value.WindowedValue类型。您需要提供一个时间戳作为 Unix 时间戳,您可以从相关处理的元素中获取。您还需要提供一个窗口,您可以从相关处理的元素中获取,如下面的示例所示。DoFn.teardown():当DoFn实例关闭时,对每个DoFn实例调用一次(作为最佳努力)。这是关闭数据库实例、关闭网络连接或其他资源的好地方。请注意,
teardown是作为最佳努力调用的,并且没有保证。例如,如果工作器崩溃,则可能不会调用teardown。
已知问题
- [问题 19394]
DoFn.teardown()指标丢失。
相关转换
Pydoc |
上次更新时间:2024/10/31
您是否找到了您要查找的所有内容?
这些内容是否有用且清晰?您想更改任何内容吗?请告诉我们!


Pydoc