概述:开发新的 I/O 连接器
适用于需要连接到不受 内置 I/O 连接器 支持的数据存储的用户指南
要连接到不受 Beam 现有 I/O 连接器支持的数据存储,您必须创建自定义 I/O 连接器。连接器通常包括一个源和一个接收器。所有 Beam 源和接收器都是复合转换;但是,自定义 I/O 的实现取决于您的用例。以下是开始使用的推荐步骤
阅读本概述并选择您的实现。您可以通过电子邮件将您遇到的任何问题发送给 Beam 开发邮件列表。此外,您可以查看是否还有其他人正在开发相同的 I/O 连接器。
如果您计划将您的 I/O 连接器贡献给 Beam 社区,请参阅 Apache Beam 贡献指南。
阅读 PTransform 风格指南,获取其他风格指南建议。
来源
对于 **有界(批处理)源**,目前创建 Beam 源有两种选项
使用
Splittable DoFn
。使用
ParDo
和GroupByKey
。
Splittable DoFn
是推荐的选项,因为它是有界和无界源的最新源框架。这旨在取代新系统中的 Source
API(BoundedSource 和 UnboundedSource)。阅读 Splittable DoFn 编程指南,了解如何编写一个 Splittable DoFn。有关更多信息,请参阅 多 SDK 连接器努力路线图。
对于 Java 和 Python **无界(流式)源**,您必须使用 Splittable DoFn
,它支持流式管道有用的功能,包括检查点、控制水印和跟踪积压。
何时使用 Splittable DoFn 接口
如果您不确定是否使用 Splittable DoFn
,请随时通过电子邮件将您遇到的任何问题发送给 Beam 开发邮件列表,我们可以讨论您的具体情况的利弊。
在某些情况下,实现 Splittable DoFn
可能是必需的或会带来更好的性能
**无界源:**
ParDo
不适用于从无界源读取。ParDo
不支持检查点或对流式数据源有用的机制,例如去重。**进度和大小估计:**
ParDo
无法向运行器提供有关其正在读取的数据的进度或大小的提示。如果没有数据的大小估计或读取进度的信息,运行器将无法猜测您的读取将有多大。因此,如果运行器尝试动态分配工作器,它将没有关于可能需要多少工作器才能运行您的管道的线索。**动态工作重新平衡:**
ParDo
不支持动态工作重新平衡,某些阅读器使用它来提高作业的处理速度。根据您的数据源,动态工作重新平衡可能不可行。**最初进行拆分以提高并行性:**
ParDo
没有执行初始拆分的能力。
例如,如果您想从包含每个文件许多记录的新文件格式中读取,或者如果您想从支持按排序键顺序读取操作的键值存储中读取。
使用 SDF 的 I/O 示例
Java 示例
- Kafka:适用于 Apache Kafka(一个开源的分布式事件流式传输平台)的 I/O 连接器。
- Watch:使用一个轮询函数,为每个输入产生越来越多的输出,直到满足每个输入的终止条件。
- Parquet:适用于 Apache Parquet(一个开源的列式存储格式)的 I/O 连接器。
- HL7v2:适用于 HL7v2 消息(一种临床消息格式,提供有关组织内部发生的事件的数据)的 I/O 连接器,是 Google 的 Cloud Healthcare API 的一部分。
- BoundedSource 包装器:一个将现有 BoundedSource 实现转换为可拆分 DoFn 的包装器。
- UnboundedSource 包装器:一个将现有 UnboundedSource 实现转换为可拆分 DoFn 的包装器。
Python 示例
- BoundedSourceWrapper:一个将现有 BoundedSource 实现转换为可拆分 DoFn 的包装器。
使用 ParDo 和 GroupByKey
对于数据存储或文件类型,如果数据可以并行读取,您可以将其视为一个微型管道。这通常包含两个步骤。
将数据拆分成多个部分,以便并行读取。
从每个部分读取数据。
每个步骤将是一个 ParDo
,它们之间用 GroupByKey
连接。GroupByKey
是一个实现细节,但对于大多数运行器而言,GroupByKey
允许运行器在某些情况下使用不同数量的 worker。
确定如何将数据拆分为块。
读取数据,这通常可以从更多 worker 中获益。
此外,GroupByKey
还允许在支持该功能的运行器上进行动态工作负载再平衡。
以下是一些使用“将读取视为微型管道”模型的读取转换实现示例,这些示例在数据可以并行读取时适用。
从文件 glob 中读取:例如,读取“~/data/**”中的所有文件。
- 获取文件路径
ParDo
:以文件 glob 作为输入。生成一个包含字符串的PCollection
,每个字符串都是一个文件路径。 - 读取
ParDo
:获取文件路径的PCollection
,读取每个路径,生成一个包含记录的PCollection
。
- 获取文件路径
从 NoSQL 数据库中读取(例如 Apache HBase):这些数据库通常允许并行从范围中读取数据。
- 确定密钥范围
ParDo
:以数据库连接信息和要读取的密钥范围作为输入。生成一个包含密钥范围的PCollection
,这些范围可以有效地并行读取。 - 读取密钥范围
ParDo
:获取密钥范围的PCollection
,读取密钥范围,生成一个包含记录的PCollection
。
- 确定密钥范围
对于无法并行读取的数据存储或文件,读取是一个简单的任务,可以使用单个 ParDo
+GroupByKey
来完成。例如
从数据库查询中读取:传统的 SQL 数据库查询通常只能顺序读取。在这种情况下,
ParDo
将建立与数据库的连接,并读取记录批次,生成包含这些记录的PCollection
。从 gzip 文件中读取:gzip 文件必须按顺序读取,因此读取不能并行化。在这种情况下,
ParDo
将打开文件并按顺序读取,生成一个包含文件中的记录的PCollection
。
接收器
要创建 Beam 接收器,我们建议您使用一个将接收到的记录写入数据存储的 ParDo
。要开发更复杂的接收器(例如,在运行器重试失败时支持数据去重),请使用 ParDo
、GroupByKey
和其他可用的 Beam 转换。许多数据服务都针对一次写入一批元素进行了优化,因此在写入之前将元素分组为批次可能是合理的。持久连接可以在 DoFn 的 setUp
或 startBundle
方法中初始化,而不是在接收到每个元素时初始化。还应注意,在大型分布式系统中,工作可能会 失败和/或重试,因此最好在可能的情况下使外部交互幂等。
对于基于文件的接收器,您可以使用 Java 和 Python SDK 提供的 FileBasedSink
抽象。Beam 的 FileSystems
实用程序类也可以用于读取和写入文件。有关更多详细信息,请参阅我们的语言特定实现指南。
上次更新于 2024/10/31
您是否找到了您要查找的所有内容?
所有内容都很有用且清晰吗?您是否想更改任何内容?请告诉我们!