带时间戳

为集合中的所有元素分配时间戳。

示例

在以下示例中,我们创建一个带有 PCollection 的管道,并将时间戳值附加到它的每个元素。当窗口和延迟数据在流管道中起着重要作用时,时间戳尤其有用。

示例 1:按事件时间添加时间戳

元素本身通常已经包含时间戳字段。beam.window.TimestampedValue 接受一个值和一个以秒为单位的 Unix 时间戳

要从 time.struct_time 转换为 unix_time,可以使用 time.mktime。有关时间格式选项的更多信息,请参阅 time.strftime

import time

time_tuple = time.strptime('2020-03-19 20:50:00', '%Y-%m-%d %H:%M:%S')
unix_time = time.mktime(time_tuple)

要从 datetime.datetime 转换为 unix_time,可以使用 datetime.timetuple 先将其转换为 time.struct_time

import time
import datetime

now = datetime.datetime.now()
time_tuple = now.timetuple()
unix_time = time.mktime(time_tuple)

示例 2:按逻辑时钟添加时间戳

如果每个元素都有一个时间顺序号,这些号可以用作 逻辑时钟。这些数字必须转换为“秒”等效值,这对于你的窗口和延迟数据规则尤其重要。

示例 3:按处理时间添加时间戳

如果元素没有任何时间数据可用,你也可以使用当前处理时间作为每个元素的时间戳。请注意,这会获取正在处理每个元素的工作者的本地时间。工作者可能存在时间差,因此使用这种方法不是进行精确排序的可靠方法。

通过使用处理时间,无法知道数据是否延迟到达,因为时间戳是在元素进入管道时附加的。