Spring 集成 DSL:lambda 到 return 句柄方法中的消息<T>,例如使用 DelegatingSessionFactory?
Spring Integration DSL: lambda to return a Message<T> in handle method, e.g. with DelegatingSessionFactory?
动机:我需要在路由到 Sftp 出站网关之前为 DelegatingSessionFactory 设置 threadKey,然后取消设置 threadKey。
根据租户的不同,我需要使用不同的 Sftp 用户帐户。用户帐户是我 application.yml 中的配置问题,我不想为每个新租户编写单独的路由。
public IntegrationFlow aDynamicSftpFlow() {
f -> f
.handle(tenantSessionDefine()) // how can I use a lambda instead?
.handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
.handle(...) // undefine sftp session
}
设置 threadKey 需要 Message<?>
,而不仅仅是负载和 headers。所以我使用了一个 bean,因为它需要一条消息:
public class TenantSessionDefine {
private DelegatingSessionFactory delegatingSessionFactory;
public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
this.delegatingSessionFactory = delegatingSessionFactory;
}
public Message<?> defineSession(Message<?> message) {
return delegatingSessionFactory.setThreadKey(message, message.getHeaders()
.get("tenantId", String.class));
// used by SessionFactoryLocator
}
}
我想把它写成 lambda,如
.handle(message -> delegatingSessionFactory.setThreadKey(message,
message.getPayload().getTenant())
但这并不容易。可以与带有 Message<T>
的 handle()
一起使用的 lambda 结束流程,因为它是一个空函数(MessageHandler
函数接口)。另一个 lambda 是一个 GenericHandler,它不会结束流程,但它需要有效负载和 headers,而不是消息。
这只是一个例子,我时不时地希望我可以在 lambda 中使用 handle()
和消息而不结束流程。我该怎么做?
更新
DelegatingSessionFactory
不是一个特别合适的例子。由于设置和清除线程密钥应该在 sftp 调用之前和之后发生,因此建议比在调用之前和之后定义处理程序更合适。
知道了。 handle()
的 javadoc 说
Use handle(Class, GenericHandler)
if you need to access the entire message.
Class参数必须是Message.class
:
.handle(Message.class,
(message, headers) -> sftpSessionFactory
.setThreadKey(message, headers.get("tenantId")))
动机:我需要在路由到 Sftp 出站网关之前为 DelegatingSessionFactory 设置 threadKey,然后取消设置 threadKey。
根据租户的不同,我需要使用不同的 Sftp 用户帐户。用户帐户是我 application.yml 中的配置问题,我不想为每个新租户编写单独的路由。
public IntegrationFlow aDynamicSftpFlow() {
f -> f
.handle(tenantSessionDefine()) // how can I use a lambda instead?
.handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
.handle(...) // undefine sftp session
}
设置 threadKey 需要 Message<?>
,而不仅仅是负载和 headers。所以我使用了一个 bean,因为它需要一条消息:
public class TenantSessionDefine {
private DelegatingSessionFactory delegatingSessionFactory;
public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
this.delegatingSessionFactory = delegatingSessionFactory;
}
public Message<?> defineSession(Message<?> message) {
return delegatingSessionFactory.setThreadKey(message, message.getHeaders()
.get("tenantId", String.class));
// used by SessionFactoryLocator
}
}
我想把它写成 lambda,如
.handle(message -> delegatingSessionFactory.setThreadKey(message,
message.getPayload().getTenant())
但这并不容易。可以与带有 Message<T>
的 handle()
一起使用的 lambda 结束流程,因为它是一个空函数(MessageHandler
函数接口)。另一个 lambda 是一个 GenericHandler,它不会结束流程,但它需要有效负载和 headers,而不是消息。
这只是一个例子,我时不时地希望我可以在 lambda 中使用 handle()
和消息而不结束流程。我该怎么做?
更新
DelegatingSessionFactory
不是一个特别合适的例子。由于设置和清除线程密钥应该在 sftp 调用之前和之后发生,因此建议比在调用之前和之后定义处理程序更合适。
知道了。 handle()
的 javadoc 说
Use
handle(Class, GenericHandler)
if you need to access the entire message.
Class参数必须是Message.class
:
.handle(Message.class,
(message, headers) -> sftpSessionFactory
.setThreadKey(message, headers.get("tenantId")))