自定义窗口模式
此页面上的示例演示了常见的自定义窗口模式。您可以使用 WindowFn
函数 创建自定义窗口。有关更多信息,请参阅 窗口化编程指南部分。
注意:Python(使用 fnapi)不支持自定义合并窗口。
使用数据动态设置会话窗口间隔
您可以修改 assignWindows
函数以使用数据驱动间隔,然后将传入数据窗口化为会话。
通过 WindowFn.AssignContext.element()
访问 assignWindows
函数。原始的、固定持续时间的 assignWindows
函数是
public Collection<IntervalWindow> assignWindows(WindowFn.AssignContext c) {
// Assign each element into a window from its timestamp until gapDuration in the
// future. Overlapping windows (representing elements within gapDuration of
// each other) will be merged.
return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
}
创建数据驱动间隔
要创建数据驱动间隔,请将以下代码段添加到 assignWindows
函数
- 当自定义间隔不存在于数据中时,用于默认值
- 一种从主管道设置属性的方法,作为自定义窗口的方法
例如,以下函数将每个元素分配到时间戳和 gapDuration
之间的窗口
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
// Assign each element into a window from its timestamp until gapDuration in the
// future. Overlapping windows (representing elements within gapDuration of
// each other) will be merged.
Duration dataDrivenGap;
TableRow message = c.element();
try {
dataDrivenGap = Duration.standardSeconds(Long.parseLong(message.get("gap").toString()));
} catch (Exception e) {
dataDrivenGap = gapDuration;
}
return Arrays.asList(new IntervalWindow(c.timestamp(), dataDrivenGap));
}
然后,在窗口化函数中设置 gapDuration
字段
public static class DynamicSessions extends WindowFn<TableRow, IntervalWindow> {
/** Duration of the gaps between sessions. */
private final Duration gapDuration;
/** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
private DynamicSessions(Duration gapDuration) {
this.gapDuration = gapDuration;
}
将消息窗口化为会话
在创建数据驱动间隔后,您可以将传入数据窗口化为新的、自定义会话。
首先,将会话长度设置为间隔持续时间
最后,在您的管道中将数据窗口化为会话
示例数据和窗口
以下测试数据统计了两个用户的得分,使用和不使用 gap
属性
.apply("Create data", Create.timestamped(
TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"12\",\"gap\":\"5\"}", new Instant()),
TimestampedValue.of("{\"user\":\"user-2\",\"score\":\"4\"}", new Instant()),
TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"-3\",\"gap\":\"5\"}", new Instant().plus(2000)),
TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"2\",\"gap\":\"5\"}", new Instant().plus(9000)),
TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"7\",\"gap\":\"5\"}", new Instant().plus(12000)),
TimestampedValue.of("{\"user\":\"user-2\",\"score\":\"10\"}", new Instant().plus(12000)))
.withCoder(StringUtf8Coder.of()))
下图可视化了测试数据
标准会话
标准会话使用以下窗口和得分
user=user-2, score=4, window=[2019-05-26T13:28:49.122Z..2019-05-26T13:28:59.122Z)
user=user-1, score=18, window=[2019-05-26T13:28:48.582Z..2019-05-26T13:29:12.774Z)
user=user-2, score=10, window=[2019-05-26T13:29:03.367Z..2019-05-26T13:29:13.367Z)
用户 #1 看到两个事件,间隔 12 秒。在标准会话中,间隔默认为 10 秒;两个得分都在不同的会话中,因此得分不会相加。
用户 #2 看到四个事件,分别间隔 2 秒、7 秒和 3 秒。由于所有间隔都不大于默认值,因此这四个事件在同一个标准会话中,并加在一起(18 分)。
动态会话
动态会话指定了 5 秒的间隔,因此它们使用以下窗口和得分
user=user-2, score=4, window=[2019-05-26T14:30:22.969Z..2019-05-26T14:30:32.969Z)
user=user-1, score=9, window=[2019-05-26T14:30:22.429Z..2019-05-26T14:30:30.553Z)
user=user-1, score=9, window=[2019-05-26T14:30:33.276Z..2019-05-26T14:30:41.849Z)
user=user-2, score=10, window=[2019-05-26T14:30:37.357Z..2019-05-26T14:30:47.357Z)
在动态会话中,用户 #2 会获得不同的得分。第三条消息在第二条消息之后 7 秒到达,因此它被分组到另一个会话中。大型的 18 分会话被拆分为两个 9 分会话。
最后更新于 2024/10/31
您找到您要找的所有内容了吗?
所有内容都很有用且清晰吗?您想更改任何内容吗?请告诉我们!