关于 Beam ML

Pydoc Pydoc




Javadoc Javadoc

你可以使用 Apache Beam 来

AI/ML 工作负载

你可以使用 Apache Beam 进行数据验证、数据预处理、模型验证以及模型部署和推理。

Overview of AI/ML building blocks and where Apache Beam can be used

  1. 数据摄取:传入的新数据存储在你的文件系统或数据库中,或者发布到消息队列中。
  2. 数据验证:收到数据后,检查数据的质量。例如,你可能需要检测异常值并计算标准差和类别分布。
  3. 数据预处理:验证数据后,转换数据,使其准备好用于训练你的模型。
  4. 模型训练:当数据准备就绪后,训练你的 AI/ML 模型。根据训练模型的质量,此步骤通常需要重复多次。
  5. 模型验证:在部署你的模型之前,验证其性能和准确性。
  6. 模型部署:部署你的模型,使用它对新数据或现有数据运行推理。

为了使你的模型保持最新状态并在数据增长和演变时保持良好性能,请多次运行这些步骤。此外,你可以将 ML ops 应用于你的项目以自动化整个模型和数据生命周期中的 AI/ML 工作流。使用编排器来自动化此流程并处理项目中不同构建块之间的转换。

使用 RunInference

RunInference API 是一个针对机器学习推理优化的 PTransform,它允许你在管道中高效地使用 ML 模型。该 API 包括以下功能

支持和限制

BatchElements PTransform

为了利用许多模型实施的矢量化推理的优化,BatchElements 转换用作在进行模型预测之前的中间步骤。此转换将元素一起批处理。然后,使用 RunInference 的特定框架的转换应用于批处理后的元素。例如,对于 numpy ndarrays,我们调用 numpy.stack(),对于 torch Tensor 元素,我们调用 torch.stack()

为了自定义 beam.BatchElements 的设置,在 ModelHandler 中,重写 batch_elements_kwargs 函数。例如,使用 min_batch_size 设置每个批次的最低元素数,或使用 max_batch_size 设置每个批次的最高元素数。

有关更多信息,请参阅 BatchElements 转换文档

共享辅助类

在 RunInference 实现中使用 Shared 类可以使模型仅在每个进程中加载一次并与在该进程中创建的所有 DoFn 实例共享。此功能减少了内存消耗和模型加载时间。有关更多信息,请参阅 Shared 类文档

修改 Python 管道以使用 ML 模型

要使用 RunInference 转换,请将以下代码添加到你的管道中

