如何正确处理 amqp 消息然后将其发送到 Spring Integration DSL 中的另一个队列
How to properly handle an amqp msg then send it to another queue in Spring Integratin DSL
我想处理一条 AMQP 消息,然后将其发送到另一个队列进行进一步处理。
我正在使用 Spring 集成 DSL 将其存档如下,
@Bean
public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
OCRService ocrService) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_INCOMING_QUEUE)
.concurrentConsumers(2))
.transform(new JsonToObjectTransformer(Note.class))
.handle(msg -> {
// doing ocr here
amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
})
.get();
}
@Bean
public IntegrationFlow typeProcess(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_OCRED_QUEUE)
.concurrentConsumers(4))
.transform(new JsonToObjectTransformer(Note.class))
.handle(msg -> {
// doing some work for type processing
}).get();
}
但是我发现在类型处理阶段处理新消息时,NOTE_INCOMING_QUEUE队列中的消息仍然未被确认。下面是rabbitmq管理的截图。
我想知道为什么即使处理程序已经成功执行,NOTE_INCOMING_QUEUE 中的消息仍然未被确认。是 spring 集成 amqp 的设计还是我的代码有问题?
使用 Amqp.inboundAdapter()
而不是网关 - 网关正在等待永远不会到达的回复。
网关用于 request/reply 个场景;通道适配器适用于单向场景。
我想处理一条 AMQP 消息,然后将其发送到另一个队列进行进一步处理。
我正在使用 Spring 集成 DSL 将其存档如下,
@Bean
public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
OCRService ocrService) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_INCOMING_QUEUE)
.concurrentConsumers(2))
.transform(new JsonToObjectTransformer(Note.class))
.handle(msg -> {
// doing ocr here
amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
})
.get();
}
@Bean
public IntegrationFlow typeProcess(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_OCRED_QUEUE)
.concurrentConsumers(4))
.transform(new JsonToObjectTransformer(Note.class))
.handle(msg -> {
// doing some work for type processing
}).get();
}
但是我发现在类型处理阶段处理新消息时,NOTE_INCOMING_QUEUE队列中的消息仍然未被确认。下面是rabbitmq管理的截图。
我想知道为什么即使处理程序已经成功执行,NOTE_INCOMING_QUEUE 中的消息仍然未被确认。是 spring 集成 amqp 的设计还是我的代码有问题?
使用 Amqp.inboundAdapter()
而不是网关 - 网关正在等待永远不会到达的回复。
网关用于 request/reply 个场景;通道适配器适用于单向场景。