将 FileWatcher 与多线程一起使用

Using FileWatcher with Multithreading

我正尝试在 java 中将多线程与 FileWatcher 服务集成。也就是说,我一直在听一个特定的目录 -> 每当创建一个新文件时,我都需要生成一个新线程来处理该文件(比如说它打印文件内容)。我设法编写了一个可以编译和工作的代码(但不是预期的那样)。它按顺序工作意味着文件 2 在文件 1 之后处理,文件 3 在文件 2 之后处理。我希望它并行执行。

添加代码片段:

while(true) {
            WatchKey key;
            try {
                key = watcher.take();
                Path dir = keys.get(key);
                for (WatchEvent<?> event: key.pollEvents()) {
                    WatchEvent.Kind<?> kind = event.kind();
                    if (kind == StandardWatchEventKinds.OVERFLOW) {
                        continue;
                    }
                    if(kind == StandardWatchEventKinds.ENTRY_CREATE){
                        boolean valid = key.reset();
                        if (!valid) {
                            break;
                        }
                        log.info("New entry is created in the listening directory, Calling the FileProcessor");
                        WatchEvent<Path> ev = (WatchEvent<Path>)event;
                        Path newFileCreatedResolved = dir.resolve(ev.context());
                        try{
                        FileProcessor processFile = new FileProcessor(newFileCreatedResolved.getFileName().toString());
                        Future<String> result = executor.submit(processFile);
                            try {
                                System.out.println("Processed File" + result.get());
                            } catch (ExecutionException e) {
                                e.printStackTrace();
                            }
                        //executor.shutdown(); add logic to shut down
                        }   
                    }
                }
            }
        }

和文件处理器class

public class FileProcessor implements Callable <String>{
    FileProcessor(String triggerFile) throws FileNotFoundException, IOException{
        this.triggerFile = triggerFile;
    }
    public String call() throws Exception{
        //logic to write to another file, this new file is specific to the input file
        //returns success
    }

现在发生了什么 -> 如果我一次传输 3 个文件,它们是按顺序传输的。首先将 file1 写入其目标文件,然后是 file2、file3,依此类推。

我说得有道理吗?我需要更改哪一部分以使其平行?或者 Executor 服务被设计成那样工作。

Future.get() 的调用正在阻塞。当然,在处理完成之前结果不可用,并且您的代码在此之前不会提交另一个任务。

将您的 Executor 包装在 CompletionService and submit() 任务中。让另一个线程使用 CompletionService 的结果以在任务完成后执行任何必要的处理。

或者,您可以使用 CompletableFuture 的辅助方法来设置等效的操作管道。

第三个更简单但可能不太灵活的选项是简单地将 post-processing 合并到任务本身中。我 demonstrated 一个简单的任务包装器来展示这是如何完成的。