Is there a way to get the offset for each message consumed in Kafka Streams?
To avoid re-reading messages that were processed but not committed when Kafka Streams is killed, I want to get the offset of each message along with its key and value, so that I can store it somewhere and use it to avoid reprocessing already-processed messages.
Yes, this is possible. See the FAQ entry at http://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information.
I'll copy-paste the key information below:
Accessing record metadata such as topic, partition, and offset information?

Record metadata is accessible through the Processor API. It is also accessible indirectly through the DSL thanks to its Processor API integration.

With the Processor API, you can access record metadata through a ProcessorContext. You can store a reference to the context in an instance field of your processor during Processor#init(), and then query the processor context within Processor#process(), for example (same for Transformer). The context is updated automatically to match the record that is currently being processed, which means that methods such as ProcessorContext#partition() always return the current record's metadata. Some caveats apply when calling the processor context within punctuate(); see the Javadocs for details.
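A minimal sketch of that pattern, assuming a Kafka Streams version where Processor declares init/process/close (the class name and forwarding behavior are illustrative, not from the FAQ):

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch: store the ProcessorContext during init(), then read the current
// record's topic/partition/offset inside process().
public class OffsetAwareProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // The context is updated automatically for every record
        // that flows through process().
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // Metadata of the record currently being processed.
        String topic = context.topic();
        int partition = context.partition();
        long offset = context.offset();

        System.out.printf("topic=%s partition=%d offset=%d key=%s%n",
                topic, partition, offset, key);

        // Pass the record downstream unchanged.
        context.forward(key, value);
    }

    @Override
    public void close() {}
}
```

You could persist the (topic, partition, offset) triple alongside the key and value here, and consult that store on restart to skip records you have already handled.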
If you use the DSL combined with a custom Transformer, for example, you could transform an input record's value to also include partition and offset metadata, and subsequent DSL operations such as map or filter could then leverage this information.
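A sketch of the DSL variant, assuming the Transformer interface with init/transform/close; the enriched-value format and topic names are illustrative:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Transformer that embeds the record's partition and offset into its value,
// so downstream DSL operations (map, filter, ...) can use that information.
public class MetadataEnricher
        implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // "partition:offset:value" is just one possible encoding.
        String enriched = context.partition() + ":" + context.offset() + ":" + value;
        return KeyValue.pair(key, enriched);
    }

    @Override
    public void close() {}
}

// Usage in a topology (topic names are illustrative):
// StreamsBuilder builder = new StreamsBuilder();
// builder.stream("input-topic")
//        .transform(MetadataEnricher::new)
//        .to("enriched-topic");
```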