Future.get timeout keeps increasing for each subsequent thread (the timeout is not applied to all the threads in the ThreadPool in Java)
Here is the worker thread; its call method runs a heavy task that takes 10 seconds.
import java.util.Date;
import java.util.Random;
import java.util.concurrent.Callable;

public class WorkerThread implements Callable {
    private String command;
    private long startTime;

    public WorkerThread(String s){
        this.command=s;
    }

    @Override
    public Object call() throws Exception {
        startTime = System.currentTimeMillis();
        System.out.println(new Date()+"::::"+Thread.currentThread().getName()+" Start. Command = "+command);
        Random generator = new Random();
        Integer randomNumber = generator.nextInt(5);
        processCommand();
        System.out.println(new Date()+ ":::"+Thread.currentThread().getName()+" End.::"+command+"::"+ (System.currentTimeMillis()-startTime));
        return randomNumber+"::"+this.command;
    }

    private void processCommand() {
        try {
            Thread.sleep(10000);
        }
        catch (Exception e) {
            System.out.println("Interrupted::;Process Command:::"+this.command);
        }
    }

    @Override
    public String toString(){
        return this.command;
    }
}
Here is my WorkerPool, which uses a Future timeout of 1 second.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WorkerPool {
    static BlockingQueue queue=new LinkedBlockingQueue(2);
    static RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
    static ThreadFactory threadFactory = Executors.defaultThreadFactory();
    static ThreadPoolExecutor executorPool = new ThreadPoolExecutor(4, 4, 11, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);
    static MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);

    public static void main(String args[]) throws InterruptedException, TimeoutException{
        List<Future<Integer>> list = new ArrayList<Future<Integer>>();
        for(int i=1; i< 5; i++){
            WorkerThread worker = new WorkerThread("WorkerThread:::_"+i);
            Future<Integer> future = executorPool.submit(worker);
            list.add(future);
        }
        for(Future<Integer> future : list){
            try {
                try {
                    future.get(1000, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    future.cancel(true);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        executorPool.shutdown();
    }
}
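For the later Futures, the effective timeout keeps increasing. My expectation is that if the threads take longer than 1 second, all of them should be timed out at the 1-second mark.
In the above scenario each worker thread needs 10 seconds to do its work, and I apply a 1-second timeout to all 4 threads, yet the time before each task is timed out grows by 1 second per task.
The first one times out after 1 second.
The second one times out after 2 seconds.
The third one times out after 3 seconds.
Why aren't all the threads interrupted within 1 second? Is there something wrong with my code?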
Because you are waiting sequentially in this loop:
for(Future<Integer> future : list) {
    ...
    future.get(1000, TimeUnit.MILLISECONDS);
    ...
}
Basically the flow is:
- all workers A .. D start
- you wait for worker A to finish
- 1 second passes, TimeoutException (worker A was alive for 1 second)
- you wait for worker B to finish
- 1 second passes, TimeoutException (worker B was alive for 2 seconds)
- you wait for worker C to finish
- 1 second passes, TimeoutException (worker C was alive for 3 seconds)
- ... same for D ...
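The flow above can be reproduced with a small, self-contained sketch. It is not the asker's code: the class name `SequentialTimeoutDemo`, the method `elapsedAtTimeout`, and the shortened sleep and timeout values are assumptions chosen for illustration. It records how long each task had been alive when its own `get` call gave up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SequentialTimeoutDemo {

    // Hypothetical helper: for each task, returns the milliseconds elapsed
    // since submission at the moment its get() call returned or timed out.
    public static List<Long> elapsedAtTimeout(int tasks, long sleepMs, long timeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        List<Future<String>> futures = new ArrayList<>();
        long submitted = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            Callable<String> slow = () -> {
                Thread.sleep(sleepMs); // stands in for the heavy task
                return "task-" + id;
            };
            futures.add(pool.submit(slow));
        }
        List<Long> elapsed = new ArrayList<>();
        try {
            for (Future<String> future : futures) {
                try {
                    // Each call starts a fresh timeout, so the waits add up.
                    future.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    future.cancel(true);
                } catch (ExecutionException e) {
                    // Not expected here: the task only sleeps.
                }
                elapsed.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - submitted));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            pool.shutdownNow();
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // With a 200 ms timeout the four values should grow by roughly 200 ms each.
        System.out.println(elapsedAtTimeout(4, 2000, 200));
    }
}
```

Because all tasks start at the same time but each `get` begins counting only when the loop reaches it, the n-th task has been running for about n timeouts before it is cancelled.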
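If you want to wait at most 1 second in total for all the workers, you need to track how much time you have already spent waiting and then wait only for the remaining time.
Something along these lines: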
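long quota = 1000; // total time budget in milliseconds, shared by all futures
for (Future<?> future : futures) {
    long start = System.currentTimeMillis();
    try {
        future.get(quota, TimeUnit.MILLISECONDS);
    }
    catch (TimeoutException e) {
        future.cancel(true); // budget used up: interrupt the worker
    }
    catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    finally {
        long spent = System.currentTimeMillis() - start;
        // the whole block is going to execute longer than .get() only, so clamp at zero
        quota = Math.max(0, quota - spent);
    }
}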
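As a runnable variant of the same idea, the sketch below computes one shared deadline up front and gives every `get` call only the time left until it. The names `SharedDeadlineDemo`, `awaitAllWithin`, and `run` are hypothetical, and the sleeping tasks stand in for the real workers; treat it as an illustration under those assumptions rather than a drop-in replacement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SharedDeadlineDemo {

    // Waits for all futures against one shared deadline and cancels any
    // future that has not finished by then. Returns how many were cancelled.
    public static int awaitAllWithin(List<? extends Future<?>> futures, long budgetMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(budgetMs);
        int cancelled = 0;
        for (Future<?> future : futures) {
            // Time left until the shared deadline, never negative.
            long remaining = Math.max(0L, deadline - System.nanoTime());
            try {
                future.get(remaining, TimeUnit.NANOSECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupts the worker if it is running
                cancelled++;
            } catch (ExecutionException e) {
                // The task itself failed; nothing to cancel.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                break;
            }
        }
        return cancelled;
    }

    // Hypothetical driver: submits sleeping tasks and reports {cancelled, elapsedMs}.
    public static long[] run(int tasks, long sleepMs, long budgetMs) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                Callable<String> slow = () -> {
                    Thread.sleep(sleepMs); // stands in for the heavy task
                    return "task-" + id;
                };
                futures.add(pool.submit(slow));
            }
            long start = System.nanoTime();
            int cancelled = awaitAllWithin(futures, budgetMs);
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            return new long[] {cancelled, elapsedMs};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long[] result = run(4, 10_000, 1_000);
        System.out.println("cancelled=" + result[0] + " elapsedMs=" + result[1]);
    }
}
```

Once the deadline has passed, `remaining` is zero, so `get` returns immediately for finished tasks and throws `TimeoutException` right away for unfinished ones. The whole loop therefore takes about one budget instead of one budget per task.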