CoGroupByKey

Pydoc Pydoc




根据其键汇总所有输入元素,并允许下游处理使用与该键关联的所有值。虽然 GroupByKey 在单个输入集合上执行此操作,因此在单个类型的输入值上执行此操作,但 CoGroupByKey 在多个输入集合上执行此操作。因此,每个键的结果是每个输入集合中与该键关联的值的元组。

Beam 编程指南 中查看更多信息。

示例

在以下示例中,我们创建一个包含两个 PCollection 的管道,一个包含图标,一个包含持续时间,两者都具有相同的产物品种名称键。然后,我们应用 CoGroupByKey 使用其键将两个 PCollection 加入在一起。

CoGroupByKey 需要一个名为键控 PCollection 的字典,并生成按其键加入的元素。每个输出元素的值是字典,其中名称对应于输入字典,包含为该键找到的所有值的列表。

import apache_beam as beam

with beam.Pipeline() as pipeline:
  icon_pairs = pipeline | 'Create icons' >> beam.Create([
      ('Apple', '🍎'),
      ('Apple', '🍏'),
      ('Eggplant', '🍆'),
      ('Tomato', '🍅'),
  ])

  duration_pairs = pipeline | 'Create durations' >> beam.Create([
      ('Apple', 'perennial'),
      ('Carrot', 'biennial'),
      ('Tomato', 'perennial'),
      ('Tomato', 'annual'),
  ])

  plants = (({
      'icons': icon_pairs, 'durations': duration_pairs
  })
            | 'Merge' >> beam.CoGroupByKey()
            | beam.Map(print))

输出

('Apple', {'icons': ['🍎', '🍏'], 'durations': ['perennial']})
('Carrot', {'icons': [], 'durations': ['biennial']})
('Tomato', {'icons': ['🍅'], 'durations': ['perennial', 'annual']})
('Eggplant', {'icons': ['🍆'], 'durations': []})
Pydoc Pydoc