如何将 WatchServiceDirectoryScanner 与 Spring Cloud Stream 文件供应商一起使用?

How to use WatchServiceDirectoryScanner with Spring Cloud Stream file supplier?

我正在尝试配置 SpCS 的文件供应商以监视目录并在创建或修改文件时发布消息。基于 ,我对 FileInboundChannelAdapterSpec bean 进行了以下自定义配置:

@Bean
public BeanPostProcessor inboundFileAdaptorCustomizer() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) {

            if (bean instanceof FileInboundChannelAdapterSpec) {
                FileInboundChannelAdapterSpec spec = (FileInboundChannelAdapterSpec) bean;
                spec.get().setDirectory(properties.getDirectory());
                spec.autoCreateDirectory(false);
                spec.useWatchService(true);
                spec.watchEvents(WatchEventType.CREATE, WatchEventType.MODIFY);
                spec.preventDuplicates(false);  // We use a custom duplicates filter, see below

                spec.nioLocker();

                spec.filter(fileFilters());
            }

            return bean;
        }
    };
}

当我 运行 使用此配置时,出现以下异常:

java.lang.IllegalStateException: The WatchService hasn't been started
    at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.9.jar:5.3.9]
    at org.springframework.integration.file.FileReadingMessageSource$WatchServiceDirectoryScanner.listEligibleFiles(FileReadingMessageSource.java:466) ~[spring-integration-file-5.5.3.jar:5.5.3]
    at org.springframework.integration.file.DefaultDirectoryScanner.listFiles(DefaultDirectoryScanner.java:90) ~[spring-integration-file-5.5.3.jar:5.5.3]
    at org.springframework.integration.file.FileReadingMessageSource.scanInputDirectory(FileReadingMessageSource.java:375) ~[spring-integration-file-5.5.3.jar:5.5.3]
    at org.springframework.integration.file.FileReadingMessageSource.doReceive(FileReadingMessageSource.java:349) ~[spring-integration-file-5.5.3.jar:5.5.3]

使用断点我注意到从未调用 WatchServiceDirectoryScanner 的 start() 方法。相反,doReceived() 并因此 listElibibleFiles() 在它开始之前被调用,这是异常的来源。

为什么 SI 甚至在启动之前就告诉扫描仪列出文件?

这是 FileSupplierConfiguration 中的错误 - 缺少对 start() 的调用。修复是 already merged,但与此同时,解决方法是这样的:

@Bean
public SmartLifecycle fileReadingStart(FileReadingMessageSource fileMessageSource) {
    return new SmartLifecycle() {

        @Override
        public int getPhase() {
            // Make sure this runs as early as possible
            return Integer.MIN_VALUE;
        }

        @Override 
        public void start() {
            fileMessageSource.start();
        }

        @Override 
        public void stop() {
            fileMessageSource.stop();
        }

        @Override 
        public boolean isRunning() {
            return false;
        }
        
    };
}