如何从 RabbitMQ Spring 手动拉取消息
How to pull messages from RabbitMQ Spring manually
我需要从 RabbitMQ 队列中手动提取记录,而不是自动提取记录。以下是我为此创建的内容。但是,我想知道是否有更好的 Spring/RabbitMQ 方法来做到这一点?
您会注意到代码中使用了 channel.basicAck 和 channel.basicNack。这样做是为了在消费者处理成功时确认(ack)消息,而在消费者抛出异常时拒绝确认(nack)并重新入队,使该记录继续保留在队列中。
我本来想使用 RabbitTemplate.receiveAndConvert,但它会自动确认该记录,即使存在不应确认该记录的情况。
import java.util.Properties;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.GetResponse;
/**
 * Pulls messages from a RabbitMQ queue on demand (basic.get) instead of
 * having them pushed by a listener container.
 *
 * <p>Each message is acknowledged only after the supplied consumer returns
 * normally; if conversion or the consumer throws, the message is nacked with
 * requeue so it stays on the queue.
 */
@Service
public class RabbitAdminServices {

    private static final Logger logger = LoggerFactory.getLogger(RabbitAdminServices.class);

    /** Charset name passed to the properties converter. */
    private static final String DEFAULT_ENCODING = "UTF-8";

    @Autowired
    AmqpAdmin rabbitAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    MessageConverter messageConverter;

    private final MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

    /**
     * Returns the number of messages the broker currently reports for a queue.
     *
     * @param queueName name of the queue to inspect
     * @return the reported message count, or 0 when no properties (or no
     *         count entry) are available for the queue
     */
    public int getCount(String queueName) {
        Properties properties = rabbitAdmin.getQueueProperties(queueName);
        if (properties == null) {
            // AmqpAdmin#getQueueProperties is documented to return null for a
            // queue that does not exist; treat that as "nothing to process".
            logger.warn("No properties available for queue '{}'; treating it as empty", queueName);
            return 0;
        }
        Object messageCount = properties.get(RabbitAdmin.QUEUE_MESSAGE_COUNT);
        return messageCount instanceof Number ? ((Number) messageCount).intValue() : 0;
    }

    /**
     * Pulls messages from the queue one at a time and hands each converted
     * payload to {@code consumer}.
     *
     * <p>At most {@code min(count, messages on the queue at call time)}
     * messages are fetched; a {@code null} count means "everything that was on
     * the queue at call time". The loop stops early if the queue runs dry.
     * Because a failed message is requeued, it may be fetched again by a later
     * iteration of the same call.
     *
     * @param queueName queue to pull from
     * @param count     maximum number of messages to pull, or {@code null} for no extra limit
     * @param clazz     type token that binds {@code T} for the caller; the
     *                  converted payload is not checked against it
     * @param consumer  callback invoked with each converted payload; throwing
     *                  from it causes the message to be nacked and requeued
     * @param <T>       expected payload type
     */
    public <T> void processQueue(String queueName, Integer count, Class<T> clazz, Consumer<T> consumer) {
        int available = getCount(queueName);
        int limit = (count != null) ? Math.min(count, available) : available;
        for (int i = 0; i < limit; i++) {
            Boolean received = rabbitTemplate.execute(channel -> pullOne(channel, queueName, consumer));
            if (!Boolean.TRUE.equals(received)) {
                // Queue was drained (e.g. by another consumer) since the count was read.
                break;
            }
        }
    }

    /**
     * Fetches a single message with manual acknowledgement, converts it and
     * passes it to the consumer, then acks on success or nacks (requeue) on failure.
     *
     * @return {@code false} when the queue had no message to deliver, {@code true} otherwise
     */
    private <T> boolean pullOne(Channel channel, String queueName, Consumer<T> consumer) throws java.io.IOException {
        GetResponse response = channel.basicGet(queueName, false);
        if (response == null) {
            // basicGet yields null when the queue is empty; nothing to ack or nack.
            return false;
        }
        long deliveryTag = response.getEnvelope().getDeliveryTag();
        try {
            MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(
                    response.getProps(), response.getEnvelope(), DEFAULT_ENCODING);
            if (response.getMessageCount() >= 0) {
                messageProps.setMessageCount(response.getMessageCount());
            }
            Message message = new Message(response.getBody(), messageProps);
            // The converter returns Object; the caller vouches for T via its Consumer<T>.
            @SuppressWarnings("unchecked")
            T payload = (T) messageConverter.fromMessage(message);
            consumer.accept(payload);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            logger.warn("Failed to process message {} from queue '{}'; requeueing it", deliveryTag, queueName, e);
            channel.basicNack(deliveryTag, false, true);
        }
        return true;
    }
}
就 RabbitTemplate 目前提供的功能而言,您的解决方案是正确的;我们应该考虑向 RabbitTemplate
添加重载方法,例如
<T> void template.receiveAndConvert(..., Class<T> clazz, Consumer<T> consumer);
并让模板根据消费者是否抛出异常来执行 ack/nack。这样您就不必在自己的代码中进行消息转换了。
我需要从 RabbitMQ 队列中手动提取记录,而不是自动提取记录。以下是我为此创建的内容。但是,我想知道是否有更好的 Spring/RabbitMQ 方法来做到这一点?
您会注意到代码中使用了 channel.basicAck 和 channel.basicNack。这样做是为了在消费者处理成功时确认(ack)消息,而在消费者抛出异常时拒绝确认(nack)并重新入队,使该记录继续保留在队列中。
我本来想使用 RabbitTemplate.receiveAndConvert,但它会自动确认该记录,即使存在不应确认该记录的情况。
import java.util.Properties;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.GetResponse;
/**
 * Pulls messages from a RabbitMQ queue on demand (basic.get) instead of
 * having them pushed by a listener container.
 *
 * <p>Each message is acknowledged only after the supplied consumer returns
 * normally; if conversion or the consumer throws, the message is nacked with
 * requeue so it stays on the queue.
 */
