如何将 WatchServiceDirectoryScanner 与 Spring Cloud Stream 文件供应商一起使用?
How to use WatchServiceDirectoryScanner with Spring Cloud Stream file supplier?
我正在尝试配置 SpCS 的文件供应商以监视目录并在创建或修改文件时发布消息。基于 ,我对 FileInboundChannelAdapterSpec
bean 进行了以下自定义配置:
@Bean
public BeanPostProcessor inboundFileAdaptorCustomizer() {
return new BeanPostProcessor() {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
if (bean instanceof FileInboundChannelAdapterSpec) {
FileInboundChannelAdapterSpec spec = (FileInboundChannelAdapterSpec) bean;
spec.get().setDirectory(properties.getDirectory());
spec.autoCreateDirectory(false);
spec.useWatchService(true);
spec.watchEvents(WatchEventType.CREATE, WatchEventType.MODIFY);
spec.preventDuplicates(false); // We use a custom duplicates filter, see below
spec.nioLocker();
spec.filter(fileFilters());
}
return bean;
}
};
}
当我 运行 使用此配置时,出现以下异常:
java.lang.IllegalStateException: The WatchService hasn't been started
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.9.jar:5.3.9]
at org.springframework.integration.file.FileReadingMessageSource$WatchServiceDirectoryScanner.listEligibleFiles(FileReadingMessageSource.java:466) ~[spring-integration-file-5.5.3.jar:5.5.3]
at org.springframework.integration.file.DefaultDirectoryScanner.listFiles(DefaultDirectoryScanner.java:90) ~[spring-integration-file-5.5.3.jar:5.5.3]
at org.springframework.integration.file.FileReadingMessageSource.scanInputDirectory(FileReadingMessageSource.java:375) ~[spring-integration-file-5.5.3.jar:5.5.3]
at org.springframework.integration.file.FileReadingMessageSource.doReceive(FileReadingMessageSource.java:349) ~[spring-integration-file-5.5.3.jar:5.5.3]
使用断点我注意到从未调用 WatchServiceDirectoryScanner
的 start() 方法。相反,doReceived()
并因此 listElibibleFiles()
在它开始之前被调用,这是异常的来源。
为什么 SI 甚至在启动之前就告诉扫描仪列出文件?
这是 FileSupplierConfiguration
中的错误 - 缺少对 start()
的调用。修复是 already merged,但与此同时,解决方法是这样的:
@Bean
public SmartLifecycle fileReadingStart(FileReadingMessageSource fileMessageSource) {
return new SmartLifecycle() {
@Override
public int getPhase() {
// Make sure this runs as early as possible
return Integer.MIN_VALUE;
}
@Override
public void start() {
fileMessageSource.start();
}
@Override
public void stop() {
fileMessageSource.stop();
}
@Override
public boolean isRunning() {
return false;
}
};
}
我正在尝试配置 SpCS 的文件供应商以监视目录并在创建或修改文件时发布消息。基于 FileInboundChannelAdapterSpec
bean 进行了以下自定义配置:
@Bean
public BeanPostProcessor inboundFileAdaptorCustomizer() {
return new BeanPostProcessor() {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
if (bean instanceof FileInboundChannelAdapterSpec) {
FileInboundChannelAdapterSpec spec = (FileInboundChannelAdapterSpec) bean;
spec.get().setDirectory(properties.getDirectory());
spec.autoCreateDirectory(false);
spec.useWatchService(true);
spec.watchEvents(WatchEventType.CREATE, WatchEventType.MODIFY);
spec.preventDuplicates(false); // We use a custom duplicates filter, see below
spec.nioLocker();
spec.filter(fileFilters());
}
return bean;
}
};
}
当我 运行 使用此配置时,出现以下异常:
java.lang.IllegalStateException: The WatchService hasn't been started
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.9.jar:5.3.9]
at org.springframework.integration.file.FileReadingMessageSource$WatchServiceDirectoryScanner.listEligibleFiles(FileReadingMessageSource.java:466) ~[spring-integration-file-5.5.3.jar:5.5.3]
at org.springframework.integration.file.DefaultDirectoryScanner.listFiles(DefaultDirectoryScanner.java:90) ~[spring-integration-file-5.5.3.jar:5.5.3]
at org.springframework.integration.file.FileReadingMessageSource.scanInputDirectory(FileReadingMessageSource.java:375) ~[spring-integration-file-5.5.3.jar:5.5.3]
at org.springframework.integration.file.FileReadingMessageSource.doReceive(FileReadingMessageSource.java:349) ~[spring-integration-file-5.5.3.jar:5.5.3]
使用断点我注意到从未调用 WatchServiceDirectoryScanner
的 start() 方法。相反,doReceived()
并因此 listElibibleFiles()
在它开始之前被调用,这是异常的来源。
为什么 SI 甚至在启动之前就告诉扫描仪列出文件?
这是 FileSupplierConfiguration
中的错误 - 缺少对 start()
的调用。修复是 already merged,但与此同时,解决方法是这样的:
@Bean
public SmartLifecycle fileReadingStart(FileReadingMessageSource fileMessageSource) {
return new SmartLifecycle() {
@Override
public int getPhase() {
// Make sure this runs as early as possible
return Integer.MIN_VALUE;
}
@Override
public void start() {
fileMessageSource.start();
}
@Override
public void stop() {
fileMessageSource.stop();
}
@Override
public boolean isRunning() {
return false;
}
};
}