如何为 Spring Integration SFTP 入站适配器动态定义文件过滤器模式?

How to dynamically define file filter pattern for Spring Integration SFTP Inbound Adapter?

我需要将特定文件从不同 SFTP 服务器上的不同目录,动态拉取到 Spring Boot 应用程序所在服务器的本地目录中。

我将路径和文件名模式存储在 PostgreSQL 数据库中。其他部分都已正常工作,但我不知道如何根据 Spring Integration SFTP 入站适配器当前的远程目录动态定义文件过滤器模式,以免拉取该目录中的所有 XML 文件。

我使用 RotatingServerAdvice 和 DelegatingSessionFactory 来访问动态目录和服务器。

对于动态文件模式过滤器,我尝试使用

`.filterFunction(f -> do_some_filtering_based_on_filename(f))`

我想读取该文件所在的远程目录,但 f 的类型是 ChannelSftp.LsEntry,其中没有包含远程目录的字段。如果能拿到远程目录,我就可以从数据库加载配置数据,查找对应的路径并应用文件模式。

针对我的场景,有没有更好的方法?

SFTP 示例:

127.0.0.1:22

目录:root/partner1

... test_p1_2343545.xml

... test_p1_453453.xml

... don_t_pull_this_file_453453.xml

127.0.0.2:22

目录:root/partner2

... companyname_2343545.xml

... companyname_453453.xml

... don_t_pull_this_file_3434.xml

数据库配置示例:

| URL       | PATH      | FILE_PATTERN      |
|-----------|-----------|-------------------|
| 127.0.0.1 | /partner1 | test_p1_*.xml     |
| 127.0.0.2 | /partner2 | companyname_*.xml |

我的 Spring Boot 应用程序中的适配器类代码可以正常工作,但由于使用了 `.patternFilter("*.xml")`,它会拉取目录中的所有 XML 文件:

import com.jcraft.jsch.ChannelSftp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.file.remote.aop.RotatingServerAdvice;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.dsl.SftpInboundChannelAdapterSpec;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

import java.io.File;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Spring Integration configuration that polls the SFTP servers and directories configured in the
 * database one after another (via {@link RotatingServerAdvice}) and downloads the files into a
 * local temporary directory.
 *
 * <p>{@code @Configuration} is itself meta-annotated with {@code @Component}, so a separate
 * {@code @Component} annotation is not needed.
 */
@Configuration
public class SFTIntegration {

  public static final String TIMEZONE_UTC = "UTC";
  public static final String TIMESTAMP_FORMAT_OF_FILES = "yyyyMMddHHmmssSSS";
  public static final String TEMPORARY_FILE_SUFFIX = ".part";
  public static final int POLLER_FIXED_PERIOD_DELAY = 5000;
  public static final int MAX_MESSAGES_PER_POLL = 100;

  private static final Logger LOG = LoggerFactory.getLogger(SFTIntegration.class);

  /**
   * Name of the channel between the inbound flow and the intermediate stage. It matches the
   * {@link #stockIntermediateChannel()} bean name so that the declared channel is the one the
   * flows actually use.
   */
  private static final String CHANNEL_INTERMEDIATE_STAGE = "stockIntermediateChannel";

  /** UTC timestamp formatter for local file name prefixes; immutable and thread-safe. */
  private static final DateTimeFormatter LOCAL_FILENAME_TIMESTAMP = DateTimeFormatter
      .ofPattern(TIMESTAMP_FORMAT_OF_FILES)
      .withZone(ZoneId.of(TIMEZONE_UTC));

  /** Database access repository for the partner (server/directory) configuration. */
  private final PartnerConfigRepo partnerConfigRepo;

  @Value("${app.tmp-dir}")
  private String localTemporaryPath;

  public SFTIntegration(final PartnerConfigRepo partnerConfigRepo) {
    this.partnerConfigRepo = partnerConfigRepo;
  }

