Apache Beam Java SDK 扩展
Join-library
Join-library 提供了内部连接、外部左连接和外部右连接函数。其目标是简化最常见的连接情况,使其成为一个简单的函数调用。
这些函数是通用的,支持任何 Beam 支持的类型的连接。连接函数的输入是 PCollections
的 Key
/ Value
。左右 PCollection
的键类型必须相同。所有连接函数都返回一个 Key
/ Value
,其中 Key
是连接键,值是一个 Key
/ Value
,其中键是左值,右值是右值。
对于外部连接,用户必须提供一个表示 null
的值,因为 null
无法序列化。
示例用法
PCollection<KV<String, String>> leftPcollection = ...
PCollection<KV<String, Long>> rightPcollection = ...
PCollection<KV<String, KV<String, Long>>> joinedPcollection =
Join.innerJoin(leftPcollection, rightPcollection);
排序器
此模块提供了 SortValues
变换,它接收一个 PCollection<KV<K, Iterable<KV<K2, V>>>>
并生成一个 PCollection<KV<K, Iterable<KV<K2, V>>>>
,其中,对于每个主鍵 K
,配对的 Iterable<KV<K2, V>>
已按次要鍵(K2
)的字节编码排序。它是针对可迭代对象的有效且可扩展的排序器,即使它们很大(不适合内存)。
注意事项
- 此变换执行仅值排序;与每个鍵配对的可迭代对象已排序,但不同鍵之间没有关系,因为 Beam 不支持对
PCollection
中不同元素之间定义的关系。
- 每个
Iterable<KV<K2, V>>
在单个工作器上使用本地内存和磁盘进行排序。这意味着当在不同的管道中使用时,SortValues
可能是性能和/或可扩展性的瓶颈。例如,不建议在单个元素的PCollection
上使用SortValues
来全局排序大型PCollection
。如果排序溢出到磁盘,则使用的磁盘空间数量(粗略估计)为numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3
。
选项
- 如果排序需要溢出到磁盘,用户可以通过创建一个
BufferedExternalSorter.Options
的自定义实例传递给SortValues.create
来定制使用的临时位置和使用的最大内存量。
SortValues
的示例用法
PCollection<KV<String, KV<String, Integer>>> input = ...
// Group by primary key, bringing <SecondaryKey, Value> pairs for the same key together.
PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
input.apply(GroupByKey.<String, KV<String, Integer>>create());
// For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
grouped.apply(
SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));