在 Apache Beam 中测试 I/O 变换
测试 Apache Beam I/O 变换的示例和设计模式
- Java SDK
- Python SDK
注意:本指南仍在进行中。有一个开放问题需要完成本指南:BEAM-1025.
介绍
本文档介绍了 Beam 社区根据我们过去编写 I/O 变换的经验推荐的一组测试。如果您希望将您的 I/O 变换贡献给 Beam 社区,我们将要求您实施这些测试。
虽然编写单元测试和集成测试是标准的做法,但有许多可能的定义。我们的定义是
- 单元测试
- 目标:仅验证变换的正确性 - 核心行为、极端情况等。
- 使用的数据存储:数据存储的内存版本(如果可用),否则您需要编写一个 模拟
- 数据集大小:极小(10 到 100 行)
- 集成测试
- 目标:捕获在与数据存储的真实版本交互时出现的错误
- 使用的数据存储:实际实例,在测试之前预先配置
- 数据集大小:小型到中等(1000 行到 10 GB)
关于性能基准测试的说明
我们不提倡专门为性能基准测试编写单独的测试。相反,我们建议设置可以接受必要参数以涵盖许多不同测试场景的集成测试。
例如,如果根据以下指南编写集成测试,则集成测试可以在不同的运行器(本地或集群配置)和针对不同数据存储运行,这些数据存储可以是具有小型数据集的小型实例,也可以是具有大型数据集的生产就绪型大型集群。这可以为各种场景提供覆盖范围,其中之一是性能基准测试。
测试平衡 - 单元测试与集成测试
用集成测试覆盖大量代码很容易,但之后很难找到测试失败的原因,而且测试更不稳定。
但是,通过测试可以找到一组有价值的错误,这些错误会测试多个工作器读取/写入具有多个节点(例如,读副本等)的数据存储实例。这些场景很难用单元测试找到,我们发现它们通常会导致 I/O 变换中的错误。
我们的测试策略是在这两种相互矛盾的需求之间取得平衡。我们建议尽可能地在单元测试中进行测试,并编写一个可以以各种配置运行的单个小型集成测试。
示例
Java
- BigtableIO 的测试实施被认为是当前单元测试
Source
的最佳实践示例 - JdbcIO 提供了编写集成测试的当前最佳实践示例。
- ElasticsearchIO 演示了有界读/写测试。
- MqttIO 和 AmpqpIO 演示了无界读/写。
Python
- avroio_test 提供了测试动态分片、
source_test_utils
、assert_that
和equal_to
的示例。
单元测试
目标
- 验证 I/O 变换中的代码的正确性。
- 验证 I/O 变换在与数据存储的参考实现(其中“参考实现”是指模拟或内存版本)一起使用时是否正常工作。
- 能够快速运行,只需要一台机器,内存/磁盘占用空间合理且没有非本地网络访问(最好完全没有)。目标是测试在几秒钟内运行 - 任何超过 20 秒的测试都应与 Beam 开发邮件列表讨论。
- 验证 I/O 变换是否可以处理网络故障。
非目标
- 测试外部数据存储中的错误 - 这会导致极其复杂的测试。
实施单元测试
有关编写所有变换单元测试的一般指南,请参阅 PTransform 样式指南。我们已在下面扩展了一些重要要点。
如果您使用的是 Source
API,请确保对您的代码进行全面单元测试。微不足道的实施错误会导致数据损坏或数据丢失(例如跳过或重复记录),用户很难检测到这些错误。同时考虑使用 SourceTestUtils
source_test_utils
- 它是测试 Source
实现的关键部分。
如果您没有使用 Source
API,可以使用 TestPipeline
与 PAssert
assert_that
来帮助您进行测试。
如果您正在实现写入,可以使用 TestPipeline
来写入测试数据,然后使用非 Beam 客户端读取和验证它。
使用模拟
不要在单元测试中使用模拟(为每个测试预先编程每个调用的确切响应),而应使用模拟。使用模拟进行 I/O 转换测试的首选方法是使用预先存在的内存中/可嵌入的服务版本,但如果不存在,请考虑自己实现。模拟已证明是“您可以获得测试所需的条件”和“您不必编写数百万个精确的模拟函数调用”的正确组合。
网络故障
为了帮助进行测试和分离关注点,**跨网络交互的代码应在与 I/O 转换不同的类中进行处理**。建议的设计模式是,您的 I/O 转换在确定无法再读取或写入时抛出异常。
这使得 I/O 转换的单元测试能够像拥有完美的网络连接一样运行,并且它们不需要重试/以其他方式处理网络连接问题。
批处理
如果您的 I/O 转换允许对读取/写入进行批处理,则您必须在测试中强制进行批处理。在您的 I/O 转换上拥有可配置的批处理大小选项可以轻松实现这一点。这些必须标记为仅用于测试。
I/O 变换集成测试
我们目前没有 Python I/O 集成测试或针对无界或最终一致数据存储的集成测试示例。我们欢迎在这方面的贡献 - 请联系 Beam dev@ 邮件列表以获取更多信息。
目标
- 允许对数据存储、I/O 转换和运行器之间的交互进行端到端测试,模拟现实世界条件。
- 允许小规模和大型测试。
- 自包含:除了存在测试可以修改的数据存储之外,需要最少的初始设置或现有的外部状态。
- 任何人都可以运行与 Beam 在其持续集成服务器上运行的同一套 I/O 转换集成测试。
集成测试、数据存储和 Kubernetes
为了在现实世界条件下测试 I/O 转换,您必须连接到数据存储实例。
Beam 社区在 Kubernetes 中托管用于集成测试的数据存储。为了使集成测试在 Beam 的持续集成环境中运行,它必须具有设置数据存储实例的 Kubernetes 脚本。
但是,在本地工作时,不需要使用 Kubernetes。所有测试基础设施都允许您传入连接信息,因此开发人员可以使用他们首选的托管基础设施进行本地开发。
在您的机器上运行集成测试
您始终可以在自己的机器上运行 IO 集成测试。运行集成测试的高级步骤是
- 设置与要运行的测试相对应的数据存储。
- 运行测试,并将刚刚创建的数据存储的连接信息传递给它。
- 清理数据存储。
数据存储设置/清理
如果您使用 Kubernetes 脚本托管数据存储,请确保可以使用 kubectl 本地连接到您的集群。如果您已经设置了自己的数据存储,您只需要执行以下列表中的步骤 3。
- 设置与您要运行的测试相对应的数据存储。您可以在 .test-infra/kubernetes 中找到所有当前支持的数据存储的 Kubernetes 脚本。
- 在某些情况下,有一个专用的设置脚本(*.sh)。在其他情况下,您只需运行
kubectl create -f [scriptname]
来创建数据存储。您也可以让 kubernetes.sh 脚本为您执行一些标准步骤。 - 惯例规定会有
- 数据存储本身的 yml 脚本,以及
NodePort
服务。NodePort
服务为从同一子网内的任何连接到 Kubernetes 集群机器的人打开数据存储的端口。这些脚本通常在 Minikube Kubernetes 引擎上运行脚本时很有用。 - 一个单独的脚本,带有 LoadBalancer 服务。此类服务将为数据存储公开一个外部 IP。当需要外部访问时(例如,在 Jenkins 上),需要此类脚本。
- 数据存储本身的 yml 脚本,以及
- 示例
- 对于 JDBC,您可以设置 Postgres:
kubectl create -f .test-infra/kubernetes/postgres/postgres.yml
- 对于 Elasticsearch,您可以运行设置脚本:
bash .test-infra/kubernetes/elasticsearch/setup.sh
- 对于 JDBC,您可以设置 Postgres:
- 在某些情况下,有一个专用的设置脚本(*.sh)。在其他情况下,您只需运行
- 确定服务的 IP 地址
- NodePort 服务:
kubectl get pods -l 'component=elasticsearch' -o jsonpath={.items[0].status.podIP}
- LoadBalancer 服务:
kubectl get svc elasticsearch-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
- NodePort 服务:
- 使用
integrationTest
gradle 任务和测试类中的说明运行测试(例如,请参阅 JdbcIOIT.java 中的说明)。 - 告诉 Kubernetes 删除 Kubernetes 脚本中指定 的资源
- JDBC:
kubectl delete -f .test-infra/kubernetes/postgres/postgres.yml
- Elasticsearch:
bash .test-infra/kubernetes/elasticsearch/teardown.sh
- JDBC:
运行特定测试
integrationTest
是一个专门用于运行 IO 集成测试的 gradle 任务。
在 Cloud Dataflow 运行器上的示例用法
./gradlew integrationTest -p sdks/java/io/hadoop-format -DintegrationTestPipelineOptions='["--project=GOOGLE_CLOUD_PROJECT", "--tempRoot=GOOGLE_STORAGE_BUCKET", "--numberOfRecords=1000", "--postgresPort=5432", "--postgresServerName=SERVER_NAME", "--postgresUsername=postgres", "--postgresPassword=PASSWORD", "--postgresDatabaseName=postgres", "--postgresSsl=false", "--runner=TestDataflowRunner"]' -DintegrationTestRunner=dataflow --tests=org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT
在 HDFS 文件系统和 Direct 运行器上的示例用法
注意:以下设置仅在 /etc/hosts 文件包含带有 Hadoop namenode 和 Hadoop datanode 外部 IP 的条目时才有效。请参阅以下内容的说明:小型集群配置文件 和 大型集群配置文件。
export HADOOP_USER_NAME=root
./gradlew integrationTest -p sdks/java/io/file-based-io-tests -DintegrationTestPipelineOptions='["--numberOfRecords=1000", "--filenamePrefix=hdfs://HDFS_NAMENODE:9000/XMLIOIT", "--hdfsConfiguration=[{\"fs.defaultFS\":\"hdfs://HDFS_NAMENODE:9000\",\"dfs.replication\":1,\"dfs.client.use.datanode.hostname\":\"true\" }]" ]' -DintegrationTestRunner=direct -Dfilesystem=hdfs --tests org.apache.beam.sdk.io.xml.XmlIOIT
参数描述
选项 | 功能 |
-p sdks/java/io/file-based-io-tests/ | 指定要测试的 I/O 的项目子模块。 |
-DintegrationTestPipelineOptions | 将管道选项直接传递给要运行的测试。 |
-DintegrationTestRunner | 用于运行测试的运行器。当前可能的选项是:direct、dataflow。 |
-Dfilesystem | (可选,在适用情况下)用于运行测试的文件系统。当前可能的选项是:gcs、hdfs、s3。如果没有提供,将使用本地文件系统。 |
--tests | 指定要运行的测试(对类/测试方法的完全限定引用)。 |
在拉取请求上运行集成测试
大多数 IO 集成测试都有专门的 Jenkins 作业,这些作业定期运行以收集指标并避免回归。感谢 ghprb 插件,也可以在 Github 拉取请求的评论中键入特定短语后按需触发这些作业。这样,您就可以检查您对某个 IO 的贡献是改进还是使情况变得更糟(希望不会!)。
要运行 IO 集成测试,请在您的拉取请求中键入以下评论
测试 | 短语 |
JdbcIOIT | 运行 Java JdbcIO 性能测试 |
MongoDBIOIT | 运行 Java MongoDBIO 性能测试 |
HadoopFormatIOIT | 运行 Java HadoopFormatIO 性能测试 |
TextIO - 本地文件系统 | 运行 Java TextIO 性能测试 |
TextIO - HDFS | 运行 Java TextIO 性能测试 HDFS |
压缩的 TextIO - 本地文件系统 | 运行 Java CompressedTextIO 性能测试 |
压缩的 TextIO - HDFS | 运行 Java CompressedTextIO 性能测试 HDFS |
AvroIO - 本地文件系统 | 运行 Java AvroIO 性能测试 |
AvroIO - HDFS | 运行 Java AvroIO 性能测试 HDFS |
TFRecordIO - 本地文件系统 | 运行 Java TFRecordIO 性能测试 |
ParquetIO - 本地文件系统 | 运行 Java ParquetIO 性能测试 |
XmlIO - 本地文件系统 | 运行 Java XmlIO 性能测试 |
XmlIO - HDFS | 在 HDFS 上运行 Java XmlIO 性能测试 |
每个作业定义都可以在 .test-infra/jenkins 中找到。如果您在拉取请求中修改/添加了新的 Jenkins 作业定义,请在运行集成测试之前运行种子作业(评论:“运行种子作业”)。
性能测试仪表板
如前所述,我们通过收集定期运行的 Jenkins 作业的测试执行时间来衡量 IOIT 的性能。随后的结果存储在数据库(BigQuery)中,因此我们可以在图表的形式中显示它们。
收集所有结果的仪表板可在此处获得:性能测试仪表板
实施集成测试
实现集成测试需要三个组件
- **测试代码**:执行实际测试的代码:与 I/O 转换交互,读取和写入数据,以及验证数据。
- **Kubernetes 脚本**:一个 Kubernetes 脚本,用于设置测试代码将使用的数据库。
- **Jenkins 作业**:一个 Jenkins 作业 DSL 脚本,执行设置数据源、运行和清理测试后所有必要的步骤。
这两部分将在下面详细讨论。
测试代码
以下是集成测试代码使用的约定
- 您的测试应使用管道选项接收连接信息。
- 对于 Java,在 io/common 目录中有一个共享的管道选项对象。这意味着如果对同一个数据库有两个测试(例如,对于
Elasticsearch
和HadoopFormatIO
测试),那么这些测试共享相同的管道选项。
- 对于 Java,在 io/common 目录中有一个共享的管道选项对象。这意味着如果对同一个数据库有两个测试(例如,对于
- 以编程方式生成测试数据,并对用于测试的数据量进行参数化。
- 对于 Java,
CountingInput
+TestRow
可以组合起来以任何规模生成确定性测试数据。
- 对于 Java,
- 对您的测试使用先写后读的风格。
- 在一个
Test
中,运行一个管道使用您的 I/O 转换进行写入,然后运行另一个管道使用您的 I/O 转换进行读取。 - 数据的唯一验证应该是来自读取的结果。不要以任何其他方式验证写入数据库的数据。
- 以有效的方式验证所有行的实际内容。一个简单的方法是获取行的哈希值并将它们组合起来。
HashingFn
可以帮助简化此操作,TestRow
具有预先计算的哈希值。 - 为了便于调试,请使用
PAssert
的containsInAnyOrder
来验证所有行的子集的内容。
- 在一个
- 测试应假设它们可能在同一个数据库实例上多次运行和/或同时运行。
- 清理测试数据:在
@AfterClass
中执行此操作以确保它运行。 - 在适当的情况下,每次运行使用唯一的表名(时间戳是一种简单的方法)和每个方法使用唯一的表名。
- 清理测试数据:在
这些原则的端到端示例可以在 JdbcIOIT 中找到。
Kubernetes 脚本
如 集成测试、数据存储和 Kubernetes 中所述,为了使您的测试在 Beam 的持续集成服务器上运行,您需要实现一个 Kubernetes 脚本,用于创建您的数据存储的实例。
如果您需要这方面的帮助或有任何其他问题,请联系 Beam dev@ 邮件列表,社区可能会提供帮助。
创建 Beam 数据存储 Kubernetes 脚本的指南
- 您应该定义两个 Kubernetes 脚本。
- 这是实现项目 #1 的最常用方法。
- 第一个脚本将包含主数据存储实例脚本(
StatefulSet
)以及一个公开数据存储的NodePort
服务。这将是 Beam Jenkins 持续集成服务器运行的脚本。 - 第二个脚本将定义一个额外的
LoadBalancer
服务,用于在 Kubernetes 集群位于另一个网络上时公开数据存储的外部 IP 地址。此文件的名通常以 ‘-for-local-dev’ 结尾。
- 您必须确保在发生崩溃后重新创建 pod。
- 如果您直接使用
pod
,则在 pod 崩溃或某些原因导致集群移动 pod 的容器时,它不会被重新创建。 - 在大多数情况下,您需要使用
StatefulSet
,因为它支持重启之间持久存在的磁盘,以及使用特定持久磁盘的 pod 关联稳定的网络标识符。Deployment
和ReplicaSet
也可能有用,但在更少的场景中,因为它们没有这些功能。
- 如果您直接使用
- 您应该为数据存储的小实例和大实例创建单独的脚本。
- 这似乎是支持同时提供小型和大型数据存储以进行集成测试的最佳方法,如 小型和大型集成测试 中所述。
- 您必须使用来自可信来源的 Docker 镜像,并固定 Docker 镜像的版本。
- 您应该优先考虑以下顺序的镜像
- 数据源/接收器创建者提供的镜像(如果他们正式维护它)。对于 Apache 项目,这将是官方的 Apache 存储库。
- 官方 Docker 镜像,因为它们有安全修复和保证的维护。
- 非官方 Docker 镜像,或来自其他提供者的镜像,这些提供者有良好的维护者(例如 quay.io)。
- 您应该优先考虑以下顺序的镜像
Jenkins 作业
您可以在 .test-infra/jenkins 目录中找到现有 IOIT Jenkins 任务定义的示例。查找名为 job_PerformanceTest_*.groovy 的文件。最突出的示例是
请注意,有一个实用程序类有助于轻松创建任务,而不会忘记重要步骤或重复代码。有关详细信息,请参阅 Kubernetes.groovy。
小型和大型集成测试
Apache Beam 预期它可以在多种配置中运行集成测试
- 小型
- 在运行程序上的单个工作程序上执行(这应该是可能的,但不是必需的)。
- 数据存储应配置为使用单个节点。
- 数据集可以非常小(1000 行)。
- 大型
- 在运行程序上的多个工作程序上执行。
- 数据存储应配置为使用多个节点。
- 此用例中使用的数据集更大(10 GB 以上)。
您可以通过以下方式实现
- 创建两个 Kubernetes 脚本:一个用于数据存储的小实例,另一个用于大实例。
- 让您的测试使用管道选项来决定是生成少量测试数据还是大量测试数据(其中少量和大量是适合您数据存储的大小)
这方面的一个例子是 HadoopFormatIO 的测试。
最后更新于 2024/10/31
您找到所有需要的信息了吗?
所有信息都实用且清晰吗?您想更改什么吗?请告诉我们!