ProcessorContext#header() 为空
ProcessorContext#header() is Empty
我们有kafka流应用程序。生产者在将其发送到 Kafka Streaming 应用程序之前在 kafka 消息中添加 header。
在 Kafka 流应用程序中,我们使用 AbstractProcessor
和 context.forward(null, Optional.of(event));
将消息转发到另一个主题。
但是 header 正在迷路。我希望 header 从输入消息到输出主题。
ProcessorContext
接口。 headers()
方法说 Returns 当前输入记录的 headers 但在我的情况下它是空的,尽管我正在发送带有 header 的消息。
* Returns the headers of the current input record; could be null if it is not available
* @return the headers
*/
Headers headers();
卡夫卡流API版本:2.3.1
如果使用处理器,context.headers()
应在 process()
中调用,如果使用转换器,则应在 transform()
中调用。
我们有kafka流应用程序。生产者在将其发送到 Kafka Streaming 应用程序之前在 kafka 消息中添加 header。
在 Kafka 流应用程序中,我们使用 AbstractProcessor
和 context.forward(null, Optional.of(event));
将消息转发到另一个主题。
但是 header 正在迷路。我希望 header 从输入消息到输出主题。
ProcessorContext
接口。 headers()
方法说 Returns 当前输入记录的 headers 但在我的情况下它是空的,尽管我正在发送带有 header 的消息。
* Returns the headers of the current input record; could be null if it is not available
* @return the headers
*/
Headers headers();
卡夫卡流API版本:2.3.1
context.headers()
应在 process()
中调用,如果使用转换器,则应在 transform()
中调用。