管理 Python 管道依赖项
依赖项管理是关于指定管道所需的依赖项,以及控制生产中使用的依赖项。
注意: 用于管道执行的远程工作程序通常在基于 Debian 的容器映像中具有标准的 Python 发行版安装。如果您的代码仅依赖于标准 Python 包,则可能不需要执行此页面上的任何操作。
PyPI 依赖项
如果您的管道使用来自 Python 包索引 的公共包,则必须使这些包在工作程序上远程可用。
对于仅包含单个 Python 文件或笔记本的管道,提供依赖项的最直接方法是提供一个 requirements.txt
文件。对于更复杂的情况,请定义 包中的管道 并在 自定义容器 中安装依赖项。
要提供 requirements.txt 文件
找出机器上安装了哪些包。运行以下命令
pip freeze > requirements.txt
此命令创建一个
requirements.txt
文件,其中列出机器上安装的所有包,无论它们是从哪里安装的。编辑
requirements.txt
文件并删除与代码无关的所有包。使用以下命令行选项运行管道
--requirements_file requirements.txt
运行器将使用
requirements.txt
文件将其他依赖项安装到远程工作程序上。
注意:作为
pip freeze
的替代方案,请使用 pip-tools 等库从requirements.in
文件中编译管道所需的所有依赖项。在requirements.in
文件中,只提到了顶级依赖项。
当您提供 --requirements_file
管道选项时,在管道提交期间,Beam 会将指定的包本地下载到要求缓存目录中,然后将要求缓存目录阶段到运行器。在运行时,当可用时,Beam 会从要求缓存中安装包。这种机制可以将依赖包在提交时阶段到运行器。在运行时,运行器工作程序可能能够从缓存中安装包,而无需连接到 PyPI。要禁用阶段要求,请使用 --requirements_cache=skip
管道选项。有关更多信息,请参阅 这些管道选项的帮助说明.
自定义容器
您可以通过包含管道所需所有依赖项的 容器 映像。 按照说明显示如何使用自定义容器映像运行管道.
如果您使用自定义容器映像,建议您在构建时直接从
--requirements_file
安装依赖项到映像中。在这种情况下,您不需要在运行时传递--requirements_file
选项,这将减少管道启动时间。# Add these lines with the path to the requirements.txt to the Dockerfile COPY <path to requirements.txt> /tmp/requirements.txt RUN python -m pip install -r /tmp/requirements.txt
本地 Python 包或非公开 Python 依赖项
如果您的管道使用公开不可用的包(例如,从 GitHub 存储库下载的包),请通过执行以下步骤使这些包在远程可用
确定机器上安装了哪些包并且不是公开的。运行以下命令
pip freeze
此命令列出机器上安装的所有包,无论它们是从哪里安装的。
多个文件依赖项
通常,您的管道代码跨越多个文件。要在远程运行项目,必须将这些文件作为 Python 包进行分组,并在运行管道时指定该包。当远程工作程序启动时,它们将安装您的包。要将文件分组为 Python 包并使其在远程可用,请执行以下步骤
为您的项目创建一个 setup.py 文件。以下是一个非常基本的
setup.py
文件。import setuptools setuptools.setup( name='PACKAGE-NAME', version='PACKAGE-VERSION', install_requires=[ # List Python packages your pipeline depends on. ], packages=setuptools.find_packages(), )
构建您的项目,以便根目录包含
setup.py
文件、主工作流文件以及包含其余文件的目录,例如root_dir/ setup.py main.py my_package/ my_pipeline_launcher.py my_custom_dofns_and_transforms.py other_utils_and_helpers.py
有关遵循此项目结构的示例,请参阅 Juliaset。
在提交环境中安装您的包,例如使用以下命令
pip install -e .
使用以下命令行选项运行管道
--setup_file /path/to/setup.py
注意: 如果包的依赖项在 setup.py
文件的 install_requires
字段中定义(参见步骤 1),则不需要提供 --requirements_file
选项。但是,与 --requirements_file
选项不同,当您使用 --setup_file
选项时,Beam 不会将依赖包阶段到运行器。只有管道包被阶段。如果它们没有在运行时环境中提供,则包依赖项将在运行时从 PyPI 安装。
非 Python 依赖项或具有非 Python 依赖项的 PyPI 依赖项
如果您的管道使用非 Python 包,例如需要使用 apt install
命令安装的包,或者使用在包安装期间依赖于非 Python 依赖项的 PyPI 包,建议使用 自定义容器 安装它们。否则,您必须执行以下步骤。
将非 Python 依赖项所需的安装命令(例如
apt install
命令)添加到setup.py
文件中的CUSTOM_COMMANDS
列表中。有关示例,请参阅 Juliaset setup.py 文件。注意: 您必须验证这些命令在远程工作程序上运行。例如,如果您使用
apt
,则远程工作程序需要apt
支持。使用以下命令行选项运行管道
--setup_file /path/to/setup.py
注意: 由于自定义命令在安装工作流的依赖项(通过 pip
)之后执行,因此您应该从管道的 requirements.txt
文件以及 setup.py
文件的 setuptools.setup()
调用中的 install_requires
参数中省略 PyPI 包依赖项。
预构建 SDK 容器映像
在 Beam 运行器在 Docker 容器中启动 SDK 工作程序的管道执行模式中,其他管道依赖项(通过 --requirements_file
和其他运行时选项指定)在运行时安装到容器中。这会增加工作程序启动时间。但是,可以预先构建 SDK 容器,并在使用 --prebuild_sdk_container_engine
启动工作程序之前执行一次依赖项安装。有关如何在 Google Cloud Dataflow 中使用预构建的说明,请参阅 使用自定义容器预构建 python SDK 自定义容器映像.
注意:此功能仅适用于 Dataflow Runner v2
。
Pickling 和管理主会话
当 Python SDK 将管道提交到远程运行器以执行时,管道内容(如转换用户代码)使用执行序列化的库(也称为 pickler)序列化(或 pickling)为字节码。Beam 使用的默认 pickler 库是 dill
。要使用 cloudpickle
pickler,请提供 --pickle_library=cloudpickle
管道选项。cloudpickle
支持目前是 实验性的.
默认情况下,在 Beam 作业的序列化过程中,主管道模块中定义的全局导入、函数和变量不会被保存。因此,在任何远程运行器上运行 DoFn
时,可能会遇到意外的 NameError
。要解决此问题,请通过设置 --save_main_session
管道选项将主会话内容与管道一起提供。这将把全局命名空间的 pickled 状态加载到 Dataflow 工作程序上(如果使用 DataflowRunner
)。例如,请参阅 处理 NameErrors 以在 DataflowRunner
上设置主会话。
仅当在任何远程运行器上使用dill
序列化器时,才需要在 Python SDK 中管理主会话。因此,此问题不会在DirectRunner
中发生。
由于管道序列化发生在作业提交时,而反序列化发生在运行时,因此在作业提交和运行时使用相同版本的序列化库至关重要。为了确保这一点,Beam 通常会为序列化库设置一个非常窄的受支持版本范围。如果由于某种原因,用户无法使用 Beam 所需的dill
或cloudpickle
版本,并选择安装自定义版本,他们还必须确保在运行时使用相同的自定义版本(例如,在他们的自定义容器中,或通过指定管道依赖项要求)。
控制管道使用的依赖项
管道环境
要在远程运行器上运行 Python 管道,Apache Beam 会将管道转换为 独立于运行器的表示形式 并将其提交以进行执行。转换发生在启动环境中。您可以从安装了 Beam SDK 的 Python 虚拟环境中启动管道,或者使用 Dataflow Flex 模板、笔记本环境、Apache Airflow 等工具启动管道。
运行时环境是运行器在管道执行期间使用的 Python 环境。该环境是管道代码在执行数据处理时运行的所在位置。运行时环境包括 Apache Beam 和管道运行时依赖项。
创建可重复的环境
您可以使用多种工具构建可重现的 Python 环境
使用 需求文件。在安装依赖项后,使用
pip freeze > requirements.txt
生成需求文件。要重新创建环境,请使用pip install -r requirements.txt
从 requirements.txt 文件中安装依赖项。使用 约束文件。您可以使用约束列表来限制包的安装,只允许指定版本。
使用锁定文件。使用依赖项管理工具,例如 PipEnv、Poetry 和 pip-tools 来指定顶级依赖项,以生成所有具有固定版本的传递依赖项的锁定文件,并从这些锁定文件创建虚拟环境。
使用 Docker 容器镜像。您可以将启动和运行时环境打包在 Docker 容器镜像中。如果镜像包含所有必要的依赖项,则只有在重建容器镜像时环境才会更改。
使用版本控制来管理定义环境的配置文件。
使管道运行时环境可重复
当管道在远程运行器上使用可重现的运行时环境时,运行器上的工作器每次运行管道时都会使用相同的依赖项。可重现的环境不受管道直接或传递依赖项版本发布引起的副作用影响。它不需要在运行时解析依赖项。
您可以通过以下方式创建可重现的运行时环境
在包含管道所有依赖项的自定义容器镜像中运行您的管道。使用
--sdk_container_image
管道选项。在
--requirements_file
管道选项中提供管道依赖项的完整列表。使用--prebuild_sdk_container_engine
选项在管道执行之前执行运行时环境初始化序列。如果您的依赖项没有改变,请使用--sdk_container_image
选项重新使用预构建的镜像。
自包含的运行时环境通常是可重现的。要检查运行时环境是否自包含,请限制管道运行时对 PyPI 的网络访问。如果您使用的是 Dataflow 运行器,请参阅有关 --no_use_public_ips
管道选项的文档。
如果您需要重新创建或升级运行时环境,请以受控的方式进行,并了解更改的依赖项。
不要在正在运行的管道使用容器镜像时修改它们。
避免在您的自定义镜像中使用标签
:latest
。用日期或唯一标识符标记您的构建。如果出现问题,使用这种类型的标签可能会使您能够将管道执行还原到先前已知的正常配置,并允许您检查更改。考虑将
pip freeze
的输出或requirements.txt
的内容存储在版本控制系统中。
使管道启动环境可重复
启动环境运行管道的生产版本。在本地开发管道时,您可能会使用一个开发环境,其中包含用于开发的依赖项,例如 Jupyter 或 Pylint。生产管道的启动环境可能不需要这些额外的依赖项。您可以单独构建和维护它,与开发环境分开。
为了减少对管道提交的副作用,最好能够 以可重现的方式重新创建启动环境。
Dataflow Flex 模板 提供了容器化可重现启动环境的示例。
要创建 Beam 在干净的虚拟环境中的可重现安装,请使用 需求文件,其中列出了 Beam 默认容器镜像约束文件中包含的所有 Python 依赖项。
BEAM_VERSION=2.48.0
PYTHON_VERSION=`python -c "import sys; print(f'{sys.version_info.major}{sys.version_info.minor}')"`
pip install apache-beam==$BEAM_VERSION --constraint https://raw.githubusercontent.com/apache/beam/release-${BEAM_VERSION}/sdks/python/container/py${PY_VERSION}/base_image_requirements.txt
使用约束文件来确保启动环境中的 Beam 依赖项与默认 Beam 容器中的版本匹配。约束文件还可以消除在安装时解析依赖项的需要。
使启动环境与运行时环境兼容
启动环境将管道图转换为 独立于运行器的表示形式。此过程涉及序列化(或腌制)转换的代码。序列化后的内容在工作器上反序列化。如果运行时工作器环境与启动环境有很大不同,则可能会出现以下原因导致的运行时错误
Apache Beam 版本必须在提交和运行时环境中匹配。Python 主版本号和次版本号也必须匹配。否则,管道可能会失败,并出现类似
Pipeline construction environment and pipeline runtime environment are not compatible
的错误。在较旧的 SDK 版本中,错误可能会报告为SystemError: unknown opcode
。提交和运行时环境中的
protobuf
版本需要匹配或兼容。管道代码中使用的库可能需要匹配。如果序列化后的管道代码引用了工作器上不可用的函数或模块,则管道可能会在远程运行器上失败,并出现
ModuleNotFound
或AttributeError
异常。如果您遇到此类错误,请确保受影响的库在远程工作器上可用,并检查您是否需要 保存主会话。在提交时使用的序列化库的版本必须与在运行时安装的版本匹配。为了执行此操作,Beam 会对序列化库(dill 和 cloudpickle)的版本设置严格的边界。您可以在以下条件下强制安装与 Beam 所需的版本不同的
dill
或cloudpickle
版本- 您在提交和运行时环境中安装了相同的版本。
- 所选版本适合您的管道。
要检查运行时环境是否与启动环境匹配,请检查两个环境中pip freeze
输出的差异。更新到最新版本的 Beam,因为环境兼容性检查已包含在较新的 SDK 版本中。
最后,您可以通过从运行时使用的容器化环境中启动管道来使用相同的环境。 从自定义容器镜像构建的 Dataflow Flex 模板 提供了这种设置。在这种情况下,您可以以可重现的方式重新创建启动和运行时环境。因为这两个容器都是从同一个镜像创建的,所以默认情况下启动和运行时环境彼此兼容。