BigQuery ML 集成
在本页面的示例中,我们将演示如何使用 TFX Basic Shared Libraries (tfx_bsl) 将从 BigQuery ML (BQML) 导出的模型集成到您的 Apache Beam 管道中。
大致而言,以下部分将更详细地介绍以下步骤
- 创建和训练您的 BigQuery ML 模型
- 导出您的 BigQuery ML 模型
- 创建一个使用全新的 BigQuery ML 模型的转换
- Java SDK
- Python SDK
创建和训练您的 BigQuery ML 模型
为了能够使用 tfx_bsl 将您的 BQML 模型合并到 Apache Beam 管道中,它必须采用 TensorFlow SavedModel 格式。可以在 此处 找到将不同模型类型映射到其导出模型格式的概述。
为了简单起见,我们将使用 BQML 快速入门指南 中的(简化版本的)逻辑回归模型,使用公开可用的 Google Analytics 样本数据集(这是一个 日期分片表 - 或者,您可能会遇到 分区表)。可以在 此处 找到您可以使用 BQML 创建的所有模型的概述。
在创建 BigQuery 数据集后,您将继续创建模型,该模型完全用 SQL 定义
CREATE MODEL IF NOT EXISTS `bqml_tutorial.sample_model`
OPTIONS(model_type='logistic_reg', input_label_cols=["label"]) AS
SELECT
IF(totals.transactions IS NULL, 0, 1) AS label,
IFNULL(geoNetwork.country, "") AS country
FROM
`bigquery-public-data.google_analytics_sample.ga_sessions_*`
WHERE
_TABLE_SUFFIX BETWEEN '20160801' AND '20170630'
该模型将根据 2016-08-01 到 2017-06-30 之间收集的数据预测访客的国家/地区是否会进行购买。
导出您的 BigQuery ML 模型
为了将您的模型合并到 Apache Beam 管道中,您需要将其导出。这样做的先决条件是 安装 bq
命令行工具 和 创建 Google Cloud Storage 存储桶 来存储您导出的模型。
使用以下命令导出模型
bq extract -m bqml_tutorial.sample_model gs://some/gcs/path
创建使用您的 BigQuery ML 模型的 Apache Beam 转换
在本节中,我们将构建一个 Apache Beam 管道,它将使用我们刚刚创建和导出的 BigQuery ML 模型。可以使用 Google Cloud AI Platform Prediction 提供该模型 - 有关此操作,请参阅 AI Platform 模式。在本例中,我们将说明如何使用 tfx_bsl 库进行本地预测(在您的 Apache Beam 工作器上)。
首先,需要将模型下载到您将在其中开发管道其余部分的本地目录(例如,到 serving_dir/sample_model/1
)。
然后,您可以像往常一样开始开发管道。我们将使用 tfx_bsl 库中的 RunInference
PTransform,并将它指向存储模型的本地目录(请参阅代码示例中的 model_path
变量)。该转换将接收类型为 tf.train.Example
的元素作为输入,并输出类型为 tensorflow_serving.apis.prediction_log_pb2.PredictionLog
的元素。根据模型的签名,您可以从输出中提取值;在本例中,我们根据 逻辑回归模型文档 在 extract_prediction
函数中提取 label_probs
、label_values
和 predicted_label
。
import apache_beam
import tensorflow as tf
from google.protobuf import text_format
from tensorflow.python.framework import tensor_util
from tfx_bsl.beam import run_inference
from tfx_bsl.public.beam import RunInference
from tfx_bsl.public.proto import model_spec_pb2
inputs = tf.train.Example(features=tf.train.Features(
feature={
'os': tf.train.Feature(bytes_list=tf.train.BytesList(b"Microsoft"))
})
)
model_path = "serving_dir/sample_model/1"
def extract_prediction(response):
yield response.predict_log.response.outputs['label_values'].string_val,
tensor_util.MakeNdarray(response.predict_log.response.outputs['label_probs']),
response.predict_log.response.outputs['predicted_label'].string_val
with beam.Pipeline() as p:
res = (
p
| beam.Create([inputs])
| RunInference(
model_spec_pb2.InferenceSpecType(
saved_model_spec=model_spec_pb2.SavedModelSpec(
model_path=model_path,
signature_name=['serving_default'])))
| beam.ParDo(extract_prediction)
上次更新于 2024/10/31
您是否找到了您要查找的所有内容?
所有内容是否都很有用且清晰?是否有任何您想更改的内容?请告诉我们!