使用 Google Cloud Dataflow 运行器
- Java SDK
- Python SDK
Google Cloud Dataflow 运行器使用 Cloud Dataflow 管理服务。当您使用 Cloud Dataflow 服务运行管道时,运行器会将您的可执行代码和依赖项上传到 Google Cloud Storage 存储桶,并创建一个 Cloud Dataflow 作业,该作业在 Google Cloud Platform 上的管理资源上执行您的管道。
Cloud Dataflow 运行器和服务适用于大规模、连续作业,并提供
Beam 功能矩阵 记录了 Cloud Dataflow 运行器的支持功能。
Cloud Dataflow 运行器先决条件和设置
要使用 Cloud Dataflow 运行器,您必须完成 Cloud Dataflow 快速入门 中针对您选择的语言的“开始之前”部分中的设置。
- 选择或创建一个 Google Cloud Platform 控制台项目。
- 为您的项目启用计费。
- 启用所需的 Google Cloud API:Cloud Dataflow、Compute Engine、Stackdriver 日志记录、Cloud Storage、Cloud Storage JSON 和 Cloud Resource Manager。如果您在管道代码中使用它们,您可能需要启用其他 API(例如 BigQuery、Cloud Pub/Sub 或 Cloud Datastore)。
- 使用 Google Cloud Platform 进行身份验证。
- 安装 Google Cloud SDK。
- 创建一个 Cloud Storage 存储桶。
指定您的依赖项
使用 Java 时,您必须在 pom.xml
中指定对 Cloud Dataflow 运行器的依赖关系。
此部分不适用于 Beam SDK for Python。
自执行 JAR
此部分不适用于 Beam SDK for Python。
在某些情况下,例如使用 Apache AirFlow 等调度程序启动管道时,您必须拥有一个自包含的应用程序。您可以通过在 pom.xml
的“Project”部分中显式添加以下依赖项来打包自执行 JAR,此外还要添加上一节中显示的现有依赖项。
然后,在 Maven JAR 插件中添加 mainClass
名称。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>YOUR_MAIN_CLASS_NAME</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
运行 mvn package -Pdataflow-runner
后,运行 ls target
,您应该看到以下输出(假设您的 artifactId 为 beam-examples
,版本为 1.0.0)。
要在 Cloud Dataflow 上运行自执行 JAR,请使用以下命令。
Cloud Dataflow 运行器的管道选项
使用 Cloud Dataflow 运行器(Java)执行管道时,请考虑以下常见管道选项。 使用 Cloud Dataflow 运行器(Python)执行管道时,请考虑以下常见管道选项。
字段 | 描述 | 默认值 |
---|---|---|
runner | 要使用的管道运行器。此选项允许您在运行时确定管道运行器。 | 设置为 dataflow 或 DataflowRunner 以在 Cloud Dataflow 服务上运行。 |
project | 您的 Google Cloud 项目的项目 ID。 | 如果未设置,则默认为当前环境中的默认项目。默认项目通过 gcloud 设置。 |
region | 要创建作业的 Google Compute Engine 区域。 | 如果未设置,则默认为当前环境中的默认区域。默认区域通过 gcloud 设置。 |
streaming | 是否启用或禁用流模式;如果启用,则为 true 。如果运行具有无界 PCollection 的管道,则设置为 true 。 | false |
tempLocation temp_location | 可选。 必需。 临时文件的路径。必须是有效的 Google Cloud Storage URL,以 gs:// 开头。 如果设置,tempLocation 将用作 gcpTempLocation 的默认值。 | 没有默认值。 |
gcpTempLocation | 临时文件的 Cloud Storage 存储桶路径。必须是有效的 Cloud Storage URL,以 gs:// 开头。 | 如果未设置,则默认为 tempLocation 的值,前提是 tempLocation 是有效的 Cloud Storage URL。如果 tempLocation 不是有效的 Cloud Storage URL,则必须设置 gcpTempLocation 。 |
stagingLocation staging_location | 可选。用于存放您的二进制文件和任何临时文件的 Cloud Storage 存储桶路径。必须是有效的 Cloud Storage URL,以 gs:// 开头。 | 如果未设置,则默认为 gcpTempLocation 内的暂存目录。 如果未设置,则默认为 temp_location 内的暂存目录。 |
save_main_session | 保存主会话状态,以便可以取消对 __main__ (例如交互式会话)中定义的腌制函数和类的腌制。如果,例如,所有函数/类都在适当的模块(而不是 __main__ )中定义,并且这些模块可以在 worker 中导入,则某些工作流不需要会话状态。 | false |
sdk_location | 覆盖 Beam SDK 下载的默认位置。此值可以是 URL、Cloud Storage 路径或本地路径到 SDK 压缩包。工作流提交将从此位置下载或复制 SDK 压缩包。如果设置为字符串 default ,则使用标准 SDK 位置。如果为空,则不复制任何 SDK。 | default |
有关其他管道配置选项,请参阅 DataflowPipelineOptions PipelineOptions
接口(以及任何子接口)的参考文档。
其他信息和注意事项
监控您的作业
在管道执行期间,您可以使用 Dataflow 监控界面 或 Dataflow 命令行界面 监控作业进度、查看执行详细信息并接收管道结果的更新。
阻塞执行
要阻塞直到您的作业完成,请在 pipeline.run()
返回的 PipelineResult
上调用 waitToFinish
wait_until_finish
。Cloud Dataflow 运行器在等待时会打印作业状态更新和控制台消息。虽然结果与活动作业相连,但请注意,从命令行按下 Ctrl+C 不会取消您的作业。要取消作业,您可以使用 Dataflow 监控界面 或 Dataflow 命令行界面。
流式执行
如果您的管道使用无界数据源或接收器,则必须将 streaming
选项设置为 true
。
使用流式执行时,请牢记以下注意事项。
流式管道不会终止,除非用户显式取消。您可以从 Dataflow 监控界面 或使用 Dataflow 命令行界面(gcloud dataflow jobs cancel 命令)取消您的流式作业。
流式作业默认使用 Google Compute Engine 机器类型
n1-standard-2
或更高。您不能覆盖此设置,因为n1-standard-2
是运行流式作业所需的最低机器类型。流式执行的 定价 与批处理执行不同。
上次更新时间:2024/10/31
您是否找到了所有需要的信息?
它们是否有用且清晰?您想更改什么吗?请告诉我们!