运行 java 中的异步代码超时
Running asynchronous code in java with timeout
在我编写的网络服务器中,每个请求都会调用一系列操作。其中一些操作不像其他操作那么重要,因此我想 运行 在后台线程中进行它们。
另外,因为它们不是那么重要,所以我不在乎其中一个是否很少失败,我也不希望它们永远占用一个线程,这样其他线程就可以用来处理下一个批量。
所以,我想要一个线程池(例如:10 个线程)并像这样为每个后台任务分配一个线程。将每个线程限制为 1 秒,如果到那时它还没有完成,就将其杀死,并为下一个任务提供服务。
我该怎么做?
到目前为止,这就是我所拥有的:
public class AsyncCodeRunner {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
public void Run(Callable<Void> callableCode, int timeout) {
final int threadTimeout = 10;
Future<Void> callableFuture = executor.submit(callableCode);
try {
callableFuture.get(threadTimeout, TimeUnit.SECONDS);
} catch (Exception e) {
logger.Info("Thread was timed out", e);
}
}
}
我想像这样使用这个 class :
public void processRequest(RequestObject request) {
// do some important processing
// throw some less important processing to background thread
(new AsyncCodeRunner()).Run(new Callable<Void> () {
@Override
public Void call() throws Exception {
// do something...
return null;
}
}, 1); // 1 second timeout
// return result (without waiting for background task)
return;
}
这会像我想要的那样工作吗?或者我应该如何更改它?
如果我调用 Run()
但线程池中没有可用的线程可以分发,会发生什么情况?
我认为这个相当优雅的想法的主要问题是你只是在 Future
的 get
超时,一旦超时你实际上并没有中止进程,你只是放弃等待。当您意识到您甚至可能在进程尚未启动时超时 - 它仍在队列中时,问题变得更加复杂。
也许这样的事情会有效。它确实需要两个线程,但 TimerTask
线程应该消耗很少。
public class RunWithTimeout {
public RunWithTimeout(Runnable r, long timeout) {
// Prepare the thread.
final Thread t = new Thread(r);
// Start the timer.
new Timer(true).schedule(new TimerTask() {
@Override
public void run() {
if (t.isAlive()) {
// Abort the thread.
t.interrupt();
}
}
}, timeout * 1000);
// Start the thread.
t.start();
}
}
class WaitAFewSeconds implements Runnable {
final long seconds;
WaitAFewSeconds(long seconds) {
this.seconds = seconds;
}
@Override
public void run() {
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException ie) {
System.out.println("WaitAFewSeconds(" + seconds + ") - Interrupted!");
}
}
}
public void test() {
new RunWithTimeout(new WaitAFewSeconds(5), 3);
new RunWithTimeout(new WaitAFewSeconds(3), 5);
}
这是一种仅使用一个额外线程的替代方案。
public class ThreadKiller implements Runnable {
DelayQueue<WaitForDeath> kill = new DelayQueue<>();
private class WaitForDeath implements Delayed {
final Thread t;
final long finish;
public WaitForDeath(Thread t, long wait) {
this.t = t;
this.finish = System.currentTimeMillis() + wait;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(finish - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
long itsFinish = ((WaitForDeath) o).finish;
return finish < itsFinish ? -1 : finish == itsFinish ? 0 : 1;
}
}
@Override
public void run() {
while (true) {
try {
WaitForDeath t = kill.take();
if (t.t.isAlive()) {
// Interrupt it.
t.t.interrupt();
}
} catch (InterruptedException ex) {
// Not sure what to do here.
}
}
}
public void registerThread(Thread t, long wait) {
// Post it into the delay queue.
kill.add(new WaitForDeath(t, wait));
}
}
public void test() throws InterruptedException {
// Testing the ThreadKiller.
ThreadKiller killer = new ThreadKiller();
Thread killerThread = new Thread(killer);
killerThread.setDaemon(true);
Thread twoSeconds = new Thread(new WaitAFewSeconds(2));
Thread fourSeconds = new Thread(new WaitAFewSeconds(4));
killer.registerThread(twoSeconds, 5000);
killer.registerThread(fourSeconds, 3000);
killerThread.start();
twoSeconds.start();
fourSeconds.start();
System.out.println("Waiting");
Thread.sleep(10 * 1000);
System.out.println("Finished");
killerThread.interrupt();
}
您需要在线程运行时启动计时器。这样就不会杀死处于等待状态的线程。这是来自 this thread 的示例:
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class PoolTest {
class TimeOutTask extends TimerTask {
Thread t;
TimeOutTask(Thread t) {
this.t = t;
}
public void run() {
if (t != null && t.isAlive()) {
t.interrupt();
}
}
}
class MyRunnable implements Runnable {
Timer timer = new Timer(true);
public void run() {
timer.schedule(new TimeOutTask(Thread.currentThread()), 1000);
try {
System.out.println("MyRunnable...");
Thread.sleep(10000);
} catch (InterruptedException ie) {
System.out.println("MyRunnable error...");
ie.printStackTrace();
}
}
}
public static void main(String args[]) {
new PoolTest();
}
public PoolTest() {
try {
ExecutorService pe = Executors.newFixedThreadPool(3);
pe.execute(new MyRunnable());
} catch (Exception e) {
e.printStackTrace();
}
}
}
在我编写的网络服务器中,每个请求都会调用一系列操作。其中一些操作不像其他操作那么重要,因此我想 运行 在后台线程中进行它们。
另外,因为它们不是那么重要,所以我不在乎其中一个是否很少失败,我也不希望它们永远占用一个线程,这样其他线程就可以用来处理下一个批量。
所以,我想要一个线程池(例如:10 个线程)并像这样为每个后台任务分配一个线程。将每个线程限制为 1 秒,如果到那时它还没有完成,就将其杀死,并为下一个任务提供服务。
我该怎么做?
到目前为止,这就是我所拥有的:
public class AsyncCodeRunner {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
public void Run(Callable<Void> callableCode, int timeout) {
final int threadTimeout = 10;
Future<Void> callableFuture = executor.submit(callableCode);
try {
callableFuture.get(threadTimeout, TimeUnit.SECONDS);
} catch (Exception e) {
logger.Info("Thread was timed out", e);
}
}
}
我想像这样使用这个 class :
public void processRequest(RequestObject request) {
// do some important processing
// throw some less important processing to background thread
(new AsyncCodeRunner()).Run(new Callable<Void> () {
@Override
public Void call() throws Exception {
// do something...
return null;
}
}, 1); // 1 second timeout
// return result (without waiting for background task)
return;
}
这会像我想要的那样工作吗?或者我应该如何更改它?
如果我调用 Run()
但线程池中没有可用的线程可以分发,会发生什么情况?
我认为这个相当优雅的想法的主要问题是你只是在 Future
的 get
超时,一旦超时你实际上并没有中止进程,你只是放弃等待。当您意识到您甚至可能在进程尚未启动时超时 - 它仍在队列中时,问题变得更加复杂。
也许这样的事情会有效。它确实需要两个线程,但 TimerTask
线程应该消耗很少。
public class RunWithTimeout {
public RunWithTimeout(Runnable r, long timeout) {
// Prepare the thread.
final Thread t = new Thread(r);
// Start the timer.
new Timer(true).schedule(new TimerTask() {
@Override
public void run() {
if (t.isAlive()) {
// Abort the thread.
t.interrupt();
}
}
}, timeout * 1000);
// Start the thread.
t.start();
}
}
class WaitAFewSeconds implements Runnable {
final long seconds;
WaitAFewSeconds(long seconds) {
this.seconds = seconds;
}
@Override
public void run() {
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException ie) {
System.out.println("WaitAFewSeconds(" + seconds + ") - Interrupted!");
}
}
}
public void test() {
new RunWithTimeout(new WaitAFewSeconds(5), 3);
new RunWithTimeout(new WaitAFewSeconds(3), 5);
}
这是一种仅使用一个额外线程的替代方案。
public class ThreadKiller implements Runnable {
DelayQueue<WaitForDeath> kill = new DelayQueue<>();
private class WaitForDeath implements Delayed {
final Thread t;
final long finish;
public WaitForDeath(Thread t, long wait) {
this.t = t;
this.finish = System.currentTimeMillis() + wait;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(finish - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
long itsFinish = ((WaitForDeath) o).finish;
return finish < itsFinish ? -1 : finish == itsFinish ? 0 : 1;
}
}
@Override
public void run() {
while (true) {
try {
WaitForDeath t = kill.take();
if (t.t.isAlive()) {
// Interrupt it.
t.t.interrupt();
}
} catch (InterruptedException ex) {
// Not sure what to do here.
}
}
}
public void registerThread(Thread t, long wait) {
// Post it into the delay queue.
kill.add(new WaitForDeath(t, wait));
}
}
public void test() throws InterruptedException {
// Testing the ThreadKiller.
ThreadKiller killer = new ThreadKiller();
Thread killerThread = new Thread(killer);
killerThread.setDaemon(true);
Thread twoSeconds = new Thread(new WaitAFewSeconds(2));
Thread fourSeconds = new Thread(new WaitAFewSeconds(4));
killer.registerThread(twoSeconds, 5000);
killer.registerThread(fourSeconds, 3000);
killerThread.start();
twoSeconds.start();
fourSeconds.start();
System.out.println("Waiting");
Thread.sleep(10 * 1000);
System.out.println("Finished");
killerThread.interrupt();
}
您需要在线程运行时启动计时器。这样就不会杀死处于等待状态的线程。这是来自 this thread 的示例:
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class PoolTest {
class TimeOutTask extends TimerTask {
Thread t;
TimeOutTask(Thread t) {
this.t = t;
}
public void run() {
if (t != null && t.isAlive()) {
t.interrupt();
}
}
}
class MyRunnable implements Runnable {
Timer timer = new Timer(true);
public void run() {
timer.schedule(new TimeOutTask(Thread.currentThread()), 1000);
try {
System.out.println("MyRunnable...");
Thread.sleep(10000);
} catch (InterruptedException ie) {
System.out.println("MyRunnable error...");
ie.printStackTrace();
}
}
}
public static void main(String args[]) {
new PoolTest();
}
public PoolTest() {
try {
ExecutorService pe = Executors.newFixedThreadPool(3);
pe.execute(new MyRunnable());
} catch (Exception e) {
e.printStackTrace();
}
}
}