ProcessorContext#header() 为空

ProcessorContext#header() is Empty

我们有kafka流应用程序。生产者在将其发送到 Kafka Streaming 应用程序之前在 kafka 消息中添加 header。

在 Kafka 流应用程序中,我们使用 AbstractProcessorcontext.forward(null, Optional.of(event)); 将消息转发到另一个主题。

但是 header 正在迷路。我希望 header 从输入消息到输出主题。

ProcessorContext 接口。 headers() 方法说 Returns 当前输入记录的 headers 但在我的情况下它是空的,尽管我正在发送带有 header 的消息。

     * Returns the headers of the current input record; could be null if it is not available
     * @return the headers
     */
    Headers headers();

卡夫卡流API版本:2.3.1

如果使用处理器,

context.headers() 应在 process() 中调用,如果使用转换器,则应在 transform() 中调用。