多模型管道

Apache Beam 允许您开发多模型管道。本示例演示了如何提取和转换输入数据,将其运行通过模型,然后将第一个模型的结果传递给第二个模型。本页解释了多模型管道的工作原理,并概述了构建多模型管道所需了解的信息。

在阅读本节之前,建议您熟悉 管道开发生命周期 中的信息。

如何使用 Beam 构建多模型管道

典型的机器学习工作流程涉及一系列数据转换步骤,例如数据提取、数据处理任务、推理和后处理。Apache Beam 允许您通过将所有这些步骤封装在一个 Apache Beam 有向无环图 (DAG) 中来编排所有这些步骤,这使您能够构建弹性和可扩展的端到端机器学习系统。

要将您的机器学习模型部署在 Apache Beam 管道中,请使用 RunInferenceAPI,它可以将您的模型作为 DAG 中的 PTransform 步骤进行集成。在一个 DAG 中组合多个 RunInference 转换可以构建一个包含多个 ML 模型的管道。通过这种方式,Apache Beam 支持开发复杂的 ML 系统。

您可以使用不同的模式在 Apache Beam 中构建多模型管道。本页探讨了 A/B 模式和级联模式。

A/B 模式

A/B 模式描述了一个框架,其中多个 ML 模型并行运行。此模式的一种应用是测试不同机器学习模型的性能,并确定新模型是否比现有模型有所改进。这也被称为“冠军/挑战者”方法。通常,您需要定义一个业务指标来比较控制模型与当前模型的性能。

一个例子可能是推荐引擎模型,其中您有一个现有模型,该模型根据用户的偏好和活动历史推荐广告。当决定部署新模型时,您可以将传入的用户流量分成两个分支,其中一半的用户接触到新模型,另一半的用户接触到现有模型。

之后,您可以测量两组用户在特定时间段内的广告平均点击率 (CTR),以确定新模型是否比现有模型表现更好。

import apache_beam as beam

with beam.Pipeline() as pipeline:
   userset_a_traffic, userset_b_traffic =
     (pipeline | 'ReadFromStream' >> beam.ReadFromStream('stream_source')
               | ‘Partition’ >> beam.partition(split_dataset, 2, ratio=[5, 5])
     )

model_a_predictions = userset_a_traffic | RunInference(<model_handler_A>)
model_b_predictions = userset_b_traffic | RunInference(<model_handler_B>)

其中 beam.partition 用于将数据源拆分成 50/50 的拆分分区。有关数据分区的更多信息,请参见 Partition

级联模式

当问题的解决方案涉及一系列 ML 模型时,使用级联模式。在这种情况下,模型的输出通常使用 PTransform 转换为合适的格式,然后再传递给另一个模型。

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>)
   model_b_predictions = model_a_predictions | beam.ParDo(post_processing()) | RunInference(<model_handler_B>)

the Ensemble model using an image captioning and ranking example notebook shows an end-to-end example of a cascade pipeline used to generate and rank image captions. The solution consists of two open-source models

  1. A caption generation model (BLIP) that generates candidate image captions from an input image.
  2. A caption ranking model (CLIP) that uses the image and candidate captions to rank the captions in the order in which they best describe the image.

使用多个经过不同训练的模型

您可以使用 KeyedModelHandler 将多个不同的模型加载到 RunInference 转换中。使用关联的密钥来确定使用哪个模型以及哪些数据。以下示例使用 config1 加载模型。该模型用于对与 key1 关联的所有示例进行推理。它使用 config2 加载第二个模型。该模型用于与 key2key3 关联的所有示例。

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
])
with pipeline as p:
   data = p | beam.Create([
      ('key1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

有关更详细的示例,请参见笔记本 Run ML inference with multiple differently-trained models.

同时加载多个模型会增加出现内存不足错误 (OOM) 的风险。默认情况下,KeyedModelHandler 不会限制同时加载到内存中的模型数量。如果所有模型都无法放入内存,您的管道可能会因内存不足错误而失败。要避免此问题,请使用 max_models_per_worker_hint 参数来设置可以同时加载到内存中的最大模型数量。

以下示例一次最多加载两个模型到每个 SDK 工作器进程。它卸载当前未使用的模型。

mhs = [
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
  KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
  KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)

在给定机器上具有多个 SDK 工作器进程的 Runner 在该机器上最多加载 max_models_per_worker_hint*<num worker processes> 个模型。

为模型和其他转换的任何额外内存需求留出足够的空间。由于模型在卸载后可能不会立即释放内存,因此建议留出额外的缓冲区。

注意:如果模型很多,但 max_models_per_worker_hint 很小,会导致内存抖动,即大量的执行时间用于将模型交换进出内存。要降低内存抖动的可能性和影响,如果您使用的是分布式 Runner,请在推理步骤之前插入一个 GroupByKey 转换。GroupByKey 转换通过确保具有相同密钥和模型的元素位于同一工作器上,从而减少了抖动。

有关更多信息,请参见 KeyedModelHander.