我可以有条件地 "merge" 一个带有 Observable 的 Single 吗?
can i conditionally "merge" a Single with an Observable?
我是 RxJava 的新手,我在思考如何执行以下操作时遇到了一些麻烦。
- 我正在使用 Retrofit 来调用 returns 我
Single<Foo>
的网络请求,这是我最终想通过我的 订阅者 [=39= 使用的类型]实例(称之为SingleFooSubscriber
)
Foo
有一个内部 属性 items
类型为 List<String>
。
- 如果
Foo.items
不为空,我想为其每个值调用单独的并发网络请求。 (这些请求的实际结果对于 SingleFooSubscriber
无关紧要,因为结果将在外部缓存)。
SingleFooSubscriber.onComplete()
应仅在 Foo
和所有 Foo.items
已被提取时调用。
fetchFooCall
.subscribeOn(Schedulers.io())
// Approach #1...
// the idea here would be to "merge" the results of both streams into a single
// reactive type, but i'm not sure how this would work given that the item emissions
// could be far greater than one. using zip here i don't think it would every
// complete.
.flatMap { foo ->
if(foo.items.isNotEmpty()) {
Observable.zip(
Observable.fromIterable(foo.items),
Observable.just(foo),
{ source1, source2 ->
// hmmmm...
}
).toSingle()
} else {
Single.just(foo)
}
}
// ...or Approach #2...
// i think this would result in the streams for Foo and items being handled sequentially,
// which is not really ideal because
// 1) i think it would entail nested streams (i get the feeling i should be using flatMap
// instead)
// 2) and i'm not sure SingleFooSubscriber.onComplete() would depend on the completion of
// the stream for items
.doOnSuccess { data ->
if(data.items.isNotEmpty()) {
// hmmmm...
}
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ data -> /* onSuccess() */ },
{ error -> /* onError() */ }
)
任何关于如何解决这个问题的想法都将不胜感激!
奖励积分:在尝试对此提出解决方案时,我开始质疑使用 Single
反应类型与 Observable
反应类型的决定。大多数(所有,除了这个 Foo.items
案例?)我的流实际上围绕着消费某个东西的单个实例,所以我倾向于使用 Single 来表示我的流,因为我认为它会在代码周围增加一些语义清晰度。有人对何时使用其中一个与另一个有任何一般性指导吗?
您需要嵌套 flatMap
s 然后转换回 Single
:
retrofit.getMainObject()
.flatMap(v ->
Flowable.fromIterable(v.items)
.flatMap(w ->
retrofit.getItem(w.id).doOnNext(x -> w.property = x)
)
.ignoreElements()
.toSingle(v)
)
我是 RxJava 的新手,我在思考如何执行以下操作时遇到了一些麻烦。
- 我正在使用 Retrofit 来调用 returns 我
Single<Foo>
的网络请求,这是我最终想通过我的 订阅者 [=39= 使用的类型]实例(称之为SingleFooSubscriber
) Foo
有一个内部 属性items
类型为List<String>
。- 如果
Foo.items
不为空,我想为其每个值调用单独的并发网络请求。 (这些请求的实际结果对于SingleFooSubscriber
无关紧要,因为结果将在外部缓存)。 SingleFooSubscriber.onComplete()
应仅在Foo
和所有Foo.items
已被提取时调用。
fetchFooCall .subscribeOn(Schedulers.io())
// Approach #1...
// the idea here would be to "merge" the results of both streams into a single
// reactive type, but i'm not sure how this would work given that the item emissions
// could be far greater than one. using zip here i don't think it would every
// complete.
.flatMap { foo ->
if(foo.items.isNotEmpty()) {
Observable.zip(
Observable.fromIterable(foo.items),
Observable.just(foo),
{ source1, source2 ->
// hmmmm...
}
).toSingle()
} else {
Single.just(foo)
}
}
// ...or Approach #2...
// i think this would result in the streams for Foo and items being handled sequentially,
// which is not really ideal because
// 1) i think it would entail nested streams (i get the feeling i should be using flatMap
// instead)
// 2) and i'm not sure SingleFooSubscriber.onComplete() would depend on the completion of
// the stream for items
.doOnSuccess { data ->
if(data.items.isNotEmpty()) {
// hmmmm...
}
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ data -> /* onSuccess() */ },
{ error -> /* onError() */ }
)
任何关于如何解决这个问题的想法都将不胜感激!
奖励积分:在尝试对此提出解决方案时,我开始质疑使用 Single
反应类型与 Observable
反应类型的决定。大多数(所有,除了这个 Foo.items
案例?)我的流实际上围绕着消费某个东西的单个实例,所以我倾向于使用 Single 来表示我的流,因为我认为它会在代码周围增加一些语义清晰度。有人对何时使用其中一个与另一个有任何一般性指导吗?
您需要嵌套 flatMap
s 然后转换回 Single
:
retrofit.getMainObject()
.flatMap(v ->
Flowable.fromIterable(v.items)
.flatMap(w ->
retrofit.getItem(w.id).doOnNext(x -> w.property = x)
)
.ignoreElements()
.toSingle(v)
)