Spring 集成流程:在流程中执行任务
Spring integration flow: perform task within flow
希望这是我要问的关于 spring 集成的最后一个问题。
面临以下问题:在相当长的 IntegrationFlow dsl sheet 末尾有一个代码:
return IntegrationFlows.
//...
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header("jms_replyTo", responseQueue(), true)) // IntegrationMessageHeaderAccessor.CORRELATION_ID is not acceptable though message came from outgoingGateway of another application with this header been set
.handle(requestRepository::save)
.handle(
Jms.outboundAdapter(queueConnectionFactory()).destination(serverQueue())
)
.get();
问题是在像 requestRepository::save
这样的代码之后,处理程序链被破坏了。这个技巧只有在有一个网关作为处理程序参数传入时才有效。
如何克服这个限制?我认为在这里使用 wireTap 不会达成协议,因为它是异步的。在这里,实际上,我保存消息以存储它的 jms_replyTo
header 并在相应的回复从服务器返回后用保存的消息替换它(智能代理企业集成模式)。
有什么建议吗?
不知道你为什么说 "the last question"。您要放弃 Spring 集成吗? :-(
我猜你的问题是下一个 .handle()
因为你的 requestRepository::save
是单向的 MessageHandler
(void
return 来自 save()
方法)。或者你的 save()
returns null
.
IntegrationFlow
是一个执行链,下一个执行将在前一个执行后以其非空结果调用。
所以,请分享您的 requestRepository::save
!
更新
Neither did help declaring MessageHandler bean as a (m) -> requestRepository.save(m) and passing it into handle(..) method as a param.
是啊...我想看看你的签名requestRepository::save
。
所以,看。使用 .handle()
的方法参考,您应该确定您的场景。如果您 one-way
处理流量停止,则足以遵循 org.springframework.messaging.MessageHandler
合同。你的方法签名应该是这样的:
public void myHandle(Message<?> message)
如果您想继续流程,您应该 return 服务方法中的任何内容。该结果成为下一个 .handle()
的 payload
。
在这种情况下,您的方法应遵循 org.springframework.integration.dsl.support.GenericHandler
合同。您的方法签名可能如下所示:
public Bar myHandle(Foo foo, Map<String, Object> headers)
这就是 .handle()
方法参考的工作原理。
您应该了解这种 method chain
风格的工作原理。前一个方法的输出是下一个方法的输入。在我们的例子中,我们保护流免受死代码的影响,比如 MessageHandler
和 void
return,但是有下一个流成员。这就是您看到 This is the end of the integration flow.
错误的原因。
终于想出了解决办法:
@Bean
public MessageHandler requestPersistingHandler() {
return new ServiceActivatingHandler(message -> {
requestRepository.save(message);
return message.getPayload();
});
}
//...
@Bean
public IntegrationFlow requestFlow() {
return IntegrationFlows.from(
Jms.messageDrivenChannelAdapter(queueConnectionFactory()).destination(bookingQueue())
)
.wireTap(controlBusMessageChannel())
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(JMS_REPLY_HEADER, responseQueue(), true))
.handle(requestPersistingHandler())
.handle(
Jms.outboundAdapter(queueConnectionFactory()).destination(serverQueue())
)
.get();
}
我只是不确定是否有更直接的方式。
剩下的唯一问题是如何在 enrichHeaders
方法内更改 "from-the-server" IntegrationFlow 中的 header:不知道如何使用规范访问现有 header。
希望这是我要问的关于 spring 集成的最后一个问题。
面临以下问题:在相当长的 IntegrationFlow dsl sheet 末尾有一个代码:
return IntegrationFlows.
//...
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header("jms_replyTo", responseQueue(), true)) // IntegrationMessageHeaderAccessor.CORRELATION_ID is not acceptable though message came from outgoingGateway of another application with this header been set
.handle(requestRepository::save)
.handle(
Jms.outboundAdapter(queueConnectionFactory()).destination(serverQueue())
)
.get();
问题是在像 requestRepository::save
这样的代码之后,处理程序链被破坏了。这个技巧只有在有一个网关作为处理程序参数传入时才有效。
如何克服这个限制?我认为在这里使用 wireTap 不会达成协议,因为它是异步的。在这里,实际上,我保存消息以存储它的 jms_replyTo
header 并在相应的回复从服务器返回后用保存的消息替换它(智能代理企业集成模式)。
有什么建议吗?
不知道你为什么说 "the last question"。您要放弃 Spring 集成吗? :-(
我猜你的问题是下一个 .handle()
因为你的 requestRepository::save
是单向的 MessageHandler
(void
return 来自 save()
方法)。或者你的 save()
returns null
.
IntegrationFlow
是一个执行链,下一个执行将在前一个执行后以其非空结果调用。
所以,请分享您的 requestRepository::save
!
更新
Neither did help declaring MessageHandler bean as a (m) -> requestRepository.save(m) and passing it into handle(..) method as a param.
是啊...我想看看你的签名requestRepository::save
。
所以,看。使用 .handle()
的方法参考,您应该确定您的场景。如果您 one-way
处理流量停止,则足以遵循 org.springframework.messaging.MessageHandler
合同。你的方法签名应该是这样的:
public void myHandle(Message<?> message)
如果您想继续流程,您应该 return 服务方法中的任何内容。该结果成为下一个 .handle()
的 payload
。
在这种情况下,您的方法应遵循 org.springframework.integration.dsl.support.GenericHandler
合同。您的方法签名可能如下所示:
public Bar myHandle(Foo foo, Map<String, Object> headers)
这就是 .handle()
方法参考的工作原理。
您应该了解这种 method chain
风格的工作原理。前一个方法的输出是下一个方法的输入。在我们的例子中,我们保护流免受死代码的影响,比如 MessageHandler
和 void
return,但是有下一个流成员。这就是您看到 This is the end of the integration flow.
错误的原因。
终于想出了解决办法:
@Bean
public MessageHandler requestPersistingHandler() {
return new ServiceActivatingHandler(message -> {
requestRepository.save(message);
return message.getPayload();
});
}
//...
@Bean
public IntegrationFlow requestFlow() {
return IntegrationFlows.from(
Jms.messageDrivenChannelAdapter(queueConnectionFactory()).destination(bookingQueue())
)
.wireTap(controlBusMessageChannel())
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(JMS_REPLY_HEADER, responseQueue(), true))
.handle(requestPersistingHandler())
.handle(
Jms.outboundAdapter(queueConnectionFactory()).destination(serverQueue())
)
.get();
}
我只是不确定是否有更直接的方式。
剩下的唯一问题是如何在 enrichHeaders
方法内更改 "from-the-server" IntegrationFlow 中的 header:不知道如何使用规范访问现有 header。