Spring Boot - RabbitMQ consumer keeps printing "Retrieving delivery for Consumer"
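Hi, I am using Spring Boot 1.3.5 with the RabbitMQ starter. In this project I have a RabbitMQ consumer that consumes messages from a particular queue. But while running the application, it keeps printing the messages below in the console. While browsing Google I read that setting the heartbeat would resolve the issue, but no luck for me: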
14:08:14.892 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-rDkzcToXLMWqF4fFUIHS0A=notificationQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@47914ea3 Shared Rabbit Connection: SimpleConnection@1096fe75 [delegate=amqp://guest@127.0.0.1:5672/], acknowledgeMode=AUTO local queue size=0
14:08:14.913 [SimpleAsyncTaskExecutor-3] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-jxyjMKfw6heu77XVdYh3tw=notificationQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@47914ea3 Shared Rabbit Connection: SimpleConnection@1096fe75 [delegate=amqp://guest@127.0.0.1:5672/], acknowledgeMode=AUTO local queue size=0
14:08:14.917 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-AcbX0R5eM-ukqWN0a_nrwA=notificationQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@47914ea3 Shared Rabbit Connection: SimpleConnection@1096fe75 [delegate=amqp://guest@127.0.0.1:5672/], acknowledgeMode=AUTO local queue size=0
14:08:15.893 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-rDkzcToXLMWqF4fFUIHS0A=notificationQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@47914ea3 Shared Rabbit Connection: SimpleConnection@1096fe75 [delegate=amqp://guest@127.0.0.1:5672/], acknowledgeMode=AUTO local queue size=0
This output never ends. Please find the consumer code below:
RabbitMqConfiguration.java
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitMqConfiguration {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Value("${concurrent.consumers}")
    public int concurrent_consumers;

    @Value("${max.concurrent.consumers}")
    public int max_concurrent_consumers;

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setConcurrentConsumers(concurrent_consumers);
        factory.setMaxConcurrentConsumers(max_concurrent_consumers);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        return converter;
    }
}
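NotificationConsumer.java
// Imports were omitted in the original post; SLF4J is assumed for Logger/LoggerFactory.
// The import for the Transaction model class is not shown because its package is not given.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class NotificationConsumer {

    private static final Logger logger = LoggerFactory.getLogger(NotificationConsumer.class);

    @Value("${notification.queue}")
    public String notificationQueue;

    @RabbitListener(id = "notification", containerFactory = "rabbitListenerContainerFactory", queues = "#{notificationQueue}")
    public void handleNotificationMessage(Transaction transaction) {
        System.out.println("Entered handleNotificationMessage::" + transaction.getId());
        System.out.println("Exit handleNotificationMessage::" + transaction.getData());
        // The {} placeholder is required; without it the argument is silently dropped.
        logger.info("Entered handleNotificationMessage::{}", transaction.getId());
        logger.info("Exit handleNotificationMessage::{}", transaction.getData());
    }

    @Bean
    public Queue notificationQueue() {
        // durable = true, exclusive = false, autoDelete = false
        return new Queue(notificationQueue, true, false, false);
    }
}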
application.yml
spring:
  rabbitmq:
    addresses: 127.0.0.1:5672
    adminAddresses: http://127.0.0.1:15672
    username: guest
    password: guest
    requested-heartbeat: 60
spring.rabbit.exchange: notification-exchange
notification.queue: notificationQueue
concurrent.consumers: 3
max.concurrent.consumers: 10
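The messages will be produced by another application; this application will only consume them.
Any help is much appreciated.
As Gary suggested, switching the logger level from DEBUG to INFO resolved my issue.
logging:
  level:
    ROOT: INFO
Now I am getting the following exception: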
15:21:24.925 [SimpleAsyncTaskExecutor-2] WARN o.s.a.r.l.ConditionalRejectingErrorHandler - Fatal message conversion error; message rejected;
it will be dropped or routed to a dead letter exchange, if so configured: (Body:'{"id":"5784eed4f5a64b4d8663e706","clientIdentifier":"313131313131",
"data":"Sample data by vad","currentAction":{"state":{"status":"PENDING","errorCode":"404"},"data":"Sample data"},
"hasError":false,"errorMessage":""}'MessageProperties [headers={__TypeId__=com.global.produce.model.Transaction},
timestamp=null,messageId=null,userId=null,appId=null,clusterId=null,type=null,correlationId=null,replyTo=null,contentType=application/json,
contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=,
receivedRoutingKey=notificationQueue, deliveryTag=1, messageCount=0])
Does anyone have any idea about this error?
That is just a DEBUG log; change your log settings to INFO, WARN, or ERROR.
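If you still want DEBUG output from the rest of the application, a narrower option is to raise the level for just the noisy logger instead of ROOT. This is a minimal sketch, assuming Spring Boot's `logging.level.*` properties are in effect (that is, no custom Logback or Log4j configuration file overrides them). The logger name is the expanded form of the `o.s.a.r.l.BlockingQueueConsumer` abbreviation shown in the log lines.

```yaml
# application.yml (sketch): silence only the consumer's polling messages
logging:
  level:
    # Expanded from the abbreviated logger name o.s.a.r.l.BlockingQueueConsumer
    org.springframework.amqp.rabbit.listener.BlockingQueueConsumer: INFO
```

The "Retrieving delivery for Consumer" line is logged each time an idle consumer thread polls its local queue, so it repeats indefinitely at DEBUG level and does not by itself indicate a problem.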