DataFrame API 预览版现已推出!

我们很高兴地宣布,Beam Python SDK 的新 DataFrame API 预览版现已在 Beam 2.26.0 中提供。与 SqlTransform (Java, Python) 类似,DataFrame API 为 Beam 用户提供了一种方法,可以用比以前更简洁的方式表达复杂的关联逻辑。

更具表现力的 API

Beam 的新 DataFrame API 旨在与众所周知的 Pandas DataFrame API 兼容,但也有一些下面详细说明的注意事项。有了这个新 API,一个简单的管道,它从 CSV 文件中读取 NYC 计程车数据,执行分组聚合,并将输出写入 CSV 文件,可以非常简洁地表达

from apache_beam.dataframe.io import read_csv

with beam.Pipeline() as p:
  df = p | read_csv("gs://apache-beam-samples/nyc_taxi/2019/*.csv",
                    usecols=['passenger_count' , 'DOLocationID'])
  # Count the number of passengers dropped off per LocationID
  agg = df.groupby('DOLocationID').sum()
  agg.to_csv(output)

将此与使用 CombinePerKey 实现的相同逻辑作为传统的 Beam python 管道进行比较

with beam.Pipeline() as p:
  (p | beam.io.ReadFromText("gs://apache-beam-samples/nyc_taxi/2019/*.csv",
                            skip_header_lines=1)
     | beam.Map(lambda line: line.split(','))
     # Parse CSV, create key - value pairs
     | beam.Map(lambda splits: (int(splits[8] or 0),  # DOLocationID
                                int(splits[3] or 0))) # passenger_count
     # Sum values per key
     | beam.CombinePerKey(sum)
     | beam.MapTuple(lambda loc_id, pc: f'{loc_id},{pc}')
     | beam.io.WriteToText(known_args.output))

DataFrame 示例更容易快速检查和理解,因为它允许您简洁地表达分组聚合,而无需使用低级别的 CombinePerKey

除了更具表现力之外,用 DataFrame API 编写的管道通常比传统的 Beam 管道更高效。这是因为 DataFrame API 尽可能地依赖于非常高效的列式 Pandas 实现。

DataFrame 作为 DSL

您可能已经了解 Beam SQL,它是使用 Beam 的 Java SDK 构建的领域特定语言 (DSL)。SQL 被认为是一种 DSL,因为它可以完全用 SQL 表达整个管道,包括 I/O 和复杂的操作。

类似地,DataFrame API 是使用 Python SDK 构建的 DSL。您可以看到上面的示例是使用传统的 Beam 结构(如 I/O、ParDo 或 CombinePerKey)编写的。事实上,唯一传统的 Beam 类型是 Pipeline 实例!否则,此管道完全使用 DataFrame API 编写。这是可能的,因为 DataFrame API 不仅实现了 Pandas 的计算操作,还包括基于 Pandas 本地实现的 I/O (pd.read_{csv,parquet,...}pd.DataFrame.to_{csv,parquet,...})。

与 SQL 类似,也可以通过使用 模式 将 DataFrame API 嵌入到更大的管道中。可以将模式感知 PCollection 转换为 DataFrame,进行处理,并将结果转换回另一个模式感知 PCollection。例如,如果您想使用传统的 Beam I/O 而不是 DataFrame I/O 中的任何一个,您可以像这样重写上面的管道

from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection

with beam.Pipeline() as p:
  ...
  schema_pc = (p | beam.ReadFromText(..)
                 # Use beam.Select to assign a schema
                 | beam.Select(DOLocationID=lambda line: int(...),
                               passenger_count=lambda line: int(...)))
  df = to_dataframe(schema_pc)
  agg = df.groupby('DOLocationID').sum()
  agg_pc = to_pcollection(pc)

  # agg_pc has a schema based on the structure of agg
  (agg_pc | beam.Map(lambda row: f'{row.DOLocationID},{row.passenger_count}')
          | beam.WriteToText(..))

也可以通过将函数传递给 DataframeTransform 来使用 DataFrame API

from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
  ...
  | beam.Select(DOLocationID=lambda line: int(..),
                passenger_count=lambda line: int(..))
  | DataframeTransform(lambda df: df.groupby('DOLocationID').sum())
  | beam.Map(lambda row: f'{row.DOLocationID},{row.passenger_count}')
  ...

注意事项

如上所述,Beam 的 DataFrame API 与 Pandas API 之间存在一些差异。最显著的差异是 Beam DataFrame API 是延迟的,就像 Beam API 的其他部分一样。这意味着您不能print() DataFrame 实例来检查数据,因为我们还没有计算数据!计算直到管道run() 才会进行。在此之前,我们只知道结果的形状/模式(即列的名称和类型),而不是结果本身。

在尝试使用某些 Pandas 操作时,您可能会看到一些常见的异常

  • NotImplementedError: 表示这是一种我们还没有时间查看的操作或参数。我们尝试在这个新 API 的预览版中提供尽可能多的 Pandas 操作,但仍有许多操作需要完成。
  • WontImplementError: 表示这是一种我们目前不打算支持的操作或参数,因为它们与 Beam 模型不兼容。引发此错误的操作的最大类别是那些对顺序敏感的操作(例如 shift、cummax、cummin、head、tail 等)。这些操作不能简单地映射到 Beam,因为 PCollection(代表分布式数据集)是无序的。请注意,即使这些操作中的一些可能会在将来实现 - 我们实际上有一些关于如何支持顺序敏感操作的想法 - 但它还有很长的路要走。

最后,重要的是要注意,这是一个新功能的预览,它将在接下来的几个 Beam 版本中得到完善。我们希望您现在可以尝试一下并给我们一些反馈,但我们目前不建议将其用于生产工作负载。

如何参与

参与此工作的最简单方法是尝试一下 DataFrame 并告诉我们您的想法!您可以向 user@beam.apache.org 发送问题,或者在 jira 中提交错误报告和功能请求。特别是,如果您发现我们还没有实现的但您觉得有用的操作,请告知我们,以便我们优先考虑。

如果您想了解更多关于 DataFrame API 的内部工作原理,并参与开发,建议您查看 设计文档 和我们的 Beam 峰会演示文稿。从那里开始,帮助的最佳方式是完成一些未实现的操作。我们在 BEAM-9547 中协调这项工作。