Apache Beam 中的循环计时器

Apache Beam 的基元允许您构建表达力强的 数据管道,适合各种用例。一个特定的用例是时间序列数据的分析,其中跨窗口边界的连续序列很重要。当您处理这种类型的数据时,会遇到一些有趣的挑战,在本博客中,我们将更详细地探讨其中一个挑战,并使用“循环计时器”模式利用计时器 API(博客文章)。

使用 Beam 的流式模式,您可以获取数据流并构建分析转换以生成数据的 结果。但对于时间序列数据而言,数据缺失是有用信息。那么,如何在没有数据的情况下生成结果呢?

让我们用一个更具体的例子来阐明这个需求。假设您有一个简单的管道,它每分钟对来自物联网设备的事件数量求和。我们希望在特定时间间隔内未看到任何数据时生成值 0。那么,为什么这会很棘手呢?嗯,构建一个简单的管道来计算到达的事件很容易,但是当没有事件时,就没有任何东西可以计算!

让我们构建一个简单的管道来使用

  // We will start our timer at 1 sec from the fixed upper boundary of our
  // minute window
  Instant now = Instant.parse("2000-01-01T00:00:59Z");

  // ----- Create some dummy data

  // Create 3 elements, incrementing by 1 minute and leaving a time gap between
  // element 2 and element 3
  TimestampedValue<KV<String, Integer>> time_1 =
    TimestampedValue.of(KV.of("Key_A", 1), now);

  TimestampedValue<KV<String, Integer>> time_2 =
    TimestampedValue.of(KV.of("Key_A", 2),
    now.plus(Duration.standardMinutes(1)));

  // No Value for start time + 2 mins
  TimestampedValue<KV<String, Integer>> time_3 =
    TimestampedValue.of(KV.of("Key_A", 3),
    now.plus(Duration.standardMinutes(3)));

  // Create pipeline
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
    .as(PipelineOptions.class);

  Pipeline p = Pipeline.create(options);

  // Apply a fixed window of duration 1 min and Sum the results
  p.apply(Create.timestamped(time_1, time_2, time_3))
   .apply(
      Window.<KV<String,Integer>>into(
FixedWindows.<Integer>of(Duration.standardMinutes(1))))
        .apply(Sum.integersPerKey())
        .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {

          @ProcessElement public void process(ProcessContext c) {
            LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
          }
       }));

  p.run();

运行该管道将产生以下输出

INFO  LoopingTimer  - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z

注意:输出的无序性是预期的,但是键-窗口元组已正确计算。

正如预期的那样,我们看到每个包含时间戳介于窗口的最小值和最大值之间的 数据点的间隔窗口中的输出。在时间戳 00:00:59、00:01:59 和 00:03:59 处有一个数据点,它落入以下间隔窗口中。

  • [00:00:00, 00:00:59.999)
  • [00:01:00, 00:01:59.999)
  • [00:03:00, 00:03:59.999)

但是由于在 00:02:00 和 00:02:59 之间没有数据,因此不会为间隔窗口 [00:02:00,00:02:59.999) 生成任何值。

如何让 Beam 为该丢失的窗口输出值?首先,让我们浏览一些不使用计时器 API 的选项。

选项 1:外部心跳

我们可以使用外部系统为每个时间间隔发出一个值,并将其注入 Beam 使用的数据流中。这个简单的选项将任何复杂性都移出 Beam 管道。但是使用外部系统意味着我们需要监视此系统,并与 Beam 管道一起执行其他维护任务。

选项 2:在 Beam 管道中使用生成的源

我们可以使用生成源来使用此代码片段发出值

pipeline.apply(GenerateSequence.
            from(0).withRate(1,Duration.standardSeconds(1L)))

然后我们可以

  1. 使用 DoFn 将值转换为零。
  2. 将此值与真实源合并。
  3. 生成一个 PCollection,它在每个时间间隔内都有滴答声。

这也是在每个时间间隔内生成一个值的一种简单方法。

选项 1 & 2 多个键的问题