@Service
public class RabbitAdminServices {

    private static final Logger logger = LoggerFactory.getLogger(RabbitAdminServices.class);

    /** Charset name passed to the properties converter. */
    private static final String DEFAULT_ENCODING = "UTF-8";

    @Autowired
    AmqpAdmin rabbitAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    MessageConverter messageConverter;

    private final MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

    /**
     * Returns the number of messages the broker currently reports for a queue.
     *
     * @param queueName name of the queue to inspect
     * @return the reported message count, or 0 when no properties (or no
     *         count entry) are available for the queue
     */
    public int getCount(String queueName) {
        Properties properties = rabbitAdmin.getQueueProperties(queueName);
        if (properties == null) {
            // AmqpAdmin#getQueueProperties is documented to return null for a
            // queue that does not exist; treat that as "nothing to process".
            logger.warn("No properties available for queue '{}'; treating it as empty", queueName);
            return 0;
        }
        Object messageCount = properties.get(RabbitAdmin.QUEUE_MESSAGE_COUNT);
        return messageCount instanceof Number ? ((Number) messageCount).intValue() : 0;
    }

    /**
     * Pulls messages from the queue one at a time and hands each converted
     * payload to {@code consumer}.
     *
     * <p>At most {@code min(count, messages on the queue at call time)}
     * messages are fetched; a {@code null} count means "everything that was on
     * the queue at call time". The loop stops early if the queue runs dry.
     * Because a failed message is requeued, it may be fetched again by a later
     * iteration of the same call.
     *
     * @param queueName queue to pull from
     * @param count     maximum number of messages to pull, or {@code null} for no extra limit
     * @param clazz     type token that binds {@code T} for the caller; the
     *                  converted payload is not checked against it
     * @param consumer  callback invoked with each converted payload; throwing
     *                  from it causes the message to be nacked and requeued
     * @param <T>       expected payload type
     */
    public <T> void processQueue(String queueName, Integer count, Class<T> clazz, Consumer<T> consumer) {
        int available = getCount(queueName);
        int limit = (count != null) ? Math.min(count, available) : available;
        for (int i = 0; i < limit; i++) {
            Boolean received = rabbitTemplate.execute(channel -> pullOne(channel, queueName, consumer));
            if (!Boolean.TRUE.equals(received)) {
                // Queue was drained (e.g. by another consumer) since the count was read.
                break;
            }
        }
    }

    /**
     * Fetches a single message with manual acknowledgement, converts it and
     * passes it to the consumer, then acks on success or nacks (requeue) on failure.
     *
     * @return {@code false} when the queue had no message to deliver, {@code true} otherwise
     */
    private <T> boolean pullOne(Channel channel, String queueName, Consumer<T> consumer) throws java.io.IOException {
        GetResponse response = channel.basicGet(queueName, false);
        if (response == null) {
            // basicGet yields null when the queue is empty; nothing to ack or nack.
            return false;
        }
        long deliveryTag = response.getEnvelope().getDeliveryTag();
        try {
            MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(
                    response.getProps(), response.getEnvelope(), DEFAULT_ENCODING);
            if (response.getMessageCount() >= 0) {
                messageProps.setMessageCount(response.getMessageCount());
            }
            Message message = new Message(response.getBody(), messageProps);
            // The converter returns Object; the caller vouches for T via its Consumer<T>.
            @SuppressWarnings("unchecked")
            T payload = (T) messageConverter.fromMessage(message);
            consumer.accept(payload);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            logger.warn("Failed to process message {} from queue '{}'; requeueing it", deliveryTag, queueName, e);
            channel.basicNack(deliveryTag, false, true);
        }
        return true;
    }
}
就 RabbitTemplate 目前提供的功能而言,您的解决方案是正确的;我们应该考虑向 RabbitTemplate
添加重载方法,例如
<T> void template.receiveAndConvert(..., Class<T> clazz, Consumer<T> consumer);
并让模板根据消费者是否抛出异常来执行 ack/nack。这样您就不必在自己的代码中进行消息转换了。