Spring 集成 AMQP
Spring Integration AMQP
刚开始学习spring-集成
我想在队列中接收一条消息并并行执行 2 个步骤:
第 1 步 -> 使用 bean 处理它
第 2 步 -> 转换并发送到另一个队列。
像 :
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queue1")
.configureContainer(simpleMessageListenerContainerSpec -> {
simpleMessageListenerContainerSpec.concurrentConsumers(3);
}))
.log(message -> "SERVICE EVENT QUEUE : Received Message : " + message.getPayload())
.handle(serviceBean, "process")
.<String,String>transform(String::toLowerCase)
.log(message -> "SERVICE EVENT QUEUE : Transformed Message : " + message.getPayload())
.handle(
Amqp.outboundAdapter(rabbitTemplate)
.exchangeName("exchange")
.routingKey("queue2.routing"))
.get();
我错过了什么?第一个句柄之后的动作没有被执行。我想我没有正确理解这部分。
另外,我怎样才能同时执行这两个步骤?
您应该从理论开始理解Spring集成中的许多概念和组件。
“并行 2 步”- 正是发布-订阅模式:https://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html and Spring Integration provides an implementation for it: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-publishsubscribechannel。正如您根据文档看到的那样,要实现并行性,您需要使用 TaskExecutor
.
配置这样的通道
使用 Java DSL,我们为发布-订阅配置提供高级 API:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-subflows
所以,要让你的 .handle(serviceBean, "process")
和 .<String,String>transform(String::toLowerCase)
平行,你需要有这样的东西:
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queue1")
.configureContainer(simpleMessageListenerContainerSpec -> {
simpleMessageListenerContainerSpec.concurrentConsumers(3);
}))
.log(message -> "SERVICE EVENT QUEUE : Received Message : " + message.getPayload())
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.handle(serviceBean, "process")))
.<String,String>transform(String::toLowerCase)
.log(message -> "SERVICE EVENT QUEUE : Transformed Message : " + message.getPayload())
.handle(
Amqp.outboundAdapter(rabbitTemplate)
.exchangeName("exchange")
.routingKey("queue2.routing"))
.get();
刚开始学习spring-集成 我想在队列中接收一条消息并并行执行 2 个步骤: 第 1 步 -> 使用 bean 处理它 第 2 步 -> 转换并发送到另一个队列。 像 :
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queue1")
.configureContainer(simpleMessageListenerContainerSpec -> {
simpleMessageListenerContainerSpec.concurrentConsumers(3);
}))
.log(message -> "SERVICE EVENT QUEUE : Received Message : " + message.getPayload())
.handle(serviceBean, "process")
.<String,String>transform(String::toLowerCase)
.log(message -> "SERVICE EVENT QUEUE : Transformed Message : " + message.getPayload())
.handle(
Amqp.outboundAdapter(rabbitTemplate)
.exchangeName("exchange")
.routingKey("queue2.routing"))
.get();
我错过了什么?第一个句柄之后的动作没有被执行。我想我没有正确理解这部分。 另外,我怎样才能同时执行这两个步骤?
您应该从理论开始理解Spring集成中的许多概念和组件。
“并行 2 步”- 正是发布-订阅模式:https://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html and Spring Integration provides an implementation for it: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-publishsubscribechannel。正如您根据文档看到的那样,要实现并行性,您需要使用 TaskExecutor
.
使用 Java DSL,我们为发布-订阅配置提供高级 API:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-subflows
所以,要让你的 .handle(serviceBean, "process")
和 .<String,String>transform(String::toLowerCase)
平行,你需要有这样的东西:
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queue1")
.configureContainer(simpleMessageListenerContainerSpec -> {
simpleMessageListenerContainerSpec.concurrentConsumers(3);
}))
.log(message -> "SERVICE EVENT QUEUE : Received Message : " + message.getPayload())
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.handle(serviceBean, "process")))
.<String,String>transform(String::toLowerCase)
.log(message -> "SERVICE EVENT QUEUE : Transformed Message : " + message.getPayload())
.handle(
Amqp.outboundAdapter(rabbitTemplate)
.exchangeName("exchange")
.routingKey("queue2.routing"))
.get();