管理 Python 管道依赖项

依赖项管理是关于指定管道所需的依赖项,以及控制生产中使用的依赖项。

注意: 用于管道执行的远程工作程序通常在基于 Debian 的容器映像中具有标准的 Python 发行版安装。如果您的代码仅依赖于标准 Python 包,则可能不需要执行此页面上的任何操作。

PyPI 依赖项

如果您的管道使用来自 Python 包索引 的公共包,则必须使这些包在工作程序上远程可用。

对于仅包含单个 Python 文件或笔记本的管道,提供依赖项的最直接方法是提供一个 requirements.txt 文件。对于更复杂的情况,请定义 包中的管道 并在 自定义容器 中安装依赖项。

要提供 requirements.txt 文件

  1. 找出机器上安装了哪些包。运行以下命令

     pip freeze > requirements.txt
    

    此命令创建一个 requirements.txt 文件,其中列出机器上安装的所有包,无论它们是从哪里安装的。

  2. 编辑 requirements.txt 文件并删除与代码无关的所有包。

  3. 使用以下命令行选项运行管道

     --requirements_file requirements.txt
    

    运行器将使用 requirements.txt 文件将其他依赖项安装到远程工作程序上。

注意:作为 pip freeze 的替代方案,请使用 pip-tools 等库从 requirements.in 文件中编译管道所需的所有依赖项。在 requirements.in 文件中,只提到了顶级依赖项。

当您提供 --requirements_file 管道选项时,在管道提交期间,Beam 会将指定的包本地下载到要求缓存目录中,然后将要求缓存目录阶段到运行器。在运行时,当可用时,Beam 会从要求缓存中安装包。这种机制可以将依赖包在提交时阶段到运行器。在运行时,运行器工作程序可能能够从缓存中安装包,而无需连接到 PyPI。要禁用阶段要求,请使用 --requirements_cache=skip 管道选项。有关更多信息,请参阅 这些管道选项的帮助说明.

自定义容器

您可以通过包含管道所需所有依赖项的 容器 映像。 按照说明显示如何使用自定义容器映像运行管道.

  1. 如果您使用自定义容器映像,建议您在构建时直接从 --requirements_file 安装依赖项到映像中。在这种情况下,您不需要在运行时传递 --requirements_file 选项,这将减少管道启动时间。

    # Add these lines with the path to the requirements.txt to the Dockerfile
    COPY <path to requirements.txt> /tmp/requirements.txt
    RUN python -m pip install -r /tmp/requirements.txt
    

本地 Python 包或非公开 Python 依赖项

如果您的管道使用公开不可用的包(例如,从 GitHub 存储库下载的包),请通过执行以下步骤使这些包在远程可用

  1. 确定机器上安装了哪些包并且不是公开的。运行以下命令

    pip freeze

    此命令列出机器上安装的所有包,无论它们是从哪里安装的。

    1. 使用以下命令行选项运行管道

       --extra_package /path/to/package/package-name
      

      其中 package-name 是包的 tarball。您可以使用名为 build 的命令行工具构建包 tarball。

        # Install build using pip
        pip install --upgrade build
        python -m build --sdist
      

      有关此命令的更多详细信息,请参阅 build 文档

多个文件依赖项

通常,您的管道代码跨越多个文件。要在远程运行项目,必须将这些文件作为 Python 包进行分组,并在运行管道时指定该包。当远程工作程序启动时,它们将安装您的包。要将文件分组为 Python 包并使其在远程可用,请执行以下步骤

  1. 为您的项目创建一个 setup.py 文件。以下是一个非常基本的 setup.py 文件。

     import setuptools
    
     setuptools.setup(
        name='PACKAGE-NAME',
        version='PACKAGE-VERSION',
        install_requires=[
          # List Python packages your pipeline depends on.
        ],
        packages=setuptools.find_packages(),
     )
    
  2. 构建您的项目,以便根目录包含 setup.py 文件、主工作流文件以及包含其余文件的目录,例如

     root_dir/
       setup.py
       main.py
       my_package/
         my_pipeline_launcher.py
         my_custom_dofns_and_transforms.py
         other_utils_and_helpers.py
    

    有关遵循此项目结构的示例,请参阅 Juliaset

  3. 在提交环境中安装您的包,例如使用以下命令

     pip install -e .
    
  4. 使用以下命令行选项运行管道

     --setup_file /path/to/setup.py
    

注意: 如果包的依赖项在 setup.py 文件的 install_requires 字段中定义(参见步骤 1),则不需要提供 --requirements_file 选项。但是,与 --requirements_file 选项不同,当您使用 --setup_file 选项时,Beam 不会将依赖包阶段到运行器。只有管道包被阶段。如果它们没有在运行时环境中提供,则包依赖项将在运行时从 PyPI 安装。

非 Python 依赖项或具有非 Python 依赖项的 PyPI 依赖项

如果您的管道使用非 Python 包,例如需要使用 apt install 命令安装的包,或者使用在包安装期间依赖于非 Python 依赖项的 PyPI 包,建议使用 自定义容器 安装它们。否则,您必须执行以下步骤。

  1. 将管道构建为一个包.

  2. 将非 Python 依赖项所需的安装命令(例如 apt install 命令)添加到 setup.py 文件中的 CUSTOM_COMMANDS 列表中。有关示例,请参阅 Juliaset setup.py 文件

    注意: 您必须验证这些命令在远程工作程序上运行。例如,如果您使用 apt,则远程工作程序需要 apt 支持。

  3. 使用以下命令行选项运行管道

     --setup_file /path/to/setup.py
    

