如何访问 ThreadPoolExecutor 中的 运行 个线程?
How to access running threads inside ThreadPoolExecutor?
我有一个 运行 线程队列,我想在它执行时公开它的一些数据,以监视进程。
ThreadPoolExecutor
提供对其队列的访问,我可以遍历这些对象以调用我重写的 toString()
方法,但这些只是等待执行的线程。
有没有办法访问当前正在 运行 调用我的方法的线程?或者也许有更好的方法来完成这项任务?
为了进一步阐明目的,这里有一些大致的代码:
public class GetDataTask implements Runnable {
private String pageNumber;
private int dataBlocksParsed;
private String source;
private String dataType;
public GetDataTask(String source, String dataType) {
this.source = source;
this.dataType = dataType;
}
@Override
public void run() {
//do stuff that affects pageNumber and dataBlocksParsed
}
@Override
public String toString() {
return "GetDataTask{" +
"source=" + source +
", dataType=" + dataType +
", pageNumber=" + pageNumber +
", dataBlocksParsed=" + dataBlocksParsed +
'}';
}
}
和一个class持有执行人:
public class DataParseManager {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
public void addParseDataTask(String source, String dataType) {
executor.execute(new GetDataTask(source, dataType));
}
// here's the method that I need
public String getInfo() {
StringBuilder info = new StringBuilder();
//and here's the method that I'm missing - executor.getActiveThreads()
for (Runnable r : executor.getActiveThreads()) {
info.append(((GetDataTask) r).toString()).append('\n');
}
return info.append(executor.toString()).toString();
}
}
每当您将线程添加到队列时,也会将其添加到第二个数据结构,比如 HashSet
。然后,如果您需要访问一个 运行 线程,您可以检查 ExecutorService
的队列以找到仍在等待执行的线程:您的 HashSet
中的每个线程都不是在 ExecutorService
的队列中当前是 运行。
像这样包装Runnable怎么样。
static class MonitorRunnable implements Runnable {
static final List<Runnable> activeTasks = Collections.synchronizedList(new ArrayList<>());
private final Runnable runnable;
public MonitorRunnable(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
activeTasks.add(runnable);
runnable.run();
activeTasks.remove(runnable);
}
}
和
public class DataParseManager {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
public void addParseDataTask(String source, String dataType) {
executor.execute(new MonitorRunnable(new GetDataTask(source, dataType)));
}
// here's the method that I need
public String getInfo() {
StringBuilder info = new StringBuilder();
//and here's the method that I'm missing - executor.getActiveThreads()
synchronized (MonitorRunnable.activeTasks) {
for (Runnable r : MonitorRunnable.activeTasks) {
info.append(((GetDataTask) r).toString()).append('\n');
}
}
return info.append(executor.toString()).toString();
}
}
就像我在评论中写的那样。我会对共享统计对象方法进行主动更新:
我会像这样更改任务:
public class GetDataTask implements Runnable {
private String pageNumber;
private int dataBlocksParsed;
private String source;
private String dataType;
HashMap<GetDataTask,String> statistics
public GetDataTask(String source, String dataType, HashMap<GetDataTask,String> statistics) {
this.source = source;
this.dataType = dataType;
this.statistics = statistics;
}
@Override
public void run() {
// you'll probably want to immediately have stats available:
statistics.put(this, this.toString());
//do stuff that affects pageNumber and dataBlocksParsed
// vv this will probably be inside your "do stuff" loop
statistics.put(this, this.toString());
// loop end
// if you do not want stats of finished tasks, remove "this" here.
}
@Override
public String toString() {
return "GetDataTask{" +
"source=" + source +
", dataType=" + dataType +
", pageNumber=" + pageNumber +
", dataBlocksParsed=" + dataBlocksParsed +
'}';
}
}
和经理:
public class DataParseManager {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
private HashMap<GetDataTask,String> stats = new ConcurrentHashMap<GetDataTask,String>();
public void addParseDataTask(String source, String dataType) {
executor.execute(new GetDataTask(source, dataType, stats));
}
// here's the method that I need
public String getInfo() {
StringBuilder info = new StringBuilder();
//and here's the method that I'm missing - executor.getActiveThreads()
// >>> iterate "stats"'s values to build the info string ...
return info.append(executor.toString()).toString();
}
}
更新
您可以通过迭代 Map 的 keys(正在执行的任务)并在其上调用 toString
来轻松更改获取信息的方法。不过,这与 saka 的方法非常相似。也许你对他的感觉更舒服。
由于您可以控制所使用的执行器,我会使用 ThreadPoolExecutor
的 beforeExecute
和 afterExecute
方法来跟踪 运行 任务并使用它创建一个 getActiveTasks
方法。
import java.util.Set;
import java.util.concurrent.*;
public class ActiveTasksThreadPool extends ThreadPoolExecutor {
private final ConcurrentHashMap<Runnable, Boolean> activeTasks = new ConcurrentHashMap<>();
public ActiveTasksThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
activeTasks.put(r, Boolean.TRUE);
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
activeTasks.remove(r);
}
public Set<Runnable> getActiveTasks() {
// the returned set will not throw a ConcurrentModificationException.
return activeTasks.keySet();
}
public static void main(String[] args) {
final int maxTasks = 5;
ActiveTasksThreadPool tp = new ActiveTasksThreadPool(maxTasks, maxTasks, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
try {
System.out.println("Active tasks: " + tp.getActiveTasks());
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < maxTasks; i ++) {
final int rnumber = i;
tp.execute(new Runnable() {
@Override
public void run() {
try { latch.await(); } catch (Exception e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Runnable " + rnumber;
}
});
}
Thread.sleep(100L); // give threads a chance to start
System.out.println("Active tasks: " + tp.getActiveTasks());
latch.countDown();
Thread.sleep(100L); // give threads a chance to finish
System.out.println("Active tasks: " + tp.getActiveTasks());
} catch (Exception e) {
e.printStackTrace();
} finally {
tp.shutdownNow();
}
}
}
您只需要将对 运行 线程的引用存储在将在 ThreadPoolExecutor 中触发的某处,在其他答案之上添加,这是一个读取线程状态的小型应用程序示例 运行 在 ThreadPoolExecutor 中每 1 秒执行一次,直到关闭:
package sample;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
for (int i = 1; i <= 10; i++)
{
Task task = new Task("Task " + i);
executor.execute(task);
}
executor.shutdown();
try {
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("Awaiting completion of threads, threads states: " + Task.getThreadsStateCount());
}
} catch (InterruptedException e) {
}
System.out.println("Executor shutdown -> " + executor.isShutdown());
}
}
class Task implements Runnable {
static final List<Thread> activeTasks = Collections.synchronizedList(new ArrayList<>());
static final Random r = new Random();
private String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println("current thread : " + t.getName() + " group " + t.getThreadGroup() + " state " + t.getState());
activeTasks.add(t);
try {
int tries = 0;
while (tries < 10) {
int randomNum = r.nextInt(10000);
// do some expensive computation
for(int i = 0; i < 4; i++) {
isPrime(r.nextLong());
}
// now sleep
Thread.sleep(randomNum);
tries++;
}
} catch (InterruptedException e) {
}
System.out.println("completed task for thread : " + t.getName() + " group " + t.getThreadGroup() + " state " + t.getState());
}
static boolean isPrime(long n)
{
if (n <= 1)
return false;
if (n <= 3)
return true;
if (n % 2 == 0 || n % 3 == 0)
return false;
for (int i = 5; i * i <= n; i = i + 6)
if (n % i == 0 || n % (i + 2) == 0)
return false;
return true;
}
public static String getThreadsStateCount() {
return "NEW: " + getCountThreadsState(Thread.State.NEW) +
" ,RUNNABLE: " + getCountThreadsState(Thread.State.RUNNABLE) +
" ,WAITING: " + getCountThreadsState(Thread.State.WAITING) +
" ,TIMED_WAITING: " + getCountThreadsState(Thread.State.TIMED_WAITING) +
" ,BLOCKED: " + getCountThreadsState(Thread.State.BLOCKED) +
" ,TERMINATED: " + getCountThreadsState(Thread.State.TERMINATED);
}
public static long getCountThreadsState(Thread.State state) {
return activeTasks.stream().filter(x -> x.getState() == state).count();
}
}
// prints something like:
Awaiting completion of threads, threads states: NEW: 0 ,RUNNABLE: 1
,WAITING: 0 ,TIMED_WAITING: 9 ,BLOCKED: 0 ,TERMINATED: 0
我有一个 运行 线程队列,我想在它执行时公开它的一些数据,以监视进程。
ThreadPoolExecutor
提供对其队列的访问,我可以遍历这些对象以调用我重写的 toString()
方法,但这些只是等待执行的线程。
有没有办法访问当前正在 运行 调用我的方法的线程?或者也许有更好的方法来完成这项任务?
为了进一步阐明目的,这里有一些大致的代码:
public class GetDataTask implements Runnable {
private String pageNumber;
private int dataBlocksParsed;
private String source;
private String dataType;
public GetDataTask(String source, String dataType) {
this.source = source;
this.dataType = dataType;
}
@Override
public void run() {
//do stuff that affects pageNumber and dataBlocksParsed
}
@Override
public String toString() {
return "GetDataTask{" +
"source=" + source +
", dataType=" + dataType +
", pageNumber=" + pageNumber +
", dataBlocksParsed=" + dataBlocksParsed +
'}';
}
}
和一个class持有执行人:
public class DataParseManager {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
public void addParseDataTask(String source, String dataType) {
executor.execute(new GetDataTask(source, dataType));
}
// here's the method that I need
public String getInfo() {
StringBuilder info = new StringBuilder();
//and here's the method that I'm missing - executor.getActiveThreads()
for (Runnable r : executor.getActiveThreads()) {
info.append(((GetDataTask) r).toString()).append('\n');
}
return info.append(executor.toString()).toString();
}
}
每当您将线程添加到队列时,也会将其添加到第二个数据结构,比如 HashSet
。然后,如果您需要访问一个 运行 线程,您可以检查 ExecutorService
的队列以找到仍在等待执行的线程:您的 HashSet
中的每个线程都不是在 ExecutorService
的队列中当前是 运行。
像这样包装Runnable怎么样。
static class MonitorRunnable implements Runnable {
static final List<Runnable> activeTasks = Collections.synchronizedList(new ArrayList<>());
private final Runnable runnable;
public MonitorRunnable(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
activeTasks.add(runnable);
runnable.run();
activeTasks.remove(runnable);
}
}
和
public class DataParseManager {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
public void addParseDataTask(String source, String dataType) {
executor.execute(new MonitorRunnable(new GetDataTask(source, dataType)));
}
// here's the method that I need
public String getInfo() {
StringBuilder info = new StringBuilder();
//and here's the method that I'm missing - executor.getActiveThreads()
synchronized (MonitorRunnable.activeTasks) {
for (Runnable r : MonitorRunnable.activeTasks) {
info.append(((GetDataTask) r).toString()).append('\n');
}
}
return info.append(executor.toString()).toString();
}
}
就像我在评论中写的那样。我会对共享统计对象方法进行主动更新:
我会像这样更改任务:
public class GetDataTask implements Runnable {
private String pageNumber;
private int dataBlocksParsed;
private String source;
private String dataType;
HashMap<GetDataTask,String> statistics
public GetDataTask(String source, String dataType, HashMap<GetDataTask,String> statistics) {
this.source = source;
this.dataType = dataType;
this.statistics = statistics;
}
@Override
public void run() {
// you'll probably want to immediately have stats available:
statistics.put(this, this.toString());
//do stuff that affects pageNumber and dataBlocksParsed
// vv this will probably be inside your "do stuff" loop
statistics.put(this, this.toString());
// loop end
// if you do not want stats of finished tasks, remove "this" here.
}
@Override
public String toString() {
return "GetDataTask{" +
"source=" + source +
", dataType=" + dataType +
", pageNumber=" + pageNumber +
", dataBlocksParsed=" + dataBlocksParsed +
'}';
}
}
和经理:
public class DataParseManager {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
private HashMap<GetDataTask,String> stats = new ConcurrentHashMap<GetDataTask,String>();
public void addParseDataTask(String source, String dataType) {
executor.execute(new GetDataTask(source, dataType, stats));
}
// here's the method that I need
public String getInfo() {
StringBuilder info = new StringBuilder();
//and here's the method that I'm missing - executor.getActiveThreads()
// >>> iterate "stats"'s values to build the info string ...
return info.append(executor.toString()).toString();
}
}
更新
您可以通过迭代 Map 的 keys(正在执行的任务)并在其上调用 toString
来轻松更改获取信息的方法。不过,这与 saka 的方法非常相似。也许你对他的感觉更舒服。
由于您可以控制所使用的执行器,我会使用 ThreadPoolExecutor
的 beforeExecute
和 afterExecute
方法来跟踪 运行 任务并使用它创建一个 getActiveTasks
方法。
import java.util.Set;
import java.util.concurrent.*;
public class ActiveTasksThreadPool extends ThreadPoolExecutor {
private final ConcurrentHashMap<Runnable, Boolean> activeTasks = new ConcurrentHashMap<>();
public ActiveTasksThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
activeTasks.put(r, Boolean.TRUE);
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
activeTasks.remove(r);
}
public Set<Runnable> getActiveTasks() {
// the returned set will not throw a ConcurrentModificationException.
return activeTasks.keySet();
}
public static void main(String[] args) {
final int maxTasks = 5;
ActiveTasksThreadPool tp = new ActiveTasksThreadPool(maxTasks, maxTasks, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
try {
System.out.println("Active tasks: " + tp.getActiveTasks());
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < maxTasks; i ++) {
final int rnumber = i;
tp.execute(new Runnable() {
@Override
public void run() {
try { latch.await(); } catch (Exception e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Runnable " + rnumber;
}
});
}
Thread.sleep(100L); // give threads a chance to start
System.out.println("Active tasks: " + tp.getActiveTasks());
latch.countDown();
Thread.sleep(100L); // give threads a chance to finish
System.out.println("Active tasks: " + tp.getActiveTasks());
} catch (Exception e) {
e.printStackTrace();
} finally {
tp.shutdownNow();
}
}
}
您只需要将对 运行 线程的引用存储在将在 ThreadPoolExecutor 中触发的某处,在其他答案之上添加,这是一个读取线程状态的小型应用程序示例 运行 在 ThreadPoolExecutor 中每 1 秒执行一次,直到关闭:
package sample;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
for (int i = 1; i <= 10; i++)
{
Task task = new Task("Task " + i);
executor.execute(task);
}
executor.shutdown();
try {
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("Awaiting completion of threads, threads states: " + Task.getThreadsStateCount());
}
} catch (InterruptedException e) {
}
System.out.println("Executor shutdown -> " + executor.isShutdown());
}
}
class Task implements Runnable {
static final List<Thread> activeTasks = Collections.synchronizedList(new ArrayList<>());
static final Random r = new Random();
private String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println("current thread : " + t.getName() + " group " + t.getThreadGroup() + " state " + t.getState());
activeTasks.add(t);
try {
int tries = 0;
while (tries < 10) {
int randomNum = r.nextInt(10000);
// do some expensive computation
for(int i = 0; i < 4; i++) {
isPrime(r.nextLong());
}
// now sleep
Thread.sleep(randomNum);
tries++;
}
} catch (InterruptedException e) {
}
System.out.println("completed task for thread : " + t.getName() + " group " + t.getThreadGroup() + " state " + t.getState());
}
static boolean isPrime(long n)
{
if (n <= 1)
return false;
if (n <= 3)
return true;
if (n % 2 == 0 || n % 3 == 0)
return false;
for (int i = 5; i * i <= n; i = i + 6)
if (n % i == 0 || n % (i + 2) == 0)
return false;
return true;
}
public static String getThreadsStateCount() {
return "NEW: " + getCountThreadsState(Thread.State.NEW) +
" ,RUNNABLE: " + getCountThreadsState(Thread.State.RUNNABLE) +
" ,WAITING: " + getCountThreadsState(Thread.State.WAITING) +
" ,TIMED_WAITING: " + getCountThreadsState(Thread.State.TIMED_WAITING) +
" ,BLOCKED: " + getCountThreadsState(Thread.State.BLOCKED) +
" ,TERMINATED: " + getCountThreadsState(Thread.State.TERMINATED);
}
public static long getCountThreadsState(Thread.State state) {
return activeTasks.stream().filter(x -> x.getState() == state).count();
}
}
// prints something like:
Awaiting completion of threads, threads states: NEW: 0 ,RUNNABLE: 1 ,WAITING: 0 ,TIMED_WAITING: 9 ,BLOCKED: 0 ,TERMINATED: 0