在 Apache Beam 中测试 I/O 变换

测试 Apache Beam I/O 变换的示例和设计模式

注意:本指南仍在进行中。有一个开放问题需要完成本指南:BEAM-1025.

介绍

本文档介绍了 Beam 社区根据我们过去编写 I/O 变换的经验推荐的一组测试。如果您希望将您的 I/O 变换贡献给 Beam 社区,我们将要求您实施这些测试。

虽然编写单元测试和集成测试是标准的做法,但有许多可能的定义。我们的定义是

关于性能基准测试的说明

我们不提倡专门为性能基准测试编写单独的测试。相反,我们建议设置可以接受必要参数以涵盖许多不同测试场景的集成测试。

例如,如果根据以下指南编写集成测试,则集成测试可以在不同的运行器(本地或集群配置)和针对不同数据存储运行,这些数据存储可以是具有小型数据集的小型实例,也可以是具有大型数据集的生产就绪型大型集群。这可以为各种场景提供覆盖范围,其中之一是性能基准测试。

测试平衡 - 单元测试与集成测试

用集成测试覆盖大量代码很容易,但之后很难找到测试失败的原因,而且测试更不稳定。

但是,通过测试可以找到一组有价值的错误,这些错误会测试多个工作器读取/写入具有多个节点(例如,读副本等)的数据存储实例。这些场景很难用单元测试找到,我们发现它们通常会导致 I/O 变换中的错误。

我们的测试策略是在这两种相互矛盾的需求之间取得平衡。我们建议尽可能地在单元测试中进行测试,并编写一个可以以各种配置运行的单个小型集成测试。

示例

Java

Python

单元测试

目标

非目标

实施单元测试

有关编写所有变换单元测试的一般指南,请参阅 PTransform 样式指南。我们已在下面扩展了一些重要要点。

如果您使用的是 Source API,请确保对您的代码进行全面单元测试。微不足道的实施错误会导致数据损坏或数据丢失(例如跳过或重复记录),用户很难检测到这些错误。同时考虑使用 SourceTestUtilssource_test_utils - 它是测试 Source 实现的关键部分。

如果您没有使用 Source API,可以使用 TestPipelinePAssertassert_that 来帮助您进行测试。

如果您正在实现写入,可以使用 TestPipeline 来写入测试数据,然后使用非 Beam 客户端读取和验证它。

使用模拟

不要在单元测试中使用模拟(为每个测试预先编程每个调用的确切响应),而应使用模拟。使用模拟进行 I/O 转换测试的首选方法是使用预先存在的内存中/可嵌入的服务版本,但如果不存在,请考虑自己实现。模拟已证明是“您可以获得测试所需的条件”和“您不必编写数百万个精确的模拟函数调用”的正确组合。

网络故障

为了帮助进行测试和分离关注点,**跨网络交互的代码应在与 I/O 转换不同的类中进行处理**。建议的设计模式是,您的 I/O 转换在确定无法再读取或写入时抛出异常。

这使得 I/O 转换的单元测试能够像拥有完美的网络连接一样运行,并且它们不需要重试/以其他方式处理网络连接问题。

批处理

如果您的 I/O 转换允许对读取/写入进行批处理,则您必须在测试中强制进行批处理。在您的 I/O 转换上拥有可配置的批处理大小选项可以轻松实现这一点。这些必须标记为仅用于测试。

I/O 变换集成测试

我们目前没有 Python I/O 集成测试或针对无界或最终一致数据存储的集成测试示例。我们欢迎在这方面的贡献 - 请联系 Beam dev@ 邮件列表以获取更多信息。

目标

集成测试、数据存储和 Kubernetes

为了在现实世界条件下测试 I/O 转换,您必须连接到数据存储实例。

Beam 社区在 Kubernetes 中托管用于集成测试的数据存储。为了使集成测试在 Beam 的持续集成环境中运行,它必须具有设置数据存储实例的 Kubernetes 脚本。

但是,在本地工作时,不需要使用 Kubernetes。所有测试基础设施都允许您传入连接信息,因此开发人员可以使用他们首选的托管基础设施进行本地开发。

在您的机器上运行集成测试

您始终可以在自己的机器上运行 IO 集成测试。运行集成测试的高级步骤是

  1. 设置与要运行的测试相对应的数据存储。
  2. 运行测试,并将刚刚创建的数据存储的连接信息传递给它。
  3. 清理数据存储。

数据存储设置/清理

