在 Java Future 上使用 isDone 和 Cancel 轮询而不是阻塞 get
Using polling with isDone and Cancel on Java Future instead of blocking get
我正在处理一些无法立即重构的遗留代码。
代码使用阻塞 Java Future。它使用 future.get(withTimeOut,...)。这意味着我们需要有一个合适大小的线程池才能足够响应。因为呼叫将被阻止,直到他们完成或超时。
问题:
我正在考虑抓住未来并将其放入一个数据结构中,该数据结构将意识到任务执行的开始。然后有一个专用线程或池,它将遍历数据结构并检查 future.isDone 是否超过超时限制。如果是,它可以获取结果或取消执行。这样就不需要很多线程。它是正确的实施还是根本不推荐?
提前致谢。
编辑:
只是为了提供更多背景信息。这些线程用于记录下游服务。我们真的不关心响应,但我们不希望连接挂起。因此,我们需要抓住未来并确保它被取消或超时。
这是我在问完问题后写的一个基本模拟。
@Component
public class PollingService {
private ExecutorService executorService = Executors.newFixedThreadPool(1);
PoorMultiplexer poorMultiplexer = new PoorMultiplexer();
private ConcurrentMap<Integer, Map<Future, Long>> futures = new ConcurrentHashMap<>();
public void startHandler(){
Thread handler = new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
//This should be handled better. If there is not anything stop and re-start it later.
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(Iterator<ConcurrentMap.Entry<Integer, Map<Future, Long>>> it = futures.entrySet().iterator(); it.hasNext();){
ConcurrentMap.Entry<Integer, Map<Future, Long>> entry = it.next();
Map<Future, Long> futureMap = entry.getValue();
boolean isProcessed = false;
if(futureMap.keySet().iterator().next().isDone()){
//mark completed
isProcessed = true;
}
if(futureMap.values().iterator().next() < (300 + System.currentTimeMillis()) && !isProcessed){
//cancel
futureMap.keySet().iterator().next().cancel(true);
isProcessed = true;
}
if(isProcessed){
futures.remove(entry.getKey());
System.out.println("Completed : " + entry.getKey());
}
}
System.out.println("Run completed");
}
}
});
handler.start();
}
public void run(int i) throws InterruptedException, ExecutionException{
System.out.println("Starting : " + i);
poorMultiplexer.send(new Runnable() {
@Override
public void run() {
long startTime = System.currentTimeMillis();
Future future = poorMultiplexer.send(execute());
Map<Future, Long> entry = new HashMap<>();
entry.put(future, startTime);
futures.put(i, entry);
System.out.println("Added : " + i);
}
});
}
public void stop(){
executorService.shutdown();
}
public Runnable execute(){
Worker worker = new Worker();
return worker;
}
}
//This is a placeholder for a framework
class PoorMultiplexer {
private ExecutorService executorService = Executors.newFixedThreadPool(20);
public Future send(Runnable task){
return executorService.submit(task);
}
}
class Worker implements Runnable{
@Override
public void run() {
//service call here
}
}
使用单独的线程异步轮询一组 Futures 对我来说确实是一个合理的实现。也就是说,如果您能够添加库依赖项,您可能会发现切换到 Guava 的 ListenableFuture 会更容易,因为 Guava 提供了大量用于执行异步工作的实用程序。
我正在处理一些无法立即重构的遗留代码。
代码使用阻塞 Java Future。它使用 future.get(withTimeOut,...)。这意味着我们需要有一个合适大小的线程池才能足够响应。因为呼叫将被阻止,直到他们完成或超时。
问题: 我正在考虑抓住未来并将其放入一个数据结构中,该数据结构将意识到任务执行的开始。然后有一个专用线程或池,它将遍历数据结构并检查 future.isDone 是否超过超时限制。如果是,它可以获取结果或取消执行。这样就不需要很多线程。它是正确的实施还是根本不推荐?
提前致谢。
编辑:
只是为了提供更多背景信息。这些线程用于记录下游服务。我们真的不关心响应,但我们不希望连接挂起。因此,我们需要抓住未来并确保它被取消或超时。
这是我在问完问题后写的一个基本模拟。
@Component
public class PollingService {
private ExecutorService executorService = Executors.newFixedThreadPool(1);
PoorMultiplexer poorMultiplexer = new PoorMultiplexer();
private ConcurrentMap<Integer, Map<Future, Long>> futures = new ConcurrentHashMap<>();
public void startHandler(){
Thread handler = new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
//This should be handled better. If there is not anything stop and re-start it later.
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(Iterator<ConcurrentMap.Entry<Integer, Map<Future, Long>>> it = futures.entrySet().iterator(); it.hasNext();){
ConcurrentMap.Entry<Integer, Map<Future, Long>> entry = it.next();
Map<Future, Long> futureMap = entry.getValue();
boolean isProcessed = false;
if(futureMap.keySet().iterator().next().isDone()){
//mark completed
isProcessed = true;
}
if(futureMap.values().iterator().next() < (300 + System.currentTimeMillis()) && !isProcessed){
//cancel
futureMap.keySet().iterator().next().cancel(true);
isProcessed = true;
}
if(isProcessed){
futures.remove(entry.getKey());
System.out.println("Completed : " + entry.getKey());
}
}
System.out.println("Run completed");
}
}
});
handler.start();
}
public void run(int i) throws InterruptedException, ExecutionException{
System.out.println("Starting : " + i);
poorMultiplexer.send(new Runnable() {
@Override
public void run() {
long startTime = System.currentTimeMillis();
Future future = poorMultiplexer.send(execute());
Map<Future, Long> entry = new HashMap<>();
entry.put(future, startTime);
futures.put(i, entry);
System.out.println("Added : " + i);
}
});
}
public void stop(){
executorService.shutdown();
}
public Runnable execute(){
Worker worker = new Worker();
return worker;
}
}
//This is a placeholder for a framework
class PoorMultiplexer {
private ExecutorService executorService = Executors.newFixedThreadPool(20);
public Future send(Runnable task){
return executorService.submit(task);
}
}
class Worker implements Runnable{
@Override
public void run() {
//service call here
}
}
使用单独的线程异步轮询一组 Futures 对我来说确实是一个合理的实现。也就是说,如果您能够添加库依赖项,您可能会发现切换到 Guava 的 ListenableFuture 会更容易,因为 Guava 提供了大量用于执行异步工作的实用程序。