from apache_beam.ml.inference.base import RunInference
with pipeline as p:
   predictions = ( p |  'Read' >> beam.ReadFromSource('a_source')
                     | 'RunInference' >> RunInference(<model_handler>)

用模型处理程序设置代码替换 model_handler

要导入模型,您需要配置一个包装底层模型的 ModelHandler 对象。导入哪个模型处理程序取决于框架和包含输入的数据结构类型。ModelHandler 对象还允许您使用 env_vars 关键字参数设置推理所需的環境變數。以下示例展示了一些您可能想要导入的模型处理程序。

from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
from tfx_bsl.public.beam.run_inference import CreateModelHandler

使用预训练模型

本节提供了使用预训练模型的 PyTorch、Scikit-learn 和 Tensorflow 的要求。

PyTorch

您需要提供一个指向包含模型保存的权重的文件的路径。此路径必须可供管道访问。要使用 RunInference API 和 PyTorch 框架使用预训练模型,请完成以下步骤

  1. 下载预训练的权重并将其托管在管道可以访问的位置。
  2. 使用以下代码将模型权重的路径传递给 PyTorch ModelHandlerstate_dict_path=<path_to_weights>

请参阅 此笔记本,其中演示了使用 Apache Beam 运行 PyTorch 模型。

Scikit-learn

您需要提供一个指向包含腌制 Scikit-learn 模型的文件的路径。此路径必须可供管道访问。要使用 RunInference API 和 Scikit-learn 框架使用预训练模型,请完成以下步骤

  1. 下载腌制模型类并将其托管在管道可以访问的位置。
  2. 使用以下代码将模型的路径传递给 Sklearn ModelHandlermodel_uri=<path_to_pickled_file>model_file_type: <ModelFileType>,其中您可以根据模型序列化方式指定 ModelFileType.PICKLEModelFileType.JOBLIB

请参阅 此笔记本,其中演示了使用 Apache Beam 运行 Scikit-learn 模型。

TensorFlow

要使用 RunInference API 使用 TensorFlow,您有两个选择

  1. 使用 Apache Beam SDK 中的内置 TensorFlow 模型处理程序 - TFModelHandlerNumpyTFModelHandlerTensor
    • 根据模型的输入类型,分别使用 TFModelHandlerNumpy 用于 numpy 输入,TFModelHandlerTensor 用于 tf.Tensor 输入。
    • 使用 tensorflow 2.7 或更高版本。
    • 使用 model_uri=<path_to_trained_model> 将模型的路径传递给 TensorFlow ModelHandler
    • 或者,您可以传递训练模型的保存权重的路径,一个使用 create_model_fn=<function> 构建模型的函数,并设置 model_type=ModelType.SAVED_WEIGHTS。请参阅 此笔记本,其中演示了使用内置模型处理程序运行 Tensorflow 模型。
  2. 使用 tfx_bsl
    • 如果您的模型输入类型为 tf.Example,请使用此方法。
    • 使用 tfx_bsl 版本 1.10.0 或更高版本。
    • 使用 tfx_bsl.public.beam.run_inference.CreateModelHandler() 创建模型处理程序。
    • 将模型处理程序与 apache_beam.ml.inference.base.RunInference 变换一起使用。请参阅 此笔记本,其中演示了使用 Apache Beam 和 tfx-bsl 运行 TensorFlow 模型。

使用自定义模型

如果您想使用不受支持框架之一指定的模型,RunInference API 灵活地设计,允许您使用任何自定义机器学习模型。您只需要创建自己的 ModelHandlerKeyedModelHandler,其中包含加载模型并使用它运行推理的逻辑。

可以在 此笔记本 中找到一个简单的示例。load_model 方法展示了如何使用流行的 spaCy 包加载模型,而 run_inference 展示了如何在示例批次上运行推理。

RunInference 模式

本节建议了一些模式和最佳实践,您可以使用它们来使您的推理管道更简单、更健壮和更高效。

使用键控 ModelHandler 对象

如果示例附加了键,请将 KeyedModelHandler 包装在 ModelHandler 对象周围

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(...))
with pipeline as p:
   data = p | beam.Create([
      ('img1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('img2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('img3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

如果您不确定您的数据是否已键入,可以使用 MaybeKeyedModelHandler

您还可以使用 KeyedModelHandler 根据关联的键加载多个不同的模型。以下示例使用 config1 加载模型。该模型用于对与 key1 关联的所有示例进行推理。它使用 config2 加载第二个模型。该模型用于与 key2key3 关联的所有示例。

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
])
with pipeline as p:
   data = p | beam.Create([
      ('key1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

有关更详细的示例,请参阅笔记本 使用多个不同训练的模型运行 ML 推理

同时加载多个模型会增加出现内存不足错误 (OOM) 的风险。默认情况下,KeyedModelHandler 不会限制同时加载到内存中的模型数量。如果模型不能全部放入内存,您的管道可能会因内存不足错误而失败。为了避免这个问题,使用 max_models_per_worker_hint 参数来设置可以同时加载到内存中的模型的最大数量。

以下示例一次最多加载每个 SDK 工作器进程两个模型。它卸载当前未使用的模型。

mhs = [
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
  KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
  KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)

在给定机器上具有多个 SDK 工作器进程的运行器一次最多加载 max_models_per_worker_hint*<num worker processes> 个模型到机器上。

为模型和其他变换的任何额外内存需求留出足够的空间。由于内存可能不会在模型卸载后立即释放,因此建议留出额外的缓冲区。

注意:拥有大量模型但 max_models_per_worker_hint 很小可能会导致内存抖动,即大量执行时间用于在内存中交换模型。为了降低内存抖动的可能性和影响,如果您使用的是分布式运行器,请在推理步骤之前插入一个 GroupByKey 变换。GroupByKey 变换通过确保具有相同键和模型的元素位于同一工作器上,来减少抖动。

有关更多信息,请参阅 KeyedModelHander

使用 PredictionResult 对象

在 Apache Beam 中进行预测时,输出 PCollection 包含输入示例的键和推理。在输出中包含这两个项目允许您找到确定预测的输入。

PredictionResult 对象是一个 NamedTuple,它包含输入和推理,分别命名为 exampleinference。当输入数据中的键传递给 RunInference 变换时,输出 PCollection 返回一个 Tuple[str, PredictionResult],即键和 PredictionResult 对象。您的管道在 RunInference 变换之后的步骤中与 PredictionResult 对象进行交互。

class PostProcessor(beam.DoFn):
    def process(self, element: Tuple[str, PredictionResult]):
       key, prediction_result = element
       inputs = prediction_result.example
       predictions = prediction_result.inference

       # Post-processing logic
       result = ...

       yield (key, result)

with pipeline as p:
    output = (
        p | 'Read' >> beam.ReadFromSource('a_source')
                | 'PyTorchRunInference' >> RunInference(<keyed_model_handler>)
                | 'ProcessOutput' >> beam.ParDo(PostProcessor()))

如果您需要显式使用此对象,请在您的管道中包含以下行以导入该对象

from apache_beam.ml.inference.base import PredictionResult

有关更多信息,请参阅 PredictionResult 文档

自动模型刷新

要自动更新正在使用 RunInference PTransform 的模型,而无需停止管道,请将 ModelMetadata 侧输入 PCollection 传递给 RunInference 输入参数 model_metadata_pcoll

ModelMetdata 是一个 NamedTuple,它包含

用例

侧输入 PCollection 必须遵循 AsSingleton 视图以避免错误。

注意:如果主 PCollection 发射输入而侧输入尚未收到输入,则主 PCollection 将被缓冲,直到侧输入更新为止。这可能会发生在具有数据驱动触发器(例如 AfterCountAfterProcessingTime)的全局窗口化侧输入中。在侧输入更新之前,发射用于将相应的 ModelHandler 作为侧输入传递的默认或初始模型 ID。

预处理和后处理你的记录

使用 RunInference,您可以将预处理和后处理操作添加到您的变换中。要应用预处理操作,请在您的模型处理程序上使用 with_preprocess_fn

inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))

要应用后处理操作,请在您的模型处理程序上使用 with_postprocess_fn

inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))

您还可以链接多个预处理和后处理操作

inference = pcoll | RunInference(
    model_handler.with_preprocess_fn(
      lambda x : do_something(x)
    ).with_preprocess_fn(
      lambda x : do_something_else(x)
    ).with_postprocess_fn(
      lambda x : do_something_after_inference(x)
    ).with_postprocess_fn(
      lambda x : do_something_else_after_inference(x)
    ))

预处理函数在批处理和推理之前运行。此函数将您的输入 PCollection 映射到模型处理程序的基输入类型。如果您应用了多个预处理函数,它们将按从后到前的顺序在您的原始 PCollection 上运行。

后处理函数在推理后运行。此函数将基模型处理程序的输出类型映射到您想要的输出类型。如果您应用了多个后处理函数,它们将按从前到后的顺序在您的原始推理结果上运行。

处理错误

为了在使用 RunInference 时稳健地处理错误,您可以使用死信队列。死信队列将失败的记录输出到一个单独的 PCollection 中,以供进一步处理。然后可以分析此 PCollection 并将其发送到存储系统,在那里可以对其进行审查并重新提交到管道,或者将其丢弃。RunInference 对死信队列提供了内置支持。您可以通过将 with_exception_handling 应用于您的 RunInference 变换来使用死信队列

main, other = pcoll | RunInference(model_handler).with_exception_handling()
other.failed_inferences | beam.Map(print) # insert logic to handle failed records here

您还可以将此模式应用于具有关联的预处理和后处理操作的 RunInference 变换

main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
other.failed_preprocessing[0] | beam.Map(print) # handles failed preprocess operations, indexed in the order in which they were applied
other.failed_inferences | beam.Map(print) # handles failed inferences
other.failed_postprocessing[0] | beam.Map(print) # handles failed postprocess operations, indexed in the order in which they were applied

从 Java 管道运行推理

RunInference API 可与 Beam Java SDK 版本 2.41.0 及更高版本一起使用,通过 Apache Beam 的 多语言管道框架。有关 Java 包装器变换的信息,请参阅 RunInference.java。要试用它,请参阅 Java Sklearn Mnist 分类示例。此外,请参阅 从 Java SDK 使用 RunInference,其中包含一个复合 Python 变换的示例,该变换使用 RunInference API 以及来自 Beam Java SDK 管道的预处理和后处理。

自定义推理

RunInference API 目前不支持使用自然语言 API 或 Cloud Vision API 等进行远程推理调用。 因此,要在 Apache Beam 中使用这些远程 API,您需要编写自定义推理调用。 Apache Beam 笔记本中的远程推理 演示了如何使用 beam.DoFn 实现自定义远程推理调用。 当您为现实生活中的项目实现远程推理时,请考虑以下因素

多模型管道

使用 RunInference 变换将多个推理模型添加到您的管道中。 多模型管道可用于 A/B 测试或构建由执行标记化、句子分割、词性标注、命名实体提取、语言检测、共指消解等模型组成的级联模型。 有关更多信息,请参阅 多模型管道

A/B 模式

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>)
   model_b_predictions = data | RunInference(<model_handler_B>)

