创建死信队列
Creating a dead letter queue
如何创建死信队列并对其进行测试?我有生产者和消费者。我也有像 acks, retries 这样的生产者配置。
Producer.java
package org.timothy.producer;
import org.apache.kafka.clients.producer.*;
import org.timothy.producer.common.AppConfigs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.timothy.producer.common.PropConfigs;
import java.util.concurrent.ExecutionException;
public class Producer{
private static final Logger logger = LogManager.getLogger(Producer.class);
public static void main(String[] args) {
logger.info("Creating Kafka Producer...");
KafkaProducer<Integer, String> producer = PropConfigs.prodProps();
logger.info("Start sending messages...");
for (int i = 1; i <= AppConfigs.numEvents; i++) {
ProducerRecord<Integer, String> record = new ProducerRecord<>(AppConfigs.topicName, "This is Message: " + i);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("Record sent with key " + i + " to partition " + metadata.partition()
+ " with offset " + metadata.offset());
}
catch (ExecutionException e) {
System.out.println("Error in sending record");
//System.out.println(e);
e.printStackTrace();
}
catch (InterruptedException e) {
System.out.println("Error in sending record");
//System.out.println(e);
}
}
producer.flush();
producer.close();
logger.info("Finished - Closing Kafka Producer.");
}
}
Consumer.java
package org.timothy.producer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.timothy.producer.common.AppConfigs;
import org.timothy.producer.common.PropConfigs;
import java.time.Duration;
import java.util.Collections;
public class Consumer{
private static final Logger logger = LogManager.getLogger(Consumer.class);
public static void main(String[] args) {
KafkaConsumer<Integer, String> consumer = PropConfigs.consProps();
consumer.subscribe(Collections.singleton(AppConfigs.topicName));
int noMessageFound = 0;
while(true){
ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofMillis(1000));
if (records.count() == 0) {
noMessageFound++;
if (noMessageFound > AppConfigs.MAX_NO_MESSAGE_FOUND_COUNT)
break;
else
continue;
}
records.forEach(record -> logger.info("Received new record: " +
" Key: " + record.key() +
", Value: " + record.value() +
", Topic: " + record.topic() +
", Partition: " + record.partition() +
", Offset: " + record.offset() + "\n"
));
consumer.commitAsync();
}
}
}
如何在我的代码中实现 DLQ?如何测试重试,当我 运行 我的程序时我没有遇到错误,这就是为什么我不知道程序是否重试。
when I run my program I don't encounter errors
然后介绍一下。创建一个计数器,当它达到某个值时,抛出 RuntimeException。在 catch 块中,使用您的生产者实例将事件发送到新主题。
如果您想测试重试,请关闭代理或引入其他一些网络异常
如何创建死信队列并对其进行测试?我有生产者和消费者。我也有像 acks, retries 这样的生产者配置。
Producer.java
package org.timothy.producer;
import org.apache.kafka.clients.producer.*;
import org.timothy.producer.common.AppConfigs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.timothy.producer.common.PropConfigs;
import java.util.concurrent.ExecutionException;
public class Producer{
private static final Logger logger = LogManager.getLogger(Producer.class);
public static void main(String[] args) {
logger.info("Creating Kafka Producer...");
KafkaProducer<Integer, String> producer = PropConfigs.prodProps();
logger.info("Start sending messages...");
for (int i = 1; i <= AppConfigs.numEvents; i++) {
ProducerRecord<Integer, String> record = new ProducerRecord<>(AppConfigs.topicName, "This is Message: " + i);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("Record sent with key " + i + " to partition " + metadata.partition()
+ " with offset " + metadata.offset());
}
catch (ExecutionException e) {
System.out.println("Error in sending record");
//System.out.println(e);
e.printStackTrace();
}
catch (InterruptedException e) {
System.out.println("Error in sending record");
//System.out.println(e);
}
}
producer.flush();
producer.close();
logger.info("Finished - Closing Kafka Producer.");
}
}
Consumer.java
package org.timothy.producer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.timothy.producer.common.AppConfigs;
import org.timothy.producer.common.PropConfigs;
import java.time.Duration;
import java.util.Collections;
public class Consumer{
private static final Logger logger = LogManager.getLogger(Consumer.class);
public static void main(String[] args) {
KafkaConsumer<Integer, String> consumer = PropConfigs.consProps();
consumer.subscribe(Collections.singleton(AppConfigs.topicName));
int noMessageFound = 0;
while(true){
ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofMillis(1000));
if (records.count() == 0) {
noMessageFound++;
if (noMessageFound > AppConfigs.MAX_NO_MESSAGE_FOUND_COUNT)
break;
else
continue;
}
records.forEach(record -> logger.info("Received new record: " +
" Key: " + record.key() +
", Value: " + record.value() +
", Topic: " + record.topic() +
", Partition: " + record.partition() +
", Offset: " + record.offset() + "\n"
));
consumer.commitAsync();
}
}
}
如何在我的代码中实现 DLQ?如何测试重试,当我 运行 我的程序时我没有遇到错误,这就是为什么我不知道程序是否重试。
when I run my program I don't encounter errors
然后介绍一下。创建一个计数器,当它达到某个值时,抛出 RuntimeException。在 catch 块中,使用您的生产者实例将事件发送到新主题。
如果您想测试重试,请关闭代理或引入其他一些网络异常