Enrichment with Google Cloud Vertex AI Feature Store

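In Apache Beam 2.55.0 and later versions, the enrichment transform includes a built-in enrichment handler for Vertex AI Feature Store. The following examples demonstrate how to create a pipeline that uses the enrichment transform with the VertexAIFeatureStoreEnrichmentHandler handler and with the VertexAIFeatureStoreLegacyEnrichmentHandler handler.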

Example 1: Enrichment with Vertex AI Feature Store

The precomputed feature values stored in Vertex AI Feature Store use the following format:

user_id  age  gender  state  country
21422    12   0       0      0
2963     12   1       1      1
20592    12   1       2      2
76538    12   1       3      0

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store \
  import VertexAIFeatureStoreEnrichmentHandler

project_id = 'apache-beam-testing'
location = 'us-central1'
api_endpoint = f"{location}-aiplatform.googleapis.com"
data = [
    beam.Row(user_id='2963', product_id=14235, sale_price=15.0),
    beam.Row(user_id='21422', product_id=11203, sale_price=12.0),
    beam.Row(user_id='20592', product_id=8579, sale_price=9.0),
]

vertex_ai_handler = VertexAIFeatureStoreEnrichmentHandler(
    project=project_id,
    location=location,
    api_endpoint=api_endpoint,
    feature_store_name="vertexai_enrichment_example",
    feature_view_name="users",
    row_key="user_id",
)
with beam.Pipeline() as p:
  _ = (
      p
      | "Create" >> beam.Create(data)
      | "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
      | "Print" >> beam.Map(print))

Output:

Row(user_id='2963', product_id=14235, sale_price=15.0, age=12.0, state='1', gender='1', country='1')
Row(user_id='21422', product_id=11203, sale_price=12.0, age=12.0, state='0', gender='0', country='0')
Row(user_id='20592', product_id=8579, sale_price=9.0, age=12.0, state='2', gender='1', country='2')
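
To make the mapping from the feature table to the output rows explicit, the following is a minimal local sketch of the lookup-and-merge that the enrichment step performs conceptually. It uses plain Python dictionaries instead of Beam and Vertex AI, so it runs without any cloud resources. FEATURES and enrich_locally are hypothetical names introduced for illustration, the value types are copied from the output above, and the rule that existing input fields are kept on a name collision is an assumption rather than a statement about the handler's implementation.

```python
# Local stand-in for the "users" feature view (values copied from the
# table above; in the real pipeline they are fetched from Vertex AI).
FEATURES = {
    "21422": {"age": 12.0, "state": "0", "gender": "0", "country": "0"},
    "2963": {"age": 12.0, "state": "1", "gender": "1", "country": "1"},
    "20592": {"age": 12.0, "state": "2", "gender": "1", "country": "2"},
    "76538": {"age": 12.0, "state": "3", "gender": "1", "country": "0"},
}


def enrich_locally(row, features, row_key):
    """Look up the features for row[row_key] and append them to the row."""
    fetched = features[row[row_key]]  # raises KeyError for unknown keys
    merged = dict(row)
    for name, value in fetched.items():
        # Assumption: a field already present in the input row is kept.
        merged.setdefault(name, value)
    return merged


request = {"user_id": "2963", "product_id": 14235, "sale_price": 15.0}
enriched = enrich_locally(request, FEATURES, row_key="user_id")
print(enriched)
```

The resulting dictionary carries the same fields as the first Row in the output above: the three request fields followed by age, state, gender, and country.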

Example 2: Enrichment with Vertex AI Feature Store (Legacy)

The precomputed feature values stored in Vertex AI Feature Store (Legacy) use the following format:

entity_id  title                     genres
movie_01   The Shawshank Redemption  Drama
movie_02   The Shining               Horror
movie_04   The Dark Knight           Action

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store \
  import VertexAIFeatureStoreLegacyEnrichmentHandler

project_id = 'apache-beam-testing'
location = 'us-central1'
api_endpoint = f"{location}-aiplatform.googleapis.com"
data = [
    beam.Row(entity_id="movie_01", title='The Shawshank Redemption'),
    beam.Row(entity_id="movie_02", title="The Shining"),
    beam.Row(entity_id="movie_04", title='The Dark Knight'),
]

vertex_ai_handler = VertexAIFeatureStoreLegacyEnrichmentHandler(
    project=project_id,
    location=location,
    api_endpoint=api_endpoint,
    entity_type_id='movies',
    feature_store_id="movie_prediction_unique",
    feature_ids=["title", "genres"],
    row_key="entity_id",
)
with beam.Pipeline() as p:
  _ = (
      p
      | "Create" >> beam.Create(data)
      | "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
      | "Print" >> beam.Map(print))

Output:

Row(entity_id='movie_01', title='The Shawshank Redemption', genres='Drama')
Row(entity_id='movie_02', title='The Shining', genres='Horror')
Row(entity_id='movie_04', title='The Dark Knight', genres='Action')
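
The legacy handler is configured with feature_ids, which names the features to read for each entity. The sketch below imitates that selection locally with plain Python dictionaries so the effect of the list is easy to see. MOVIES and enrich_with_feature_ids are hypothetical names for illustration only, and keeping the input row's value when a feature name such as title is already present is an assumption, not a description of the handler's internals.

```python
# Local stand-in for the legacy "movies" entity type (values copied from
# the table above; the real values live in Vertex AI Feature Store).
MOVIES = {
    "movie_01": {"title": "The Shawshank Redemption", "genres": "Drama"},
    "movie_02": {"title": "The Shining", "genres": "Horror"},
    "movie_04": {"title": "The Dark Knight", "genres": "Action"},
}


def enrich_with_feature_ids(row, entities, row_key, feature_ids):
    """Merge only the requested features of one entity into the row."""
    entity = entities[row[row_key]]  # raises KeyError for unknown entities
    selected = {fid: entity[fid] for fid in feature_ids}
    merged = dict(row)
    for name, value in selected.items():
        # Assumption: a field already present in the input row is kept.
        merged.setdefault(name, value)
    return merged


requests = [
    {"entity_id": "movie_01", "title": "The Shawshank Redemption"},
    {"entity_id": "movie_02", "title": "The Shining"},
    {"entity_id": "movie_04", "title": "The Dark Knight"},
]
results = [
    enrich_with_feature_ids(r, MOVIES, "entity_id", ["title", "genres"])
    for r in requests
]
for result in results:
    print(result)
```

Each result holds entity_id, title, and genres, matching the fields of the Rows in the output above. Passing only ["genres"] as feature_ids yields the same result here, because title is already part of every input row.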