其中 model_handler_Amodel_handler_B 是模型处理程序设置代码。

级联模式

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>)
   model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(<model_handler_B>)

其中 model_handler_Amodel_handler_B 是模型处理程序设置代码。

对不同模型需求使用资源提示

在单个管道中使用多个模型时,不同的模型可能具有不同的内存或工作程序 SKU 要求。 资源提示允许您向运行程序提供有关管道中每个步骤的计算资源要求的信息。

例如,以下代码段扩展了前面的级联模式,并为每个 RunInference 调用提供了提示以指定 RAM 和硬件加速器要求

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>).with_resource_hints(min_ram="20GB")
   model_b_predictions = model_a_predictions
      | beam.Map(some_post_processing)
      | RunInference(<model_handler_B>).with_resource_hints(
         min_ram="4GB",
         accelerator="type:nvidia-tesla-k80;count:1;install-nvidia-driver")

有关资源提示的更多信息,请参阅 资源提示

模型验证

模型验证允许您对模型的性能进行基准测试,以测试其对以前从未见过的数据集的性能。 您可以提取选定的指标,创建可视化,记录元数据,并比较不同模型的性能,最终目标是验证您的模型是否已准备好部署。 Beam 提供支持直接在管道内运行 TensorFlow 模型的模型评估。

