CompletionService 中的重试策略
Retry policy in CompletionService
我需要配置通过 ExecutorCompletionService 调用 API 的重试策略。
示例代码:
/**
 * Submits ten {@code AsyncTest} tasks to a five-thread pool and prints each
 * result as soon as its task completes.
 *
 * <p>The pool is shut down in a {@code finally} block so that its worker
 * threads are released even when a task fails and {@code Future.get()} throws.
 *
 * @throws Exception if waiting for a result is interrupted or a task fails
 */
public void func() throws Exception {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    try {
        CompletionService<String> completionService =
            new ExecutorCompletionService<String>(executorService);
        List<Future<String>> list = new ArrayList<Future<String>>();
        for (int i = 0; i < 10; i++) {
            list.add(completionService.submit(new AsyncTest()));
        }
        // take() hands back futures in completion order, not submission order.
        while (!list.isEmpty()) {
            Future<String> futureResponse = completionService.take();
            System.out.println(futureResponse.get());
            list.remove(futureResponse);
        }
    } finally {
        // Without this, an exception from get() would leave the pool's
        // non-daemon threads alive.
        executorService.shutdown();
    }
}
/**
 * Task type used by the sample code: a {@code Callable} producing a
 * {@code String}. The body of {@code call()} is a placeholder here.
 */
public class AsyncTest implements Callable<String> {
/**
 * Placeholder for the API call.
 *
 * @return the response from the API call
 * @throws Exception a {@code TimeoutException} when the network call times out
 */
public String call() throws Exception {
// Returns a response from the API call.
// This is a network call and it throws TimeoutException.
}
}
针对调用 API 时抛出的 TimeoutException 实施重试策略的最佳方法是什么?
我对你的 AsyncTest 类做了如下增强:
/**
 * A task that simulates an API call which may time out.
 *
 * <p>{@link #call()} never propagates the timeout. It records the outcome on
 * the task itself and returns {@code this}, so the caller can inspect
 * {@link #isTimeouted()} and decide whether to run the task again.
 */
public class RetryableAsyncTest implements Callable<RetryableAsyncTest> {

    private final String _name;
    private String _value;
    private boolean _timedOut;
    private int _attempts;

    /**
     * @param name label used to identify this task
     */
    public RetryableAsyncTest( String name ) {
        this._name = name;
    }

    /**
     * Runs one attempt: counts it, clears the timeout flag, then performs the
     * simulated call. A timeout is recorded in the flag instead of being thrown.
     *
     * @return this task, carrying the outcome of the attempt
     */
    @Override
    public RetryableAsyncTest call() throws Exception {
        _attempts++;
        _timedOut = false;
        try {
            _value = simulateApiCall();
        }
        catch( final TimeoutException timeout ) {
            _timedOut = true;
        }
        return this;
    }

    /**
     * Stand-in for the real work: times out whenever {@code Math.random()}
     * yields a value above 0.5.
     */
    private String simulateApiCall() throws TimeoutException {
        final boolean simulatedTimeout = Math.random() > 0.5;
        if( simulatedTimeout ) {
            throw new TimeoutException();
        }
        return "computation result";
    }

    /** @return the label given at construction */
    public String getName() {
        return this._name;
    }

    /** @return the result of the last successful attempt, or {@code null} if none succeeded yet */
    public String getValue() {
        return this._value;
    }

    /** @return {@code true} if the most recent attempt ended in a timeout */
    public boolean isTimeouted() {
        return this._timedOut;
    }

    /** @return how many times {@link #call()} has been invoked, the first attempt included */
    public int getRetryCount() {
        return this._attempts;
    }
}
RetryableAsyncExecutor class:
/**
 * Runs {@code RetryableAsyncTest} tasks on a fixed-size thread pool and
 * schedules another attempt for every task that reports a timeout.
 *
 * <p>A timed-out task returned by {@link #get()} is <em>not</em> resubmitted
 * right away. Resubmitting before returning would let a worker thread rewrite
 * the task's state while the caller is still reading it, so the caller could
 * see a half-finished retry (for example a cleared timeout flag with no value
 * yet). Instead the retry is queued and handed to the pool at the start of the
 * next {@link #get()} call, or by {@link #shutdown()}.
 *
 * <p>This hand-over is only race-free when the caller finishes inspecting a
 * returned task before any thread calls {@link #get()} again, as a single
 * consumer loop does.
 */
