我怎样才能同时拥有一致的哈希交换和主题交换功能?

How can i have both consistent hashing exchange and topic exchange functionality at the same time?

我有一个可水平扩展的应用程序。我的应用程序是 spring-boot 服务,spring 框架版本为 2.1.1.RELEASE.

每个工作人员都有一个队列(及其各自的侦听器)并绑定到 "x-consistent-hash" 类型的交换。我使用 Rabbitmq 版本 3.7.7 的 rabbitmq-consistent-hash-exchange 插件。

消息根据它们的 message_id (as described here) 以一致的方式正确散列,并且始终由我的应用程序的相同工作队列使用,如上所示:

我实现这个的代码如下:

public static final String EXCHANGE = "e3";
private static final String EXCHANGE_TYPE = "x-consistent-hash";
@Bean
 public Queue autoDeleteQueue1() {
    return new AnonymousQueue();
    }

    @Qualifier("testlistenerAdapter")
    @Bean
    MessageListenerAdapter testlistenerAdapter(TestListener receiver) {
        MessageListenerAdapter msgadapter = new MessageListenerAdapter(receiver, "testMessageReceived");
        return msgadapter;
    }

    @Qualifier("testcontainer")
    @Bean
    SimpleMessageListenerContainer testcontainer(ConnectionFactory connectionFactory,
            @Qualifier("testlistenerAdapter") MessageListenerAdapter listenerAdapter) throws IOException {

        Connection conn = connectionFactory.createConnection();
        Channel ch = conn.createChannel(true);


        ch.queueDeclare(autoDeleteQueue1().getName(), true, true, true, null);
        ch.queuePurge(autoDeleteQueue1().getName());

        Map<String, Object> args = new HashMap<>();
        args.put("hash-property", "message_id");

        ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, false, false, args);

        ch.queueBind(autoDeleteQueue1().getName(), EXCHANGE, "20");

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(autoDeleteQueue1().getName());
        container.setMessageListener(listenerAdapter);
        return container;
    }

我想做的是将所有具有相同 message_id 的消息路由到与以前相同的工作人员但在不同的队列侦听器中。第一种情况很容易,因为一个工人只有一个队列。现在一个工作人员有多个队列,我希望在同一个工作人员中一致地散列消息。

我试图通过在每条消息中添加不同的 header/routing_key 来做到这一点,但没有成功。

例如,任何带有 message_id 的消息:"test" 总是路由到同一个工作人员(比如说 "worker B")并且取决于它具有的 header/routing_key (foo或 bar) 将被 foo 或 bar 队列侦听器使用(见下图)。我的应用程序的业务逻辑是有状态的,因此我需要具有相同 message_id 的消息由同一个工作人员提供服务。

我尝试了很多实现方案,但看起来这不是一件微不足道的事情。恐怕我想做的是不可行的,因为 rabbitmq 插件会在队列级别进行哈希处理。一个快速的解决方案是停留在第一种情况。只保留一种类型的侦听器,然后在同一端点内进行业务逻辑分离。关于如何在不必将两个侦听器(foo 和 bar)的功能合并为一个的情况下实现此类功能的任何想法?

这是不可行的。在rabbitmq中,不同的队列监听器属于不同的队列。
由于每个工作人员创建自己的队列,我只能保证消息将始终在同一个队列中,但不会在同一个工作人员中。
一致性哈希是在队列级别而不是工作级别完成的。

我通过为每个工作人员只维护一个侦听器队列来完成我的实施。通过这种方式,我可以确保消息始终由同一个工作人员路由。

我的示例项目在这里,如果有人有兴趣看一下https://github.com/ubitech/generic-policy-engine-with-consistent-hashing