选项 1 和 2 都适用于管道处理单个键的情况。现在让我们处理这种情况,而不是 1 个物联网设备,而是有 1000 个或 100,000 个这样的设备,每个设备都有一个唯一的键。为了使选项 1 或选项 2 在这种情况下起作用,我们需要执行额外的步骤:创建 FanOut DoFn。每个滴答声都需要分发到所有潜在的键,因此我们需要创建一个 FanOut DoFn,它接受虚拟值并为每个可用键生成键-值对。

例如,假设我们有 3 个键对应 3 个物联网设备,{key1,key2,key3}。使用我们在选项 2 中概述的方法,当我们从 GenerateSequence 获取第一个元素时,我们需要在 DoFn 中创建一个循环来生成 3 个键-值对。这些对成为每个物联网设备的心跳值。

当我们需要处理大量物联网设备时,事情变得更加有趣,这些设备的键列表会动态更改。我们需要添加一个转换来执行 Distinct 操作,并将生成的数据作为侧输入馈送到 FanOut DoFn 中。

选项 3:使用 Beam 计时器实现心跳

那么计时器如何提供帮助呢?好吧,让我们来看看一个新的转换

编辑:循环计时器状态已从 Boolean 更改为 Long 以允许最小值检查。

public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {

    Instant stopTimerTime;

    LoopingStatefulTimer(Instant stopTime){
      this.stopTimerTime = stopTime;
    }

    @StateId("loopingTimerTime")
    private final StateSpec<ValueState<Long>> loopingTimerTime =
        StateSpecs.value(BigEndianLongCoder.of());

    @StateId("key")
    private final StateSpec<ValueState<String>> key =
        StateSpecs.value(StringUtf8Coder.of());

    @TimerId("loopingTimer")
    private final TimerSpec loopingTimer =
        TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement public void process(ProcessContext c, @StateId("key") ValueState<String> key,
        @StateId("loopingTimerTime") ValueState<Long> loopingTimerTime,
        @TimerId("loopingTimer") Timer loopingTimer) {

      // If the timer has been set already, or if the value is smaller than
      // the current element + window duration, do not set
      Long currentTimerValue = loopingTimerTime.read();
      Instant nextTimerTimeBasedOnCurrentElement = c.timestamp().plus(Duration.standardMinutes(1));

      if (currentTimerValue == null || currentTimerValue >
          nextTimerTimeBasedOnCurrentElement.getMillis()) {
        loopingTimer.set(nextTimerTimeBasedOnCurrentElement);
        loopingTimerTime.write(nextTimerTimeBasedOnCurrentElement.getMillis());
      }

      // We need this value so that we can output a value for the correct key in OnTimer
      if (key.read() == null) {
        key.write(c.element().getKey());
      }

      c.output(c.element());
    }

    @OnTimer("loopingTimer")
    public void onTimer(
        OnTimerContext c,
        @StateId("key") ValueState<String> key,
        @TimerId("loopingTimer") Timer loopingTimer) {

      LOG.info("Timer @ {} fired", c.timestamp());
      c.output(KV.of(key.read(), 0));

      // If we do not put in a “time to live” value, then the timer would loop forever
      Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
      if (nextTimer.isBefore(stopTimerTime)) {
        loopingTimer.set(nextTimer);
      } else {
        LOG.info(
            "Timer not being set as exceeded Stop Timer value {} ",
            stopTimerTime);
      }
    }
  }

状态 API 需要保留两个数据值

  1. 一个布尔值 timeRunning 用于避免计时器已经在运行时重置它。
  2. 一个“key”状态对象值,允许我们存储我们正在使用的键。此信息将在稍后的 OnTimer 事件中需要。

我们还有一个 ID 为 **loopingTimer** 的计时器,它充当我们的每个间隔闹钟。请注意,计时器是一个 事件计时器。它根据水位线触发,而不是根据管道运行时的 时间流逝触发。

接下来,让我们拆解 @ProcessElement 块中发生的事情

