RabbitMQ dead letter exchange/queue
I don't quite understand dead letter exchanges/queues. The online documentation says that messages are:
republished to another exchange when any of the following events occur:
The message is rejected (basic.reject or basic.nack) with requeue=false,
The TTL for the message expires; or
The queue length limit is exceeded.
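Does this mean that when one of these events occurs, the message is automatically moved to the dead letter queue? Or do I have to move those 'dead' messages to the DLQ explicitly in my code?
Also, how do I set up a DLX/DLQ for my normal queue, so that when a message in the normal queue is rejected or expires, it is moved to the DLX/DLQ?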
Here is a complete example:
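import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DXtest {
    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // The dead letter exchange is an ordinary exchange; here it is a fanout.
        channel.exchangeDeclare("my.exchange", "fanout");

        // Set up the normal queue. The DLX is attached through queue arguments.
        // With x-max-length, for example, once the limit is reached the message
        // at the head of the queue is republished to my.exchange and from there
        // routed to my-dead-letter-queue.
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-dead-letter-exchange", "my.exchange");
        args.put("x-max-length", 2);
        channel.queueDeclare("myqueue", false, false, false, args);

        // The dead letter queue is an ordinary queue bound to the DLX.
        channel.queueDeclare("my-dead-letter-queue", false, false, false, null);
        channel.queueBind("my-dead-letter-queue", "my.exchange", "");

        // Print every message that arrives in the dead letter queue.
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Message '" + message + "' " + new Date());
            }
        };
        channel.basicConsume("my-dead-letter-queue", true, consumer); // autoAck = true
    }
}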
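To answer the first part directly: the broker does the republishing itself once the queue has been declared with x-dead-letter-exchange, so your code does not have to move anything. The three triggers from the documentation map onto queue arguments and consumer calls. As a rough sketch, the arguments could be assembled like this. DeadLetterArgs and its build method are hypothetical names made up for this illustration and are not part of the RabbitMQ client; only the "x-..." argument keys are RabbitMQ's own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the RabbitMQ Java client.
final class DeadLetterArgs {
    private DeadLetterArgs() {}

    /**
     * Builds the arguments map for a queue whose dead messages go to a DLX.
     *
     * @param deadLetterExchange   name of the exchange that receives dead-lettered messages
     * @param deadLetterRoutingKey routing key to use when dead-lettering, or null to keep
     *                             the message's original routing key
     * @param messageTtlMillis     per-queue message TTL in milliseconds, or null for no TTL
     * @param maxLength            maximum number of messages in the queue, or null for no limit
     */
    static Map<String, Object> build(String deadLetterExchange,
                                     String deadLetterRoutingKey,
                                     Integer messageTtlMillis,
                                     Integer maxLength) {
        if (deadLetterExchange == null) {
            throw new IllegalArgumentException("deadLetterExchange must not be null");
        }
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        if (deadLetterRoutingKey != null) {
            args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        }
        if (messageTtlMillis != null) {
            args.put("x-message-ttl", messageTtlMillis); // trigger: TTL expires
        }
        if (maxLength != null) {
            args.put("x-max-length", maxLength);         // trigger: length limit exceeded
        }
        return args;
    }

    public static void main(String[] argv) {
        // Same setup as the example above, plus an assumed 60 second TTL.
        Map<String, Object> args = build("my.exchange", null, 60_000, 2);
        System.out.println(args);
        // With a real channel this map would be passed as the last parameter:
        //   channel.queueDeclare("myqueue", false, false, false, args);
    }
}
```

The third trigger, rejection, comes from the consumer rather than from the queue arguments: with manual acknowledgements, calling channel.basicReject(deliveryTag, false) or channel.basicNack(deliveryTag, false, false) rejects the message with requeue=false, and the broker then dead-letters it. One thing to watch for: if myqueue already exists with different arguments, redeclaring it with a new arguments map is expected to fail, so the queue may have to be deleted first (or the DLX configured through a policy instead).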