如何更改 apache beam 中的事件时间?

How to change the event time in apache beam?

我正在通过 Pub/sub 摄取数据(json 文件) 所以我的事件时间默认是在主题中发布的时间。 我想强制事件时间并更改它。 我在我的数据中添加了一个日期时间字段。
我想根据我的 json 文件的新时间戳字段进行聚合和组合。

Ps:字段名为"timestamp",是一个字符串。这就是为什么我将其转换为日期时间,然后是数据流中的时间戳

def get_timestamp(data):
    my_date = (data['timestamp']) # date : 2010-09-18......string
    times = datetime.fromisoformat(my_date) #type: datetime.datetime
    return beam.window.TimestampedValue(data, datetime.timestamp(times))

稍后我将在我的窗口化之前调用我的管道中的函数

我从 pubsub 收到我的数据:

lines = p | 'receive_data' >> beam.io.ReadFromPubSub(
        subscription=known_args.in_topic).with_input_types(str) 
        | 'decode' >> beam.Map(lambda x: x.decode('utf-8')) 
        | 'jsonload' >> beam.Map(lambda x: json.loads(x))

然后做我的处理:

 (lines |'timestamp' >> beam.Map(get_timestamp)
           | 'print timestamp' >> beam.ParDo(PrintFn2())
           | 'window' >> beam.WindowInto(
            window.FixedWindows(10),
            trigger=trigger.AfterWatermark(),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
        )
        | 'CountGlobally' >> beam.CombineGlobally(
                beam.combiners.CountCombineFn()
            ).without_defaults() 
    )

从 PubSub 读取时,为元素设置 EvenTime 的最佳方法是使用

Java withTimestampAttribute

Python timestamp_attribute

这将设置元素时间戳并确保 watermark signals 具有良好的数据。

如果这不是一个选项,您可以按照 Adding Timestamps to a PCollection 更改 DoFn 中元素的时间戳。但是,此方法不允许设置时间戳小于当前水印。这就是为什么 withTimestampAttribute 方法是解决此模式的最佳方法。