使用 take 方法访问原始 faust/kafka 消息

Accessing raw faust/kafka message while using take method

我需要对 Faust 流中的事件进行批处理,所以我使用了 take() 方法。但是,我还想访问 header 消息,尤其是时间戳。

通常您会使用以下方式访问 header:

async for event in stream.events()

然后使用 event.header 调用 header,但是由于我们使用的是 take 方法:

async for event in stream.take(500, 1)

我们似乎无法访问原始消息。关于如何获得这个的任何想法?我们只是试图通过将每个管道的时间戳监视为 header,而不是将其添加为已发送消息的 value 部分的一部分,来突出显示管道的缓慢部分。

是否还有另一个 'hidden' 但我错过了的原始时间戳?

编辑

使用 faust-streaming==0.8.4 所以绝对是最新的

Event and Message object 具有 headers 作为可以在流中访问的属性。 eventstake 都使用 EventT object,因此您应该能够以相同的方式访问它们。唯一的区别是 take 及其派生词将 EventT object 解包到缓冲区列表中的字典中,而 events 一次产生一个单独的 EventT。如果将 take 缓冲区大小设置为 1,则可以单独访问 EventT objects。

faust-streaming==0.7.7 中引入了一个名为 stream.take_with_timestamp 的函数,它与 stream.take 几乎相同,可以通过以下方式使用:

async for event in stream.take_with_timestamp(1, 1, 'timestamp'):
  print(stream.current_event)
  if stream.current_event is not None:
    print(stream.current_event.headers)
    print(stream.current_event.message.headers)

这将显示每个 event 的时间戳。这里需要注意的是,如果您将缓冲区设置为 >1 并且您的流超时,您的 stream.current_event object 将是 None.

或者您可以模仿 take_with_timestamp 中的赋值并在 stream.events() 中访问 event.message.timestamp:

async for event in stream.events():
  print(event.message.timestamp)
  print(event.headers)
  print(event.message.headers)