I/O 标准

概述

这份 Apache Beam I/O 标准文档为开发 Apache Beam I/O 连接器的 1P/3P 开发者制定了规范性指导。这些指南旨在以简单明了的方式创建涵盖文档、开发和测试的最佳实践。

什么是内置 I/O 连接器?

位于 Apache Beam Github 存储库中的 I/O 连接器 (I/O) 被称为 **内置 I/O 连接器**。内置 I/O 的 集成测试 和性能测试由 Google Cloud Dataflow 团队定期使用 Dataflow 运行器运行,并将指标公开发布供 参考。否则,除非另有明确说明,以下指南将适用于两者。

指导

文档

本节概述了所有预计将与 I/O 一起提供的文档的超集。本节中提到的 Apache Beam 文档可在 此处 找到。通常,一个好的示例是内置 I/O,Snowflake I/O

内置 I/O

为 I/O 的相关语言提供代码文档。这还应包含指向 Apache Beam 网站或外部文档位置的任何外部信息源的链接。

示例

在 **I/O 连接器指南** 下添加一个新页面,涵盖特定提示和配置。以下是 ParquetHadoop 和其他连接器的指南。

示例

I/O connector guides screenshot

Javadoc/Pythondoc 中的节标题格式应始终保持一致,以便将来能够启用用于其他页面的编程信息提取。

按顺序包含在页面中的节 **子集** 示例

  1. 开始之前
  2. {连接器}IO 基础
  3. 支持的功能
    1. 关系型
  4. 身份验证
  5. 从 {连接器} 读取
  6. 写入 {连接器}
  7. 资源可扩展性
  8. 限制
  9. 报告问题

示例

KafkaIO 的 JavaDoc

I/O 连接器应在 **支持的功能** 子标题下包含一个表格,以指示所使用的 关系型功能

关系型功能是可以通过 I/O 连接器选择性地实现的概念,有助于提高效率。使用最终用户提供的管道配置 (SchemaIO) 和用户查询 (FieldAccessDescriptor) 数据,关系型理论被应用于推导出改进,例如更快的管道执行、更低的运营成本以及更少的数据读写。

示例表格

I/O connector guides screenshot

<div class="table-container-wrapper">
<table class="table table-bordered table-io-standards-relational-features">
   <tr>
      <th>
         <p><strong>Relational Feature</strong>
      </th>
      <th>
         <p><strong>Supported</strong>
      </th>
      <th>
         <p><strong>Notes</strong>
      </th>
   </tr>
   <tr>
      <td>
         <p>Column Pruning
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Filter Pushdown
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Table Statistics
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Partition Metadata
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Metastore
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
</table>
</div>

示例实现

BigQueryIO 通过投影下推 (列修剪) 使用 BigQuery DirectRead API,仅返回最终用户查询指示的必要列。

如果需要,在 **常见管道模式** 下添加一个页面,概述涉及您的 I/O 的常见使用模式。

https://beam.apache.org/documentation/patterns/bigqueryio/

使用您的 I/O 的信息更新 **I/O 连接器**

示例

https://beam.apache.org/documentation/io/connectors/#built-in-io-connectors

alt_text

在 **开始之前** 标题下提供使用 I/O 的设置步骤。

示例

https://beam.apache.org/documentation/io/built-in/parquet/#before-you-start

在每个支持语言的初始描述之后,包含一个规范的读/写代码片段。以下示例显示了 Hadoop,以及 Java 的示例。

示例

https://beam.apache.org/documentation/io/built-in/hadoop/#reading-using-hadoopformation

指示如何为元素分配时间戳。这包括批处理源,以允许未来 I/O 提供比 current_time() 更实用的信息。

示例

指示如何推进时间戳;对于批处理源,这在大多数情况下将被标记为 n/a。

概述连接器将创建的任何临时资源(例如,文件)。

示例

BigQuery 批处理加载首先创建一个临时 GCS 位置

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L455

在 **身份验证** 子标题下提供如何获取合作伙伴授权材料以安全地访问源/接收器。

示例

https://beam.apache.org/documentation/io/built-in/snowflake/#authentication

此处 BigQuery 将其命名为权限,但主题涵盖了相似之处

https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html

I/O 应在 **开始之前** 标题下提供指向源/接收器文档的链接。

示例

https://beam.apache.org/documentation/io/built-in/snowflake/

指示每种语言中是否有本机或 X 语言支持,并提供指向文档的链接。

示例