public class RetryableAsyncExecutor {
    private final ExecutorService _exec;
    private final CompletionService<RetryableAsyncTest> _comp;
    // Timed-out tasks waiting to be resubmitted. Fully qualified names are
    // used so this class needs no additional imports.
    private final java.util.Queue<RetryableAsyncTest> _pendingRetries =
        new java.util.concurrent.ConcurrentLinkedQueue<>();

    /**
     * @param nThreads number of worker threads in the pool
     */
    public RetryableAsyncExecutor( int nThreads ) {
        _exec = Executors.newFixedThreadPool( nThreads );
        _comp = new ExecutorCompletionService<>( _exec );
    }

    /**
     * Schedules a task for execution.
     *
     * @param task the task to run
     */
    public void submit( RetryableAsyncTest task ) {
        _comp.submit( task );
    }

    /**
     * Waits for the next task to finish an attempt and returns it.
     *
     * <p>If the returned task timed out, its retry is scheduled by the next
     * call to this method (or by {@link #shutdown()}), so its state is stable
     * while the caller inspects it.
     *
     * @return the task whose attempt completed; check {@code isTimeouted()}
     * @throws Exception if the wait is interrupted or the task itself failed
     */
    public RetryableAsyncTest get() throws Exception {
        resubmitPendingRetries();
        final RetryableAsyncTest task = _comp.take().get();
        if( task.isTimeouted()) {
            _pendingRetries.add( task );
        }
        return task;
    }

    /**
     * Hands any still-queued retries to the pool, then starts an orderly
     * shutdown; tasks already submitted are allowed to finish.
     */
    public void shutdown() {
        resubmitPendingRetries();
        _exec.shutdown();
    }

    /** Submits every queued retry to the completion service. */
    private void resubmitPendingRetries() {
        RetryableAsyncTest retry;
        while(( retry = _pendingRetries.poll()) != null ) {
            _comp.submit( retry );
        }
    }
}
测试用例:
/**
 * Demo driver: submits eight tasks to a five-thread
 * {@code RetryableAsyncExecutor} and reports every completed attempt on
 * standard error until all eight tasks have succeeded.
 */
public class Main {
    /**
     * Runs the demo. Any failure is printed as a stack trace; the executor is
     * then shut down and the JVM exits with status 0 in every case.
     *
     * @param args unused
     */
    public static void main( String[] args ) {
        final int total = 8;
        final RetryableAsyncExecutor executor = new RetryableAsyncExecutor( 5 );
        try {
            for( int n = 1; n <= total; n++ ) {
                executor.submit( new RetryableAsyncTest( "Async#" + n ));
            }
            // Only successful attempts advance the counter; timed-out attempts
            // are just reported.
            for( int finished = 0; finished < total; ) {
                final RetryableAsyncTest result = executor.get();
                if( !result.isTimeouted()) {
                    System.err.printf( "%s: done with '%s'.\n",
                        result.getName(), result.getValue());
                    finished++;
                    continue;
                }
                System.err.printf( "%s: retrying (%d)\n",
                    result.getName(), result.getRetryCount());
            }
        }
        catch( final Throwable failure ) {
            failure.printStackTrace();
        }
        executor.shutdown();
        System.exit( 0 );
    }
}
执行日志:
Async#4: done with 'computation result'.
Async#1: done with 'computation result'.
Async#6: retrying (1)
Async#3: done with 'computation result'.
Async#8: done with 'computation result'.
Async#7: retrying (1)
Async#2: done with 'computation result'.
Async#5: retrying (1)
Async#6: done with 'computation result'.
Async#7: done with 'computation result'.
Async#5: retrying (2)
Async#5: done with 'computation result'.
如果你想限制重试次数,可以把这段逻辑放在 RetryableAsyncExecutor.get() 方法中:
用一个 if-then-else 条件包住 _comp.submit( task ); 这条语句,
只有在尚未超过重试上限时才重新提交任务。
我需要配置通过 ExecutorCompletionService 调用 API 的重试策略。
示例代码:
/**
 * Submits ten {@code AsyncTest} tasks to a five-thread pool and prints each
 * result as soon as its task completes.
 *
 * <p>The pool is shut down in a {@code finally} block so that its worker
 * threads are released even when a task fails and {@code Future.get()} throws.
 *
 * @throws Exception if waiting for a result is interrupted or a task fails
 */