ML 模型评估 页面展示了如何通过使用 TensorFlow 模型分析 (TFMA) 将模型评估集成到管道中。

故障排除

如果您在管道或作业中遇到问题,本节列出了您可能会遇到的问题,并提供了解决方案的建议。

无法批处理张量元素

RunInference 使用动态批处理。 但是,RunInference API 无法批处理不同大小的张量元素,因此传递到 RunInference 变换的样本必须具有相同的维度或长度。 如果您提供不同大小的图像或不同长度的词嵌入,可能会出现以下错误

File "/beam/sdks/python/apache_beam/ml/inference/pytorch_inference.py", line 232, in run_inference batched_tensors = torch.stack(key_to_tensor_list[key]) RuntimeError: stack expects each tensor to be equal size, but got [12] at entry 0 and [10] at entry 1 [while running 'PyTorchRunInference/ParDo(_RunInferenceDoFn)']

为了避免此问题,请使用相同大小的元素或禁用批处理。

选项 1:使用相同大小的元素

使用相同大小的元素或调整输入的大小。 对于计算机视觉应用程序,请调整图像输入的大小,使其具有相同的尺寸。 对于具有不同长度文本的自然语言处理 (NLP) 应用程序,请调整文本或词嵌入的大小,使其具有相同的长度。 在处理不同长度的文本时,调整大小可能不可行。 在这种情况下,您可以禁用批处理(请参阅选项 2)。

选项 2:禁用批处理

通过覆盖 ModelHandler 中的 batch_elements_kwargs 函数并将最大批处理大小 (max_batch_size) 设置为 1 来禁用批处理:max_batch_size=1。 有关更多信息,请参阅 BatchElements PTransforms。 有关示例,请参阅我们的 语言建模示例

Pydoc Pydoc




Javadoc Javadoc