使用直接运行器
- Java SDK
- Python SDK
直接运行器在您的机器上执行管道,旨在尽可能严格地验证管道是否符合 Apache Beam 模型。它不会专注于高效的管道执行,而是进行额外的检查,以确保用户不依赖于模型未保证的语义。这些检查包括
- 强制元素的不可变性
- 强制元素的可编码性
- 元素在所有点上以任意顺序处理
- 用户函数(
DoFn
、CombineFn
等)的序列化 有关详细信息,请参阅 DoFns 的可序列化性。
使用直接运行器进行测试和开发有助于确保管道在不同的 Beam 运行器上保持健壮性。此外,当管道在远程集群上执行时,调试失败的运行可能是一项非平凡的任务。相反,通常在您的管道代码上执行本地单元测试更快更简单。在本地对管道进行单元测试还可以让您使用首选的本地调试工具。
以下是一些有关如何测试管道的资源。
- 测试您的管道
- 在 Apache Beam 中测试无界管道 讨论了使用 Java 类 PAssert 和 TestStream 测试您的管道的过程。
- Apache Beam WordCount 演练 包含一个使用 PAssert 记录和测试管道的示例。
- Apache Beam WordCount 演练 包含一个使用
assert_that
记录和测试管道的示例。
直接运行器不适合生产管道,因为它优化了正确性而不是性能。直接运行器必须将所有用户数据放入内存,而 Flink 和 Spark 运行器如果数据无法放入内存,则可以将数据溢出到磁盘。因此,Flink 和 Spark 运行器能够运行更大的管道,并且更适合生产工作负载。
直接运行器先决条件和设置
指定您的依赖项
在使用 Java 时,您必须在 pom.xml
中指定对直接运行器的依赖项。
此部分不适用于 Beam SDK for Python。
直接运行器的管道选项
有关如何设置管道选项的常规说明,请参阅 编程指南。
从命令行执行管道时,将 runner
设置为 direct
或 DirectRunner
。其他管道选项的默认值通常就足够了。
有关默认值和其他管道配置选项,请参阅 DirectOptions
DirectOptions
接口的参考文档。
其他信息和注意事项
内存注意事项
本地执行受本地环境中可用内存的限制。强烈建议您使用足够小的数据集运行管道,以使其适合本地内存。您可以使用 Create
Create
变换创建一个小内存中数据集,或者您可以使用 Read
Read
变换处理小的本地或远程文件。
流式执行
Python DirectRunner 对流式处理的支持有限。有关已知问题,请参阅:https://github.com/apache/beam/issues/24528.
如果您的管道使用无界数据源或接收器,则必须将 streaming
选项设置为 true
。
并行执行
Python FnApiRunner 支持多线程和多进程模式。
设置并行度
工作线程的数量由 targetParallelism
管道选项定义。默认情况下,targetParallelism
是可用处理器数量和 3 之间的较大值。
线程或子进程的数量由设置 direct_num_workers
管道选项定义。从 2.22.0 开始,支持 direct_num_workers = 0
。当 direct_num_workers
设置为 0 时,它将把线程/子进程的数量设置为运行管道的机器的内核数量。
设置运行模式
在 Beam 2.19.0 及更高版本中,您可以使用 direct_running_mode
管道选项来设置运行模式。direct_running_mode
可以是以下之一:['in_memory'
、'multi_threading'
、'multi_processing'
]。
in_memory:运行器和工作器之间的通信发生在内存中(不是通过 gRPC)。这是默认模式。
multi_threading:运行器和工作器通过 gRPC 进行通信,并且每个工作器都在一个线程中运行。
multi_processing:运行器和工作器通过 gRPC 进行通信,并且每个工作器都在一个子进程中运行。
在将管道部署到远程运行器之前
虽然在直接运行器上进行测试很方便,但它仍然可能与远程运行器在 Beam 模型语义之外表现不同,尤其是在与运行时环境相关的问题方面。通常建议在将管道完全部署到生产环境之前,先在目标远程运行器上以小规模对其进行测试。
最后更新时间:2024/10/31
您找到所需的一切了吗?
所有内容都有用且清晰吗?您想更改什么吗?请告诉我们!