RxJava 2 等同于 isUnsubscribed
RxJava 2 equivalent to isUnsubscribed
我一直在研究本书 Reactive Programming with RxJava 中的示例,该书针对的是版本 1 而不是版本 2。无限流的介绍有以下示例(并指出有更好的方法来处理并发数):
Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
Runnabler = () -> {
BigInteger i = ZERO;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
i = i.add(ONE);
}
};
new Thread(r).start();
});
...
Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();
但是,在 RxJava 2 中,传递给 create()
方法的 lambda 表达式是 ObservableEmitter
类型,并且没有 isUnsubscribed()
方法。我查看了 What's Different in 2.0 并搜索了存储库,但找不到任何此类方法。
如何在 2.0 中实现相同的功能?
编辑以包含如下给出的解决方案(n.b。使用 kotlin):
val naturalNumbers = Observable.create<BigInteger> { emitter ->
Thread({
var int: BigInteger = BigInteger.ZERO
while (!emitter.isDisposed) {
emitter.onNext(int)
int = int.add(BigInteger.ONE)
}
}).start()
}
val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }
Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()
订阅Observable后返回Disposable
。您可以将其保存到本地变量并检查 disposable.isDisposed()
以查看它是否仍在订阅。
我一直在研究本书 Reactive Programming with RxJava 中的示例,该书针对的是版本 1 而不是版本 2。无限流的介绍有以下示例(并指出有更好的方法来处理并发数):
Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
Runnabler = () -> {
BigInteger i = ZERO;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
i = i.add(ONE);
}
};
new Thread(r).start();
});
...
Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();
但是,在 RxJava 2 中,传递给 create()
方法的 lambda 表达式是 ObservableEmitter
类型,并且没有 isUnsubscribed()
方法。我查看了 What's Different in 2.0 并搜索了存储库,但找不到任何此类方法。
如何在 2.0 中实现相同的功能?
编辑以包含如下给出的解决方案(n.b。使用 kotlin):
val naturalNumbers = Observable.create<BigInteger> { emitter ->
Thread({
var int: BigInteger = BigInteger.ZERO
while (!emitter.isDisposed) {
emitter.onNext(int)
int = int.add(BigInteger.ONE)
}
}).start()
}
val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }
Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()
订阅Observable后返回Disposable
。您可以将其保存到本地变量并检查 disposable.isDisposed()
以查看它是否仍在订阅。