Python 多语言管道快速入门

本页面提供使用 Apache Beam SDK for Python 创建多语言管道的概述。有关此主题的更全面介绍,请参阅 多语言管道.

本快速入门中显示的代码可在 可运行示例集合 中找到。

要构建和运行多语言 Python 管道,您需要一个安装了 Beam SDK 的 Python 环境。如果您没有设置好环境,请先完成 Apache Beam Python SDK 快速入门.

多语言管道 是用一种 Beam SDK 语言构建的管道,它使用来自另一种 Beam SDK 语言的一个或多个转换。这些“其他语言”转换称为跨语言转换。目的是使管道组件更容易在 Beam SDK 之间共享,并为所有 SDK 扩展可用转换池。在以下示例中,多语言管道使用 Beam Python SDK 构建,跨语言转换使用 Beam Java SDK 构建。

创建跨语言转换

这是一个简单的 Java 转换,JavaPrefix,它在输入字符串前面添加前缀

public class JavaPrefix extends PTransform<PCollection<String>, PCollection<String>> {

  final String prefix;

  public JavaPrefix(String prefix) {
    this.prefix = prefix;
  }

  class AddPrefixDoFn extends DoFn<String, String> {

    @ProcessElement
    public void process(@Element String input, OutputReceiver<String> o) {
      o.output(prefix + input);
    }
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        .apply(
            "AddPrefix",
            ParDo.of(new AddPrefixDoFn()));
  }
}

要将其用作跨语言转换,您必须添加一个配置对象和一个构建器。

注意:从 Beam 2.34.0 开始,Python SDK 用户可以使用一些 Java 转换,而无需编写额外的 Java 代码。要了解更多信息,请参阅 创建跨语言 Java 转换.

配置对象是一个简单的 Java 对象 (POJO),它具有转换所需的字段。以下是一个示例,JavaPrefixConfiguration

public class JavaPrefixConfiguration {

  String prefix;

  public void setPrefix(String prefix) {
    this.prefix = prefix;
  }
}

构建器类,在下面实现为 JavaPrefixBuilder,必须实现 ExternalTransformBuilder 并覆盖 buildExternal,它使用配置对象。

public class JavaPrefixBuilder implements
    ExternalTransformBuilder<JavaPrefixConfiguration, PCollection<String>, PCollection<String>> {

    @Override
    public PTransform<PCollection<String>, PCollection<String>> buildExternal(
        JavaPrefixConfiguration configuration) {
      return new JavaPrefix(configuration.prefix);
    }
}

您还需要添加一个注册器类,以便将您的转换注册到扩展服务。

@AutoService(ExternalTransformRegistrar.class)
public class JavaPrefixRegistrar implements ExternalTransformRegistrar {

  final String URN = "beam:transform:my.beam.test:javaprefix:v1";

  @Override
  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
    return ImmutableMap.of(URN,new JavaPrefixBuilder());
  }
}

JavaPrefixRegistrar 中所示,注册器必须实现 ExternalTransformRegistrar,它有一个方法 knownBuilderInstances。这将返回一个映射,该映射将唯一的 URN 映射到构建器实例。您可以使用 AutoService 注释将此类注册到扩展服务。

选择扩展服务

在为多语言管道构建作业时,Beam 使用 扩展服务 来扩展 复合转换。您必须为每个远程 SDK 至少有一个扩展服务。

在大多数情况下,您可以使用默认的 Java ExpansionService。该服务接受一个参数,该参数指定扩展服务的端口。然后,Python 管道将提供该地址。

在运行多语言管道之前,您需要构建 Java 跨语言转换并启动扩展服务。启动扩展服务时,您需要将依赖项添加到类路径中。您可以使用多个 JAR,但通常创建单个阴影 JAR 更容易。Python 和 Java 依赖项都将由 Python SDK 为运行器进行分阶段处理。

运行扩展服务的步骤将根据您的构建工具而有所不同。假设您已构建了一个名为 java-prefix-bundled-0.1.jar 的 JAR,您可以使用以下命令启动该服务,其中 12345 是扩展服务运行的端口

