从 Apache Spark 开始

如果您已经了解 Apache Spark,使用 Beam 应该很容易。基本概念相同,API 也类似。

Spark 将数据存储在Spark DataFrames 中(用于结构化数据)以及弹性分布式数据集 (RDD) 中(用于非结构化数据)。在本指南中,我们将使用 RDD。

Spark RDD 代表元素集合,而在 Beam 中它被称为并行集合 (PCollection)。Beam 中的 PCollection 没有排序保证。

同样,Beam 中的转换被称为并行转换 (PTransform)。

以下是一些常见操作及其在 PySpark 和 Beam 中的等效操作示例。

概述

这是一个简单的 PySpark 管道示例,它从一到四获取数字,将它们乘以二,将所有值加在一起,并打印结果。

import pyspark

sc = pyspark.SparkContext()
result = (
    sc.parallelize([1, 2, 3, 4])
    .map(lambda x: x * 2)
    .reduce(lambda x, y: x + y)
)
print(result)

在 Beam 中,您可以使用管道运算符|将数据通过管道传输到管道中,例如data | beam.Map(...),而不是像data.map(...)那样链接方法,但它们执行的是相同的操作。

以下是 Beam 中等效管道的示例。

import apache_beam as beam

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | beam.Create([1, 2, 3, 4])
        | beam.Map(lambda x: x * 2)
        | beam.CombineGlobally(sum)
        | beam.Map(print)
    )

ℹ️ 请注意,我们在Map转换中调用了print。这是因为我们只能从 PTransform 内部访问 PCollection 的元素。要本地检查数据,可以使用交互式运行器

需要注意的另一点是,Beam 管道是惰性构建的。这意味着当您使用管道|传输数据时,您只声明转换及其执行顺序,但实际计算不会发生。管道在with beam.Pipeline() as pipeline上下文关闭后运行。

ℹ️ 当with beam.Pipeline() as pipeline上下文关闭时,它隐式调用pipeline.run(),这会触发计算发生。

然后,管道将发送到您选择的运行器,它会处理数据。

ℹ️ 该管道可以在本地使用DirectRunner运行,或者在分布式运行器中运行,例如 Flink、Spark 或 Dataflow。Spark 运行器与 PySpark 无关。

可以使用右移运算符>>可选地为转换添加标签,例如data | 'My description' >> beam.Map(...)。这既用作注释,又使您的管道更容易调试。

这是添加标签后的管道外观。

import apache_beam as beam

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | 'Create numbers' >> beam.Create([1, 2, 3, 4])
        | 'Multiply by two' >> beam.Map(lambda x: x * 2)
        | 'Sum everything' >> beam.CombineGlobally(sum)
        | 'Print results' >> beam.Map(print)
    )

设置

以下是关于如何在 PySpark 和 Beam 中入门的比较。

PySparkBeam
安装$ pip install pyspark$ pip install apache-beam
导入import pysparkimport apache_beam as beam
创建
本地管道
sc = pyspark.SparkContext() as sc
# 您的管道代码在此处。
with beam.Pipeline() as pipeline
    # 您的管道代码在此处。
创建值values = sc.parallelize([1, 2, 3, 4])values = pipeline | beam.Create([1, 2, 3, 4])
创建
键值对
pairs = sc.parallelize([
    ('key1', 'value1'),
    ('key2', 'value2'),
    ('key3', 'value3'),
])
pairs = pipeline | beam.Create([
    ('key1', 'value1'),
    ('key2', 'value2'),
    ('key3', 'value3'),
])
运行
本地管道
$ spark-submit spark_pipeline.py$ python beam_pipeline.py

转换

以下是 PySpark 和 Beam 中一些常见转换的等效项。

PySparkBeam
映射values.map(lambda x: x * 2)values | beam.Map(lambda x: x * 2)
筛选values.filter(lambda x: x % 2 == 0)values | beam.Filter(lambda x: x % 2 == 0)
扁平映射values.flatMap(lambda x: range(x))values | beam.FlatMap(lambda x: range(x))
按键分组pairs.groupByKey()pairs | beam.GroupByKey()
归约values.reduce(lambda x, y: x+y)values | beam.CombineGlobally(sum)
按键归约pairs.reduceByKey(lambda x, y: x+y)pairs | beam.CombinePerKey(sum)
去重values.distinct()values | beam.Distinct()
计数values.count()values | beam.combiners.Count.Globally()
按键计数pairs.countByKey()pairs | beam.combiners.Count.PerKey()
获取最小值values.takeOrdered(3)values | beam.combiners.Top.Smallest(3)
获取最大值values.takeOrdered(3, lambda x: -x)values | beam.combiners.Top.Largest(3)
随机采样values.takeSample(False, 3)values | beam.combiners.Sample.FixedSizeGlobally(3)
联合values.union(otherValues)(values, otherValues) | beam.Flatten()
共组pairs.cogroup(otherPairs){'Xs': pairs, 'Ys': otherPairs} | beam.CoGroupByKey()

ℹ️ 要了解有关 Beam 中可用转换的更多信息,请查看Python 转换库

使用计算值

由于我们在潜在的分布式环境中工作,因此我们无法保证我们计算的结果在任何给定机器上都可用。

在 PySpark 中,我们可以使用data.collect()从元素集合 (RDD) 中获取结果,或者使用其他聚合,例如reduce()count()等等。

以下是一个将数字缩放到零到一之间的范围的示例。

import pyspark

sc = pyspark.SparkContext()
values = sc.parallelize([1, 2, 3, 4])
min_value = values.reduce(min)
max_value = values.reduce(max)

# We can simply use `min_value` and `max_value` since it's already a Python `int` value from `reduce`.
scaled_values = values.map(lambda x: (x - min_value) / (max_value - min_value))

# But to access `scaled_values`, we need to call `collect`.
print(scaled_values.collect())

在 Beam 中,所有转换的结果都会产生 PCollection。我们使用侧输入将 PCollection 馈送到转换并访问其值。

任何接受函数的转换(例如 Map)都可以接受侧输入。如果我们只需要一个值,可以使用beam.pvalue.AsSingleton并将它们访问为 Python 值。如果我们需要多个值,可以使用beam.pvalue.AsIter并将它们访问为可迭代对象

import apache_beam as beam

with beam.Pipeline() as pipeline:
    values = pipeline | beam.Create([1, 2, 3, 4])
    min_value = values | beam.CombineGlobally(min)
    max_value = values | beam.CombineGlobally(max)

    # To access `min_value` and `max_value`, we need to pass them as a side input.
    scaled_values = values | beam.Map(
        lambda x, minimum, maximum: (x - minimum) / (maximum - minimum),
        minimum=beam.pvalue.AsSingleton(min_value),
        maximum=beam.pvalue.AsSingleton(max_value),
    )

    scaled_values | beam.Map(print)

ℹ️ 在 Beam 中,我们需要显式传递侧输入,但我们获得了归约或聚合不需要适合内存的好处。惰性计算侧输入还允许我们仅计算values一次,而不是为每个不同的归约计算(或者需要显式缓存 RDD)。

下一步

如果您遇到任何问题,请不要犹豫联系我们