使用 Apache Spark 运行器

Apache Spark 运行器可用于使用 Apache Spark 执行 Beam 管道。Spark 运行器可以像本地 Spark 应用程序一样执行 Spark 管道;为本地模式部署一个自包含应用程序,在 Spark 的独立 RM 上运行,或者使用 YARN 或 Mesos。

Spark 运行器在 Apache Spark 之上执行 Beam 管道,提供

Beam 能力矩阵 文档记录了当前支持的 Spark 运行器的功能。

三种 Spark 运行器

Spark 运行器有三种类型

  1. 一个旧版运行器,它仅支持 Java(和其他基于 JVM 的语言),并且基于 Spark RDD/DStream
  2. 一个结构化流式 Spark 运行器,它仅支持 Java(和其他基于 JVM 的语言),并且基于 Spark 数据集和 Apache Spark 结构化流式 框架。

注意:它目前仍处于实验阶段,它对 Beam 模型的覆盖范围是部分的。就目前而言,它只支持批处理模式。

  1. 一个可移植运行器,它支持 Java、Python 和 Go

本指南分为两部分,分别记录 Spark 运行器的不可移植和可移植功能。请使用下面的切换器选择合适的运行器

选择哪个运行器:可移植或不可移植运行器?

Beam 及其运行器最初只支持基于 JVM 的语言(例如 Java/Scala/Kotlin)。Python 和 Go SDK 是后来添加的。运行器的架构必须进行重大更改才能支持执行用其他语言编写的管道。

如果您的应用程序只使用 Java,那么您目前应该选择基于 Java 的运行器之一。如果您想使用 Beam 在 Spark 上运行 Python 或 Go 管道,则需要使用可移植运行器。有关可移植性的更多信息,请访问 可移植性页面

Spark 运行器先决条件和设置

Spark 运行器目前支持 Spark 的 3.2.x 分支。

注意:从 Beam 2.46.0 开始,对 Spark 2.4.x 的支持已取消。

您可以通过在您的 pom.xml 中添加以下内容来添加对最新版本的 Spark 运行器的依赖项

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark-3</artifactId>
  <version>2.60.0</version>
</dependency>

使用您的应用程序部署 Spark

在某些情况下,例如在本地模式/独立模式下运行,您的(自包含)应用程序将需要通过在您的 pom.xml 中显式添加以下依赖项来打包 Spark

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

并使用 maven shade 插件对应用程序 jar 进行阴影处理

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
        <transformers>
          <transformer
            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

运行 mvn package 后,运行 ls target,您应该看到(假设您的 artifactId 为 beam-examples,版本为 1.0.0

beam-examples-1.0.0-shaded.jar

要针对独立集群运行,只需运行


对于基于 RDD/DStream 的运行器

spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkRunner


对于基于结构化流式的运行器

spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkStructuredStreamingRunner

您需要在执行环境中安装 Docker。要使用 Python 开发 Apache Beam,您必须安装 Apache Beam Python SDK:pip install apache_beam。有关如何创建 Python 管道的详细信息,请参阅 Python 文档

pip install apache_beam

从 Beam 2.20.0 开始,预构建的 Spark 作业服务 Docker 镜像可在 Docker Hub 获取。

对于旧版本的 Beam,您将需要 Apache Beam 源代码的副本。您可以在 下载页面 上下载它。

  1. 启动 JobService 端点
    • 使用 Docker(首选):docker run --net=host apache/beam_spark_job_server:latest
    • 或从 Beam 源代码:./gradlew :runners:spark:3:job-server:runShadow

JobService 是您提交 Beam 管道的中央实例。JobService 将为管道创建一个 Spark 作业并执行该作业。要在 Spark 集群上执行作业,Beam JobService 需要提供 Spark 主机地址。

  1. 通过使用 PortableRunnerjob_endpoint 设置为 localhost:8099(这是 JobService 的默认地址)和 environment_type 设置为 LOOPBACK,将 Python 管道提交到上述端点。例如

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options) as p:
    ...

在预先部署的 Spark 集群上运行

