DefaultJmsListenerContainerFactory 配置进程并发
DefaultJmsListenerContainerFactory configure process concurrency
我正在为我的 JmsFactory 配置并发以使用我的队列。
我做到了:
@Bean(name = "myFactory")
public DefaultJmsListenerContainerFactory sqsFactory(SQSConnectionFactory connectionFactory,
CustomJmsListenerConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setConcurrency("1-3");
return factory;
}
我看到DefaultJmsListenerContainerFactory.setConcurrency后面调用了DefaultMessageListenerContainer
我在我的应用程序中配置了 2 个队列,我正在使用 spring boot:
@JmsListener(destination = "queue1", containerFactory = "myFactory")
@JmsListener(destination = "queue2", containerFactory = "myFactory")
我正在阅读 spring 文档并面对一些方法,现在我有一些疑问。
1 - 有什么区别:
setConcurrency(String concurrency)
setConcurrentConsumers(int concurrentConsumers)
即使阅读了文档,我也没有理解其中的区别以及此配置如何改变应用程序行为。
我认为 setConcurrency 应该是每个@jmsLister 将用于从队列中获取消息的多个线程...
您能否解释一个配置映像示例,我有 100 条消息排队(每个配置的队列)?
2 - setMaxMessagesPerTask(int maxMessagesPerTask)
如果队列中有 100 条消息,并发性 = 3 并且此数字为 10(默认值),行为是什么?
阅读两种情况下的 Javadocs。
1.
setConcurrency(String concurrency)
这只是为了方便
/**
* Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
* upper limit String, e.g. "10" (the lower limit will be 1 in this case).
* <p>This listener container will always hold on to the minimum number of consumers
* ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
* of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
*/
@Override
public void setConcurrency(String concurrency) {
try {
int separatorIndex = concurrency.indexOf('-');
if (separatorIndex != -1) {
setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
}
else {
setConcurrentConsumers(1);
setMaxConcurrentConsumers(Integer.parseInt(concurrency));
}
}
catch (NumberFormatException ex) {
throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
}
}
setConcurrency("1-3")
等同于
setConcurrentConsumers(1);
setMaxConcurrentConsumers(3);
- 对消息处理没有影响;这只是意味着消费者任务每 10 条消息被回收(停止和启动)。
/**
* Specify the maximum number of messages to process in one task.
* More concretely, this limits the number of message reception attempts
* per task, which includes receive iterations that did not actually
* pick up a message until they hit their timeout (see the
* {@link #setReceiveTimeout "receiveTimeout"} property).
* <p>Default is unlimited (-1) in case of a standard TaskExecutor,
* reusing the original invoker threads until shutdown (at the
* expense of limited dynamic scheduling).
* <p>In case of a SchedulingTaskExecutor indicating a preference for
* short-lived tasks, the default is 10 instead. Specify a number
* of 10 to 100 messages to balance between rather long-lived and
* rather short-lived tasks here.
* <p>Long-lived tasks avoid frequent thread context switches through
* sticking with the same thread all the way through, while short-lived
* tasks allow thread pools to control the scheduling. Hence, thread
* pools will usually prefer short-lived tasks.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
* @see #setTaskExecutor
* @see #setReceiveTimeout
* @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
*/
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
synchronized (this.lifecycleMonitor) {
this.maxMessagesPerTask = maxMessagesPerTask;
}
}
我正在为我的 JmsFactory 配置并发以使用我的队列。
我做到了:
@Bean(name = "myFactory")
public DefaultJmsListenerContainerFactory sqsFactory(SQSConnectionFactory connectionFactory,
CustomJmsListenerConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setConcurrency("1-3");
return factory;
}
我看到DefaultJmsListenerContainerFactory.setConcurrency后面调用了DefaultMessageListenerContainer
我在我的应用程序中配置了 2 个队列,我正在使用 spring boot:
@JmsListener(destination = "queue1", containerFactory = "myFactory")
@JmsListener(destination = "queue2", containerFactory = "myFactory")
我正在阅读 spring 文档并面对一些方法,现在我有一些疑问。
1 - 有什么区别:
setConcurrency(String concurrency)
setConcurrentConsumers(int concurrentConsumers)
即使阅读了文档,我也没有理解其中的区别以及此配置如何改变应用程序行为。 我认为 setConcurrency 应该是每个@jmsLister 将用于从队列中获取消息的多个线程... 您能否解释一个配置映像示例,我有 100 条消息排队(每个配置的队列)?
2 - setMaxMessagesPerTask(int maxMessagesPerTask)
如果队列中有 100 条消息,并发性 = 3 并且此数字为 10(默认值),行为是什么?
阅读两种情况下的 Javadocs。
1.
setConcurrency(String concurrency)
这只是为了方便
/**
* Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
* upper limit String, e.g. "10" (the lower limit will be 1 in this case).
* <p>This listener container will always hold on to the minimum number of consumers
* ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
* of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
*/
@Override
public void setConcurrency(String concurrency) {
try {
int separatorIndex = concurrency.indexOf('-');
if (separatorIndex != -1) {
setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
}
else {
setConcurrentConsumers(1);
setMaxConcurrentConsumers(Integer.parseInt(concurrency));
}
}
catch (NumberFormatException ex) {
throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
}
}
setConcurrency("1-3")
等同于
setConcurrentConsumers(1);
setMaxConcurrentConsumers(3);
- 对消息处理没有影响;这只是意味着消费者任务每 10 条消息被回收(停止和启动)。
/**
* Specify the maximum number of messages to process in one task.
* More concretely, this limits the number of message reception attempts
* per task, which includes receive iterations that did not actually
* pick up a message until they hit their timeout (see the
* {@link #setReceiveTimeout "receiveTimeout"} property).
* <p>Default is unlimited (-1) in case of a standard TaskExecutor,
* reusing the original invoker threads until shutdown (at the
* expense of limited dynamic scheduling).
* <p>In case of a SchedulingTaskExecutor indicating a preference for
* short-lived tasks, the default is 10 instead. Specify a number
* of 10 to 100 messages to balance between rather long-lived and
* rather short-lived tasks here.
* <p>Long-lived tasks avoid frequent thread context switches through
* sticking with the same thread all the way through, while short-lived
* tasks allow thread pools to control the scheduling. Hence, thread
* pools will usually prefer short-lived tasks.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
* @see #setTaskExecutor
* @see #setReceiveTimeout
* @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
*/
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
synchronized (this.lifecycleMonitor) {
this.maxMessagesPerTask = maxMessagesPerTask;
}
}