获取正在使用 StreamBridge 处理 kafka 消息的分区和偏移量编号
Get Partition and Offset number in which kafka message is being processed using StreamBridge
我需要 print/log/store 正在处理我的消息的 kafka 分区和偏移量。
我怎样才能做到这一点?
我正在使用 StreamBridge 从生产者发送消息,还使用功能性 spring kafka 流方法
Public delegateToSupplier(String id, Abc obj) {
Message<Abc> message = MessageBuilder.withPayload(obj).seHeaders(KafkaHeaders.MESSAGE_KEY, id.getBytes()).build();
streamBridge.send("out-topic", message);
}
记录元数据可通过元数据通道(异步)获得:
@SpringBootApplication
public class So66436499Application {
public static void main(String[] args) {
SpringApplication.run(So66436499Application.class, args);
}
@Autowired
StreamBridge bridge;
@Bean
public ApplicationRunner runner() {
return args -> {
this.bridge.send("myBinding", "test");
Thread.sleep(5000);
};
}
@ServiceActivator(inputChannel = "meta")
void meta(Message<?> sent) {
System.out.println("Sent: " + sent.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class));
}
}
spring.cloud.stream.bindings.myBinding.destination=foo
spring.cloud.stream.kafka.bindings.myBinding.producer.record-metadata-channel=meta
Sent: foo-0@5
我需要 print/log/store 正在处理我的消息的 kafka 分区和偏移量。 我怎样才能做到这一点? 我正在使用 StreamBridge 从生产者发送消息,还使用功能性 spring kafka 流方法
Public delegateToSupplier(String id, Abc obj) {
Message<Abc> message = MessageBuilder.withPayload(obj).seHeaders(KafkaHeaders.MESSAGE_KEY, id.getBytes()).build();
streamBridge.send("out-topic", message);
}
记录元数据可通过元数据通道(异步)获得:
@SpringBootApplication
public class So66436499Application {
public static void main(String[] args) {
SpringApplication.run(So66436499Application.class, args);
}
@Autowired
StreamBridge bridge;
@Bean
public ApplicationRunner runner() {
return args -> {
this.bridge.send("myBinding", "test");
Thread.sleep(5000);
};
}
@ServiceActivator(inputChannel = "meta")
void meta(Message<?> sent) {
System.out.println("Sent: " + sent.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class));
}
}
spring.cloud.stream.bindings.myBinding.destination=foo
spring.cloud.stream.kafka.bindings.myBinding.producer.record-metadata-channel=meta
Sent: foo-0@5