在已经部署了 Spark 的集群上部署您的 Beam 管道(Spark 类在容器类路径中可用)不需要任何额外的依赖项。有关不同部署模式的更多详细信息,请参阅:独立YARNMesos

  1. 启动一个 Spark 集群,该集群默认在端口 7077 上公开主机。

  1. 启动将连接到 Spark 主机的 JobService
    • 使用 Docker(首选):docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://127.0.0.1:7077
    • 或从 Beam 源代码:./gradlew :runners:spark:3:job-server:runShadow -PsparkMasterUrl=spark://127.0.0.1:7077

  1. 如上所述提交管道。但请注意,environment_type=LOOPBACK 仅适用于本地测试。有关详细信息,请参阅 此处

(请注意,根据您的集群设置,您可能需要更改 environment_type 选项。有关详细信息,请参阅 此处。)

在 Dataproc 集群(YARN 支持)上运行

要运行用 Python、Go 和其他支持语言编写的 Beam 作业,您可以使用 SparkRunnerPortableRunner,如 Beam 的 Spark 运行器 页面所述(另请参阅 可移植性框架路线图)。

以下示例从 Dataproc 集群的主节点运行一个可移植的 Beam 作业,该节点使用 YARN 支持。

注意:此示例使用 Dataproc 2.0、Spark 3.1.2 和 Beam 2.37.0 成功执行。

  1. 创建启用了 Docker 组件的 Dataproc 集群。
gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=DOCKER \
    --image-version=DATAPROC_IMAGE_VERSION \
    --region=REGION \
    --enable-component-gateway \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --properties spark:spark.master.rest.enabled=true
  1. 创建一个 Cloud Storage 存储桶。
gsutil mb BUCKET_NAME
  1. 在您的本地环境中安装作业所需的 Python 库。
python -m pip install apache-beam[gcp]==BEAM_VERSION
  1. 将单词计数示例管道与运行管道所需的所有依赖项、工件等捆绑到一个 jar 中,该 jar 可以在以后执行。
python -m apache_beam.examples.wordcount \
    --runner=SparkRunner \
    --output_executable_path=OUTPUT_JAR_PATH \
    --output=gs://BUCKET_NAME/python-wordcount-out \
    --spark_version=3
  1. 将 spark 作业提交到 Dataproc 集群的主节点。
gcloud dataproc jobs submit spark \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --class=org.apache.beam.runners.spark.SparkPipelineRunner \
        --jars=OUTPUT_JAR_PATH
  1. 检查结果是否已写入您的存储桶。
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID

Spark 运行器的管道选项

使用 Spark 运行器执行管道时,您应该考虑以下管道选项。


对于基于 RDD/DStream 的运行器

字段描述默认值
runner要使用的管道运行器。此选项允许您在运行时确定管道运行器。设置为 SparkRunner 以使用 Spark 运行。
sparkMasterSpark 主机的 URL。这等效于设置 SparkConf#setMaster(String),可以是 local[x](使用 x 个核心在本地运行)、spark://host:port(连接到 Spark 独立集群)、mesos://host:port(连接到 Mesos 集群)或 yarn(连接到 yarn 集群)。local[4]
storageLevel在批处理管道中缓存 RDD 时使用的 StorageLevel。Spark 运行器会自动缓存重复评估的 RDD。这是一个仅限批处理的属性,因为 Beam 中的流式管道是有状态的,这要求 Spark DStream 的 StorageLevelMEMORY_ONLYMEMORY_ONLY
batchIntervalMillisStreamingContextbatchDuration - 设置 Spark 的批处理间隔。1000
enableSparkMetricSinks启用向 Spark 的指标接收器报告指标。true
cacheDisabled禁用对整个管道中重复使用的 PCollection 进行缓存。当重新计算 RDD 比保存更快时,它很有用。false


对于基于结构化流式的运行器

字段描述默认值
runner要使用的管道运行器。此选项允许您在运行时确定管道运行器。设置为 SparkStructuredStreamingRunner 以使用 Spark 结构化流式运行。
sparkMasterSpark 主机的 URL。这等效于设置 SparkConf#setMaster(String),可以是 local[x](使用 x 个核心在本地运行)、spark://host:port(连接到 Spark 独立集群)、mesos://host:port(连接到 Mesos 集群)或 yarn(连接到 yarn 集群)。local[4]
testMode启用测试模式,该模式提供有用的调试信息:催化剂执行计划和 Beam DAG 打印false
enableSparkMetricSinks启用向 Spark 的指标接收器报告指标。true
checkpointDir用于流式弹性的检查点目录,批处理中忽略。为了持久化,必须使用可靠的文件系统,例如 HDFS/S3/GS。/tmp 中的本地目录
filesToStage要发送到所有 worker 并放到类路径上的 Jar 文件。来自类路径的所有文件
EnableSparkMetricSinks启用/禁用将聚合器值发送到 Spark 的指标接收器true
字段描述
--runner要使用的管道运行器。此选项允许您在运行时确定管道运行器。设置为 PortableRunner 以使用 Spark 运行。
--job_endpoint要使用的作业服务端点。应采用主机名:端口的形式,例如 localhost:3000设置为与您的作业服务端点匹配(默认情况下为 localhost:8099)

