侧输入模式
此页面上的示例向您展示了常见的 Beam 侧输入模式。侧输入是 DoFn
在每次处理输入 PCollection
中的元素时可以访问的额外输入。有关更多信息,请参阅 编程指南中关于侧输入的部分。
如果您尝试通过对远程服务的键值查找来丰富您的数据,您可能首先想考虑 增强转换,它可以抽象化一些侧输入的细节并提供诸如客户端限流之类的额外好处。
- Java SDK
- Python SDK
缓慢更新全局窗口侧输入
您可以检索来自全局窗口的侧输入,以在具有非全局窗口(如 FixedWindow
)的管道作业中使用它们。
在具有非全局窗口的管道中缓慢更新全局窗口侧输入
编写一个
DoFn
,它定期从有界源中拉取数据到全局窗口。a. 使用
GenerateSequence
源转换定期发出值。b. 实例化一个数据驱动触发器,该触发器在每个元素上激活并从有界源中拉取数据。
c. 触发触发器以将数据传递到全局窗口。
创建下游转换的侧输入。侧输入应该适合内存。
全局窗口侧输入根据处理时间触发,因此主管道非确定性地将侧输入与事件时间中的元素匹配。
例如,以下代码示例使用 Map
创建 DoFn
。Map
成为 View.asSingleton
侧输入,该侧输入在每次计数器滴答时重建。侧输入每 5 秒更新一次,以演示工作流程。在现实世界场景中,侧输入通常每隔几个小时或每天更新一次。
public static void sideInputPatterns() {
// This pipeline uses View.asSingleton for a placeholder external service.
// Run in debug mode to see the output.
Pipeline p = Pipeline.create();
// Create a side input that updates every 5 seconds.
// View as an iterable, not singleton, so that if we happen to trigger more
// than once before Latest.globally is computed we can handle both elements.
PCollectionView<Iterable<Map<String, String>>> mapIterable =
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
.apply(
ParDo.of(
new DoFn<Long, Map<String, String>>() {
@ProcessElement
public void process(
@Element Long input,
@Timestamp Instant timestamp,
OutputReceiver<Map<String, String>> o) {
// Replace map with test data from the placeholder external service.
// Add external reads here.
o.output(PlaceholderExternalService.readTestData(timestamp));
}
}))
.apply(
Window.<Map<String, String>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(Latest.globally())
.apply(View.asIterable());
// Consume side input. GenerateSequence generates test data.
// Use a real source (like PubSubIO or KafkaIO) in production.
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(Sum.longsGlobally().withoutDefaults())
.apply(
ParDo.of(
new DoFn<Long, KV<Long, Long>>() {
@ProcessElement
public void process(ProcessContext c, @Timestamp Instant timestamp) {
Iterable<Map<String, String>> si = c.sideInput(mapIterable);
// Take an element from the side input iterable (likely length 1)
Map<String, String> keyMap = si.iterator().next();
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.info(
"Value is {} with timestamp {}, using key A from side input with time {}.",
c.element(),
timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")),
keyMap.get("Key_A"));
}
})
.withSideInputs(mapIterable));
p.run();
}
/** Placeholder class that represents an external service generating test data. */
public static class PlaceholderExternalService {
public static Map<String, String> readTestData(Instant timestamp) {
Map<String, String> map = new HashMap<>();
map.put("Key_A", timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")));
return map;
}
}
使用窗口化缓慢更新侧输入
您可以定期将侧输入数据读取到不同的 PCollection 窗口中。当您将侧输入应用于主输入时,每个主输入窗口会自动匹配到单个侧输入窗口。这保证了在单个窗口的持续时间内的一致性,这意味着主输入上的每个窗口都将匹配到侧输入数据的单个版本。
定期将侧输入数据读取到不同的 PCollection 窗口中
- 使用 PeriodicImpulse 或 PeriodicSequence PTransform 来
- 以所需的处理时间间隔生成无限的元素序列
- 将它们分配到不同的窗口。
- 使用由 PCollection 元素到达触发的 SDF Read 或 ReadAll PTransform 获取数据。
- 应用侧输入。
PCollectionView<List<Long>> sideInput =
p.apply(
"SIImpulse",
PeriodicImpulse.create()
.startAt(startAt)
.stopAt(stopAt)
.withInterval(interval1)
.applyWindowing())
.apply(
"FileToRead",
ParDo.of(
new DoFn<Instant, String>() {
@DoFn.ProcessElement
public void process(@Element Instant notUsed, OutputReceiver<String> o) {
o.output(fileToRead);
}
}))
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(TextIO.readFiles())
.apply(
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void process(@Element String src, OutputReceiver<String> o) {
o.output(src);
}
}))
.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
.apply(View.asList());
PCollection<Instant> mainInput =
p.apply(
"MIImpulse",
PeriodicImpulse.create()
.startAt(startAt.minus(Duration.standardSeconds(1)))
.stopAt(stopAt.minus(Duration.standardSeconds(1)))
.withInterval(interval2)
.applyWindowing());
// Consume side input. GenerateSequence generates test data.
// Use a real source (like PubSubIO or KafkaIO) in production.
PCollection<Long> result =
mainInput.apply(
"generateOutput",
ParDo.of(
new DoFn<Instant, Long>() {
@ProcessElement
public void process(ProcessContext c) {
c.output((long) c.sideInput(sideInput).size());
}
})
.withSideInputs(sideInput));
最后更新于 2024/10/31
您找到您要找的所有东西了吗?
它们是否有用且清晰?您想更改任何内容吗?请告诉我们!