用于数据处理的 MLTransform

Pydoc Pydoc




使用 MLTransform 对键控数据应用常见的机器学习 (ML) 处理任务。Apache Beam 提供了 ML 数据处理变换,您可以在 MLTransform 中使用它们。有关可用数据处理变换的完整列表,请参阅 GitHub 上的 tft.py 文件

要使用 MLTransform 定义数据处理变换,请使用 columns 作为输入参数创建数据处理变换的实例。指定 columns 中的数据将被转换并输出到 beam.Row 对象。

以下示例演示了如何使用 MLTransform 通过使用整个数据集的最小值和最大值将数据归一化为 0 到 1 之间的范围。MLTransform 使用 ScaleTo01 变换。

scale_to_z_score_transform = ScaleToZScore(columns=['x', 'y'])
with beam.Pipeline() as p:
  (data | MLTransform(write_artifact_location=artifact_location).with_transform(scale_to_z_score_transform))

在此示例中,MLTransform 接收 write_artifact_location 的值。MLTransform 然后使用此位置值来写入变换生成的工件。要传递数据处理变换,您可以使用 MLTransformwith_transform 方法或列表。

MLTransform(transforms=transforms, write_artifact_location=write_artifact_location)

传递给 MLTransform 的变换将按顺序应用于数据集。MLTransform 预期一个字典并返回一个包含 NumPy 数组的已转换行对象。

示例

以下示例演示了如何创建使用 MLTransform 预处理数据的管道。

MLTransform 可以对数据集进行全量遍历,这在您需要在分析完整个数据集后才转换单个元素时很有用。前两个示例需要对数据集进行全量遍历才能完成数据转换。

示例 1

此示例创建了一个使用 MLTransform 将数据缩放到 0 到 1 之间的范围的管道。该示例将一个整数列表转换为 0 到 1 的范围,使用 ScaleTo01 变换。

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ScaleTo01
import tempfile

data = [
    {
        'x': [1, 5, 3]
    },
    {
        'x': [4, 2, 8]
    },
]

artifact_location = tempfile.mkdtemp()
scale_to_0_1_fn = ScaleTo01(columns=['x'])

with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          scale_to_0_1_fn)
      | beam.Map(print))

输出

Row(x=array([0.       , 0.5714286, 0.2857143], dtype=float32))
Row(x=array([0.42857143, 0.14285715, 1.        ], dtype=float32))

示例 2

此示例创建了一个使用 MLTransform 对整个数据集计算词汇表并为每个唯一词汇表项分配索引的管道。它接受一个字符串列表,对整个数据集计算词汇表,然后将唯一的索引应用于每个词汇表项。

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
import tempfile

artifact_location = tempfile.mkdtemp()
data = [
    {
        'x': ['I', 'love', 'Beam']
    },
    {
        'x': ['Beam', 'is', 'awesome']
    },
]
compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])
with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          compute_and_apply_vocabulary_fn)
      | beam.Map(print))

输出

Row(x=array([4, 1, 0]))
Row(x=array([0, 2, 3]))

示例 3

此示例创建了一个使用 MLTransform 对整个数据集计算词汇表并为每个唯一词汇表项分配索引的管道。此管道将单个元素作为输入,而不是元素列表。

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
import tempfile
data = [
    {
        'x': 'I'
    },
    {
        'x': 'love'
    },
    {
        'x': 'Beam'
    },
    {
        'x': 'Beam'
    },
    {
        'x': 'is'
    },
    {
        'x': 'awesome'
    },
]
artifact_location = tempfile.mkdtemp()
compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])
with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          compute_and_apply_vocabulary_fn)
      | beam.Map(print))

输出

Row(x=array([4]))
Row(x=array([1]))
Row(x=array([0]))
Row(x=array([0]))
Row(x=array([2]))
Row(x=array([3]))