Observable.create(SyncOnSubscribe.createStateless(...)) 抛出 IllegalStateException:"onNext called multiple times!"

Observable.create(SyncOnSubscribe.createStateless(...)) throws IllegalStateException: "onNext called multiple times!"

此示例方法 returns Observable<Integer> 数字 1 到 9:

public class Provider {
    public static Observable<Integer> test() {
        return Observable.create(SyncOnSubscribe.createStateless(new Action1<Observer<? super Integer>>() {
            @Override
            public void call(Observer<? super Integer> observer) {
                for (int i = 1; i < 9; i++)
                    // when i == 2, throws IllegalStateException:
                    // "onNext called multiple times!":
                    observer.onNext(i);
            }
        }));
    }
}

这个只过滤1到9是3的倍数的数字:

public class RxJavaUnitTest {
    @Test
    public void rxJavaTest(){
        List<Integer> multiplesOf3 = Provider.test().filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer i) {
                return i % 3 == 0;
            }
        }).toList().toBlocking().single();
    }
}

但它抛出 IllegalStateException: “onNext called multiple times!”。如果我不能多次调用 onNext,我该如何为 observer 提供更多值?

也许SyncOnSubscribe.createStateless这里的方法不对,要换成别的方法吗?

为什么不使用 Observable.fromObservable.fromCallable or Observable.defer 或类似变体而不是 Observable.create ?


但是对于您的回答:

SyncOnSubscribe.createStateless Javadoc 中所述:

... This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over it's state.

这个 Javadoc 在这方面不是很清楚,更多可以找到 SyncOnSubscribe.next Javadoc :

... To emit data to a downstream subscriber call observer.onNext(t). To signal an error condition call observer.onError(throwable) or throw an Exception. To signal the end of a data stream call observer.onCompleted(). Implementations of this method must follow the following rules.

  • Must not call observer.onNext(t) more than 1 time per invocation.
  • Must not call observer.onNext(t) concurrently.

The value returned from an invocation of this method will be passed in as the state argument of the next invocation of this method.

也就是说在回调动作中

  • onNext 后应紧跟 onCompletedonError
  • 并且每个回调调用应该只有一个 onNext 调用

这对于在并发环境中强制执行安全性是必要的(这就是为什么它被称为 SyncOnSubscribe)以及用于支持压力.

可以在回调中考虑 for 循环:

return Observable.create(SyncOnSubscribe.createStateless(
        new Action1<Observer<? super Integer>>() {
            int counter = 0;

            @Override
            public void call(Observer<? super Integer> observer) {
                if (counter < 9) {
                    observer.onNext(counter++);
                } else {
                    observer.onCompleted();
                }
            }

        }));

注意 onCompleted 调用,否则,你的 monad 将永远 运行。

这将生成以下列表 {0, 3, 6}。但是代码很丑,违反了 SyncOnSubscribe.createStateless 约定。 SyncOnSubscribe.createStateless 对无状态制作很有用,比如随机。相反,应该使用 SyncOnSubscribe.createStateful :

return Observable.create(SyncOnSubscribe.createStateful(
        () -> 0,
        (counter, observer) -> {
            if (counter < 9) {
                observer.onNext(counter);
            } else {
                observer.onCompleted();
            }
            return counter + 1;
        }
));

但是for循环仍然需要被分解出来,仍然需要调用onCompleted.