如果您使用 Kubernetes 脚本托管数据存储,请确保可以使用 kubectl 本地连接到您的集群。如果您已经设置了自己的数据存储,您只需要执行以下列表中的步骤 3。

  1. 设置与您要运行的测试相对应的数据存储。您可以在 .test-infra/kubernetes 中找到所有当前支持的数据存储的 Kubernetes 脚本。
    1. 在某些情况下,有一个专用的设置脚本(*.sh)。在其他情况下,您只需运行 kubectl create -f [scriptname] 来创建数据存储。您也可以让 kubernetes.sh 脚本为您执行一些标准步骤。
    2. 惯例规定会有
      1. 数据存储本身的 yml 脚本,以及 NodePort 服务。NodePort 服务为从同一子网内的任何连接到 Kubernetes 集群机器的人打开数据存储的端口。这些脚本通常在 Minikube Kubernetes 引擎上运行脚本时很有用。
      2. 一个单独的脚本,带有 LoadBalancer 服务。此类服务将为数据存储公开一个外部 IP。当需要外部访问时(例如,在 Jenkins 上),需要此类脚本。
    3. 示例
      1. 对于 JDBC,您可以设置 Postgres:kubectl create -f .test-infra/kubernetes/postgres/postgres.yml
      2. 对于 Elasticsearch,您可以运行设置脚本:bash .test-infra/kubernetes/elasticsearch/setup.sh
  2. 确定服务的 IP 地址
    1. NodePort 服务:kubectl get pods -l 'component=elasticsearch' -o jsonpath={.items[0].status.podIP}
    2. LoadBalancer 服务:kubectl get svc elasticsearch-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
  3. 使用 integrationTest gradle 任务和测试类中的说明运行测试(例如,请参阅 JdbcIOIT.java 中的说明)。
  4. 告诉 Kubernetes 删除 Kubernetes 脚本中指定 的资源
    1. JDBC:kubectl delete -f .test-infra/kubernetes/postgres/postgres.yml
    2. Elasticsearch:bash .test-infra/kubernetes/elasticsearch/teardown.sh

运行特定测试

integrationTest 是一个专门用于运行 IO 集成测试的 gradle 任务。

在 Cloud Dataflow 运行器上的示例用法

./gradlew integrationTest -p sdks/java/io/hadoop-format -DintegrationTestPipelineOptions='["--project=GOOGLE_CLOUD_PROJECT", "--tempRoot=GOOGLE_STORAGE_BUCKET", "--numberOfRecords=1000", "--postgresPort=5432", "--postgresServerName=SERVER_NAME", "--postgresUsername=postgres", "--postgresPassword=PASSWORD", "--postgresDatabaseName=postgres", "--postgresSsl=false", "--runner=TestDataflowRunner"]' -DintegrationTestRunner=dataflow --tests=org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT

在 HDFS 文件系统和 Direct 运行器上的示例用法

注意:以下设置仅在 /etc/hosts 文件包含带有 Hadoop namenode 和 Hadoop datanode 外部 IP 的条目时才有效。请参阅以下内容的说明:小型集群配置文件大型集群配置文件

export HADOOP_USER_NAME=root

./gradlew integrationTest -p sdks/java/io/file-based-io-tests -DintegrationTestPipelineOptions='["--numberOfRecords=1000", "--filenamePrefix=hdfs://HDFS_NAMENODE:9000/XMLIOIT", "--hdfsConfiguration=[{\"fs.defaultFS\":\"hdfs://HDFS_NAMENODE:9000\",\"dfs.replication\":1,\"dfs.client.use.datanode.hostname\":\"true\" }]" ]' -DintegrationTestRunner=direct -Dfilesystem=hdfs --tests org.apache.beam.sdk.io.xml.XmlIOIT

参数描述

选项功能
-p sdks/java/io/file-based-io-tests/指定要测试的 I/O 的项目子模块。
-DintegrationTestPipelineOptions将管道选项直接传递给要运行的测试。
-DintegrationTestRunner用于运行测试的运行器。当前可能的选项是:direct、dataflow。
-Dfilesystem(可选,在适用情况下)用于运行测试的文件系统。当前可能的选项是:gcs、hdfs、s3。如果没有提供,将使用本地文件系统。
--tests指定要运行的测试(对类/测试方法的完全限定引用)。

在拉取请求上运行集成测试

大多数 IO 集成测试都有专门的 Jenkins 作业,这些作业定期运行以收集指标并避免回归。感谢 ghprb 插件,也可以在 Github 拉取请求的评论中键入特定短语后按需触发这些作业。这样,您就可以检查您对某个 IO 的贡献是改进还是使情况变得更糟(希望不会!)。

要运行 IO 集成测试,请在您的拉取请求中键入以下评论