  /**
   * The default poller with 5s, 100 messages, RotatingServerAdvice and transaction.
   *
   * @return default poller.
   */
  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata poller() {
    return Pollers
        .fixedDelay(POLLER_FIXED_PERIOD_DELAY)
        .advice(advice())
        .maxMessagesPerPoll(MAX_MESSAGES_PER_POLL)
        .transactional()
        .get();
  }

  /**
   * The direct channel connecting the inbound flow with the intermediate stage.
   *
   * @return MessageChannel
   */
  @Bean
  public MessageChannel stockIntermediateChannel() {
    return new DirectChannel();
  }

  /**
   * Get the files from the current remote directory, prefix each file name with a UTC timestamp
   * and write them to a local temporary folder.
   *
   * @return IntegrationFlow
   */
  @Bean
  public IntegrationFlow stockInboundFlowFromSFTPServer() {
    // Source definition
    final SftpInboundChannelAdapterSpec sourceSpec = Sftp.inboundAdapter(delegatingSFtpSessionFactory())
        .preserveTimestamp(true)
        // NOTE: a fixed pattern pulls every *.xml file of every configured directory.
        .patternFilter("*.xml")
        // Attempts at a per-directory filter; an LsEntry carries no remote directory:
        //.filterFunction(f -> do_some_filtering_based_on_filename(f, delegatingSFtpSessionFactory().getSession()))
        //.filter(new ModifiedFilter())
        //.filterExpression("#remoteDirectory")
        .deleteRemoteFiles(true)
        .maxFetchSize(MAX_MESSAGES_PER_POLL)
        .remoteDirectory("/")
        .localDirectory(new File(localTemporaryPath))
        .temporaryFileSuffix(TEMPORARY_FILE_SUFFIX)
        .localFilenameExpression(new FunctionExpression<String>(SFTIntegration::toLocalFilename));

    // Poller definition
    final Consumer<SourcePollingChannelAdapterSpec> stockInboundPoller = endpointConfigurer -> endpointConfigurer
        .id("stockInboundPoller")
        .autoStartup(true)
        .poller(poller());

    return IntegrationFlows
        .from(sourceSpec, stockInboundPoller)
        .transform(File.class, p -> {
          // log step; the payload is passed through unchanged
          LOG.info("flow=stockInboundFlowFromAFT, message=incoming file: {}", p);
          return p;
        })
        .channel(CHANNEL_INTERMEDIATE_STAGE)
        .get();
  }

  /**
   * Builds the local file name {@code <UTC timestamp>_<remote file name>}.
   * The remote name is appended unchanged, so names without an extension work as well.
   *
   * @param remoteFilename the remote file name
   * @return the local file name
   */
  private static String toLocalFilename(final String remoteFilename) {
    return LOCAL_FILENAME_TIMESTAMP.format(Instant.now()) + "_" + remoteFilename;
  }

  @Bean
  public IntegrationFlow stockIntermediateStageChannel() {
    return IntegrationFlows
        .from(CHANNEL_INTERMEDIATE_STAGE)
        .transform(p -> {
          // log step; the payload is passed through unchanged
          LOG.info("flow=stockIntermediateStageChannel, message=rename file: {}", p);
          return p;
        })
        //TODO
        .channel(new NullChannel())
        .get();
  }

  /**
   * Creates an SFTP session factory for one partner connection.
   *
   * @param pc the partner configuration providing host, port and credentials
   * @return a new, non-shared session factory
   */
  public DefaultSftpSessionFactory createNewSftpSessionFactory(final PartnerConfigEntity pc) {
    final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(false);
    factory.setHost(pc.getServerIp());
    factory.setPort(pc.getPort());
    factory.setUser(pc.getUsername());
    factory.setPassword(pc.getPassword());
    // NOTE(review): accepts unknown host keys; only acceptable on trusted networks.
    factory.setAllowUnknownKeys(true);
    return factory;
  }

