Beam 中的大型语言模型推理

在 Apache Beam 2.40.0 中,Beam 引入了 RunInference API,它允许您在 Beam 管道中部署机器学习模型。RunInference 转换使用机器学习 (ML) 模型对 PCollection 中的示例进行推理。该转换输出一个 PCollection,其中包含输入示例和输出预测。有关更多信息,请参阅 RunInference 此处。您也可以在 GitHub 上找到 推理示例

使用 RunInference 处理非常大的模型

RunInference 在任意大型模型上都能很好地工作,只要它们适合您的硬件即可。

内存管理

RunInference 有几种机制可以减少内存使用量。例如,默认情况下,RunInference 每个进程最多加载一个模型副本(而不是每个线程一个)。

但是,许多 Beam 运行器在每台机器上同时运行多个 Beam 进程。这会导致问题,因为加载大型模型(如 LLM)多次的内存占用量可能太大,无法容纳在一台机器中。对于内存密集型模型,RunInference 提供了一种机制,可以在多个进程之间更智能地共享内存,以减少整体内存占用量。要启用此模式,用户只需在模型配置中将参数 large_model 设置为 True(有关示例,请参见下文),Beam 将负责内存管理。在使用自定义模型处理程序时,您可以覆盖 share_model_across_processes 函数或 model_copies 函数以获得类似的效果。

使用 T5 运行示例管道

此示例演示了使用管道中的 RunInferenceT5 语言模型进行推理。T5 是一种编码器-解码器模型,经过预训练,可以处理无监督和监督任务的混合多任务。每个任务都被转换为文本到文本格式。该示例使用 T5-11B,它包含 110 亿个参数,大小为 45 GB。为了在各种任务上都能良好地工作,T5 会在每个任务对应的输入前面加上不同的前缀。例如,对于翻译,输入将是:translate English to German: …,而对于摘要,它将是:summarize: …。有关 T5 的更多信息,请参阅 HuggingFace 文档中的 T5 概述

要使用此模型进行推理,首先,安装 apache-beam 2.40 或更高版本

pip install apache-beam -U

接下来,安装 requirements.txt 中列出的必需软件包,并传递必需的参数。您可以使用以下步骤从 Hugging Face Hub 下载 T5-11b 模型

import torch
from transformers import T5ForConditionalGeneration

model = T5ForConditionalGeneration.from_pretrained("path/to/cloned/t5-11b")
torch.save(model.state_dict(), "path/to/save/state_dict.pth")

您可以在 GitHub 上查看代码

  1. 在您的机器上本地运行
python main.py --runner DirectRunner \
               --model_state_dict_path <local or remote path to state_dict> \
               --model_name t5-11b

您需要有 45 GB 的磁盘空间才能运行此示例。

  1. 在 Google Cloud 上使用 Dataflow
python main.py --runner DataflowRunner \
                --model_state_dict_path <gs://path/to/saved/state_dict.pth> \
                --model_name t5-11b \
                --project <PROJECT_ID> \
                --region <REGION> \
                --requirements_file requirements.txt \
                --staging_location <gs://path/to/staging/location>
                --temp_location <gs://path/to/temp/location> \
                --experiments "use_runner_v2,no_use_multiple_sdk_containers" \
                --machine_type=n1-highmem-16 \
                --disk_size_gb=200

您也可以按照 此处 所述传递其他配置参数。

管道步骤

管道包含以下步骤

  1. 读取输入。
  2. 使用标记器将文本编码为可读的转换器标记 ID 整数。
  3. 使用 RunInference 获取输出。
  4. 解码 RunInference 输出并打印它。

以下代码片段包含四个步骤

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | "CreateInputs" >> beam.Create(task_sentences)
            | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer))
            | "RunInference" >> RunInference(model_handler=model_handler)
            | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer))
        )

