卡夫卡消费者中死信队列的好选择是什么
What is the good choice for Dead letter queue in kafka consumer
我正在写一个kafka cosumer。消费者的工作主要是创建多个数据库实体并在处理有效负载后保存它们。我正在尝试编写代码来处理在使用数据时可能发生的错误。
为此,我可以想到 2 个选项(在 Spring 生态系统中)
- 将失败的消息发送到 dead-letter-kafka-topic
- 将失败的消息发送到新数据库table(错误-table)
失败的消息需要重新处理。
案例 1:
我又得写另一个@KafkaListner,它监听死信主题并处理消息。这里的问题是我无法更好地控制如何启动重新处理流程。 (像一个调度器)因为KafkaListener会在死信topic中一发布数据就开始处理数据
在案例 2 中:
我可以更好地控制重新处理流程,因为我可以编写一个 REST 端点或调度程序来尝试重新处理失败的消息。
(在这里我对使用哪个数据库感到两难。关系或一些键值存储)
我基本上陷入了设计困境,无法确定在 Spring 生态系统中哪种方法更好。
感谢回复。
我认为使用 Kafka 是最好的解决方案。
Because KafkaListener will start processing the data as soon as the data is published in the dead letter topic.
您可以通过在该侦听器上将 autoStartup
设置为 false 来控制行为,然后 start/stop 侦听器根据需要使用 KafkaListenerEndpointRegistry
:
registry.getListenerContainer(myListenerId).start();
或者您可以使用自己的 KafkaConsumer
(由消费者工厂创建)并轮询任意数量的记录,并在完成后关闭消费者。
我同意 Gary Russell 的回答,您可以创建 KafkaConsumer
实例并控制其生命周期。 class 来自 org.apache.kafka:kafka-clients
库。
在您的特定情况下,您可以添加 Thread.sleep(schedulerDelay)
来实现调度。这是简化的示例:
@Component
class Scheduler() {
public void init() {
// create kafka consumer connected to your DLQ topic
}
public void run() {
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
processRecordLogicGoesHere(record);
Thread.sleep(schedulerDelay);
}
} finally {
consumer.close();
}
}
}
schedulerDelay
应该仔细挑选以跟上传入的消息,不要让它们被 Kafka 的日志清理策略丢失。
关于如何使用 Kafka 的官方 API 有很多教程,这里是其中之一:Introducing the Kafka Consumer
此外,您可能会在这里找到一些想法:Retrying consumer architecture in the Apache Kafka
我正在写一个kafka cosumer。消费者的工作主要是创建多个数据库实体并在处理有效负载后保存它们。我正在尝试编写代码来处理在使用数据时可能发生的错误。 为此,我可以想到 2 个选项(在 Spring 生态系统中)
- 将失败的消息发送到 dead-letter-kafka-topic
- 将失败的消息发送到新数据库table(错误-table)
失败的消息需要重新处理。
案例 1: 我又得写另一个@KafkaListner,它监听死信主题并处理消息。这里的问题是我无法更好地控制如何启动重新处理流程。 (像一个调度器)因为KafkaListener会在死信topic中一发布数据就开始处理数据
在案例 2 中: 我可以更好地控制重新处理流程,因为我可以编写一个 REST 端点或调度程序来尝试重新处理失败的消息。 (在这里我对使用哪个数据库感到两难。关系或一些键值存储)
我基本上陷入了设计困境,无法确定在 Spring 生态系统中哪种方法更好。
感谢回复。
我认为使用 Kafka 是最好的解决方案。
Because KafkaListener will start processing the data as soon as the data is published in the dead letter topic.
您可以通过在该侦听器上将 autoStartup
设置为 false 来控制行为,然后 start/stop 侦听器根据需要使用 KafkaListenerEndpointRegistry
:
registry.getListenerContainer(myListenerId).start();
或者您可以使用自己的 KafkaConsumer
(由消费者工厂创建)并轮询任意数量的记录,并在完成后关闭消费者。
我同意 Gary Russell 的回答,您可以创建 KafkaConsumer
实例并控制其生命周期。 class 来自 org.apache.kafka:kafka-clients
库。
在您的特定情况下,您可以添加 Thread.sleep(schedulerDelay)
来实现调度。这是简化的示例:
@Component
class Scheduler() {
public void init() {
// create kafka consumer connected to your DLQ topic
}
public void run() {
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
processRecordLogicGoesHere(record);
Thread.sleep(schedulerDelay);
}
} finally {
consumer.close();
}
}
}
schedulerDelay
应该仔细挑选以跟上传入的消息,不要让它们被 Kafka 的日志清理策略丢失。
关于如何使用 Kafka 的官方 API 有很多教程,这里是其中之一:Introducing the Kafka Consumer
此外,您可能会在这里找到一些想法:Retrying consumer architecture in the Apache Kafka