为 Python 开发 I/O 连接器
重要:请使用 Splittable DoFn
来开发您的新 I/O。有关更多详细信息,请阅读 新的 I/O 连接器概述。
要连接到 Beam 现有 I/O 连接器不支持的数据存储,您必须创建一个自定义 I/O 连接器,该连接器通常包含一个源和一个接收器。所有 Beam 源和接收器都是复合转换;但是,自定义 I/O 的实现取决于您的用例。在开始之前,请阅读 新的 I/O 连接器概述,了解开发新 I/O 连接器的概述、可用的实现选项以及如何为您的用例选择合适的选项。
本指南涵盖了使用 Source 和 FileBasedSink 接口 来操作 Python。Java SDK 提供了相同的功能,但使用略微不同的 API。有关 Java SDK 的特定信息,请参阅 为 Java 开发 I/O 连接器。
基本代码要求
Beam 运行器使用您提供的类来使用多个工作程序实例并行读取和/或写入数据。因此,您为 Source
和 FileBasedSink
子类提供的代码必须满足一些基本要求
可序列化性:您的
Source
或FileBasedSink
子类必须是可序列化的。服务可能会创建您Source
或FileBasedSink
子类的多个实例,并将它们发送到多个远程工作程序,以促进并行读取或写入。源和接收器对象序列化的方式是运行器特定的。不可变性:您的
Source
或FileBasedSink
子类必须是不可变的。如果您使用的是延迟计算昂贵计算的延迟评估,那么您应该仅在Source
或FileBasedSink
子类中使用可变状态,以实现该源。线程安全:您的代码必须是线程安全的。Beam SDK for Python 提供了
RangeTracker
类,使这变得更容易。可测试性:全面地对所有
Source
和FileBasedSink
子类进行单元测试至关重要。轻微的实现错误会导致数据损坏或数据丢失(例如跳过或重复记录),而这些错误可能难以检测。您可以使用 source_test_utils 模块 中提供的测试工具和实用程序方法来为您的源开发测试。
此外,请参阅 PTransform 风格指南,了解 Beam 的转换风格指南。
实现 Source 接口
要为您的管道创建一个新的数据源,您需要提供特定于格式的逻辑,该逻辑告诉服务如何从您的输入源读取数据,以及如何将您的数据源拆分为多个部分,以便多个工作程序实例可以并行读取您的数据。
通过创建以下类来提供您的新源的逻辑
BoundedSource
的子类。BoundedSource
是读取有限数量的输入记录的源。该类描述了您要读取的数据,包括数据的存储位置和参数(例如要读取多少数据)。RangeTracker
的子类。RangeTracker
是一个线程安全的对象,用于管理给定位置类型的范围。- 一个或多个面向用户的包装复合转换(
PTransform
),它们包装读取操作。 PTransform 包装器 讨论了为什么应该避免公开您的源,并逐步介绍了如何创建包装器。
您可以在 apache_beam.io.iobase 模块 中找到这些类。
实现 BoundedSource 子类
BoundedSource
代表一个有限的数据集,服务可以从中读取,可能并行读取。BoundedSource
包含一组方法,服务使用这些方法将数据集拆分为多个部分,以便多个远程工作程序读取。
要实现 BoundedSource
,您的子类必须覆盖以下方法
estimate_size
:服务使用此方法来估计您的数据的总大小(以字节为单位)。此估计值是指外部存储大小,在执行解压缩或其他处理之前。split
:服务使用此方法将您的有限数据拆分为给定大小的捆绑包。get_range_tracker
:服务使用此方法为给定位置范围获取RangeTracker
,并使用该信息来报告进度并执行源的动态拆分。read
:此方法返回一个迭代器,该迭代器从源读取数据,并相对于给定RangeTracker
对象定义的边界。
实现 RangeTracker 子类
RangeTracker
是一个线程安全的对象,用于管理 BoundedSource
阅读器的当前范围和当前位置,并保护对它们的并发访问。
要实现 RangeTracker
,您应该首先熟悉以下定义
基于位置的源 - 基于位置的源可以由有序类型的范围来描述,而源读取的记录可以通过该类型的范围来描述。例如,对于文件中的记录,位置可以是记录的起始字节偏移量。在这种情况下,记录的位置类型是
long
。基于位置的源的主要要求是关联性:读取位置范围“'[A, B)'”中的记录和读取位置范围“'[B, C)'”中的记录应与读取位置范围“'[A, C)'”中的记录相同,其中 'A' <= 'B' <= 'C'。此属性确保无论将位置范围拆分为多少个任意子范围,它们描述的记录总数保持不变。
另一个重要属性是源的范围与源中记录的位置之间的关系。在许多源中,每条记录都可以通过唯一的起始位置来标识。在这种情况下
- 源“'[A, B)'”返回的所有记录必须在此范围内的起始位置。
- 除最后一条记录外,所有记录都应在此范围内结束。最后一条记录可能超出或不超出范围的末尾。
- 记录不得重叠。
此类源应将“读取“'[A, B)'”定义为“从第一个起始位置在 'A' 或之后,到第一个起始位置在 'B' 或之后但不包括该位置的第一个记录”。
此类源的一些示例包括从文本文件读取行或 CSV、从数据库读取键值等。
分割点的概念允许扩展定义以处理某些记录无法通过唯一的起始位置识别的源。
分割点 - 分割点描述从包括位置 A 到无穷大(即 [A, 无穷大))的范围读取时返回的第一个记录。
某些源可能包含无法直接寻址的记录。例如,想象一个文件格式,该格式由一系列压缩块组成。每个块都可以分配一个偏移量,但是块内的记录无法在不解压缩块的情况下直接寻址。让我们将此假设格式称为 CBF(压缩块格式)。
许多此类格式仍然可以满足关联性属性。例如,在 CBF 中,读取 [A, B) 可以表示“读取所有起始偏移量在 [A, B) 中的块中的所有记录”。
为了支持此类复杂格式,Beam 引入了分割点的概念。如果存在一个位置 A,使得在读取范围 [A, 无穷大)时,记录是第一个返回的记录,则该记录是一个分割点。在 CBF 中,唯一的分割点是每个块中的第一条记录。
分割点允许我们定义记录的位置和源的范围的含义,如下面的情况所示
- 对于位于分割点的记录,其位置被定义为最大的 A,使得使用范围 [A, 无穷大)读取源返回该记录。
- 其他记录的位置仅需非递减。
- 读取源 [A, B) 必须返回从第一个分割点在 A 或之后开始,到第一个分割点在 B 或之后但不包括该位置的分割点结束的记录。特别是,这意味着源返回的第一条记录 必须始终是一个分割点。
- 分割点的位置必须是唯一的。
因此,对于将源的完整范围分解为位置范围的任何分解,记录的总数将是源中的记录总数,并且每条记录将被精确读取一次。
已使用位置 - 已使用位置指的是已读取的记录。
随着源的读取,并将其读取的记录传递给管道中的下游转换,我们说源中的位置正在使用。当读取器读取了记录(或向调用者承诺将返回记录)时,直到包括记录的起始位置在内的位置都被视为已使用。
动态拆分只能在未使用的位置发生。如果读取器刚在文件中的偏移量 42 返回了一条记录,则动态拆分只能在偏移量 43 或之后发生。否则,该记录可能会被读取两次(由当前读取器和新任务的读取器)。
RangeTracker 方法
为了实现 RangeTracker
,您的子类必须覆盖以下方法
start_position
:返回当前范围的起始位置,包含在内。stop_position
:返回当前范围的结束位置,不包含在内。try_claim
:此方法用于确定分割点处的记录是否在范围内。此方法应通过将最后一个已使用的位置更新到给定的记录的起始position
来修改RangeTracker
的内部状态。如果给定的位置落在当前范围内,则该方法返回 true。set_current_position
:此方法将最后一个已使用的位置更新到源读取的记录的给定起始位置。您可以对不以分割点开始的记录调用此方法,这将修改RangeTracker
的内部状态。如果记录以分割点开始,则必须调用try_claim
而不是此方法。position_at_fraction
:给定范围 [0.0, 1.0) 中的某个小数,此方法将返回相对于位置范围 [self.start_position
,self.stop_position
) 的给定小数的位置。try_split
:此方法尝试围绕建议位置将当前范围拆分为两部分。它允许在不同的位置进行拆分,但在大多数情况下,它将在建议位置进行拆分。
此方法将当前范围 [self.start_position
, self.stop_position
) 拆分为“主要”部分 [self.start_position
, split_position
) 和“剩余”部分 [split_position
, self.stop_position
),假设 split_position
尚未使用。
如果 split_position
已经使用,则该方法返回 None
。否则,它将当前范围更新为主要部分,并返回一个元组 (split_position
, split_fraction
)。split_fraction
应该是范围 [self.start_position
, split_position
) 的大小与原始(拆分前)范围 [self.start_position
, self.stop_position
) 的大小之比。
fraction_consumed
:返回源中已使用位置的大致比例。
注意:类 iobase.RangeTracker
的方法可能由多个线程调用,因此此类必须使其线程安全,例如,通过使用单个锁对象。
便利 Source 基类
Beam SDK for Python 包含一些方便的抽象基类,可帮助您轻松创建新的源。
FileBasedSource
FileBasedSource
是一个框架,用于开发针对新文件类型进行源开发。您可以从 FileBasedSource 类派生您的 BoundedSource
类。
若要为新文件类型创建源,您需要创建一个 FileBasedSource
的子类。FileBasedSource
的子类必须实现 FileBasedSource.read_records()
方法。
有关 FileBasedSource
的示例实现,请参阅 AvroSource。
从新的 Source 读取
以下示例 CountingSource
演示了 BoundedSource
的实现,并使用 SDK 提供的称为 OffsetRangeTracker
的 RangeTracker
。
class CountingSource(iobase.BoundedSource):
def __init__(self, count):
self.records_read = Metrics.counter(self.__class__, 'recordsRead')
self._count = count
def estimate_size(self):
return self._count
def get_range_tracker(self, start_position, stop_position):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = self._count
return OffsetRangeTracker(start_position, stop_position)
def read(self, range_tracker):
for i in range(range_tracker.start_position(),
range_tracker.stop_position()):
if not range_tracker.try_claim(i):
return
self.records_read.inc()
yield i
def split(self, desired_bundle_size, start_position=None, stop_position=None):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = self._count
bundle_start = start_position
while bundle_start < stop_position:
bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
yield iobase.SourceBundle(
weight=(bundle_stop - bundle_start),
source=self,
start_position=bundle_start,
stop_position=bundle_stop)
bundle_start = bundle_stop
若要从管道中的源读取数据,请使用 Read
变换
注意:当您创建最终用户将使用的源时,我们建议您不要像上面示例中那样公开源本身的代码。而是使用包装 PTransform
。PTransform 包装器 讨论了为什么您应该避免公开源,并逐步介绍了如何创建包装器。
使用 FileBasedSink 抽象
如果您的数据源使用文件,则可以实现 FileBasedSink 抽象来创建基于文件的接收器。对于其他接收器,请使用 ParDo
、GroupByKey
和 Beam SDK for Python 提供的其他变换。有关更多详细信息,请参阅 开发 I/O 连接器概述。
使用 FileBasedSink
接口时,您必须提供特定于格式的逻辑,告诉运行器如何将管道中的 PCollection
的有界数据写入输出接收器。运行器使用多个工作程序并行写入数据包。
通过实现以下类来提供基于文件的接收器的逻辑
抽象基类
FileBasedSink
的子类。FileBasedSink
描述了管道可以并行写入的位置或资源。为了避免将接收器公开给最终用户,在创建FileBasedSink
子类时使用_
前缀。面向用户的包装器
PTransform
,作为逻辑的一部分,它调用Write
并将您的FileBasedSink
作为参数传递。用户不需要直接调用Write
。
FileBasedSink
抽象基类实现了 Beam 接收器(与文件交互的 Beam 接收器)的公共代码,包括
- 设置文件头和文件尾
- 顺序记录写入
- 设置输出 MIME 类型
FileBasedSink
及其子类支持将文件写入任何 Beam 支持的 FileSystem
实现。有关示例,请参阅以下 Beam 提供的 FileBasedSink
实现
PTransform 包装器
当您创建最终用户将使用的源或接收器时,请避免公开您的源或接收器代码。为了避免将您的源和接收器公开给最终用户,您的新类应该使用 _
前缀。然后,实现面向用户的包装器 PTransform
。`通过将源或接收器公开为变换,您的实现将隐藏,并且可以任意复杂或简单。不公开实现细节的最大好处是,以后您可以添加额外功能,而不会破坏用户现有的实现。
例如,如果您的用户管道使用 beam.io.Read
从您的源读取数据,而您想在管道中插入重新分片,则所有用户都需要自己添加重新分片(使用 GroupByKey
变换)。为了解决这个问题,我们建议您将源公开为执行读取操作和重新分片操作的复合 PTransform
。
有关使用 PTransform
进行包装的更多信息,请参阅 Beam 的 PTransform 样式指南。
以下示例更改了上面部分中的源和接收器,以便它们不会公开给最终用户。对于源,将 CountingSource
重命名为 _CountingSource
。然后,创建包装器 PTransform
,称为 ReadFromCountingSource
最后,从源读取
对于接收器,将 SimpleKVSink
重命名为 _SimpleKVSink
。然后,创建包装器 PTransform
,称为 WriteToKVSink
class WriteToKVSink(PTransform):
def __init__(self, simplekv, url, final_table_name):
self._simplekv = simplekv
super().__init__()
self._url = url
self._final_table_name = final_table_name
def expand(self, pcoll):
return pcoll | iobase.Write(
_SimpleKVSink(self._simplekv, self._url, self._final_table_name))
最后,写入接收器
上次更新时间:2024/10/31
您是否找到了您要寻找的一切?
所有内容是否都有用且清晰?您想更改什么内容?请告诉我们!