为 Java 开发 I/O 连接器

重要提示:使用 Splittable DoFn 开发您的新 I/O。有关更多详细信息,请阅读 新 I/O 连接器概述.

要连接到 Beam 现有 I/O 连接器不支持的数据存储,您必须创建一个自定义 I/O 连接器,该连接器通常由一个源和一个接收器组成。所有 Beam 源和接收器都是复合转换;但是,自定义 I/O 的实现取决于您的用例。在开始之前,请阅读 新 I/O 连接器概述,了解开发新 I/O 连接器的概述、可用的实现选项以及如何为您的用例选择合适的选项。

本指南介绍了使用 Java 的 SourceFileBasedSink 接口。Python SDK 提供了相同的功能,但使用略微不同的 API。有关 Python SDK 的具体信息,请参见 为 Python 开发 I/O 连接器.

基本代码要求

Beam 运行器使用您提供的类,使用多个工作器实例并行地读取和/或写入数据。因此,您为 SourceFileBasedSink 子类提供的代码必须满足一些基本要求

  1. 可序列化性:您的 SourceFileBasedSink 子类(无论是受限的还是不受限的),必须是可序列化的。运行器可能会创建您 SourceFileBasedSink 子类的多个实例,以便发送到多个远程工作器,以并行地促进读取或写入操作。

  2. 不可变性:您的 SourceFileBasedSink 子类必须实际上是不可变的。所有私有字段必须声明为 final,所有私有集合类型变量必须实际上是不可变的。如果您的类具有 setter 方法,则这些方法必须返回具有相关字段修改的独立对象副本。

    您只应在 SourceFileBasedSink 子类中使用可变状态,如果您正在使用延迟评估的昂贵计算来实现源或接收器;在这种情况下,您必须声明所有可变实例变量为 transient。

  3. 线程安全性:您的代码必须是线程安全的。如果您构建的源适用于动态工作重新平衡,那么确保代码线程安全至关重要。Beam SDK 提供了一个辅助类,使此操作更轻松。有关更多详细信息,请参见 使用您的 BoundedSource 进行动态工作重新平衡.

  4. 可测试性:对所有 SourceFileBasedSink 子类进行详尽的单元测试至关重要,尤其是在您构建的类适用于动态工作重新平衡等高级功能时。一个小的实现错误会导致数据损坏或数据丢失(如跳过或重复记录),这些错误很难检测。

    为了帮助测试 BoundedSource 实现,您可以使用 SourceTestUtils 类。SourceTestUtils 包含用于自动验证 BoundedSource 实现的某些属性的实用程序。您可以使用 SourceTestUtils,使用广泛的输入通过相对少的代码行来提高实现的测试覆盖率。有关使用 SourceTestUtils 的示例,请参见 AvroSourceTestTextIOReadTest 源代码。

此外,请参见 PTransform 风格指南,了解 Beam 的转换风格指南。

实现 Source 接口

要为您的管道创建数据源,您必须提供特定于格式的逻辑,该逻辑告诉运行器如何从您的输入源读取数据,以及如何将您的数据源拆分为多个部分,以便多个工作器实例可以并行读取您的数据。如果您正在创建一个读取不受限数据的源,则必须提供用于管理源水印和可选检查点的附加逻辑。

通过创建以下类来提供源的逻辑

实现 Source 子类

您必须创建一个 BoundedSourceUnboundedSource 的子类,具体取决于您的数据是有限批次还是无限流。无论哪种情况,您的 Source 子类都必须覆盖超类中的抽象方法。运行器在使用您的数据源时可能会调用这些方法。例如,在从有界源读取时,运行器会使用这些方法来估计数据集的大小并将其拆分为并行读取。

您的 Source 子类还应管理有关数据源的基本信息,例如位置。例如,Beam 中的示例 Source 实现DatastoreIO 类将主机、数据集 ID 和查询作为参数。连接器使用这些值从 Cloud Datastore 获取数据。

BoundedSource

BoundedSource 表示一个有限数据集,Beam 运行器可以从该数据集读取,可能以并行方式。BoundedSource 包含一组抽象方法,运行器使用这些方法将数据集拆分为多个工作程序读取。

要实现 BoundedSource,您的子类必须覆盖以下抽象方法

您可以在 Beam 的 Cloud BigTable (BigtableIO.java) 和 BigQuery (BigQuerySourceBase.java) 实现中了解如何实现 BoundedSource 和所需抽象方法的模型。

UnboundedSource

UnboundedSource 表示一个无限数据流,运行器可以从该数据流读取,可能以并行方式。UnboundedSource 包含一组抽象方法,运行器使用这些方法来支持并行流读取;这些方法包括用于故障恢复的检查点、用于防止数据重复的记录 ID 以及用于估计管道下游部分数据完整性的水印

