Java 一旦他分配的任务之一因任何原因失败,就停止执行程序服务
Java stop executor service once one of his assigned tasks fails for any reason
我需要某种服务,可以同时 运行 执行几个任务,并以 1 秒的间隔执行 1 分钟。
如果其中一个任务失败,我想停止服务和每个 运行 的任务,并带有某种指示出错的指示,否则如果一分钟后一切正常,服务将停止一切顺利的指标。
例如,我有两个函数:
Runnable task1 = ()->{
int num = Math.rand(1,100);
if (num < 5){
throw new Exception("something went wrong with this task,terminate");
}
}
Runnable task2 = ()->{
int num = Math.rand(1,100)
return num < 50;
}
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
task1schedule = scheduledExecutorService.scheduleAtFixedRate(task1, 1, 60, TimeUnit.SECONDS);
task2schedule = scheduledExecutorService.scheduleAtFixedRate(task2, 1, 60, TimeUnit.SECONDS);
if (!task1schedule || !task2schedule) scheduledExecutorService.shutdown();
关于我应该如何解决这个问题并使事情尽可能通用的任何想法?
你只需要添加一个额外的任务,它的工作是监视所有其他 运行 任务——当任何被监视的任务失败时,它们需要设置一个信号量(标志)刺客可以检查。
ScheduledExecutorService executor = (ScheduledExecutorService) Executors.newScheduledThreadPool(2);
// INSTANTIATE THE REMOTE-FILE-MONITOR:
RemoteFileMonitor monitor = new RemoteFileMonitor(remotesource, localtarget);
// THIS TimerTask PERIODICALLY TRIGGERS THE RemoteFileMonitor:
TimerTask remote = new TimerTask() {
// RUN FORREST... RUN !
public void run() {
try {
kae.trace("TimerTask::run() --> Calling RemoteFileMonitor.check()");
monitor.check();
} catch (Exception ex) {
// NULL TRAP: ALLOWS US TO CONTINUE AND RETRY:
}
}
};
// THIS TimerTask PERIODICALLY TRIES TO KILL THE REMOTE-FILE-MONITOR:
TimerTask assassin = new TimerTask() {
// WHERE DO BAD FOLKS GO WHEN THEY DIE ?
private final LocalDateTime death = LocalDateTime.now().plus(ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);
// RUN FORREST... RUN !
public void run() {
// IS THERE LIFE AFTER DEATH ???
if (LocalDateTime.now().isAfter(death)) {
// THEY GO TO A LAKE OF FIRE AND FRY:
kae.error(ReturnCode.MONITOR_POLLING_CYCLE_EXCEEDED);
}
}
};
// SCHEDULE THE PERIODIC EXECUTION OF THE RemoteFileMonitor: (remote --> run() monitor --> check())
executor.scheduleAtFixedRate(remote, delay, interval, TimeUnit.MINUTES);
// SCHEDULE PERIODIC ASSASSINATION ATTEMPTS AGAINST THE RemoteFileMonitor: (assassin --> run() --> after death --> die())
executor.scheduleAtFixedRate(assassin, delay, 60L, TimeUnit.SECONDS);
// LOOP UNTIL THE MONITOR COMPLETES:
do {
try {
// I THINK I NEED A NAP:
Thread.sleep(interval * 10);
} catch (InterruptedException e) {
// FAIL && THEN cleanexit();
kae.error(ReturnCode.MONITORING_ERROR, "Monitoring of the XXXXXX-Ingestion site was interrupted");
}
// NOTE: THE MONITOR IS SET TO 'FINISHED' WHEN THE DONE-File IS DELIVERED AND RETRIEVED:
} while (monitor.isNotFinished());
// SHUTDOWN THE MONITOR TASK:
executor.shutdown();
想法是将任务推送到公共对象 TaskCompleteEvent。如果他们推送错误,调度程序将停止,所有任务都将停止。
您可以在地图 "errors" 和 "success" 中查看每个任务迭代的结果。
public class SchedulerTest {
@Test
public void scheduler() throws InterruptedException {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
TaskCompleteEvent taskCompleteEvent = new TaskCompleteEvent(scheduledExecutorService);
Runnable task1 = () -> {
int num = new Random().nextInt(100);
if (num < 5) {
taskCompleteEvent.message("task1-"+UUID.randomUUID().toString(), "Num "+num+" was obatined. Breaking all the executions.", true);
}
};
Runnable task2 = () -> {
int num = new Random().nextInt(100);
taskCompleteEvent.message("task2-"+UUID.randomUUID().toString(), num < 50, false);
};
scheduledExecutorService.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(task2, 0, 1, TimeUnit.SECONDS);
scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
System.out.println("Success: "+taskCompleteEvent.getSuccess());
System.out.println("Errors: "+taskCompleteEvent.getErrors());
System.out.println("Went well?: "+taskCompleteEvent.getErrors().isEmpty());
}
public static class TaskCompleteEvent {
private final ScheduledExecutorService scheduledExecutorService;
private final Map<String, Object> errors = new LinkedHashMap<>();
private final Map<String, Object> success = new LinkedHashMap<>();
public TaskCompleteEvent(ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = scheduledExecutorService;
}
public synchronized void message(String id, Object response, boolean error) {
if (error) {
errors.put(id, response);
scheduledExecutorService.shutdown();
} else {
success.put(id, response);
}
}
public synchronized Map<String, Object> getErrors() {
return errors;
}
public synchronized Map<String, Object> getSuccess() {
return success;
}
}
}
我需要某种服务,可以同时 运行 执行几个任务,并以 1 秒的间隔执行 1 分钟。
如果其中一个任务失败,我想停止服务和每个 运行 的任务,并带有某种指示出错的指示,否则如果一分钟后一切正常,服务将停止一切顺利的指标。
例如,我有两个函数:
Runnable task1 = ()->{
int num = Math.rand(1,100);
if (num < 5){
throw new Exception("something went wrong with this task,terminate");
}
}
Runnable task2 = ()->{
int num = Math.rand(1,100)
return num < 50;
}
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
task1schedule = scheduledExecutorService.scheduleAtFixedRate(task1, 1, 60, TimeUnit.SECONDS);
task2schedule = scheduledExecutorService.scheduleAtFixedRate(task2, 1, 60, TimeUnit.SECONDS);
if (!task1schedule || !task2schedule) scheduledExecutorService.shutdown();
关于我应该如何解决这个问题并使事情尽可能通用的任何想法?
你只需要添加一个额外的任务,它的工作是监视所有其他 运行 任务——当任何被监视的任务失败时,它们需要设置一个信号量(标志)刺客可以检查。
ScheduledExecutorService executor = (ScheduledExecutorService) Executors.newScheduledThreadPool(2);
// INSTANTIATE THE REMOTE-FILE-MONITOR:
RemoteFileMonitor monitor = new RemoteFileMonitor(remotesource, localtarget);
// THIS TimerTask PERIODICALLY TRIGGERS THE RemoteFileMonitor:
TimerTask remote = new TimerTask() {
// RUN FORREST... RUN !
public void run() {
try {
kae.trace("TimerTask::run() --> Calling RemoteFileMonitor.check()");
monitor.check();
} catch (Exception ex) {
// NULL TRAP: ALLOWS US TO CONTINUE AND RETRY:
}
}
};
// THIS TimerTask PERIODICALLY TRIES TO KILL THE REMOTE-FILE-MONITOR:
TimerTask assassin = new TimerTask() {
// WHERE DO BAD FOLKS GO WHEN THEY DIE ?
private final LocalDateTime death = LocalDateTime.now().plus(ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);
// RUN FORREST... RUN !
public void run() {
// IS THERE LIFE AFTER DEATH ???
if (LocalDateTime.now().isAfter(death)) {
// THEY GO TO A LAKE OF FIRE AND FRY:
kae.error(ReturnCode.MONITOR_POLLING_CYCLE_EXCEEDED);
}
}
};
// SCHEDULE THE PERIODIC EXECUTION OF THE RemoteFileMonitor: (remote --> run() monitor --> check())
executor.scheduleAtFixedRate(remote, delay, interval, TimeUnit.MINUTES);
// SCHEDULE PERIODIC ASSASSINATION ATTEMPTS AGAINST THE RemoteFileMonitor: (assassin --> run() --> after death --> die())
executor.scheduleAtFixedRate(assassin, delay, 60L, TimeUnit.SECONDS);
// LOOP UNTIL THE MONITOR COMPLETES:
do {
try {
// I THINK I NEED A NAP:
Thread.sleep(interval * 10);
} catch (InterruptedException e) {
// FAIL && THEN cleanexit();
kae.error(ReturnCode.MONITORING_ERROR, "Monitoring of the XXXXXX-Ingestion site was interrupted");
}
// NOTE: THE MONITOR IS SET TO 'FINISHED' WHEN THE DONE-File IS DELIVERED AND RETRIEVED:
} while (monitor.isNotFinished());
// SHUTDOWN THE MONITOR TASK:
executor.shutdown();
想法是将任务推送到公共对象 TaskCompleteEvent。如果他们推送错误,调度程序将停止,所有任务都将停止。
您可以在地图 "errors" 和 "success" 中查看每个任务迭代的结果。
public class SchedulerTest {
@Test
public void scheduler() throws InterruptedException {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
TaskCompleteEvent taskCompleteEvent = new TaskCompleteEvent(scheduledExecutorService);
Runnable task1 = () -> {
int num = new Random().nextInt(100);
if (num < 5) {
taskCompleteEvent.message("task1-"+UUID.randomUUID().toString(), "Num "+num+" was obatined. Breaking all the executions.", true);
}
};
Runnable task2 = () -> {
int num = new Random().nextInt(100);
taskCompleteEvent.message("task2-"+UUID.randomUUID().toString(), num < 50, false);
};
scheduledExecutorService.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(task2, 0, 1, TimeUnit.SECONDS);
scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
System.out.println("Success: "+taskCompleteEvent.getSuccess());
System.out.println("Errors: "+taskCompleteEvent.getErrors());
System.out.println("Went well?: "+taskCompleteEvent.getErrors().isEmpty());
}
public static class TaskCompleteEvent {
private final ScheduledExecutorService scheduledExecutorService;
private final Map<String, Object> errors = new LinkedHashMap<>();
private final Map<String, Object> success = new LinkedHashMap<>();
public TaskCompleteEvent(ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = scheduledExecutorService;
}
public synchronized void message(String id, Object response, boolean error) {
if (error) {
errors.put(id, response);
scheduledExecutorService.shutdown();
} else {
success.put(id, response);
}
}
public synchronized Map<String, Object> getErrors() {
return errors;
}
public synchronized Map<String, Object> getSuccess() {
return success;
}
}
}