将 FileWatcher 与多线程一起使用
Using FileWatcher with Multithreading
我正尝试在 java 中将多线程与 FileWatcher 服务集成。也就是说,我一直在听一个特定的目录 -> 每当创建一个新文件时,我都需要生成一个新线程来处理该文件(比如说它打印文件内容)。我设法编写了一个可以编译和工作的代码(但不是预期的那样)。它按顺序工作意味着文件 2 在文件 1 之后处理,文件 3 在文件 2 之后处理。我希望它并行执行。
添加代码片段:
while(true) {
WatchKey key;
try {
key = watcher.take();
Path dir = keys.get(key);
for (WatchEvent<?> event: key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
if (kind == StandardWatchEventKinds.OVERFLOW) {
continue;
}
if(kind == StandardWatchEventKinds.ENTRY_CREATE){
boolean valid = key.reset();
if (!valid) {
break;
}
log.info("New entry is created in the listening directory, Calling the FileProcessor");
WatchEvent<Path> ev = (WatchEvent<Path>)event;
Path newFileCreatedResolved = dir.resolve(ev.context());
try{
FileProcessor processFile = new FileProcessor(newFileCreatedResolved.getFileName().toString());
Future<String> result = executor.submit(processFile);
try {
System.out.println("Processed File" + result.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
//executor.shutdown(); add logic to shut down
}
}
}
}
}
和文件处理器class
public class FileProcessor implements Callable <String>{
FileProcessor(String triggerFile) throws FileNotFoundException, IOException{
this.triggerFile = triggerFile;
}
public String call() throws Exception{
//logic to write to another file, this new file is specific to the input file
//returns success
}
现在发生了什么 -> 如果我一次传输 3 个文件,它们是按顺序传输的。首先将 file1 写入其目标文件,然后是 file2、file3,依此类推。
我说得有道理吗?我需要更改哪一部分以使其平行?或者 Executor 服务被设计成那样工作。
对 Future.get()
的调用正在阻塞。当然,在处理完成之前结果不可用,并且您的代码在此之前不会提交另一个任务。
将您的 Executor
包装在 CompletionService
and submit()
任务中。让另一个线程使用 CompletionService
的结果以在任务完成后执行任何必要的处理。
或者,您可以使用 CompletableFuture
的辅助方法来设置等效的操作管道。
第三个更简单但可能不太灵活的选项是简单地将 post-processing 合并到任务本身中。我 demonstrated 一个简单的任务包装器来展示这是如何完成的。
我正尝试在 java 中将多线程与 FileWatcher 服务集成。也就是说,我一直在听一个特定的目录 -> 每当创建一个新文件时,我都需要生成一个新线程来处理该文件(比如说它打印文件内容)。我设法编写了一个可以编译和工作的代码(但不是预期的那样)。它按顺序工作意味着文件 2 在文件 1 之后处理,文件 3 在文件 2 之后处理。我希望它并行执行。
添加代码片段:
while(true) {
WatchKey key;
try {
key = watcher.take();
Path dir = keys.get(key);
for (WatchEvent<?> event: key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
if (kind == StandardWatchEventKinds.OVERFLOW) {
continue;
}
if(kind == StandardWatchEventKinds.ENTRY_CREATE){
boolean valid = key.reset();
if (!valid) {
break;
}
log.info("New entry is created in the listening directory, Calling the FileProcessor");
WatchEvent<Path> ev = (WatchEvent<Path>)event;
Path newFileCreatedResolved = dir.resolve(ev.context());
try{
FileProcessor processFile = new FileProcessor(newFileCreatedResolved.getFileName().toString());
Future<String> result = executor.submit(processFile);
try {
System.out.println("Processed File" + result.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
//executor.shutdown(); add logic to shut down
}
}
}
}
}
和文件处理器class
public class FileProcessor implements Callable <String>{
FileProcessor(String triggerFile) throws FileNotFoundException, IOException{
this.triggerFile = triggerFile;
}
public String call() throws Exception{
//logic to write to another file, this new file is specific to the input file
//returns success
}
现在发生了什么 -> 如果我一次传输 3 个文件,它们是按顺序传输的。首先将 file1 写入其目标文件,然后是 file2、file3,依此类推。
我说得有道理吗?我需要更改哪一部分以使其平行?或者 Executor 服务被设计成那样工作。
对 Future.get()
的调用正在阻塞。当然,在处理完成之前结果不可用,并且您的代码在此之前不会提交另一个任务。
将您的 Executor
包装在 CompletionService
and submit()
任务中。让另一个线程使用 CompletionService
的结果以在任务完成后执行任何必要的处理。
或者,您可以使用 CompletableFuture
的辅助方法来设置等效的操作管道。
第三个更简单但可能不太灵活的选项是简单地将 post-processing 合并到任务本身中。我 demonstrated 一个简单的任务包装器来展示这是如何完成的。