其他说明

使用 spark-submit

将 Spark 应用程序提交到集群时,通常(建议)使用 Spark 安装提供的 spark-submit 脚本。上面描述的 PipelineOptions 并非要替换 spark-submit,而是要对其进行补充。传递上述任何选项都可以作为 application-arguments 之一,设置 –master 优先。有关如何使用 spark-submit 的更多信息,请查看 Spark 文档

监控您的作业

您可以使用 Spark Web 接口 监控正在运行的 Spark 作业。默认情况下,此接口在驱动程序节点的端口 4040 上可用。如果您在本地机器上运行 Spark,则为 https://127.0.0.1:4040。Spark 还提供了一个历史服务器,用于 事后查看

指标也可通过 REST API 获取。Spark 提供了一个 指标系统,允许将 Spark 指标报告给各种接收器。Spark 运行器使用相同的指标系统报告用户定义的 Beam 聚合器,目前支持 GraphiteSinkCSVSink。为 Spark 支持的其他接收器提供支持非常容易,且非常直接。

Spark 指标在便携式运行器上尚不支持。

流式执行


对于基于 RDD/DStream 的运行器
如果您的管道使用 UnboundedSource,Spark 运行器将自动设置流式模式。强制流式模式主要用于测试,不建议使用。

对于基于结构化流式的运行器
Spark 结构化流式运行器中尚未实现流式模式。

Spark 便携式运行器上尚不支持流式传输。

使用提供的 SparkContext 和 StreamingListeners


对于基于 RDD/DStream 的运行器
如果您希望使用提供的 SparkContext 执行 Spark 作业(例如使用 spark-jobserver)或使用 StreamingListeners,则无法使用 SparkPipelineOptions(上下文或侦听器无法作为命令行参数传递)。相反,您应该使用 SparkContextOptions,它只能以编程方式使用,不是常见的 PipelineOptions 实现。

对于基于结构化流式的运行器
Spark 结构化流式运行器不支持提供的 SparkSession 和 StreamingListeners。

Spark 便携式运行器不支持提供的 SparkContext 和 StreamingListeners。

Kubernetes

提交 Beam 作业,无需作业服务器

要直接在 Spark Kubernetes 集群上提交 Beam 作业,而无需启动额外的作业服务器,您可以执行以下操作

spark-submit --master MASTER_URL \
  --conf spark.kubernetes.driver.podTemplateFile=driver_pod_template.yaml \
  --conf spark.kubernetes.executor.podTemplateFile=executor_pod_template.yaml \
  --class org.apache.beam.runners.spark.SparkPipelineRunner \
  --conf spark.kubernetes.container.image=apache/spark:v3.3.2 \
  ./wc_job.jar

类似于在 Dataproc 上运行 Beam 作业,您可以像下面这样捆绑作业 jar。该示例使用 PROCESS 类型的 SDK 协调器 通过进程执行作业。

python -m beam_example_wc \
    --runner=SparkRunner \
    --output_executable_path=./wc_job.jar \
    --environment_type=PROCESS \
    --environment_config='{\"command\": \"/opt/apache/beam/boot\"}' \
    --spark_version=3

以下是 Kubernetes 执行器 pod 模板的示例,initContainer 是必需的,用于下载 Beam SDK 协调器以运行 Beam 管道。

spec:
  containers:
    - name: spark-kubernetes-executor
      volumeMounts:
      - name: beam-data
        mountPath: /opt/apache/beam/
  initContainers:
  - name: init-beam
    image: apache/beam_python3.7_sdk
    command:
    - cp
    - /opt/apache/beam/boot
    - /init-container/data/boot
    volumeMounts:
    - name: beam-data
      mountPath: /init-container/data
  volumes:
  - name: beam-data
    emptyDir: {}

提交 Beam 作业,使用作业服务器

一个使用作业服务器将 Spark 配置为运行 Apache Beam 作业的 示例