如何在 RabbitMQ 中将消息从一个队列移动到另一个队列
How to move messages from one queue to another in RabbitMQ
在 RabbitMQ 中,我有一个失败队列,其中包含来自不同队列的所有失败消息。现在我想提供 'Retry' 的功能,以便管理员可以再次将失败的消息移动到各自的队列中。这个想法是这样的:
上图是我的失败队列的结构。单击重试 link 后,消息应移至原始队列,即队列 1、队列 2 等
如果您正在寻找 Java 代码来执行此操作,那么您只需使用要移动的消息并将这些消息发布到所需的队列。对基本的消费和发布操作不熟悉的就去rabbitmq教程页面看看吧
这不是直接消费和发布。 RabbitMQ 不是那样设计的。它考虑到交换和队列都可以是临时的并且可以被删除。这是嵌入在通道中以在单次发布后关闭连接。
假设:
- 你有一个持久的队列和目的地交换(发送到)
- 你有一个持久的目标队列(取自)
这是执行此操作的代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
public object shovelMessage(
String exchange,
String targetQueue,
String destinationQueue,
String host,
Integer port,
String user,
String pass,
int count) throws IOException, TimeoutException, InterruptedException {
if(StringUtils.isEmpty(exchange) || StringUtils.isEmpty(targetQueue) || StringUtils.isEmpty(destinationQueue)) {
return null;
}
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(StringUtils.isEmpty(host)?internalHost.split(":")[0]:host);
factory.setPort(port>0 ? port: Integer.parseInt(internalPort.split(":")[1]));
factory.setUsername(StringUtils.isEmpty(user)? this.user: user);
factory.setPassword(StringUtils.isEmpty(pass)? this.pass: pass);
Channel tgtChannel = null;
try {
org.springframework.amqp.rabbit.connection.Connection connection = factory.createConnection();
tgtChannel = connection.createChannel(false);
tgtChannel.queueDeclarePassive(targetQueue);
QueueingConsumer consumer = new QueueingConsumer(tgtChannel);
tgtChannel.basicQos(1);
tgtChannel.basicConsume(targetQueue, false, consumer);
for (int i = 0; i < count; i++) {
QueueingConsumer.Delivery msg = consumer.nextDelivery(500);
if(msg == null) {
// if no message found, break from the loop.
break;
}
//Send it to destination Queue
// This repetition is required as channel looses the connection with
//queue after single publish and start throwing queue or exchange not
//found connection.
Channel destChannel = connection.createChannel(false);
try {
destChannel.queueDeclarePassive(destinationQueue);
SerializerMessageConverter serializerMessageConverter = new SerializerMessageConverter();
Message message = new Message(msg.getBody(), new MessageProperties());
Object o = serializerMessageConverter.fromMessage(message);
// for some reason msg.getBody() writes byte array which is read as a byte array // on the consumer end due to which this double conversion.
destChannel.basicPublish(exchange, destinationQueue, null, serializerMessageConverter.toMessage(o, new MessageProperties()).getBody());
tgtChannel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
} catch (Exception ex) {
// Send Nack if not able to publish so that retry is attempted
tgtChannel.basicNack(msg.getEnvelope().getDeliveryTag(), true, true);
log.error("Exception while producing message ", ex);
} finally {
try {
destChannel.close();
} catch (Exception e) {
log.error("Exception while closing destination channel ", e);
}
}
}
} catch (Exception ex) {
log.error("Exception while creating consumer ", ex);
} finally {
try {
tgtChannel.close();
} catch (Exception e) {
log.error("Exception while closing destination channel ", e);
}
}
return null;
}
我也实现了类似的东西,所以我可以将消息从 dlq 移回处理。 Link: https://github.com/kestraa/rabbit-move-messages
要重新排队消息,您可以使用 receiveAndReply
method。以下代码会将所有消息从 dlq
-队列移动到 queue
-队列:
do {
val movedToQueue = rabbitTemplate.receiveAndReply<String, String>(dlq, { it }, "", queue)
} while (movedToQueue)
在上面的代码示例中,dlq
是源队列,{ it }
是身份函数(你可以在这里转换消息),""
是默认交换和queue
是目标队列。
这里是一些 administrative/supporting 任务的更通用的工具,management-ui 无法完成。
Link: https://github.com/bkrieger1991/rabbitcli
即使在 message-content 或 message-headers 上使用过滤器,它也允许您 fetch/move/dump 来自队列的消息 :)
在 RabbitMQ 中,我有一个失败队列,其中包含来自不同队列的所有失败消息。现在我想提供 'Retry' 的功能,以便管理员可以再次将失败的消息移动到各自的队列中。这个想法是这样的:
上图是我的失败队列的结构。单击重试 link 后,消息应移至原始队列,即队列 1、队列 2 等
如果您正在寻找 Java 代码来执行此操作,那么您只需使用要移动的消息并将这些消息发布到所需的队列。对基本的消费和发布操作不熟悉的就去rabbitmq教程页面看看吧
这不是直接消费和发布。 RabbitMQ 不是那样设计的。它考虑到交换和队列都可以是临时的并且可以被删除。这是嵌入在通道中以在单次发布后关闭连接。
假设: - 你有一个持久的队列和目的地交换(发送到) - 你有一个持久的目标队列(取自)
这是执行此操作的代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
public object shovelMessage(
String exchange,
String targetQueue,
String destinationQueue,
String host,
Integer port,
String user,
String pass,
int count) throws IOException, TimeoutException, InterruptedException {
if(StringUtils.isEmpty(exchange) || StringUtils.isEmpty(targetQueue) || StringUtils.isEmpty(destinationQueue)) {
return null;
}
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(StringUtils.isEmpty(host)?internalHost.split(":")[0]:host);
factory.setPort(port>0 ? port: Integer.parseInt(internalPort.split(":")[1]));
factory.setUsername(StringUtils.isEmpty(user)? this.user: user);
factory.setPassword(StringUtils.isEmpty(pass)? this.pass: pass);
Channel tgtChannel = null;
try {
org.springframework.amqp.rabbit.connection.Connection connection = factory.createConnection();
tgtChannel = connection.createChannel(false);
tgtChannel.queueDeclarePassive(targetQueue);
QueueingConsumer consumer = new QueueingConsumer(tgtChannel);
tgtChannel.basicQos(1);
tgtChannel.basicConsume(targetQueue, false, consumer);
for (int i = 0; i < count; i++) {
QueueingConsumer.Delivery msg = consumer.nextDelivery(500);
if(msg == null) {
// if no message found, break from the loop.
break;
}
//Send it to destination Queue
// This repetition is required as channel looses the connection with
//queue after single publish and start throwing queue or exchange not
//found connection.
Channel destChannel = connection.createChannel(false);
try {
destChannel.queueDeclarePassive(destinationQueue);
SerializerMessageConverter serializerMessageConverter = new SerializerMessageConverter();
Message message = new Message(msg.getBody(), new MessageProperties());
Object o = serializerMessageConverter.fromMessage(message);
// for some reason msg.getBody() writes byte array which is read as a byte array // on the consumer end due to which this double conversion.
destChannel.basicPublish(exchange, destinationQueue, null, serializerMessageConverter.toMessage(o, new MessageProperties()).getBody());
tgtChannel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
} catch (Exception ex) {
// Send Nack if not able to publish so that retry is attempted
tgtChannel.basicNack(msg.getEnvelope().getDeliveryTag(), true, true);
log.error("Exception while producing message ", ex);
} finally {
try {
destChannel.close();
} catch (Exception e) {
log.error("Exception while closing destination channel ", e);
}
}
}
} catch (Exception ex) {
log.error("Exception while creating consumer ", ex);
} finally {
try {
tgtChannel.close();
} catch (Exception e) {
log.error("Exception while closing destination channel ", e);
}
}
return null;
}
我也实现了类似的东西,所以我可以将消息从 dlq 移回处理。 Link: https://github.com/kestraa/rabbit-move-messages
要重新排队消息,您可以使用 receiveAndReply
method。以下代码会将所有消息从 dlq
-队列移动到 queue
-队列:
do {
val movedToQueue = rabbitTemplate.receiveAndReply<String, String>(dlq, { it }, "", queue)
} while (movedToQueue)
在上面的代码示例中,dlq
是源队列,{ it }
是身份函数(你可以在这里转换消息),""
是默认交换和queue
是目标队列。
这里是一些 administrative/supporting 任务的更通用的工具,management-ui 无法完成。
Link: https://github.com/bkrieger1991/rabbitcli
即使在 message-content 或 message-headers 上使用过滤器,它也允许您 fetch/move/dump 来自队列的消息 :)