为 Python 开发 I/O 连接器

重要:请使用 Splittable DoFn 来开发您的新 I/O。有关更多详细信息,请阅读 新的 I/O 连接器概述

要连接到 Beam 现有 I/O 连接器不支持的数据存储,您必须创建一个自定义 I/O 连接器,该连接器通常包含一个源和一个接收器。所有 Beam 源和接收器都是复合转换;但是,自定义 I/O 的实现取决于您的用例。在开始之前,请阅读 新的 I/O 连接器概述,了解开发新 I/O 连接器的概述、可用的实现选项以及如何为您的用例选择合适的选项。

本指南涵盖了使用 Source 和 FileBasedSink 接口 来操作 Python。Java SDK 提供了相同的功能,但使用略微不同的 API。有关 Java SDK 的特定信息,请参阅 为 Java 开发 I/O 连接器

基本代码要求

Beam 运行器使用您提供的类来使用多个工作程序实例并行读取和/或写入数据。因此,您为 SourceFileBasedSink 子类提供的代码必须满足一些基本要求

  1. 可序列化性:您的 SourceFileBasedSink 子类必须是可序列化的。服务可能会创建您 SourceFileBasedSink 子类的多个实例,并将它们发送到多个远程工作程序,以促进并行读取或写入。源和接收器对象序列化的方式是运行器特定的。

  2. 不可变性:您的 SourceFileBasedSink 子类必须是不可变的。如果您使用的是延迟计算昂贵计算的延迟评估,那么您应该仅在 SourceFileBasedSink 子类中使用可变状态,以实现该源。

  3. 线程安全:您的代码必须是线程安全的。Beam SDK for Python 提供了 RangeTracker 类,使这变得更容易。

  4. 可测试性:全面地对所有 SourceFileBasedSink 子类进行单元测试至关重要。轻微的实现错误会导致数据损坏或数据丢失(例如跳过或重复记录),而这些错误可能难以检测。您可以使用 source_test_utils 模块 中提供的测试工具和实用程序方法来为您的源开发测试。

此外,请参阅 PTransform 风格指南,了解 Beam 的转换风格指南。

实现 Source 接口

要为您的管道创建一个新的数据源,您需要提供特定于格式的逻辑,该逻辑告诉服务如何从您的输入源读取数据,以及如何将您的数据源拆分为多个部分,以便多个工作程序实例可以并行读取您的数据。

通过创建以下类来提供您的新源的逻辑

您可以在 apache_beam.io.iobase 模块 中找到这些类。

实现 BoundedSource 子类

BoundedSource 代表一个有限的数据集,服务可以从中读取,可能并行读取。BoundedSource 包含一组方法,服务使用这些方法将数据集拆分为多个部分,以便多个远程工作程序读取。

要实现 BoundedSource,您的子类必须覆盖以下方法

实现 RangeTracker 子类

RangeTracker 是一个线程安全的对象,用于管理 BoundedSource 阅读器的当前范围和当前位置,并保护对它们的并发访问。

要实现 RangeTracker,您应该首先熟悉以下定义

RangeTracker 方法

为了实现 RangeTracker,您的子类必须覆盖以下方法

此方法将当前范围 [self.start_position, self.stop_position) 拆分为“主要”部分 [self.start_position, split_position) 和“剩余”部分 [split_position, self.stop_position),假设 split_position 尚未使用。

如果 split_position 已经使用,则该方法返回 None。否则,它将当前范围更新为主要部分,并返回一个元组 (split_position, split_fraction)。split_fraction 应该是范围 [self.start_position, split_position) 的大小与原始(拆分前)范围 [self.start_position, self.stop_position) 的大小之比。

注意:iobase.RangeTracker 的方法可能由多个线程调用,因此此类必须使其线程安全,例如,通过使用单个锁对象。

便利 Source 基类

Beam SDK for Python 包含一些方便的抽象基类,可帮助您轻松创建新的源。

FileBasedSource

FileBasedSource 是一个框架,用于开发针对新文件类型进行源开发。您可以从 FileBasedSource 类派生您的 BoundedSource 类。

若要为新文件类型创建源,您需要创建一个 FileBasedSource 的子类。FileBasedSource 的子类必须实现 FileBasedSource.read_records() 方法。

有关 FileBasedSource 的示例实现,请参阅 AvroSource

从新的 Source 读取

以下示例 CountingSource 演示了 BoundedSource 的实现,并使用 SDK 提供的称为 OffsetRangeTrackerRangeTracker

