工作流编排

本页提供 KFP 和 TFX 编排示例。首先提供 KFP 示例,然后演示 TFX 如何管理使用 KFP 时手动定义的功能。

理解 Beam DAG

Apache Beam 是一种开源的统一模型,用于定义批处理和流式数据并行处理管道。Apache Beam 编程模型的核心概念是有向无环图(DAG)。每个 Apache Beam 管道都是一个 DAG,您可以通过您选择的编程语言的 Beam SDK(从支持的 Apache Beam SDK 集中选择)来构建。该 DAG 的每个节点都代表一个处理步骤(PTransform),该步骤接受一个数据集合作为输入(PCollection),然后输出一个转换后的数据集合(PCollection)。边定义数据如何在管道中从一个处理步骤流向另一个处理步骤。下图显示了一个示例管道工作流。

A standalone beam pipeline

定义管道和相应的 DAG 并不意味着数据开始流经管道。要运行管道,您需要将其部署到一个 支持的 Beam 运行器 中。这些分布式处理后端包括 Apache Flink、Apache Spark 和 Google Cloud Dataflow。为了在您的机器上本地运行管道以进行开发和调试,还提供了一个 Direct Runner。查看 运行器功能矩阵 以验证您选择的运行器是否支持在您的管道中定义的数据处理步骤,尤其是在使用 Direct Runner 时。

编排框架

成功交付机器学习项目需要做的不仅仅是训练模型。完整的 ML 工作流通常包含一系列其他步骤,包括数据摄取、数据验证、数据预处理、模型评估、模型部署、数据漂移检测等等。此外,您需要跟踪来自实验的元数据和工件,以回答重要问题,例如

解决方案:MLOps。MLOps 是一个用来描述旨在使机器学习系统的开发和维护无缝且高效的最佳实践和指导原则的总称。MLOps 通常需要在整个模型和数据生命周期中自动化机器学习工作流。创建这些工作流 DAG 的流行框架是 Kubeflow PipelinesApache AirflowTFX

您可以将 Apache Beam 管道用作独立的数据处理作业,也可以将其作为工作流中一系列更大型步骤的一部分。在后一种情况下,Apache Beam DAG 是由工作流编排器组成的整体 DAG 中的一个节点。因此,此工作流包含一个 DAG 内的 DAG,如下面的图所示。

An Apache Beam pipeline as part of a larger orchestrated workflow

Apache Beam DAG 和编排 DAG 的主要区别在于,Apache Beam DAG 处理数据并将数据传递到其 DAG 的节点之间,而编排 DAG 调度和监控工作流中的步骤,并在 DAG 的节点之间传递执行参数、元数据和工件。

注意:TFX 创建一个工作流 DAG,它需要一个自己的编排器才能运行。 TFX 本地支持的编排器 是 Airflow、Kubeflow Pipelines 和 Apache Beam 本身。如 TFX 文档 中所述

“几个 TFX 组件依赖于 Beam 进行分布式数据处理。此外,TFX 可以使用 Apache Beam 来编排和执行管道 DAG。Beam 编排器使用与组件数据处理所使用的 BeamRunner 不同的 BeamRunner。”

警告:Beam 编排器不打算用作生产环境中使用的 TFX 编排器。它只是允许在 Beam 的 Direct Runner 上本地调试 TFX 管道,而无需为 Airflow 或 Kubeflow 进行额外的设置。

预处理示例

本节介绍两种编排的 ML 工作流,一种使用 Kubeflow Pipelines(KFP),另一种使用 Tensorflow Extended(TFX)。这两个框架都创建工作流,但各有其独特的优缺点

为简单起见,工作流仅包含三个组件:数据摄取、数据预处理和模型训练。根据具体情况,您可以添加一系列额外的组件,例如模型评估和模型部署。本示例重点介绍预处理组件,因为它演示了如何在 ML 工作流中使用 Apache Beam 来高效、并行地处理您的 ML 数据。

数据集由图像及其描述图像内容的文本标题组成。这些配对取自MSCOCO 2014 数据集的字幕子集。这种多模态数据(图像和文本)使我们有机会尝试两种模态的预处理操作。

Kubeflow 管道(KFP)

