kafka 流中的访问记录分区数

Access record's partition numebr in kafka streams

我正在使用 Kafka 2.6 和 spring 云流 kafka-streams 活页夹。我想在我的 Kafka 流应用程序中访问记录头、分区号等。我阅读了有关使用 Processor API、使用 ProcessorContext 等的信息。但是每次 ProcessorContext 对象都为空。

下面是代码

@StreamListener(Bindings.input)
@SendTo(Bindings.output)
public KStream<String, String> process(KStream<String, String> input)
{
    return input.transform(new TransformerSupplier<String, String, KeyValue<String, String>>()
    {
        public Transformer<String, String, KeyValue<String, String>> get() 
        {
            return new Transformer<String, String, KeyValue<String, String>>() 
            {

                private int total = 0;
                ProcessorContext context;

                @Override
                public void close() {
                }

                @Override
                public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
                {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String k, String v)
                {
                    System.out.println("ProcessorContext: "+this.context);
                    System.out.println("value: "+v);
                    return new KeyValue<>(k, v);
                }
            };
        }
    });
}

在此代码中,ProcessorContext 始终打印为 null。我还尝试将 ListenerContainerCustomizer 用于 spring-boot。但这也行不通

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer()
{
    return (container, dest, group) ->
    {
        container.setRecordInterceptor(record ->
        {
            System.out.println(">>>> Received record, checking headers");
            Headers headers = record.headers();
            System.out.println(">>>> Header length: " + headers.toArray().length);
            for (Header header : headers)
            {
                if (header.key().equalsIgnoreCase("eventtype"))
                {
                    String value = String.valueOf(header.value());
                    if (!value.equalsIgnoreCase("PUBLISHED"))
                    {
                        System.out.println("Event type from header not PUBLISHED, skipping record");
                        return null;
                    }
                }
            }
            System.out.println("Processing record");
            return record;
        });
    };
}

我打印了用 bean 注册的 bean 列表,我可以在上面看到。但它永远行不通。 无论如何,我需要第一种方法来工作,因为我喜欢 运行 一些带有分区号的业务逻辑。

卡了好多天了,请大家帮忙

请更改

@Override
 public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
 {
                    this.context = context;
}

@Override
 public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
 {
                    this.context = pc;
}

this.context = context // 两者相同,看起来像打字错误。