为什么这个带有 Flux.create 块的 reator 代码不能工作?

Why this reator code with block inside Flux.create couldn't work?

我尝试使用 watchService 作为 Flux 生成器,但它无法工作,我还在 Flux.create 方法中尝试了一些简单的块,如 Thread.sleep,它可以工作。 我想知道为什么以及这些情况之间有什么区别?

可以工作的代码,

    @Test
    public void createBlockSleepTest() throws InterruptedException {
        Flux.create(sink->{
            while (true) {
                try {
                    for(int i=0;i<2;i++)
                        sink.next(num++);
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).log().subscribeOn(Schedulers.parallel(),false).log()
                .subscribe(System.out::println);
        Thread.sleep(100000L);
    }

无法运行的代码,

    @Test
    public void createBlockTest() throws IOException, InterruptedException {
        WatchService watchService = fileSystem.newWatchService();
        Path testPath = fileSystem.getPath("C:/testing");
        Files.createDirectories(testPath);
        WatchKey register = testPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE,StandardWatchEventKinds.ENTRY_MODIFY);
        Files.write(testPath.resolve("test1.txt"),"hello".getBytes());
        Thread.sleep(5000L);
        Flux.create(sink->{
            while (true) {
                try {
                    WatchKey key = watchService.take();
                    System.out.println("-----------------"+key);
                    for(WatchEvent event:key.pollEvents()){
                        sink.next(event.context());
                    }
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).log().subscribeOn(Schedulers.parallel(),false).log()
                .subscribe(System.out::println);
        Files.write(testPath.resolve("test2.txt"),"hello".getBytes());
        Thread.sleep(5000L);
        Files.write(testPath.resolve("test3.txt"),"hello".getBytes());
        Thread.sleep(10000L);
    }

我注意到在 reactor 的参考资料中有一个关于在 create 方法中阻塞的通知。但为什么 Thread.sleep 有效?

create 不会并行化您的代码,也不会使其异步,甚至 尽管它 可以 与异步 API 一起使用。如果你在 create lambda 中阻塞, 您使自己陷入僵局和类似的副作用。即使使用 subscribeOn, 需要注意的是,长时间阻塞 create lambda(例如无限循环调用 sink.next(t)) 可以锁定管道:由于 循环饿死他们应该 运行 来自的同一个线程。使用 subscribeOn(Scheduler, false) 变体:requestOnSeparateThread = false 将为 create 使用 Scheduler 线程 并仍然通过在原始线程中执行 request 让数据流动。

谁能解开我的谜题?

Thread.sleep(5000L) 只会阻塞 5 秒,因此 create 会在该延迟后继续前进,而 WatchService#take 会无限期阻塞,除非有新的 WatchKey 注册(在此case 一个新文件)。由于创建文件的代码在create之后,出现了死锁情况

这可以通过更改

来解决
            while (true) {
            try {
                WatchKey key = watchService.take();
                System.out.println("-----------------"+key);
                for(WatchEvent event:key.pollEvents()){
                    sink.next(event.context());
                }
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

            while (true) {
            try {
                WatchKey key = watchService.take();
                System.out.println("-----------------"+key);
                for(WatchEvent event:key.pollEvents()){
                    sink.next(event.context());
                }
                key.reset();
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }