Rxswift map + 并行连接
Rxswift map + concat in parallel
此 Observable 正在执行以下操作
- 给定一个源可观察
- 我们使用 map 来执行一些异步工作
- 我们使用 concat 来 return 按顺序处理异步工作的结果
以下是return想要的结果,但我想开始
异步并行工作。
使用 Rx 的正确方法是什么?
import RxSwift
func delay(time: Int, closure: () -> Void) {
dispatch_after(
dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
dispatch_get_main_queue(), closure)
}
func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
return Observable.create() { (observer) -> Disposable in
print(desc)
delay(time) {
observer.onNext(value)
observer.onCompleted()
}
return NopDisposable.instance
}
}
let seq = Observable
.of(1, 2, 3, 4, 5)
.map { (n) -> Observable<Int> in
return doAsyncWork(n,
desc: "start \(n) - wait \(5 - n)",
time: 6 - n
)
}
.concat()
let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \([=10=])") }
sharedSeq.subscribeCompleted { print("=> completed") }
这个产品
//start 1 - wait 4
// => 1
//start 2 - wait 3
// => 2
//start 3 - wait 2
// => 3
//start 4 - wait 1
// => 4
//start 5 - wait 0
// => 5
所需的输出将是
//start 1 - wait 4
//start 2 - wait 3
//start 3 - wait 2
//start 4 - wait 1
//start 5 - wait 0
// => 1
// => 2
// => 3
// => 4
// => 5
这似乎可行,但不确定这是不是最佳答案
import RxSwift
func delay(time: Int, closure: () -> Void) {
dispatch_after(
dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
dispatch_get_main_queue(), closure)
}
func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
return Observable.create() { (observer) -> Disposable in
print(desc)
delay(time) {
observer.onNext(value)
observer.onCompleted()
}
return NopDisposable.instance
}
}
let seq = Observable
.of(1, 2, 3, 4, 5)
.map { (n) -> Observable<Int> in
let o = doAsyncWork(n,
desc: "start \(n) - wait \(5 - n)",
time: 6 - n
).shareReplay(1)
o.subscribe()
return o.asObservable()
}
.concat()
let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \([=10=])") }
sharedSeq.subscribeCompleted { print("=> completed") }
您的 "desired output" 似乎不同意您让 Observable
开始 "in parallel" 但延迟其元素的愿望,例如“5”没有延迟,“4”有延迟 1 秒,“3”延迟 2 秒,依此类推
我认为您正在寻找此输出:
start 1 - wait 4
start 2 - wait 3
start 3 - wait 2
start 4 - wait 1
start 5 - wait 0
5
4
3
2
1
这是你可以用来做到这一点的东西:
Observable.range(start: 1, count: 5)
.flatMap { n -> Observable<Int> in
let waitInterval = 5 - n
print("start \(n) - wait \(waitInterval)")
return Observable.just(n)
.delaySubscription(RxTimeInterval(waitInterval), scheduler: MainScheduler.instance)
}
.subscribeNext { i in
print(i)
}
.addDisposableTo(disposeBag)
如果您还有其他意思,您可能可以轻松调整此代码段以实现您的目标。
这目前对您没有帮助,但将来可能会对其他人有所帮助。
您要找的接线员叫concatMap
。但是,目前 RxSwift
.
中不存在它
当前存在一个已关闭的 PR here。
这不像您预期的那样工作的原因是 concat
订阅源可观察对象 一次一个 ,等待第一个完成它订阅了第二个,依此类推。
在 RxJava 中有 concatEager
,它可以做你想做的事——在一开始就订阅所有的资源,同时仍然保持顺序。但似乎 Swift 中没有。
你可以做的是压缩每个项目及其索引,flatMap,按索引排序并解压缩。
此 Observable 正在执行以下操作
- 给定一个源可观察
- 我们使用 map 来执行一些异步工作
- 我们使用 concat 来 return 按顺序处理异步工作的结果
以下是return想要的结果,但我想开始 异步并行工作。
使用 Rx 的正确方法是什么?
import RxSwift
func delay(time: Int, closure: () -> Void) {
dispatch_after(
dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
dispatch_get_main_queue(), closure)
}
func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
return Observable.create() { (observer) -> Disposable in
print(desc)
delay(time) {
observer.onNext(value)
observer.onCompleted()
}
return NopDisposable.instance
}
}
let seq = Observable
.of(1, 2, 3, 4, 5)
.map { (n) -> Observable<Int> in
return doAsyncWork(n,
desc: "start \(n) - wait \(5 - n)",
time: 6 - n
)
}
.concat()
let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \([=10=])") }
sharedSeq.subscribeCompleted { print("=> completed") }
这个产品
//start 1 - wait 4
// => 1
//start 2 - wait 3
// => 2
//start 3 - wait 2
// => 3
//start 4 - wait 1
// => 4
//start 5 - wait 0
// => 5
所需的输出将是
//start 1 - wait 4
//start 2 - wait 3
//start 3 - wait 2
//start 4 - wait 1
//start 5 - wait 0
// => 1
// => 2
// => 3
// => 4
// => 5
这似乎可行,但不确定这是不是最佳答案
import RxSwift
func delay(time: Int, closure: () -> Void) {
dispatch_after(
dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
dispatch_get_main_queue(), closure)
}
func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
return Observable.create() { (observer) -> Disposable in
print(desc)
delay(time) {
observer.onNext(value)
observer.onCompleted()
}
return NopDisposable.instance
}
}
let seq = Observable
.of(1, 2, 3, 4, 5)
.map { (n) -> Observable<Int> in
let o = doAsyncWork(n,
desc: "start \(n) - wait \(5 - n)",
time: 6 - n
).shareReplay(1)
o.subscribe()
return o.asObservable()
}
.concat()
let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \([=10=])") }
sharedSeq.subscribeCompleted { print("=> completed") }
您的 "desired output" 似乎不同意您让 Observable
开始 "in parallel" 但延迟其元素的愿望,例如“5”没有延迟,“4”有延迟 1 秒,“3”延迟 2 秒,依此类推
我认为您正在寻找此输出:
start 1 - wait 4
start 2 - wait 3
start 3 - wait 2
start 4 - wait 1
start 5 - wait 0
5
4
3
2
1
这是你可以用来做到这一点的东西:
Observable.range(start: 1, count: 5)
.flatMap { n -> Observable<Int> in
let waitInterval = 5 - n
print("start \(n) - wait \(waitInterval)")
return Observable.just(n)
.delaySubscription(RxTimeInterval(waitInterval), scheduler: MainScheduler.instance)
}
.subscribeNext { i in
print(i)
}
.addDisposableTo(disposeBag)
如果您还有其他意思,您可能可以轻松调整此代码段以实现您的目标。
这目前对您没有帮助,但将来可能会对其他人有所帮助。
您要找的接线员叫concatMap
。但是,目前 RxSwift
.
当前存在一个已关闭的 PR here。
这不像您预期的那样工作的原因是 concat
订阅源可观察对象 一次一个 ,等待第一个完成它订阅了第二个,依此类推。
在 RxJava 中有 concatEager
,它可以做你想做的事——在一开始就订阅所有的资源,同时仍然保持顺序。但似乎 Swift 中没有。
你可以做的是压缩每个项目及其索引,flatMap,按索引排序并解压缩。