使用拓扑排序在有向非循环依赖图中并发处理的组任务

Group tasks for concurrent processing in directed acyclic dependency graph using topological sorting

我的任务 class 在执行前依赖于其他任务。我想对可以并行化的任务进行分组并对其进行排序。我决定先将它表示为 DAG 并尝试使用 JGrapht。 首先,我遍历任务的输入列表以获取所有任务及其依赖项并将它们收集在一个列表中。 然后对于每个任务,我在图中创建一个顶点。

DirectedAcyclicGraph<Task, DefaultEdge> d = new DirectedAcyclicGraph<>(DefaultEdge.class);
Set<Task> all = collectAllTasks(tasks);
all.forEach(d::addVertex);

然后使用相同的列表尝试在节点之间创建边。

all.forEach(task -> {
        Set<TaskName> predecessors = task.getPredecessors();

        predecessors.forEach(name -> {
            Task predecessor = taskService.getTaskByName(name);
            d.addEdge(predecessor, task);
        });

    });

然后我尝试对任务进行分组

private Set<Set<TaskId>> groupTasks(DirectedAcyclicGraph<TaskId, DefaultEdge> dag) {
    Set<Set<TaskId>> groups = new LinkedHashSet<>();
    Iterator<TaskId> iterator = new TopologicalOrderIterator<>(dag);

    iterator.forEachRemaining(taskId -> {
        //source?
        if (dag.incomingEdgesOf(taskId).isEmpty()) {
            if (groups.isEmpty()) {
                Set<TaskId> set = new HashSet<>();
                set.add(taskId);
                groups.add(set);
            } else {
                groups.iterator().next().add(taskId);
            }
        }

        Set<TaskId> tasks = new HashSet<>(Graphs.successorListOf(dag, taskId));

        //sink?
        if (tasks.isEmpty()) {
            return;
        }

        groups.add(featuresGroup);
    });

    return groups;
}

所以结果是有序和分组的任务,例如图

结果会是A, B, {C, D}, E.

然而,它完全打破了 B 也是 E 的前身的情况,就像这个例子

我怎样才能像以前一样为图表实现 A, B, {C, D}, E 的顺序? 有没有我可以查看的特定算法或如何以更好的方式实现该算法?谢谢。

可以使用以下过程获得解决方案:

  1. 第一组包含没有传入弧的所有任务:这些任务没有传入依赖项。
  2. 从图中删除第一组中的所有任务
  3. 第二组由修改图中没有传入弧的任务组成:已满足这些任务的所有依赖关系。
  4. 从修改后的图中删除第二组中的所有任务。继续此过程,直到删除所有任务。

使用 JGraphT:

public static List<List<String>> getGroups(Graph<String, DefaultEdge> taskGraph){
    List<List<String>> groups = new ArrayList<>();
    //The first group contains all vertices without incoming arcs
    List<String> group = new LinkedList<>();
    for(String task : taskGraph.vertexSet())
        if(taskGraph.inDegreeOf(task) == 0)
            group.add(task);
    //Next we construct all remaining groups. The group k+1 consists of al vertices without incoming arcs if we were
    //to remove all vertices in the previous group k from the graph.
    do {
        groups.add(group);
        List<String> nextGroup = new LinkedList<>();
        for (String task : group) {
            for (String nextTask : Graphs.neighborSetOf(taskGraph, task)) {
                if (taskGraph.inDegreeOf(nextTask) == 1)
                    nextGroup.add(nextTask);
            }
            taskGraph.removeVertex(task); //Removes a vertex and all its edges from the graph
        }
        group=nextGroup;
    }while(!group.isEmpty());
    return groups;
}
public static Graph<String, DefaultEdge> getGraph1(){
    Graph<String, DefaultEdge> taskGraph=new SimpleDirectedGraph<>(DefaultEdge.class);
    Graphs.addAllVertices(taskGraph, Arrays.asList("A","B","C","D","E"));
    taskGraph.addEdge("A","B");
    taskGraph.addEdge("B","C");
    taskGraph.addEdge("B","D");
    taskGraph.addEdge("C","E");
    taskGraph.addEdge("D","E");
    return taskGraph;
}
public static Graph<String, DefaultEdge> getGraph2(){
    Graph<String, DefaultEdge> taskGraph=new SimpleDirectedGraph<>(DefaultEdge.class);
    Graphs.addAllVertices(taskGraph, Arrays.asList("A","B","C","D","E"));
    taskGraph.addEdge("A","B");
    taskGraph.addEdge("B","C");
    taskGraph.addEdge("B","D");
    taskGraph.addEdge("B","E");
    taskGraph.addEdge("C","E");
    taskGraph.addEdge("D","E");
    return taskGraph;
}
public static void main(String[] args){
    System.out.println("Graph1: "+getGroups(getGraph1()));
    System.out.println("Graph2: "+getGroups(getGraph2()));
}

输出:

Graph1: [[A], [B], [C, D], [E]]
Graph2: [[A], [B], [C, D], [E]]

注意:以上代码假定输入图确实是一个有效的任务图。您可以构建一个额外的检查来识别循环依赖,例如如果你有这样的序列:A -> B -> A.