RxJava3。为什么 FlowableSubscriber onNext 没有被调用?
RxJava3. Why FlowableSubscriber onNext has not been called?
我需要获取订阅对象才能有机会取消订阅听众。为此,我想提供一个 FlowableSubscriber 来运行。
代码:
FlowableSubscriber fs = new FlowableSubscriber() {
@Override
public void onSubscribe(@NonNull Subscription s) {
System.out.println("Flowable onSubs");
}
@Override
public void onNext(Object o) {
System.out.println("Flowable onNext");
}
@Override
public void onError(Throwable t) {
System.out.println("Flowable onErr");
}
@Override
public void onComplete() {
System.out.println("Flowable onComlet");
}
};
日志是:
Running...
Flowable onSubs
如果我使用 lambdas 它可以工作,但没有 onSubscribe 回调。
如何获得订阅以及为什么这些方法没有被调用?
由于 Flowable
支持背压,您必须通过在 Subscription
上调用 request
方法来实际控制可以消耗的项目数量,以便 [=11= 可以发出它们] :
FlowableSubscriber fs = new FlowableSubscriber() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
System.out.println("Flowable onSubs");
subscription=s;
subscription.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("Flowable onNext");
//subscription.request(1); you can also request more items from onNext method - it is up to you
}
@Override
public void onError(Throwable t) {
System.out.println("Flowable onErr");
}
@Override
public void onComplete() {
System.out.println("Flowable onComlet");
}
};
在示例中,Integer.MAX_VALUE
请求订阅,但这可能不是最好的主意。问题是你应该从 onSubscribe
调用 Subscription::request
来请求初始项目,然后从 onNext
调用它并决定你可以实际处理多少项目。
我需要获取订阅对象才能有机会取消订阅听众。为此,我想提供一个 FlowableSubscriber 来运行。
代码:
FlowableSubscriber fs = new FlowableSubscriber() {
@Override
public void onSubscribe(@NonNull Subscription s) {
System.out.println("Flowable onSubs");
}
@Override
public void onNext(Object o) {
System.out.println("Flowable onNext");
}
@Override
public void onError(Throwable t) {
System.out.println("Flowable onErr");
}
@Override
public void onComplete() {
System.out.println("Flowable onComlet");
}
};
日志是:
Running...
Flowable onSubs
如果我使用 lambdas 它可以工作,但没有 onSubscribe 回调。
如何获得订阅以及为什么这些方法没有被调用?
由于 Flowable
支持背压,您必须通过在 Subscription
上调用 request
方法来实际控制可以消耗的项目数量,以便 [=11= 可以发出它们] :
FlowableSubscriber fs = new FlowableSubscriber() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
System.out.println("Flowable onSubs");
subscription=s;
subscription.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("Flowable onNext");
//subscription.request(1); you can also request more items from onNext method - it is up to you
}
@Override
public void onError(Throwable t) {
System.out.println("Flowable onErr");
}
@Override
public void onComplete() {
System.out.println("Flowable onComlet");
}
};
在示例中,Integer.MAX_VALUE
请求订阅,但这可能不是最好的主意。问题是你应该从 onSubscribe
调用 Subscription::request
来请求初始项目,然后从 onNext
调用它并决定你可以实际处理多少项目。