以分层方式创建多级线程

Creating multiple level threads in a hierarchical manner

我正在尝试创建一个应用程序,它将在树中创建线程,例如 manner.My 主要方法位于 Level0,它在 Level1 中创建线程。然后 Level1 将创建一些线程。 Level1 中的每个线程将创建不同的线程集作为 Level2,依此类推。

下面是我尝试使用的代码,使用 ExecutorService:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadTree {

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();

//This is the main , level0

List ll = new ArrayList<>();
ll.add(2);
ll.add(5);
ll.add(8);

Iterator it = ll.iterator();
while (it.hasNext()) {

    exec.submit(new Level1((Integer)it.next()));
}
}
}

class Level1 implements Runnable{

private ScheduledExecutorService exec;
private Integer num;

public Level1(Integer n){
    num = n;
    exec = Executors
            .newScheduledThreadPool(n);
}

@Override
public void run() {

    for(int i=0;i<num;i++){
        exec.scheduleAtFixedRate(new Level2(), 0, 2, TimeUnit.SECONDS);
    }


}

}

class Level2 implements Runnable{

@Override
public void run() {

    System.out.println("Current Thread ID : " + Thread.currentThread().getId() + "Current Thread Name : " 
            + Thread.currentThread().getName()) ;
    //Do some task and create new threads

}

}

我有 2 个问题:

  1. 这是以树方式创建线程的唯一方法吗?还有其他方法可以通过一些分组来有效地处理这个问题吗?我已经显示了 3 个级别,但还可以有更多级别。
  2. 以防万一,这是一个好方法,什么是在其上一层等级别之一的线程中传播任何故障的最佳方法。

提前致谢。

我认为您可以按照以下步骤实现此目的 -

  • 只使用一个线程池执行器
  • 创建从子任务到父任务的通知机制。 (请注意 task 不同于 thread。Task 只是一个 Runnable 对象,而 Thread 是一个 Java thread 通过 executor 执行 运行nable 任务。)

请参考下面的示例 -

public class ThreadTree {

private static final ExecutorService executor = Executors.newCachedThreadPool();

public static void main(String[] args) {
    List<Integer> level1Nodes = new ArrayList<Integer>();
    level1Nodes.add(2);
    level1Nodes.add(5);
    level1Nodes.add(8);
    // start threads
    for (Integer num : level1Nodes) {
        executor.submit(new Level1(num));
    }
}

private static class Notification {
    private final Object result;
    private final Exception rootException;

    public Notification(Object result, Exception rootException) {
        this.result = result;
        this.rootException = rootException;
    }

    public Object getResult() {
        return result;
    }

    public Exception getRootException() {
        return rootException;
    }
}

private static abstract class NotificationHandler {
    private final AtomicInteger expectedNotifications;
    private final List<Notification> notifications;

    public NotificationHandler(int expectedNotifications) {
        this.expectedNotifications = new AtomicInteger(expectedNotifications);
        this.notifications = new ArrayList<Notification>();
    }

    public void handleNotification(Notification notification) {
        notifications.add(notification);
        if (expectedNotifications.decrementAndGet() == 0) {
            postRun(notifications);
        }
    }

    public void postRun(List<Notification> notifications) {
        for (Notification notification : notifications) {
            System.out.println("Status: " + (notification.getRootException() == null ? "Failed" : "Success") + ", Result: " + (notification.getResult() != null ? notification.getResult() : "No result"));
        }
    }
}

private static class Level1 extends NotificationHandler implements Runnable {
    private final int num;

    public Level1(int num) {
        super(num);
        this.num = num;
    }

    public void run() {
        for (int i = 0; i < num; i++) {
            executor.submit(new Level2(2, this)); // 2 is just an assumed number of nodes at level 2
        }
    }
}

private static class Level2 extends NotificationHandler implements Runnable {
    private final int num;
    private final NotificationHandler parentNotificationHandler;

    public Level2(int num, NotificationHandler parentNotificationHandler) {
        super(num);
        this.num = num;
        this.parentNotificationHandler = parentNotificationHandler;
    }

    public void run() {
        for (int i = 0; i < num; i++) {
            executor.submit(new Level2(2, this)); // 2 is just an assumed number of nodes at level 3
        }
        // execute the task and then notify parent
        parentNotificationHandler.handleNotification(new Notification("done", null));
    }
}

private static class Level3 extends NotificationHandler implements Runnable {
    private final int num;
    private final NotificationHandler parentNotificationHandler;

    public Level3(int num, NotificationHandler parentNotificationHandler) {
        super(num);
        this.num = num;
        this.parentNotificationHandler = parentNotificationHandler;
    }

    public void run() {
        // execute the task and then notify parent
        parentNotificationHandler.handleNotification(new Notification("done", null));
    }
}
}

此处 Notification 正在从 Level3 -> Level2' ->Level1' 传递。每个子任务都有责任在完成自己的工作后通知父任务。一旦所有子任务都通知了父任务,父任务将执行 post 运行 操作并通知其父任务。

在这里,线程池执行器使用了哪些线程并不重要。唯一重要的是遵循每个子任务通知父级的规则,然后父级执行 post 操作并进一步通知其父级。

Notification class 由 resultrootException 组成,可以从子任务中设置,以便父级可以知道子任务出了什么问题,异常可以到达顶层。