为了使用 KFP 运行我们的 ML 工作流,我们必须执行三个步骤

  1. 通过指定组件的接口,以及编写和容器化组件逻辑的实现,来创建 KFP 组件。
  2. 通过连接创建的组件,指定输入和输出如何在组件之间传递,并将管道定义编译成完整的管道定义,来创建 KFP 管道。
  3. 通过将 KFP 管道提交到 KFP 客户端端点来运行 KFP 管道。

完整的示例代码可以在GitHub 仓库中找到。

创建 KFP 组件

下图显示了我们的目标文件结构

    kfp
    ├── pipeline.py
    ├── components
    │   ├── ingestion
    │   │   ├── Dockerfile
    │   │   ├── component.yaml
    │   │   ├── requirements.txt
    │   │   └── src
    │   │       └── ingest.py
    │   ├── preprocessing
    │   │   ├── Dockerfile
    │   │   ├── component.yaml
    │   │   ├── requirements.txt
    │   │   └── src
    │   │       └── preprocess.py
    │   └── train
    │       ├── Dockerfile
    │       ├── component.yaml
    │       ├── requirements.txt
    │       └── src
    │           └── train.py
    └── requirements.txt

完整的预处理组件规范在以下示例中显示。输入是摄取组件保存摄取数据集的路径以及组件可以存储工件的目录路径。此外,一些输入指定 Apache Beam 管道如何以及在何处运行。摄取和训练组件的规范类似,可以在ingestion component.yaml文件和train component.yaml文件中找到,分别。

注意:我们正在使用 KFP v1 SDK,因为 v2 仍在beta阶段。v2 SDK 引入了一些新的选项,用于使用更原生支持的输入和输出工件来指定组件接口。要了解如何将组件从 v1 迁移到 v2,请参阅KFP 文档

name: preprocessing
description: Component that mimicks scraping data from the web and outputs it to a jsonlines format file
inputs:
  - name: ingested_dataset_path
    description: source uri of the data to scrape
    type: String
  - name: base_artifact_path
    description: base path to store data
    type: String
  - name: gcp_project_id
    description: ID for the google cloud project to deploy the pipeline to.
    type: String
  - name: region
    description: Region in which to deploy the Dataflow pipeline.
    type: String
  - name: dataflow_staging_root
    description: Path to staging directory for the dataflow runner.
    type: String
  - name: beam_runner
    description: Beam runner, DataflowRunner or DirectRunner.
    type: String
outputs:
  - name: preprocessed_dataset_path
    description: target uri for the ingested dataset
    type: String
implementation:
  container:
    image: <your-docker-registry/preprocessing-image-name:latest>
    command: [
      python3,
      preprocess.py,
      --ingested-dataset-path,
      {inputValue: ingested_dataset_path},
      --base-artifact-path,
      {inputValue: base_artifact_path},
      --preprocessed-dataset-path,
      {outputPath: preprocessed_dataset_path},
      --gcp-project-id,
      {inputValue: gcp_project_id},
      --region,
      {inputValue: region},
      --dataflow-staging-root,
      {inputValue: dataflow_staging_root},
      --beam-runner,
      {inputValue: beam_runner},
    ]

在本例中,每个组件共享一个相同的 Dockerfile,但您可以在需要的地方添加额外的组件特定依赖项。

FROM python:3.9-slim

# (Optional) install extra dependencies

# install pypi dependencies
COPY requirements.txt /
RUN python3 -m pip install --no-cache-dir -r requirements.txt

# copy src files and set working directory
COPY src /src
WORKDIR /src

完成组件规范和容器化后,实现预处理组件。

由于 KFP 将输入和输出参数作为命令行参数提供,因此需要一个argumentparser

