是否可以使用 Kafka Streams 访问消息 headers?
Is it possible to access message headers with Kafka Streams?
在 Kafka 0.11 中添加 Headers to the records (ProducerRecord & ConsumerRecord) 后,在使用 Kafka Streams 处理主题时是否可以获取这些 headers?当在 KStream
上调用 map
之类的方法时,它提供了记录的 key
和 value
的参数,但我看不到访问 headers
.如果我们可以 map
超过 ConsumerRecord
就好了。
例如
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
像这样的东西会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
记录 headers 自版本 2.0.0 起可访问(参见 KIP-244 了解详情)。
您可以通过处理器 API(即通过 transform()
、transformValues()
或 process()
)通过给定的“上下文”object(参见https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context)。
更新
从 2.7.0 版本开始,处理器 API 得到了改进(参见 KIP-478),添加了新的 type-safe api.Processor
class process(Record)
而不是 process(K, V)
方法。对于这种情况,headers(和记录元数据)可通过 Record
class).
访问
此新功能在 DSL 的“PAPI 方法中尚不可用(例如,KStream#process()
、KStream#transform()
和兄弟姐妹)。
+++++
在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳---但不公开 headers 实际上在旧版本中被 Streams 丢弃的内容。
元数据在 DSL 级别不可用。然而,还有一些工作正在进行,通过 KIP-159.
扩展 DSL
在 Kafka 0.11 中添加 Headers to the records (ProducerRecord & ConsumerRecord) 后,在使用 Kafka Streams 处理主题时是否可以获取这些 headers?当在 KStream
上调用 map
之类的方法时,它提供了记录的 key
和 value
的参数,但我看不到访问 headers
.如果我们可以 map
超过 ConsumerRecord
就好了。
例如
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
像这样的东西会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
记录 headers 自版本 2.0.0 起可访问(参见 KIP-244 了解详情)。
您可以通过处理器 API(即通过 transform()
、transformValues()
或 process()
)通过给定的“上下文”object(参见https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context)。
更新
从 2.7.0 版本开始,处理器 API 得到了改进(参见 KIP-478),添加了新的 type-safe api.Processor
class process(Record)
而不是 process(K, V)
方法。对于这种情况,headers(和记录元数据)可通过 Record
class).
此新功能在 DSL 的“PAPI 方法中尚不可用(例如,KStream#process()
、KStream#transform()
和兄弟姐妹)。
+++++
在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳---但不公开 headers 实际上在旧版本中被 Streams 丢弃的内容。
元数据在 DSL 级别不可用。然而,还有一些工作正在进行,通过 KIP-159.
扩展 DSL