使用 take 方法访问原始 faust/kafka 消息
Accessing raw faust/kafka message while using take method
我需要对 Faust 流中的事件进行批处理,所以我使用了 take()
方法。但是,我还想访问 header 消息,尤其是时间戳。
通常您会使用以下方式访问 header:
async for event in stream.events()
然后使用 event.header
调用 header,但是由于我们使用的是 take 方法:
async for event in stream.take(500, 1)
我们似乎无法访问原始消息。关于如何获得这个的任何想法?我们只是试图通过将每个管道的时间戳监视为 header,而不是将其添加为已发送消息的 value
部分的一部分,来突出显示管道的缓慢部分。
是否还有另一个 'hidden' 但我错过了的原始时间戳?
编辑
使用 faust-streaming==0.8.4 所以绝对是最新的
Event and Message object 具有 headers 作为可以在流中访问的属性。 events
和 take
都使用 EventT
object,因此您应该能够以相同的方式访问它们。唯一的区别是 take
及其派生词将 EventT
object 解包到缓冲区列表中的字典中,而 events
一次产生一个单独的 EventT
。如果将 take
缓冲区大小设置为 1,则可以单独访问 EventT
objects。
faust-streaming==0.7.7
中引入了一个名为 stream.take_with_timestamp
的函数,它与 stream.take
几乎相同,可以通过以下方式使用:
async for event in stream.take_with_timestamp(1, 1, 'timestamp'):
print(stream.current_event)
if stream.current_event is not None:
print(stream.current_event.headers)
print(stream.current_event.message.headers)
这将显示每个 event
的时间戳。这里需要注意的是,如果您将缓冲区设置为 >1 并且您的流超时,您的 stream.current_event
object 将是 None
.
或者您可以模仿 take_with_timestamp
中的赋值并在 stream.events()
中访问 event.message.timestamp
:
async for event in stream.events():
print(event.message.timestamp)
print(event.headers)
print(event.message.headers)
我需要对 Faust 流中的事件进行批处理,所以我使用了 take()
方法。但是,我还想访问 header 消息,尤其是时间戳。
通常您会使用以下方式访问 header:
async for event in stream.events()
然后使用 event.header
调用 header,但是由于我们使用的是 take 方法:
async for event in stream.take(500, 1)
我们似乎无法访问原始消息。关于如何获得这个的任何想法?我们只是试图通过将每个管道的时间戳监视为 header,而不是将其添加为已发送消息的 value
部分的一部分,来突出显示管道的缓慢部分。
是否还有另一个 'hidden' 但我错过了的原始时间戳?
编辑
使用 faust-streaming==0.8.4 所以绝对是最新的
Event and Message object 具有 headers 作为可以在流中访问的属性。 events
和 take
都使用 EventT
object,因此您应该能够以相同的方式访问它们。唯一的区别是 take
及其派生词将 EventT
object 解包到缓冲区列表中的字典中,而 events
一次产生一个单独的 EventT
。如果将 take
缓冲区大小设置为 1,则可以单独访问 EventT
objects。
faust-streaming==0.7.7
中引入了一个名为 stream.take_with_timestamp
的函数,它与 stream.take
几乎相同,可以通过以下方式使用:
async for event in stream.take_with_timestamp(1, 1, 'timestamp'):
print(stream.current_event)
if stream.current_event is not None:
print(stream.current_event.headers)
print(stream.current_event.message.headers)
这将显示每个 event
的时间戳。这里需要注意的是,如果您将缓冲区设置为 >1 并且您的流超时,您的 stream.current_event
object 将是 None
.
或者您可以模仿 take_with_timestamp
中的赋值并在 stream.events()
中访问 event.message.timestamp
:
async for event in stream.events():
print(event.message.timestamp)
print(event.headers)
print(event.message.headers)