public void func() throws Exception {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    try {
        CompletionService<String> completionService =
            new ExecutorCompletionService<String>(executorService);
        List<Future<String>> list = new ArrayList<Future<String>>();
        for (int i = 0; i < 10; i++) {
            list.add(completionService.submit(new AsyncTest()));
        }
        // take() hands back futures in completion order, not submission order.
        while (!list.isEmpty()) {
            Future<String> futureResponse = completionService.take();
            System.out.println(futureResponse.get());
            list.remove(futureResponse);
        }
    } finally {
        // Without this, an exception from get() would leave the pool's
        // non-daemon threads alive.
        executorService.shutdown();
    }
}
/**
 * Task type used by the sample code: a {@code Callable} producing a
 * {@code String}. The body of {@code call()} is a placeholder here.
 */
public class AsyncTest implements Callable<String> {
/**
 * Placeholder for the API call.
 *
 * @return the response from the API call
 * @throws Exception a {@code TimeoutException} when the network call times out
 */
public String call() throws Exception {
// Returns a response from the API call.
// This is a network call and it throws TimeoutException.
}
}
针对调用 API 时抛出的 TimeoutException 实施重试策略的最佳方法是什么?
我对你的 AsyncTest 类做了如下增强:
/**
 * A task that simulates an API call which may time out.
 *
 * <p>{@link #call()} never propagates the timeout. It records the outcome on
 * the task itself and returns {@code this}, so the caller can inspect
 * {@link #isTimeouted()} and decide whether to run the task again.
 */
public class RetryableAsyncTest implements Callable<RetryableAsyncTest> {

    private final String _name;
    private String _value;
    private boolean _timedOut;
    private int _attempts;

    /**
     * @param name label used to identify this task
     */
    public RetryableAsyncTest( String name ) {
        this._name = name;
    }

    /**
     * Runs one attempt: counts it, clears the timeout flag, then performs the
     * simulated call. A timeout is recorded in the flag instead of being thrown.
     *
     * @return this task, carrying the outcome of the attempt
     */
    @Override
    public RetryableAsyncTest call() throws Exception {
        _attempts++;
        _timedOut = false;
        try {
            _value = simulateApiCall();
        }
        catch( final TimeoutException timeout ) {
            _timedOut = true;
        }
        return this;
    }

    /**
     * Stand-in for the real work: times out whenever {@code Math.random()}
     * yields a value above 0.5.
     */
    private String simulateApiCall() throws TimeoutException {
        final boolean simulatedTimeout = Math.random() > 0.5;
        if( simulatedTimeout ) {
            throw new TimeoutException();
        }
        return "computation result";
    }

    /** @return the label given at construction */
    public String getName() {
        return this._name;
    }

    /** @return the result of the last successful attempt, or {@code null} if none succeeded yet */
    public String getValue() {
        return this._value;
    }

    /** @return {@code true} if the most recent attempt ended in a timeout */
    public boolean isTimeouted() {
        return this._timedOut;
    }

    /** @return how many times {@link #call()} has been invoked, the first attempt included */
    public int getRetryCount() {
        return this._attempts;
    }
}
RetryableAsyncExecutor class:
/**
 * Runs {@code RetryableAsyncTest} tasks on a fixed-size thread pool and
 * schedules another attempt for every task that reports a timeout.
 *
 * <p>A timed-out task returned by {@link #get()} is <em>not</em> resubmitted
 * right away. Resubmitting before returning would let a worker thread rewrite
 * the task's state while the caller is still reading it, so the caller could
 * see a half-finished retry (for example a cleared timeout flag with no value
 * yet). Instead the retry is queued and handed to the pool at the start of the
 * next {@link #get()} call, or by {@link #shutdown()}.
 *
 * <p>This hand-over is only race-free when the caller finishes inspecting a
 * returned task before any thread calls {@link #get()} again, as a single
 * consumer loop does.
 */
