Java WatchService,使用线程对事件执行操作

Java WatchService, perform action on event using threads

我可以通过使用 WatchKey 注册 c 来监视目录(网络上有大量示例)但是这个观察器会捕获每个事件。例如。在 windows 如果我正在监视 d:/temp 目录并创建一个新的 .txt 文件并重命名它,我会收到以下事件。

ENTRY_CREATE: d:\temp\test\New Text Document.txt
ENTRY_MODIFY: d:\temp\test
ENTRY_DELETE: d:\temp\test\New Text Document.txt
ENTRY_CREATE: d:\temp\test\test.txt
ENTRY_MODIFY: d:\temp\test

我想在创建或更新新文件时执行一个操作。 但是我不希望在上面的示例中操作 运行 5 次。

我的第一个想法: 因为我现在只需要 运行 操作(在本例中是推送到私人 Git 服务器)一次然后(例如,每 10 秒检查一次,仅当监视目录发生更改时才执行推送)我想到了一个带有布尔参数的对象,我可以从单独的线程中获取和设置它。

现在这个工作还不错(除非专家们可以帮助我了解为什么这是一个糟糕的想法)问题是如果在 SendToGit 线程的操作和这个操作期间捕获了一个文件事件完成后将“Found”参数设置为 false。紧接着其他事件之一被捕获(如上例所示),它们将再次将“Found”参数设置为 true。这并不理想,因为我将立即再次 运行 SendToGit 操作,这是不必要的。

