使用 spring-amqp,从 PublisherReturn 回调内部向 rabbitmq 发送消息的最佳方式是什么?

With spring-amqp, what's the best way to send a message to rabbitmq from inside a PublisherReturn callback?

我正在使用 spring-amqp:2.1.6.RELEASE

我有一个带有 PublisherReturn 回调的 RabbitTemplate。

经过一些调查,我得出的结论是 rabbitTemplate and/or 连接被阻塞,直到原始消息被完全处理。

如果我创建第二个 CachingConnectionFactory 和 RabbitTemplate,并在 PublisherReturn 回调中使用它们,那么它似乎工作正常。

那么,问题来了:使用 spring-amqp 在 PublisherReturn 回调中发送消息的最佳方式是什么?

我进行了搜索,但找不到任何内容来说明您应该如何执行此操作。

以下是我所拥有的简化细节:

@Configuration
public class MyConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setPublisherReturns(true);
        // ... other settings left out for brevity
        return connectionFactory;
    }

    @Bean
    @Qualifier("rabbitTemplate")
    public RabbitTemplate rabbitTemplate(ReturnCallbackForAlternative returnCallbackForAlternative) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(returnCallbackForAlternative);
        // ... other settings left out for brevity
        return rabbitTemplate;
    }

    @Bean
    @Qualifier("connectionFactoryForUndeliverable")
    public ConnectionFactory connectionFactoryForUndeliverable() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        // ... other settings left out for brevity
        return connectionFactory;
    }

    @Bean
    @Qualifier("rabbitTemplateForUndeliverable")
    public RabbitTemplate rabbitTemplateForUndeliverable() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryForUndeliverable());
        // ... other settings left out for brevity
        return rabbitTemplate;
    }

}

然后发送我正在使用的消息

    @Autowired
    @Qualifier("rabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    public void send(Message message) {
        rabbitTemplate.convertAndSend(
            "exchange-name",
            "primary-key",
            message);
    }

而ReturnCallback中的代码是

@Component
public class ReturnCallbackForAlternative implements RabbitTemplate.ReturnCallback {

    @Autowired
    @Qualifier("rabbitTemplateForUndeliverable")
    private RabbitTemplate rabbitTemplate;

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        rabbitTemplate.convertAndSend(
            "exchange-name",
            "alternative-key",
            message);
    }

}

编辑

重现问题的简化示例。 给运行吧:

  1. 有RabbitMq 运行ning
  2. 将名为 foo 的交换绑定到名为 foo 的队列
  3. 运行 作为 spring 启动应用程序

您将看到以下输出:

in returnCallback before message send

但你不会看到:

in returnCallback after message send

如果您注释掉 connectionFactory.setPublisherConfirms(true); 它 运行 没问题。

@SpringBootApplication
public class HangingApplication {

    public static void main(String[] args) {
      SpringApplication.run(HangingApplication.class, args);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
      connectionFactory.setPublisherReturns(true);
      connectionFactory.setPublisherConfirms(true);
      return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setExchange("foo");
      rabbitTemplate.setMandatory(true);

      rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        System.out.println("Confirm callback for main template. Ack=" + ack);
      });

      rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        System.out.println("in returnCallback before message send");
        rabbitTemplate.send("foo", message);
        System.out.println("in returnCallback after message send");
      });

      return rabbitTemplate;
    }

    @Bean
    public ApplicationRunner runner(@Qualifier("rabbitTemplate") RabbitTemplate template) {

      return args -> {
        template.convertAndSend("BADKEY", "foo payload");
      };
    }

    @RabbitListener(queues = "foo")
    public void listen(String in) {
      System.out.println("Message received on undeliverable queue : " + in);
    }

}

这是我使用的build.gradle:

plugins {
    id 'org.springframework.boot' version '2.1.5.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group 'pcoates'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.11

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-amqp'
}

它会导致 amqp-client 代码出现某种死锁。最简单的解决方案是在单独的线程上发送 - 在回调中使用 TaskExecutor...

exec.execute(() -> template.send(...));

您可以使用相同的 template/connection 工厂,但发送必须 运行 在不同的线程上。

我以为我们最近更改了框架以始终在不同的线程上调用 return 回调(在最后一个人报告此之后),但看起来它被遗漏了。

我打开了an issue this time.

编辑

您确定使用的是 2.1.6 吗?

我们在 2.1.0 中通过阻止发送尝试使用与 return 到达的相同通道来解决此问题。这对我来说很好...

@SpringBootApplication
public class So57234770Application {

    public static void main(String[] args) {
        SpringApplication.run(So57234770Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            template.send("foo", message);
        });
        return args -> {
            template.convertAndSend("BADKEY", "foo");
        };
    }

    @RabbitListener(queues = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}

如果您可以提供展示此行为的示例应用程序,我会看一看到底发生了什么。