  /**
   * One session factory per distinct partner key, wrapped in a {@link DelegatingSessionFactory}.
   *
   * @return the delegating session factory; the first configured factory is the default
   * @throws IllegalStateException if no active STOCK partner configuration exists
   */
  @Bean
  public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSFtpSessionFactory() {
    final List<PartnerConfigEntity> partnerConnections = partnerConfigRepo
        .findByTypeAndActiveIsTrue(PartnerConfigType.STOCK);

    if (partnerConnections.isEmpty()) {
      // A null @Bean would only fail later, with an obscure message, inside the adapter/advice.
      throw new IllegalStateException("No active partner SFTP configuration of type STOCK found");
    }

    final Map<Object, SessionFactory<ChannelSftp.LsEntry>> factories = new LinkedHashMap<>(10);
    for (final PartnerConfigEntity pc : partnerConnections) {
      // one factory per key (server type, url and port); the first entry for a key wins
      factories.computeIfAbsent(pc.getKey(), key -> createNewSftpSessionFactory(pc));
    }

    // use the first SF as the default
    return new DelegatingSessionFactory<>(factories, factories.values().iterator().next());
  }

  /**
   * Rotates fairly over all key/directory pairs of the active STOCK partner configurations.
   *
   * @return the rotating server advice
   */
  @Bean
  public RotatingServerAdvice advice() {
    // Same query as delegatingSFtpSessionFactory(), so every key has a matching session factory.
    final List<PartnerConfigEntity> partnerConnections = partnerConfigRepo
        .findByTypeAndActiveIsTrue(PartnerConfigType.STOCK);

    final List<RotatingServerAdvice.KeyDirectory> keyDirectories = new ArrayList<>();
    for (final PartnerConfigEntity pc : partnerConnections) {
      keyDirectories.add(new RotatingServerAdvice.KeyDirectory(pc.getKey(), pc.getServerPath()));
    }

    return new RotatingServerAdvice(delegatingSFtpSessionFactory(), keyDirectories, true);
  }

  // Experiments from the question (not used):
  //  private Boolean do_some_filtering_based_on_filename(final ChannelSftp.LsEntry f,
  //      final Session<ChannelSftp.LsEntry> session) {
  //    Object instance = session.getClientInstance();
  //    System.out.println(f);
  //    return true;
  //  }

  //
  //  private class ModifiedFilter<F> extends AbstractFileListFilter<F> {
  //    private final Logger log = LoggerFactory.getLogger(ModifiedFilter.class);
  //
  //    @Override public boolean accept(final F file) {
  //      log.info(file.toString());
  //      return false;
  //    }
  //  }
}

我建议你实现一个 SftpSimplePatternFileListFilter 逻辑的自定义变体,并提供一个 setter,以便在运行时更改模式。

然后,在 RotatingServerAdvice 中注入一个自定义的 RotationPolicy(例如复制一份 StandardRotationPolicy 的实现),并把你的自定义 FileListFilter 注入到该策略中。在 configureSource() 中执行完下面这段代码之后:

