用于数据处理的 MLTransform
|
使用 MLTransform
对键控数据应用常见的机器学习 (ML) 处理任务。Apache Beam 提供了 ML 数据处理变换,您可以在 MLTransform
中使用它们。有关可用数据处理变换的完整列表,请参阅 GitHub 上的 tft.py 文件。
要使用 MLTransform
定义数据处理变换,请使用 columns
作为输入参数创建数据处理变换的实例。指定 columns
中的数据将被转换并输出到 beam.Row
对象。
以下示例演示了如何使用 MLTransform
通过使用整个数据集的最小值和最大值将数据归一化为 0 到 1 之间的范围。MLTransform
使用 ScaleTo01
变换。
scale_to_z_score_transform = ScaleToZScore(columns=['x', 'y'])
with beam.Pipeline() as p:
(data | MLTransform(write_artifact_location=artifact_location).with_transform(scale_to_z_score_transform))
在此示例中,MLTransform
接收 write_artifact_location
的值。MLTransform
然后使用此位置值来写入变换生成的工件。要传递数据处理变换,您可以使用 MLTransform
的 with_transform
方法或列表。
MLTransform(transforms=transforms, write_artifact_location=write_artifact_location)
传递给 MLTransform
的变换将按顺序应用于数据集。MLTransform
预期一个字典并返回一个包含 NumPy 数组的已转换行对象。
示例
以下示例演示了如何创建使用 MLTransform
预处理数据的管道。
MLTransform
可以对数据集进行全量遍历,这在您需要在分析完整个数据集后才转换单个元素时很有用。前两个示例需要对数据集进行全量遍历才能完成数据转换。
- 对于
ComputeAndApplyVocabulary
变换,变换需要访问数据集中所有唯一的词语。 - 对于
ScaleTo01
变换,变换需要知道数据集中最小值和最大值。
示例 1
此示例创建了一个使用 MLTransform
将数据缩放到 0 到 1 之间的范围的管道。该示例将一个整数列表转换为 0 到 1 的范围,使用 ScaleTo01
变换。
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ScaleTo01
import tempfile
data = [
{
'x': [1, 5, 3]
},
{
'x': [4, 2, 8]
},
]
artifact_location = tempfile.mkdtemp()
scale_to_0_1_fn = ScaleTo01(columns=['x'])
with beam.Pipeline() as p:
transformed_data = (
p
| beam.Create(data)
| MLTransform(write_artifact_location=artifact_location).with_transform(
scale_to_0_1_fn)
| beam.Map(print))
输出
示例 2
此示例创建了一个使用 MLTransform
对整个数据集计算词汇表并为每个唯一词汇表项分配索引的管道。它接受一个字符串列表,对整个数据集计算词汇表,然后将唯一的索引应用于每个词汇表项。
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
import tempfile
artifact_location = tempfile.mkdtemp()
data = [
{
'x': ['I', 'love', 'Beam']
},
{
'x': ['Beam', 'is', 'awesome']
},
]
compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])
with beam.Pipeline() as p:
transformed_data = (
p
| beam.Create(data)
| MLTransform(write_artifact_location=artifact_location).with_transform(
compute_and_apply_vocabulary_fn)
| beam.Map(print))
输出
示例 3
此示例创建了一个使用 MLTransform
对整个数据集计算词汇表并为每个唯一词汇表项分配索引的管道。此管道将单个元素作为输入,而不是元素列表。
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
import tempfile
data = [
{
'x': 'I'
},
{
'x': 'love'
},
{
'x': 'Beam'
},
{
'x': 'Beam'
},
{
'x': 'is'
},
{
'x': 'awesome'
},
]
artifact_location = tempfile.mkdtemp()
compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])
with beam.Pipeline() as p:
transformed_data = (
p
| beam.Create(data)
| MLTransform(write_artifact_location=artifact_location).with_transform(
compute_and_apply_vocabulary_fn)
| beam.Map(print))
输出
最后更新时间:2024/10/31
您是否找到了您要找的所有内容?
它们是否有用且清晰?您想更改什么内容?请告诉我们!