任务相互依赖的递归 Promise 链接
Recursive Promise chaining with tasks depending on each other
我试图解决一个关于递归和异步的编码问题,但我有点卡住了,这就是问题所在:
任务
您必须执行多项任务。任务就是任何函数(通常是异步的)。
有些任务可以相互依赖。所以他们必须等到他们依赖的任务先完成。
您必须等到所有任务完成并return他们的结果。
输入
以任务 ID 为键的对象,以描述任务为值的对象:
interface TaskDict {
[taskId: string]: {
dependencies: string[]; // an array of task ids.
task: (...dependencyResults: any[]) => any;
}
}
输出
使用任务 ID 作为键、任务结果作为值的对象解析的承诺:
interface TaskResultDict {
[taskId: string]: (
{
status: 'resolved',
value: any
} |
{
status: 'failed',
reason: any
} |
{
status: 'skipped',
unresolvedDependencies: string[]
}
);
}
请注意,如果任务的任何依赖项未解决(例如失败或被跳过),则不应执行该任务。
在这种情况下,状态将为 skipped
.
如果依赖项是循环的,则应该是相同的 skipped
状态。是的,输入中可能有这个错误。除此之外,输入将始终有效(无需编写验证)。
例子
const {deepStrictEqual} = require('assert');
const runTasks = (tasks: TaskDict): Promise<TaskResultDict> => {
// TODO
};
const taskResults = await runTasks({
a: {
dependencies: [],
task: () => Promise.resolve(4)
},
b: {
dependencies: ['a', 'c'],
task: async (a, c) => Math.sqrt(c * c - a * a)
},
c: {
dependencies: [],
task: () => new Promise((x) => setTimeout(x, 100)).then(() => 5)
},
d: {
dependencies: [],
task: () => Promise.reject('This will fail.')
},
e: {
dependencies: ['d', 'a', 'f'],
task: console.log
},
f: {
dependencies: ['f'],
task: () => console.log('Should never run - "f" depends on itself.')
}
});
deepStrictEqual(taskResults, {
a: {status: 'resolved', value: 4},
b: {status: 'resolved', value: 3},
c: {status: 'resolved', value: 5},
d: {status: 'failed', reason: 'This will fail.'},
e: {status: 'skipped', unresolvedDependencies: ['d', 'f']},
f: {status: 'skipped', unresolvedDependencies: ['f']}
});
当前方法
到目前为止,我已经采用了这种方法,但主要问题是,只要我不使用要解决的异步任务,逻辑就可以正常工作,在这种情况下,异步流程不会像我预期的那样工作所以在执行某些任务之前要解决的依赖关系没有得到正确解决,你有什么线索吗?
const resolveDependency = async (
taskId: string,
task: (...dependencyResults: any[]) => any,
dependencies: string[],
results: TaskResultDict
): Promise<TaskResultDict> => {
const unresolvedDependencies = Object.entries(results).filter(
(result) =>
dependencies.includes(result[0]) && result[1].status !== 'resolved'
);
try {
if (unresolvedDependencies.length > 0) {
return ({
[taskId]: {
status: 'skipped',
unresolvedDependencies: unresolvedDependencies.map(
(dependency) => dependency[0]
),
},
} as TaskResultDict);
}
const taskValue = await task(
...Object.entries(results)
.filter((result) => dependencies.includes(result[0]))
.map(
(result) => (result[1] as { status: 'resolved'; value: any }).value
)
);
return ({
[taskId]: {
status: 'resolved',
value: taskValue,
},
} as TaskResultDict);
} catch (error) {
return ({
[taskId]: {
status: 'failed',
reason: error,
},
} as TaskResultDict);
}
};
const runTaskWithDependencies = async (
tasks: TaskDict,
taskId: string,
results: TaskResultDict
): Promise<TaskResultDict> => {
const taskDependencies = tasks[taskId].dependencies;
const allDependenciesExecuted = Object.keys(results).length > 0 && Object.keys(results).every((taskId) =>
taskDependencies.includes(taskId)
);
if (taskDependencies.includes(taskId)) {
return {
[taskId]: {
status: 'skipped',
unresolvedDependencies: [taskId],
},
};
} else if (allDependenciesExecuted || taskDependencies.length === 0) {
const taskResult = await resolveDependency(
taskId,
tasks[taskId].task,
taskDependencies,
results
);
return {
...results,
...taskResult,
};
} else {
return (
await Promise.all(
taskDependencies
.map(
(dependency) =>
runTaskWithDependencies(tasks, dependency, results)
)
)
).reduce((previous, current) => {
return {
...previous,
...current,
};
}, {} as TaskResultDict);
}
};
export const runTasks = async (tasks: TaskDict): Promise<TaskResultDict> => {
const tasksIds = Object.keys(tasks);
return await tasksIds.reduce(async (previous, current) => {
const taskResult = await runTaskWithDependencies(
tasks,
current[0],
await previous
);
return {
...previous,
...taskResult,
};
}, Promise.resolve(<TaskResultDict>{}));
};
这是给你的一个想法:
- 从依赖关系构建图并添加两个额外的节点。 ENTRY 节点和 OUTPUT 节点。所以你可以有一个节点,它只是图中的一个入口点(这是因为节点没有任何依赖关系,对它们来说 ENTRY 节点将是唯一的依赖关系,它可以只是“Promise.resolve()”)和一个类似于图形“输出点”的节点。像这样:
编写一个函数来检查是否存在循环依赖,可以是一个简单的递归函数。您应该有一个 Set 来存储已经遍历的节点,如果当前遍历的节点在 Set 中,则意味着您具有循环依赖性。例如,在这种情况下,您可以抛出错误“检测到循环依赖性”。
现在,编写函数来遍历图形和链式承诺,但是从底部开始,从“OUTPUT”节点开始,因为在那种情况下您可以更轻松地链式承诺。最后,该函数应该 return 一个单一的 Promise,所以你可以这样做:
const chainPromises = async (graph) => {...}
// 开始一切
等待 chainPromises();
例如:
- 第一个遍历的节点是 OUTPUT 节点,它有两个依赖项(D 和 E),所以它应该像 await Promise.all(promiseFromD(), promiseFromE() )
- 第二个遍历的节点是D节点,它有两个依赖(A和B)所以应该像await一样等待Promise.all(promiseFromA(), promiseFromB())
等等...
任务必须 运行 根据它们的依赖关系按特定顺序排列,但是您的 runTasks
函数只是 运行 按照 Object.keys(tasks)
返回的顺序排列它们。
一种方法可以是 运行 批 运行 可用任务 直到没有任务要 运行 了,任务是运行nable 如果它的所有依赖项已经 运行。
我对您的类型进行了一些修改以简化以下功能(仍然与您的代码兼容):
interface Task {
dependencies: string[];
task: (...dependencyResults: any[]) => any;
}
interface TaskDict {
[taskId: string]: Task
}
interface ResolvedTask {
status: 'resolved',
value: any;
}
interface FailedTask {
status: 'failed';
reason: any;
}
interface SkippedTask {
status: 'skipped';
unresolvedDependencies: string[];
}
type TaskResult = ResolvedTask | FailedTask | SkippedTask;
interface TaskResultDict {
[taskId: string]: TaskResult;
}
现在,我们需要一个函数 运行s 一个任务 取决于先前 运行 任务的现有结果(如果有的话)(大致类似于您的runTaskWithDependencies
):
const runTask = async (task: Task, results: TaskResultDict): Promise<TaskResult> => {
const unresolvedDependencies = task.dependencies.filter((dep) => !results[dep] || results[dep].status !== 'resolved');
if (unresolvedDependencies.length > 0) {
return { status: 'skipped', unresolvedDependencies };
}
const dependencyResults = task.dependencies.map((dep) => (results[dep] as ResolvedTask).value);
try {
const value = await task.task(...dependencyResults);
return { status: 'resolved', value };
} catch (reason) {
return { status: 'failed', reason };
}
};
最后,我们 运行 批 运行 可用任务,直到 运行 不再有任务为止:
const runTasks = async (tasks: TaskDict): Promise<TaskResultDict> => {
let remainingTasks: [string, Task][] = Object.entries(tasks);
let results: TaskResultDict = {};
while (remainingTasks.length > 0) {
let runnableTasks = remainingTasks.filter(([_, { dependencies: deps }]) => deps.every((dep) => results.hasOwnProperty(dep)));
if (runnableTasks.length === 0) {
// there are remaining tasks but none of them are runnable: dependency issues, all will be skipped
runnableTasks = remainingTasks;
}
const newResultPromises = runnableTasks.map(([taskId, task]) => runTask(task, results).then((result) => ({ [taskId]: result })));
// await our batch:
const newResults = await Promise.all(newResultPromises);
// gather the results:
results = Object.assign(results, ...newResults);
remainingTasks = remainingTasks.filter(([taskId]) => !results.hasOwnProperty(taskId));
}
return results;
};
提出的图形方法看起来更优雅,可能是理想的实现方式,但我只是想分享另一个解决方案,可以说更容易掌握和实现。
这让我很困扰,所以这是一个可以说比 .
更优雅的答案
它首先创建行为类似于 thunks, i.e. that will be run only once and if needed (see memoization) 的“任务结果工厂”。
首先,您的类型进行了一些修改以简化以下功能:
interface Task {
dependencies: string[];
task: (...dependencyResults: any[]) => any;
}
interface TaskDict {
[taskId: string]: Task
}
interface ResolvedTask {
status: 'resolved',
value: any;
}
interface FailedTask {
status: 'failed';
reason: any;
}
interface SkippedTask {
status: 'skipped';
unresolvedDependencies: string[];
}
type TaskResult = ResolvedTask | FailedTask | SkippedTask;
interface TaskResultDict {
[taskId: string]: TaskResult;
}
现在,我们需要一个将“任务结果工厂”定义为 () => Promise<TaskResult>
:
的新类型
interface TaskResultFactoryDict {
[taskId: string]: () => Promise<TaskResult>;
}
然后,一个函数运行 一个 任务,一旦它的依赖关系被解决:
const runTask = async (taskId: string, task: Task, resultFactories: TaskResultFactoryDict): Promise<TaskResult> => {
// call the result factory of each dependency and await their results:
const dependencyResultPromises = task.dependencies.map(async (dep) => taskId !== dep && resultFactories[dep] && resultFactories[dep]());
const dependencyResults = await Promise.all(dependencyResultPromises);
// check whether some dependencies were "unresolved" (i.e. skipped, erroneous or inexistant):
const unresolvedDependencies = dependencyResults.reduce(
(accumulatedUnresolvedDependencies, result, i) => result && result.status === "resolved"
? accumulatedUnresolvedDependencies
: [...accumulatedUnresolvedDependencies, task.dependencies[i]],
[] as string[]
);
if (unresolvedDependencies.length === 0) {
// all dependencies resolved, use their values to run the task:
try {
const value = await task.task(...dependencyResults.map((result) => (result as ResolvedTask).value));
return { status: 'resolved', value };
} catch (reason) {
return { status: 'failed', reason };
}
} else {
return { status: 'skipped', unresolvedDependencies };
}
};
最后,创建工厂并触发计算的函数:
const runTasks = async (tasks: TaskDict): Promise<TaskResultDict> => {
// prepares the calculations without actually running them:
const resultFactories = Object.entries(tasks).reduce(
(accumulatedResultFactories, [taskId, task]) => {
let promise: Promise<TaskResult>;
return {
...accumulatedResultFactories,
[taskId]: () => {
// use a closure to build a thunk,
// so that a task will only ever run once,
// even if its result is needed mutliple times:
if (!promise) {
promise = runTask(taskId, task, resultFactories);
}
return promise;
}
};
},
{} as TaskResultFactoryDict
);
// triggers the calculations:
const resultPromises = Object.entries(resultFactories).map(async ([taskId, resultFactory]) => {
const result = await resultFactory();
return { [taskId]: result };
});
const results = await Promise.all(resultPromises);
// merges the results
return Object.assign({}, ...results);
};
我试图解决一个关于递归和异步的编码问题,但我有点卡住了,这就是问题所在:
任务
您必须执行多项任务。任务就是任何函数(通常是异步的)。
有些任务可以相互依赖。所以他们必须等到他们依赖的任务先完成。
您必须等到所有任务完成并return他们的结果。
输入
以任务 ID 为键的对象,以描述任务为值的对象:
interface TaskDict {
[taskId: string]: {
dependencies: string[]; // an array of task ids.
task: (...dependencyResults: any[]) => any;
}
}
输出
使用任务 ID 作为键、任务结果作为值的对象解析的承诺:
interface TaskResultDict {
[taskId: string]: (
{
status: 'resolved',
value: any
} |
{
status: 'failed',
reason: any
} |
{
status: 'skipped',
unresolvedDependencies: string[]
}
);
}
请注意,如果任务的任何依赖项未解决(例如失败或被跳过),则不应执行该任务。
在这种情况下,状态将为 skipped
.
如果依赖项是循环的,则应该是相同的 skipped
状态。是的,输入中可能有这个错误。除此之外,输入将始终有效(无需编写验证)。
例子
const {deepStrictEqual} = require('assert');
const runTasks = (tasks: TaskDict): Promise<TaskResultDict> => {
// TODO
};
const taskResults = await runTasks({
a: {
dependencies: [],
task: () => Promise.resolve(4)
},
b: {
dependencies: ['a', 'c'],
task: async (a, c) => Math.sqrt(c * c - a * a)
},
c: {
dependencies: [],
task: () => new Promise((x) => setTimeout(x, 100)).then(() => 5)
},
d: {
dependencies: [],
task: () => Promise.reject('This will fail.')
},
e: {
dependencies: ['d', 'a', 'f'],
task: console.log
},
f: {
dependencies: ['f'],
task: () => console.log('Should never run - "f" depends on itself.')
}
});
deepStrictEqual(taskResults, {
a: {status: 'resolved', value: 4},
b: {status: 'resolved', value: 3},
c: {status: 'resolved', value: 5},
d: {status: 'failed', reason: 'This will fail.'},
e: {status: 'skipped', unresolvedDependencies: ['d', 'f']},
f: {status: 'skipped', unresolvedDependencies: ['f']}
});
当前方法
到目前为止,我已经采用了这种方法,但主要问题是,只要我不使用要解决的异步任务,逻辑就可以正常工作,在这种情况下,异步流程不会像我预期的那样工作所以在执行某些任务之前要解决的依赖关系没有得到正确解决,你有什么线索吗?
const resolveDependency = async (
taskId: string,
task: (...dependencyResults: any[]) => any,
dependencies: string[],
results: TaskResultDict
): Promise<TaskResultDict> => {
const unresolvedDependencies = Object.entries(results).filter(
(result) =>
dependencies.includes(result[0]) && result[1].status !== 'resolved'
);
try {
if (unresolvedDependencies.length > 0) {
return ({
[taskId]: {
status: 'skipped',
unresolvedDependencies: unresolvedDependencies.map(
(dependency) => dependency[0]
),
},
} as TaskResultDict);
}
const taskValue = await task(
...Object.entries(results)
.filter((result) => dependencies.includes(result[0]))
.map(
(result) => (result[1] as { status: 'resolved'; value: any }).value
)
);
return ({
[taskId]: {
status: 'resolved',
value: taskValue,
},
} as TaskResultDict);
} catch (error) {
return ({
[taskId]: {
status: 'failed',
reason: error,
},
} as TaskResultDict);
}
};
const runTaskWithDependencies = async (
tasks: TaskDict,
taskId: string,
results: TaskResultDict
): Promise<TaskResultDict> => {
const taskDependencies = tasks[taskId].dependencies;
const allDependenciesExecuted = Object.keys(results).length > 0 && Object.keys(results).every((taskId) =>
taskDependencies.includes(taskId)
);
if (taskDependencies.includes(taskId)) {
return {
[taskId]: {
status: 'skipped',
unresolvedDependencies: [taskId],
},
};
} else if (allDependenciesExecuted || taskDependencies.length === 0) {
const taskResult = await resolveDependency(
taskId,
tasks[taskId].task,
taskDependencies,
results
);
return {
...results,
...taskResult,
};
} else {
return (
await Promise.all(
taskDependencies
.map(
(dependency) =>
runTaskWithDependencies(tasks, dependency, results)
)
)
).reduce((previous, current) => {
return {
...previous,
...current,
};
}, {} as TaskResultDict);
}
};
export const runTasks = async (tasks: TaskDict): Promise<TaskResultDict> => {
const tasksIds = Object.keys(tasks);
return await tasksIds.reduce(async (previous, current) => {
const taskResult = await runTaskWithDependencies(
tasks,
current[0],
await previous
);
return {
...previous,
...taskResult,
};
}, Promise.resolve(<TaskResultDict>{}));
};
这是给你的一个想法:
- 从依赖关系构建图并添加两个额外的节点。 ENTRY 节点和 OUTPUT 节点。所以你可以有一个节点,它只是图中的一个入口点(这是因为节点没有任何依赖关系,对它们来说 ENTRY 节点将是唯一的依赖关系,它可以只是“Promise.resolve()”)和一个类似于图形“输出点”的节点。像这样:
编写一个函数来检查是否存在循环依赖,可以是一个简单的递归函数。您应该有一个 Set 来存储已经遍历的节点,如果当前遍历的节点在 Set 中,则意味着您具有循环依赖性。例如,在这种情况下,您可以抛出错误“检测到循环依赖性”。
现在,编写函数来遍历图形和链式承诺,但是从底部开始,从“OUTPUT”节点开始,因为在那种情况下您可以更轻松地链式承诺。最后,该函数应该 return 一个单一的 Promise,所以你可以这样做:
const chainPromises = async (graph) => {...} // 开始一切 等待 chainPromises();
例如:
- 第一个遍历的节点是 OUTPUT 节点,它有两个依赖项(D 和 E),所以它应该像 await Promise.all(promiseFromD(), promiseFromE() )
- 第二个遍历的节点是D节点,它有两个依赖(A和B)所以应该像await一样等待Promise.all(promiseFromA(), promiseFromB()) 等等...
任务必须 运行 根据它们的依赖关系按特定顺序排列,但是您的 runTasks
函数只是 运行 按照 Object.keys(tasks)
返回的顺序排列它们。
一种方法可以是 运行 批 运行 可用任务 直到没有任务要 运行 了,任务是运行nable 如果它的所有依赖项已经 运行。
我对您的类型进行了一些修改以简化以下功能(仍然与您的代码兼容):
interface Task {
dependencies: string[];
task: (...dependencyResults: any[]) => any;
}
interface TaskDict {
[taskId: string]: Task
}
interface ResolvedTask {
status: 'resolved',
value: any;
}
interface FailedTask {
status: 'failed';
reason: any;
}
interface SkippedTask {
status: 'skipped';
unresolvedDependencies: string[];
}
type TaskResult = ResolvedTask | FailedTask | SkippedTask;
interface TaskResultDict {
[taskId: string]: TaskResult;
}
现在,我们需要一个函数 运行s 一个任务 取决于先前 运行 任务的现有结果(如果有的话)(大致类似于您的runTaskWithDependencies
):
const runTask = async (task: Task, results: TaskResultDict): Promise<TaskResult> => {
const unresolvedDependencies = task.dependencies.filter((dep) => !results[dep] || results[dep].status !== 'resolved');
if (unresolvedDependencies.length > 0) {
return { status: 'skipped', unresolvedDependencies };
}
const dependencyResults = task.dependencies.map((dep) => (results[dep] as ResolvedTask).value);
try {
const value = await task.task(...dependencyResults);
return { status: 'resolved', value };
} catch (reason) {
return { status: 'failed', reason };
}
};
最后,我们 运行 批 运行 可用任务,直到 运行 不再有任务为止:
const runTasks = async (tasks: TaskDict): Promise<TaskResultDict> => {
let remainingTasks: [string, Task][] = Object.entries(tasks);
let results: TaskResultDict = {};
while (remainingTasks.length > 0) {
let runnableTasks = remainingTasks.filter(([_, { dependencies: deps }]) => deps.every((dep) => results.hasOwnProperty(dep)));
if (runnableTasks.length === 0) {
// there are remaining tasks but none of them are runnable: dependency issues, all will be skipped
runnableTasks = remainingTasks;
}
const newResultPromises = runnableTasks.map(([taskId, task]) => runTask(task, results).then((result) => ({ [taskId]: result })));
// await our batch:
const newResults = await Promise.all(newResultPromises);
// gather the results:
results = Object.assign(results, ...newResults);
remainingTasks = remainingTasks.filter(([taskId]) => !results.hasOwnProperty(taskId));
}
return results;
};
这让我很困扰,所以这是一个可以说比
它首先创建行为类似于 thunks, i.e. that will be run only once and if needed (see memoization) 的“任务结果工厂”。
首先,您的类型进行了一些修改以简化以下功能:
interface Task {
dependencies: string[];
task: (...dependencyResults: any[]) => any;
}
interface TaskDict {
[taskId: string]: Task
}
interface ResolvedTask {
status: 'resolved',
value: any;
}
interface FailedTask {
status: 'failed';
reason: any;
}
interface SkippedTask {
status: 'skipped';
unresolvedDependencies: string[];
}
type TaskResult = ResolvedTask | FailedTask | SkippedTask;
interface TaskResultDict {
[taskId: string]: TaskResult;
}
现在,我们需要一个将“任务结果工厂”定义为 () => Promise<TaskResult>
:
interface TaskResultFactoryDict {
[taskId: string]: () => Promise<TaskResult>;
}
然后,一个函数运行 一个 任务,一旦它的依赖关系被解决:
const runTask = async (taskId: string, task: Task, resultFactories: TaskResultFactoryDict): Promise<TaskResult> => {
// call the result factory of each dependency and await their results:
const dependencyResultPromises = task.dependencies.map(async (dep) => taskId !== dep && resultFactories[dep] && resultFactories[dep]());
const dependencyResults = await Promise.all(dependencyResultPromises);
// check whether some dependencies were "unresolved" (i.e. skipped, erroneous or inexistant):
const unresolvedDependencies = dependencyResults.reduce(
(accumulatedUnresolvedDependencies, result, i) => result && result.status === "resolved"
? accumulatedUnresolvedDependencies
: [...accumulatedUnresolvedDependencies, task.dependencies[i]],
[] as string[]
);
if (unresolvedDependencies.length === 0) {
// all dependencies resolved, use their values to run the task:
try {
const value = await task.task(...dependencyResults.map((result) => (result as ResolvedTask).value));
return { status: 'resolved', value };
} catch (reason) {
return { status: 'failed', reason };
}
} else {
return { status: 'skipped', unresolvedDependencies };
}
};
最后,创建工厂并触发计算的函数:
const runTasks = async (tasks: TaskDict): Promise<TaskResultDict> => {
// prepares the calculations without actually running them:
const resultFactories = Object.entries(tasks).reduce(
(accumulatedResultFactories, [taskId, task]) => {
let promise: Promise<TaskResult>;
return {
...accumulatedResultFactories,
[taskId]: () => {
// use a closure to build a thunk,
// so that a task will only ever run once,
// even if its result is needed mutliple times:
if (!promise) {
promise = runTask(taskId, task, resultFactories);
}
return promise;
}
};
},
{} as TaskResultFactoryDict
);
// triggers the calculations:
const resultPromises = Object.entries(resultFactories).map(async ([taskId, resultFactory]) => {
const result = await resultFactory();
return { [taskId]: result };
});
const results = await Promise.all(resultPromises);
// merges the results
return Object.assign({}, ...results);
};