我的第二个想法调查暂停检查 MonitorFolder 线程中的更改,直到 SendToGit 操作完成(即继续检查 ChangesFound Found 参数是否已被设置回 false。当此参数为 false 时再次开始检查更改。

问题

  1. 这是一种可以接受的方式吗?还是我走入了一个没有希望 return 的兔子洞?
  2. 如果我沿着我的第二个想法走下去,如果我忙于 SendToGit 操作并且在监视文件夹中进行了更改,会发生什么情况?我怀疑这不会被识别,我可能会错过更改。

其余代码

ChangesFound.java

package com.acme;

public class ChangesFound {
    
    private boolean found = false;

    public boolean wereFound() {
        return this.found;
    }

    public void setFound(boolean commitToGit) {
        this.found = commitToGit;
    }
}

在我的主应用程序中,我启动了 2 个线程。

  1. MonitorFolder.java 监视目录,当发现 Watcher 事件时,将 ChangesFound 变量“found”设置为 true。
  2. SendToGit.java 每 10 秒检查 ChangesFound 变量 found 是否为真,如果为真则执行推送。 (或者在这种情况下只打印一条消息)

这是我启动线程的应用程序:

package com.acme;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class App {

    private static ChangesFound chg;
    
    public static void main(String[] args) throws IOException {

        String dirToMonitor = "D:/Temp";
        boolean recursive = true;
        chg = new ChangesFound();

        Runnable r = new SendToGit(chg);
        new Thread(r).start();

        Path dir = Paths.get(dirToMonitor);
        Runnable m = new MonitorFolder(chg, dir, recursive);
        new Thread(m).start();        
    }
}

SendToGit.java

package com.acme;

public class SendToGit implements Runnable {

    private ChangesFound changes;

    public SendToGit(ChangesFound chg) {
        changes = chg;
    }
    
    public void run() {

        while (true) {           
            try {
                Thread.sleep(10000);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }

            System.out.println(java.time.LocalDateTime.now() + " [SendToGit] waking up.");

            if (changes.wereFound()) {
                System.out.println("\t***** CHANGES FOUND push to Git.");
                changes.setFound(false);
            } else {
                System.out.println("\t***** Nothing changed.");
            }
        }
    }
}

MonitorFolder.java(很抱歉 class 我只是在这里添加它以防它对其他人有帮助。)

package com.acme;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;

public class MonitorFolder implements Runnable  {
    
    private static WatchService watcher;
    private static Map<WatchKey, Path> keys;
    private static boolean recursive;
    private static boolean trace = false;
    private static boolean commitGit = false;
    private static ChangesFound changes;

    @SuppressWarnings("unchecked")
    static <T> WatchEvent<T> cast(WatchEvent<?> event) {
        return (WatchEvent<T>) event;
    }

    /**
     * Creates a WatchService and registers the given directory
     */
    MonitorFolder(ChangesFound chg, Path dir, boolean rec) throws IOException {
        changes = chg;
        watcher = FileSystems.getDefault().newWatchService();
        keys = new HashMap<WatchKey, Path>();
        recursive = rec;

        if (recursive) {
            System.out.format("[MonitorFolder] Scanning %s ...\n", dir);
            registerAll(dir);
            System.out.println("Done.");
        } else {
            register(dir);
        }

        // enable trace after initial registration
        this.trace = true;
    }

    /**
     * Register the given directory with the WatchService
     */
    private static void register(Path dir) throws IOException {
        WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
        if (trace) {
            Path prev = keys.get(key);
            if (prev == null) {
                System.out.format("register: %s\n", dir);
            } else {
                if (!dir.equals(prev)) {
                    System.out.format("update: %s -> %s\n", prev, dir);
                }
            }
        }
        keys.put(key, dir);
    }

    /**
     * Register the given directory, and all its sub-directories, with the
     * WatchService.
     */
    private static void registerAll(final Path start) throws IOException {
        // register directory and sub-directories
        Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
                    throws IOException {
                register(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /**
     * Process all events for keys queued to the watcher
     */
    public void run() {
        for (;;) {

            // wait for key to be signalled
            WatchKey key;
            try {
                key = watcher.take();
            } catch (InterruptedException x) {
                return;
            }

            Path dir = keys.get(key);
            if (dir == null) {
                System.err.println("WatchKey not recognized!!");
                continue;
            }

            for (WatchEvent<?> event : key.pollEvents()) {
                WatchEvent.Kind kind = event.kind();

                // TBD - provide example of how OVERFLOW event is handled
                if (kind == OVERFLOW) {
                    System.out.println("Something about Overflow");
                    continue;
                }

                // Context for directory entry event is the file name of entry
                WatchEvent<Path> ev = cast(event);
                Path name = ev.context();
                Path child = dir.resolve(name);

                // print out event and set ChangesFound object Found parameter to True
                System.out.format("[MonitorFolder] " + java.time.LocalDateTime.now() + " - %s: %s\n", event.kind().name(), child);
                changes.setFound(true);

                // if directory is created, and watching recursively, then
                // register it and its sub-directories
                if (recursive && (kind == ENTRY_CREATE)) {
                    try {
                        if (Files.isDirectory(child, NOFOLLOW_LINKS)) {
                            registerAll(child);
                        }
                    } catch (IOException x) {
                        // ignore to keep sample readbale
                    }
                }
            }

            // reset key and remove from set if directory no longer accessible
            boolean valid = key.reset();
            if (!valid) {
                keys.remove(key);

                // all directories are inaccessible
                if (keys.isEmpty()) {
                    System.out.println("keys.isEmpty");
                    break;
                }
            }
        }
    }
}

你的两种策略都会导致问题,因为 Watch Service 非常冗长并且会发送很多消息,而实际上你的下游处理可能需要一两条消息 - 所以有时你可能会做不必要的工作或错过事件。

使用 WatchService 时,您可以将多个通知整理在一起并作为一个事件传递,列出一组最近的删除、创建和更新:

  1. DELETE 后跟 CREATE => 作为 UPDATE 发送
  2. CREATE 后跟 MODIFY => 作为 CREATE 发送
  3. CREATE 或 MODIFY 后跟 DELETE => 作为 DELETE 发送

不是调用 WatchService.take() 并对每条消息采取行动,而是使用 WatchService.poll(timeout) 并且仅当没有返回任何内容时才将前面的事件集作为一个事件的联合行动 - 而不是在每次成功轮询后单独行动。

将问题分离为两个组件会更容易,这样您下次需要时就不会重复 WatchService 代码:

  1. 一个 watch 管理器,它处理 watch 服务 + 目录注册并整理副本以作为一个组发送给事件侦听器
  2. 监听器 class 接收更改组并对设置执行操作。

此示例可能有助于说明 - 请参阅 WatchExample,它是设置注册但将更少的事件传递给 setListener 定义的回调的管理器。您可以将 MonitorFolder 设置为 WatchExample 以减少发现的事件,并使您在 SendToGit 中的代码成为一个监听器,它使用 fileChange(deletes, creates, updates) 的聚合集按需调用。

public static void main(String[] args) throws IOException, InterruptedException {

    final List<Path> dirs = Arrays.stream(args).map(Path::of).map(Path::toAbsolutePath).collect(Collectors.toList());
    Kind<?> [] kinds = { StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE};

    // Should launch WatchExample PER Filesystem:
    WatchExample w = new WatchExample();
    w.setListener(WatchExample::fireEvents);

    for(Path dir : dirs)
        w.register(kinds, dir);

    // For 2 or more WatchExample use: new Thread(w[n]::run).start();
    w.run();
}

public class WatchExample implements Runnable {

    private final Set<Path> created = new LinkedHashSet<>();
    private final Set<Path> updated = new LinkedHashSet<>();
    private final Set<Path> deleted = new LinkedHashSet<>();

    private volatile boolean appIsRunning = true;
    // Decide how sensitive the polling is:
    private final int pollmillis = 100;
    private WatchService ws;

    private Listener listener = WatchExample::fireEvents;

    @FunctionalInterface
    interface Listener
    {
        public void fileChange(Set<Path> deleted, Set<Path> created, Set<Path> modified);
    }

    WatchExample() {
    }
    
    public void setListener(Listener listener) {
        this.listener = listener;
    }

    public void shutdown() {
        System.out.println("shutdown()");
        this.appIsRunning = false;
    }

    public void run() {
        System.out.println();
        System.out.println("run() START watch");
        System.out.println();

        try(WatchService autoclose = ws) {

            while(appIsRunning) {

                boolean hasPending = created.size() + updated.size() + deleted.size() > 0;
                System.out.println((hasPending ? "ws.poll("+pollmillis+")" : "ws.take()")+" as hasPending="+hasPending);

                // Use poll if last cycle has some events, as take() may block
                WatchKey wk = hasPending ? ws.poll(pollmillis,TimeUnit.MILLISECONDS) : ws.take();
                if (wk != null)  {
                    for (WatchEvent<?> event : wk.pollEvents()) {
                         Path parent = (Path) wk.watchable();
                         Path eventPath = (Path) event.context();
                         storeEvent(event.kind(), parent.resolve(eventPath));
                     }
                     boolean valid = wk.reset();
                     if (!valid) {
                         System.out.println("Check the path, dir may be deleted "+wk);
                     }
                }

                System.out.println("PENDING: cre="+created.size()+" mod="+updated.size()+" del="+deleted.size());

                // This only sends new notifications when there was NO event this cycle:
                if (wk == null && hasPending) {
                    listener.fileChange(deleted, created, updated);
                    deleted.clear();
                    created.clear();
                    updated.clear();
                }
            }
        }
        catch (InterruptedException e) {
            System.out.println("Watch was interrupted, sending final updates");
            fireEvents(deleted, created, updated);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        System.out.println("run() END watch");
    }

    public void register(Kind<?> [] kinds, Path dir) throws IOException {
        System.out.println("register watch for "+dir);

        // If dirs are from different filesystems WatchService will give errors later
        if (this.ws == null) {
            ws = dir.getFileSystem().newWatchService();
        }
        dir.register(ws, kinds);
    }

    /**
     * Save event for later processing by event kind EXCEPT for:
     * <li>DELETE followed by CREATE           => store as MODIFY
     * <li>CREATE followed by MODIFY           => store as CREATE
     * <li>CREATE or MODIFY followed by DELETE => store as DELETE
     */
    private void
    storeEvent(Kind<?> kind, Path path) {
        System.out.println("STORE "+kind+" path:"+path);

        boolean cre = false;
        boolean mod = false;
        boolean del = kind == StandardWatchEventKinds.ENTRY_DELETE;

        if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
            mod = deleted.contains(path);
            cre = !mod;
        }
        else if (kind == StandardWatchEventKinds.ENTRY_MODIFY) {
            cre = created.contains(path);
            mod = !cre;
        }
        addOrRemove(created, cre, path);
        addOrRemove(updated, mod, path);
        addOrRemove(deleted, del, path);
    }
    // Add or remove from the set:
    private static void addOrRemove(Set<Path> set, boolean add, Path path) {
        if (add) set.add(path);
        else     set.remove(path);
    }

    public static void fireEvents(Set<Path> deleted, Set<Path> created, Set<Path> modified) {
        System.out.println();
        System.out.println("fireEvents START");
        for (Path path : deleted)
            System.out.println("  DELETED: "+path);
        for (Path path : created)
            System.out.println("  CREATED: "+path);
        for (Path path : modified)
            System.out.println("  UPDATED: "+path);
        System.out.println("fireEvents END");
        System.out.println();
    }
}