Beam 数据帧概述

在 Colab 中运行 在 Colab 中运行





Apache Beam Python SDK 提供了一个数据帧 API,用于处理类似 pandas 的 DataFrame 对象。此功能允许您将 PCollection 转换为数据帧,然后使用 pandas 数据帧 API 上可用的标准方法与数据帧进行交互。数据帧 API 建立在 pandas 实现之上,并且 pandas 数据帧方法在数据集子集上并行调用。Beam 数据帧和 pandas 数据帧之间的主要区别在于,操作由 Beam API 延迟,以支持 Beam 并行处理模型。(要了解有关数据帧实现之间差异的更多信息,请参阅 与 pandas 的区别。)

您可以将 Beam 数据帧视为 Beam 管道的一种特定领域语言 (DSL)。类似于 Beam SQL,数据帧是内置于 Beam Python SDK 中的 DSL。使用此 DSL,您可以创建管道,而无需引用标准 Beam 结构,例如 ParDoCombinePerKey

Beam 数据帧 API 旨在为 Beam 管道提供对熟悉编程接口的访问。在某些情况下,数据帧 API 还可以通过推迟到高效的向量化 pandas 实现来提高管道效率。

什么是数据帧?

如果您不熟悉 pandas 数据帧,可以先阅读 10 分钟学习 pandas,其中介绍了如何导入和使用 pandas 包。pandas 是一个用于数据操作和分析的开源 Python 库。它提供数据结构,简化了与关系或标记数据的交互。其中一个数据结构是 DataFrame,它包含二维表格数据,并为数据提供标记行和列。

先决条件

要使用 Beam 数据帧,您需要安装 Beam python 版本 2.26.0 或更高版本(有关完整设置说明,请参阅 Apache Beam Python SDK 快速入门)以及受支持的 pandas 版本。在 Beam 2.34.0 及更高版本中,最简单的方法是使用“dataframe”额外功能

pip install apache_beam[dataframe]

请注意,在分布式运行器上执行数据帧 API 管道时,相同pandas 版本应安装在工作器上。参考 base_image_requirements.txt,了解您正在使用的 Python 版本和 Beam 版本,以查看工作器默认情况下将使用哪个版本的 pandas

使用数据帧

您可以像以下示例所示那样使用数据帧,该示例从 CSV 文件中读取纽约市出租车数据,执行分组聚合,并将输出写回 CSV

from apache_beam.dataframe.io import read_csv

with pipeline as p:
  rides = p | read_csv(input_path)

  # Count the number of passengers dropped off per LocationID
  agg = rides.groupby('DOLocationID').passenger_count.sum()
  agg.to_csv(output_path)

pandas 能够从 CSV 数据的第一行推断出列名,这就是 passenger_countDOLocationID 的来源。

在此示例中,唯一的传统 Beam 类型是 Pipeline 实例。否则,该示例完全使用数据帧 API 编写。这是可能的,因为 Beam 数据帧 API 包含自己的 I/O 操作(例如,read_csvto_csv),这些操作基于 pandas 本地实现。read_*to_* 操作支持文件模式和任何 Beam 兼容文件系统。分组通过按键分组完成,可以在最终写入之前应用任意 pandas 操作(在本例中为 sum),该写入通过 to_csv 完成。

Beam 数据帧 API 旨在与本地 pandas 实现兼容,但有一些注意事项,详见以下 与 pandas 的区别

在管道中嵌入数据帧

要在更大的管道中使用数据帧 API,您可以将 PCollection 转换为数据帧,处理数据帧,然后将数据帧转换回 PCollection。为了将 PCollection 转换为数据帧并转换回来,您必须使用附加了 模式 的 PCollection。附加了模式的 PCollection 也称为模式感知 PCollection。要了解有关将模式附加到 PCollection 的更多信息,请参阅 创建模式

以下示例创建一个模式感知 PCollection,使用 to_dataframe 将其转换为数据帧,处理数据帧,然后使用 to_pcollection 将数据帧转换回 PCollection

from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection
...


    # Read the text file[pattern] into a PCollection.
    lines = p | 'Read' >> ReadFromText(known_args.input)

    words = (
        lines
        | 'Split' >> beam.FlatMap(
            lambda line: re.findall(r'[\w]+', line)).with_output_types(str)
        # Map to Row objects to generate a schema suitable for conversion
        # to a dataframe.
        | 'ToRows' >> beam.Map(lambda word: beam.Row(word=word)))

    df = to_dataframe(words)
    df['count'] = 1
    counted = df.groupby('word').sum()
    counted.to_csv(known_args.output)

    # Deferred DataFrames can also be converted back to schema'd PCollections
    counted_pc = to_pcollection(counted, include_indexes=True)


您可以在 GitHub 上找到完整的词频统计示例,以及其他 示例数据帧管道

还可以通过将函数传递给 DataframeTransform 来使用数据帧 API

from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
  ...
  | beam.Select(DOLocationID=lambda line: int(..),
                passenger_count=lambda line: int(..))
  | DataframeTransform(lambda df: df.groupby('DOLocationID').sum())
  | beam.Map(lambda row: f"{row.DOLocationID},{row.passenger_count}")
  ...

DataframeTransform 类似于 SqlTransform,来自 Beam SQL DSL。其中 SqlTransform 将 SQL 查询转换为 PTransform,DataframeTransform 是一个 PTransform,它应用一个接受数据帧并返回数据帧的函数。DataframeTransform 在您拥有一个可在 Beam 和普通 pandas 数据帧上调用的独立函数时尤其有用。

DataframeTransform 可以按名称和关键字接受和返回多个 PCollection,如以下示例所示

output = (pc1, pc2) | DataframeTransform(lambda df1, df2: ...)

output = {'a': pc, ...} | DataframeTransform(lambda a, ...: ...)

pc1, pc2 = {'a': pc} | DataframeTransform(lambda a: expr1, expr2)

{...} = {a: pc} | DataframeTransform(lambda a: {...})