进入该块的第一个元素将

  1. timerRunner 的状态设置为 True。
  2. 将键-值对中的键的值写入键 StateValue 中。
  3. 代码将计时器的值设置为在元素时间戳后一分钟触发。请注意,此时间戳允许的最大值为 XX:XX:59.999。这将最大报警值置于下一个时间间隔的上边界。
  4. 最后,我们使用 c.output 输出来自 @ProcessElement 块的数据。

在 @OnTimer 块中,将发生以下情况

  1. 代码将使用从我们的键 StateValue 中提取的键发出一个值为 0 的值。事件的时间戳对应于计时器触发的事件时间。
  2. 我们从现在起一分钟设置一个新的计时器,除非我们已经过了 stopTimerTime 值。您的用例通常会有更复杂的停止条件,但我们在 这里使用一个简单的条件来使我们能够保持说明性代码的简单性。停止条件的主题将在后面更详细地讨论。

就是这样,让我们将我们的转换添加回管道中

  // Apply a fixed window of duration 1 min and Sum the results
  p.apply(Create.timestamped(time_1, time_2, time_3)).apply(
    Window.<KV<String, Integer>>into(FixedWindows.<Integer>of(Duration.standardMinutes(1))))
    // We use a combiner to reduce the number of calls in keyed state
    // from all elements to 1 per FixedWindow
    .apply(Sum.integersPerKey())
    .apply(Window.into(new GlobalWindows()))
    .apply(ParDo.of(new LoopingStatefulTimer(Instant.parse("2000-01-01T00:04:00Z"))))
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {

      @ProcessElement public void process(ProcessContext c) {

        LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());

     }
  }));
  1. 在管道的第一部分,我们创建 FixedWindows 并将每个键的值减少到一个 Sum。
  2. 接下来,我们将输出重新窗口化为 GlobalWindow。由于状态和计时器是按窗口进行的,因此必须在窗口边界内设置它们。我们希望循环计时器跨越所有固定窗口,因此我们在全局窗口中设置它。
  3. 然后,我们添加 LoopingStatefulTimer DoFn。
  4. 最后,我们重新应用 FixedWindows 并对我们的值求和。

此管道确保每个间隔窗口都存在一个值为零的值,即使管道源在间隔窗口的最小值和最大值边界内发出一个值也是如此。这意味着我们可以标记数据缺失。

您可能想知道为什么我们使用两个 reducer 以及多个 Sum.integersPerKey。为什么不只使用一个呢?从功能上来说,使用一个也可以产生正确的结果。但是,使用两个 Sum.integersPerKey 给了我们一个不错的性能优势。它将元素的数量从许多减少到每个时间间隔只有一个。这可以减少 @ProcessElement 调用期间对 State API 的读取次数。

以下是运行我们修改后的管道的日志输出

INFO  LoopingTimer  - Timer @ 2000-01-01T00:01:59.999Z fired
INFO  LoopingTimer  - Timer @ 2000-01-01T00:02:59.999Z fired
INFO  LoopingTimer  - Timer @ 2000-01-01T00:03:59.999Z fired
INFO  LoopingTimer  - Timer not being set as exceeded Stop Timer value 2000-01-01T00:04:00.000Z
INFO  LoopingTimer  - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 0} timestamp is 2000-01-01T00:02:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z

耶!现在我们有了时间间隔 [00:01:00, 00:01:59.999) 的输出,即使源数据集在该间隔中没有元素也是如此。

在本博客中,我们介绍了时间序列用例周围的一些有趣领域,并介绍了几种选择,包括计时器 API 的高级用例。祝大家循环愉快!

注意:循环计时器是计时器 API 的一个有趣的新用例,运行器需要使用所有更高级的功能集来添加对其的支持。您可以使用 DirectRunner 尝试使用此模式。对于其他运行器,请查看其关于在生产环境中处理此用例的支持的发布说明。

(功能矩阵)

运行器特定说明:Google Cloud Dataflow 运行器的 Drain 功能不支持循环计时器(链接到矩阵)