Observable.create(SyncOnSubscribe.createStateless(...)) 抛出 IllegalStateException:"onNext called multiple times!"
Observable.create(SyncOnSubscribe.createStateless(...)) throws IllegalStateException: "onNext called multiple times!"
此示例方法 returns Observable<Integer>
数字 1 到 9:
public class Provider {
public static Observable<Integer> test() {
return Observable.create(SyncOnSubscribe.createStateless(new Action1<Observer<? super Integer>>() {
@Override
public void call(Observer<? super Integer> observer) {
for (int i = 1; i < 9; i++)
// when i == 2, throws IllegalStateException:
// "onNext called multiple times!":
observer.onNext(i);
}
}));
}
}
这个只过滤1到9是3的倍数的数字:
public class RxJavaUnitTest {
@Test
public void rxJavaTest(){
List<Integer> multiplesOf3 = Provider.test().filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer i) {
return i % 3 == 0;
}
}).toList().toBlocking().single();
}
}
但它抛出 IllegalStateException: “onNext called multiple times!”
。如果我不能多次调用 onNext
,我该如何为 observer
提供更多值?
也许SyncOnSubscribe.createStateless
这里的方法不对,要换成别的方法吗?
为什么不使用 Observable.from
,
Observable.fromCallable
or Observable.defer
或类似变体而不是 Observable.create
?
但是对于您的回答:
如 SyncOnSubscribe.createStateless
Javadoc 中所述:
... This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over it's state.
这个 Javadoc 在这方面不是很清楚,更多可以找到 SyncOnSubscribe.next Javadoc :
... To emit data to a downstream subscriber call observer.onNext(t)
. To signal an error condition call observer.onError(throwable)
or throw an Exception
. To signal the end of a data stream call observer.onCompleted()
. Implementations of this method must follow the following rules.
- Must not call
observer.onNext(t)
more than 1 time per invocation.
- Must not call
observer.onNext(t)
concurrently.
The value returned from an invocation of this method will be passed in as the state argument of the next invocation of this method.
也就是说在回调动作中
onNext
后应紧跟 onCompleted
或 onError
- 并且每个回调调用应该只有一个
onNext
调用
这对于在并发环境中强制执行安全性是必要的(这就是为什么它被称为 Sync
OnSubscribe
)以及用于支持压力.
可以在回调中考虑 for 循环:
return Observable.create(SyncOnSubscribe.createStateless(
new Action1<Observer<? super Integer>>() {
int counter = 0;
@Override
public void call(Observer<? super Integer> observer) {
if (counter < 9) {
observer.onNext(counter++);
} else {
observer.onCompleted();
}
}
}));
注意 onCompleted
调用,否则,你的 monad 将永远 运行。
这将生成以下列表 {0, 3, 6}
。但是代码很丑,违反了 SyncOnSubscribe.createStateless
约定。 SyncOnSubscribe.createStateless
对无状态制作很有用,比如随机。相反,应该使用 SyncOnSubscribe.createStateful
:
return Observable.create(SyncOnSubscribe.createStateful(
() -> 0,
(counter, observer) -> {
if (counter < 9) {
observer.onNext(counter);
} else {
observer.onCompleted();
}
return counter + 1;
}
));
但是for循环仍然需要被分解出来,仍然需要调用onCompleted
.
此示例方法 returns Observable<Integer>
数字 1 到 9:
public class Provider {
public static Observable<Integer> test() {
return Observable.create(SyncOnSubscribe.createStateless(new Action1<Observer<? super Integer>>() {
@Override
public void call(Observer<? super Integer> observer) {
for (int i = 1; i < 9; i++)
// when i == 2, throws IllegalStateException:
// "onNext called multiple times!":
observer.onNext(i);
}
}));
}
}
这个只过滤1到9是3的倍数的数字:
public class RxJavaUnitTest {
@Test
public void rxJavaTest(){
List<Integer> multiplesOf3 = Provider.test().filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer i) {
return i % 3 == 0;
}
}).toList().toBlocking().single();
}
}
但它抛出 IllegalStateException: “onNext called multiple times!”
。如果我不能多次调用 onNext
,我该如何为 observer
提供更多值?
也许SyncOnSubscribe.createStateless
这里的方法不对,要换成别的方法吗?
为什么不使用 Observable.from
,
Observable.fromCallable
or Observable.defer
或类似变体而不是 Observable.create
?
但是对于您的回答:
如 SyncOnSubscribe.createStateless
Javadoc 中所述:
... This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over it's state.
这个 Javadoc 在这方面不是很清楚,更多可以找到 SyncOnSubscribe.next Javadoc :
... To emit data to a downstream subscriber call
observer.onNext(t)
. To signal an error condition callobserver.onError(throwable)
or throw anException
. To signal the end of a data stream callobserver.onCompleted()
. Implementations of this method must follow the following rules.
- Must not call
observer.onNext(t)
more than 1 time per invocation.- Must not call
observer.onNext(t)
concurrently.The value returned from an invocation of this method will be passed in as the state argument of the next invocation of this method.
也就是说在回调动作中
onNext
后应紧跟onCompleted
或onError
- 并且每个回调调用应该只有一个
onNext
调用
这对于在并发环境中强制执行安全性是必要的(这就是为什么它被称为 Sync
OnSubscribe
)以及用于支持压力.
可以在回调中考虑 for 循环:
return Observable.create(SyncOnSubscribe.createStateless(
new Action1<Observer<? super Integer>>() {
int counter = 0;
@Override
public void call(Observer<? super Integer> observer) {
if (counter < 9) {
observer.onNext(counter++);
} else {
observer.onCompleted();
}
}
}));
注意 onCompleted
调用,否则,你的 monad 将永远 运行。
这将生成以下列表 {0, 3, 6}
。但是代码很丑,违反了 SyncOnSubscribe.createStateless
约定。 SyncOnSubscribe.createStateless
对无状态制作很有用,比如随机。相反,应该使用 SyncOnSubscribe.createStateful
:
return Observable.create(SyncOnSubscribe.createStateful(
() -> 0,
(counter, observer) -> {
if (counter < 9) {
observer.onNext(counter);
} else {
observer.onCompleted();
}
return counter + 1;
}
));
但是for循环仍然需要被分解出来,仍然需要调用onCompleted
.