RxJava2: 2 subscribers for an Observable/Flowable, but onNext is called on only one of them
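RxJava2 version 2.1.5.
I am trying to understand how multiple subscriptions to a single observable work in RxJava2.
I have a simple file watch service that tracks the creation, modification, and deletion of files in a directory.
I added 2 subscribers and expected both of them to print every event.
When I copy a file into the watched directory, one subscriber prints the event. Then, when I delete the file, the second subscriber prints the event.
I expected both subscribers to print the events. What am I missing here?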
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class MyRxJava2DirWatcher {

    public Flowable<WatchEvent<?>> createFlowable(WatchService watcher, Path path) {
        return Flowable.create(subscriber -> {
            boolean error = false;
            WatchKey key;
            try {
                key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
            }
            catch (IOException e) {
                subscriber.onError(e);
                error = true;
            }
            // Block on the watcher passed in by the caller and forward every event downstream.
            while (!error) {
                key = watcher.take();
                for (final WatchEvent<?> event : key.pollEvents()) {
                    subscriber.onNext(event);
                }
                key.reset();
            }
        }, BackpressureStrategy.BUFFER);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Backslashes must be escaped inside a Java string literal.
        Path path = Paths.get("c:\\temp\\delete");
        final FileSystem fileSystem = path.getFileSystem();
        WatchService watcher = fileSystem.newWatchService();

        MyRxJava2DirWatcher my = new MyRxJava2DirWatcher();
        // First subscriber.
        my.createFlowable(watcher, path).subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("1>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());
        }, onError -> {
            System.out.println("1>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });

        // MyRxJava2DirWatcher my2 = new MyRxJava2DirWatcher();
        // Second subscriber, created with the same WatchService instance.
        my.createFlowable(watcher, path).subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("2>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());
        }, onError -> {
            System.out.println("2>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });

        TimeUnit.MINUTES.sleep(1000);
    }
}
The output looks like this:
2>>Event kind:ENTRY_CREATE. File affected: 1.txt. RxCachedThreadScheduler-2
2>>Event kind:ENTRY_MODIFY. File affected: 1.txt. RxCachedThreadScheduler-2
1>>Event kind:ENTRY_DELETE. File affected: 1.txt. RxCachedThreadScheduler-1
You are creating two different Flowables for two different subscribers. Does it work if you subscribe to a single Flowable twice, as below?
    public static void main(String[] args) throws IOException, InterruptedException {
        // Backslashes must be escaped inside a Java string literal.
        Path path = Paths.get("c:\\temp\\delete");
        final FileSystem fileSystem = path.getFileSystem();
        WatchService watcher = fileSystem.newWatchService();

        MyRxJava2DirWatcher my = new MyRxJava2DirWatcher();
        // Keep the type parameter; with a raw Flowable, event.kind() below would not compile.
        Flowable<WatchEvent<?>> myFlowable = my.createFlowable(watcher, path);

        myFlowable.subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("1>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());
        }, onError -> {
            System.out.println("1>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });

        myFlowable.subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("2>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());
        }, onError -> {
            System.out.println("2>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });

        TimeUnit.MINUTES.sleep(1000);
    }
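One caveat worth checking with the snippet above: Flowable.create produces a cold source, so each subscribe() call runs the emitter body again, and the two subscriptions still end up as two loops reading the same WatchService. If the goal is a single reader whose events reach every subscriber, RxJava's publish() or share() operators are the usual tools. The sketch below only illustrates that fan-out idea with plain JDK types; Broadcaster, addListener, and pump are hypothetical names invented for this illustration, and a BlockingQueue stands in for the WatchService.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper (not part of RxJava): one reader, every listener gets every event.
class Broadcaster<T> {

    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<T> listener) {
        listeners.add(listener);
    }

    // Takes `count` events from the single source and hands each one to all listeners.
    void pump(BlockingQueue<T> source, int count) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            T event = source.take();
            for (Consumer<T> listener : listeners) {
                listener.accept(event);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The queue plays the role of the WatchService in this illustration.
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        source.put("ENTRY_CREATE");
        source.put("ENTRY_MODIFY");
        source.put("ENTRY_DELETE");

        Broadcaster<String> broadcaster = new Broadcaster<>();
        broadcaster.addListener(event -> System.out.println("1>>" + event));
        broadcaster.addListener(event -> System.out.println("2>>" + event));
        broadcaster.pump(source, 3);
    }
}
```

Because only one loop ever calls take(), the listeners cannot steal events from each other; the trade-off is that a slow listener delays all the others.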
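What happens is that you share the same WatchService between the two Flowables, and they race each other for its events. If you instead pass in the FileSystem and call newWatchService() inside Flowable.create, each Subscriber should receive all of the events: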
    public Flowable<WatchEvent<?>> createFlowable(FileSystem fs, Path path) {
        return Flowable.create(subscriber -> {
            // One WatchService per subscription, so subscribers no longer compete for events.
            WatchService watcher = fs.newWatchService();
            // Close the watcher when the subscriber cancels.
            subscriber.setCancellable(() -> watcher.close());
            boolean error = false;
            WatchKey key;
            try {
                key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
            }
            catch (IOException e) {
                subscriber.onError(e);
                error = true;
            }
            while (!error) {
                key = watcher.take();
                for (final WatchEvent<?> event : key.pollEvents()) {
                    subscriber.onNext(event);
                }
                key.reset();
            }
        }, BackpressureStrategy.BUFFER);
    }
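The contention described above can be reproduced without RxJava or the file system. The following is a minimal sketch under stated assumptions: a BlockingQueue stands in for the shared WatchService, and SharedSourceDemo, drainWithTwoConsumers, and drain are hypothetical names made up for this illustration. Two threads read from the same queue, and each element ends up with exactly one of them, which matches the question's output where every event is printed by only one subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: the queue plays the role of the shared WatchService.
class SharedSourceDemo {

    // Starts two consumer threads on the same queue and returns what each one saw.
    static List<List<Integer>> drainWithTwoConsumers(int eventCount) throws InterruptedException {
        BlockingQueue<Integer> shared = new LinkedBlockingQueue<>();
        for (int i = 0; i < eventCount; i++) {
            shared.put(i);
        }

        List<Integer> seenByFirst = new CopyOnWriteArrayList<>();
        List<Integer> seenBySecond = new CopyOnWriteArrayList<>();
        Thread first = new Thread(() -> drain(shared, seenByFirst));
        Thread second = new Thread(() -> drain(shared, seenBySecond));
        first.start();
        second.start();
        first.join();
        second.join();

        List<List<Integer>> result = new ArrayList<>();
        result.add(seenByFirst);
        result.add(seenBySecond);
        return result;
    }

    // Removes elements until the queue has stayed empty for 200 ms.
    private static void drain(BlockingQueue<Integer> queue, List<Integer> sink) {
        try {
            Integer next;
            while ((next = queue.poll(200, TimeUnit.MILLISECONDS)) != null) {
                sink.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<Integer>> seen = drainWithTwoConsumers(1000);
        // The split between the two consumers varies from run to run.
        System.out.println("first consumer saw  " + seen.get(0).size() + " events");
        System.out.println("second consumer saw " + seen.get(1).size() + " events");
    }
}
```

The two counts always add up to the number of events put into the queue, and no event appears in both lists. How the events are split depends on thread scheduling.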
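Also note that you should use subscribeOn(Schedulers.computation(), false) to avoid deadlocking the poll loop with the Subscriber. With the second argument (requestOn) set to false, downstream request() calls are not rerouted onto the scheduler's worker, which the blocking take() loop keeps occupied.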
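The claim that separate WatchService instances each see every event can be checked with the JDK alone. This is a rough sketch, not the answer's code: TwoWatchersDemo and countEventsPerWatcher are hypothetical names, the directory is a temporary one created for the demo, and the 30-second timeout is an arbitrary safety margin because some platforms deliver watch events by polling.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses only the JDK, no RxJava.
class TwoWatchersDemo {

    // Registers `watcherCount` independent WatchServices on a fresh temp directory,
    // creates one file, and returns how many events each watcher reported for it.
    static List<Integer> countEventsPerWatcher(int watcherCount) throws IOException, InterruptedException {
        Path dir = Files.createTempDirectory("watch-demo");
        Path file = dir.resolve("1.txt");
        List<WatchService> watchers = new ArrayList<>();
        try {
            for (int i = 0; i < watcherCount; i++) {
                WatchService watcher = dir.getFileSystem().newWatchService();
                dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
                watchers.add(watcher);
            }

            Files.createFile(file);

            List<Integer> counts = new ArrayList<>();
            for (WatchService watcher : watchers) {
                // poll with a timeout so the demo cannot hang if no event arrives
                WatchKey key = watcher.poll(30, TimeUnit.SECONDS);
                counts.add(key == null ? 0 : key.pollEvents().size());
            }
            return counts;
        } finally {
            for (WatchService watcher : watchers) {
                watcher.close();
            }
            Files.deleteIfExists(file);
            Files.deleteIfExists(dir);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("events per watcher: " + countEventsPerWatcher(2));
    }
}
```

If every entry in the returned list is at least 1, each watcher saw the file creation on its own, which is the behavior the per-subscription newWatchService() call relies on.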