生产事件B后手动确认Kafka事件A消费
Manually acknowledge Kafka Event A consuming after producing event B
我有一个情况,我必须使用事件 A 并进行一些处理,然后生成事件 B。所以我的问题是处理崩溃并且应用程序在已经使用时无法生成 B A. 我的方法是在成功发布后确认 B,我是否正确或应该针对这种情况实施其他解决方案?
@KafkaListener(
id = TOPIC_ID,
topics = TOPIC_ID,
groupId = GROUP_ID,
containerFactory = LISTENER_CONTAINER_FACTORY
)
public void listen(List<Message<A>> messages, Acknowledgment acknowledgment) {
try {
final AEvent aEvent = messages.stream()
.filter(message -> null != message.getPayload())
.map(Message::getPayload)
.findFirst()
.get();
processDao.doSomeProcessing() // returns a Mono<Example> by calling an externe API
.subscribe(
response -> {
ProducerRecord<String, BEvent> BEventRecord = new ProducerRecord<>(TOPIC_ID, null, BEvent);
ListenableFuture<SendResult<String, BEvent>> future = kafkaProducerTemplate.send(buildBEvent());
future.addCallback(new ListenableFutureCallback<SendResult<String, BEvent>>() {
@Override
public void onSuccess(SendResult<String, BEvent> BEventSendResult) {
//TODO: do when event published successfully
}
@Override
public void onFailure(Throwable exception) {
exception.printStackTrace();
throw new ExampleException();
}
});
},
error -> {
error.printStackTrace();
throw new ExampleException();
}
);
acknowledgment.acknowledge(); // ??
} catch (ExampleException) {
exception.printStackTrace();
}
}
使用reactor等异步代码时无法管理kafka"acknowledgments"
Kafka 不管理每个 topic/partition 的离散确认,只是管理分区的最后提交的偏移量。
如果您异步处理两条记录,您将竞争先提交哪个偏移量。
您需要在侦听器容器线程上执行发送以保持正确的顺序。
我有一个情况,我必须使用事件 A 并进行一些处理,然后生成事件 B。所以我的问题是处理崩溃并且应用程序在已经使用时无法生成 B A. 我的方法是在成功发布后确认 B,我是否正确或应该针对这种情况实施其他解决方案?
@KafkaListener(
id = TOPIC_ID,
topics = TOPIC_ID,
groupId = GROUP_ID,
containerFactory = LISTENER_CONTAINER_FACTORY
)
public void listen(List<Message<A>> messages, Acknowledgment acknowledgment) {
try {
final AEvent aEvent = messages.stream()
.filter(message -> null != message.getPayload())
.map(Message::getPayload)
.findFirst()
.get();
processDao.doSomeProcessing() // returns a Mono<Example> by calling an externe API
.subscribe(
response -> {
ProducerRecord<String, BEvent> BEventRecord = new ProducerRecord<>(TOPIC_ID, null, BEvent);
ListenableFuture<SendResult<String, BEvent>> future = kafkaProducerTemplate.send(buildBEvent());
future.addCallback(new ListenableFutureCallback<SendResult<String, BEvent>>() {
@Override
public void onSuccess(SendResult<String, BEvent> BEventSendResult) {
//TODO: do when event published successfully
}
@Override
public void onFailure(Throwable exception) {
exception.printStackTrace();
throw new ExampleException();
}
});
},
error -> {
error.printStackTrace();
throw new ExampleException();
}
);
acknowledgment.acknowledge(); // ??
} catch (ExampleException) {
exception.printStackTrace();
}
}
使用reactor等异步代码时无法管理kafka"acknowledgments"
Kafka 不管理每个 topic/partition 的离散确认,只是管理分区的最后提交的偏移量。
如果您异步处理两条记录,您将竞争先提交哪个偏移量。
您需要在侦听器容器线程上执行发送以保持正确的顺序。