RxJava - blockingGet() 方法在使用 mergeWith 运算符时停止线程
RxJava - blockingGet() method stops thread when using mergeWith operator
我有一个用例,我需要一个反馈循环,它会根据某些条件从已经发出的项目中发出一个项目。
示例代码:
Flowable<Integer> range1 = Flowable.range(1, 10);
UnicastProcessor<Integer> publishProcessor = UnicastProcessor.create();
Single<List<Integer>> pollResponse = range1
.mergeWith(publishProcessor) //On Commenting this line code works without wanted behaviour
.map(integer -> {
if (integer % 2 == 0 && integer <= 10) {
publishProcessor.onNext(20 + integer);
}
return integer;
})
.flatMap(integer -> flatMapMock(integer, publishProcessor))
.toList()
.doOnError(throwable -> System.out.println(throwable));
List<Integer> integers = pollResponse.blockingGet();
System.out.println(integers.size());
flatMapMock 函数:
private static Flowable<Integer> flatMapMock(Integer integer,
FlowableProcessor<Integer> feedbackSource){
return Flowable.just(integer)
.map(integer1 -> integer1);
}
我的问题是:
- 如果我不将 publishProcessor 与 range1 flowable 合并,那么我
正在达到打印列表的大小。 但是,在合并时为什么不
有用吗?
- 我在这里遗漏了什么吗?任何指针都可以。
问题是您正在使用需要有限流的 toList
,但是您合并了从未完成的 UnicastProcessor
,因此 mergeWith
永远不会完成。您可能应该重新考虑您想要实现的目标。
我有一个用例,我需要一个反馈循环,它会根据某些条件从已经发出的项目中发出一个项目。
示例代码:
Flowable<Integer> range1 = Flowable.range(1, 10);
UnicastProcessor<Integer> publishProcessor = UnicastProcessor.create();
Single<List<Integer>> pollResponse = range1
.mergeWith(publishProcessor) //On Commenting this line code works without wanted behaviour
.map(integer -> {
if (integer % 2 == 0 && integer <= 10) {
publishProcessor.onNext(20 + integer);
}
return integer;
})
.flatMap(integer -> flatMapMock(integer, publishProcessor))
.toList()
.doOnError(throwable -> System.out.println(throwable));
List<Integer> integers = pollResponse.blockingGet();
System.out.println(integers.size());
flatMapMock 函数:
private static Flowable<Integer> flatMapMock(Integer integer,
FlowableProcessor<Integer> feedbackSource){
return Flowable.just(integer)
.map(integer1 -> integer1);
}
我的问题是:
- 如果我不将 publishProcessor 与 range1 flowable 合并,那么我 正在达到打印列表的大小。 但是,在合并时为什么不 有用吗?
- 我在这里遗漏了什么吗?任何指针都可以。
问题是您正在使用需要有限流的 toList
,但是您合并了从未完成的 UnicastProcessor
,因此 mergeWith
永远不会完成。您可能应该重新考虑您想要实现的目标。