幂等 Amqp.inboundAdapter()

Idempotent Amqp.inboundAdapter()

如何实现幂等 Amqp.inboundAdapter()?

我尝试使用 IdempotentReceiverInterceptor,但它不适用于 MessageProducers。

编辑

@Bean
IntegrationFlow someFlow(/*...*/) {
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "myqueue")
        .transform(Transformers.fromJson(), c -> c.advice(idempotentInterceptor))
        .channel("anotherChannel.input")
        .get();
}

您可以直接将拦截器应用于入站适配器下游的第一个组件。如果第一个通道是发布/订阅(pub/sub)通道,则在其前面添加一个桥接器(bridge),并将拦截器应用于该桥接器。

或者,您可以编写自定义通知(advice),并将其添加到入站适配器的监听器容器的通知链(advice chain)中。

编辑

@SpringBootApplication
public class So40289644Application {

    /**
     * Starts the application, publishes the payloads {@code "foo"} and {@code "bar"} to
     * {@code myqueue}, waits up to 10 seconds on the {@link CountDownLatch} bean, and then
     * deletes the queue before the context is closed.
     *
     * @param args command-line arguments handed to {@code SpringApplication.run}
     * @throws Exception if startup, publishing, or the wait fails or is interrupted
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context even when publishing or waiting throws.
        try (ConfigurableApplicationContext context =
                SpringApplication.run(So40289644Application.class, args)) {
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
            template.convertAndSend("myqueue", "foo");
            template.convertAndSend("myqueue", "bar");
            // The latch is counted down by the terminal handler of the "next" flow.
            context.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS);
            // Clean up the demo queue; re-declaring it here would be a no-op because the
            // queue() bean already declares it.
            context.getBean(RabbitAdmin.class).deleteQueue("myqueue");
        }
    }

    /**
     * Registers a {@code Jackson2JsonMessageConverter} bean.
     *
     * @return a new converter instance
     */
    @Bean
    public Jackson2JsonMessageConverter converter() {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }

    /**
     * Registers the latch that {@code main} waits on and that the terminal handler of
     * the {@code "next"} flow counts down.
     *
     * @return a latch with an initial count of one
     */
    @Bean
    public CountDownLatch latch() {
        final int initialCount = 1;
        return new CountDownLatch(initialCount);
    }

    /**
     * Defines the flow that reads from the AMQP queue {@code "myqueue"} through an inbound
     * adapter and hands every message to the channel named {@code "in"}.
     *
     * @param connectionFactory connection factory passed to the inbound adapter
     * @return the assembled flow
     */
    @Bean
    IntegrationFlow someFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory, "myqueue"))
                .channel("in")
                .get();
    }

    /**
     * Registers the JSON-to-object transformer that is wired between the channels
     * {@code "in"} and {@code "next"}, with the {@code idempotentInterceptor} bean named
     * in its advice chain.
     *
     * @return the transformer produced by {@code Transformers.fromJson()}
     */
    @Bean
    @Transformer(inputChannel = "in", adviceChain = "idempotentInterceptor", outputChannel = "next")
    public JsonToObjectTransformer transformer() {
        JsonToObjectTransformer fromJson = Transformers.fromJson();
        return fromJson;
    }

    /**
     * Defines the tail of the pipeline: every message arriving on the channel
     * {@code "next"} is printed to standard output and the shared latch is counted down.
     *
     * @return the assembled flow
     */
    @Bean
    IntegrationFlow remaining() {
        return IntegrationFlows
                .from("next")
                .handle(message -> {
                    // Print first, then signal main() that a message made it through.
                    System.out.println(message);
                    latch().countDown();
                })
                .get();
    }

    /**
     * Registers the interceptor named in the transformer's advice chain.
     *
     * <p>The selector returns {@code false} only for a message whose raw {@code byte[]}
     * payload is the JSON string {@code "foo"} (including the quotes) and {@code true}
     * for every other payload. The interceptor's discard channel is set to a
     * {@code NullChannel}.
     *
     * @return the configured interceptor
     */
    @Bean
    public IdempotentReceiverInterceptor idempotentInterceptor() {
        IdempotentReceiverInterceptor interceptor = new IdempotentReceiverInterceptor(
                // Decode explicitly as UTF-8 instead of relying on the platform default
                // charset, so the comparison behaves the same on every JVM.
                m -> !(new String((byte[]) m.getPayload(), java.nio.charset.StandardCharsets.UTF_8)
                        .equals("\"foo\"")));
        interceptor.setDiscardChannel(new NullChannel());
        return interceptor;
    }

    /**
     * Registers the {@code "myqueue"} queue definition as a bean.
     *
     * @return a new {@code Queue} named {@code "myqueue"}
     */
    @Bean
    public Queue queue() {
        Queue myQueue = new Queue("myqueue");
        return myQueue;
    }