如何更改 apache beam 中的事件时间?
How to change the event time in apache beam?
我正在通过 Pub/sub 摄取数据(json 文件)
所以我的事件时间默认是在主题中发布的时间。
我想强制事件时间并更改它。
我在我的数据中添加了一个日期时间字段。
我想根据我的 json 文件的新时间戳字段进行聚合和组合。
Ps:字段名为"timestamp",是一个字符串。这就是为什么我将其转换为日期时间,然后是数据流中的时间戳
def get_timestamp(data):
my_date = (data['timestamp']) # date : 2010-09-18......string
times = datetime.fromisoformat(my_date) #type: datetime.datetime
return beam.window.TimestampedValue(data, datetime.timestamp(times))
稍后我将在我的窗口化之前调用我的管道中的函数
我从 pubsub 收到我的数据:
lines = p | 'receive_data' >> beam.io.ReadFromPubSub(
subscription=known_args.in_topic).with_input_types(str)
| 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'jsonload' >> beam.Map(lambda x: json.loads(x))
然后做我的处理:
(lines |'timestamp' >> beam.Map(get_timestamp)
| 'print timestamp' >> beam.ParDo(PrintFn2())
| 'window' >> beam.WindowInto(
window.FixedWindows(10),
trigger=trigger.AfterWatermark(),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING
)
| 'CountGlobally' >> beam.CombineGlobally(
beam.combiners.CountCombineFn()
).without_defaults()
)
从 PubSub 读取时,为元素设置 EvenTime 的最佳方法是使用
Python timestamp_attribute
这将设置元素时间戳并确保 watermark signals 具有良好的数据。
如果这不是一个选项,您可以按照 Adding Timestamps to a PCollection 更改 DoFn 中元素的时间戳。但是,此方法不允许设置时间戳小于当前水印。这就是为什么 withTimestampAttribute 方法是解决此模式的最佳方法。
我正在通过 Pub/sub 摄取数据(json 文件)
所以我的事件时间默认是在主题中发布的时间。
我想强制事件时间并更改它。
我在我的数据中添加了一个日期时间字段。
我想根据我的 json 文件的新时间戳字段进行聚合和组合。
Ps:字段名为"timestamp",是一个字符串。这就是为什么我将其转换为日期时间,然后是数据流中的时间戳
def get_timestamp(data):
my_date = (data['timestamp']) # date : 2010-09-18......string
times = datetime.fromisoformat(my_date) #type: datetime.datetime
return beam.window.TimestampedValue(data, datetime.timestamp(times))
稍后我将在我的窗口化之前调用我的管道中的函数
我从 pubsub 收到我的数据:
lines = p | 'receive_data' >> beam.io.ReadFromPubSub(
subscription=known_args.in_topic).with_input_types(str)
| 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'jsonload' >> beam.Map(lambda x: json.loads(x))
然后做我的处理:
(lines |'timestamp' >> beam.Map(get_timestamp)
| 'print timestamp' >> beam.ParDo(PrintFn2())
| 'window' >> beam.WindowInto(
window.FixedWindows(10),
trigger=trigger.AfterWatermark(),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING
)
| 'CountGlobally' >> beam.CombineGlobally(
beam.combiners.CountCombineFn()
).without_defaults()
)
从 PubSub 读取时,为元素设置 EvenTime 的最佳方法是使用
Python timestamp_attribute
这将设置元素时间戳并确保 watermark signals 具有良好的数据。
如果这不是一个选项,您可以按照 Adding Timestamps to a PCollection 更改 DoFn 中元素的时间戳。但是,此方法不允许设置时间戳小于当前水印。这就是为什么 withTimestampAttribute 方法是解决此模式的最佳方法。