关于 Beam ML
| Javadoc |
你可以使用 Apache Beam 来
- 处理大量数据,包括预处理和推理。
- 在项目的探索阶段对数据进行实验。
- 作为生产环境中 ML ops 生态系统的一部分扩展你的数据管道。
- 在批处理和流式处理中,在生产环境中以不同的数据负载运行你的模型。
AI/ML 工作负载
你可以使用 Apache Beam 进行数据验证、数据预处理、模型验证以及模型部署和推理。
- 数据摄取:传入的新数据存储在你的文件系统或数据库中,或者发布到消息队列中。
- 数据验证:收到数据后,检查数据的质量。例如,你可能需要检测异常值并计算标准差和类别分布。
- 数据预处理:验证数据后,转换数据,使其准备好用于训练你的模型。
- 模型训练:当数据准备就绪后,训练你的 AI/ML 模型。根据训练模型的质量,此步骤通常需要重复多次。
- 模型验证:在部署你的模型之前,验证其性能和准确性。
- 模型部署:部署你的模型,使用它对新数据或现有数据运行推理。
为了使你的模型保持最新状态并在数据增长和演变时保持良好性能,请多次运行这些步骤。此外,你可以将 ML ops 应用于你的项目以自动化整个模型和数据生命周期中的 AI/ML 工作流。使用编排器来自动化此流程并处理项目中不同构建块之间的转换。
使用 RunInference
RunInference API 是一个针对机器学习推理优化的 PTransform
,它允许你在管道中高效地使用 ML 模型。该 API 包括以下功能
- 为了高效地为你的模型提供数据,使用 Apache Beam 的
BatchElements
转换根据管道吞吐量动态地对输入进行批处理。 - 为了平衡内存和吞吐量使用,使用中央模型管理器确定要加载的最佳模型数量。根据需要在线程和进程之间共享这些模型以最大限度地提高吞吐量。
- 通过 自动模型刷新 功能,确保你的管道使用最新部署的模型版本。
- 支持 多个框架和模型中心,包括 Tensorflow、Pytorch、Sklearn、XGBoost、Hugging Face、TensorFlow Hub、Vertex AI、TensorRT 和 ONNX。
- 使用 自定义模型处理程序 支持任意框架。
- 支持 多模型管道。
- 允许你在受支持的运行器上使用 GPU 以提高推理速度。有关更多信息,请参阅 Dataflow 文档中的 Dataflow 中的 GPU。
支持和限制
- RunInference API 在 Apache Beam 2.40.0 及更高版本中受支持。
- 模型处理程序适用于 PyTorch、scikit-learn、TensorFlow、Hugging Face、Vertex AI、ONNX、TensorRT 和 XGBoost。你也可以使用自定义模型处理程序。
- RunInference API 支持批处理和流式处理管道。
- RunInference API 支持远程推理和本地推理(相对于运行器工作器)。
BatchElements PTransform
为了利用许多模型实施的矢量化推理的优化,BatchElements
转换用作在进行模型预测之前的中间步骤。此转换将元素一起批处理。然后,使用 RunInference 的特定框架的转换应用于批处理后的元素。例如,对于 numpy ndarrays
,我们调用 numpy.stack()
,对于 torch Tensor
元素,我们调用 torch.stack()
。
为了自定义 beam.BatchElements
的设置,在 ModelHandler
中,重写 batch_elements_kwargs
函数。例如,使用 min_batch_size
设置每个批次的最低元素数,或使用 max_batch_size
设置每个批次的最高元素数。
有关更多信息,请参阅 BatchElements
转换文档。
共享辅助类
在 RunInference 实现中使用 Shared
类可以使模型仅在每个进程中加载一次并与在该进程中创建的所有 DoFn 实例共享。此功能减少了内存消耗和模型加载时间。有关更多信息,请参阅 Shared
类文档。
修改 Python 管道以使用 ML 模型
要使用 RunInference 转换,请将以下代码添加到你的管道中
from apache_beam.ml.inference.base import RunInference
with pipeline as p:
predictions = ( p | 'Read' >> beam.ReadFromSource('a_source')
| 'RunInference' >> RunInference(<model_handler>)
用模型处理程序设置代码替换 model_handler
。
要导入模型,您需要配置一个包装底层模型的 ModelHandler
对象。导入哪个模型处理程序取决于框架和包含输入的数据结构类型。ModelHandler
对象还允许您使用 env_vars
关键字参数设置推理所需的環境變數。以下示例展示了一些您可能想要导入的模型处理程序。
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
from tfx_bsl.public.beam.run_inference import CreateModelHandler
使用预训练模型
本节提供了使用预训练模型的 PyTorch、Scikit-learn 和 Tensorflow 的要求。
PyTorch
您需要提供一个指向包含模型保存的权重的文件的路径。此路径必须可供管道访问。要使用 RunInference API 和 PyTorch 框架使用预训练模型,请完成以下步骤
- 下载预训练的权重并将其托管在管道可以访问的位置。
- 使用以下代码将模型权重的路径传递给 PyTorch
ModelHandler
:state_dict_path=<path_to_weights>
。
请参阅 此笔记本,其中演示了使用 Apache Beam 运行 PyTorch 模型。
Scikit-learn
您需要提供一个指向包含腌制 Scikit-learn 模型的文件的路径。此路径必须可供管道访问。要使用 RunInference API 和 Scikit-learn 框架使用预训练模型,请完成以下步骤
- 下载腌制模型类并将其托管在管道可以访问的位置。
- 使用以下代码将模型的路径传递给 Sklearn
ModelHandler
:model_uri=<path_to_pickled_file>
和model_file_type: <ModelFileType>
,其中您可以根据模型序列化方式指定ModelFileType.PICKLE
或ModelFileType.JOBLIB
。
请参阅 此笔记本,其中演示了使用 Apache Beam 运行 Scikit-learn 模型。
TensorFlow
要使用 RunInference API 使用 TensorFlow,您有两个选择
- 使用 Apache Beam SDK 中的内置 TensorFlow 模型处理程序 -
TFModelHandlerNumpy
和TFModelHandlerTensor
。- 根据模型的输入类型,分别使用
TFModelHandlerNumpy
用于numpy
输入,TFModelHandlerTensor
用于tf.Tensor
输入。 - 使用 tensorflow 2.7 或更高版本。
- 使用
model_uri=<path_to_trained_model>
将模型的路径传递给 TensorFlowModelHandler
。 - 或者,您可以传递训练模型的保存权重的路径,一个使用
create_model_fn=<function>
构建模型的函数,并设置model_type=ModelType.SAVED_WEIGHTS
。请参阅 此笔记本,其中演示了使用内置模型处理程序运行 Tensorflow 模型。
- 根据模型的输入类型,分别使用
- 使用
tfx_bsl
。- 如果您的模型输入类型为
tf.Example
,请使用此方法。 - 使用
tfx_bsl
版本 1.10.0 或更高版本。 - 使用
tfx_bsl.public.beam.run_inference.CreateModelHandler()
创建模型处理程序。 - 将模型处理程序与
apache_beam.ml.inference.base.RunInference
变换一起使用。请参阅 此笔记本,其中演示了使用 Apache Beam 和 tfx-bsl 运行 TensorFlow 模型。
- 如果您的模型输入类型为
使用自定义模型
如果您想使用不受支持框架之一指定的模型,RunInference API 灵活地设计,允许您使用任何自定义机器学习模型。您只需要创建自己的 ModelHandler
或 KeyedModelHandler
,其中包含加载模型并使用它运行推理的逻辑。
可以在 此笔记本 中找到一个简单的示例。load_model
方法展示了如何使用流行的 spaCy
包加载模型,而 run_inference
展示了如何在示例批次上运行推理。
RunInference 模式
本节建议了一些模式和最佳实践,您可以使用它们来使您的推理管道更简单、更健壮和更高效。
使用键控 ModelHandler 对象
如果示例附加了键,请将 KeyedModelHandler
包装在 ModelHandler
对象周围
from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(...))
with pipeline as p:
data = p | beam.Create([
('img1', torch.tensor([[1,2,3],[4,5,6],...])),
('img2', torch.tensor([[1,2,3],[4,5,6],...])),
('img3', torch.tensor([[1,2,3],[4,5,6],...])),
])
predictions = data | RunInference(keyed_model_handler)
如果您不确定您的数据是否已键入,可以使用 MaybeKeyedModelHandler
。
您还可以使用 KeyedModelHandler
根据关联的键加载多个不同的模型。以下示例使用 config1
加载模型。该模型用于对与 key1
关联的所有示例进行推理。它使用 config2
加载第二个模型。该模型用于与 key2
和 key3
关联的所有示例。
from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
])
with pipeline as p:
data = p | beam.Create([
('key1', torch.tensor([[1,2,3],[4,5,6],...])),
('key2', torch.tensor([[1,2,3],[4,5,6],...])),
('key3', torch.tensor([[1,2,3],[4,5,6],...])),
])
predictions = data | RunInference(keyed_model_handler)
有关更详细的示例,请参阅笔记本 使用多个不同训练的模型运行 ML 推理。
同时加载多个模型会增加出现内存不足错误 (OOM) 的风险。默认情况下,KeyedModelHandler
不会限制同时加载到内存中的模型数量。如果模型不能全部放入内存,您的管道可能会因内存不足错误而失败。为了避免这个问题,使用 max_models_per_worker_hint
参数来设置可以同时加载到内存中的模型的最大数量。
以下示例一次最多加载每个 SDK 工作器进程两个模型。它卸载当前未使用的模型。
mhs = [
KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)
在给定机器上具有多个 SDK 工作器进程的运行器一次最多加载 max_models_per_worker_hint*<num worker processes>
个模型到机器上。
为模型和其他变换的任何额外内存需求留出足够的空间。由于内存可能不会在模型卸载后立即释放,因此建议留出额外的缓冲区。
注意:拥有大量模型但 max_models_per_worker_hint
很小可能会导致内存抖动,即大量执行时间用于在内存中交换模型。为了降低内存抖动的可能性和影响,如果您使用的是分布式运行器,请在推理步骤之前插入一个 GroupByKey
变换。GroupByKey
变换通过确保具有相同键和模型的元素位于同一工作器上,来减少抖动。
有关更多信息,请参阅 KeyedModelHander
。
使用 PredictionResult 对象
在 Apache Beam 中进行预测时,输出 PCollection
包含输入示例的键和推理。在输出中包含这两个项目允许您找到确定预测的输入。
PredictionResult
对象是一个 NamedTuple
,它包含输入和推理,分别命名为 example
和 inference
。当输入数据中的键传递给 RunInference 变换时,输出 PCollection
返回一个 Tuple[str, PredictionResult]
,即键和 PredictionResult
对象。您的管道在 RunInference 变换之后的步骤中与 PredictionResult
对象进行交互。
class PostProcessor(beam.DoFn):
def process(self, element: Tuple[str, PredictionResult]):
key, prediction_result = element
inputs = prediction_result.example
predictions = prediction_result.inference
# Post-processing logic
result = ...
yield (key, result)
with pipeline as p:
output = (
p | 'Read' >> beam.ReadFromSource('a_source')
| 'PyTorchRunInference' >> RunInference(<keyed_model_handler>)
| 'ProcessOutput' >> beam.ParDo(PostProcessor()))
如果您需要显式使用此对象,请在您的管道中包含以下行以导入该对象
from apache_beam.ml.inference.base import PredictionResult
有关更多信息,请参阅 PredictionResult
文档。
自动模型刷新
要自动更新正在使用 RunInference PTransform
的模型,而无需停止管道,请将 ModelMetadata
侧输入 PCollection
传递给 RunInference 输入参数 model_metadata_pcoll
。
ModelMetdata
是一个 NamedTuple
,它包含
model_id
:模型的唯一标识符。这可以是文件路径或可以访问模型的 URL。它用于加载模型进行推理。URL 或文件路径必须采用兼容格式,以便相应的ModelHandlers
可以加载模型而不会出现错误。例如,
PyTorchModelHandler
最初使用权重和模型类加载模型。如果您在使用侧输入更新模型时传入来自不同模型类的权重,则模型无法正确加载,因为它需要来自原始模型类的权重。model_name
:模型的人类可读名称。您可以使用此名称在 RunInference 变换生成的指标中标识模型。
用例
- 使用
WatchFilePattern
作为 RunInferencePTransform
的侧输入来自动更新 ML 模型。有关更多信息,请参阅 使用WatchFilePattern
作为侧输入来自动更新 RunInference 中的 ML 模型。
侧输入 PCollection
必须遵循 AsSingleton
视图以避免错误。
注意:如果主 PCollection
发射输入而侧输入尚未收到输入,则主 PCollection
将被缓冲,直到侧输入更新为止。这可能会发生在具有数据驱动触发器(例如 AfterCount
和 AfterProcessingTime
)的全局窗口化侧输入中。在侧输入更新之前,发射用于将相应的 ModelHandler
作为侧输入传递的默认或初始模型 ID。
预处理和后处理你的记录
使用 RunInference,您可以将预处理和后处理操作添加到您的变换中。要应用预处理操作,请在您的模型处理程序上使用 with_preprocess_fn
inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
要应用后处理操作,请在您的模型处理程序上使用 with_postprocess_fn
inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
您还可以链接多个预处理和后处理操作
inference = pcoll | RunInference(
model_handler.with_preprocess_fn(
lambda x : do_something(x)
).with_preprocess_fn(
lambda x : do_something_else(x)
).with_postprocess_fn(
lambda x : do_something_after_inference(x)
).with_postprocess_fn(
lambda x : do_something_else_after_inference(x)
))
预处理函数在批处理和推理之前运行。此函数将您的输入 PCollection
映射到模型处理程序的基输入类型。如果您应用了多个预处理函数,它们将按从后到前的顺序在您的原始 PCollection
上运行。
后处理函数在推理后运行。此函数将基模型处理程序的输出类型映射到您想要的输出类型。如果您应用了多个后处理函数,它们将按从前到后的顺序在您的原始推理结果上运行。
处理错误
为了在使用 RunInference 时稳健地处理错误,您可以使用死信队列。死信队列将失败的记录输出到一个单独的 PCollection
中,以供进一步处理。然后可以分析此 PCollection
并将其发送到存储系统,在那里可以对其进行审查并重新提交到管道,或者将其丢弃。RunInference 对死信队列提供了内置支持。您可以通过将 with_exception_handling
应用于您的 RunInference 变换来使用死信队列
main, other = pcoll | RunInference(model_handler).with_exception_handling()
other.failed_inferences | beam.Map(print) # insert logic to handle failed records here
您还可以将此模式应用于具有关联的预处理和后处理操作的 RunInference 变换
main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
other.failed_preprocessing[0] | beam.Map(print) # handles failed preprocess operations, indexed in the order in which they were applied
other.failed_inferences | beam.Map(print) # handles failed inferences
other.failed_postprocessing[0] | beam.Map(print) # handles failed postprocess operations, indexed in the order in which they were applied
从 Java 管道运行推理
RunInference API 可与 Beam Java SDK 版本 2.41.0 及更高版本一起使用,通过 Apache Beam 的 多语言管道框架。有关 Java 包装器变换的信息,请参阅 RunInference.java。要试用它,请参阅 Java Sklearn Mnist 分类示例。此外,请参阅 从 Java SDK 使用 RunInference,其中包含一个复合 Python 变换的示例,该变换使用 RunInference API 以及来自 Beam Java SDK 管道的预处理和后处理。
自定义推理
RunInference API 目前不支持使用自然语言 API 或 Cloud Vision API 等进行远程推理调用。 因此,要在 Apache Beam 中使用这些远程 API,您需要编写自定义推理调用。 Apache Beam 笔记本中的远程推理 演示了如何使用 beam.DoFn
实现自定义远程推理调用。 当您为现实生活中的项目实现远程推理时,请考虑以下因素
API 配额以及您可能在外部 API 上产生的繁重负载。 为了优化对外部 API 的调用,您可以配置
PipelineOptions
以限制对外部远程 API 的并行调用。做好遇到、识别和尽可能优雅地处理错误的准备。 使用诸如指数回退和死信队列(未处理消息队列)之类的技术。
在使用外部 API 进行推理时,将输入批处理在一起,以实现更有效的执行。
考虑在部署时监控和衡量管道的性能,因为监控可以提供对应用程序状态和运行状况的洞察。
多模型管道
使用 RunInference 变换将多个推理模型添加到您的管道中。 多模型管道可用于 A/B 测试或构建由执行标记化、句子分割、词性标注、命名实体提取、语言检测、共指消解等模型组成的级联模型。 有关更多信息,请参阅 多模型管道。
A/B 模式
with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('a_source')
model_a_predictions = data | RunInference(<model_handler_A>)
model_b_predictions = data | RunInference(<model_handler_B>)
其中 model_handler_A
和 model_handler_B
是模型处理程序设置代码。
级联模式
with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('a_source')
model_a_predictions = data | RunInference(<model_handler_A>)
model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(<model_handler_B>)
其中 model_handler_A
和 model_handler_B
是模型处理程序设置代码。
对不同模型需求使用资源提示
在单个管道中使用多个模型时,不同的模型可能具有不同的内存或工作程序 SKU 要求。 资源提示允许您向运行程序提供有关管道中每个步骤的计算资源要求的信息。
例如,以下代码段扩展了前面的级联模式,并为每个 RunInference 调用提供了提示以指定 RAM 和硬件加速器要求
with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('a_source')
model_a_predictions = data | RunInference(<model_handler_A>).with_resource_hints(min_ram="20GB")
model_b_predictions = model_a_predictions
| beam.Map(some_post_processing)
| RunInference(<model_handler_B>).with_resource_hints(
min_ram="4GB",
accelerator="type:nvidia-tesla-k80;count:1;install-nvidia-driver")
有关资源提示的更多信息,请参阅 资源提示。
模型验证
模型验证允许您对模型的性能进行基准测试,以测试其对以前从未见过的数据集的性能。 您可以提取选定的指标,创建可视化,记录元数据,并比较不同模型的性能,最终目标是验证您的模型是否已准备好部署。 Beam 提供支持直接在管道内运行 TensorFlow 模型的模型评估。
ML 模型评估 页面展示了如何通过使用 TensorFlow 模型分析 (TFMA) 将模型评估集成到管道中。
故障排除
如果您在管道或作业中遇到问题,本节列出了您可能会遇到的问题,并提供了解决方案的建议。
无法批处理张量元素
RunInference 使用动态批处理。 但是,RunInference API 无法批处理不同大小的张量元素,因此传递到 RunInference
变换的样本必须具有相同的维度或长度。 如果您提供不同大小的图像或不同长度的词嵌入,可能会出现以下错误
File "/beam/sdks/python/apache_beam/ml/inference/pytorch_inference.py", line 232, in run_inference batched_tensors = torch.stack(key_to_tensor_list[key]) RuntimeError: stack expects each tensor to be equal size, but got [12] at entry 0 and [10] at entry 1 [while running 'PyTorchRunInference/ParDo(_RunInferenceDoFn)']
为了避免此问题,请使用相同大小的元素或禁用批处理。
选项 1:使用相同大小的元素
使用相同大小的元素或调整输入的大小。 对于计算机视觉应用程序,请调整图像输入的大小,使其具有相同的尺寸。 对于具有不同长度文本的自然语言处理 (NLP) 应用程序,请调整文本或词嵌入的大小,使其具有相同的长度。 在处理不同长度的文本时,调整大小可能不可行。 在这种情况下,您可以禁用批处理(请参阅选项 2)。
选项 2:禁用批处理
通过覆盖 ModelHandler 中的 batch_elements_kwargs
函数并将最大批处理大小 (max_batch_size
) 设置为 1 来禁用批处理:max_batch_size=1
。 有关更多信息,请参阅 BatchElements PTransforms。 有关示例,请参阅我们的 语言建模示例。
相关链接
| Javadoc |
上次更新时间:2024/10/31
您是否找到了您要找的所有内容?
所有内容都实用且清晰吗? 您想更改任何内容吗? 请告诉我们!