使用直接运行器

直接运行器在您的机器上执行管道,旨在尽可能严格地验证管道是否符合 Apache Beam 模型。它不会专注于高效的管道执行,而是进行额外的检查,以确保用户不依赖于模型未保证的语义。这些检查包括

使用直接运行器进行测试和开发有助于确保管道在不同的 Beam 运行器上保持健壮性。此外,当管道在远程集群上执行时,调试失败的运行可能是一项非平凡的任务。相反,通常在您的管道代码上执行本地单元测试更快更简单。在本地对管道进行单元测试还可以让您使用首选的本地调试工具。

以下是一些有关如何测试管道的资源。

直接运行器不适合生产管道,因为它优化了正确性而不是性能。直接运行器必须将所有用户数据放入内存,而 Flink 和 Spark 运行器如果数据无法放入内存,则可以将数据溢出到磁盘。因此,Flink 和 Spark 运行器能够运行更大的管道,并且更适合生产工作负载。

直接运行器先决条件和设置

指定您的依赖项

在使用 Java 时,您必须在 pom.xml 中指定对直接运行器的依赖项。

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.60.0</version>
   <scope>runtime</scope>
</dependency>

此部分不适用于 Beam SDK for Python。

直接运行器的管道选项

有关如何设置管道选项的常规说明,请参阅 编程指南

从命令行执行管道时,将 runner 设置为 directDirectRunner。其他管道选项的默认值通常就足够了。

有关默认值和其他管道配置选项,请参阅 DirectOptions DirectOptions 接口的参考文档。

其他信息和注意事项

内存注意事项

本地执行受本地环境中可用内存的限制。强烈建议您使用足够小的数据集运行管道,以使其适合本地内存。您可以使用 CreateCreate 变换创建一个小内存中数据集,或者您可以使用 ReadRead 变换处理小的本地或远程文件。

流式执行

Python DirectRunner 对流式处理的支持有限。有关已知问题,请参阅:https://github.com/apache/beam/issues/24528.

如果您的管道使用无界数据源或接收器,则必须将 streaming 选项设置为 true

并行执行

Python FnApiRunner 支持多线程和多进程模式。

设置并行度

工作线程的数量由 targetParallelism 管道选项定义。默认情况下,targetParallelism 是可用处理器数量和 3 之间的较大值。

线程或子进程的数量由设置 direct_num_workers 管道选项定义。从 2.22.0 开始,支持 direct_num_workers = 0。当 direct_num_workers 设置为 0 时,它将把线程/子进程的数量设置为运行管道的机器的内核数量。

设置运行模式

在 Beam 2.19.0 及更高版本中,您可以使用 direct_running_mode 管道选项来设置运行模式。direct_running_mode 可以是以下之一:['in_memory''multi_threading''multi_processing']。

in_memory:运行器和工作器之间的通信发生在内存中(不是通过 gRPC)。这是默认模式。

multi_threading:运行器和工作器通过 gRPC 进行通信,并且每个工作器都在一个线程中运行。

multi_processing:运行器和工作器通过 gRPC 进行通信,并且每个工作器都在一个子进程中运行。

在将管道部署到远程运行器之前

虽然在直接运行器上进行测试很方便,但它仍然可能与远程运行器在 Beam 模型语义之外表现不同,尤其是在与运行时环境相关的问题方面。通常建议在将管道完全部署到生产环境之前,先在目标远程运行器上以小规模对其进行测试。