如何为我的任务实施实施相当于 Promise.all 的措施?

How to implement the equivalent of Promise.all for my Task implementation?

这是我的 Task 实现(即某种 Promise 但符合 monad 法则并且可取消)。它工作坚如磐石:

const Task = k =>
  ({runTask: (res, rej) => k(res, rej)});

const tAp = tf => tk =>
  Task((res, rej) => tf.runTask(f => tk.runTask(x => res(f(x)), rej), rej));

const tOf = x => Task((res, rej) => res(x));

const tMap = f => tk =>
  Task((res, rej) => tk.runTask(x => res(f(x)), rej));

const tChain = fm => mx =>
  Task((res, rej) => mx.runTask(x => fm(x).runTask(res, rej), rej));

const log = x => console.log(x);
const elog = e => console.error(e);

const fetchName = (id, cb) => {
  const r = setTimeout(id_ => {
    const m = new Map([[1, "Beau"], [2, "Dev"], [3, "Liz"]]);

    if (m.has(id_))
      return cb(null, m.get(id_));

    else
      return cb("unknown id", null);
  }, 0, id);

  return () => clearTimeout(r);
};

const fetchNameAsync = id =>
  Task((res, rej) =>
    fetchName(id, (err, data) =>
      err === null
        ? res(data)
        : rej(err)));

const a = tAp(tMap(x => y => x.length + y.length)
  (fetchNameAsync(1)))
    (fetchNameAsync(3));

const b = tAp(tMap(x => y => x.length + y.length)
  (fetchNameAsync(1)))
    (fetchNameAsync(5));

a.runTask(log, elog); // 7
b.runTask(log, elog); // Error: "unknown id"

但是,我不知道如何实现 awaitAll,它应该具有以下特征:


const awaitAll = ms =>
  Task((res, rej) => ms.map(mx => mx.runTask(...?)));

如有任何提示,我们将不胜感激!

这是一种可能的方法,使用一个计数器和一个包含在另一个任务中的循环。使用计数器是因为任务可以按任何顺序完成,否则很难知道外部任务何时可以最终解决 -

const assign = (o = {}, [ k, v ]) =>
  Object .assign (o, { [k]: v })

const tAll = (ts = []) =>
{ let resolved = 0
  const acc = []
  const run = (res, rej) =>
  { for (const [ i, t ] of ts .entries ())
      t .runTask
        ( x =>
            ++resolved === ts.length
              ? res (assign (acc, [ i, x ]))
              : assign (acc, [ i, x ])
        , rej
        )
  }
  return Task (run)
}

我们写一个简单的delay函数来测试它-

const delay = (ms, x) =>
  Task ((res, _) => setTimeout (res, ms, x))

const tasks =
  [ delay (200, 'a')
  , delay (300, 'b')
  , delay (100, 'c')
  ]

tAll (tasks) .runTask (console.log, console.error)
// ~300 ms later
// => [ 'a', 'b', 'c' ]

在事件任何任务失败时,外部任务被拒绝-

const tasks =
  [ delay (200, 'a')
  , delay (300, 'b')
  , Task ((_, rej) => rej (Error('bad')))
  ]

tAll (tasks) .runTask (console.log, console.error)
// => Error: bad

展开下面的代码片段以在您自己的浏览器中验证结果 -

const assign = (o = {}, [ k, v ]) =>
  Object .assign (o, { [k]: v })

const Task = k =>
  ({runTask: (res, rej) => k(res, rej)});

const tAll = (ts = []) =>
{ let resolved = 0
  const acc = []
  const run = (res, rej) =>
  { for (const [ i, t ] of ts .entries ())
      t .runTask
        ( x =>
            ++resolved === ts.length
              ? res (assign (acc, [ i, x ]))
              : assign (acc, [ i, x ])
        , rej
        )
  }
  return Task (run)
}

const delay = (ms, x) =>
  Task ((res, _) => setTimeout (res, ms, x))

const tasks =
  [ delay (200, 'a')
  , delay (300, 'b')
  , delay (100, 'c')
  ]

tAll (tasks) .runTask (console.log, console.error)
// ~300 ms later
// => [ 'a', 'b', 'c' ]


这是 tAll 的另一种实现,它将 for 换成 forEach 并删除了一个命令式块,{ ... } -

const tAll = (ts = []) =>
{ let resolved = 0
  const acc = []
  const run = (res, rej) => (t, i) =>
    t .runTask
      ( x =>
          ++resolved === ts.length
            ? res (assign (acc, [ i, x ]))
            : assign (acc, [ i, x ])
      , rej
      )
  return Task ((res, rej) => ts .forEach (run (res, rej)))
}

另一种使用递归和 2 任务基本情况的解决方案,然后只允许管理两个变量中的状态:

  const tAll = ([first, second, ...rest]) =>
   !second
     ? first
     : rest.length 
        ? tMap(
            results => results.flat()
          )(tAll([ tAll([first, second]), tAll(rest) ]))
        : Task((res, rej, a, b, done) => (
            first.runTask(
               value => !done && b ? (res([value, b.value]), done = true) : (a = { value }),
               err => !done && (rej(err), done = true)
            ),
            second.runTask(
               value => !done && a ? (res([a.value, value]), done = true) : (b = { value }),
              err => !done && (rej(err), done = true)
            ) 
         ));

