Apache Beam Python SDK 快速入门

本快速入门将向您展示如何运行一个 示例管道,该管道使用 Apache Beam Python SDK 编写,并使用 Direct Runner 运行。Direct Runner 在您的机器上本地执行管道。

如果您有兴趣为 Apache Beam Python 代码库做出贡献,请参阅 贡献指南

本页内容

设置您的开发环境

Apache Beam 旨在支持尚未达到生命周期结束的已发布 Python 版本,但可能需要几个版本才能让 Apache Beam 完全支持最新发布的 Python 次要版本。

所需的最低 Python 版本列在 apache-beam 项目页面的 Meta 部分下的 Requires 中。支持的所有 Python 版本的列表列在页面底部的 Classifiers 部分下的 Programming Language 中。

通过运行以下命令检查您的 Python 版本

python3 --version

如果您没有 Python 解释器,您可以从 Python 下载 页面下载并安装它。

如果您需要除了已有的版本之外再安装一个不同版本的 Python,您可以在我们的 开发者 Wiki 中找到一些建议。

克隆 GitHub 仓库

克隆或下载 apache/beam-starter-python GitHub 仓库,并进入 beam-starter-python 目录。

git clone https://github.com/apache/beam-starter-python.git
cd beam-starter-python

创建并激活虚拟环境

虚拟环境是一个包含其自身 Python 发行版的目录树。我们建议使用虚拟环境,以便将项目的所有依赖项安装在隔离且独立的环境中。要设置虚拟环境,请运行以下命令

# Create a new Python virtual environment.
python3 -m venv env

# Activate the virtual environment.
source env/bin/activate

如果这些命令在您的平台上不起作用,请参阅 venv 文档。

安装项目依赖项

运行以下命令从 requirements.txt 文件中安装项目的依赖项

pip install -e .

运行快速入门

运行以下命令

python main.py --input-text="Greetings"

输出类似于以下内容

Hello
World!
Greetings

这些行可能会以不同的顺序出现。

运行以下命令停用虚拟环境

deactivate

探索代码

本快速入门的核心代码文件是 app.py (GitHub)。代码执行以下步骤

  1. 创建 Beam 管道。
  2. 创建初始 PCollection
  3. 将变换应用于 PCollection
  4. 使用 Direct Runner 运行管道。

创建管道

代码首先创建一个 Pipeline 对象。Pipeline 对象构建要执行的变换图。

with beam.Pipeline(options=beam_options) as pipeline:

此处显示的 beam_option 变量是一个 PipelineOptions 对象,用于设置管道的选项。有关更多信息,请参阅 配置管道选项

创建初始 PCollection

PCollection 抽象表示一个可能分布式、多元素的数据集。Beam 管道需要一个数据源来填充初始 PCollection。数据源可以是有界的(具有已知固定大小)或无界的(具有无限大小)。

本示例使用 Create 方法从字符串的内存数组创建 PCollection。生成的 PCollection 包含字符串“Hello”、“World!”以及用户提供的输入字符串。

pipeline
| "Create elements" >> beam.Create(["Hello", "World!", input_text])

注意:管道运算符 | 用于 链接 变换。

将变换应用于 PCollection

变换可以更改、过滤、分组、分析或以其他方式处理 PCollection 中的元素。本示例使用 Map 变换,该变换将集合的元素映射到新集合中

| "Print elements" >> beam.Map(print)

运行管道

要运行管道,您可以调用 Pipeline.run 方法

pipeline.run.wait_until_finish()

但是,通过将 Pipeline 对象包含在 with 语句中,run 方法会自动调用。

with beam.Pipeline(options=beam_options) as pipeline:
    # ...
    # run() is called automatically

Beam 运行器 在特定平台上运行 Beam 管道。如果您没有指定运行器,Direct Runner 是默认运行器。Direct Runner 在您的机器上本地运行管道。它旨在用于测试和开发,而不是为效率而优化。有关更多信息,请参阅 使用 Direct Runner

对于生产工作负载,您通常使用分布式运行器,该运行器在大型数据处理系统(例如 Apache Flink、Apache Spark 或 Google Cloud Dataflow)上运行管道。这些系统支持大规模并行处理。

下一步

如果您遇到任何问题,请随时 联系我们