class CountingSource(iobase.BoundedSource):
  def __init__(self, count):
    self.records_read = Metrics.counter(self.__class__, 'recordsRead')
    self._count = count

  def estimate_size(self):
    return self._count

  def get_range_tracker(self, start_position, stop_position):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._count

    return OffsetRangeTracker(start_position, stop_position)

  def read(self, range_tracker):
    for i in range(range_tracker.start_position(),
                   range_tracker.stop_position()):
      if not range_tracker.try_claim(i):
        return
      self.records_read.inc()
      yield i

  def split(self, desired_bundle_size, start_position=None, stop_position=None):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._count

    bundle_start = start_position
    while bundle_start < stop_position:
      bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
      yield iobase.SourceBundle(
          weight=(bundle_stop - bundle_start),
          source=self,
          start_position=bundle_start,
          stop_position=bundle_stop)
      bundle_start = bundle_stop

若要从管道中的源读取数据,请使用 Read 变换

with beam.Pipeline() as pipeline:
  numbers = pipeline | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))

注意:当您创建最终用户将使用的源时,我们建议您不要像上面示例中那样公开源本身的代码。而是使用包装 PTransformPTransform 包装器 讨论了为什么您应该避免公开源,并逐步介绍了如何创建包装器。

使用 FileBasedSink 抽象

如果您的数据源使用文件,则可以实现 FileBasedSink 抽象来创建基于文件的接收器。对于其他接收器,请使用 ParDoGroupByKey 和 Beam SDK for Python 提供的其他变换。有关更多详细信息,请参阅 开发 I/O 连接器概述

使用 FileBasedSink 接口时,您必须提供特定于格式的逻辑,告诉运行器如何将管道中的 PCollection 的有界数据写入输出接收器。运行器使用多个工作程序并行写入数据包。

通过实现以下类来提供基于文件的接收器的逻辑

FileBasedSink 抽象基类实现了 Beam 接收器(与文件交互的 Beam 接收器)的公共代码,包括

FileBasedSink 及其子类支持将文件写入任何 Beam 支持的 FileSystem 实现。有关示例,请参阅以下 Beam 提供的 FileBasedSink 实现

PTransform 包装器

当您创建最终用户将使用的源或接收器时,请避免公开您的源或接收器代码。为了避免将您的源和接收器公开给最终用户,您的新类应该使用 _ 前缀。然后,实现面向用户的包装器 PTransform。`通过将源或接收器公开为变换,您的实现将隐藏,并且可以任意复杂或简单。不公开实现细节的最大好处是,以后您可以添加额外功能,而不会破坏用户现有的实现。

例如,如果您的用户管道使用 beam.io.Read 从您的源读取数据,而您想在管道中插入重新分片,则所有用户都需要自己添加重新分片(使用 GroupByKey 变换)。为了解决这个问题,我们建议您将源公开为执行读取操作和重新分片操作的复合 PTransform

有关使用 PTransform 进行包装的更多信息,请参阅 Beam 的 PTransform 样式指南

以下示例更改了上面部分中的源和接收器,以便它们不会公开给最终用户。对于源,将 CountingSource 重命名为 _CountingSource。然后,创建包装器 PTransform,称为 ReadFromCountingSource

class ReadFromCountingSource(PTransform):
  def __init__(self, count):
    super().__init__()
    self._count = count

  def expand(self, pcoll):
    return pcoll | iobase.Read(_CountingSource(self._count))

最后,从源读取

with beam.Pipeline() as pipeline:
  numbers = pipeline | 'ProduceNumbers' >> ReadFromCountingSource(count)

对于接收器,将 SimpleKVSink 重命名为 _SimpleKVSink。然后,创建包装器 PTransform,称为 WriteToKVSink

class WriteToKVSink(PTransform):
  def __init__(self, simplekv, url, final_table_name):
    self._simplekv = simplekv
    super().__init__()
    self._url = url
    self._final_table_name = final_table_name

  def expand(self, pcoll):
    return pcoll | iobase.Write(
        _SimpleKVSink(self._simplekv, self._url, self._final_table_name))

最后,写入接收器

with beam.Pipeline(options=PipelineOptions()) as pipeline:
  kvs = pipeline | 'CreateKVs' >> beam.core.Create(KVs)
  kvs | 'WriteToSimpleKV' >> WriteToKVSink(
      simplekv, 'http://url_to_simple_kv/', final_table_name)