java -jar java-prefix-bundled-0.1.jar 12345

有关运行示例扩展服务的说明,请参阅 此自述文件.

创建 Python 管道

您的 Python 管道现在可以使用 ExternalTransform API 来配置您的跨语言转换。以下是 addprefix.py 中的示例

with beam.Pipeline(options=pipeline_options) as p:
  input = p | 'Read' >> ReadFromText(input_path).with_output_types(str)

  java_output = (
      input
      | 'JavaPrefix' >> beam.ExternalTransform(
            'beam:transform:my.beam.test:javaprefix:v1',
            ImplicitSchemaPayloadBuilder({'prefix': 'java:'}),
            "localhost:12345"))

  def python_prefix(record):
    return 'python:%s' % record

  output = java_output | 'PythonPrefix' >> beam.Map(python_prefix)
  output | 'Write' >> WriteToText(output_path)

ExternalTransform 接受三个参数

URN 只是转换的唯一 Beam 标识符,扩展服务已在前面讨论过。PayloadBuilder 是一个新概念,将在下面讨论。

注意:为了确保您的 URN 不会与其他转换的 URN 发生冲突,请遵循 为跨语言转换选择 URN 中描述的 URN 约定。

提供有效负载构建器

上面的 Python 管道示例提供了 ImplicitSchemaPayloadBuilder 作为 ExternalTransform 的第二个参数。ImplicitSchemaPayloadBuilder 将构建一个有效负载,该有效负载根据提供的 value 生成一个模式。在本例中,提供的 value 包含在以下键值对中:{'prefix': 'java:'}JavaPrefix 转换需要一个 prefix 参数,有效负载构建器传入字符串 java:,该字符串将被添加到每个输入元素的前面。

有效负载构建器有助于在扩展请求中为转换构建有效负载。您可以使用 NamedTupleBasedPayloadBuilder(它根据命名元组模式构建有效负载)或 AnnotationBasedPayloadBuilder(它根据类型注释构建模式)来代替 ImplicitSchemaPayloadBuilder。有关可用有效负载构建器的完整列表,请参阅 transforms.external API 参考.

使用标准元素类型

在多语言边界,您必须使用所有 Beam SDK 都能理解的元素类型。这些类型由 Beam 标准编码器 表示

对于任意结构化类型(例如,任意 Java 对象),请使用 ROW (PCollection<Row>)。您可能需要开发一个新的 Java 复合转换,它生成一个 PCollection<Row>。只要这些编码器不是由其他 SDK 使用的 PCollections 使用,您就可以在复合跨语言转换中使用 SDK 特定的编码器。

运行管道

运行 Python 管道的确切命令将根据您的环境而有所不同。假设您的管道是在一个名为 addprefix.py 的文件中编码的,则步骤应该类似于以下步骤。有关更多信息,请参阅 addprefix.py 中的注释.

使用 Direct Runner 运行

在以下命令中,input1 是一个包含文本行的文件

python addprefix.py --runner DirectRunner --environment_type=DOCKER --input input1 --output output

使用 Dataflow Runner 运行

以下脚本在 Dataflow 上运行多语言管道,使用来自 Cloud Storage 存储桶的示例文本。您需要根据您的环境调整脚本。

#!/bin/bash
export GCP_PROJECT=<project>
export GCS_BUCKET=<bucket>
export TEMP_LOCATION=gs://$GCS_BUCKET/tmp
export GCP_REGION=<region>
export JOB_NAME="javaprefix-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="1"

# other commands, e.g. changing into the appropriate directory

gsutil rm gs://$GCS_BUCKET/javaprefix/*

python addprefix.py \
    --runner DataflowRunner \
    --temp_location $TEMP_LOCATION \
    --project $GCP_PROJECT \
    --region $GCP_REGION \
    --job_name $JOB_NAME \
    --num_workers $NUM_WORKERS \
    --input "gs://dataflow-samples/shakespeare/kinglear.txt" \
    --output "gs://$GCS_BUCKET/javaprefix/output"