Java 多语言管道快速入门

本页提供了使用 Apache Beam SDK for Java 创建多语言管道的概述。有关此主题的更完整讨论,请参见 多语言管道

多语言管道是指使用一种 Beam SDK 语言构建的管道,但使用了一种或多种来自另一种 Beam SDK 语言的转换。这些来自另一种 SDK 的转换称为跨语言转换。多语言支持使管道组件更易于在 Beam SDK 之间共享,并为所有 SDK 扩展了可用转换的范围。

在以下示例中,多语言管道使用 Beam Java SDK 构建,跨语言转换使用 Beam Python SDK 构建。

先决条件

本快速入门基于一个 Java 示例管道,PythonDataframeWordCount,它计算莎士比亚文本中的单词数。如果您想运行该管道,可以克隆或下载 Beam 存储库,并从源代码构建该示例。

要构建和运行该示例,您需要一个安装了 Beam Java SDK 版本 2.41.0 或更高版本的 Java 环境,以及一个 Python 环境。如果您还没有设置这些环境,请先完成 Apache Beam Java SDK 快速入门Apache Beam Python SDK 快速入门

对于使用可移植 DirectRunner 运行,您需要在本地安装 Docker,并且 Docker 守护进程应处于运行状态。这对于 Dataflow 来说是不需要的。

对于在 Dataflow 上运行,您需要一个启用了计费功能的 Google Cloud 项目和一个 Google Cloud Storage 存储桶

此示例依赖于 Python pandas 包 1.4.0 或更高版本,该版本在早于 3.8 的 Python 版本中不可用。因此,请确保系统中安装的默认 Python 版本为 3.8 或更高版本。

指定跨语言转换

Java 示例管道使用 Python DataframeTransform 作为跨语言转换。该转换是 Beam Dataframe API 的一部分,用于处理类似 pandas 的 DataFrame 对象。

要应用跨语言转换,您的管道必须指定它。Python 转换由其完全限定名称标识。例如,DataframeTransform 可以在 apache_beam.dataframe.transforms 包中找到,因此其完全限定名称为 apache_beam.dataframe.transforms.DataframeTransform。示例管道,PythonDataframeWordCount,将此完全限定名称传递给 PythonExternalTransform

注意:示例管道旨在演示使用任意 Python 跨语言转换开发 Java 多语言管道。对于 Java 中 Dataframe API 的生产用例,您应该使用更高层的 DataframeTransform

以下是示例中的完整管道定义

static void runWordCount(WordCountOptions options) {
  Pipeline p = Pipeline.create(options);

  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(ParDo.of(new ExtractWordsFn()))
      .setRowSchema(ExtractWordsFn.SCHEMA)
      .apply(
          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
                  "apache_beam.dataframe.transforms.DataframeTransform",
                  options.getExpansionService())
              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
              .withKwarg("include_indexes", true))
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.write().to(options.getOutput()));

  p.run().waitUntilFinish();
}

PythonExternalTransform 是用于调用外部 Python 转换的包装器。from 方法接受两个字符串:1) 完全限定的转换名称;2) 扩展服务的可选地址和端口号。该方法返回一个 Python 跨语言转换的存根,可以直接在 Java 管道中使用。withKwarg 指定用于实例化 Python 跨语言转换的关键字参数。在本例中,withKwarg 被调用两次,以指定 func 参数和 include_indexes 参数,并将这些参数传递给 DataframeTransformPythonExternalTransform 还提供了其他方法来指定 Python 跨语言转换的 args 和 kwargs。

要了解此管道的工作原理,最好仔细查看第一个 withKwarg 调用

.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))

PythonCallableSource.of 的参数是 Python lambda 函数的字符串表示形式。DataframeTransform 接受一个 Python 可调用对象作为参数,并将其应用于 PCollection,就好像它是一个 Dataframe 一样。withKwarg 方法允许您在 Java 管道中指定 Python 可调用对象。要详细了解如何将函数传递给 DataframeTransform,请参见 将 DataFrames 嵌入管道

运行 Java 管道

如果您想自定义环境或使用默认 Beam SDK 中不可用的转换,您可能需要运行自己的扩展服务。在这种情况下,请在运行管道之前启动扩展服务

在运行管道之前,请确保执行您选择的 Beam 运行器所需的运行器特定设置