注意: 由于自定义命令在安装工作流的依赖项(通过 pip)之后执行,因此您应该从管道的 requirements.txt 文件以及 setup.py 文件的 setuptools.setup() 调用中的 install_requires 参数中省略 PyPI 包依赖项。

预构建 SDK 容器映像

在 Beam 运行器在 Docker 容器中启动 SDK 工作程序的管道执行模式中,其他管道依赖项(通过 --requirements_file 和其他运行时选项指定)在运行时安装到容器中。这会增加工作程序启动时间。但是,可以预先构建 SDK 容器,并在使用 --prebuild_sdk_container_engine 启动工作程序之前执行一次依赖项安装。有关如何在 Google Cloud Dataflow 中使用预构建的说明,请参阅 使用自定义容器预构建 python SDK 自定义容器映像.

注意:此功能仅适用于 Dataflow Runner v2

Pickling 和管理主会话

当 Python SDK 将管道提交到远程运行器以执行时,管道内容(如转换用户代码)使用执行序列化的库(也称为 pickler)序列化(或 pickling)为字节码。Beam 使用的默认 pickler 库是 dill。要使用 cloudpickle pickler,请提供 --pickle_library=cloudpickle 管道选项。cloudpickle 支持目前是 实验性的.

默认情况下,在 Beam 作业的序列化过程中,主管道模块中定义的全局导入、函数和变量不会被保存。因此,在任何远程运行器上运行 DoFn 时,可能会遇到意外的 NameError。要解决此问题,请通过设置 --save_main_session 管道选项将主会话内容与管道一起提供。这将把全局命名空间的 pickled 状态加载到 Dataflow 工作程序上(如果使用 DataflowRunner)。例如,请参阅 处理 NameErrors 以在 DataflowRunner 上设置主会话。

仅当在任何远程运行器上使用dill序列化器时,才需要在 Python SDK 中管理主会话。因此,此问题不会在DirectRunner中发生。

由于管道序列化发生在作业提交时,而反序列化发生在运行时,因此在作业提交和运行时使用相同版本的序列化库至关重要。为了确保这一点,Beam 通常会为序列化库设置一个非常窄的受支持版本范围。如果由于某种原因,用户无法使用 Beam 所需的dillcloudpickle版本,并选择安装自定义版本,他们还必须确保在运行时使用相同的自定义版本(例如,在他们的自定义容器中,或通过指定管道依赖项要求)。

控制管道使用的依赖项

管道环境

要在远程运行器上运行 Python 管道,Apache Beam 会将管道转换为 独立于运行器的表示形式 并将其提交以进行执行。转换发生在启动环境中。您可以从安装了 Beam SDK 的 Python 虚拟环境中启动管道,或者使用 Dataflow Flex 模板笔记本环境Apache Airflow 等工具启动管道。

运行时环境是运行器在管道执行期间使用的 Python 环境。该环境是管道代码在执行数据处理时运行的所在位置。运行时环境包括 Apache Beam 和管道运行时依赖项。

创建可重复的环境

您可以使用多种工具构建可重现的 Python 环境

使用版本控制来管理定义环境的配置文件。

使管道运行时环境可重复

当管道在远程运行器上使用可重现的运行时环境时,运行器上的工作器每次运行管道时都会使用相同的依赖项。可重现的环境不受管道直接或传递依赖项版本发布引起的副作用影响。它不需要在运行时解析依赖项。

您可以通过以下方式创建可重现的运行时环境

自包含的运行时环境通常是可重现的。要检查运行时环境是否自包含,请限制管道运行时对 PyPI 的网络访问。如果您使用的是 Dataflow 运行器,请参阅有关 --no_use_public_ips 管道选项的文档。

如果您需要重新创建或升级运行时环境,请以受控的方式进行,并了解更改的依赖项。

使管道启动环境可重复

启动环境运行管道的生产版本。在本地开发管道时,您可能会使用一个开发环境,其中包含用于开发的依赖项,例如 Jupyter 或 Pylint。生产管道的启动环境可能不需要这些额外的依赖项。您可以单独构建和维护它,与开发环境分开。

为了减少对管道提交的副作用,最好能够 以可重现的方式重新创建启动环境

Dataflow Flex 模板 提供了容器化可重现启动环境的示例。

要创建 Beam 在干净的虚拟环境中的可重现安装,请使用 需求文件,其中列出了 Beam 默认容器镜像约束文件中包含的所有 Python 依赖项。

BEAM_VERSION=2.48.0
PYTHON_VERSION=`python -c "import sys; print(f'{sys.version_info.major}{sys.version_info.minor}')"`
pip install apache-beam==$BEAM_VERSION --constraint https://raw.githubusercontent.com/apache/beam/release-${BEAM_VERSION}/sdks/python/container/py${PY_VERSION}/base_image_requirements.txt

使用约束文件来确保启动环境中的 Beam 依赖项与默认 Beam 容器中的版本匹配。约束文件还可以消除在安装时解析依赖项的需要。

使启动环境与运行时环境兼容

启动环境将管道图转换为 独立于运行器的表示形式。此过程涉及序列化(或腌制)转换的代码。序列化后的内容在工作器上反序列化。如果运行时工作器环境与启动环境有很大不同,则可能会出现以下原因导致的运行时错误

要检查运行时环境是否与启动环境匹配,请检查两个环境中pip freeze输出的差异。更新到最新版本的 Beam,因为环境兼容性检查已包含在较新的 SDK 版本中。

最后,您可以通过从运行时使用的容器化环境中启动管道来使用相同的环境。 从自定义容器镜像构建的 Dataflow Flex 模板 提供了这种设置。在这种情况下,您可以以可重现的方式重新创建启动和运行时环境。因为这两个容器都是从同一个镜像创建的,所以默认情况下启动和运行时环境彼此兼容。