在管道的第三步中,我们使用 RunInference。为了使用它,您必须首先定义一个 ModelHandler。RunInference 为 PyTorchTensorFlowScikit-Learn 提供模型处理程序。由于该示例使用的是 PyTorch 模型,因此它使用 PyTorchModelHandlerTensor 模型处理程序。

  gen_fn = make_tensor_model_fn('generate')

  model_handler = PytorchModelHandlerTensor(
      state_dict_path=args.model_state_dict_path,
      model_class=T5ForConditionalGeneration,
      model_params={"config": AutoConfig.from_pretrained(args.model_name)},
      device="cpu",
      inference_fn=gen_fn,
      large_model=True)

ModelHandler 需要参数,例如

大型模型故障排除

腌制错误

在使用 large_model=True 在进程之间共享模型或使用自定义模型处理程序时,Beam 会将输入和输出数据通过进程边界发送。为此,它使用一种称为 腌制 的序列化方法。例如,如果您调用 output=model.my_inference_fn(input_1, input_2),则 input_1input_2output 都需要被腌制。模型本身不需要被腌制,因为它不会通过进程边界传递。

虽然大多数对象可以毫无问题地被腌制,但如果这些对象之一不可腌制,您可能会遇到类似 error: can't pickle fasttext_pybind.fasttext objects 的错误。要解决此问题,有一些方法

首先,如果可能,您可以选择不跨进程共享您的模型。这会带来额外的内存压力,但在某些情况下可能是可以忍受的。

其次,使用自定义模型处理器,您可以封装您的模型以接收和返回可序列化类型。例如,如果您的模型处理器看起来像

class MyModelHandler():
   def load_model(self):
      return model_loading_logic()

   def run_inference(self, batch: Sequence[str], model, inference_args):
      unpickleable_object = Unpickleable(batch)
      unpickleable_returned = model.predict(unpickleable_object)
      my_output = int(unpickleable_returned[0])
      return my_output

您也可以将不可pickle化的部分封装到一个模型包装器中。由于模型包装器将在推理过程中存在,只要它只接收/返回可pickle化的对象,这就会起作用。

class MyWrapper():
   def __init__(self, model):
      self._model = model

   def predict(self, batch: Sequence[str]):
      unpickleable_object = Unpickleable(batch)
      unpickleable_returned = model.predict(unpickleable_object)
      return int(prediction[0])

class MyModelHandler():
   def load_model(self):
      return MyWrapper(model_loading_logic())

   def run_inference(self, batch: Sequence[str], model: MyWrapper, inference_args):
      return model.predict(unpickleable_object)

Beam 中的 RAG 和提示工程

Beam 也是一个非常棒的工具,可用于使用检索增强生成 (RAG) 来提高您的 LLM 提示的质量。检索增强生成是一种技术,它通过将大型语言模型 (LLM) 连接到外部知识源来增强它们。这使得 LLM 能够访问和处理实时信息,从而提高其响应的准确性、相关性和真实性。

Beam 有几种机制可以简化此过程

  1. Beam 的 MLTransform 提供了一个嵌入包,用于生成 RAG 用到的嵌入。如果您有模型却没有嵌入处理器,您也可以使用 RunInference 来生成嵌入。
  2. Beam 的 Enrichment 变换 使得在外部存储系统(例如 向量数据库)中查找嵌入或其他信息变得很容易。

总的来说,您可以使用以下步骤来执行 RAG:

管道 1 - 生成知识库

  1. 使用 Beam 的 I/O 连接器 中的一种从外部源获取数据。
  2. 使用 MLTransform 对数据生成嵌入。
  3. 使用 ParDo 将这些嵌入写入向量数据库。

管道 2 - 使用知识库执行 RAG

  1. 使用 Beam 的 I/O 连接器 中的一种从外部源获取数据。
  2. 使用 MLTransform 对数据生成嵌入。
  3. 使用 Enrichment 从您的向量数据库中使用其他嵌入来丰富这些数据。
  4. 使用这些丰富的数据,并使用 RunInference 来提示您的 LLM。
  5. 使用 Beam 的 I/O 连接器 中的一种将这些数据写入您想要的接收器。

要查看执行 RAG 的示例管道,请参阅 https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/rag_usecase/beam_rag_notebook.ipynb