在 Apache Beam 中测试无界管道

Beam 编程模型统一了编写批处理和流管道。我们最近引入了一个新的 PTransform 来编写针对将在无界数据集上运行的管道的测试,这些管道必须处理乱序和延迟数据。

水印、窗口和触发器构成了 Beam 编程模型的核心部分,它们分别确定数据的分组方式、输入完成的时间以及何时生成结果。这适用于所有管道,无论它们是处理有界还是无界输入。如果您不熟悉 Beam 模型中的水印、窗口和触发,那么 流式处理 101流式处理 102 是一个很好的入门地方。这些文章中的一个关键要点是:在具有间歇性故障和断开连接的用户等现实流式处理场景中,数据可能会乱序到达或延迟。Beam 的基本元件为用户提供了一种方法,即使面临这些挑战,也能执行有用的、强大的和正确的计算。

作为 Beam 管道作者,我们需要全面的测试,覆盖关键的故障场景和极端情况,才能真正确信管道已准备好投入生产。Beam SDK 中现有的测试基础设施允许编写测试,这些测试检查执行时管道的 PCollection 内容。但是,编写针对可能接收延迟数据或多次触发并处理的管道的单元测试,从历史上看一直是复杂或不可能的,因为从无界源读取的管道在没有外部干预的情况下不会关闭,而专门从有界源读取的管道则无法测试延迟数据或大多数推测触发行为。如果没有额外的工具,使用自定义触发器并处理乱序数据的管道就无法轻松测试。

这篇博客文章介绍了我们新的框架,用于编写针对处理移动游戏示例系列中的排行榜管道中的延迟和乱序数据的管道的测试。

排行榜和移动游戏示例

排行榜Beam 移动游戏示例(以及 演练)的一部分,它会持续计算用户和团队得分。用户得分是在程序的整个生命周期内计算的,而团队得分是在固定窗口内计算的,默认持续时间为一小时。排行榜管道根据配置的触发和管道允许的延迟,适当地生成推测性窗格和延迟窗格。排行榜管道的预期输出根据元素相对于水印和处理时间进度的到达时间而有所不同,之前在测试中无法控制这些到达时间。

编写确定性测试以模拟非确定性

Beam 测试基础设施提供了 PAssert 方法,这些方法断言管道中 PCollection 内容的属性。我们已经扩展了这个基础设施,包括 TestStream,它是一个 PTransform,执行一系列事件,包括向管道添加额外元素、推进 TestStream 的水印以及推进管道处理时间时钟。TestStream 允许观察触发器对管道产生的输出的影响的测试。

在执行从 TestStream 读取的管道时,读取操作将等待每个事件的所有后果完成,然后再继续进行下一个事件,确保在处理时间推进时,基于处理时间的触发器会按预期触发。使用这种转换,可以在管道上观察触发和允许延迟的影响,包括对推测性窗格和延迟窗格以及丢弃数据的反应。

元素时间

元素的到达时间要么早于水印、与水印一致,要么晚于水印,将它们归类为“早于”、“准时”和“延迟”三个类别。“延迟”元素可以根据元素分配到的窗口和窗口策略指定的最大允许延迟,进一步细分为“不可观察地延迟”、“可观察地延迟”和“可丢弃地延迟”。具有这些时间安排的元素被发射到窗格中,这些窗格可以是“早期”、“准时”或“延迟”,具体取决于发射窗格时水印的位置。

使用 TestStream,我们可以编写测试来演示在触发条件满足后输出推测性窗格,水印的推进会导致生成准时窗格,以及延迟到达的数据在到达最大允许延迟之前会生成改进,之后会进行丢弃。

以下示例演示了如何使用 TestStream 为管道提供事件序列,其中元素的到达时间与水印更新和处理时间推进交织在一起。每个事件在发生其他事件之前都会运行完成。

在图表中,事件在“真实”(事件)时间中发生的时刻随着图表向右移动而推进。管道接收它们的时刻随着图表向上移动而推进。水印由红色波浪线表示,每个星爆代表触发器触发及其关联的窗格。

Elements on the Event and Processing time axes, with the Watermark and produced panes

所有内容都准时到达

例如,如果我们创建一个 TestStream,其中所有数据都早于水印到达,并将结果 PCollection 作为输入提供给 CalculateTeamScores PTransform

TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
    .addElements(new GameActionInfo("sky", "blue", 12, new Instant(0L)),
                 new GameActionInfo("navy", "blue", 3, new Instant(0L)),
                 new GameActionInfo("navy", "blue", 3, new Instant(0L).plus(Duration.standardMinutes(3))))
    // Move the watermark past the end the end of the window
    .advanceWatermarkTo(new Instant(0L).plus(TEAM_WINDOW_DURATION)
                                       .plus(Duration.standardMinutes(1)))
    .advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
    .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));

然后我们可以断言结果 PCollection 包含到达的元素

Elements all arrive before the watermark, and are produced in the on-time pane
// Only one value is emitted for the blue team
PAssert.that(teamScores)
       .inWindow(window)
       .containsInAnyOrder(KV.of("blue", 18));
p.run();

一些元素延迟,但早于窗口结束到达

我们也可以在水印之后但早于窗口结束之前(如红色水印左侧所示)将数据添加到 TestStream,这演示了“不可观察地延迟”数据,即延迟到达的数据,但由于它在水印越过窗口结束之前到达,因此被系统提升为准时数据。

TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
    .addElements(new GameActionInfo("sky", "blue", 3, new Instant(0L)),
                 new GameActionInfo("navy", "blue", 3, new Instant(0L).plus(Duration.standardMinutes(3))))
    // Move the watermark up to "near" the end of the window
    .advanceWatermarkTo(new Instant(0L).plus(TEAM_WINDOW_DURATION)
                                       .minus(Duration.standardMinutes(1)))
    .addElements(new GameActionInfo("sky", "blue", 12, Duration.ZERO))
    .advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
    .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
An element arrives late, but before the watermark passes the end of the window, and is produced in the on-time pane
// Only one value is emitted for the blue team
PAssert.that(teamScores)
       .inWindow(window)
       .containsInAnyOrder(KV.of("blue", 18));
p.run();

元素延迟,并且晚于窗口结束到达

通过在添加延迟数据之前将水印推进到更远的时间,我们可以演示导致系统发射准时窗格的触发行为,然后在延迟数据到达之后,发射一个改进结果的窗格。

TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
    .addElements(new GameActionInfo("sky", "blue", 3, new Instant(0L)),
                 new GameActionInfo("navy", "blue", 3, new Instant(0L).plus(Duration.standardMinutes(3))))
    // Move the watermark up to "near" the end of the window
    .advanceWatermarkTo(new Instant(0L).plus(TEAM_WINDOW_DURATION)
                                       .minus(Duration.standardMinutes(1)))
    .addElements(new GameActionInfo("sky", "blue", 12, Duration.ZERO))
    .advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
    .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
Elements all arrive before the watermark, and are produced in the on-time pane
// An on-time pane is emitted with the events that arrived before the window closed
PAssert.that(teamScores)
       .inOnTimePane(window)
       .containsInAnyOrder(KV.of("blue", 6));
// The final pane contains the late refinement
PAssert.that(teamScores)
       .inFinalPane(window)
       .containsInAnyOrder(KV.of("blue", 18));
p.run();

元素延迟,并且晚于窗口结束加上允许的延迟到达

如果我们将水印推进到更远的未来,超过配置的最大允许延迟,我们可以演示延迟元素会被系统丢弃。

TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
    .addElements(new GameActionInfo("sky", "blue", 3, Duration.ZERO),
                 new GameActionInfo("navy", "blue", 3, Duration.standardMinutes(3)))
    // Move the watermark up to "near" the end of the window
    .advanceWatermarkTo(new Instant(0).plus(TEAM_WINDOW_DURATION)
                                         .plus(ALLOWED_LATENESS)
                                         .plus(Duration.standardMinutes(1)))
    .addElements(new GameActionInfo(
                     "sky",
                     "blue",
                     12,
                     new Instant(0).plus(TEAM_WINDOW_DURATION).minus(Duration.standardMinutes(1))))
    .advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
    .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
Elements all arrive before the watermark, and are produced in the on-time pane
// An on-time pane is emitted with the events that arrived before the window closed
PAssert.that(teamScores)
       .inWindow(window)
       .containsInAnyOrder(KV.of("blue", 6));

p.run();

元素早于窗口结束到达,并且处理时间过去了

使用其他方法,我们可以演示推测触发器的行为,方法是推进 TestStream 的处理时间。如果我们将元素添加到输入 PCollection,偶尔推进处理时间时钟,并应用 CalculateUserScores

TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
    .addElements(new GameActionInfo("scarlet", "red", 3, new Instant(0L)),
                 new GameActionInfo("scarlet", "red", 2, new Instant(0L).plus(Duration.standardMinutes(1))))
    .advanceProcessingTime(Duration.standardMinutes(12))
    .addElements(new GameActionInfo("oxblood", "red", 2, new Instant(0L)).plus(Duration.standardSeconds(22)),
                 new GameActionInfo("scarlet", "red", 4, new Instant(0L).plus(Duration.standardMinutes(2))))
    .advanceProcessingTime(Duration.standardMinutes(15))
    .advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> userScores =
    p.apply(createEvents).apply(new CalculateUserScores(ALLOWED_LATENESS));
Elements all arrive before the watermark, and are produced in the on-time pane
PAssert.that(userScores)
       .inEarlyGlobalWindowPanes()
       .containsInAnyOrder(KV.of("scarlet", 5),
                           KV.of("scarlet", 9),
                           KV.of("oxblood", 2));

p.run();

TestStream - 幕后

TestStream 依赖于我们引入的管道概念,称为静默,以便在提供关于运行器何时会调用根转换的保证的同时,利用现有的运行器基础设施。这包括关于待处理元素和触发器的属性,即

  • 不允许触发器触发但尚未触发
  • 所有元素要么缓存在状态中,要么无法前进,直到侧输入可用

简而言之,这意味着在没有输入水印或处理时间的推进,或没有将额外元素添加到管道的情况下,管道将不会前进。每当 TestStream PTransform 执行操作时,运行器必须在管道静默之前不重新调用同一个实例。这确保了 TestStream 指定的事件“按顺序”发生,从而确保输入水印和系统时钟不会领先于它们希望延迟的元素。

DirectRunner 已被修改为使用静默作为信号,指示它应该向管道添加更多工作,并且该运行器中 TestStream 的实现利用了这一事实来执行每个事件的单一输出。DirectRunner 实现还直接控制运行器的系统时钟,确保即使管道中存在多分钟的处理时间触发器,测试也能迅速完成。

TestStream 转换在 DirectRunner 中受支持。对于大多数用户来说,使用 TestPipeline 和 PAsserts 编写的测试在使用 TestStream 时会自动正常运行。

总结

在 PAssert 中添加 TestStream 以及特定于窗口和窗格的匹配器,使得能够测试生成推测性窗格和延迟窗格的管道。这允许对所有类型的管道进行测试,直接在 Java SDK 中表达。如果您有任何问题或意见,我们很乐意在 邮件列表 上收到您的反馈。