为什么这个带有 Flux.create 块的 reator 代码不能工作?
Why this reator code with block inside Flux.create couldn't work?
我尝试使用 watchService 作为 Flux 生成器,但它无法工作,我还在 Flux.create 方法中尝试了一些简单的块,如 Thread.sleep,它可以工作。
我想知道为什么以及这些情况之间有什么区别?
可以工作的代码,
@Test
public void createBlockSleepTest() throws InterruptedException {
Flux.create(sink->{
while (true) {
try {
for(int i=0;i<2;i++)
sink.next(num++);
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).log().subscribeOn(Schedulers.parallel(),false).log()
.subscribe(System.out::println);
Thread.sleep(100000L);
}
无法运行的代码,
@Test
public void createBlockTest() throws IOException, InterruptedException {
WatchService watchService = fileSystem.newWatchService();
Path testPath = fileSystem.getPath("C:/testing");
Files.createDirectories(testPath);
WatchKey register = testPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE,StandardWatchEventKinds.ENTRY_MODIFY);
Files.write(testPath.resolve("test1.txt"),"hello".getBytes());
Thread.sleep(5000L);
Flux.create(sink->{
while (true) {
try {
WatchKey key = watchService.take();
System.out.println("-----------------"+key);
for(WatchEvent event:key.pollEvents()){
sink.next(event.context());
}
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).log().subscribeOn(Schedulers.parallel(),false).log()
.subscribe(System.out::println);
Files.write(testPath.resolve("test2.txt"),"hello".getBytes());
Thread.sleep(5000L);
Files.write(testPath.resolve("test3.txt"),"hello".getBytes());
Thread.sleep(10000L);
}
我注意到在 reactor 的参考资料中有一个关于在 create 方法中阻塞的通知。但为什么 Thread.sleep 有效?
create
不会并行化您的代码,也不会使其异步,甚至
尽管它 可以 与异步 API 一起使用。如果你在 create
lambda 中阻塞,
您使自己陷入僵局和类似的副作用。即使使用 subscribeOn
,
需要注意的是,长时间阻塞 create
lambda(例如无限循环调用
sink.next(t)
) 可以锁定管道:由于
循环饿死他们应该 运行 来自的同一个线程。使用 subscribeOn(Scheduler, false)
变体:requestOnSeparateThread = false
将为 create
使用 Scheduler
线程
并仍然通过在原始线程中执行 request
让数据流动。
谁能解开我的谜题?
Thread.sleep(5000L)
只会阻塞 5 秒,因此 create
会在该延迟后继续前进,而 WatchService#take
会无限期阻塞,除非有新的 WatchKey
注册(在此case 一个新文件)。由于创建文件的代码在create
之后,出现了死锁情况
这可以通过更改
来解决
while (true) {
try {
WatchKey key = watchService.take();
System.out.println("-----------------"+key);
for(WatchEvent event:key.pollEvents()){
sink.next(event.context());
}
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
至
while (true) {
try {
WatchKey key = watchService.take();
System.out.println("-----------------"+key);
for(WatchEvent event:key.pollEvents()){
sink.next(event.context());
}
key.reset();
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
我尝试使用 watchService 作为 Flux 生成器,但它无法工作,我还在 Flux.create 方法中尝试了一些简单的块,如 Thread.sleep,它可以工作。 我想知道为什么以及这些情况之间有什么区别?
可以工作的代码,
@Test
public void createBlockSleepTest() throws InterruptedException {
Flux.create(sink->{
while (true) {
try {
for(int i=0;i<2;i++)
sink.next(num++);
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).log().subscribeOn(Schedulers.parallel(),false).log()
.subscribe(System.out::println);
Thread.sleep(100000L);
}
无法运行的代码,
@Test
public void createBlockTest() throws IOException, InterruptedException {
WatchService watchService = fileSystem.newWatchService();
Path testPath = fileSystem.getPath("C:/testing");
Files.createDirectories(testPath);
WatchKey register = testPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE,StandardWatchEventKinds.ENTRY_MODIFY);
Files.write(testPath.resolve("test1.txt"),"hello".getBytes());
Thread.sleep(5000L);
Flux.create(sink->{
while (true) {
try {
WatchKey key = watchService.take();
System.out.println("-----------------"+key);
for(WatchEvent event:key.pollEvents()){
sink.next(event.context());
}
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).log().subscribeOn(Schedulers.parallel(),false).log()
.subscribe(System.out::println);
Files.write(testPath.resolve("test2.txt"),"hello".getBytes());
Thread.sleep(5000L);
Files.write(testPath.resolve("test3.txt"),"hello".getBytes());
Thread.sleep(10000L);
}
我注意到在 reactor 的参考资料中有一个关于在 create 方法中阻塞的通知。但为什么 Thread.sleep 有效?
create
不会并行化您的代码,也不会使其异步,甚至
尽管它 可以 与异步 API 一起使用。如果你在 create
lambda 中阻塞,
您使自己陷入僵局和类似的副作用。即使使用 subscribeOn
,
需要注意的是,长时间阻塞 create
lambda(例如无限循环调用
sink.next(t)
) 可以锁定管道:由于
循环饿死他们应该 运行 来自的同一个线程。使用 subscribeOn(Scheduler, false)
变体:requestOnSeparateThread = false
将为 create
使用 Scheduler
线程
并仍然通过在原始线程中执行 request
让数据流动。
谁能解开我的谜题?
Thread.sleep(5000L)
只会阻塞 5 秒,因此 create
会在该延迟后继续前进,而 WatchService#take
会无限期阻塞,除非有新的 WatchKey
注册(在此case 一个新文件)。由于创建文件的代码在create
之后,出现了死锁情况
这可以通过更改
来解决 while (true) {
try {
WatchKey key = watchService.take();
System.out.println("-----------------"+key);
for(WatchEvent event:key.pollEvents()){
sink.next(event.context());
}
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
至
while (true) {
try {
WatchKey key = watchService.take();
System.out.println("-----------------"+key);
for(WatchEvent event:key.pollEvents()){
sink.next(event.context());
}
key.reset();
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}