幂等 Amqp.inboundAdapter()
Idempotent Amqp.inboundAdapter()
如何实现幂等 Amqp.inboundAdapter()?
我尝试使用 IdempotentReceiverInterceptor,但它不适用于 MessageProducers。
编辑
@Bean
IntegrationFlow someFlow(/*...*/) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "myqueue")
.transform(Transformers.fromJson(), c -> c.advice(idempotentInterceptor))
.channel("anotherChannel.input")
.get();
}
您可以简单地将拦截器应用于入站适配器下游的第一个组件。如果第一个通道是 pub/sub 则添加一个桥接器。
或者,您可以编写自定义通知并将其添加到入站适配器容器的通知链中。
编辑
@SpringBootApplication
public class So40289644Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So40289644Application.class, args);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend("myqueue", "foo");
template.convertAndSend("myqueue", "bar");
context.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS);
context.getBean(RabbitAdmin.class).declareQueue(new Queue("myqueue"));
context.close();
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public CountDownLatch latch() {
return new CountDownLatch(1);
}
@Bean
IntegrationFlow someFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "myqueue"))
.channel("in")
.get();
}
@Bean
@Transformer(inputChannel = "in", adviceChain = "idempotentInterceptor", outputChannel = "next")
public JsonToObjectTransformer transformer() {
return Transformers.fromJson();
}
@Bean
IntegrationFlow remaining() {
return IntegrationFlows.from("next")
.handle(m -> {
System.out.println(m);
latch().countDown();
})
.get();
}
@Bean
public IdempotentReceiverInterceptor idempotentInterceptor() {
IdempotentReceiverInterceptor interceptor = new IdempotentReceiverInterceptor(
m -> !(new String(((byte[]) m.getPayload())).equals("\"foo\"")));
interceptor.setDiscardChannel(new NullChannel());
return interceptor;
}
@Bean
public Queue queue() {
return new Queue("myqueue");
}
如何实现幂等 Amqp.inboundAdapter()?
我尝试使用 IdempotentReceiverInterceptor,但它不适用于 MessageProducers。
编辑
@Bean
IntegrationFlow someFlow(/*...*/) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "myqueue")
.transform(Transformers.fromJson(), c -> c.advice(idempotentInterceptor))
.channel("anotherChannel.input")
.get();
}
您可以简单地将拦截器应用于入站适配器下游的第一个组件。如果第一个通道是 pub/sub 则添加一个桥接器。
或者,您可以编写自定义通知并将其添加到入站适配器容器的通知链中。
编辑
@SpringBootApplication
public class So40289644Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So40289644Application.class, args);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend("myqueue", "foo");
template.convertAndSend("myqueue", "bar");
context.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS);
context.getBean(RabbitAdmin.class).declareQueue(new Queue("myqueue"));
context.close();
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public CountDownLatch latch() {
return new CountDownLatch(1);
}
@Bean
IntegrationFlow someFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "myqueue"))
.channel("in")
.get();
}
@Bean
@Transformer(inputChannel = "in", adviceChain = "idempotentInterceptor", outputChannel = "next")
public JsonToObjectTransformer transformer() {
return Transformers.fromJson();
}
@Bean
IntegrationFlow remaining() {
return IntegrationFlows.from("next")
.handle(m -> {
System.out.println(m);
latch().countDown();
})
.get();
}
@Bean
public IdempotentReceiverInterceptor idempotentInterceptor() {
IdempotentReceiverInterceptor interceptor = new IdempotentReceiverInterceptor(
m -> !(new String(((byte[]) m.getPayload())).equals("\"foo\"")));
interceptor.setDiscardChannel(new NullChannel());
return interceptor;
}
@Bean
public Queue queue() {
return new Queue("myqueue");
}