PipedOutput/InputStream upload to sftp using spring integration

I want to transfer data to an SFTP server in two passes. I am using Spring Boot with Spring Integration, and I have set up an SftpRemoteFileTemplate like this:

  @Autowired private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;

  @Bean
  public SftpRemoteFileTemplate sftpRemoteFileTemplate() {
    final SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sftpSessionFactory);

    template.setRemoteDirectoryExpression(
            new LiteralExpression(contactSenderSftpProperties.getSftpSessionProperties().getBaseSftpPath()));

    template.setTemporaryFileSuffix(".tmp");

    return template;
  }

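However, my target file is not appended to. Instead, it is overwritten with the latest contents of the temporary file I send the data to.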

My writer looks like this:

  public void write(List<? extends Item> items) throws Exception {
    log.debug("Write {}", items);

    final int timeoutSeconds = 60;

    try (PipedInputStream pipedInputStream = new PipedInputStream()) {
      log.debug("Preparing to write...");
      final CountDownLatch countDownLatch = new CountDownLatch(1);

      writerClient.write(items, pipedInputStream, countDownLatch);

      if (!countDownLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {
        throw new TimeoutException("Operation stream not connected");
      }

      sftpRemoteFileTemplate.send(
              MessageBuilder.withPayload(pipedInputStream).setHeader(FileHeaders.FILENAME, "contacts.csv").build());

    }
  }

where the method WriterClient#write is:

  @Async("writerThreadPoolTaskExecutor")
  public void write(List<? extends Item> items, PipedInputStream pipedInputStream, CountDownLatch countDownLatch) throws IOException {
    try(final PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream)){
      countDownLatch.countDown();
      csvSerializer.serialize(pipedOutputStream, items.stream());
    }
  }

and writerThreadPoolTaskExecutor is:

  @Bean
  public ThreadPoolTaskExecutor writerThreadPoolTaskExecutor(TaskExecutorBuilder taskExecutorBuilder) {
    return taskExecutorBuilder
            .corePoolSize(properties.getWriterThreadPoolCorePoolSize())
            .maxPoolSize(properties.getWriterThreadPoolMaxPoolSize())
            .queueCapacity(properties.getWriterThreadPoolQueueCapacity())
            .threadNamePrefix("writer-task-thread")
            .build();
  }

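In short, I want to write many small temporary files and merge them into a single file that contains all the data. I'm not sure PipedInput/OutputStream is the right tool. Is it possible to write to the PipedOutputStream multiple times and upload the PipedInputStream to SFTP only once, when there is no more data to write? But then the question arises: how do I know whether all the data has been written?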
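On the last two questions, here is a minimal sketch using only JDK classes, not Spring Integration. It assumes all batches can be written by a single producer thread. `PipeBatchDemo` and `pipeAll` are hypothetical names, and a `ByteArrayOutputStream` stands in for the single SFTP upload. The point it illustrates is that the reader learns "all data has been written" when the producer closes the `PipedOutputStream`: after the buffered bytes are drained, `read()` returns -1.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class PipeBatchDemo {

    /**
     * Writes every batch into ONE pipe on a producer thread and drains the
     * pipe on the calling thread. Hypothetical helper, for illustration only.
     */
    public static String pipeAll(List<List<String>> batches)
            throws IOException, InterruptedException {
        PipedInputStream in = new PipedInputStream();
        // Connect both ends before the producer starts, so no latch is needed.
        PipedOutputStream out = new PipedOutputStream(in);

        Thread producer = new Thread(() -> {
            // try-with-resources closes the write end after the LAST batch;
            // the reader observes that close as end-of-stream.
            try (PipedOutputStream o = out) {
                for (List<String> batch : batches) {
                    for (String line : batch) {
                        o.write((line + "\n").getBytes(StandardCharsets.UTF_8));
                    }
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        producer.start();

        // Stand-in for the single upload: consume until read() returns -1.
        ByteArrayOutputStream uploaded = new ByteArrayOutputStream();
        try (PipedInputStream i = in) {
            byte[] buffer = new byte[1024];
            int n;
            while ((n = i.read(buffer)) != -1) {
                uploaded.write(buffer, 0, n);
            }
        }
        producer.join();
        return new String(uploaded.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

In the real writer, the consumer loop would be replaced by one `send` call with the `PipedInputStream` as the payload, started before or alongside the producer so that the pipe's small internal buffer does not block the writing thread indefinitely.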

See the SFTP outbound channel adapter and its APPEND mode: https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#spel-and-the-sftp-outbound-adapter

Alternatively, use the append() method on RemoteFileTemplate:

    /**
     * Send a file to a remote server, based on information in a message, appending.
     *
     * @param message The message.
     * @return The remote path, or null if no local file was found.
     * @since 4.1
     */
    String append(Message<?> message);

The implementation (shown here for the overload that also takes a sub-directory) looks like this:

    @Override
    public String append(Message<?> message, String subDirectory) {
        return send(message, subDirectory, FileExistsMode.APPEND);
    }
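
Applied to the template from the question, this might look like the configuration sketch below. It assumes the Spring Integration 5.x JSch-based SFTP module, which the question's `SessionFactory<ChannelSftp.LsEntry>` implies. `AppendingSftpConfig`, `appendBatch`, and the `/upload` directory are hypothetical. As far as I know, the template refuses APPEND while a temporary file name is in use, so the sketch disables that instead of setting a `.tmp` suffix.

```java
import java.io.InputStream;

import com.jcraft.jsch.ChannelSftp;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.integration.support.MessageBuilder;

@Configuration
public class AppendingSftpConfig {

    @Bean
    public SftpRemoteFileTemplate sftpRemoteFileTemplate(
            SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory) {
        SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sftpSessionFactory);
        // Hypothetical remote directory; the question reads it from properties.
        template.setRemoteDirectoryExpression(new LiteralExpression("/upload"));
        // Assumption: APPEND cannot be combined with a temporary file name.
        template.setUseTemporaryFileName(false);
        return template;
    }

    /** Appends one batch to contacts.csv instead of replacing the file. */
    public static String appendBatch(SftpRemoteFileTemplate template, InputStream batch) {
        return template.append(
                MessageBuilder.withPayload(batch)
                        .setHeader(FileHeaders.FILENAME, "contacts.csv")
                        .build());
    }
}
```

With this in place, each call to the writer would use `appendBatch` rather than `send`, so consecutive batches accumulate in the same remote file.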