如何访问 ThreadPoolExecutor 中的 运行 个线程?

How to access running threads inside ThreadPoolExecutor?

我有一个 运行 线程队列,我想在它执行时公开它的一些数据,以监视进程。

ThreadPoolExecutor 提供对其队列的访问,我可以遍历这些对象以调用我重写的 toString() 方法,但这些只是等待执行的线程。

有没有办法访问当前正在 运行 调用我的方法的线程?或者也许有更好的方法来完成这项任务?

为了进一步阐明目的,这里有一些大致的代码:

public class GetDataTask implements Runnable {
    private String pageNumber;
    private int dataBlocksParsed;
    private String source;
    private String dataType;


    public GetDataTask(String source, String dataType) {
        this.source = source;
        this.dataType = dataType;
    }

    @Override
    public void run() {
        //do stuff that affects pageNumber and dataBlocksParsed
    }

    @Override
    public String toString() {
        return "GetDataTask{" +
            "source=" + source +
            ", dataType=" + dataType +
            ", pageNumber=" + pageNumber +
            ", dataBlocksParsed=" + dataBlocksParsed +
            '}';
    }
}

和一个class持有执行人:

public class DataParseManager {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));

    public void addParseDataTask(String source, String dataType) {
        executor.execute(new GetDataTask(source, dataType));
    }

    // here's the method that I need
    public String getInfo() {
        StringBuilder info = new StringBuilder();
        //and here's the method that I'm missing - executor.getActiveThreads()
        for (Runnable r : executor.getActiveThreads()) {
            info.append(((GetDataTask) r).toString()).append('\n');
        }
        return info.append(executor.toString()).toString();
   }
}

每当您将线程添加到队列时,也会将其添加到第二个数据结构,比如 HashSet。然后,如果您需要访问一个 运行 线程,您可以检查 ExecutorService 的队列以找到仍在等待执行的线程:您的 HashSet 中的每个线程都不是在 ExecutorService 的队列中当前是 运行。

像这样包装Runnable怎么样。

static class MonitorRunnable implements Runnable {

    static final List<Runnable> activeTasks = Collections.synchronizedList(new ArrayList<>());

    private final Runnable runnable;

    public MonitorRunnable(Runnable runnable) {
        this.runnable = runnable;
    }

    @Override
    public void run() {
        activeTasks.add(runnable);
        runnable.run();
        activeTasks.remove(runnable);
    }
}

public class DataParseManager {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));

    public void addParseDataTask(String source, String dataType) {
        executor.execute(new MonitorRunnable(new GetDataTask(source, dataType)));
    }

    // here's the method that I need
    public String getInfo() {
        StringBuilder info = new StringBuilder();
        //and here's the method that I'm missing - executor.getActiveThreads()
        synchronized (MonitorRunnable.activeTasks) {
            for (Runnable r : MonitorRunnable.activeTasks) {
                info.append(((GetDataTask) r).toString()).append('\n');
            }
        }
        return info.append(executor.toString()).toString();
   }
}

就像我在评论中写的那样。我会对共享统计对象方法进行主动更新:

我会像这样更改任务:

public class GetDataTask implements Runnable {
    private String pageNumber;
    private int dataBlocksParsed;
    private String source;
    private String dataType;
    HashMap<GetDataTask,String> statistics


    public GetDataTask(String source, String dataType, HashMap<GetDataTask,String> statistics) {
        this.source = source;
        this.dataType = dataType;
        this.statistics = statistics;
    }

    @Override
    public void run() {
        // you'll probably want to immediately have stats available:
        statistics.put(this, this.toString());

        //do stuff that affects pageNumber and dataBlocksParsed
        // vv this will probably be inside your "do stuff" loop
        statistics.put(this, this.toString());
        // loop end

        // if you do not want stats of finished tasks, remove "this" here.
    }

    @Override
    public String toString() {
        return "GetDataTask{" +
            "source=" + source +
            ", dataType=" + dataType +
            ", pageNumber=" + pageNumber +
            ", dataBlocksParsed=" + dataBlocksParsed +
            '}';
    }
}

和经理:

public class DataParseManager {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));

    private HashMap<GetDataTask,String> stats = new ConcurrentHashMap<GetDataTask,String>();       

    public void addParseDataTask(String source, String dataType) {
        executor.execute(new GetDataTask(source, dataType, stats));
    }

    // here's the method that I need
    public String getInfo() {
        StringBuilder info = new StringBuilder();
        //and here's the method that I'm missing - executor.getActiveThreads()

        // >>> iterate "stats"'s values to build the info string ...            

        return info.append(executor.toString()).toString();
   }
}

更新

