SpringAMQP延迟

SpringAMQP delay

我无法确定在 SpringAMQP 中延迟消息级别的方法。 如果服务不可用或抛出异常,我将调用 Webservice,我将所有请求存储到 RabbitMQ 队列中,并不断重试服务调用,直到它成功执行。如果服务不断抛出错误或者它不可用,则 rabbitMQ 侦听器会不断循环。(意思是侦听器检索消息并在出现任何错误时进行服务调用,它会重新排队消息)

我使用 MessagePostProcessor 将循环限制为 X 小时,但是我想在消息级别和每次尝试访问服务时启用延迟。例如,第一次尝试 3000 毫秒延迟,第二次尝试 6000 毫秒,依此类推,直到我尝试 x 次。

如果能提供几个例子就好了。

你能给我一些想法吗?

嗯,你这样做是不可能的。

消息重新排队与事务回滚完全相似,其中系统 returns 回到异常前的状态。所以,绝对不能将消息修改为 return 到队列中。

出于同样的原因,您可能必须查看 Spring Retry 项目,并且只从队列中轮询消息一次,然后在内存中重试,直到成功回答或重试策略耗尽。最后,您可以从队列中删除消息或将其移至 DLQ。

Reference Manual 中查看更多信息。

I added CustomeMessage delay exchange

    @Bean
    CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayed-exchange", "x-delayed-message", true, false, args);
    }

Added MessagePostProcessor

  if (message.getMessageProperties().getHeaders().get("x-delay") == null) {
            message.getMessageProperties().setHeader("x-delay", 10000);
        } else {

            Integer integer = (Integer) message.getMessageProperties().getHeaders().get("x-delay");
            if (integer < 60000) {
                integer = integer + 10000;
                message.getMessageProperties().setHeader("x-delay", integer);
            }
        }

First time it delays 30 seconds and adds 10seconds each time till it reaches 600 seconds.This should be configurable.

And finally send the message to 

    rabbitTemplate.convertAndSend("delayed-exchange", queueName,message, rabbitMQMessagePostProcessor);