在 RxJava 中让一个订阅者订阅一个主题
Making a Subscriber subscribe to a Subject in RxJava
在 RxJava 3 中,有没有办法将 Subscriber
订阅到 Subject
?
Subject 也是一个 Observable
这意味着它应该能够提供一个上游,当 Subject 的 onNext
发出一些东西。但是下面的代码给出了编译时错误,我找不到任何其他机制来实现它。
public class NewsPublisher {
static Subject<Integer> newsSubject = PublishSubject.create();
public class NewsSubscriber
extends DisposableSubscriber<Integer> {
void bind() {
NewsPublisher.newsSubject.subscribe(this);
}
错误消息:无法解析方法 'subscribe(NewsSubscriber)'
Subject 的 subscribe()
方法只允许 Consumer
或 Observer
作为输入。但从逻辑上讲,我想要的只是当 Subject(也是一个 Observable)发出一些东西时,我可以让一个 Subscriber 监听,因此它的 onNext()
被调用。
更新:
决定自己尝试代码并找到一种解决方法。使用 lambda 可以链接 onNext() 方法。 lambda istelf 是一个匿名的 Consumer 对象,因此它可能会破坏整个 class 的目的(或者不是?取决于你在做什么),但这有效。
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;
public class NewsSubscriber extends DisposableSubscriber<Integer> {
Disposable disposable;
void bind() {
disposable = NewsPublisher.newsSubject.subscribe(this::onNext);
}
public void onNext(Integer integer) {
// implement
}
public void onError(Throwable throwable) {
if (disposable != null) disposable.dispose();
}
public void onComplete() {
disposable.dispose();
}
}
在 RxJava 3 中,有没有办法将 Subscriber
订阅到 Subject
?
Subject 也是一个 Observable
这意味着它应该能够提供一个上游,当 Subject 的 onNext
发出一些东西。但是下面的代码给出了编译时错误,我找不到任何其他机制来实现它。
public class NewsPublisher {
static Subject<Integer> newsSubject = PublishSubject.create();
public class NewsSubscriber
extends DisposableSubscriber<Integer> {
void bind() {
NewsPublisher.newsSubject.subscribe(this);
}
错误消息:无法解析方法 'subscribe(NewsSubscriber)'
Subject 的 subscribe()
方法只允许 Consumer
或 Observer
作为输入。但从逻辑上讲,我想要的只是当 Subject(也是一个 Observable)发出一些东西时,我可以让一个 Subscriber 监听,因此它的 onNext()
被调用。
更新:
决定自己尝试代码并找到一种解决方法。使用 lambda 可以链接 onNext() 方法。 lambda istelf 是一个匿名的 Consumer 对象,因此它可能会破坏整个 class 的目的(或者不是?取决于你在做什么),但这有效。
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;
public class NewsSubscriber extends DisposableSubscriber<Integer> {
Disposable disposable;
void bind() {
disposable = NewsPublisher.newsSubject.subscribe(this::onNext);
}
public void onNext(Integer integer) {
// implement
}
public void onError(Throwable throwable) {
if (disposable != null) disposable.dispose();
}
public void onComplete() {
disposable.dispose();
}
}