使用 RunInference 与 Sklearn

Pydoc Pydoc




以下示例演示了如何创建使用 Beam RunInference API 和 Sklearn 的管道。

示例 1:Sklearn 无键模型

在此示例中，我们创建一个管道，该管道对无键数据使用 Sklearn RunInference 转换。

import apache_beam as beam
import numpy
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# Pickled scikit-learn sample model hosted on Google Cloud Storage.
sklearn_model_filename = 'gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl'  # pylint: disable=line-too-long

# Handler that tells RunInference where the model lives and that it is a
# pickle file.
sklearn_model_handler = SklearnModelHandlerNumpy(
    model_file_type=ModelFileType.PICKLE,
    model_uri=sklearn_model_filename,
)

# Four unkeyed examples laid out as a (4, 1) float32 array, i.e. one
# single-feature row per example.
unkeyed_data = numpy.array([[20], [40], [60], [90]], dtype=numpy.float32)

with beam.Pipeline() as pipeline:
  # Turn the in-memory array into the pipeline's input collection.
  inputs = pipeline | "ReadInputs" >> beam.Create(unkeyed_data)
  # Apply the model to the inputs through the RunInference transform.
  results = inputs | "RunInferenceSklearn" >> RunInference(
      model_handler=sklearn_model_handler)
  # Print each result to stdout.
  predictions = results | beam.Map(print)

输出

PredictionResult(example=array([20.], dtype=float32), inference=array([100.], dtype=float32), model_id='gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl')
PredictionResult(example=array([40.], dtype=float32), inference=array([200.], dtype=float32), model_id='gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl')
PredictionResult(example=array([60.], dtype=float32), inference=array([300.], dtype=float32), model_id='gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl')
PredictionResult(example=array([90.], dtype=float32), inference=array([450.], dtype=float32), model_id='gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl')

示例 2:Sklearn 有键模型

在此示例中，我们创建一个管道，该管道对有键数据使用 Sklearn RunInference 转换。

import apache_beam as beam
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# Pickled scikit-learn sample model hosted on Google Cloud Storage.
sklearn_model_filename = 'gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl'  # pylint: disable=line-too-long

# Build the plain numpy handler first, then wrap it in KeyedModelHandler so
# the pipeline can feed it (key, example) tuples.
numpy_model_handler = SklearnModelHandlerNumpy(
    model_file_type=ModelFileType.PICKLE,
    model_uri=sklearn_model_filename,
)
sklearn_model_handler = KeyedModelHandler(numpy_model_handler)

# (key, value) pairs; each value is a single scalar feature.
keyed_data = [
    ("first_question", 105.00),
    ("second_question", 108.00),
    ("third_question", 1000.00),
    ("fourth_question", 1013.00),
]

with beam.Pipeline() as pipeline:
  # Turn the in-memory pairs into the pipeline's input collection.
  pairs = pipeline | "ReadInputs" >> beam.Create(keyed_data)
  # Wrap each scalar value in a one-element list, leaving the key untouched.
  examples = pairs | "ConvertDataToList" >> beam.Map(
      lambda kv: (kv[0], [kv[1]]))
  # Apply the keyed model handler through the RunInference transform.
  results = examples | "RunInferenceSklearn" >> RunInference(
      model_handler=sklearn_model_handler)
  # Print each result to stdout.
  predictions = results | beam.Map(print)

输出

('first_question', PredictionResult(example=[105.0], inference=array([525.]), model_id='gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl'))
('second_question', PredictionResult(example=[108.0], inference=array([540.]), model_id='gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl'))
('third_question', PredictionResult(example=[1000.0], inference=array([5000.]), model_id='gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl'))
('fourth_question', PredictionResult(example=[1013.0], inference=array([5065.]), model_id='gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl'))