kafka 流中的访问记录分区数
Access record's partition numebr in kafka streams
我正在使用 Kafka 2.6 和 spring 云流 kafka-streams 活页夹。我想在我的 Kafka 流应用程序中访问记录头、分区号等。我阅读了有关使用 Processor API、使用 ProcessorContext 等的信息。但是每次 ProcessorContext 对象都为空。
下面是代码
@StreamListener(Bindings.input)
@SendTo(Bindings.output)
public KStream<String, String> process(KStream<String, String> input)
{
return input.transform(new TransformerSupplier<String, String, KeyValue<String, String>>()
{
public Transformer<String, String, KeyValue<String, String>> get()
{
return new Transformer<String, String, KeyValue<String, String>>()
{
private int total = 0;
ProcessorContext context;
@Override
public void close() {
}
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
{
this.context = context;
}
@Override
public KeyValue<String, String> transform(String k, String v)
{
System.out.println("ProcessorContext: "+this.context);
System.out.println("value: "+v);
return new KeyValue<>(k, v);
}
};
}
});
}
在此代码中,ProcessorContext 始终打印为 null。我还尝试将 ListenerContainerCustomizer 用于 spring-boot。但这也行不通
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer()
{
return (container, dest, group) ->
{
container.setRecordInterceptor(record ->
{
System.out.println(">>>> Received record, checking headers");
Headers headers = record.headers();
System.out.println(">>>> Header length: " + headers.toArray().length);
for (Header header : headers)
{
if (header.key().equalsIgnoreCase("eventtype"))
{
String value = String.valueOf(header.value());
if (!value.equalsIgnoreCase("PUBLISHED"))
{
System.out.println("Event type from header not PUBLISHED, skipping record");
return null;
}
}
}
System.out.println("Processing record");
return record;
});
};
}
我打印了用 bean 注册的 bean 列表,我可以在上面看到。但它永远行不通。
无论如何,我需要第一种方法来工作,因为我喜欢 运行 一些带有分区号的业务逻辑。
卡了好多天了,请大家帮忙
请更改
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
{
this.context = context;
}
至
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
{
this.context = pc;
}
this.context = context // 两者相同,看起来像打字错误。
我正在使用 Kafka 2.6 和 spring 云流 kafka-streams 活页夹。我想在我的 Kafka 流应用程序中访问记录头、分区号等。我阅读了有关使用 Processor API、使用 ProcessorContext 等的信息。但是每次 ProcessorContext 对象都为空。
下面是代码
@StreamListener(Bindings.input)
@SendTo(Bindings.output)
public KStream<String, String> process(KStream<String, String> input)
{
return input.transform(new TransformerSupplier<String, String, KeyValue<String, String>>()
{
public Transformer<String, String, KeyValue<String, String>> get()
{
return new Transformer<String, String, KeyValue<String, String>>()
{
private int total = 0;
ProcessorContext context;
@Override
public void close() {
}
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
{
this.context = context;
}
@Override
public KeyValue<String, String> transform(String k, String v)
{
System.out.println("ProcessorContext: "+this.context);
System.out.println("value: "+v);
return new KeyValue<>(k, v);
}
};
}
});
}
在此代码中,ProcessorContext 始终打印为 null。我还尝试将 ListenerContainerCustomizer 用于 spring-boot。但这也行不通
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer()
{
return (container, dest, group) ->
{
container.setRecordInterceptor(record ->
{
System.out.println(">>>> Received record, checking headers");
Headers headers = record.headers();
System.out.println(">>>> Header length: " + headers.toArray().length);
for (Header header : headers)
{
if (header.key().equalsIgnoreCase("eventtype"))
{
String value = String.valueOf(header.value());
if (!value.equalsIgnoreCase("PUBLISHED"))
{
System.out.println("Event type from header not PUBLISHED, skipping record");
return null;
}
}
}
System.out.println("Processing record");
return record;
});
};
}
我打印了用 bean 注册的 bean 列表,我可以在上面看到。但它永远行不通。 无论如何,我需要第一种方法来工作,因为我喜欢 运行 一些带有分区号的业务逻辑。
卡了好多天了,请大家帮忙
请更改
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
{
this.context = context;
}
至
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
{
this.context = pc;
}
this.context = context // 两者相同,看起来像打字错误。