从 Apache Spark 开始
如果您已经了解 Apache Spark,使用 Beam 应该很容易。基本概念相同,API 也类似。
Spark 将数据存储在Spark DataFrames 中(用于结构化数据)以及弹性分布式数据集 (RDD) 中(用于非结构化数据)。在本指南中,我们将使用 RDD。
Spark RDD 代表元素集合,而在 Beam 中它被称为并行集合 (PCollection)。Beam 中的 PCollection 没有排序保证。
同样,Beam 中的转换被称为并行转换 (PTransform)。
以下是一些常见操作及其在 PySpark 和 Beam 中的等效操作示例。
概述
这是一个简单的 PySpark 管道示例,它从一到四获取数字,将它们乘以二,将所有值加在一起,并打印结果。
在 Beam 中,您可以使用管道运算符|
将数据通过管道传输到管道中,例如data | beam.Map(...)
,而不是像data.map(...)
那样链接方法,但它们执行的是相同的操作。
以下是 Beam 中等效管道的示例。
ℹ️ 请注意,我们在
Map
转换中调用了
需要注意的另一点是,Beam 管道是惰性构建的。这意味着当您使用管道|
传输数据时,您只声明转换及其执行顺序,但实际计算不会发生。管道在with beam.Pipeline() as pipeline
上下文关闭后运行。
ℹ️ 当
with beam.Pipeline() as pipeline
上下文关闭时,它隐式调用pipeline.run()
,这会触发计算发生。
然后,管道将发送到您选择的运行器,它会处理数据。
ℹ️ 该管道可以在本地使用DirectRunner运行,或者在分布式运行器中运行,例如 Flink、Spark 或 Dataflow。Spark 运行器与 PySpark 无关。
可以使用右移运算符>>
可选地为转换添加标签,例如data | 'My description' >> beam.Map(...)
。这既用作注释,又使您的管道更容易调试。
这是添加标签后的管道外观。
设置
以下是关于如何在 PySpark 和 Beam 中入门的比较。
PySpark | Beam | |
---|---|---|
安装 | $ pip install pyspark | $ pip install apache-beam |
导入 | import pyspark | import apache_beam as beam |
创建 本地管道 | sc = pyspark.SparkContext() as sc # 您的管道代码在此处。 | with beam.Pipeline() as pipeline # 您的管道代码在此处。 |
创建值 | values = sc.parallelize([1, 2, 3, 4]) | values = pipeline | beam.Create([1, 2, 3, 4]) |
创建 键值对 | pairs = sc.parallelize([ ('key1', 'value1'), ('key2', 'value2'), ('key3', 'value3'), ]) | pairs = pipeline | beam.Create([ ('key1', 'value1'), ('key2', 'value2'), ('key3', 'value3'), ]) |
运行 本地管道 | $ spark-submit spark_pipeline.py | $ python beam_pipeline.py |
转换
以下是 PySpark 和 Beam 中一些常见转换的等效项。
PySpark | Beam | |
---|---|---|
映射 | values.map(lambda x: x * 2) | values | beam.Map(lambda x: x * 2) |
筛选 | values.filter(lambda x: x % 2 == 0) | values | beam.Filter(lambda x: x % 2 == 0) |
扁平映射 | values.flatMap(lambda x: range(x)) | values | beam.FlatMap(lambda x: range(x)) |
按键分组 | pairs.groupByKey() | pairs | beam.GroupByKey() |
归约 | values.reduce(lambda x, y: x+y) | values | beam.CombineGlobally(sum) |
按键归约 | pairs.reduceByKey(lambda x, y: x+y) | pairs | beam.CombinePerKey(sum) |
去重 | values.distinct() | values | beam.Distinct() |
计数 | values.count() | values | beam.combiners.Count.Globally() |
按键计数 | pairs.countByKey() | pairs | beam.combiners.Count.PerKey() |
获取最小值 | values.takeOrdered(3) | values | beam.combiners.Top.Smallest(3) |
获取最大值 | values.takeOrdered(3, lambda x: -x) | values | beam.combiners.Top.Largest(3) |
随机采样 | values.takeSample(False, 3) | values | beam.combiners.Sample.FixedSizeGlobally(3) |
联合 | values.union(otherValues) | (values, otherValues) | beam.Flatten() |
共组 | pairs.cogroup(otherPairs) | {'Xs': pairs, 'Ys': otherPairs} | beam.CoGroupByKey() |
ℹ️ 要了解有关 Beam 中可用转换的更多信息,请查看Python 转换库。
使用计算值
由于我们在潜在的分布式环境中工作,因此我们无法保证我们计算的结果在任何给定机器上都可用。
在 PySpark 中,我们可以使用data.collect()
从元素集合 (RDD) 中获取结果,或者使用其他聚合,例如reduce()
、count()
等等。
以下是一个将数字缩放到零到一之间的范围的示例。
import pyspark
sc = pyspark.SparkContext()
values = sc.parallelize([1, 2, 3, 4])
min_value = values.reduce(min)
max_value = values.reduce(max)
# We can simply use `min_value` and `max_value` since it's already a Python `int` value from `reduce`.
scaled_values = values.map(lambda x: (x - min_value) / (max_value - min_value))
# But to access `scaled_values`, we need to call `collect`.
print(scaled_values.collect())
在 Beam 中,所有转换的结果都会产生 PCollection。我们使用侧输入将 PCollection 馈送到转换并访问其值。
任何接受函数的转换(例如 Map
)都可以接受侧输入。如果我们只需要一个值,可以使用beam.pvalue.AsSingleton
并将它们访问为 Python 值。如果我们需要多个值,可以使用beam.pvalue.AsIter
并将它们访问为可迭代对象
。
import apache_beam as beam
with beam.Pipeline() as pipeline:
values = pipeline | beam.Create([1, 2, 3, 4])
min_value = values | beam.CombineGlobally(min)
max_value = values | beam.CombineGlobally(max)
# To access `min_value` and `max_value`, we need to pass them as a side input.
scaled_values = values | beam.Map(
lambda x, minimum, maximum: (x - minimum) / (maximum - minimum),
minimum=beam.pvalue.AsSingleton(min_value),
maximum=beam.pvalue.AsSingleton(max_value),
)
scaled_values | beam.Map(print)
ℹ️ 在 Beam 中,我们需要显式传递侧输入,但我们获得了归约或聚合不需要适合内存的好处。惰性计算侧输入还允许我们仅计算
values
一次,而不是为每个不同的归约计算(或者需要显式缓存 RDD)。
下一步
- 查看Python 转换库中所有可用的转换。
- 了解如何在编程指南的管道 I/O部分中从文件读取和写入文件。
- 在WordCount 示例演练中逐步了解其他 WordCount 示例。
- 通过我们的学习资源进行自我步调的巡礼。
- 深入了解我们最喜欢的视频和播客。
- 加入 Beamusers@邮件列表。
- 如果您有兴趣为 Apache Beam 代码库做出贡献,请查看贡献指南。
如果您遇到任何问题,请不要犹豫联系我们!
上次更新时间:2024/10/31
您是否找到了所有您要查找的内容?
它是否全部有用且清晰?是否有您想更改的内容?告诉我们!