将 RunInference 与 PyTorch 结合使用

Pydoc




以下示例演示了如何创建使用 Beam RunInference API 和 PyTorch 的管道。

示例 1:PyTorch 无键模型

在这个示例中,我们创建了一个管道,该管道在无键数据上使用 PyTorch RunInference 转换。

import apache_beam as beam
import numpy
import torch
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

class LinearRegression(torch.nn.Module):
  """Single-layer linear model that the saved state dict is loaded into.

  The example references this class by name, so it must be defined before
  the model handler is built; without it the script fails with NameError.

  NOTE(review): the submodule attribute name (`linear`) has to match the
  parameter keys stored in the state dict file -- presumably
  `linear.weight` / `linear.bias`; confirm against the checkpoint.

  Args:
    input_dim: Number of input features per example.
    output_dim: Number of output values per example.
  """
  def __init__(self, input_dim=1, output_dim=1):
    super().__init__()
    self.linear = torch.nn.Linear(input_dim, output_dim)

  def forward(self, x):
    """Apply the linear layer to tensor `x` and return the result."""
    return self.linear(x)


# Path of the saved model weights (state dict) the handler loads.
model_state_dict_path = 'gs://apache-beam-samples/run_inference/five_times_table_torch.pt'  # pylint: disable=line-too-long
model_class = LinearRegression
# Keyword arguments for constructing `model_class`.
model_params = {'input_dim': 1, 'output_dim': 1}
model_handler = PytorchModelHandlerTensor(
    model_class=model_class,
    model_params=model_params,
    state_dict_path=model_state_dict_path)

# Four examples with one float32 feature each: reshape(-1, 1) gives shape (4, 1),
# so beam.Create emits one length-1 row per element.
unkeyed_data = numpy.array([10, 40, 60, 90],
                           dtype=numpy.float32).reshape(-1, 1)

with beam.Pipeline() as p:
  predictions = (
      p
      | 'InputData' >> beam.Create(unkeyed_data)
      # Each element is converted from a numpy row to a torch tensor before
      # it is handed to the tensor-based model handler.
      | 'ConvertNumpyToTensor' >> beam.Map(torch.Tensor)
      | 'PytorchRunInference' >> RunInference(model_handler=model_handler)
      | beam.Map(print))

输出

PredictionResult(example=tensor([10.]), inference=tensor([52.2325]), model_id='gs://apache-beam-samples/run_inference/five_times_table_torch.pt')
PredictionResult(example=tensor([40.]), inference=tensor([201.1165]), model_id='gs://apache-beam-samples/run_inference/five_times_table_torch.pt')
PredictionResult(example=tensor([60.]), inference=tensor([300.3724]), model_id='gs://apache-beam-samples/run_inference/five_times_table_torch.pt')
PredictionResult(example=tensor([90.]), inference=tensor([449.2563]), model_id='gs://apache-beam-samples/run_inference/five_times_table_torch.pt')

示例 2:PyTorch 有键模型

在这个示例中,我们创建了一个管道,该管道在有键数据上使用 PyTorch RunInference 转换。

import apache_beam as beam
import torch
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

class LinearRegression(torch.nn.Module):
  """Single-layer linear model that the saved state dict is loaded into.

  The example references this class by name, so it must be defined before
  the model handler is built; without it the script fails with NameError.

  NOTE(review): the submodule attribute name (`linear`) has to match the
  parameter keys stored in the state dict file -- presumably
  `linear.weight` / `linear.bias`; confirm against the checkpoint.

  Args:
    input_dim: Number of input features per example.
    output_dim: Number of output values per example.
  """
  def __init__(self, input_dim=1, output_dim=1):
    super().__init__()
    self.linear = torch.nn.Linear(input_dim, output_dim)

  def forward(self, x):
    """Apply the linear layer to tensor `x` and return the result."""
    return self.linear(x)


# Path of the saved model weights (state dict) the handler loads.
model_state_dict_path = 'gs://apache-beam-samples/run_inference/five_times_table_torch.pt'  # pylint: disable=line-too-long
model_class = LinearRegression
# Keyword arguments for constructing `model_class`.
model_params = {'input_dim': 1, 'output_dim': 1}
# The tensor handler is wrapped in KeyedModelHandler because the pipeline
# elements below are (key, example) tuples rather than bare examples.
keyed_model_handler = KeyedModelHandler(
    PytorchModelHandlerTensor(
        model_class=model_class,
        model_params=model_params,
        state_dict_path=model_state_dict_path))

# (key, value) pairs; the value is the single input feature for the model.
keyed_data = [("first_question", 105.00), ("second_question", 108.00),
              ("third_question", 1000.00), ("fourth_question", 1013.00)]

with beam.Pipeline() as p:
  predictions = (
      p
      | 'KeyedInputData' >> beam.Create(keyed_data)
      # Keep the key unchanged and wrap the number in a one-element tensor.
      | "ConvertIntToTensor" >>
      beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | 'PytorchRunInference' >>
      RunInference(model_handler=keyed_model_handler)
      | beam.Map(print))

输出

('first_question', PredictionResult(example=tensor([105.]), inference=tensor([523.6982]), model_id='gs://apache-beam-samples/run_inference/five_times_table_torch.pt'))
('second_question', PredictionResult(example=tensor([108.]), inference=tensor([538.5867]), model_id='gs://apache-beam-samples/run_inference/five_times_table_torch.pt'))
('third_question', PredictionResult(example=tensor([1000.]), inference=tensor([4965.4019]), model_id='gs://apache-beam-samples/run_inference/five_times_table_torch.pt'))
('fourth_question', PredictionResult(example=tensor([1013.]), inference=tensor([5029.9180]), model_id='gs://apache-beam-samples/run_inference/five_times_table_torch.pt'))