public class RetryableAsyncExecutor {
    private final ExecutorService _exec;
    private final CompletionService<RetryableAsyncTest> _comp;
    // Timed-out tasks waiting to be resubmitted. Fully qualified names are
    // used so this class needs no additional imports.
    private final java.util.Queue<RetryableAsyncTest> _pendingRetries =
        new java.util.concurrent.ConcurrentLinkedQueue<>();

    /**
     * @param nThreads number of worker threads in the pool
     */
    public RetryableAsyncExecutor( int nThreads ) {
        _exec = Executors.newFixedThreadPool( nThreads );
        _comp = new ExecutorCompletionService<>( _exec );
    }

    /**
     * Schedules a task for execution.
     *
     * @param task the task to run
     */
    public void submit( RetryableAsyncTest task ) {
        _comp.submit( task );
    }

    /**
     * Waits for the next task to finish an attempt and returns it.
     *
     * <p>If the returned task timed out, its retry is scheduled by the next
     * call to this method (or by {@link #shutdown()}), so its state is stable
     * while the caller inspects it.
     *
     * @return the task whose attempt completed; check {@code isTimeouted()}
     * @throws Exception if the wait is interrupted or the task itself failed
     */
    public RetryableAsyncTest get() throws Exception {
        resubmitPendingRetries();
        final RetryableAsyncTest task = _comp.take().get();
        if( task.isTimeouted()) {
            _pendingRetries.add( task );
        }
        return task;
    }

    /**
     * Hands any still-queued retries to the pool, then starts an orderly
     * shutdown; tasks already submitted are allowed to finish.
     */
    public void shutdown() {
        resubmitPendingRetries();
        _exec.shutdown();
    }

    /** Submits every queued retry to the completion service. */
    private void resubmitPendingRetries() {
        RetryableAsyncTest retry;
        while(( retry = _pendingRetries.poll()) != null ) {
            _comp.submit( retry );
        }
    }
}
测试用例:
/**
 * Demo driver: submits eight tasks to a five-thread
 * {@code RetryableAsyncExecutor} and reports every completed attempt on
 * standard error until all eight tasks have succeeded.
 */
public class Main {
    /**
     * Runs the demo. Any failure is printed as a stack trace; the executor is
     * then shut down and the JVM exits with status 0 in every case.
     *
     * @param args unused
     */
    public static void main( String[] args ) {
        final int total = 8;
        final RetryableAsyncExecutor executor = new RetryableAsyncExecutor( 5 );
        try {
            for( int n = 1; n <= total; n++ ) {
                executor.submit( new RetryableAsyncTest( "Async#" + n ));
            }
            // Only successful attempts advance the counter; timed-out attempts
            // are just reported.
            for( int finished = 0; finished < total; ) {
                final RetryableAsyncTest result = executor.get();
                if( !result.isTimeouted()) {
                    System.err.printf( "%s: done with '%s'.\n",
                        result.getName(), result.getValue());
                    finished++;
                    continue;
                }
                System.err.printf( "%s: retrying (%d)\n",
                    result.getName(), result.getRetryCount());
            }
        }
        catch( final Throwable failure ) {
            failure.printStackTrace();
        }
        executor.shutdown();
        System.exit( 0 );
    }
}
执行日志:
Async#4: done with 'computation result'.
Async#1: done with 'computation result'.
Async#6: retrying (1)
Async#3: done with 'computation result'.
Async#8: done with 'computation result'.
Async#7: retrying (1)
Async#2: done with 'computation result'.
Async#5: retrying (1)
Async#6: done with 'computation result'.
Async#7: done with 'computation result'.
Async#5: retrying (2)
Async#5: done with 'computation result'.
如果你想限制重试次数,这个逻辑发生在RetryableAsyncExecutor.get()
方法中,作为围绕_comp.submit( task );