CompletionService 中的重试策略

Retry policy in CompletionService

我需要配置通过 ExecutorCompletionService 调用 API 的重试策略。

示例代码:

public void func() throws Exception{
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
    List<Future<String>> list = new ArrayList<Future<String>>();
    for(int i=0; i<10; i++) {
        AsyncTest asyncTest = new AsyncTest();
        Future<String> futureString = completionService.submit(asyncTest);
        list.add(futureString);
    }
    while (list.size() > 0) {
        Future<String> futureResponse = completionService.take();
        System.out.println(futureResponse.get());
        list.remove(futureResponse);
        }
    executorService.shutdown();
}
public class AsyncTest implements Callable<String> {
       public String call() throws Exception {
              //returns a response from api call
              //this is a network call and throws TimeoutException
       }
}

针对调用 API 时抛出的 TimeoutException 实施重试策略的最佳方法是什么?

我已经增强了你的 class AsyncTest:

public class RetryableAsyncTest implements Callable<RetryableAsyncTest> {

   private final String  _name;
   private /* */ String  _value;
   private /* */ boolean _timeouted;
   private /* */ int     _retryCount;

   public RetryableAsyncTest( String name ) {
      _name = name;
   }

   @Override
   public RetryableAsyncTest call() throws Exception {
      try {
         ++_retryCount;
         _timeouted = false;
         //-------- Begin of functionnal code
         if( Math.random() > 0.5 ) {      // Simulation of
            throw new TimeoutException(); // timeout condition
         }
         _value = "computation result";
         //-------- End of functionnal code
      }
      catch( final TimeoutException x ) {
         _timeouted = true;
      }
      return this;
   }

   public String getName() {
      return _name;
   }

   public String getValue() {
      return _value;
   }

   public boolean isTimeouted() {
      return _timeouted;
   }

   public int getRetryCount() {
      return _retryCount;
   }
}

RetryableAsyncExecutor class:

public class RetryableAsyncExecutor {

   private final ExecutorService                       _exec;
   private final CompletionService<RetryableAsyncTest> _comp;

   public RetryableAsyncExecutor( int nThreads ) {
      _exec = Executors.newFixedThreadPool( nThreads );
      _comp = new ExecutorCompletionService<>( _exec );
   }

   public void submit( RetryableAsyncTest task ) {
      _comp.submit( task );
   }

   public RetryableAsyncTest get() throws Exception {
      final Future<RetryableAsyncTest> f = _comp.take();
      final RetryableAsyncTest task = f.get();
      if( task.isTimeouted()) {
         _comp.submit( task );
      }
      return task;
   }

   public void shutdown() {
      _exec.shutdown();
   }
}

测试用例:

public class Main {

   public static void main( String[] args ) {
      final int COUNT = 8;
      final RetryableAsyncExecutor re = new RetryableAsyncExecutor( 5 );
      try {
         for( int i = 0; i < COUNT; ++i ) {
            re.submit( new RetryableAsyncTest("Async#"+(i+1)));
         }
         int count = 0;
         while( count < COUNT ) {
            final RetryableAsyncTest task = re.get();
            if( task.isTimeouted()) {
               System.err.printf( "%s: retrying (%d)\n",
                  task.getName(), task.getRetryCount());
            }
            else {
               System.err.printf( "%s: done with '%s'.\n",
                  task.getName(), task.getValue());
               ++count;
            }
         }
      }
      catch( final Throwable t ) {
         t.printStackTrace();
      }
      re.shutdown();
      System.exit( 0 );
   }
}

执行日志:

Async#4: done with 'computation result'.
Async#1: done with 'computation result'.
Async#6: retrying (1)
Async#3: done with 'computation result'.
Async#8: done with 'computation result'.
Async#7: retrying (1)
Async#2: done with 'computation result'.
Async#5: retrying (1)
Async#6: done with 'computation result'.
Async#7: done with 'computation result'.
Async#5: retrying (2)
Async#5: done with 'computation result'.

如果你想限制重试次数,这个逻辑发生在RetryableAsyncExecutor.get()方法中,作为围绕_comp.submit( task );

的if-then-else条件