将 Spring JMSTemplate / ActiveMQ 与请求/响应一起使用,而不阻塞队列中的其他消息
Using Spring JMSTemplate / ActiveMQ with request / response without blocking other messages in the queue
我们为要等待响应的特定用例实施了分布式请求/响应类型架构。我们使用的 JMS 代理是 ActiveMq,代码使用 Spring 连接在一起。
我们看到的问题是,如果将一堆请求发送到同一目的地,任何需要花费大量时间才能完成的请求似乎都会阻止其后的请求消息。消费者使用的 SessionAwareMessageListener 接口只支持 onMessage() 方法。在这里实现并行性的最佳方法是什么,即如果特定请求需要很长时间,队列中的其他消息不应被阻塞?
有这个 SO post 但它没有回答我的问题。
JMS: Can we get multiple messages from queue in OnMessage() withtout commit or rollback
谢谢
相关代码片段(为简洁起见删除了异常处理等)
制作人
public class MyJmsProducer {
private ProcessingResponse sendMessage(final Serializable serializable) {
//send JMS request and wait for response
return jmsMessagingTemplate.convertSendAndReceive(destination, serializable, ProcessingResponse.class); //this operation seems to be blocking + sync
}
}
和听众(消费者)
public class MyJmsListener
implements SessionAwareMessageListener<Message>, NotificationHandler<Task> {
@Override
public void onMessage(Message message, Session session)
throws JMSException {
ProcessingRequest processingRequest = (ProcessingRequest) ((ObjectMessage) message).getObject();
// handle the request here (THIS COULD TAKE A WHILE)
handleRequest(processingRequest);
// done handling the request, now create a response message
final ObjectMessage responseMessage = new ActiveMQObjectMessage();
responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
responseMessage.setObject(processingResponse);
// Message sent back to the replyTo address of the income message.
final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(responseMessage);
}
}
您可以使用 DMLC 的 ConcurrentConsumers
来提高消息的消费速度并解决消费缓慢的问题:
@Bean
public DefaultMessageListenerContainer dmlc() {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setMaxConcurrentConsumers(10);
dmlc.setConcurrentConsumers(5);
return dmlc;
}
您需要调整 prefetchPolicy 以适应并发消费者:
persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)
所有消息都发送给第一个连接的消费者,当另一个消费者连接到同一目的地时,他不会收到消息,因此要更改此行为,您需要将 prefetchPolicy 设置为低于默认值的值。例如,将此 jms.prefetchPolicy.queuePrefetch=1
添加到 activemq.xml 中的 uri 配置或在客户端 url 上设置它,如下所示
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1");
return connectionFactory;
}
Large prefetch values are recommended for high performance with high
message volumes. However, for lower message volumes, where each
message takes a long time to process, the prefetch should be set to 1.
This ensures that a consumer is only processing one message at a time.
Specifying a prefetch limit of zero, however, will cause the consumer
to poll for messages, one at a time, instead of the message being
pushed to the consumer.
看看http://activemq.apache.org/what-is-the-prefetch-limit-for.html
和
我们为要等待响应的特定用例实施了分布式请求/响应类型架构。我们使用的 JMS 代理是 ActiveMq,代码使用 Spring 连接在一起。
我们看到的问题是,如果将一堆请求发送到同一目的地,任何需要花费大量时间才能完成的请求似乎都会阻止其后的请求消息。消费者使用的 SessionAwareMessageListener 接口只支持 onMessage() 方法。在这里实现并行性的最佳方法是什么,即如果特定请求需要很长时间,队列中的其他消息不应被阻塞?
有这个 SO post 但它没有回答我的问题。 JMS: Can we get multiple messages from queue in OnMessage() withtout commit or rollback
谢谢
相关代码片段(为简洁起见删除了异常处理等)
制作人
public class MyJmsProducer {
private ProcessingResponse sendMessage(final Serializable serializable) {
//send JMS request and wait for response
return jmsMessagingTemplate.convertSendAndReceive(destination, serializable, ProcessingResponse.class); //this operation seems to be blocking + sync
}
}
和听众(消费者)
public class MyJmsListener
implements SessionAwareMessageListener<Message>, NotificationHandler<Task> {
@Override
public void onMessage(Message message, Session session)
throws JMSException {
ProcessingRequest processingRequest = (ProcessingRequest) ((ObjectMessage) message).getObject();
// handle the request here (THIS COULD TAKE A WHILE)
handleRequest(processingRequest);
// done handling the request, now create a response message
final ObjectMessage responseMessage = new ActiveMQObjectMessage();
responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
responseMessage.setObject(processingResponse);
// Message sent back to the replyTo address of the income message.
final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(responseMessage);
}
}
您可以使用 DMLC 的 ConcurrentConsumers
来提高消息的消费速度并解决消费缓慢的问题:
@Bean
public DefaultMessageListenerContainer dmlc() {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setMaxConcurrentConsumers(10);
dmlc.setConcurrentConsumers(5);
return dmlc;
}
您需要调整 prefetchPolicy 以适应并发消费者:
persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)
所有消息都发送给第一个连接的消费者,当另一个消费者连接到同一目的地时,他不会收到消息,因此要更改此行为,您需要将 prefetchPolicy 设置为低于默认值的值。例如,将此 jms.prefetchPolicy.queuePrefetch=1
添加到 activemq.xml 中的 uri 配置或在客户端 url 上设置它,如下所示
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1");
return connectionFactory;
}
Large prefetch values are recommended for high performance with high message volumes. However, for lower message volumes, where each message takes a long time to process, the prefetch should be set to 1. This ensures that a consumer is only processing one message at a time. Specifying a prefetch limit of zero, however, will cause the consumer to poll for messages, one at a time, instead of the message being pushed to the consumer.
看看http://activemq.apache.org/what-is-the-prefetch-limit-for.html
和