使用 Spring AMQP Java 配置为每个队列配置专用侦听器容器

Configuring a Dedicated Listener Container for each Queue using Spring AMQP Java Configuration

我在 XML 中配置了这样的监听器

<rabbit:listener-container connection-factory="connectionFactory" concurrency="1" acknowledge="manual">
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s1}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s2}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s3}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s4}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s5}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s6}" exclusive="true"/>
</rabbit:listener-container>

我正在尝试将其移至 Java 配置,但我没有看到将多个 MessageListener 添加到 ListenerContainer 的方法。在我的情况下,创建多个 ListenerContainer bean 不是一个选项,因为直到运行时我才知道要使用的队列数。队列名称将来自配置文件。

我做了以下事情

@PostConstruct
public void init() 
{
    for (String queue : queues.split(","))
    {
        // The Consumers would not connect if I don't call the 'start()' method.
        messageListenerContainer(queue).start();
    }
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer(String queue)
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
    container.setQueueNames(queue);
    container.setMessageListener(messageListener());

    // Set Exclusive Consumer 'ON'
    container.setExclusive(true);

    // Should be restricted to '1' to maintain data consistency.
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return container;
}

它 "sort" 的作品,但我看到一些奇怪的行为,许多幽灵通道被打开,这在 XML 配置中从未发生过。所以这让我怀疑我做错了什么。我想知道在 Java 配置中创建 MessageListenerContainers 的正确方法?简而言之,"How does Spring convert 'rabbit:listener-container' with multiple 'rabbit:listener' to java objects properly?" 任何 help/insight 将不胜感激。

商业案例 我们有一个发布用户配置文件更新的发布者。发布者可以为同一用途分派多个更新,我们必须以正确的顺序处理它们以维护数据存储中的数据完整性。

示例:用户:ABC,发布 -> {UsrA:Change1,....,UsrA:Change 2,....,UsrA:Change 3} -> 消费者必须按顺序处理 {UsrA:Change1,...., UsrA:Change 2,....,UsrA:Change 3}.

在我们之前的设置中,我们有 1 个队列获取所有用户更新,我们有一个并发 = 5 的消费者应用程序。消费者应用程序有多个应用程序服务器 运行。这导致 *5 * '消费者应用程序的实例数量' channels/threads* 可以处理传入的消息。速度太棒了!但是我们经常乱序处理导致数据损坏。

为了保持严格的 FIFO 顺序并尽可能并行处理消息,我们实施了队列分片。我们有一个“x-consistent-hash,在 employee-id 上有一个 hash-header。我们的 Publisher 将消息发布到 hash exchange,我们有多个分片队列绑定到 hash exchange。我们的想法是,我们将对一个给定用户(例如用户 A)在同一个分片中排队。然后我们让我们的消费者以 'Exclusive' 模式和 'ConcurrentConsumers = 1' 连接到分片队列并处理消息。这样我们一定会处理以正确的顺序处理消息,同时仍然并行处理消息。我们可以通过增加分片的数量来使其更加并行。

现在开始消费者配置

我们在多个应用服务器上部署了消费者应用。

原方法:

我只是在我的消费者应用程序中添加了多个 'rabbit:listener' 到我的 'rabbit:listener-container' ,正如你在上面看到的那样,它工作得很好,除了首先启动的服务器在所有分片队列上获得独占锁而其他服务器只是坐在那里不工作。

新方法:

我们将分片队列名称移到了应用程序配置文件中。像这样

Consumer Instance 1 : Properties
queues=user.queue.s1,user.queue.s2,user.queue.s3

Consumer Instance 2 : Properties
queues=user.queue.s4,user.queue.s5,user.queue.s6

同样值得注意的是,我们可以有任意数量的消费者实例,并且分片可以根据资源可用性在实例之间不均匀分布。

将队列名称移至配置文件后,XML 配置将不再有效,因为我们无法像以前那样将 'rabbit:listener' 动态添加到我的 'rabbit:listener-container'。

然后我们决定切换到 Java 配置。这就是我们被困住的地方!

我们最初是这样做的

@Bean
    public SimpleMessageListenerContainer messageListenerContainer()
    {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(queues.split(","));
        container.setMessageListener(messageListener());
        container.setMissingQueuesFatal(false);

        // Set Exclusive Consumer 'ON'
        container.setExclusive(true);

        // Should be restricted to '1' to maintain data consistency.
        container.setConcurrentConsumers(1);

        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.start();

        return container;
    }

它可以工作,但我们所有的队列都在一个连接上共享 1 个通道。这对速度不利。我们想要的是一个连接,每个队列都有自己的通道。

下一步

这里还没有成功!。我原来问题中的 java 配置就是我们现在的位置。

我很困惑为什么这很难做到。显然 XML 配置做了一些在 Java 配置中不容易做到的事情(或者至少对我来说是这样)。我认为这是一个需要填补的空白,除非我完全遗漏了什么。如果我错了,请纠正我。这是一个真正的商业案例,而不是一些虚构的边缘案例。如果您有不同看法,请随时发表评论。

这里是Java创建SimpleMessageListenerContainer的配置

@Value("#{'${queue.names}'.split(',')}")
private String[] queueNames;

 @Bean
public SimpleMessageListenerContainer listenerContainer(final ConnectionFactory connectionFactory) {
    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueNames);
    container.setMessageListener(vehiclesReceiver());
    setCommonQueueProperties(container);
    return container;
}

每个 <rabbit:listener > 使用相同的 ConnectionFactory 创建自己的 SimpleListenerContainer bean。要在 Java 配置中执行类似操作,您必须声明与队列一样多的 SimpleListenerContainer bean:每个队列一个。

您也可以考虑使用 @RabbitListener 方法:https://docs.spring.io/spring-amqp/docs/2.0.4.RELEASE/reference/html/_reference.html#async-annotation-driven

and it works BUT all our queues are on one connection sharing 1 channel. That is NOT good for speed. What we want is One connection and every queue gets its own channel.

如果切换到 DirectMessageListenerContainer,该配置中的每个队列都有自己的 Channel

See the documentation.

回答您原来的问题(预编辑):

@Bean
public SimpleMessageListenerContainer messageListenerContainer1(@Value("${address.queue.s1}") String queue)
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
    container.setQueueNames(queue);
    container.setMessageListener(messageListener());

    // Set Exclusive Consumer 'ON'
    container.setExclusive(true);

    // Should be restricted to '1' to maintain data consistency.
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return container;
}

...

@Bean
public SimpleMessageListenerContainer messageListenerContainer6(@Value("${address.queue.s6}" ) String queue)
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
    container.setQueueNames(queue);
    container.setMessageListener(messageListener());

    // Set Exclusive Consumer 'ON'
    container.setExclusive(true);

    // Should be restricted to '1' to maintain data consistency.
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return container;
}