为什么我的处理程序方法在定义为 lambda 时没有被触发?
Why is my handler method not triggered when defined as a lambda?
我正在定义一个 IntegrationFlow
以这种方式使用 DSL 语法从 SFTP 流式传输到 S3 :
return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
.remoteDirectory("remoteDirectory"),
e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
.transform(new StreamTransformer())
.handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
.get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
return s3MessageHandler;
}
它按预期工作:文件已很好地上传到我的 S3 存储桶。但是,我想避免 SPEL
语法,并将消息中的 headers 注入 s3uploadMessageHandler
方法,这样我就可以使用简单的 ValueExpression
来设置 keyExpression
在 s3UploadMessageHandler
方法中。
为此,我更改了
handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
到
handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // Upload on S3
但是现在这个处理程序似乎不再被触发了。日志中没有错误,我从日志中知道 SFTP 轮询仍在工作。
我试图找到这背后的原因,我看到在 IntegrationFlowdefinition.java
中输入 handle 方法时, messageHandler
class 类型不同:它是一个 S3MessageHandler
在没有 lambda 的情况下调用,在使用 lambda 表达式调用时为 MyCallingClass$lambda
。
我错过了什么让我的场景正常工作?
有两种处理消息的方法。一种是通过 MessageHandler
实现——这是最有效的方法,它是在通道适配器实现框架中完成的,例如 S3MessageHandler
。另一种方法是 POJO 方法调用 - 当您不需要担心任何框架接口时,这是最用户友好的方法。
所以,当你像这样使用它时 .handle(s3UploadMessageHandler(...))
你引用了一个 MessageHandler
并且框架知道必须为那个 MessageHandler
注册一个 bean 因为你的 s3UploadMessageHandler()
不是 @Bean
.
当您将它用作 lambda 时,框架将其视为 POJO 方法调用,并且有一个为 MethodInvokingMessageHandler
而不是您的 S3MessageHandler
注册的 bean。
无论如何,即使您将 s3UploadMessageHandler()
更改为 @Bean
方法,它也不会起作用,因为您不允许框架调用 S3MessageHandler.handleMessage()
。您在这里所做的只是在运行时调用 private
方法来针对每个请求消息创建一个 S3MessageHandler
实例:MethodInvokingMessageHandler
在其 handleMessage()
中调用您的 lambda,仅此而已 - S3 不会发生任何事情。
ValueExpression
在这里无法帮助您,因为您需要根据每个请求消息评估目标文件。因此,您需要一个运行时表达式。 new SpelExpressionParser().parseExpression()
确实没有错。只是因为我们别无选择,必须只有一个无状态 S3MessageHandler
,并且不要在运行时在每个请求上重新创建它,就像您尝试使用那个可疑的 lambda 和 ValueExpression
.
我正在定义一个 IntegrationFlow
以这种方式使用 DSL 语法从 SFTP 流式传输到 S3 :
return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
.remoteDirectory("remoteDirectory"),
e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
.transform(new StreamTransformer())
.handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
.get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
return s3MessageHandler;
}
它按预期工作:文件已很好地上传到我的 S3 存储桶。但是,我想避免 SPEL
语法,并将消息中的 headers 注入 s3uploadMessageHandler
方法,这样我就可以使用简单的 ValueExpression
来设置 keyExpression
在 s3UploadMessageHandler
方法中。
为此,我更改了
handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
到
handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // Upload on S3
但是现在这个处理程序似乎不再被触发了。日志中没有错误,我从日志中知道 SFTP 轮询仍在工作。
我试图找到这背后的原因,我看到在 IntegrationFlowdefinition.java
中输入 handle 方法时, messageHandler
class 类型不同:它是一个 S3MessageHandler
在没有 lambda 的情况下调用,在使用 lambda 表达式调用时为 MyCallingClass$lambda
。
我错过了什么让我的场景正常工作?
有两种处理消息的方法。一种是通过 MessageHandler
实现——这是最有效的方法,它是在通道适配器实现框架中完成的,例如 S3MessageHandler
。另一种方法是 POJO 方法调用 - 当您不需要担心任何框架接口时,这是最用户友好的方法。
所以,当你像这样使用它时 .handle(s3UploadMessageHandler(...))
你引用了一个 MessageHandler
并且框架知道必须为那个 MessageHandler
注册一个 bean 因为你的 s3UploadMessageHandler()
不是 @Bean
.
当您将它用作 lambda 时,框架将其视为 POJO 方法调用,并且有一个为 MethodInvokingMessageHandler
而不是您的 S3MessageHandler
注册的 bean。
无论如何,即使您将 s3UploadMessageHandler()
更改为 @Bean
方法,它也不会起作用,因为您不允许框架调用 S3MessageHandler.handleMessage()
。您在这里所做的只是在运行时调用 private
方法来针对每个请求消息创建一个 S3MessageHandler
实例:MethodInvokingMessageHandler
在其 handleMessage()
中调用您的 lambda,仅此而已 - S3 不会发生任何事情。
ValueExpression
在这里无法帮助您,因为您需要根据每个请求消息评估目标文件。因此,您需要一个运行时表达式。 new SpelExpressionParser().parseExpression()
确实没有错。只是因为我们别无选择,必须只有一个无状态 S3MessageHandler
,并且不要在运行时在每个请求上重新创建它,就像您尝试使用那个可疑的 lambda 和 ValueExpression
.