Spring 在入站适配器消息处理程序中使用出站网关的 Sftp 获取文件

Spring Sftp fetch file using outbound gateway within inbound adapter message handler

我正在使用使用 Java DSL 的入站适配器从 SFTP 服务器轮询 pdf 文件。我有一个用例,在获取 pdf 文件后,应用程序将获取 SFTP 服务器上具有相同名称的 CSV 格式的配置文件。获取配置文件后,应用程序将使用配置文件中定义的属性处理原始 pdf 文件,并使用出站适配器将其上传回 SFTP 服务器。

我在使用出站网关在同一线程的处理程序中获取配置文件时遇到问题。

这是我的代码:

注册集成流程:

  for (String client : clientsArr) {
      this.flowContext.registration(getInboundIntegrationFlow(client)).register();
  }

  this.flowContext.registration(getOutboundIntegrationFlow()).register();
  this.flowContext.registration(sftpGatewayGetIntegrationFlow()).register();

入站适配器集成流程:


  @Autowired
  private SftpPdfMessageHandler messageHandler;

  private IntegrationFlow getInboundIntegrationFlow(String client) {

    String remoteDirectory = getRemoteDirectory(client);
    String localDirectory = getLocalDirectory(client);
    String inboundAdapterId = getInboundAdapterId(client);

    return IntegrationFlows
        .from(Sftp.inboundAdapter(sftpSessionFactory())
                .preserveTimestamp(true)
                .remoteDirectory(remoteDirectory)
                .autoCreateLocalDirectory(true)
                .localDirectory(new File(localDirectory))
                .maxFetchSize(Integer.parseInt(sftpProperties.getMaxFetchSize()))
                .filter(new SftpSimplePatternFileListFilter(sftpProperties.getRemoteFileFilter()))
                .deleteRemoteFiles(true),
            e -> e.id(inboundAdapterId)
                .autoStartup(true)
                .poller(Pollers
                    .fixedDelay(Long.parseLong(sftpProperties.getPollPeriodInSeconds()), TimeUnit.SECONDS)
                    .receiveTimeout(Long.parseLong(sftpProperties.getPollerTimeout()))
                    .maxMessagesPerPoll(Long.parseLong(sftpProperties.getMaxMessagesPerPoll()))
                ))
        .handle(inBoundHandler())
        .get();
  }

  public MessageHandler inBoundHandler() {
    return message -> {
      File file = (File) message.getPayload();
      messageHandler.handleMessage(file);
    };
  }

出站适配器集成流程:

  private IntegrationFlow getOutboundIntegrationFlow() {

    return IntegrationFlows.from("sftpOutboundChannel")
        .handle(Sftp.outboundAdapter(sftpSessionFactory(), FileExistsMode.FAIL)
            .remoteDirectoryExpression(String.format("headers['%s']", FileHeaders.REMOTE_DIRECTORY))).get();
  }

  @Bean("sftpOutboundChannel")
  public MessageChannel sftpOutboundChannel() {
    return new DirectChannel();
  }

SFTP 消息处理程序:

  @Async("sftpHandlerAsyncExecutor")
  public void handleMessage(File originalFile) {

    File configFile = fetchConfigFile();

    /*
      process original file and store processed file in output file path on local directory
     */
      
    boolean success = uploadFileToSftpServer(outputFilePath, client, entity);

    if (success) {
      deleteFileFromLocal(originalFile);
    }
  }

出站网关 GET 集成流程:

  private IntegrationFlow sftpGatewayGetIntegrationFlow() {
    return IntegrationFlows.from("sftpGetInputChannel")
        .handle(Sftp.outboundGateway(sftpSessionFactory(),
            AbstractRemoteFileOutboundGateway.Command.GET, "payload")
            .options(AbstractRemoteFileOutboundGateway.Option.DELETE,
                AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
            .localDirectoryExpression(String.format("headers['%s']", Constants.HEADER_LOCAL_DIRECTORY_NAME))
            .autoCreateLocalDirectory(true))
        .channel("nullChannel")
        .get();
  }

  @Bean("sftpGetInputChannel")
  public MessageChannel sftpGetInputChannel() {
    return new DirectChannel();
  }

messageHandler.handleMessage() 方法在异步(使用 ThreadPoolTask​​Executor)中调用,它使用出站网关在内部获取配置文件。但是我找不到可以在同一个线程中发送和接收消息负载的单一通道。我在 spring 文档中找到了 MessagingTemplate,但找不到将其与我的出站网关集成流程连接起来的方法。

sftpGetMessageTemplate.sendAndReceive(sftpGetInputChannel, new GenericMessage<>("/dir/file.csv", headers)) 使用 DirectChannel() 给出“Dispatcher 没有频道订阅者”异常。

我正在寻找一种解决方案,我可以通过以下任何一种方式从服务器获取所需的文件:

Application needs to decide the directory path on runtime while using the GET command.

您可能需要了解什么是 @MessagingGateway 以及如何使其与出站网关上的通道交互。

有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#gateway

如果你真的想得到一个配置文件,你不应该这样做 .channel("nullChannel")。有了网关,将有 replyChannel header 和网关填充的 TemporaryReplyChannel 实例。然后在您的代码中,您只需将该功能接口用作 API 即可调用。

事实上,消息传递网关使用提到的 MessagingTemplate.sendAndReceive()