这是从此处的其他答案以及链接的 folktale/task 中汲取灵感的另一种方式。我们不会实施复杂的 tAll 来迭代任务列表 组合任务,而是将关注点分离到单独的函数中。

这是一个简化的 tAnd -

const tAnd = (t1, t2) =>
{ const acc = []

  const guard = (res, i) => x =>
    ( acc[i] = x
    , acc[0] !== undefined && acc[1] !== undefined
        ? res (acc)
        : void 0
    )

  return Task
    ( (res, rej) =>
        ( t1 .runTask (guard (res, 0), rej) // rej could be called twice!
        , t2 .runTask (guard (res, 1), rej) // we'll fix this below
        )
    )
}

它是这样工作的-

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b' ]

现在 tAll 实施起来轻而易举 -

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tAnd (t, tAll (...ts))

呜呜呜,一路上不要忘记拉平-

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll(...ts))
        )

它是这样工作的-

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll 也能正确处理错误 -

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// test passed

尽管与最初的 tAll 相比,我们已经限制了程序的范围,但要使 tAnd 正确是非常困难的。组合任务应该只解决一次,拒绝一次——不能同时拒绝。这意味着还应避免使用双 resolve/reject。执行这些约束需要更多代码 -

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

展开下面的代码片段以在您自己的浏览器中验证结果 -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false
  
  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b' ]
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed


串行处理

最棘手的一点是并行处理要求。如果需求要求 串行 行为,实施起来会容易得多 -

const tAnd = (t1, t2) =>
  Task
    ( (res, rej) =>
        t1 .runTask
          ( a =>
              t2 .runTask
                ( b =>
                    res ([ a, b ])
                , rej
                )
          , rej
          )
    )

tAll 的实现当然保持不变。请注意现在延迟的差异,因为任务现在按顺序 运行 -

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2.5 seconds later
// [ 'a', 'b' ]

还有许多任务 tAll -

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~ 9 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

展开下面的代码片段以在您自己的浏览器中验证结果 -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAnd = (t1, t2) =>
  Task
    ( (res, rej) =>
        t1 .runTask
          ( a =>
              t2 .runTask
                ( b =>
                    res ([ a, b ])
                , rej
                )
          , rej
          )
    )

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2.5 seconds later
// [ 'a', 'b' ]

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~ 9 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed


如何实现tOrtRace

为了完整起见,这里是tOr。注意这里的tOr相当于folktale的Task.concat-

const tOr = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const guard = (res, rej) =>
    [ x =>
        resolved || rejected
          ? void 0
          : ( resolved = true
            , res (x)
            )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej))
        , t2 .runTask (...guard (res, rej))
        )
    )
}

解决或拒绝最先完成的两个任务 -

tOr
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~500 ms later
// 'b' 

tRace-

const tRace = (t = tOf (undefined), ...ts) =>
  ts .reduce (tOr, t)

解决或拒绝许多任务中最先完成的 -

tRace
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~300 ms later
// 'f'

展开下面的代码片段以在您自己的浏览器中验证结果 -

const Task = k =>
  ({ runTask: (a, b) => k (a, b) })

const tOr = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const guard = (res, rej) =>
    [ x =>
        resolved || rejected
          ? void 0
          : ( resolved = true
            , res (x)
            )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej))
        , t2 .runTask (...guard (res, rej))
        )
    )
}

const tRace = (t = tOf (undefined), ...ts) =>
  ts. reduce (tOr, t)

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tOr
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~500 ms later
// 'b' 

tRace
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~300 ms later
// note `f` appears in the output first because this tRace demo finishes before the tOr demo above
// 'f'

tRace
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed


如何实现tAp

在评论中,我们谈论的是应用,tAp。我认为 tAll 使实施变得相当容易 -

const tAp = (f, ...ts) =>
  tMap
    ( ([ f, ...xs ]) => f (...xs)
    , tAll (f, ...ts)
    )

tAp 接受任务包装函数和任意数量的任务包装值,以及 returns 一个新任务 -

const sum = (v, ...vs) =>
  vs.length === 0
    ? v
    : v + sum (...vs)

tAp
  ( delay (2000, sum)
  , delay (500, 1)
  , delay (900, 2)
  , delay (1500, 3)
  , delay (1800, 4)
  , delay (300, 5)
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// 15

除非任务有副作用,否则我看不出 tAp 的 "parallel" 实施违反适用法则的原因。

展开下面的代码片段以在您自己的浏览器中验证结果 -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAp = (f, ...ts) =>
  tMap
    ( ([ f, ...xs ]) => f (...xs)
    , tAll (f, ...ts)
    )

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false
  
  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

const sum = (v, ...vs) =>
  vs.length === 0
    ? v
    : v + sum (...vs)

tAp
  ( delay (2000, sum)
  , delay (500, 1)
  , delay (900, 2)
  , delay (1500, 3)
  , delay (1800, 4)
  , delay (300, 5)
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// 15