长时间处理大数组的每个元素后可观察到的延迟

Observable delay after long processing of each element of large array

假设我们有一个大数组,处理该数组中的每个元素需要很长时间(5 秒)。我们想在处理下一个元素之前添加一些延迟 (2s)。

我已经设法实现了这样的行为:

let arr = [1, 2, 3]
let i = 0

Rx.Observable.of(null)
.map(val => arr[i++])
.do(val => {
  console.log(val + ' preloop')
  let time = +new Date() + 5000; while (+new Date() < time) {}
  console.log(val + ' afterloop')
})
.delay(2000)
.flatMap(val => (i < arr.length) ? Rx.Observable.throw(null) : Rx.Observable.empty())
.retry()
.subscribe(console.log, console.log, () => console.log('completed'))

输出符合预期:

1 preloop
delay 5s
1 afterloop
delay 2s
2 preloop
...
completed

但这段代码很丑陋,不可重用且有错误,不符合 rx 的理念。什么是更好的方法?

请注意,数组(或者它甚至可能根本不是数组)很大, 在这里不起作用。

虽然我能想到一些用例,但这个问题是假设性的。

我不确定为什么数组在这里很重要,您可以使用您提供的 link 中的相同方法解决您的问题。

对于您的长期操作,更好的做法可能是一种承诺。我用了一个async sleep函数来模拟5秒的操作,你可以换成你的promise。

额外延迟的技巧是连接一个虚拟元素,延迟它然后忽略它。

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

Rx.Observable.from([1, 2, 3])
  .concatMap(item => 
    Rx.Observable.defer(() => sleep(5000))
      .mapTo(item)
      .concat(Rx.Observable.of(null).delay(2000).ignoreElements()) 
  )
  .subscribe(console.log, console.log, () => console.log('completed'))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>

不延迟第一次发射的延迟替代方案,

source
  .first()
  .merge(
    source.skip(1).delay(2000)
  )