测试短语
JdbcIOIT运行 Java JdbcIO 性能测试
MongoDBIOIT运行 Java MongoDBIO 性能测试
HadoopFormatIOIT运行 Java HadoopFormatIO 性能测试
TextIO - 本地文件系统运行 Java TextIO 性能测试
TextIO - HDFS运行 Java TextIO 性能测试 HDFS
压缩的 TextIO - 本地文件系统运行 Java CompressedTextIO 性能测试
压缩的 TextIO - HDFS运行 Java CompressedTextIO 性能测试 HDFS
AvroIO - 本地文件系统运行 Java AvroIO 性能测试
AvroIO - HDFS运行 Java AvroIO 性能测试 HDFS
TFRecordIO - 本地文件系统运行 Java TFRecordIO 性能测试
ParquetIO - 本地文件系统运行 Java ParquetIO 性能测试
XmlIO - 本地文件系统运行 Java XmlIO 性能测试
XmlIO - HDFS在 HDFS 上运行 Java XmlIO 性能测试

每个作业定义都可以在 .test-infra/jenkins 中找到。如果您在拉取请求中修改/添加了新的 Jenkins 作业定义,请在运行集成测试之前运行种子作业(评论:“运行种子作业”)。

性能测试仪表板

如前所述,我们通过收集定期运行的 Jenkins 作业的测试执行时间来衡量 IOIT 的性能。随后的结果存储在数据库(BigQuery)中,因此我们可以在图表的形式中显示它们。

收集所有结果的仪表板可在此处获得:性能测试仪表板

实施集成测试

实现集成测试需要三个组件

这两部分将在下面详细讨论。

测试代码

以下是集成测试代码使用的约定

这些原则的端到端示例可以在 JdbcIOIT 中找到。

Kubernetes 脚本

集成测试、数据存储和 Kubernetes 中所述,为了使您的测试在 Beam 的持续集成服务器上运行,您需要实现一个 Kubernetes 脚本,用于创建您的数据存储的实例。

如果您需要这方面的帮助或有任何其他问题,请联系 Beam dev@ 邮件列表,社区可能会提供帮助。

创建 Beam 数据存储 Kubernetes 脚本的指南

  1. 您应该定义两个 Kubernetes 脚本。
    • 这是实现项目 #1 的最常用方法。
    • 第一个脚本将包含主数据存储实例脚本(StatefulSet)以及一个公开数据存储的 NodePort 服务。这将是 Beam Jenkins 持续集成服务器运行的脚本。
    • 第二个脚本将定义一个额外的 LoadBalancer 服务,用于在 Kubernetes 集群位于另一个网络上时公开数据存储的外部 IP 地址。此文件的名通常以 ‘-for-local-dev’ 结尾。
  2. 您必须确保在发生崩溃后重新创建 pod。
    • 如果您直接使用pod,则在 pod 崩溃或某些原因导致集群移动 pod 的容器时,它不会被重新创建。
    • 在大多数情况下,您需要使用StatefulSet,因为它支持重启之间持久存在的磁盘,以及使用特定持久磁盘的 pod 关联稳定的网络标识符。DeploymentReplicaSet 也可能有用,但在更少的场景中,因为它们没有这些功能。
  3. 您应该为数据存储的小实例和大实例创建单独的脚本。
    • 这似乎是支持同时提供小型和大型数据存储以进行集成测试的最佳方法,如 小型和大型集成测试 中所述。
  4. 您必须使用来自可信来源的 Docker 镜像,并固定 Docker 镜像的版本。
    • 您应该优先考虑以下顺序的镜像
      1. 数据源/接收器创建者提供的镜像(如果他们正式维护它)。对于 Apache 项目,这将是官方的 Apache 存储库。
      2. 官方 Docker 镜像,因为它们有安全修复和保证的维护。
      3. 非官方 Docker 镜像,或来自其他提供者的镜像,这些提供者有良好的维护者(例如 quay.io)。

Jenkins 作业

您可以在 .test-infra/jenkins 目录中找到现有 IOIT Jenkins 任务定义的示例。查找名为 job_PerformanceTest_*.groovy 的文件。最突出的示例是

请注意,有一个实用程序类有助于轻松创建任务,而不会忘记重要步骤或重复代码。有关详细信息,请参阅 Kubernetes.groovy

小型和大型集成测试

Apache Beam 预期它可以在多种配置中运行集成测试

您可以通过以下方式实现

  1. 创建两个 Kubernetes 脚本:一个用于数据存储的小实例,另一个用于大实例。
  2. 让您的测试使用管道选项来决定是生成少量测试数据还是大量测试数据(其中少量和大量是适合您数据存储的大小)

这方面的一个例子是 HadoopFormatIO 的测试。