在 RxJava 中让一个订阅者订阅一个主题

Making a Subscriber subscribe to a Subject in RxJava

在 RxJava 3 中,有没有办法将 Subscriber 订阅到 Subject? Subject 也是一个 Observable 这意味着它应该能够提供一个上游,当 Subject 的 onNext 发出一些东西。但是下面的代码给出了编译时错误,我找不到任何其他机制来实现它。

public class NewsPublisher {
    static Subject<Integer>  newsSubject = PublishSubject.create();

public class NewsSubscriber
    extends DisposableSubscriber<Integer> {

    void bind() {
        NewsPublisher.newsSubject.subscribe(this);
    }

错误消息:无法解析方法 'subscribe(NewsSubscriber)'

Subject 的 subscribe() 方法只允许 ConsumerObserver 作为输入。但从逻辑上讲,我想要的只是当 Subject(也是一个 Observable)发出一些东西时,我可以让一个 Subscriber 监听,因此它的 onNext() 被调用。

更新:

决定自己尝试代码并找到一种解决方法。使用 lambda 可以链接 onNext() 方法。 lambda istelf 是一个匿名的 Consumer 对象,因此它可能会破坏整个 class 的目的(或者不是?取决于你在做什么),但这有效。

import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;

public class NewsSubscriber extends DisposableSubscriber<Integer> {

    Disposable disposable;

    void bind() {
        disposable = NewsPublisher.newsSubject.subscribe(this::onNext);
    }

    public void onNext(Integer integer) {   
        // implement
    }

    public void onError(Throwable throwable) {
        if (disposable != null) disposable.dispose();
    }

    public void onComplete() {
        disposable.dispose();
    }
}