
Is there a way to get the offset for each message consumed in Kafka Streams?

To avoid re-reading messages that were processed but not yet committed when the Kafka Streams application is killed, I want to get each message's offset along with its key and value, so that I can store it somewhere and use it to avoid reprocessing already-processed messages.

Yes, this is possible. See the FAQ entry at http://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information.

I'll copy and paste the key information below:

Accessing record metadata such as topic, partition, and offset information?

Record metadata is accessible through the Processor API. It is also accessible indirectly through the DSL thanks to its Processor API integration.

With the Processor API, you can access record metadata through a ProcessorContext. You can store a reference to the context in an instance field of your processor during Processor#init(), and then query the processor context within Processor#process(), for example (same for Transformer). The context is updated automatically to match the record that is currently being processed, which means that methods such as ProcessorContext#partition() always return the current record’s metadata. Some caveats apply when calling the processor context within punctuate(), see the Javadocs for details.
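A minimal sketch of this pattern might look like the following. The class name, topic names, and the logging in `process()` are illustrative, not from the FAQ, and this uses the older `AbstractProcessor`/`ProcessorContext` API (newer Kafka Streams versions expose the same metadata through `api.Processor` and `Record`, with a slightly different shape):

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch: a processor that reads the current record's metadata
// from the ProcessorContext stored during init().
public class OffsetLoggingProcessor extends AbstractProcessor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // Keep a reference to the context; it is updated automatically
        // to match the record currently being processed.
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // These calls return metadata for the current record.
        String topic = context.topic();
        int partition = context.partition();
        long offset = context.offset();
        // e.g. persist (topic, partition, offset) together with key/value
        System.out.printf("topic=%s partition=%d offset=%d key=%s%n",
                topic, partition, offset, key);
    }
}
```

The same `init()`/stored-context pattern applies to a `Transformer`.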

If you use the DSL combined with a custom Transformer, for example, you could transform an input record’s value to also include partition and offset metadata, and subsequent DSL operations such as map or filter could then leverage this information.
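As a sketch of that DSL approach (class name, value format, the `alreadyProcessed` helper, and topic names are all hypothetical), a `Transformer` could embed partition and offset into the value before downstream operations see it:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch: enrich each record's value with its partition and offset so
// that downstream DSL operations (map, filter, ...) can use them.
public class EnrichWithOffset implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // Prepend "partition:offset|" to the value; the format is arbitrary.
        String enriched = context.partition() + ":" + context.offset() + "|" + value;
        return KeyValue.pair(key, enriched);
    }

    @Override
    public void close() {}
}
```

Wired into a topology it might be used like this (`"input-topic"`, `"output-topic"`, and `alreadyProcessed` are placeholders):

```java
// StreamsBuilder builder = new StreamsBuilder();
// builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
//        .transform(EnrichWithOffset::new)
//        .filter((k, v) -> !alreadyProcessed(v))  // hypothetical dedup check
//        .to("output-topic");
```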