((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
                    .setRemoteDirectory(this.current.getDirectory());

再根据 this.current.getDirectory() 执行数据库查询,获取对应的文件模式,并将其设置到你的 FileListFilter 中。

你代码中的 Sftp.inboundAdapter() 必须使用同一个过滤器实例,而不是 .patternFilter("*.xml")。

我认为我们应该把 StandardRotationPolicy 中的所有属性改为 protected,或者提供 getter,以便子类访问。欢迎就此提交 JIRA:<https://jira.spring.io/browse/INT>!

希望这能有所帮助。

以下是我按照 Artem Bilan 的建议实现的解决方案源代码,供有需要的人参考。

我复制了 Spring Integration 的相关类,并按照 Artem Bilan 的建议进行了改编。

@Artem Bilan 非常感谢!

The Abstract Filter:

/**
 * Base class for file list filters that accept files whose name matches an Ant-style pattern
 * (e.g. {@code "*.xml"}). Unlike the Spring class it is based on, the pattern can be replaced at
 * runtime via {@link #setPath(String)}.
 * <p>
 * Based on {@link org.springframework.integration.file.filters.AbstractSimplePatternFileListFilter}.
 *
 * @param <F> the file type handled by the concrete subclass
 */
public abstract class MyAbstractSimplePatternFileListFilter<F> extends AbstractDirectoryAwareFileListFilter<F> {

  private final AntPathMatcher matcher = new AntPathMatcher();

  /**
   * The current Ant-style file name pattern. Despite its name (kept for API compatibility) this
   * is a file name pattern, not a directory path.
   */
  private String path;

  /**
   * Creates the filter with an initial file name pattern.
   *
   * @param path the initial Ant-style file name pattern, e.g. {@code "*.xml"}
   * @throws IllegalArgumentException if {@code path} is null
   */
  public MyAbstractSimplePatternFileListFilter(final String path) {
    this.path = requirePattern(path);
  }

  /**
   * Accepts the given file if {@code alwaysAccept(file)} (directory handling of the superclass)
   * returns true, or if the file is non-null and its name matches the current pattern.
   */
  @Override
  public final boolean accept(final F file) {
    return alwaysAccept(file) || (file != null && this.matcher.match(this.path, this.getFilename(file)));
  }

  /**
   * Subclasses must implement this method to extract the file's name.
   *
   * @param file The file.
   * @return The file name.
   */
  protected abstract String getFilename(F file);

  /**
   * Returns the file name pattern currently used by {@link #accept(Object)}.
   *
   * @return the Ant-style file name pattern (not a directory path)
   */
  public String getPath() {
    return this.path;
  }

  /**
   * Replaces the file name pattern at runtime; subsequent {@link #accept(Object)} calls use it.
   *
   * @param path the new Ant-style file name pattern, e.g. {@code "test_p1_*.xml"}
   * @throws IllegalArgumentException if {@code path} is null
   */
  public void setPath(final String path) {
    this.path = requirePattern(path);
  }

  /**
   * Rejects a null pattern up front instead of failing later inside the path matcher.
   */
  private static String requirePattern(final String pattern) {
    if (pattern == null) {
      throw new IllegalArgumentException("file name pattern must not be null");
    }
    return pattern;
  }
}

The filter implementation, which uses the setter of the abstract filter class above:

import com.jcraft.jsch.ChannelSftp.LsEntry;
import org.springframework.integration.file.filters.AbstractSimplePatternFileListFilter;

/**
 * SFTP ({@link LsEntry}) file list filter with an Ant-style file name pattern that can be changed
 * at runtime via {@link #setPattern(String)}.
 * <p>
 * Modelled on Spring Integration's {@code SftpSimplePatternFileListFilter}. Note that it extends
 * {@link MyAbstractSimplePatternFileListFilter}; it is not a subtype of
 * {@link AbstractSimplePatternFileListFilter}.
 */
public class MySftpPatternFileListFilter extends MyAbstractSimplePatternFileListFilter<LsEntry> {

  /**
   * Creates the filter with an initial pattern.
   *
   * @param pattern the initial Ant-style file name pattern, e.g. {@code "*.xml"}
   */
  public MySftpPatternFileListFilter(final String pattern) {
    super(pattern);
  }

  /**
   * Returns the entry's file name, or null for a null entry.
   */
  @Override
  protected String getFilename(final LsEntry entry) {
    return (entry != null) ? entry.getFilename() : null;
  }

  /**
   * Reports whether the entry's attributes mark it as a directory.
   */
  @Override
  protected boolean isDirectory(final LsEntry file) {
    return file.getAttrs().isDir();
  }

  /**
   * Sets the file pattern for the file filter.
   *
   * @param pattern a file pattern like "*.xml"
   */
  public void setPattern(final String pattern) {
    setPath(pattern);
  }

  /**
   * Returns the file pattern currently used by the filter.
   *
   * @return the current file pattern, e.g. "*.xml"
   */
  public String getPattern() {
    return getPath();
  }

}

The rotation policy, with the added filter and repository:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
import org.springframework.integration.file.remote.aop.RotatingServerAdvice;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Adapted copy of Spring Integration's standard {@link RotatingServerAdvice.RotationPolicy}.
 * On every rotation it also sets, on a shared {@link MySftpPatternFileListFilter}, the file name
 * pattern configured in the database for the new server/directory.
 * <p>
 * Standard rotation policy; iterates over key/directory pairs; when the end
 * is reached, starts again at the beginning. If the fair option is true
 * the rotation occurs on every poll, regardless of result. Otherwise rotation
 * occurs when the current pair returns no message.
 * <p>
 * The filter passed in must be the same instance that is configured on the inbound adapter,
 * otherwise the pattern changes have no effect on which files are fetched.
 */
public class MyStandardRotationPolicy implements RotatingServerAdvice.RotationPolicy {

  /** Pattern used when the database holds no pattern for the current key/directory. */
  private static final String DEFAULT_FILE_PATTERN = "*.xml";

  protected final Log logger = LogFactory.getLog(getClass());

  private final DelegatingSessionFactory<?> factory;

  private final List<RotatingServerAdvice.KeyDirectory> keyDirectories = new ArrayList<>();

  private final boolean fair;

  /** Filter shared with the inbound adapter; its pattern is replaced on every rotation. */
  private final MySftpPatternFileListFilter fileListFilter;

  /** Source of the per-server/directory file name patterns. */
  private final PartnerConfigRepo partnerConfigRepo;

  private volatile Iterator<RotatingServerAdvice.KeyDirectory> iterator;

  private volatile RotatingServerAdvice.KeyDirectory current;

  private volatile boolean initialized;

  /**
   * @param factory the delegating session factory whose thread key is switched per poll
   * @param keyDirectories the key/directory pairs to rotate over; at least one
   * @param fair true to rotate on every poll, false to rotate only after an empty poll
   * @param fileListFilter the filter (shared with the inbound adapter) to update on rotation
   * @param partnerConfigRepo repository used to look up the file name pattern
   */
  public MyStandardRotationPolicy(final DelegatingSessionFactory<?> factory,
      final List<RotatingServerAdvice.KeyDirectory> keyDirectories,
      final boolean fair,
      final MySftpPatternFileListFilter fileListFilter,
      final PartnerConfigRepo partnerConfigRepo) {

    Assert.notNull(factory, "factory cannot be null");
    Assert.notNull(keyDirectories, "keyDirectories cannot be null");
    Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required");
    Assert.notNull(fileListFilter, "fileListFilter cannot be null");
    Assert.notNull(partnerConfigRepo, "partnerConfigRepo cannot be null");
    this.factory = factory;
    this.keyDirectories.addAll(keyDirectories);
    this.fair = fair;
    this.iterator = this.keyDirectories.iterator();
    this.fileListFilter = fileListFilter;
    this.partnerConfigRepo = partnerConfigRepo;
  }

  protected Iterator<RotatingServerAdvice.KeyDirectory> getIterator() {
    return this.iterator;
  }

  protected void setIterator(final Iterator<RotatingServerAdvice.KeyDirectory> iterator) {
    this.iterator = iterator;
  }

  protected boolean isInitialized() {
    return this.initialized;
  }

  protected void setInitialized(final boolean initialized) {
    this.initialized = initialized;
  }

  protected DelegatingSessionFactory<?> getFactory() {
    return this.factory;
  }

  protected List<RotatingServerAdvice.KeyDirectory> getKeyDirectories() {
    return this.keyDirectories;
  }

  protected boolean isFair() {
    return this.fair;
  }

  @Override
  public void beforeReceive(final MessageSource<?> source) {
    if (this.fair || !this.initialized) {
      configureSource(source);
      this.initialized = true;
    }
    if (this.logger.isTraceEnabled()) {
      this.logger.trace("Next poll is for " + this.current);
    }
    this.factory.setThreadKey(this.current.getKey());
  }

  @Override
  public void afterReceive(final boolean messageReceived, final MessageSource<?> source) {
    if (this.logger.isTraceEnabled()) {
      this.logger.trace("Poll produced "
          + (messageReceived ? "a" : "no")
          + " message");
    }
    this.factory.clearThreadKey();
    if (!this.fair && !messageReceived) {
      configureSource(source);
    }
  }

  /**
   * Advances to the next key/directory pair, applies its file name pattern to the shared filter
   * and points the source at its remote directory.
   *
   * @param source the polled message source
   */
  protected void configureSource(final MessageSource<?> source) {
    Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource
            || source instanceof AbstractRemoteFileStreamingMessageSource,
        "source must be an AbstractInboundFileSynchronizingMessageSource or a "
            + "AbstractRemoteFileStreamingMessageSource");
    if (!this.iterator.hasNext()) {
      this.iterator = this.keyDirectories.iterator();
    }
    this.current = this.iterator.next();

    this.fileListFilter.setPattern(getPatternFromDataBase(this.current));

    if (source instanceof AbstractRemoteFileStreamingMessageSource) {
      ((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(this.current.getDirectory());
    } else {
      ((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
          .setRemoteDirectory(this.current.getDirectory());
    }
  }

  /**
   * Looks up the file name pattern of the given key/directory pair. The repository is queried on
   * every rotation, so pattern changes in the database are picked up on the next rotation.
   *
   * @param keyDirectory the pair about to be polled
   * @return the configured pattern, or {@link #DEFAULT_FILE_PATTERN} if none is configured
   */
  private String getPatternFromDataBase(final RotatingServerAdvice.KeyDirectory keyDirectory) {
    final List<PartnerConfigEntity> allStock = this.partnerConfigRepo
        .findByTypeAndActiveIsTrue(PartnerConfigType.STOCK);
    for (final PartnerConfigEntity entry : allStock) {
      // Match on server key AND path: different servers may use the same directory name.
      if (keyDirectory.getKey().equals(entry.getKey())
          && keyDirectory.getDirectory().equals(entry.getServerPath())
          && entry.getFileNamePattern() != null) {
        return entry.getFileNamePattern();
      }
    }
    // TODO decide whether a missing configuration should fail instead of falling back.
    this.logger.warn("No file name pattern configured for " + keyDirectory
        + "; falling back to " + DEFAULT_FILE_PATTERN);
    return DEFAULT_FILE_PATTERN;
  }

}

The flow class, with the new getFilter() method, the SftpInboundChannelAdapterSpec changed to use .filter(getFilter()), and the updated advice() method:

/**
 * Spring Integration configuration that polls the SFTP servers/directories configured in the
 * database in rotation. It downloads the files matching each directory's configured file name
 * pattern into a local temporary directory.
 * <p>
 * The per-directory pattern is applied by {@link MyStandardRotationPolicy}, which updates the
 * {@link MySftpPatternFileListFilter} bean shared with the inbound adapter.
 * {@code @Configuration} is itself meta-annotated with {@code @Component}, so a separate
 * {@code @Component} annotation is not needed.
 */
@Configuration
public class SFTIntegration {

  public static final String TIMEZONE_UTC = "UTC";
  public static final String TIMESTAMP_FORMAT_OF_FILES = "yyyyMMddHHmmssSSS";
  public static final String TEMPORARY_FILE_SUFFIX = ".part";
  public static final int POLLER_FIXED_PERIOD_DELAY = 5000;
  public static final int MAX_MESSAGES_PER_POLL = 100;
  private static final Logger LOG = LoggerFactory.getLogger(SFTIntegration.class);
  private static final String CHANNEL_INTERMEDIATE_STAGE = "stockIntermediateChannel";

  /** Exclusive upper bound of the random number inserted into local file names. */
  private static final int LOCAL_FILENAME_RANDOM_BOUND = 99999;

  /** UTC timestamp formatter for local file name prefixes; immutable and thread-safe. */
  private static final DateTimeFormatter LOCAL_FILENAME_TIMESTAMP = DateTimeFormatter
      .ofPattern(TIMESTAMP_FORMAT_OF_FILES)
      .withZone(ZoneId.of(TIMEZONE_UTC));

  /** Shared random source; creating and seeding a SecureRandom per file is needlessly costly. */
  private static final SecureRandom LOCAL_FILENAME_RANDOM = new SecureRandom();

  /**
   * database access repository
   */
  private final PartnerConfigRepo partnerConfigRepo;

  @Value("${app.tmp-dir}")
  private String localTemporaryPath;

  public SFTIntegration(final PartnerConfigRepo partnerConfigRepo) {
    this.partnerConfigRepo = partnerConfigRepo;
  }

  /**
   * The runtime-adjustable file name filter. It is used both by the inbound adapter and by the
   * rotation policy. With the default proxied {@code @Configuration} semantics, both calls to this
   * method return the same singleton instance.
   *
   * @return the filter, initialised with "*.xml" until the first rotation sets the pattern
   */
  @Bean
  MySftpPatternFileListFilter getFilter() {
    //initial pattern
    return new MySftpPatternFileListFilter("*.xml");
  }

  /**
   * The default poller with 5s, 100 messages, RotatingServerAdvice and transaction.
   *
   * @return default poller.
   */
  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata poller() {
    return Pollers
        .fixedDelay(POLLER_FIXED_PERIOD_DELAY)
        .advice(advice())
        .maxMessagesPerPoll(MAX_MESSAGES_PER_POLL)
        .transactional()
        .get();
  }

  /**
   * The direct channel for the flow.
   *
   * @return MessageChannel
   */
  @Bean
  public MessageChannel stockIntermediateChannel() {
    return new DirectChannel();
  }

  /**
   * Get the matching files from the current remote directory. Each local file name is prefixed
   * with a UTC timestamp (format {@value #TIMESTAMP_FORMAT_OF_FILES}) and a random number, and the
   * files are written to a local temporary folder.
   *
   * @return IntegrationFlow
   */
  @Bean
  public IntegrationFlow stockInboundFlowFromSFTPServer() {
    // Source definition
    final SftpInboundChannelAdapterSpec sourceSpec = Sftp.inboundAdapter(delegatingSFtpSessionFactory())
        .preserveTimestamp(true)
        // same filter instance as the one updated by the rotation policy
        .filter(getFilter())
        .deleteRemoteFiles(true)
        .maxFetchSize(MAX_MESSAGES_PER_POLL)
        .remoteDirectory("/")
        .localDirectory(new File(localTemporaryPath))
        .temporaryFileSuffix(TEMPORARY_FILE_SUFFIX)
        .localFilenameExpression(new FunctionExpression<String>(SFTIntegration::toLocalFilename));

    // Poller definition
    final Consumer<SourcePollingChannelAdapterSpec> stockInboundPoller = endpointConfigurer -> endpointConfigurer
        .id("stockInboundPoller")
        .autoStartup(true)
        .poller(poller());

    return IntegrationFlows
        .from(sourceSpec, stockInboundPoller)
        .transform(File.class, p -> {
          // log step; the payload is passed through unchanged
          LOG.info("flow=stockInboundFlowFromAFT, message=incoming file: {}", p);
          return p;
        })
        .channel(CHANNEL_INTERMEDIATE_STAGE)
        .get();
  }

  /**
   * Builds the local file name {@code <UTC timestamp>_<random number>_<remote file name>}.
   * The random number reduces the chance of collisions between equally named files fetched in
   * the same millisecond. The remote name is appended unchanged, so names without an extension
   * work as well. (Instant.now().toEpochMilli() could be used for an epoch-millisecond prefix.)
   *
   * @param remoteFilename the remote file name
   * @return the local file name
   */
  private static String toLocalFilename(final String remoteFilename) {
    return LOCAL_FILENAME_TIMESTAMP.format(Instant.now())
        + "_"
        + LOCAL_FILENAME_RANDOM.nextInt(LOCAL_FILENAME_RANDOM_BOUND)
        + "_"
        + remoteFilename;
  }

  @Bean
  public IntegrationFlow stockIntermediateStageChannel() {
    return IntegrationFlows
        .from(CHANNEL_INTERMEDIATE_STAGE)
        .transform(p -> {
          // log step; the payload is passed through unchanged
          LOG.info("flow=stockIntermediateStageChannel, message=rename file: {}", p);
          return p;
        })
        //TODO
        .channel(new NullChannel())
        .get();
  }

  /**
   * Creates an SFTP session factory for one partner connection.
   *
   * @param pc the partner configuration providing host, port and credentials
   * @return a new, non-shared session factory
   */
  public DefaultSftpSessionFactory createNewSftpSessionFactory(final PartnerConfigEntity pc) {
    //TODO set true but use caching session https://docs.spring.io/spring-integration/reference/html/sftp.html
    final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(false);
    factory.setHost(pc.getServerIp());
    factory.setPort(pc.getPort());
    factory.setUser(pc.getUsername());
    factory.setPassword(pc.getPassword());
    // NOTE(review): accepts unknown host keys; only acceptable on trusted networks.
    factory.setAllowUnknownKeys(true);
    return factory;
  }

  /**
   * One session factory per distinct partner key, wrapped in a {@link DelegatingSessionFactory}.
   *
   * @return the delegating session factory; the first configured factory is the default
   * @throws IllegalStateException if no active STOCK partner configuration exists
   */
  @Bean
  public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSFtpSessionFactory() {
    final List<PartnerConfigEntity> partnerConnections = partnerConfigRepo
        .findByTypeAndActiveIsTrue(PartnerConfigType.STOCK);

    if (partnerConnections.isEmpty()) {
      // A null @Bean would only fail later, with an obscure message, inside the adapter/advice.
      throw new IllegalStateException("No active partner SFTP configuration of type STOCK found");
    }

    final Map<Object, SessionFactory<ChannelSftp.LsEntry>> factories = new LinkedHashMap<>(10);
    for (final PartnerConfigEntity pc : partnerConnections) {
      // one factory per PartnerConfigEntity.getKey() (server type, url and port); first entry wins
      factories.computeIfAbsent(pc.getKey(), key -> createNewSftpSessionFactory(pc));
    }

    // use the first SF as the default
    return new DelegatingSessionFactory<>(factories, factories.values().iterator().next());
  }

  /**
   * Rotates fairly over all key/directory pairs of the active STOCK partner configurations. On
   * each rotation the custom policy updates the shared filter with the pair's file name pattern.
   *
   * @return the rotating server advice
   */
  @Bean
  public RotatingServerAdvice advice() {
    final List<PartnerConfigEntity> partnerConnections = partnerConfigRepo
        .findByTypeAndActiveIsTrue(PartnerConfigType.STOCK);
    LOG.debug("Found {} server entries for type stock.", partnerConnections.size());

    final List<RotatingServerAdvice.KeyDirectory> keyDirectories = new ArrayList<>();
    for (final PartnerConfigEntity pc : partnerConnections) {
      keyDirectories.add(new RotatingServerAdvice.KeyDirectory(pc.getKey(), pc.getServerPath()));
    }

    return new RotatingServerAdvice(
        new MyStandardRotationPolicy(delegatingSFtpSessionFactory(), keyDirectories, true,
            getFilter(), partnerConfigRepo));
  }

}