AI Platform integration patterns

This page describes common patterns in pipelines that use Google Cloud AI Platform transforms.

Analyzing the structure and meaning of text

This section shows how to use the Google Cloud Natural Language API to perform text analysis.

Beam provides a PTransform called AnnotateText. The transform takes a PCollection of type Document. Each Document object contains various information about the text: the content, whether it is plain text or HTML, an optional language hint, and other settings. AnnotateText produces response objects of type AnnotateTextResponse returned from the API. AnnotateTextResponse is a protobuf message that contains many attributes, some of which are complex structures.
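For illustration only, here is a minimal sketch of constructing a Document with those optional settings. The wrapper is assumed to be imported as nlp from apache_beam.ml.gcp.naturallanguageml, and the language_hint parameter name is an assumption based on the optional language hint mentioned above.

from apache_beam.ml.gcp import naturallanguageml as nlp

# Hedged sketch (parameter name `language_hint` assumed): an HTML snippet
# with an explicit language hint instead of automatic language detection.
doc = nlp.Document('<p>¡Mi experiencia ha sido fantástica!</p>',
                   type='HTML',
                   language_hint='es')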

Here is an example of a pipeline that creates an in-memory PCollection of strings, changes each string into a Document object, and calls the Natural Language API. Then, for each response object, a function is called to extract certain results of the analysis.

Python:

import json

import apache_beam as beam
# Beam's Natural Language API wrapper, used as `nlp` below.
from apache_beam.ml.gcp import naturallanguageml as nlp

features = nlp.types.AnnotateTextRequest.Features(
    extract_entities=True,
    extract_document_sentiment=True,
    extract_entity_sentiment=True,
    extract_syntax=True,
)

with beam.Pipeline() as pipeline:
  responses = (
      pipeline
      | beam.Create([
          'My experience so far has been fantastic! '
          'I\'d really recommend this product.'
      ])
      | beam.Map(lambda x: nlp.Document(x, type='PLAIN_TEXT'))
      | nlp.AnnotateText(features))

  _ = (
      responses
      | beam.Map(extract_sentiments)
      | 'Parse sentiments to JSON' >> beam.Map(json.dumps)
      | 'Write sentiments' >> beam.io.WriteToText('sentiments.txt'))

  _ = (
      responses
      | beam.Map(extract_entities)
      | 'Parse entities to JSON' >> beam.Map(json.dumps)
      | 'Write entities' >> beam.io.WriteToText('entities.txt'))

  _ = (
      responses
      | beam.Map(analyze_dependency_tree)
      | 'Parse adjacency list to JSON' >> beam.Map(json.dumps)
      | 'Write adjacency list' >> beam.io.WriteToText('adjacency_list.txt'))

Java:
AnnotateTextRequest.Features features =
    AnnotateTextRequest.Features.newBuilder()
        .setExtractEntities(true)
        .setExtractDocumentSentiment(true)
        .setExtractEntitySentiment(true)
        .setExtractSyntax(true)
        .build();
AnnotateText annotateText = AnnotateText.newBuilder().setFeatures(features).build();

PCollection<AnnotateTextResponse> responses =
    p.apply(
            Create.of(
                "My experience so far has been fantastic, "
                    + "I\'d really recommend this product."))
        .apply(
            MapElements.into(TypeDescriptor.of(Document.class))
                .via(
                    (SerializableFunction<String, Document>)
                        input ->
                            Document.newBuilder()
                                .setContent(input)
                                .setType(Document.Type.PLAIN_TEXT)
                                .build()))
        .apply(annotateText);

responses
    .apply(MapElements.into(TypeDescriptor.of(TextSentiments.class)).via(extractSentiments))
    .apply(
        MapElements.into(TypeDescriptors.strings())
            .via((SerializableFunction<TextSentiments, String>) TextSentiments::toJson))
    .apply(TextIO.write().to("sentiments.txt"));

responses
    .apply(
        MapElements.into(
                TypeDescriptors.maps(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via(extractEntities))
    .apply(MapElements.into(TypeDescriptors.strings()).via(mapEntitiesToJson))
    .apply(TextIO.write().to("entities.txt"));

responses
    .apply(
        MapElements.into(
                TypeDescriptors.lists(
                    TypeDescriptors.maps(
                        TypeDescriptors.strings(),
                        TypeDescriptors.lists(TypeDescriptors.strings()))))
            .via(analyzeDependencyTree))
    .apply(MapElements.into(TypeDescriptors.strings()).via(mapDependencyTreesToJson))
    .apply(TextIO.write().to("adjacency_list.txt"));

Extracting sentiments

This is a part of the response object returned from the API. Sentence-level sentiment can be found in the sentences attribute. sentences behaves like a standard Python sequence, so all core language features (like iteration or slicing) will work. The overall sentiment can be found in the document_sentiment attribute.

sentences {
  text {
    content: "My experience so far has been fantastic!"
  }
  sentiment {
    magnitude: 0.8999999761581421
    score: 0.8999999761581421
  }
}
sentences {
  text {
    content: "I\'d really recommend this product."
    begin_offset: 41
  }
  sentiment {
    magnitude: 0.8999999761581421
    score: 0.8999999761581421
  }
}

...many lines omitted

document_sentiment {
  magnitude: 1.899999976158142
  score: 0.8999999761581421
}

The functions that extract information about sentence-level and document-level sentiment are shown in the next code snippet.

Python:

def extract_sentiments(response):
  return {
      'sentences': [{
          sentence.text.content: sentence.sentiment.score
      } for sentence in response.sentences],
      'document_sentiment': response.document_sentiment.score,
  }

Java:
extractSentiments =
(SerializableFunction<AnnotateTextResponse, TextSentiments>)
    annotateTextResponse -> {
      TextSentiments sentiments = new TextSentiments();
      sentiments.setDocumentSentiment(
          annotateTextResponse.getDocumentSentiment().getMagnitude());
      Map<String, Float> sentenceSentimentsMap =
          annotateTextResponse.getSentencesList().stream()
              .collect(
                  Collectors.toMap(
                      (Sentence s) -> s.getText().getContent(),
                      (Sentence s) -> s.getSentiment().getMagnitude()));
      sentiments.setSentenceSentiments(sentenceSentimentsMap);
      return sentiments;
    };

The snippet loops over sentences and, for each sentence, extracts the sentiment score.

The output is:

{"sentences": [{"My experience so far has been fantastic!": 0.8999999761581421}, {"I'd really recommend this product.": 0.8999999761581421}], "document_sentiment": 0.8999999761581421}

Extracting entities

The next function inspects the response for entities and returns the names and the types of those entities.

Python:

def extract_entities(response):
  return [{
      'name': entity.name,
      'type': nlp.enums.Entity.Type(entity.type).name,
  } for entity in response.entities]

Java:
extractEntities =
(SerializableFunction<AnnotateTextResponse, Map<String, String>>)
    annotateTextResponse ->
        annotateTextResponse.getEntitiesList().stream()
            .collect(
                Collectors.toMap(Entity::getName, (Entity e) -> e.getType().toString()));

Entities can be found in the entities attribute. As before, entities is a sequence, which is why a list comprehension is a viable choice. The trickiest part is interpreting the types of the entities. The Natural Language API defines entity types as an enum, and in a response object entity types are returned as integers. That's why a user has to instantiate naturallanguageml.enums.Entity.Type to access a human-readable name.
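As a quick, hedged illustration of that integer-to-name conversion (reusing one response object from the pipeline above):

# Hedged sketch: `entity.type` is an integer enum value; wrapping it in the
# enum class yields a human-readable name such as 'OTHER' or 'CONSUMER_GOOD'.
entity = response.entities[0]
readable_type = nlp.enums.Entity.Type(entity.type).name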

The output of extract_entities is:

[{"name": "experience", "type": "OTHER"}, {"name": "product", "type": "CONSUMER_GOOD"}]

Accessing sentence dependency trees

The following code loops over sentences and, for each sentence, builds an adjacency list that represents a dependency tree. For more information on what a dependency tree is, see Morphology & Dependency Trees.

Python:

def analyze_dependency_tree(response):
  from collections import defaultdict
  adjacency_lists = []

  index = 0
  for sentence in response.sentences:
    adjacency_list = defaultdict(list)
    sentence_begin = sentence.text.begin_offset
    sentence_end = sentence_begin + len(sentence.text.content) - 1

    # Walk through the tokens that belong to the current sentence and
    # attach each token to its head token in the adjacency list.
    while index < len(response.tokens) and \
        response.tokens[index].text.begin_offset <= sentence_end:
      token = response.tokens[index]
      head_token_index = token.dependency_edge.head_token_index
      head_token_text = response.tokens[head_token_index].text.content
      adjacency_list[head_token_text].append(token.text.content)
      index += 1
    adjacency_lists.append(adjacency_list)

  return adjacency_lists

Java:
analyzeDependencyTree =
    (SerializableFunction<AnnotateTextResponse, List<Map<String, List<String>>>>)
        response -> {
          List<Map<String, List<String>>> adjacencyLists = new ArrayList<>();
          int index = 0;
          for (Sentence s : response.getSentencesList()) {
            Map<String, List<String>> adjacencyMap = new HashMap<>();
            int sentenceBegin = s.getText().getBeginOffset();
            int sentenceEnd = sentenceBegin + s.getText().getContent().length() - 1;
            while (index < response.getTokensCount()
                && response.getTokens(index).getText().getBeginOffset() <= sentenceEnd) {
              Token token = response.getTokensList().get(index);
              int headTokenIndex = token.getDependencyEdge().getHeadTokenIndex();
              String headTokenContent =
                  response.getTokens(headTokenIndex).getText().getContent();
              List<String> adjacencyList =
                  adjacencyMap.getOrDefault(headTokenContent, new ArrayList<>());
              adjacencyList.add(token.getText().getContent());
              adjacencyMap.put(headTokenContent, adjacencyList);
              index++;
            }
            adjacencyLists.add(adjacencyMap);
          }
          return adjacencyLists;
        };

The output is below. For better readability, the indexes are replaced by the text which they refer to:

[
  {
    "experience": [
      "My"
    ],
    "been": [
      "experience",
      "far",
      "has",
      "been",
      "fantastic",
      "!"
    ],
    "far": [
      "so"
    ]
  },
  {
    "recommend": [
      "I",
      "'d",
      "really",
      "recommend",
      "product",
      "."
    ],
    "product": [
      "this"
    ]
  }
]

Getting predictions

This section shows how to use Google Cloud AI Platform Prediction to make predictions about new data from a cloud-hosted machine learning model.

tfx_bsl is a library with a Beam PTransform called RunInference. RunInference is able to perform an inference that can use an external service endpoint for receiving data. When using a service endpoint, the transform takes a PCollection of type tf.train.Example and, for every batch of elements, sends a request to AI Platform Prediction. The size of a batch is automatically computed. For more details on how Beam finds the best batch size, refer to the docstring for BatchElements. Currently, the transform does not support using tf.train.SequenceExample as input, but work on this is in progress.
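For context, the purely illustrative sketch below shows BatchElements on its own; RunInference relies on the same adaptive batching mechanism internally:

import apache_beam as beam

# Illustrative sketch: BatchElements groups incoming elements into batches and
# tunes the batch size between the given bounds based on observed performance.
with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | beam.Create(range(100))
      | beam.BatchElements(min_batch_size=8, max_batch_size=32)
      | beam.Map(lambda batch: print(len(batch))))  # each element is one batch (a list)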

The transform produces a PCollection of type PredictionLog, which contains the predictions.
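As a rough, hedged sketch of consuming those predictions, the field names below follow the PredictionLog and PredictLog protos from tensorflow_serving and assume the model is served through the Predict API; adapt them to your model's outputs. A step like beam.Map(extract_prediction_outputs) could then follow RunInference in a pipeline.

# Hedged sketch: for a model called through the Predict API, the returned
# PredictionLog wraps a PredictLog whose response holds a map from output
# names to tensors.
def extract_prediction_outputs(prediction_log):
  return dict(prediction_log.predict_log.response.outputs)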

Before getting started, deploy a TensorFlow model to AI Platform Prediction. The cloud service manages the infrastructure needed to handle prediction requests in an efficient and scalable way. Note that only TensorFlow models are supported by the transform. For more information, see Exporting a SavedModel for prediction.

Once a machine learning model is deployed, prepare a list of instances to get predictions for. To send binary data, make sure that the name of the input ends in _bytes. This will base64-encode the data before sending the request.
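For example, here is a hedged sketch of a tf.train.Example carrying binary data; the feature name input_bytes is made up for illustration.

import tensorflow as tf

# Hypothetical feature name `input_bytes`: because it ends in `_bytes`, the
# value is base64-encoded before the prediction request is sent.
binary_example = tf.train.Example(features=tf.train.Features(feature={
    'input_bytes': tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[b'\x89PNG\r\n...'])),
}))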

Example

Here is an example of a pipeline that reads input instances from a file, converts the JSON objects to tf.train.Example objects, and sends the data to AI Platform Prediction. The contents of the file can look like this:

{"input": "the quick brown"}
{"input": "la bruja le"}

The example creates tf.train.BytesList instances, so it expects byte-like strings as input. However, other data types, like tf.train.FloatList and tf.train.Int64List, are also supported by the transform.
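A hedged sketch of what such numeric features could look like; the feature names are made up for illustration.

import tensorflow as tf

# Illustrative only: the same Example structure with numeric feature types.
numeric_example = tf.train.Example(features=tf.train.Features(feature={
    'ratings': tf.train.Feature(float_list=tf.train.FloatList(value=[4.5, 3.0])),
    'counts': tf.train.Feature(int64_list=tf.train.Int64List(value=[7, 42])),
}))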

Here is the code:

Python:

import json

import apache_beam as beam

import tensorflow as tf
from tfx_bsl.beam.run_inference import RunInference
from tfx_bsl.proto import model_spec_pb2

def convert_json_to_tf_example(json_obj):
  samples = json.loads(json_obj)
  for name, text in samples.items():
      value = tf.train.Feature(bytes_list=tf.train.BytesList(
        value=[text.encode('utf-8')]))
      feature = {name: value}
      return tf.train.Example(features=tf.train.Features(feature=feature))

with beam.Pipeline() as p:
     _ = (p
         | beam.io.ReadFromText('gs://my-bucket/samples.json')
         | beam.Map(convert_json_to_tf_example)
         | RunInference(
             model_spec_pb2.InferenceEndpoint(
                 model_endpoint_spec=model_spec_pb2.AIPlatformPredictionModelSpec(
                     project_id='my-project-id',
                     model_name='my-model-name',
                     version_name='my-model-version'))))

Java:
// Getting predictions is not yet available for Java. [https://github.com/apache/beam/issues/20001]