使用 reactor Kafka 从主题读取消息并将消息批量写入 REST 端点
Read from topic and write the messages in batches to a REST endpoint using reactor Kafka
我正在开发一个使用反应式 Kafka 的项目,该项目使用来自 Kafka 主题的消息并将消息批量发布到 REST 端点。
我被困在批处理部分并将该批处理发送到端点。我需要从主题中读取 N 条消息(这里的 N 条消息是可配置的),然后将这 N 条消息发送到 REST 端点。如何使用反应器 Kafka 读取 N 条消息?我查看了 https://projectreactor.io/docs/kafka/release/reference/#_overview 中的示例,但找不到与我的问题类似的示例。解决此问题的任何指示都将非常有帮助。
这是我到目前为止用来读取来自主题的消费消息的代码
@Slf4j
@Service
public class Service implements CommandLineRunner {
@Autowired
@Qualifier("KafkaConsumerTemplate")
public ReactiveKafkaConsumerTemplate<String, String> KafkaConsumerTemplate;
public Flux<String> consume() {
return KafkaConsumerTemplate.receiveAutoAck()
.doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
)
.map(ConsumerRecord::value)
.doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), Metric))
.doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}
@Override
public void run(String... args) throws Exception {
consume().subscribe();
}
}
因此您可以使用 #buffer(int) 操作来实现此类目的。
针对您的具体情况:
int bufferSize = 10;
return KafkaConsumerTemplate.receiveAutoAck()
.doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
).map(ConsumerRecord::value)
.doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), Metric))
.buffer(bufferSize)
.doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
@Override
public void run(String... args) throws Exception {
consume().subscribe(it -> {
//it is a List of batched entities, here you can do whatever you want with your data.
});
}
我正在开发一个使用反应式 Kafka 的项目,该项目使用来自 Kafka 主题的消息并将消息批量发布到 REST 端点。 我被困在批处理部分并将该批处理发送到端点。我需要从主题中读取 N 条消息(这里的 N 条消息是可配置的),然后将这 N 条消息发送到 REST 端点。如何使用反应器 Kafka 读取 N 条消息?我查看了 https://projectreactor.io/docs/kafka/release/reference/#_overview 中的示例,但找不到与我的问题类似的示例。解决此问题的任何指示都将非常有帮助。
这是我到目前为止用来读取来自主题的消费消息的代码
@Slf4j
@Service
public class Service implements CommandLineRunner {
@Autowired
@Qualifier("KafkaConsumerTemplate")
public ReactiveKafkaConsumerTemplate<String, String> KafkaConsumerTemplate;
public Flux<String> consume() {
return KafkaConsumerTemplate.receiveAutoAck()
.doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
)
.map(ConsumerRecord::value)
.doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), Metric))
.doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}
@Override
public void run(String... args) throws Exception {
consume().subscribe();
}
}
因此您可以使用 #buffer(int) 操作来实现此类目的。
针对您的具体情况:
int bufferSize = 10;
return KafkaConsumerTemplate.receiveAutoAck()
.doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
).map(ConsumerRecord::value)
.doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), Metric))
.buffer(bufferSize)
.doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
@Override
public void run(String... args) throws Exception {
consume().subscribe(it -> {
//it is a List of batched entities, here you can do whatever you want with your data.
});
}