使用 Maven Archetype 运行 Dataflow 运行器(Beam 2.43.0 及更高版本)

export BEAM_VERSION=<Beam version>

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.beam \
    -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
    -DarchetypeVersion=$BEAM_VERSION \
    -DgroupId=org.example \
    -DartifactId=multi-language-beam \
    -Dversion="0.1" \
    -Dpackage=org.apache.beam.examples \
    -DinteractiveMode=false
export GCP_PROJECT=<GCP project>
export GCP_BUCKET=<GCP bucket>
export GCP_REGION=<GCP region>

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.multilanguage.PythonDataframeWordCount \
    -Dexec.args="--runner=DataflowRunner --project=$GCP_PROJECT \
                 --region=$GCP_REGION \
                 --gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
                 --output=gs://$GCP_BUCKET/multi-language-beam/output" \
    -Pdataflow-runner

在 HEAD 处运行 Dataflow 运行器

以下脚本使用 Cloud Storage 存储桶中的示例文本在 Dataflow 上运行示例多语言管道。您需要根据您的环境调整该脚本。

export GCP_PROJECT=<project>
export OUTPUT_BUCKET=<bucket>
export GCP_REGION=<region>
export TEMP_LOCATION=gs://$OUTPUT_BUCKET/tmp

./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
--runner=DataflowRunner \
--project=$GCP_PROJECT \
--output=gs://${OUTPUT_BUCKET}/count \
--region=${GCP_REGION}"

该管道将结果文件输出到 gs://$OUTPUT_BUCKET/count-00000-of-00001

使用 DirectRunner 运行

注意:多语言管道需要使用可移植运行器。可移植 DirectRunner 仍处于试验阶段,不支持所有 Beam 功能。

  1. 创建一个 Python 虚拟环境,其中安装了最新版本的 Beam Python SDK。请参见此处获取说明。
  2. 运行可移植 DirectRunner 的作业服务器(在 Python 中实现)。
export JOB_SERVER_PORT=<port>

python -m apache_beam.runners.portability.local_job_service_main -p $JOB_SERVER_PORT
  1. 在另一个 Shell 中,转到 Beam HEAD Git 克隆

  2. 为本地管道执行构建 Beam Java SDK 容器(本指南要求将您的 JAVA_HOME 设置为 Java 11)。

./gradlew :sdks:java:container:java11:docker -Pjava11Home=$JAVA_HOME
  1. 运行管道。
export JOB_SERVER_PORT=<port>  # Same port as before
export OUTPUT_FILE=<local relative path>

./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
--runner=PortableRunner \
--jobEndpoint=localhost:$JOB_SERVER_PORT \
--output=$OUTPUT_FILE"

注意此输出将写入 Python Docker 容器的本地文件系统。要通过写入 GCS 来验证输出,您需要为 output 选项指定一个公开可访问的 GCS 路径,因为可移植 DirectRunner 目前无法正确转发本地凭据以访问 GCS。

高级:启动扩展服务

构建多语言管道的作业时,Beam 使用扩展服务来扩展复合转换。您必须至少为每个远程 SDK 拥有一个扩展服务。

在一般情况下,如果您在系统上安装了受支持的 Python 版本,您可以让 PythonExternalTransform 处理创建和启动扩展服务的详细信息。但是,如果您想自定义环境或使用默认 Beam SDK 中不可用的转换,您可能需要运行自己的扩展服务。

例如,要为 Python 转换启动标准扩展服务,ExpansionServiceServicer,请按照以下步骤操作

  1. 按照这些说明激活一个新的虚拟环境。

  2. 使用 gcpdataframe 包安装 Apache Beam。

pip install 'apache-beam[gcp,dataframe]'
  1. 运行以下命令
python -m apache_beam.runners.portability.expansion_service_main -p <PORT> --fully_qualified_name_glob "*"

该命令运行 expansion_service_main.py,它启动标准扩展服务。当您使用 Gradle 运行 Java 管道时,可以使用 expansionService 选项指定扩展服务。例如:--expansionService=localhost:<PORT>

后续步骤

要详细了解 Beam 对跨语言管道的支持,请参见 多语言管道。要详细了解 Beam DataFrame API,请参见 Beam DataFrames 概述