运行 并行的 TaskEithers 数组,但如果 1 个或多个任务失败则继续

Running an array of TaskEithers in parallel, but continue if 1 or more task fails

我必须并行进行一组 IO 调用,如果成功则合并调用的内​​容。如果一个失败,其他的将按正常方式处理,但会出现错误消息。

我对如何实现这一点的思考过程:

Array<TE<E, A>> -> TE<E, Array<A>> -> TE<E, MergedA> -> [E, A]

我目前在做什么:

我目前正在对 TE 阵列进行测序,但链中的任何故障都会产生一个左侧。

pipe(
sequenceT(TE.taskEither)(arrayofTE), //TE<E,A>[] -> TE<E,A[]>
TE.map(mergeFn), //TE<E, A[]> -> TE<E, MergedA> 
???
)

如何停止短路?

您可以将 T.task 而不是 TE.taskEither 传递给 sequence/sequenceT (docs):

Action: execute an array of tasks in parallel, collecting all failures and successes

TaskEither: array.sequence(T.task)(taskEithers) - same for sequenceT

sequence: 运行 并行任务

import { pipeable as P, taskEither as TE, task as T, array as A, either as E } from "fp-ts";

const arrayofTE: TE.TaskEither<string, number>[] = [
  TE.right(1),
  TE.right(2),
  TE.left("Oh shit")
];

const run = P.pipe(
  // change to T.task instead of TE.taskEither here
  A.array.sequence(T.task)(arrayofTE),
  mergeFn
);

run(); // run side effect
// returns Promise<{"errors":["Oh shit"],"results":[1,2]}>

// whatever merged result you want to have; this one collects all errors and all results
declare function mergeFn(te: T.Task<E.Either<string, number>[]>): T.Task<Results> 

type Results = { errors: string[]; results: number[] };

sequenceT: 运行 并行不同类型的任务

import { apply as AP /* and others above */ } from "fp-ts";

// Here, TaskEither result can be number | boolean (success case), string on error
const arrayofTE = [TE.right(1), TE.right(true), TE.left("Oh shit")] as const;

const run = P.pipe(
  AP.sequenceT(T.task)(...arrayofTE), // we use sequenceT here and pass T.task again
  mergeFn
);

declare function mergeFn(a: T.Task<E.Either<string, number | boolean>[]>): T.Task<Results>

这里有 mergeFn 实施的沙箱:sequence , sequenceT.