TaskExecutor 不工作 Spring 集成
TaskExecutor is not working Spring Integration
我已经用任务执行器设置了文件轮询器
ExecutorService executorService = Executors.newFixedThreadPool(10);
LOG.info("Setting up the poller for directory {} ", finalDirectory);
StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
.taskExecutor(executorService)
.maxMessagesPerPoll(10)
.advice(new LoggerSourceAdvisor(finalDirectory))
))
//move file to processing first processing
.transform(new FileMoveTransformer("C:/processing", true))
.channel("fileRouter")
.get();
如上所示,我已将 threadpool
固定为 10,每个轮询的最大消息数为 10。如果我放 10 个文件,它仍然会一个一个地处理。这里可能有什么问题?
* 更新 *
虽然我现在遇到了其他问题,但在 Gary 的回答后它工作得很好。
我已经像这样设置了我的轮询器
setDirectory(new File(path));
DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();
scanner.setFilter(new AcceptAllFileListFilter<>());
setScanner(scanner);
之所以使用AcceptAll
,是因为同一个文件可能会再次出现,这就是我先移动文件的原因。但是当我启用线程执行器时,多个线程正在处理同一个文件,我假设是因为 AcceptAllFile
如果我更改为 AcceptOnceFileListFilter
它可以工作,但是再次出现的同一个文件将不会被再次拾取!可以做些什么来避免这个问题?
Issue/Bug
在ClassAbstractPersistentAcceptOnceFileListFilter
我们有这个代码
@Override
public boolean accept(F file) {
String key = buildKey(file);
synchronized (this.monitor) {
String newValue = value(file);
String oldValue = this.store.putIfAbsent(key, newValue);
if (oldValue == null) { // not in store
flushIfNeeded();
return true;
}
// same value in store
if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
flushIfNeeded();
return true;
}
return false;
}
}
现在,例如,如果我设置了每次轮询 5 的最大值并且有两个文件,那么它可能的同一个文件将被两个线程拾取。
假设我的代码在我读取文件后移动了文件。
但是另一个线程获取了 accept
方法
如果文件不存在,那么它将 return lastModified 时间设为 0,并且 return 为真。
导致问题的原因是文件不存在。
如果它是 0 那么它应该 return false 因为文件已经不存在了。
当您将任务执行器添加到轮询器时;调度程序线程所做的只是将轮询任务交给线程池中的一个线程; maxMessagesPerPoll
是轮询任务的一部分。轮询器本身每 5 秒仅运行一次。为了得到你想要的,你应该在流程中添加一个执行者通道...
@SpringBootApplication
public class So53521593Application {
private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);
public static void main(String[] args) {
SpringApplication.run(So53521593Application.class, args);
}
@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(() -> "foo", e -> e
.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.<String>handle((p, h) -> {
try {
logger.info(p);
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}
}
编辑
对我来说很好用...
@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.handle((p, h) -> {
try {
logger.info(p.toString());
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}
和
2018-11-28 11:46:05.196 INFO 57607 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt
2018-11-28 11:46:05.197 INFO 57607 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt
和 touch test1.txt
2018-11-28 11:48:00.284 INFO 57607 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt
EDIT1
同意 - 用这个转载...
@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.<File>handle((p, h) -> {
try {
p.delete();
logger.info(p.toString());
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}
和
2018-11-28 13:22:23.689 INFO 75681 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt
2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt
2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt
2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-4] com.example.So53521593Application : /tmp/foo/test2.txt
我已经用任务执行器设置了文件轮询器
ExecutorService executorService = Executors.newFixedThreadPool(10);
LOG.info("Setting up the poller for directory {} ", finalDirectory);
StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
.taskExecutor(executorService)
.maxMessagesPerPoll(10)
.advice(new LoggerSourceAdvisor(finalDirectory))
))
//move file to processing first processing
.transform(new FileMoveTransformer("C:/processing", true))
.channel("fileRouter")
.get();
如上所示,我已将 threadpool
固定为 10,每个轮询的最大消息数为 10。如果我放 10 个文件,它仍然会一个一个地处理。这里可能有什么问题?
* 更新 *
虽然我现在遇到了其他问题,但在 Gary 的回答后它工作得很好。
我已经像这样设置了我的轮询器
setDirectory(new File(path));
DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();
scanner.setFilter(new AcceptAllFileListFilter<>());
setScanner(scanner);
之所以使用AcceptAll
,是因为同一个文件可能会再次出现,这就是我先移动文件的原因。但是当我启用线程执行器时,多个线程正在处理同一个文件,我假设是因为 AcceptAllFile
如果我更改为 AcceptOnceFileListFilter
它可以工作,但是再次出现的同一个文件将不会被再次拾取!可以做些什么来避免这个问题?
Issue/Bug
在ClassAbstractPersistentAcceptOnceFileListFilter
我们有这个代码
@Override
public boolean accept(F file) {
String key = buildKey(file);
synchronized (this.monitor) {
String newValue = value(file);
String oldValue = this.store.putIfAbsent(key, newValue);
if (oldValue == null) { // not in store
flushIfNeeded();
return true;
}
// same value in store
if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
flushIfNeeded();
return true;
}
return false;
}
}
现在,例如,如果我设置了每次轮询 5 的最大值并且有两个文件,那么它可能的同一个文件将被两个线程拾取。
假设我的代码在我读取文件后移动了文件。
但是另一个线程获取了 accept
方法
如果文件不存在,那么它将 return lastModified 时间设为 0,并且 return 为真。
导致问题的原因是文件不存在。
如果它是 0 那么它应该 return false 因为文件已经不存在了。
当您将任务执行器添加到轮询器时;调度程序线程所做的只是将轮询任务交给线程池中的一个线程; maxMessagesPerPoll
是轮询任务的一部分。轮询器本身每 5 秒仅运行一次。为了得到你想要的,你应该在流程中添加一个执行者通道...
@SpringBootApplication
public class So53521593Application {
private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);
public static void main(String[] args) {
SpringApplication.run(So53521593Application.class, args);
}
@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(() -> "foo", e -> e
.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.<String>handle((p, h) -> {
try {
logger.info(p);
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}
}
编辑
对我来说很好用...
@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.handle((p, h) -> {
try {
logger.info(p.toString());
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}
和
2018-11-28 11:46:05.196 INFO 57607 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt
2018-11-28 11:46:05.197 INFO 57607 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt
和 touch test1.txt
2018-11-28 11:48:00.284 INFO 57607 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt
EDIT1
同意 - 用这个转载...
@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.<File>handle((p, h) -> {
try {
p.delete();
logger.info(p.toString());
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}
和
2018-11-28 13:22:23.689 INFO 75681 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt
2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt
2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt
2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-4] com.example.So53521593Application : /tmp/foo/test2.txt