数据探索

几种类型的 Apache Beam 数据处理适用于 AI/ML 项目

数据处理可以分为两个主要主题。此示例首先考察数据探索,然后考察使用数据预处理和验证的 ML 中的数据管道。数据后处理不包括在内,因为它与预处理类似。后处理仅在顺序和管道类型方面有所不同。

初始数据探索

Pandas 是一种用于执行数据探索的流行工具。Pandas 是一种用于 Python 的数据分析和操作工具。它使用 DataFrames,这是一种数据结构,包含二维表格数据,并为数据提供标记的行和列。Apache Beam Python SDK 提供了一个用于处理类似 Pandas 的 DataFrame 对象的 DataFrame API

Beam DataFrame API 旨在为 Apache Beam 管道提供对熟悉编程接口的访问。此 API 允许你执行数据探索。你可以将代码重复用于你的数据预处理管道。使用 DataFrame API,你可以通过调用标准 Pandas 命令来构建复杂的数据处理管道。

你可以在 Beam 交互式运行器 中的 JupyterLab 笔记本 中结合使用 DataFrame API。使用笔记本迭代开发管道并显示各个管道步骤的结果。

以下是 Apache Beam 中笔记本中数据探索的一个示例

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

p = beam.Pipeline(InteractiveRunner())
beam_df = p | beam.dataframe.io.read_csv(input_path)

# Investigate columns and data types
beam_df.dtypes

# Generate descriptive statistics
ib.collect(beam_df.describe())

# Investigate missing values
ib.collect(beam_df.isnull())

有关使用 Apache Beam 和 DataFrame API 为你的 AI/ML 项目实现数据探索和数据预处理的完整端到端示例,请参阅 Beam Dataframe API AI/ML 教程

用于 ML 的数据管道

典型的预处理数据管道包含以下步骤

  1. 读取和写入数据:从文件系统、数据库或消息队列中读取和写入数据。Apache Beam 有一套丰富的 I/O 连接器 用于提取和写入数据。
  2. 数据清洗:在将数据用于 ML 模型之前,对其进行过滤和清洗。你可能会删除重复或不相关的數據,更正数据集中的错误,过滤掉不需要的异常值,或处理缺失的数据。
  3. 数据转换:你的数据需要符合你的模型进行训练所需的预期输入。你可能需要对数据进行归一化、独热编码、缩放或矢量化。
  4. 数据丰富:你可能希望用外部数据源来丰富你的数据,以使数据更有意义或更容易让 ML 模型解释。例如,你可能希望将城市名称或地址转换为一组坐标。
  5. 数据验证和指标:确保你的数据符合可以在你的管道中验证的特定一组要求。从你的数据中报告指标,例如类别分布。

你可以使用 Apache Beam 管道来实现所有这些步骤。此示例展示了一个展示所有先前提到的步骤的管道

import apache_beam as beam
from apache_beam.metrics import Metrics

with beam.Pipeline() as pipeline:
  # Create data
  input_data = (
      pipeline
      | beam.Create([
         {'age': 25, 'height': 176, 'weight': 60, 'city': 'London'},
         {'age': 61, 'height': 192, 'weight': 95, 'city': 'Brussels'},
         {'age': 48, 'height': 163, 'weight': None, 'city': 'Berlin'}]))

  # Clean data
  def filter_missing_data(row):
    return row['weight'] is not None

  cleaned_data = input_data | beam.Filter(filter_missing_data)

  # Transform data
  def scale_min_max_data(row):
    row['age'] = (row['age']/100)
    row['height'] = (row['height']-150)/50
    row['weight'] = (row['weight']-50)/50
    yield row

  transformed_data = cleaned_data | beam.FlatMap(scale_min_max_data)

  # Enrich data
  side_input = pipeline | beam.io.ReadFromText('coordinates.csv')
  def coordinates_lookup(row, coordinates):
    row['coordinates'] = coordinates.get(row['city'], (0, 0))
    del row['city']
    yield row

  enriched_data = (
      transformed_data
      | beam.FlatMap(coordinates_lookup, coordinates=beam.pvalue.AsDict(side_input)))

  # Metrics
  counter = Metrics.counter('main', 'counter')

  def count_data(row):
    counter.inc()
    yield row

  output_data = enriched_data | beam.FlatMap(count_data)

  # Write data
  output_data | beam.io.WriteToText('output.csv')