为什么我的处理程序方法在定义为 lambda 时没有被触发?

Why is my handler method not triggered when defined as a lambda?

我正在定义一个 IntegrationFlow 以这种方式使用 DSL 语法从 SFTP 流式传输到 S3 :

return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
                        .remoteDirectory("remoteDirectory"),
                e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
                .transform(new StreamTransformer())
                .handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
                .get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
        S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
        s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
        s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
        return s3MessageHandler;
    }

它按预期工作:文件已很好地上传到我的 S3 存储桶。但是,我想避免 SPEL 语法,并将消息中的 headers 注入 s3uploadMessageHandler 方法,这样我就可以使用简单的 ValueExpression 来设置 keyExpressions3UploadMessageHandler 方法中。 为此,我更改了

handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3

handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // Upload on S3

但是现在这个处理程序似乎不再被触发了。日志中没有错误,我从日志中知道 SFTP 轮询仍在工作。

我试图找到这背后的原因,我看到在 IntegrationFlowdefinition.java 中输入 handle 方法时, messageHandler class 类型不同:它是一个 S3MessageHandler 在没有 lambda 的情况下调用,在使用 lambda 表达式调用时为 MyCallingClass$lambda

我错过了什么让我的场景正常工作?

有两种处理消息的方法。一种是通过 MessageHandler 实现——这是最有效的方法,它是在通道适配器实现框架中完成的,例如 S3MessageHandler。另一种方法是 POJO 方法调用 - 当您不需要担心任何框架接口时,这是最用户友好的方法。

所以,当你像这样使用它时 .handle(s3UploadMessageHandler(...)) 你引用了一个 MessageHandler 并且框架知道必须为那个 MessageHandler 注册一个 bean 因为你的 s3UploadMessageHandler() 不是 @Bean.

当您将它用作 lambda 时,框架将其视为 POJO 方法调用,并且有一个为 MethodInvokingMessageHandler 而不是您的 S3MessageHandler 注册的 bean。

无论如何,即使您将 s3UploadMessageHandler() 更改为 @Bean 方法,它也不会起作用,因为您不允许框架调用 S3MessageHandler.handleMessage()。您在这里所做的只是在运行时调用 private 方法来针对每个请求消息创建一个 S3MessageHandler 实例:MethodInvokingMessageHandler 在其 handleMessage() 中调用您的 lambda,仅此而已 - S3 不会发生任何事情。

ValueExpression 在这里无法帮助您,因为您需要根据每个请求消息评估目标文件。因此,您需要一个运行时表达式。 new SpelExpressionParser().parseExpression()确实没有错。只是因为我们别无选择,必须只有一个无状态 S3MessageHandler,并且不要在运行时在每个请求上重新创建它,就像您尝试使用那个可疑的 lambda 和 ValueExpression.