DefaultJmsListenerContainerFactory: configuring concurrency

I am configuring concurrency on my JMS listener container factory to consume from my queues.

This is what I did:

  @Bean(name = "myFactory")
  public DefaultJmsListenerContainerFactory sqsFactory(SQSConnectionFactory connectionFactory,
      CustomJmsListenerConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("1-3");
    return factory;
  }

I can see that DefaultJmsListenerContainerFactory.setConcurrency ends up calling DefaultMessageListenerContainer behind the scenes.

I have 2 queues configured in my application, and I am using Spring Boot:

@JmsListener(destination = "queue1", containerFactory = "myFactory")
@JmsListener(destination = "queue2", containerFactory = "myFactory")

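I have been reading the Spring documentation and came across a few methods that left me with some questions.

1 - What is the difference between: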

setConcurrency(String concurrency)
setConcurrentConsumers(int concurrentConsumers)

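Even after reading the documentation, I do not understand the difference or how this configuration changes the application's behavior. I assumed setConcurrency would be the number of threads each @JmsListener uses to fetch messages from its queue... Could you explain with an example configuration, assuming I have 100 messages waiting in each configured queue?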

2 - setMaxMessagesPerTask(int maxMessagesPerTask)

If there are 100 messages in the queue, concurrency = 3, and this value is 10 (the default), what is the behavior?

Read the Javadocs for both cases.

1.

setConcurrency(String concurrency)

This is just a convenience method:

/**
 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
 * upper limit String, e.g. "10" (the lower limit will be 1 in this case).
 * <p>This listener container will always hold on to the minimum number of consumers
 * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
 * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
 */
@Override
public void setConcurrency(String concurrency) {
    try {
        int separatorIndex = concurrency.indexOf('-');
        if (separatorIndex != -1) {
            setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
            setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
        }
        else {
            setConcurrentConsumers(1);
            setMaxConcurrentConsumers(Integer.parseInt(concurrency));
        }
    }
    catch (NumberFormatException ex) {
        throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
                "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
    }
}

setConcurrency("1-3") is equivalent to

setConcurrentConsumers(1);
setMaxConcurrentConsumers(3);
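
To make the mapping concrete, here is a minimal plain-Java sketch that mirrors the parsing logic quoted above. It is not Spring's own class: the name ConcurrencyLimits and its fields are made up for illustration. As far as I know, the factory builds a separate listener container for each @JmsListener, so with two listeners on "myFactory" these limits would apply to each queue independently (1 to 3 consumers for queue1, and another 1 to 3 for queue2).

```java
// Hypothetical helper, not part of Spring: mimics how a "lower-upper"
// concurrency string is split into a minimum and a maximum consumer count.
final class ConcurrencyLimits {
    final int min; // plays the role of concurrentConsumers
    final int max; // plays the role of maxConcurrentConsumers

    private ConcurrencyLimits(int min, int max) {
        this.min = min;
        this.max = max;
    }

    static ConcurrencyLimits parse(String concurrency) {
        try {
            int separatorIndex = concurrency.indexOf('-');
            if (separatorIndex != -1) {
                // "1-3" -> lower limit 1, upper limit 3
                return new ConcurrencyLimits(
                        Integer.parseInt(concurrency.substring(0, separatorIndex)),
                        Integer.parseInt(concurrency.substring(separatorIndex + 1)));
            }
            // "3" -> lower limit defaults to 1, upper limit 3
            return new ConcurrencyLimits(1, Integer.parseInt(concurrency));
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException(
                    "Invalid concurrency value [" + concurrency + "]", ex);
        }
    }

    public static void main(String[] args) {
        ConcurrencyLimits range = parse("1-3");
        System.out.println("1-3 -> min=" + range.min + ", max=" + range.max);
        ConcurrencyLimits single = parse("3");
        System.out.println("3   -> min=" + single.min + ", max=" + single.max);
    }
}
```

Both "1-3" and "3" end up with the same limits, so in this case the two spellings are interchangeable.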
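
2.

setMaxMessagesPerTask(int maxMessagesPerTask)

This has no effect on how messages are processed; it only means that each consumer task is recycled (stopped and restarted) after 10 messages. Per the Javadoc, the limit counts receive attempts, including attempts that time out without picking up a message, and the default is 10 only for a SchedulingTaskExecutor that prefers short-lived tasks; with a standard TaskExecutor it is unlimited (-1).
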
/**
 * Specify the maximum number of messages to process in one task.
 * More concretely, this limits the number of message reception attempts
 * per task, which includes receive iterations that did not actually
 * pick up a message until they hit their timeout (see the
 * {@link #setReceiveTimeout "receiveTimeout"} property).
 * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
 * reusing the original invoker threads until shutdown (at the
 * expense of limited dynamic scheduling).
 * <p>In case of a SchedulingTaskExecutor indicating a preference for
 * short-lived tasks, the default is 10 instead. Specify a number
 * of 10 to 100 messages to balance between rather long-lived and
 * rather short-lived tasks here.
 * <p>Long-lived tasks avoid frequent thread context switches through
 * sticking with the same thread all the way through, while short-lived
 * tasks allow thread pools to control the scheduling. Hence, thread
 * pools will usually prefer short-lived tasks.
 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
 * @see #setTaskExecutor
 * @see #setReceiveTimeout
 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
 */
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
    Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
    synchronized (this.lifecycleMonitor) {
        this.maxMessagesPerTask = maxMessagesPerTask;
    }
}
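
To illustrate the "100 messages, concurrency 3, maxMessagesPerTask 10" case, here is a rough plain-Java simulation. It does not use Spring or JMS at all: the in-memory queue, the class name MaxMessagesPerTaskSketch, and the simulate method are assumptions made for this sketch, and it only models a positive limit (not the unlimited -1 setting) with a fixed number of consumers. The point it tries to show is that the per-task limit changes how many tasks are run, not how many messages get handled.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation, not Spring code.
final class MaxMessagesPerTaskSketch {

    /** Returns {messagesProcessed, tasksRun} for one simulated run. */
    static int[] simulate(int messages, int consumers, int maxMessagesPerTask) {
        if (maxMessagesPerTask <= 0) {
            throw new IllegalArgumentException("this sketch only models a positive limit");
        }
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add(i);
        }
        AtomicInteger processed = new AtomicInteger();
        AtomicInteger tasksRun = new AtomicInteger();

        Runnable consumer = () -> {
            boolean drained = false;
            while (!drained) {
                tasksRun.incrementAndGet(); // a fresh task begins here
                for (int attempt = 0; attempt < maxMessagesPerTask; attempt++) {
                    if (queue.poll() == null) { // nothing left to receive
                        drained = true;
                        break;
                    }
                    processed.incrementAndGet(); // "handle" the message
                }
                // task ends; the loop starts a new one if messages may remain
            }
        };

        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.submit(consumer);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation timed out");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return new int[] {processed.get(), tasksRun.get()};
    }

    public static void main(String[] args) {
        int[] result = simulate(100, 3, 10);
        System.out.println("processed=" + result[0] + ", tasksRun=" + result[1]);
    }
}
```

Every run should report processed=100; the task count varies with thread timing but is never below 10, since each task handles at most 10 messages.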