这个 Promise 取消实现是否用于减少正确轨道上的异步迭代?

Is this Promise cancellation implementation for reducing an async iterable on the right track?

我想为我的库 reduce 的一种方法启用 Promise 取消。我只对取消异步迭代的 Promise 感兴趣,因为它们很可能会无限期挂起。

const reduceAsyncIterable = async (fn, possiblyX0, state, x) => {
  const iter = x[Symbol.asyncIterator]()
  const y0 = isUndefined(possiblyX0) ? (await iter.next()).value : possiblyX0
  if (isUndefined(y0)) {
    throw new TypeError('reduce(...)(x); x cannot be empty')
  }
  let y = await fn(y0, (await iter.next()).value)
  for await (const xi of iter) {
    if (state.cancelled) return // stops async iterating if `cancel` called
    y = await fn(y, xi)
  }
  return y
}

const reduce = (fn, x0) => {
  if (!isFunction(fn)) {
    throw new TypeError('reduce(x, y); x is not a function')
  }
  return x => {
    if (isIterable(x)) return reduceIterable(fn, x0, x)
    if (isAsyncIterable(x)) {
      const state = { cancelled: false, resolve: () => {} }
      const p = new Promise((resolve, reject) => {
        state.resolve = resolve
        reduceAsyncIterable(fn, x0, state, x).then(
          y => state.cancelled || resolve(y)
        ).catch(reject)
      })
      p.cancel = () => { state.cancelled = true; state.resolve() } // shortcircuit the Promise `p` on `cancel` call
      return p
    }
    if (is(Object)(x)) return reduceObject(fn, x0, x)
    throw new TypeError('reduce(...)(x); x invalid')
  }
}

上面的代码似乎可以运行,但我不禁觉得这里有内存泄漏。特别是 await iter.next()for await (const xi of iter)。如果这些 await 语句永远持续(对于异步迭代器,它们可能会如此),reduceAsyncIterable 可能永远不会 return。从用户的角度来看这很好,因为 reduce 中发生了短路,因为用户看到的 Promise 已解决。但是从计算机的角度来看,取消这个操作的Promise会不会导致内存泄露?

我希望能够像这样在 returned promise 上使用 cancel 函数:

const myOngoingTaskPromise = reduce(someReducer, null)(myInfiniteAsyncIterable)

myOngoingTaskPromise.cancel() // resolves myOngoingTaskPromise with undefined

myOngoingTaskPromise // Promise { undefined }

找到方法了,Promise.race好像是秘密武器什么的

    if (isAsyncIterable(x)) {
      const state = { cancel: () => {} }
      const cancelToken = new Promise((_, reject) => { state.cancel = reject })
      const p = Promise.race([
        reduceAsyncIterable(fn, x0, x),
        cancelToken,
      ])
      p.cancel = () => { state.cancel(new Error('cancelled')) }
      return p
    }

没有内存泄漏