与 pandas 的区别
Apache Beam DataFrame API 旨在成为 pandas 的直接替代品,但需要注意一些差异。此页面描述了 Beam 和 pandas API 之间的差异,并提供了使用 Beam DataFrame API 的提示。有关 Beam DataFrame API 支持哪些操作和参数的完整参考,请参阅 apache_beam.dataframe.frames
API 参考。
使用 pandas 源
Beam 操作始终与管道相关联。要将源数据读取到 Beam DataFrame 中,您必须将源应用于管道对象。例如,要从 CSV 文件中读取输入,您可以使用 read_csv,如下所示
df = p | beam.dataframe.io.read_csv(...)
这类似于 pandas 的 read_csv,但 df
是一个延迟的 Beam DataFrame,它代表文件的内容。输入文件名可以是 fileio.MatchFiles 识别的任何文件模式。
有关使用 DataFrame API 中的源和接收器的示例,请参阅 taxiride.py。
不支持的操作类别
以下部分描述了 Beam DataFrame API 尚未支持或支持有条件的操作类别。在适用情况下将建议解决方法。
不可并行化操作
示例:DeferredDataFrame.quantile
,DeferredDataFrame.mode
为了支持分布式处理,Beam 在数据子集上并行调用 DataFrame 操作。有些 DataFrame 操作无法并行化,这些操作默认情况下会引发 NonParallelOperation 错误。
解决方法
如果您想使用非并行化操作,可以使用 beam.dataframe.allow_non_parallel_operations
块进行保护。例如
from apache_beam import dataframe
with dataframe.allow_non_parallel_operations():
quantiles = df.quantile()
请注意,这会将整个输入数据集收集到单个节点上,因此存在内存不足的风险。您应该只在确信输入足够小可以在单个工作程序上处理的情况下使用此解决方法。
生成非延迟列的操作
示例:DeferredDataFrame.pivot
,DeferredDataFrame.transpose
,DeferredSeries.factorize
Beam DataFrame 操作是延迟的,但生成的 DataFrames 的模式不是,这意味着结果列必须在没有访问数据的情况下计算。某些 DataFrame 操作不支持此用法,因此无法实现。这些操作会引发 WontImplementError。
目前还没有此问题的解决方法。但在将来,Beam Dataframe 可能会支持对分类列进行非延迟列操作。此工作正在 Issue 20958 中跟踪。
生成非延迟值或图的操作
示例:DeferredSeries.to_list
,DeferredSeries.array
,DeferredDataFrame.plot
实现生成非延迟值或图的 DataFrame 操作是不可行的,因为 Beam 是一个延迟的 API。如果调用了这些操作,它们将引发 WontImplementError。
这些操作可能在将来通过与 Interactive Beam 的更紧密集成而得到支持。要跟踪此问题的进展,请关注 Issue 21638。如果您认为我们应该优先考虑此工作,您也可以 联系我们 以告知我们。
解决方法
如果您使用的是 Interactive Beam,则可以使用 collect
将数据集带入本地内存,然后执行这些操作。
对顺序敏感的操作
示例:DeferredDataFrame.head
,DeferredSeries.diff
,DeferredDataFrame.interpolate
Beam PCollection 本质上是无序的,因此不支持对行排序敏感的 pandas 操作。这些操作会引发 WontImplementError。
将来可能会支持对顺序敏感的操作。要跟踪此问题的进展,请关注 Issue 20862。如果您认为我们应该优先考虑此工作,您也可以 联系我们 以告知我们。
解决方法
如果您使用的是 Interactive Beam,则可以使用 collect
将数据集带入本地内存,然后执行这些操作。
或者,可能有一些方法可以重写您的代码,使其不依赖于顺序。例如,pandas 用户经常调用对顺序敏感的 head
操作来查看数据,但如果您只是想查看一部分元素,也可以使用 sample
,它不需要您先收集数据。类似地,您可以使用 nlargest
而不是 sort_values(...).
。
生成延迟标量值的操作
有些 DataFrame 操作会生成延迟的标量值。在 Beam 中,值的实际计算是延迟的,因此值不可用于控制流。例如,您可以使用 Series.sum
计算总和,但您无法立即根据结果进行分支,因为结果数据无法立即获得。Series.is_unique
是一个类似的例子。使用延迟的标量值进行分支逻辑或真值测试会引发 TypeError。
尚未实现的操作
Beam DataFrame API 实现了许多常用的 pandas DataFrame 操作,我们正在积极努力支持剩余的操作。但 pandas 有一个庞大的 API,仍然存在一些差距 (Issue 20318)。如果您调用了尚未实现的操作,它将引发 NotImplementedError
。如果您遇到缺少的操作,并且认为应该优先考虑,请 告知我们。
使用 Interactive Beam 访问完整的 pandas API
Interactive Beam 是一个专为在交互式笔记本中使用而设计的模块。该模块按惯例导入为 ib
,它提供一个 ib.collect
函数,该函数将 PCollection
或延迟的 DataFrame 带入本地内存,并将其作为 pandas DataFrame。在使用 ib.collect
物化延迟的 DataFrame 后,您将能够执行 pandas API 中的任何操作,而不仅仅是 Beam 中支持的操作。
![]() |
要开始在笔记本中使用 Beam,请参阅 尝试 Apache Beam。