自定义窗口模式

此页面上的示例演示了常见的自定义窗口模式。您可以使用 WindowFn 函数 创建自定义窗口。有关更多信息,请参阅 窗口化编程指南部分

注意:Python(使用 fnapi)不支持自定义合并窗口。

使用数据动态设置会话窗口间隔

您可以修改 assignWindows 函数以使用数据驱动间隔,然后将传入数据窗口化为会话。

通过 WindowFn.AssignContext.element() 访问 assignWindows 函数。原始的、固定持续时间的 assignWindows 函数是

  public Collection<IntervalWindow> assignWindows(WindowFn.AssignContext c) {

    // Assign each element into a window from its timestamp until gapDuration in the
    // future.  Overlapping windows (representing elements within gapDuration of
    // each other) will be merged.
    return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
  }

创建数据驱动间隔

要创建数据驱动间隔,请将以下代码段添加到 assignWindows 函数

例如,以下函数将每个元素分配到时间戳和 gapDuration 之间的窗口

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
  // Assign each element into a window from its timestamp until gapDuration in the
  // future.  Overlapping windows (representing elements within gapDuration of
  // each other) will be merged.
  Duration dataDrivenGap;
  TableRow message = c.element();

  try {
    dataDrivenGap = Duration.standardSeconds(Long.parseLong(message.get("gap").toString()));
  } catch (Exception e) {
    dataDrivenGap = gapDuration;
  }
  return Arrays.asList(new IntervalWindow(c.timestamp(), dataDrivenGap));
}

然后,在窗口化函数中设置 gapDuration 字段

public static class DynamicSessions extends WindowFn<TableRow, IntervalWindow> {
  /** Duration of the gaps between sessions. */
  private final Duration gapDuration;

  /** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
  private DynamicSessions(Duration gapDuration) {
    this.gapDuration = gapDuration;
  }

将消息窗口化为会话

在创建数据驱动间隔后,您可以将传入数据窗口化为新的、自定义会话。

首先,将会话长度设置为间隔持续时间

/** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
public static DynamicSessions withDefaultGapDuration(Duration gapDuration) {
  return new DynamicSessions(gapDuration);
}

最后,在您的管道中将数据窗口化为会话

p.apply(
    "Window into sessions",
    Window.<TableRow>into(
        DynamicSessions.withDefaultGapDuration(Duration.standardSeconds(10))));

示例数据和窗口

以下测试数据统计了两个用户的得分,使用和不使用 gap 属性

.apply("Create data", Create.timestamped(
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"12\",\"gap\":\"5\"}", new Instant()),
            TimestampedValue.of("{\"user\":\"user-2\",\"score\":\"4\"}", new Instant()),
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"-3\",\"gap\":\"5\"}", new Instant().plus(2000)),
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"2\",\"gap\":\"5\"}", new Instant().plus(9000)),
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"7\",\"gap\":\"5\"}", new Instant().plus(12000)),
            TimestampedValue.of("{\"user\":\"user-2\",\"score\":\"10\"}", new Instant().plus(12000)))
        .withCoder(StringUtf8Coder.of()))

下图可视化了测试数据

Two sets of data and the standard and dynamic sessions with which the data is windowed.

标准会话

标准会话使用以下窗口和得分

user=user-2, score=4, window=[2019-05-26T13:28:49.122Z..2019-05-26T13:28:59.122Z)
user=user-1, score=18, window=[2019-05-26T13:28:48.582Z..2019-05-26T13:29:12.774Z)
user=user-2, score=10, window=[2019-05-26T13:29:03.367Z..2019-05-26T13:29:13.367Z)

用户 #1 看到两个事件,间隔 12 秒。在标准会话中,间隔默认为 10 秒;两个得分都在不同的会话中,因此得分不会相加。

用户 #2 看到四个事件,分别间隔 2 秒、7 秒和 3 秒。由于所有间隔都不大于默认值,因此这四个事件在同一个标准会话中,并加在一起(18 分)。

动态会话

动态会话指定了 5 秒的间隔,因此它们使用以下窗口和得分

user=user-2, score=4, window=[2019-05-26T14:30:22.969Z..2019-05-26T14:30:32.969Z)
user=user-1, score=9, window=[2019-05-26T14:30:22.429Z..2019-05-26T14:30:30.553Z)
user=user-1, score=9, window=[2019-05-26T14:30:33.276Z..2019-05-26T14:30:41.849Z)
user=user-2, score=10, window=[2019-05-26T14:30:37.357Z..2019-05-26T14:30:47.357Z)

在动态会话中,用户 #2 会获得不同的得分。第三条消息在第二条消息之后 7 秒到达,因此它被分组到另一个会话中。大型的 18 分会话被拆分为两个 9 分会话。