BigQuery ML 集成

在本页面的示例中,我们将演示如何使用 TFX Basic Shared Libraries (tfx_bsl) 将从 BigQuery ML (BQML) 导出的模型集成到您的 Apache Beam 管道中。

大致而言,以下部分将更详细地介绍以下步骤

  1. 创建和训练您的 BigQuery ML 模型
  2. 导出您的 BigQuery ML 模型
  3. 创建一个使用全新的 BigQuery ML 模型的转换

创建和训练您的 BigQuery ML 模型

为了能够使用 tfx_bsl 将您的 BQML 模型合并到 Apache Beam 管道中,它必须采用 TensorFlow SavedModel 格式。可以在 此处 找到将不同模型类型映射到其导出模型格式的概述。

为了简单起见,我们将使用 BQML 快速入门指南 中的(简化版本的)逻辑回归模型,使用公开可用的 Google Analytics 样本数据集(这是一个 日期分片表 - 或者,您可能会遇到 分区表)。可以在 此处 找到您可以使用 BQML 创建的所有模型的概述。

在创建 BigQuery 数据集后,您将继续创建模型,该模型完全用 SQL 定义

CREATE MODEL IF NOT EXISTS `bqml_tutorial.sample_model`
OPTIONS(model_type='logistic_reg', input_label_cols=["label"]) AS
SELECT
  IF(totals.transactions IS NULL, 0, 1) AS label,
  IFNULL(geoNetwork.country, "") AS country
FROM
  `bigquery-public-data.google_analytics_sample.ga_sessions_*`
WHERE
  _TABLE_SUFFIX BETWEEN '20160801' AND '20170630'

该模型将根据 2016-08-01 到 2017-06-30 之间收集的数据预测访客的国家/地区是否会进行购买。

导出您的 BigQuery ML 模型

为了将您的模型合并到 Apache Beam 管道中,您需要将其导出。这样做的先决条件是 安装 bq 命令行工具创建 Google Cloud Storage 存储桶 来存储您导出的模型。

使用以下命令导出模型

bq extract -m bqml_tutorial.sample_model gs://some/gcs/path

创建使用您的 BigQuery ML 模型的 Apache Beam 转换

在本节中,我们将构建一个 Apache Beam 管道,它将使用我们刚刚创建和导出的 BigQuery ML 模型。可以使用 Google Cloud AI Platform Prediction 提供该模型 - 有关此操作,请参阅 AI Platform 模式。在本例中,我们将说明如何使用 tfx_bsl 库进行本地预测(在您的 Apache Beam 工作器上)。

首先,需要将模型下载到您将在其中开发管道其余部分的本地目录(例如,到 serving_dir/sample_model/1)。

然后,您可以像往常一样开始开发管道。我们将使用 tfx_bsl 库中的 RunInference PTransform,并将它指向存储模型的本地目录(请参阅代码示例中的 model_path 变量)。该转换将接收类型为 tf.train.Example 的元素作为输入,并输出类型为 tensorflow_serving.apis.prediction_log_pb2.PredictionLog 的元素。根据模型的签名,您可以从输出中提取值;在本例中,我们根据 逻辑回归模型文档extract_prediction 函数中提取 label_probslabel_valuespredicted_label

import apache_beam
import tensorflow as tf
from google.protobuf import text_format
from tensorflow.python.framework import tensor_util
from tfx_bsl.beam import run_inference
from tfx_bsl.public.beam import RunInference
from tfx_bsl.public.proto import model_spec_pb2


inputs = tf.train.Example(features=tf.train.Features(
            feature={
                'os': tf.train.Feature(bytes_list=tf.train.BytesList(b"Microsoft"))
            })
          )

model_path = "serving_dir/sample_model/1"

def extract_prediction(response):
  yield response.predict_log.response.outputs['label_values'].string_val,
        tensor_util.MakeNdarray(response.predict_log.response.outputs['label_probs']),
        response.predict_log.response.outputs['predicted_label'].string_val

with beam.Pipeline() as p:
    res = (
        p
        | beam.Create([inputs])
        | RunInference(
            model_spec_pb2.InferenceSpecType(
                saved_model_spec=model_spec_pb2.SavedModelSpec(
                    model_path=model_path,
                    signature_name=['serving_default'])))
        | beam.ParDo(extract_prediction)
Implemented in Python.