def parse_args():
  """Parse preprocessing arguments."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      "--ingested-dataset-path",
      type=str,
      help="Path to the ingested dataset",
      required=True)
  parser.add_argument(
      "--preprocessed-dataset-path",
      type=str,
      help="The target directory for the ingested dataset.",
      required=True)
  parser.add_argument(
      "--base-artifact-path",
      type=str,
      help="Base path to store pipeline artifacts.",
      required=True)
  parser.add_argument(
      "--gcp-project-id",
      type=str,
      help="ID for the google cloud project to deploy the pipeline to.",
      required=True)
  parser.add_argument(
      "--region",
      type=str,
      help="Region in which to deploy the pipeline.",
      required=True)
  parser.add_argument(
      "--dataflow-staging-root",
      type=str,
      help="Path to staging directory for dataflow.",
      required=True)
  parser.add_argument(
      "--beam-runner",
      type=str,
      help="Beam runner: DataflowRunner or DirectRunner.",
      default="DirectRunner")

  return parser.parse_args()

preprocess_dataset函数的实现包含 Apache Beam 管道代码和选择运行程序的 Beam 管道选项。执行的预处理包括从其 URL 下载图像字节,将其转换为 Torch 张量,并调整为所需的大小。标题会进行一系列字符串操作,以确保我们的模型接收统一的图像描述。令牌化不会在此处完成,但如果已知词汇表,则可以包含在此处。最后,每个元素被序列化并写入Avro文件。您可以使用其他文件格式,例如 TFRecords。

# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(
    runner=beam_runner,
    project=gcp_project_id,
    job_name=f'preprocessing-{int(time.time())}',
    temp_location=dataflow_staging_root,
    region=region,
    requirements_file="/requirements.txt",
    save_main_session=True,
)

with beam.Pipeline(options=pipeline_options) as pipeline:
  (
      pipeline
      | "Read input jsonlines file" >>
      beam.io.ReadFromText(ingested_dataset_path)
      | "Load json" >> beam.Map(json.loads)
      | "Filter licenses" >> beam.Filter(valid_license)
      | "Download image from URL" >> beam.FlatMap(download_image_from_url)
      | "Resize image" >> beam.Map(resize_image, size=IMAGE_SIZE)
      | "Clean Text" >> beam.Map(clean_text)
      | "Serialize Example" >> beam.Map(serialize_example)
      | "Write to Avro files" >> beam.io.WriteToAvro(
          file_path_prefix=target_path,
          schema={
              "namespace": "preprocessing.example",
              "type": "record",
              "name": "Sample",
              "fields": [{
                  "name": "id", "type": "int"
              }, {
                  "name": "caption", "type": "string"
              }, {
                  "name": "image", "type": "bytes"
              }]
          },
          file_name_suffix=".avro"))

它还包含执行组件 I/O 的必要代码。首先,根据组件输入参数base_artifact_path和时间戳构造一个存储预处理数据集的目标路径。组件的输出值仅作为文件返回,因此我们将构造的目标路径的值写入 KFP 为我们的组件提供的输出文件。

timestamp = time.time()
target_path = f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}"

# the directory where the output file is created may or may not exists
# so we have to create it.
Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True)
with open(preprocessed_dataset_path, 'w') as f:
  f.write(target_path)

由于我们主要对预处理组件感兴趣,以展示如何将 Beam 管道集成到更大的 ML 工作流中,因此本节没有深入介绍摄取和训练组件的实现。完整的示例代码中提供了模拟其行为的虚拟组件的实现。

创建管道定义

pipeline.py首先从其规范.yaml文件加载创建的组件。

# load the kfp components from their yaml files
DataIngestOp = comp.load_component('components/ingestion/component.yaml')
DataPreprocessingOp = comp.load_component(
    'components/preprocessing/component.yaml')
TrainModelOp = comp.load_component('components/train/component.yaml')

之后,创建管道,并手动指定所需的组件输入和输出。

@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="beam-preprocessing-kfp-example",
    description="Pipeline to show an apache beam preprocessing example in KFP")
def pipeline(
    gcp_project_id: str,
    region: str,
    component_artifact_root: str,
    dataflow_staging_root: str,
    beam_runner: str):
  """KFP pipeline definition.

  Args:
      gcp_project_id (str): ID for the google cloud project to deploy the pipeline to.
      region (str): Region in which to deploy the pipeline.
      component_artifact_root (str): Path to artifact repository where Kubeflow Pipelines
        components can store artifacts.
      dataflow_staging_root (str): Path to staging directory for the dataflow runner.
      beam_runner (str): Beam runner: DataflowRunner or DirectRunner.
  """

  ingest_data_task = DataIngestOp(base_artifact_path=component_artifact_root)

  data_preprocessing_task = DataPreprocessingOp(
      ingested_dataset_path=ingest_data_task.outputs["ingested_dataset_path"],
      base_artifact_path=component_artifact_root,
      gcp_project_id=gcp_project_id,
      region=region,
      dataflow_staging_root=dataflow_staging_root,
      beam_runner=beam_runner)

  train_model_task = TrainModelOp(
      preprocessed_dataset_path=data_preprocessing_task.
      outputs["preprocessed_dataset_path"],
      base_artifact_path=component_artifact_root)

最后,编译定义的管道,并生成一个pipeline.json规范文件。

Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

运行 KFP 管道

使用以下规范文件和代码片段,以及安装的必要的要求,您现在可以运行管道了。有关更多信息,请参阅run_pipeline文档。在运行管道之前,必须构建每个组件的容器并将其推送到您的管道可以访问的容器注册表。此外,组件规范.yaml文件必须指向正确的容器镜像。

client = kfp.Client()
experiment = client.create_experiment("KFP orchestration example")
run_result = client.run_pipeline(
    experiment_id=experiment.id,
    job_name="KFP orchestration job",
    pipeline_package_path="pipeline.json",
    params=run_arguments)

Tensorflow Extended(TFX)

使用 TFX 的工作方式类似于前面说明的 KFP 方法:定义单独的工作流组件,将它们连接到管道对象中,并在目标环境中运行管道。TFX 与众不同的是,它已经构建了一组 Python 包,这些包是用于创建工作流组件的库。与 KFP 示例不同,您无需从头开始编写和容器化代码。

使用 TFX,您需要选择哪些 TFX 组件与您的工作流相关,并使用库来调整其功能以适应您的用例。下图显示了可用组件及其对应的库。

TFX libraries and components

TFX 很大程度上依赖于 Apache Beam 在这些库中实现数据并行管道。您需要使用支持的 Apache Beam 运行程序之一来运行使用这些库创建的组件。完整的 TFX 示例代码再次可以在GitHub 仓库中找到。

对于 KFP 示例,我们使用了摄取、预处理和训练器组件。在本 TFX 示例中,我们使用了 ExampleGen、Transform 和 Trainer 库。

首先,查看管道定义。请注意,此定义类似于我们之前的示例。

def create_pipeline(
    gcp_project_id,
    region,
    pipeline_name,
    pipeline_root,
    csv_file,
    module_file,
    beam_runner,
    metadata_file):
  """Create the TFX pipeline.

  Args:
      gcp_project_id (str): ID for the google cloud project to deploy the pipeline to.
      region (str): Region in which to deploy the pipeline.
      pipeline_name (str): Name for the Beam pipeline
      pipeline_root (str): Path to artifact repository where TFX
        stores a pipeline’s artifacts.
      csv_file (str): Path to the csv input file.
      module_file (str): Path to module file containing the preprocessing_fn and run_fn.
      beam_runner (str): Beam runner: DataflowRunner or DirectRunner.
      metadata_file (str): Path to store a metadata file as a mock metadata database.
  """
  example_gen = tfx.components.CsvExampleGen(input_base=csv_file)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  transform = tfx.components.Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=module_file)

  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=transform.outputs['transformed_examples'],
      transform_graph=transform.outputs['transform_graph'])

  components = [example_gen, statistics_gen, schema_gen, transform, trainer]

  beam_pipeline_args_by_runner = {
      'DirectRunner': [],
      'DataflowRunner': [
          '--runner=DataflowRunner',
          '--project=' + gcp_project_id,
          '--temp_location=' + os.path.join(pipeline_root, 'tmp'),
          '--region=' + region,
      ]
  }

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      enable_cache=True,
      metadata_connection_config=tfx.orchestration.metadata.
      sqlite_metadata_connection_config(metadata_file),
      beam_pipeline_args=beam_pipeline_args_by_runner[beam_runner])

我们使用相同的数据输入,即从MSCOCO 2014 数据集中提取的几个图像-标题对。但是这一次,我们使用 CSV 格式的数据,因为 ExampleGen 组件默认情况下不支持 jsonlines。TensorFlow 文档中的数据源和格式页面列出了开箱即用的支持格式。或者,您可以编写一个自定义 ExampleGen

将下面的代码片段复制到一个输入数据 CSV 文件中

image_id,id,caption,image_url,image_name,image_license
318556,255,"An angled view of a beautifully decorated bathroom.","http://farm4.staticflickr.com/3133/3378902101_3c9fa16b84_z.jpg","COCO_train2014_000000318556.jpg","Attribution-NonCommercial-ShareAlike License"
476220,14,"An empty kitchen with white and black appliances.","http://farm7.staticflickr.com/6173/6207941582_b69380c020_z.jpg","COCO_train2014_000000476220.jpg","Attribution-NonCommercial License"

到目前为止,我们只导入了标准 TFX 组件并将它们串联到一个管道中。Transform 和 Trainer 组件都定义了module_file参数。这就是我们定义想要从这些标准组件中获得的行为的地方。

预处理

Transform 组件在module_file中搜索preprocessing_fn函数的定义。此函数是tf.transform库的核心概念。TFX 文档描述了此函数

预处理函数是 tf.Transform 最重要的概念。预处理函数是对数据集的转换的逻辑描述。预处理函数接受并返回一个张量字典,其中张量表示 Tensor 或 SparseTensor。有两种类型的函数用于定义预处理函数

  1. 任何接受和返回张量的函数。这些会将 TensorFlow 操作添加到图中,以将原始数据转换为转换后的数据。
  2. tf.Transform 提供的任何分析器。分析器也接受和返回张量,但与 TensorFlow 函数不同,它们不会将操作添加到图中。相反,分析器会导致 tf.Transform 在 TensorFlow 之外计算一个完整的传递操作。它们使用整个数据集上的输入张量值来生成一个常量张量作为输出。例如,tft.min 会计算整个数据集上张量的最小值。tf.Transform 提供了一组固定的分析器,但这将在未来的版本中扩展。

因此,preprocesing_fn可以包含所有接受和返回张量的 tf 操作,以及特定的tf.transform操作。在以下示例中,我们使用前者将所有传入的标题转换为仅小写字母,而后者则对我们数据集中所有数据进行完整的传递以计算用于后续预处理步骤的标题的平均长度。

def preprocessing_fn(inputs):
  """Transform raw data."""
  # convert the captions to lowercase
  # split the captions into separate words
  lower = tf.strings.lower(inputs['caption'])

  # compute the vocabulary of the captions during a full pass
  # over the dataset and use this to tokenize.
  mean_length = tft.mean(tf.strings.length(lower))
  # <do some preprocessing with the mean length>

  return {
      'caption_lower': lower,
  }

此函数仅定义了预处理期间必须执行的逻辑步骤。该函数需要一个具体的实现才能运行。一个这样的实现由tf.Transform使用 Apache Beam 提供,后者提供一个 PTransform tft_beam.AnalyzeAndTransformDataset来处理数据。我们可以使用此 PTransform 明确地在 TFX Transform 组件之外测试此 preproccesing_fn。当使用tf.Transform与 TFX Transform 组件结合时,无需以这种方式调用processing_fn

if __name__ == "__main__":
  # Test processing_fn directly without the tfx pipeline
  raw_data = [
      {
          "caption": "A bicycle replica with a clock as the front wheel."
      }, {
          "caption": "A black Honda motorcycle parked in front of a garage."
      }, {
          "caption": "A room with blue walls and a white sink and door."
      }
  ]

  # define the feature_spec (in a tfx pipeline this would be generated by a SchemaGen component)
  feature_spec = dict(caption=tf.io.FixedLenFeature([], tf.string))
  raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)

  # test out the beam implementation of the
  # processing_fn with AnalyzeAndTransformDataset
  with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, transform_fn = (
      (raw_data, raw_data_metadata)
      | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
  transformed_data, transformed_metadata = transformed_dataset

训练

Trainer 组件的行为类似于 Transform 组件,但它不是寻找preprocessing_fn,而是要求在指定的module_file中有一个run_fn函数。我们的简单实现使用tf.Keras创建了一个存根模型,并将生成的模型保存到一个目录中。

def run_fn(fn_args: tfx.components.FnArgs) -> None:
  """Build the TF model, train it and export it."""
  # create a model
  model = tf.keras.Sequential()
  model.add(tf.keras.layers.Dense(1, input_dim=10))
  model.compile()

  # train the model on the preprocessed data
  # model.fit(...)

  # Save model to fn_args.serving_model_dir.
  model.save(fn_args.serving_model_dir)

执行管道

要启动管道,请提供两个配置:TFX 管道的协调器和运行 Apache Beam 管道的管道选项。为了在没有额外设置依赖项的情况下在本地运行管道,本示例使用LocalDagRunner进行协调。创建的管道可以通过beam_pipeline_args参数指定 Apache Beam 的管道选项。

args = parse_args()
tfx.orchestration.LocalDagRunner().run(create_pipeline(**vars(args)))