要实现 UnboundedSource,您的子类必须覆盖以下抽象方法

实现 Reader 子类

您必须创建一个 BoundedReaderUnboundedReader 的子类,以便由您的源子类的 createReader 方法返回。运行器使用您的 Reader(有界或无界)中的方法来执行数据集的实际读取。

BoundedReaderUnboundedReader 具有类似的基本接口,您需要定义这些接口。此外,还有一些专门用于 UnboundedReader 的附加方法,您需要针对处理无界数据进行实现,以及一个可选方法,如果您希望您的 BoundedReader 利用动态工作重新平衡,则可以实现该方法。在使用 UnboundedReader 时,start()advance() 方法的语义也略有不同。

BoundedReader 和 UnboundedReader 共有的 Reader 方法

运行器使用以下方法使用 BoundedReaderUnboundedReader 读取数据

UnboundedReader 独有的 Reader 方法

除了基本 Reader 接口外,UnboundedReader 还有一些用于管理从无界数据源读取的附加方法

您可以通过在从源读取时指定 .withMaxNumRecords.withMaxReadTime 来从 UnboundedSource 读取有界 PCollection.withMaxNumRecords 从您的无界源读取固定最大数量的记录,而 .withMaxReadTime 从您的无界源读取固定最大时间段。

使用您的 BoundedSource 进行动态工作重新平衡

如果您的源提供有界数据,您可以让您的 BoundedReader 与动态工作重新平衡一起工作,方法是实现 splitAtFraction 方法。运行器可能会在给定读取器上与 start 或 advance 同时调用 splitAtFraction,以便您的 Source 中剩余的数据可以被拆分并重新分配到其他工作程序。

当您实现 splitAtFraction 时,您的代码必须生成一组互斥的拆分,其中这些拆分的并集与总数据集匹配。

如果您实现 splitAtFraction,则必须以线程安全的方式实现 splitAtFractiongetFractionConsumed,否则可能会导致数据丢失。您还应该彻底地对您的实现进行单元测试,以避免数据重复或数据丢失。

为了确保您的代码是线程安全的,请使用 RangeTracker 线程安全的帮助器对象来管理在实现 splitAtFractiongetFractionConsumed 时数据源中的位置。

我们强烈建议您使用 SourceTestUtils 类对 splitAtFraction 的实现进行单元测试。SourceTestUtils 包含许多用于测试 splitAtFraction 实现的方法,包括详尽的自动测试。

便利 Source 和 Reader 基类

Beam SDK 包含一些方便的抽象基类,可以帮助您创建与常见数据存储格式(如文件)一起使用的 SourceReader 类。

FileBasedSource

如果您的数据源使用文件,您可以从 FileBasedSourceFileBasedReader 抽象基类派生您的 SourceReader 类。FileBasedSource 是一个有界源子类,它实现了与文件交互的 Beam 源的公共代码,包括

使用 FileBasedSink 抽象

如果您的数据源使用文件,您可以实现 FileBasedSink 抽象来创建一个基于文件的接收器。对于其他接收器,请使用 Beam SDK for Java 提供的 ParDoGroupByKey 和其他转换。有关更多详细信息,请参阅开发 I/O 连接器概述

在使用 FileBasedSink 接口时,您必须提供特定于格式的逻辑,该逻辑告诉运行器如何将管道 PCollection 中的有界数据写入输出接收器。运行器使用多个工作程序并行写入数据捆绑包。

通过实现以下类来提供基于文件的接收器的逻辑

FileBasedSink 抽象基类实现了与文件交互的 Beam 接收器的公共代码,包括

FileBasedSink 及其子类支持将文件写入任何受 Beam 支持的 FileSystem 实现。有关示例,请参阅以下 Beam 提供的 FileBasedSink 实现

PTransform 包装器

当您创建一个最终用户将使用的源或接收器时,请避免暴露您的源或接收器代码。为了避免将您的源和接收器暴露给最终用户,您的新类应受到保护或私有。然后,实现面向用户的包装器 PTransform。通过将您的源或接收器公开为转换,您的实现被隐藏并且可以任意复杂或简单。不公开实现细节的最大好处是,以后您可以添加其他功能,而不会破坏用户的现有实现。

例如,如果您的用户的管道使用 read 从您的源读取,并且您想在管道中插入一个重新分片,则所有用户都需要自己添加重新分片(使用 GroupByKey 转换)。为了解决这个问题,我们建议您将源公开为一个复合 PTransform,该转换执行读取操作和重新分片。

有关使用 PTransform 进行包装的更多信息,请参阅 Beam 的PTransform 风格指南