为 Java 开发 I/O 连接器
重要提示:使用 Splittable DoFn
开发您的新 I/O。有关更多详细信息,请阅读 新 I/O 连接器概述.
要连接到 Beam 现有 I/O 连接器不支持的数据存储,您必须创建一个自定义 I/O 连接器,该连接器通常由一个源和一个接收器组成。所有 Beam 源和接收器都是复合转换;但是,自定义 I/O 的实现取决于您的用例。在开始之前,请阅读 新 I/O 连接器概述,了解开发新 I/O 连接器的概述、可用的实现选项以及如何为您的用例选择合适的选项。
本指南介绍了使用 Java 的 Source
和 FileBasedSink
接口。Python SDK 提供了相同的功能,但使用略微不同的 API。有关 Python SDK 的具体信息,请参见 为 Python 开发 I/O 连接器.
基本代码要求
Beam 运行器使用您提供的类,使用多个工作器实例并行地读取和/或写入数据。因此,您为 Source
和 FileBasedSink
子类提供的代码必须满足一些基本要求
可序列化性:您的
Source
或FileBasedSink
子类(无论是受限的还是不受限的),必须是可序列化的。运行器可能会创建您Source
或FileBasedSink
子类的多个实例,以便发送到多个远程工作器,以并行地促进读取或写入操作。不可变性:您的
Source
或FileBasedSink
子类必须实际上是不可变的。所有私有字段必须声明为 final,所有私有集合类型变量必须实际上是不可变的。如果您的类具有 setter 方法,则这些方法必须返回具有相关字段修改的独立对象副本。您只应在
Source
或FileBasedSink
子类中使用可变状态,如果您正在使用延迟评估的昂贵计算来实现源或接收器;在这种情况下,您必须声明所有可变实例变量为 transient。线程安全性:您的代码必须是线程安全的。如果您构建的源适用于动态工作重新平衡,那么确保代码线程安全至关重要。Beam SDK 提供了一个辅助类,使此操作更轻松。有关更多详细信息,请参见 使用您的 BoundedSource 进行动态工作重新平衡.
可测试性:对所有
Source
和FileBasedSink
子类进行详尽的单元测试至关重要,尤其是在您构建的类适用于动态工作重新平衡等高级功能时。一个小的实现错误会导致数据损坏或数据丢失(如跳过或重复记录),这些错误很难检测。为了帮助测试
BoundedSource
实现,您可以使用 SourceTestUtils 类。SourceTestUtils
包含用于自动验证BoundedSource
实现的某些属性的实用程序。您可以使用SourceTestUtils
,使用广泛的输入通过相对少的代码行来提高实现的测试覆盖率。有关使用SourceTestUtils
的示例,请参见 AvroSourceTest 和 TextIOReadTest 源代码。
此外,请参见 PTransform 风格指南,了解 Beam 的转换风格指南。
实现 Source 接口
要为您的管道创建数据源,您必须提供特定于格式的逻辑,该逻辑告诉运行器如何从您的输入源读取数据,以及如何将您的数据源拆分为多个部分,以便多个工作器实例可以并行读取您的数据。如果您正在创建一个读取不受限数据的源,则必须提供用于管理源水印和可选检查点的附加逻辑。
通过创建以下类来提供源的逻辑
如果要读取有限(批处理)数据集,则为
BoundedSource
的子类;如果要读取无限(流式)数据集,则为UnboundedSource
的子类。这些子类描述了您要读取的数据,包括数据的存储位置和参数(如要读取的数据量)。Source.Reader
的子类。每个 Source 必须具有一个关联的 Reader,该 Reader 捕获与从该Source
读取相关的所有状态。这可能包括文件句柄、RPC 连接和其他取决于您要读取的数据格式的特定要求的参数。Reader
类层次结构反映了 Source 层次结构。如果您正在扩展BoundedSource
,则需要提供一个关联的BoundedReader
。如果您正在扩展UnboundedSource
,则需要提供一个关联的UnboundedReader
。一个或多个面向用户的包装器复合转换 (
PTransform
),这些转换包装读取操作。 PTransform 包装器 讨论了为什么应避免公开您的源。
实现 Source 子类
您必须创建一个 BoundedSource
或 UnboundedSource
的子类,具体取决于您的数据是有限批次还是无限流。无论哪种情况,您的 Source
子类都必须覆盖超类中的抽象方法。运行器在使用您的数据源时可能会调用这些方法。例如,在从有界源读取时,运行器会使用这些方法来估计数据集的大小并将其拆分为并行读取。
您的 Source
子类还应管理有关数据源的基本信息,例如位置。例如,Beam 中的示例 Source
实现DatastoreIO 类将主机、数据集 ID 和查询作为参数。连接器使用这些值从 Cloud Datastore 获取数据。
BoundedSource
BoundedSource
表示一个有限数据集,Beam 运行器可以从该数据集读取,可能以并行方式。BoundedSource
包含一组抽象方法,运行器使用这些方法将数据集拆分为多个工作程序读取。
要实现 BoundedSource
,您的子类必须覆盖以下抽象方法
split
:运行器使用此方法将您的有限数据拆分为给定大小的捆绑包。getEstimatedSizeBytes
:运行器使用此方法以字节为单位估计数据的总大小。createReader
:为该BoundedSource
创建关联的BoundedReader
。
您可以在 Beam 的 Cloud BigTable (BigtableIO.java) 和 BigQuery (BigQuerySourceBase.java) 实现中了解如何实现 BoundedSource
和所需抽象方法的模型。
UnboundedSource
UnboundedSource
表示一个无限数据流,运行器可以从该数据流读取,可能以并行方式。UnboundedSource
包含一组抽象方法,运行器使用这些方法来支持并行流读取;这些方法包括用于故障恢复的检查点、用于防止数据重复的记录 ID 以及用于估计管道下游部分数据完整性的水印。
要实现 UnboundedSource
,您的子类必须覆盖以下抽象方法
split
:运行器使用此方法生成一个UnboundedSource
对象列表,这些对象表示服务应从中并行读取的子流实例数。getCheckpointMarkCoder
:运行器使用此方法获取源(如果有)检查点的编码器。requiresDeduping
:运行器使用此方法来确定数据是否需要显式删除重复记录。如果此方法返回 true,则运行器将自动插入一个步骤以从源的输出中删除重复项。只有当您的源为每个记录提供记录 ID 时,才应返回 true。有关何时应执行此操作,请参阅UnboundedReader.getCurrentRecordId
。createReader
:为该UnboundedSource
创建关联的UnboundedReader
。
实现 Reader 子类
您必须创建一个 BoundedReader
或 UnboundedReader
的子类,以便由您的源子类的 createReader
方法返回。运行器使用您的 Reader
(有界或无界)中的方法来执行数据集的实际读取。
BoundedReader
和 UnboundedReader
具有类似的基本接口,您需要定义这些接口。此外,还有一些专门用于 UnboundedReader
的附加方法,您需要针对处理无界数据进行实现,以及一个可选方法,如果您希望您的 BoundedReader
利用动态工作重新平衡,则可以实现该方法。在使用 UnboundedReader
时,start()
和 advance()
方法的语义也略有不同。
BoundedReader 和 UnboundedReader 共有的 Reader 方法
运行器使用以下方法使用 BoundedReader
或 UnboundedReader
读取数据
start
:初始化Reader
并前进到要读取的第一个记录。此方法在运行器开始读取您的数据时只调用一次,是放置初始化所需昂贵操作的合适位置。advance
:将读取器推进到下一个有效记录。如果不再有可用输入,此方法必须返回 false。BoundedReader
应该在 advance 返回 false 后停止读取,但UnboundedReader
可以在将来调用中返回 true,一旦您的流中有更多数据可用。getCurrent
:返回当前位置的数据记录,最后一次由 start 或 advance 读取。getCurrentTimestamp
:返回当前数据记录的时间戳。只有当您的源读取具有内在时间戳的数据时,您才需要覆盖getCurrentTimestamp
。运行器使用此值来设置结果输出PCollection
中每个元素的内在时间戳。
UnboundedReader 独有的 Reader 方法
除了基本 Reader
接口外,UnboundedReader
还有一些用于管理从无界数据源读取的附加方法
getCurrentRecordId
:返回当前记录的唯一标识符。运行器使用这些记录 ID 来过滤重复记录。如果您的数据在每个记录中都存在逻辑 ID,则可以使此方法返回它们;否则,您可以返回记录内容的哈希值,至少使用 128 位哈希值。使用 Java 的Object.hashCode()
是不正确的,因为 32 位哈希值通常不足以防止冲突,并且hasCode()
并不保证在进程之间保持稳定。如果您的源使用一种检查点方案来唯一标识每个记录,则实现
getCurrentRecordId
是可选的。例如,如果您的拆分是文件,而检查点是所有数据都已读取到的文件位置,则您不需要记录 ID。但是,如果上游系统将数据写入您的源偶尔会产生重复记录,然后您的源可能会读取这些记录,则记录 ID 仍然有用。getWatermark
:返回您的Reader
提供的水印。水印是您的Reader
要读取的将来元素的时间戳的近似下限。运行器使用水印作为数据完整性的估计。水印用于窗口和触发器。getCheckpointMark
:运行器使用此方法在您的数据流中创建检查点。检查点表示UnboundedReader
的进度,可以用于故障恢复。不同的数据流可以使用不同的检查点方法;某些源可能需要确认接收到的记录,而其他源可能使用位置检查点。您需要根据最合适的检查点方案来调整此方法。例如,您可能让此方法返回最近确认的记录。getCheckpointMark
是可选的;如果您的数据没有有意义的检查点,您不需要实现它。但是,如果您选择不在您的源中实现检查点,那么您可能会遇到数据重复或数据丢失,具体取决于您的数据源是否尝试在错误情况下重新发送记录。
您可以通过在从源读取时指定 .withMaxNumRecords
或 .withMaxReadTime
来从 UnboundedSource
读取有界 PCollection
。.withMaxNumRecords
从您的无界源读取固定最大数量的记录,而 .withMaxReadTime
从您的无界源读取固定最大时间段。
使用您的 BoundedSource 进行动态工作重新平衡
如果您的源提供有界数据,您可以让您的 BoundedReader
与动态工作重新平衡一起工作,方法是实现 splitAtFraction
方法。运行器可能会在给定读取器上与 start 或 advance 同时调用 splitAtFraction
,以便您的 Source
中剩余的数据可以被拆分并重新分配到其他工作程序。
当您实现 splitAtFraction
时,您的代码必须生成一组互斥的拆分,其中这些拆分的并集与总数据集匹配。
如果您实现 splitAtFraction
,则必须以线程安全的方式实现 splitAtFraction
和 getFractionConsumed
,否则可能会导致数据丢失。您还应该彻底地对您的实现进行单元测试,以避免数据重复或数据丢失。
为了确保您的代码是线程安全的,请使用 RangeTracker
线程安全的帮助器对象来管理在实现 splitAtFraction
和 getFractionConsumed
时数据源中的位置。
我们强烈建议您使用 SourceTestUtils
类对 splitAtFraction
的实现进行单元测试。SourceTestUtils
包含许多用于测试 splitAtFraction
实现的方法,包括详尽的自动测试。
便利 Source 和 Reader 基类
Beam SDK 包含一些方便的抽象基类,可以帮助您创建与常见数据存储格式(如文件)一起使用的 Source
和 Reader
类。
FileBasedSource
如果您的数据源使用文件,您可以从 FileBasedSource
和 FileBasedReader
抽象基类派生您的 Source
和 Reader
类。FileBasedSource
是一个有界源子类,它实现了与文件交互的 Beam 源的公共代码,包括
- 文件模式扩展
- 顺序记录读取
- 拆分点
使用 FileBasedSink 抽象
如果您的数据源使用文件,您可以实现 FileBasedSink
抽象来创建一个基于文件的接收器。对于其他接收器,请使用 Beam SDK for Java 提供的 ParDo
、GroupByKey
和其他转换。有关更多详细信息,请参阅开发 I/O 连接器概述。
在使用 FileBasedSink
接口时,您必须提供特定于格式的逻辑,该逻辑告诉运行器如何将管道 PCollection
中的有界数据写入输出接收器。运行器使用多个工作程序并行写入数据捆绑包。
通过实现以下类来提供基于文件的接收器的逻辑
抽象基类
FileBasedSink
的子类。FileBasedSink
描述一个位置或资源,您的管道可以并行写入该位置或资源。为了避免将接收器暴露给最终用户,您的FileBasedSink
子类应受到保护或私有。面向用户的包装器
PTransform
,它作为逻辑的一部分调用 WriteFiles 并将您的FileBasedSink
作为参数传递。用户不应该直接调用WriteFiles
。
FileBasedSink
抽象基类实现了与文件交互的 Beam 接收器的公共代码,包括
- 设置文件头和文件尾
- 顺序记录写入
- 设置输出 MIME 类型
FileBasedSink
及其子类支持将文件写入任何受 Beam 支持的 FileSystem
实现。有关示例,请参阅以下 Beam 提供的 FileBasedSink
实现
PTransform 包装器
当您创建一个最终用户将使用的源或接收器时,请避免暴露您的源或接收器代码。为了避免将您的源和接收器暴露给最终用户,您的新类应受到保护或私有。然后,实现面向用户的包装器 PTransform
。通过将您的源或接收器公开为转换,您的实现被隐藏并且可以任意复杂或简单。不公开实现细节的最大好处是,以后您可以添加其他功能,而不会破坏用户的现有实现。
例如,如果您的用户的管道使用 read
从您的源读取,并且您想在管道中插入一个重新分片,则所有用户都需要自己添加重新分片(使用 GroupByKey
转换)。为了解决这个问题,我们建议您将源公开为一个复合 PTransform
,该转换执行读取操作和重新分片。
有关使用 PTransform
进行包装的更多信息,请参阅 Beam 的PTransform 风格指南。
上次更新于 2024/10/31
您找到所需的一切了吗?
所有内容都有用且清晰吗?您想更改任何内容吗?请告诉我们!