Kinesis I/O 有一个本机 Java 实现和 Python 的 X 语言支持,但没有 Golang 支持。

在 **限制** 标题下说明已知的限制。如果限制有跟踪问题,请在内联链接。

示例

https://beam.apache.org/documentation/io/built-in/snowflake/#limitations

I/O(非内置)

自定义 I/O 不包含在 Apache Beam Github 存储库中。一些示例是 SolaceIO。

使用您的信息更新 Apache Beam 表中的其他 I/O 连接器。

上述表格

## 开发

本节概述了 API 语法、语义和针对新旧 Apache Beam I/O 连接器都应采用的功能的建议。

I/O 连接器开发指南秉承以下原则编写

所有 SDK

管道配置/执行/流式/窗口化语义指南

主题

语义

管道选项

I/O 很少依赖于 PipelineOptions 子类来调整内部参数。

如果有必要,连接器相关的管道选项类应该

  • 清楚地记录每个选项的效果以及为什么要修改它。
  • 选项名称必须使用命名空间以避免冲突
  • 类名称:{Connector}Options
  • 方法名称:set{Connector}{Option}get{Connector}{Option}

源窗口化

除非用户在 API 中明确参数化,否则源必须在 GlobalWindow 中返回元素。

允许的非全局窗口模式

  • ReadFromIO(window_by=...)
  • ReadFromIO.IntoFixedWindows(...)
  • ReadFromIO(apply_windowing=True/False)(例如 PeriodicImpulse
  • IO.read().withWindowing(...)
  • IO.read().windowBy(...)
  • IO.read().withFixedWindows(...)

接收器窗口化

接收器应是窗口无关的,并处理使用任何窗口化方法发送的元素,除非在 API 中明确参数化或表达。

接收器可以在内部以任何方式更改 PCollection 的窗口化。但是,它作为结果对象的一部分返回的元数据必须

  • 必须与输入在同一窗口中,除非 API 中明确声明了其他情况。
  • 必须具有准确的时间戳
  • 可以包含有关窗口化的其他信息(例如,BigQuery 作业可能具有时间戳,但也与之关联的窗口)。

允许的非全局窗口模式

  • WriteToIO(triggering_frequency=...) - 例如 WriteToBigQuery(这仅设置转换内的窗口化 - 输入数据仍在 Global Window 中)。
  • WriteBatchesToIO(...)
  • WriteWindowsToIO(...)

节流

流接收器(或访问外部服务的任何转换)可以实现对其请求的节流,以防止过载外部服务。

待办事项:Beam 应公开节流实用程序 (跟踪问题)

  • 每个键的固定节流
  • 具有接收器报告的反压的自适应节流
  • 从起点开始的逐步增长的节流

错误处理

待办事项:跟踪问题

Java

常规

用于处理连接器的主要类应命名为 **{connector}IO**

示例

BigQuery I/O 是 **org.apache.beam.sdk.io.bigquery.BigQueryIO**

该类应放置在包 **org.apache.beam.sdk.io.{connector}** 中

示例

BigQueryIO 位于 Java 包 org.apache.beam.sdk.io.bigquery

单元/集成/性能测试应位于包 **org.apache.beam.sdk.io.{connector}.testing** 下。这将使各种测试与连接器的标准用户界面一起使用。

单元测试应驻留在同一个包中(即 **org.apache.beam.sdk.io.{connector}**),因为它们通常会测试连接器的内部结构。

BigQueryIO 位于 Java 包 org.apache.beam.sdk.io.bigquery

I/O 转换应避免接收用户 lambda 以将元素从用户类型映射到特定于连接器的类型。相反,它们应与特定于连接器的类型(可能带有架构信息)交互。

在必要时,I/O 转换应接收一个类型参数,该参数指定转换的输入类型(对于接收器)或输出类型(对于源)。

I/O 转换可能没有类型参数 **只有在确定其输出类型不会改变的情况下**(例如 FileIO.MatchAll 和其他 FileIO 转换)。

强烈建议不要在 I/O 连接器的公共 API 部分中直接公开第三方库,原因如下

  • 它会降低 Apache Beam 的兼容性保证 - 对第三方库的更改可以直接破坏现有用户的管道。
  • 它使代码维护变得困难 - 如果库在 API 级别直接公开,则依赖项更改将需要在整个 I/O 实现代码中进行多次更改
  • 它会将第三方依赖项强加给最终用户

相反,我们强烈建议公开 Beam 本地接口和包含映射逻辑的适配器。

如果您认为该库本质上是极其静态的。请在 I/O 本身中注明。

源和接收器应使用 PTransform 包装器进行抽象,内部类应声明为受保护或私有。通过这样做,可以在不破坏依赖项实现的情况下添加/更改/修改实现细节。

类/方法/属性

Java 语法

语义

类 IO.Read

提供对代表 I/O 中读取操作的类的访问权限。Read 类应实现类似于 fluentbuilder 模式的流畅接口(例如 withX(...).withY(...))。与默认值一起,它提供了一种快速失败的机制(在每个 .withX() 后立即提供验证反馈),这种机制比构建器模式略微不那么冗长。

用户**不应**直接创建此类。它应由顶级实用程序方法创建。

类 IO.ReadAll

一些不同的源实现从数据源读取的运行时配置。这是一个有价值的模式,因为它使纯粹的批处理源能够成为更复杂的流源。

在尽可能多的情况下,这种类型的转换应具有与构建时配置的转换相同的类型丰富度

  • 支持使用在构建时已知的架构的 Beam 行输出。
  • 在这种情况下可能需要(并且可以接受)额外的配置(例如,SchemaProvider 参数、Schema 参数、Schema Catalog 或类似的实用程序)。
  • 输入 PCollection 应具有带有架构的固定类型,因此用户可以轻松地对其进行操作。

示例

JdbcIO.ReadAllParquetIO.ReadFiles

类 IO.Write

提供对代表 I/O 中写入操作的类的访问权限。Write 类应实现流畅接口模式(例如 withX(...).withY(...)),如上面针对 IO.Read 所述。

用户不应直接创建此类。它应由顶级实用程序方法创建。

其他转换类

某些数据存储和外部系统实现的 API 不易于调整为 Read 或 Write 语义(例如,FhirIO 实现了几种不同的转换,这些转换可以获取或发送数据到 Fhir)。

这些类应仅在**无法或非常困难地将它们的功能封装为 Read、Write 和 ReadAll 转换的额外配置的一部分**时添加,以避免增加用户的认知负荷。

用户不应直接创建这些类。它们应由顶级静态方法创建。

实用程序类

一些连接器依赖于其他面向用户的类来设置配置参数。

(例如 JdbcIO.DataSourceConfiguration)。这些类应**嵌套在 {Connector}IO 类中**。

这种格式使它们在主 Javadoc 中可见,并易于用户发现。

方法 IO<T>.write()

顶级 I/O 类将提供一个**静态方法**来开始构建 I/O.Write 转换。这将返回一个具有单个输入 PCollection 和 Write.Result 输出的 PTransform。

此方法的名称不应指定以下任何内容

  • 内部数据格式
  • 用于写入数据的策略
  • 输入或输出数据类型

如果可能,以上内容应通过配置参数来指定。**如果不可能**,则可以引入**新的静态方法**,但这**必须是例外情况**。

方法 IO<T>.read()

用于开始构建 I/O.Read 转换的方法。这将返回一个具有单个输出 PCollection 的 PTransform。

此方法的名称不应指定以下任何内容

  • 内部数据格式
  • 用于读取数据的策略
  • 输出数据类型

如果可能,以上内容应通过配置参数来指定。**如果不可能**,则可以引入**新的静态方法**,但这**必须是例外情况,并在 I/O 标题中作为 API 的一部分进行记录**。

如果这些参数很少且通用,或者它们是配置转换所必需的(例如 FhirIO.exportResourcesToGcsJdbcIO.ReadWithPartitions 需要 TypeDescriptor 进行初始配置),则初始静态构造函数方法可以接收参数。

IO.Read.from(source)

Read 转换必须提供一个**from** 方法,用户可以在其中指定从哪里读取。如果转换可以从不同类型的源读取(例如,表、查询、主题、分区),则可以提供此 from 方法的多个实现以适应这种情况

  • IO.Read from(Query query)
  • IO.Read from(Table table) / from(String table)
  • IO.Read from (Topic topic)
  • IO.Read from(Partition partition)

这些方法的输入类型可以反映外部源的 API(例如,Kafka TopicPartition 应使用**Beam 实现的** TopicPartition 对象)。

有时,可能有多个**from** 位置使用相同的输入类型,这意味着我们无法利用方法重载。考虑到这一点,使用新的方法来启用这种情况。

  • IO.Read from(String table)
  • IO.Read fromQuery(String query)

IO.Read.fromABC(String abc)

如果方法重载是可能的,则不鼓励这种模式,请遵循**Read.from(source)**中的指南。

IO.Write.to(destination)

Write 转换必须提供一个**to** 方法,用户可以在其中指定将数据写入的位置。如果转换可以将数据写入不同类型的源,但仍使用相同的输入元素类型(例如,表、查询、主题、分区),则可以提供此 from 方法的多个实现以适应这种情况

  • IO.Write to(Query query)
  • IO.Write to(Table table) / from(String table)
  • IO.写入到 (主题主题)
  • IO.写入到 (分区分区)

这些方法的输入类型可以反映外部接收器的 API(例如,Kafka TopicPartition 应该使用 **Beam 实现** 的 TopicPartition 对象)。

如果不同类型的目标需要不同类型的输入对象类型,则应在单独的 I/O 连接器中完成。

有时,可能有多个**from** 位置使用相同的输入类型,这意味着我们无法利用方法重载。考虑到这一点,使用新的方法来启用这种情况。

  • IO.写入到 (字符串表)
  • IO.写入到表 (字符串表)

IO.写入.到 (动态目标目标)

写入转换可以启用写入多个目标。这可能是一种复杂的模式,应谨慎实施(它是连接器的首选模式,这些连接器可能在单个管道中具有多个目标)。

此模式的首选模式是定义一个 DynamicDestinations 接口(例如,BigQueryIO.DynamicDestinations),该接口允许用户定义配置目标所需的所有参数。

DynamicDestinations 接口还允许维护者随着时间的推移添加新方法(使用 **默认实现** 以避免破坏现有用户),这些方法将在必要时定义额外的配置参数。

IO.写入.到 ABC (目标)

如果可能进行方法重载,则不建议使用此模式,请遵循 **Write.to(destination)** 中的指南。

类 IO.读取.带 X

IO.写入.带 X

withX 提供了一种将配置传递给 Read 方法的方法,其中 X 表示要创建的配置。除通用 with 语句(定义如下)外,I/O 应尝试将配置选项的名称与源中的选项名称匹配。

这些方法应返回 I/O 的新实例,而不是修改现有实例。

示例

TextIO.读取.带压缩

IO.读取.带配置对象

IO.写入.带配置对象

Java 中的一些连接器在配置过程中接收配置对象。**仅在特定情况下鼓励使用此模式**。在大多数情况下,连接器可以在顶层保存所有必要的配置参数。

要确定多参数配置对象是否适合作为高级转换的参数,配置对象必须

  • 仅保存与外部数据存储的连接/身份验证参数相关的属性(例如 JdbcIO.DataSourceConfiguration)。
    • 通常,**不应将秘密作为参数传递**,除非没有其他可行的方法。对于秘密管理,建议使用秘密管理服务或 KMS。
  • **或者** 镜像外部数据源的 API 特性(例如 KafkaIO.Read.withConsumerConfigUpdates),而不公开 Beam API 中的该外部 API。
    • 该方法应镜像 API 对象的名称(例如,给定对象 SubscriptionStatConfig,该方法将为 withSubscriptionStatConfig)。
  • **或者** 当连接器可以支持不同的配置“路径”时,其中特定属性需要指定其他属性(例如,BigQueryIO 的方法将包含各种不同的属性)。(见最后示例)。

示例

JdbcIO.DataSourceConfigurationSpannerConfigKafkaIO.Read.withConsumerConfigUpdates

BigQueryIO.write()
  .withWriteConfig(FileLoadsConfig.withAvro()
                                 .withTriggeringFrequency()...)

BigQueryIO.write()
  .withWriteConfig(StreamingInsertsConfig.withDetailedError()
                                  .withExactlyOnce().etc..)

类 IO.写入.带格式函数

不建议 - 除非是动态目标

对于可以接收 Beam Row 类型 PCollection 的源,格式函数应该没有必要,因为 Beam 应该能够根据其模式格式化输入数据。

对于提供动态目标功能的接收器,元素可能携带帮助确定其目标的数据。这些数据可能需要在写入最终目标之前删除。

要包含此方法,连接器应

  • 表明无法自动执行数据匹配。
  • 支持动态目标,并且由于该原因需要更改输入数据。

IO.读取.带编码器

IO.写入.带编码器

强烈不建议

设置用于编码/解码此连接器的输出/输入 PCollection 的元素类型的编码器。一般来说,建议源将

  1. 返回带有自动推断模式的 Row 对象。
  2. 通过具有固定输出/输入类型,或推断其输出/输入类型来自动设置必要的编码器。

如果 #1 和 #2 都不可行,则可以添加 withCoder(...) 方法。

IO.ABC.带端点 / 带 {IO} 客户端 / 带客户端

连接器转换应提供一种方法来覆盖它们与它们通信的外部系统之间的接口。这可以启用各种用途

设置用于编码/解码此连接器的输出/输入 PCollection 的元素类型的编码器。一般来说,建议源将

  • 通过模拟目标服务进行本地测试
  • 用户启用的指标、监控和客户端中的安全处理。
  • 基于模拟器进行集成测试

示例

BigQueryIO.Write.withTestServices(BigQueryServices)

类型

Java 语法

语义

方法 IO.读取.扩展

读取转换的 expand 方法必须返回具有类型的 PCollection 对象。该类型可以参数化或固定为类。

用户**不应**直接创建此类。它应由顶级实用程序方法创建。

方法 IO.读取.expand 的 PCollection 类型

PCollection 的类型通常是以下四种选项之一。对于每个选项,建议编码/数据如下

  • 预定义的基本 Java 类型(例如 String)
    • 这种编码应该很简单,并且使用简单的 Beam 编码器(例如 Utf8StringCoder)
  • 预设的 POJO 类型(例如,Metadata)带有模式
  • 带有特定模式的 Beam 行
  • 在构建时不知道模式的类型

在所有情况下,**不建议** 要求用户传递编码器(例如,withCoder(...))。

方法 IO.写入.扩展

任何写入转换的 expand 方法都必须返回扩展 PCollectionTuple 的类型 IO.写入.结果对象。此对象允许转换返回有关其写入结果的元数据,并允许此写入之后是其他 PTransforms。

如果写入转换不需要返回任何元数据,则**仍然首选** Write.Result 对象,因为它将允许转换随着时间的推移演变其元数据。

元数据的示例

  • 失败的元素和错误
  • 成功写入的元素
  • 转换发出的调用中的 API 令牌

示例

BigQueryIO 的 WriteResult

演变

随着时间的推移,I/O 需要发展以解决新的用例,或使用新 API 来替代。I/O 需要演变的一些示例

Java 语法

语义

顶层静态方法

一般来说,应该抵制为可以通过现有方法中的配置捕获的功能添加全新的静态方法。

可以使用配置支持太多顶层方法的示例是 PubsubIO

仅在以下情况下应添加新的顶层静态方法

Python

常规

如果 I/O 位于 Apache Beam 中,则应将其放置在包 **apache_beam.io.{connector}** 或 **apache_beam.io.{namespace}.{connector}** 中

示例

apache_beam.io.fileio 和 apache_beam.io.gcp.bigquery

将有一个名为 {connector}.py 的模块,它是管道中使用连接器的主要入口点 **apache_beam.io.{connector}** 或 **apache_beam.io.{namespace}.{connector}**

示例

apache_beam.io.gcp.bigquery / apache_beam/io/gcp/bigquery.py

另一种可能的布局:apache_beam/io/gcp/bigquery/bigquery.py(自动导入 bigquery/__init__.py 中的公共类)

连接器必须在其主文件中定义一个 __all__ 属性,并且只导出用户可以访问的类和方法。

如果 I/O 实现存在于单个模块(单个文件)中,则文件 {connector}.py 可以保存它。

否则,连接器代码应定义在一个目录(连接器包)中,该目录带有 __init__.py 文件,该文件记录公共 API。

如果连接器定义了包含其实现的实用程序的其他文件,则这些文件必须明确记录它们不是公共接口的事实。

类/方法/属性

Python 语法

语义

可调用 ReadFrom{Connector}

这提供了从给定数据源读取的 PTransform 的访问权限。它允许您通过它接收的参数来配置它。对于较长的可选参数列表,它们可以定义为具有默认值的参数。

问:Java 使用构建器模式。为什么我们不能在 Python 中这样做?可选参数可以在 Python 中发挥相同的作用

示例

apache_beam.io.gcp.bigquery.ReadFromBigQuery

可调用 ReadAllFrom{Connector}

一些不同的源实现从数据源读取的运行时配置。这是一个有价值的模式,因为它使纯粹的批处理源能够成为更复杂的流源。

尽可能地,这种类型的转换应具有构建时配置转换的类型丰富性和安全性

  • 支持在构建时知道模式的输出
    • 在这种情况下可能需要(并且可以接受)额外的配置(例如,SchemaProvider 参数、Schema 参数、Schema Catalog 或类似的实用程序)。
  • 输入 PCollection 应具有带有架构的固定类型,因此用户可以轻松地对其进行操作。

示例

ReadAllFromBigQuery

可调用 WriteTo{Connector}

这提供了写入给定数据接收器的 PTransform 的访问权限。它允许您通过它接收的参数来配置它。对于较长的可选参数列表,它们可以定义为具有默认值的参数。

问:Java 使用构建器模式。为什么我们不能在 Python 中这样做?可选参数可以发挥相同的

作用 在 Python 中。

示例

apache_beam.io.gcp.bigquery.WriteToBigQuery

可调用 Read/Write

顶层转换初始化程序(ReadFromIO/ReadAllFromIO/WriteToIO)必须旨在要求最少的参数,以简化其使用,并允许用户快速使用它们。

参数 ReadFrom{Connector}({source})

参数 WriteTo{Connector}({sink})

读取或写入 I/O 连接器中的第一个参数必须指定读取器的源或写入器的目标。

如果转换可以从不同*类型*的源读取(例如,表、查询、主题、分区),则按优先级顺序的建议方法是

  1. 保留单个参数,但自动推断源/接收器类型(例如,pandas.read_sql(...) 支持表和查询)。
  2. 为每个可能的源/接收器类型添加一个新参数(例如,ReadFromBigQuery 具有查询/表参数)。

参数 WriteToIO(destination={multiple_destinations})

写入转换可以启用写入多个目标。这可能是一种复杂的模式,应谨慎实施(它是连接器的首选模式,这些连接器可能在单个管道中具有多个目标)。

Python 中的首选 API 模式是为所有需要配置的参数传递可调用对象(例如,WriteToBigQuery)。一般来说,可调用参数的示例可能包括

  • 目标可调用 → 应接收一个元素,并返回该元素的目标。
  • 其他示例
    • 模式可调用 → 应接收一个目标并返回该目标的模式。
    • 格式函数 → 应接收一个记录(可能还有目标),并将记录格式化为要插入的格式。

使用这些可调用对象还允许维护者随着时间的推移添加新的可参数化的可调用对象(使用**默认值**以避免破坏现有用户),这些可调用对象将根据需要定义额外的配置参数。

**极端情况**:通常需要将侧边输入传递给其中一些可调用对象。建议的模式是在构造函数中添加一个额外的参数来包含这些侧边输入(例如,WriteToBigQuery 的 table_side_inputs 参数)。

参数 ReadFromIO(param={param_val})

参数 WriteToIO(param={param_val})

任何额外的配置都可以作为可选参数添加到 I/O 的构造函数中。只要可能,应避免强制性的额外参数。可选参数应该具有合理的默认值,以便尽可能轻松地选择新的连接器。

参数 ReadFromIO(config={config_object})

不推荐

Python 中的一些连接器可能会接收一个复杂的配置对象作为其配置的一部分。**此模式不推荐**,因为连接器可以在顶层保存所有必要的配置参数。

要确定多参数配置对象是否适合作为高级转换的参数,配置对象必须

类型

Python 语法

语义

方法 ReadFromIO.expand 的输出

Read 变换的 expand 方法必须返回一个带有类型的 PCollection 对象,并用类型进行注释。Python 中首选的 PCollection 类型(按优先级排序)为

如果为(字节、字符串、数字),则为简单的 Python 类型

对于复杂类型

  1. 具有固定模式的 NamedTuple 或 DataClass,使用 RowCoder 编码
  2. Python 字典
    1. 如果可能,字典应该通过 RowCoder 编码。
  3. 如果无法使用模式,则为预设的 Python 类

方法 WriteToIO.expand 的输出

任何写入变换的 expand 方法必须返回一个具有**固定类类型**的 Python 对象。建议的类名称为**WriteTo{IO}Result**。此对象允许变换返回有关其写入结果的元数据。

如果 Write 变换不需要返回任何元数据,则具有类类型的 Python 对象**仍然更可取**,因为它将允许变换随着时间的推移演变其元数据。

元数据的示例

  • 失败的元素和错误
  • 成功写入的元素
  • 转换发出的调用中的 API 令牌

示例

BigQueryIO 的 WriteResult

激励示例(不良模式):WriteToBigQuery 不一致的字典结果[1][2]

方法 WriteToIO.expand 的输入

Write 变换的 expand 方法必须返回一个带有类型的 PCollection 对象,并用类型进行注释。Python 中首选的 PCollection 类型与 T1 中引用的 ReadFromIO 的输出类型相同。

GoLang

常规

如果 I/O 位于 Apache Beam 中,则应将其放置在包中

{connector}io

示例

avroiobigqueryio

集成和性能测试应该与 I/O 本身位于同一个包下

{connector}io

Typescript

类/方法/属性

Typescript 语法

语义

function readFromXXX

用于开始构建 I/O.Read 变换的方法。

function writeToXXX

用于开始构建 I/O.Write 变换的方法。

测试

一个 I/O 应该具有单元测试、集成测试和性能测试。在以下指南中,我们将解释每种类型的测试的目标,并提供基线测试覆盖率标准。请注意,实际测试用例和实际测试的业务逻辑会根据每个源/接收器的具体情况而有所不同,但我们已经包含了一些建议的测试用例作为基线。

本指南通过添加特定的测试用例和场景来补充Apache Beam I/O 变换测试指南。有关测试 Beam I/O 连接器的常规信息,请参阅该指南。

集成和性能测试应位于包 org.apache.beam.sdk.io.{connector}.testing 下。这将使各种测试能够使用连接器的标准用户界面。

单元测试应位于同一个包(即 org.apache.beam.sdk.io.{connector})中,因为它们通常会测试连接器的内部结构。

单元测试

I/O 单元测试需要有效地测试代码的功能。鉴于单元测试预计将在多个测试套件中执行多次(例如,对于每个 Python 版本),因此这些测试的执行速度应该比较快,并且不应该产生副作用。我们建议尝试通过单元测试实现 100% 的代码覆盖率。

只要可能,单元测试优于集成测试,因为执行时间更快,资源使用率更低。此外,单元测试可以轻松地包含在预提交测试套件中(例如,Jenkins **beam_PreCommit_*** 测试套件),因此更有可能在早期发现回归。对于错误条件,单元测试也是首选。

单元测试类应与 IO 位于同一个包中,并命名为 {connector}IOTest。

示例

sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java

建议的测试用例

要测试的功能

描述

示例

使用默认选项读取

最好使用 DirectRunner 在本地运行管道,并使用数据存储的伪造品。但这可以是使用模拟对象对源变换进行的单元测试。

BigtableIOTest.testReading

pubsub_test.TestReadFromPubSub.test_read_messages_success

CassandraIOTest.testRead

使用默认选项写入

最好使用 DirectRunner 在本地运行管道,并使用数据存储的伪造品。但这可以是使用模拟对象对接收器变换进行的单元测试。

BigtableIOTest.testWriting

pubsub_test.TestWriteToPubSub.test_write_messages_success

使用其他选项读取

对于用户可用的每个选项。

BigtableIOTest.testReadingWithFilter

使用其他选项写入

对于用户可用的每个选项。例如,写入动态目标。

BigTableIOTest.testReadWithBigTableOptionsSetsRetryOptions

BigQueryIOWriteTest.testWriteDynamicDestinations

读取其他元素类型

如果数据存储读取模式支持不同的数据类型。

BigQueryIOReadTest.testReadTableWithSchema

写入其他元素类型

如果数据存储写入模式支持不同的数据类型。

显示数据

测试源/接收器是否正确填充显示数据。

AvroIOTest.testReadDisplayData

DatastoreV1Test.testReadDisplayData

bigquery_test.TestBigQuerySourcetest_table_reference_display_data

初始拆分

这些测试可能有许多变体。有关详细信息,请参阅示例。

BigqueryIOReadTest.estBigQueryQuerySourceInitSplit

avroio_test.AvroBase.test_read_with_splitting

动态工作重新平衡

这些测试可能有许多变体。有关详细信息,请参阅示例。

BigTableIOTest.testReadingSplitAtFractionExhaustive

avroio_test.AvroBase.test_dynamic_work_rebalancing_exhaustive

模式支持

读取 PCollection<Row> 或写入 PCollection<Row>

应验证从源检索模式,以及为接收器推送/验证模式。

BigQueryIOReadTest.testReadTableWithSchema

BigQueryIOWriteTest.testSchemaWriteLoads

验证测试

测试源/接收器变换是否正确验证,即不正确/不兼容的配置是否被拒绝,并出现可操作的错误。

BigQueryIOWriteTest.testWriteValidatesDataset

PubsubIOTest.testTopicValidationSuccess

指标

确认各种读/写指标是否已设置

SpannerIOReadTest.testReadMetrics

bigtableio_test.TestWriteBigTable.test_write_metrics

读取全部

测试读取全部(PCollection<Read Config>)版本的测试是否有效

SpannerIOReadTest.readAllPipeline

CassandraIOTest.readAllQuery

接收器批处理测试

确保接收器在写入之前对数据进行批处理,如果接收器出于性能原因执行批处理。

SpannerIOWriteTest.testBatchFn_cells

错误处理

确保从数据存储中出现的各种错误(例如,HTTP 错误代码)得到正确处理

BigQueryIOWriteTest.testExtendedErrorRetrieval

重试策略

确认源/接收器是否按预期重试请求

BigQueryIOWriteTest.testRetryPolicy

接收器的输出 PCollection

接收器应生成一个 PCollection,后续步骤可以依赖于该 PCollection。

BigQueryIOWriteTest.testWriteTables

积压字节报告

测试以确认无界源变换是否正确报告积压字节。

KinesisReaderTest.getSplitBacklogBytesShouldReturnBacklogUnknown

水位线报告

测试以确认无界源变换是否正确报告水位线。

WatermarkPolicyTest.shouldAdvanceWatermarkWithTheArrivalTimeFromKinesisRecords

集成测试

集成测试测试 Beam 运行器与给定 I/O 连接到的数据存储之间的端到端交互。由于这些交互通常涉及远程 RPC 调用,因此集成测试的执行时间更长。此外,Beam 运行器在执行集成测试时可能会使用多个工作器。由于这些成本,集成测试仅应在无法通过单元测试覆盖给定场景时实现。

实现至少一个涉及 Beam 与外部存储系统之间交互的集成测试是提交所需的。

涉及源和接收器的 I/O 连接器,Beam 指南建议以“先写后读”的形式实现测试,以便可以通过同一个测试管道覆盖读和写。

集成测试类应与 I/O 位于同一个包中,并命名为**{connector}IOIT**。

例如

sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java

建议的测试用例

测试类型

描述

示例

使用 Dataflow 的“先写后读”测试

将生成的数据写入数据存储,并使用 Dataflow 从数据存储中读取相同的数据。

JdbcIOIT.testWriteThenRead

使用 Dataflow 的“先写后读取全部”测试

与“先写后读”相同,但适用于支持读取 PCollection 源配置的源。所有未来的 (SDF) 源都应支持此功能。

如果相同的变换用于“读取”和“读取全部”形式,或者两个变换基本上相同(例如,读取变换是读取全部的简单包装,反之亦然),则添加一个“读取全部”测试就足够了。

SpannerReadIT.testReadAllRecordsInDb

使用 Dataflow 的无界先写后读

一个持续写入和读取数据的管道。应取消此类管道以验证结果。这仅适用于支持无界读取的连接器。

KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInStreaming

性能测试

由于性能测试框架仍处于不断变化中,因此性能测试可以在实际 I/O 代码提交后进行跟进提交。

性能测试框架尚不支持 GoLang 或 Typescript。

性能基准测试是 I/O 最佳实践的重要组成部分,因为它们有效地解决了几个方面

仪表盘

Google 定期对内置 I/O 进行性能测试,并将结果发布到可供外部查看的仪表板,供 JavaPython 使用。

Dataflow performance test dashboard

指导

尽可能使用相同的测试进行集成和性能测试。性能测试通常与集成测试相同,但涉及更多的数据量。测试框架(内部和外部)提供功能来跟踪与这些测试相关的性能基准,并提供仪表板/工具来检测异常。

在您页面上的 内置 I/O 连接器指南 文档中添加一个 资源可扩展性 部分,该部分将指示 I/O 的集成测试的上限。

例如

表明 KafkaIO 已进行 xxxx 个主题的集成测试。文档可以说明连接器作者是否认为连接器可以扩展到超过集成测试的数量,但这将向用户清楚地说明测试路径的限制。

文档应明确指示用于限制的配置。例如,使用运行器 x 和配置选项 a。

记录您的 I/O 收集的性能/内部指标,包括它们的含义以及如何使用它们(某些连接器收集并发布性能指标,例如延迟/捆绑大小等)。

根据连接器现有的性能测试,包括 I/O 的预期性能特征。