您可以通过迭代 Map 的 keys(正在执行的任务)并在其上调用 toString 来轻松更改获取信息的方法。不过,这与 saka 的方法非常相似。也许你对他的感觉更舒服。

由于您可以控制所使用的执行器,我会使用 ThreadPoolExecutorbeforeExecuteafterExecute 方法来跟踪 运行 任务并使用它创建一个 getActiveTasks 方法。

import java.util.Set;
import java.util.concurrent.*;

public class ActiveTasksThreadPool extends ThreadPoolExecutor {

    private final ConcurrentHashMap<Runnable, Boolean> activeTasks = new ConcurrentHashMap<>();

    public ActiveTasksThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {

        activeTasks.put(r, Boolean.TRUE);
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        super.afterExecute(r, t);
        activeTasks.remove(r);
    }

    public Set<Runnable> getActiveTasks() {
        // the returned set will not throw a ConcurrentModificationException.
        return activeTasks.keySet();
    }

    public static void main(String[] args) {

        final int maxTasks = 5;
        ActiveTasksThreadPool tp = new ActiveTasksThreadPool(maxTasks, maxTasks, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        try {
            System.out.println("Active tasks: " + tp.getActiveTasks());
            final CountDownLatch latch = new CountDownLatch(1); 
            for (int i = 0; i < maxTasks; i ++) {
                final int rnumber = i;
                tp.execute(new Runnable() {
                    @Override
                    public void run() {
                        try { latch.await(); } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    @Override
                    public String toString() {
                        return "Runnable " + rnumber;
                    }
                });
            }
            Thread.sleep(100L); // give threads a chance to start
            System.out.println("Active tasks: " + tp.getActiveTasks());
            latch.countDown();
            Thread.sleep(100L); // give threads a chance to finish
            System.out.println("Active tasks: " + tp.getActiveTasks());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            tp.shutdownNow();
        }
    }

}

您只需要将对 运行 线程的引用存储在将在 ThreadPoolExecutor 中触发的某处,在其他答案之上添加,这是一个读取线程状态的小型应用程序示例 运行 在 ThreadPoolExecutor 中每 1 秒执行一次,直到关闭:

package sample;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

        for (int i = 1; i <= 10; i++)
        {
            Task task = new Task("Task " + i);
            executor.execute(task);
        }

        executor.shutdown();

        try {
            while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                System.out.println("Awaiting completion of threads, threads states: " + Task.getThreadsStateCount());
            }

        } catch (InterruptedException e) {
        }

        System.out.println("Executor shutdown -> " + executor.isShutdown());
    }
}

class Task implements Runnable {

    static final List<Thread> activeTasks = Collections.synchronizedList(new ArrayList<>());
    static final Random r = new Random();

    private String name;

    public Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("current thread : " + t.getName() + " group " + t.getThreadGroup() + " state " + t.getState());
        activeTasks.add(t);

        try {
            int tries = 0;

            while (tries < 10) {
                int randomNum = r.nextInt(10000);
                // do some expensive computation
                for(int i = 0; i < 4; i++) {
                    isPrime(r.nextLong());
                }

                // now sleep
                Thread.sleep(randomNum);
                tries++;
            }

        } catch (InterruptedException e) {
        }

        System.out.println("completed task for thread : " + t.getName() + " group " + t.getThreadGroup() + " state " + t.getState());
    }

    static boolean isPrime(long n)
    {
        if (n <= 1)
            return false;
        if (n <= 3)
            return true;

        if (n % 2 == 0 || n % 3 == 0)
            return false;

        for (int i = 5; i * i <= n; i = i + 6)
            if (n % i == 0 || n % (i + 2) == 0)
                return false;

        return true;
    }

    public static String getThreadsStateCount() {
        return "NEW: " + getCountThreadsState(Thread.State.NEW) +
                " ,RUNNABLE: " + getCountThreadsState(Thread.State.RUNNABLE) +
                " ,WAITING: " + getCountThreadsState(Thread.State.WAITING) +
                " ,TIMED_WAITING: " + getCountThreadsState(Thread.State.TIMED_WAITING) +
                " ,BLOCKED: " + getCountThreadsState(Thread.State.BLOCKED) +
                " ,TERMINATED: " + getCountThreadsState(Thread.State.TERMINATED);
    }

    public static long getCountThreadsState(Thread.State state) {
        return activeTasks.stream().filter(x -> x.getState() == state).count();
    }
}

// prints something like:

Awaiting completion of threads, threads states: NEW: 0 ,RUNNABLE: 1 ,WAITING: 0 ,TIMED_WAITING: 9 ,BLOCKED: 0 ,TERMINATED: 0