How to run multiple ExecutorServices to make requests and funnel the responses through a single interface?
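This is an architecture question that I have not been able to work out.
I have a TaskScheduler with operations such as start() and stop(). TaskScheduler is meant to be agnostic: I want to be able to pass it any "Runnable", a "UID", and the "Interval" at which the service should run. All of this is added to a hash map, so if you pass in a runnable whose UID already exists, it replaces the previous runnable with the new information.
MyScheduler extends TaskScheduler and is specific to the request I want to make. In this example, I make multiple profile requests every 60 seconds. To keep track of which profile request is which, I use the UID as the key.
I then want to bubble the responses up from MyScheduler to the app level. This is where I am having trouble: I can only bubble up the response from the latest scheduler. So if I create scheduler A and scheduler B, I only receive updates from scheduler B. Likewise, if I create schedulers A through C, I only receive updates from scheduler C.
I know why this happens: MyScheduler uses the last request that was passed to it. However, I do not know a good pattern (approach) for fixing it.
TaskScheduler class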
public class TaskScheduler {

    private static Map<String, SchedulerModel> schedulerModels = new HashMap<>();

    TaskScheduler() {}

    private ScheduledFuture<?> start(@NotNull final SchedulerModel schedulerModel) {
        return schedulerModel.executorService.scheduleWithFixedDelay(schedulerModel.runnable, 0, schedulerModel.interval, TimeUnit.SECONDS);
    }

    /**
     * Method is used to stop executing tasks
     */
    private void shutdown(@NotNull SchedulerModel schedulerModel) {
        if (schedulerModel.executorService != null) {
            schedulerModel.executorService.shutdownNow();
            schedulerModel.executorService = null;
        }
    }

    /**
     * Method is used to initialize scheduler task and time delays
     *
     * @param runnable Represents a command that can be executed
     * @param interval The time interval for execution of code
     */
    void setTask(Runnable runnable, String uid, int interval) {
        SchedulerModel schedulerModel = new SchedulerModel();
        schedulerModel.executorService = Executors.newSingleThreadScheduledExecutor();
        schedulerModel.runnable = runnable;
        schedulerModel.interval = interval;
        schedulerModels.put(uid, schedulerModel);
    }

    public void stop(@NotNull String uid) {
        if (schedulerModels.get(uid) != null) {
            shutdown(schedulerModels.get(uid));
            schedulerModels.remove(uid);
        } else {
            // scheduler id not found
        }
    }

    public void start(@NotNull String uid) {
        if (schedulerModels.get(uid) != null) {
            start(schedulerModels.get(uid));
        } else {
            // scheduler id not found
        }
    }
}
MyScheduler (this name is temporary)
public class MyScheduler extends TaskScheduler {

    private final int DEFAULT_SCHEDULER_INTERVAL = 60; // seconds
    private ProfileRequest request;
    private ApiInterface apiInterface;
    private SchedulerInterface schedulerInterface;

    public MyScheduler() {}

    public void createScheduler(@NotNull ApiInterface apiInterface,
                                @NotNull ProfileRequest request,
                                @NotNull SchedulerInterface schedulerInterface) {
        this.apiInterface = apiInterface;
        this.request = request;
        this.schedulerInterface = schedulerInterface;
        super.setTask(new SchedulerRunnable(), request.getUid(), DEFAULT_SCHEDULER_INTERVAL);
    }

    public void start(@NotNull String uid) {
        super.start(uid); // start scheduler
        schedulerInterface.onSchedulerStart(uid); // send feedback to callback
    }

    public void stop(@NotNull String uid) {
        super.stop(uid); // stop scheduler
        schedulerInterface.onSchedulerStop(uid); // send feedback to callback
    }

    private class SchedulerRunnable implements Runnable {
        @Override
        public void run() {
            ApiClient.createBookmark(request, new Callback<Response>() {
                @Override
                public void onSuccess(@NotNull Response response) {
                    schedulerInterface.onSuccess(response);
                }

                @Override
                public void onFailure(@NotNull Exception exception) {
                    schedulerInterface.onFailure(exception);
                }
            });
        }
    }
}
Attempt at implementing this at the app level
mProfileScheduler.createScheduler(apiInterface, request, new SchedulerInterface() {
    @Override
    public void onSuccess(Response response) {
        // problem so far is that I only get response from latest scheduler
    }

    @Override
    public void onFailure(Exception exception) {}

    @Override
    public void onSchedulerStop(String uid) {
        // pass back uid so that I know which profile scheduler was stopped
    }

    @Override
    public void onSchedulerStart(String uid) {
        // pass back uid so that I know which profile scheduler was started
    }
});
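You are running into this problem because schedulerInterface is a member of MyScheduler. It is therefore shared across all tasks and gets overwritten every time a new task is submitted.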
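To see the effect in isolation, here is a minimal sketch of an inner Runnable that reads a field of its enclosing object. The names SharedCallbackDemo, Listener, and register are made up for this illustration and are not part of the code in the question.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedCallbackDemo {
    // Hypothetical stand-in for SchedulerInterface.
    interface Listener {
        String name();
    }

    // One field on the outer object: every inner runnable reads this same slot.
    private Listener listener;
    private final List<Runnable> tasks = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    void register(Listener listener) {
        this.listener = listener; // overwrites the previously registered listener
        tasks.add(new Task());
    }

    // Inner (non-static) class: 'listener' resolves to the outer field when run() executes.
    private class Task implements Runnable {
        @Override
        public void run() {
            log.add(listener.name());
        }
    }

    static List<String> demo() {
        SharedCallbackDemo d = new SharedCallbackDemo();
        d.register(() -> "A");
        d.register(() -> "B");
        d.tasks.forEach(Runnable::run);
        return d.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // both tasks report to the last listener
    }
}
```

Both tasks end up reporting to listener "B", which mirrors receiving updates only from the most recently created scheduler.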
The solution here is to make schedulerInterface a member of SchedulerRunnable:
private class SchedulerRunnable implements Runnable {
    private final SchedulerInterface schedulerInterface;

    SchedulerRunnable(SchedulerInterface schedulerInterface) {
        this.schedulerInterface = schedulerInterface;
    }

    // run() stays as before, but now reports to this instance's schedulerInterface
}
To call onSchedulerStop() and onSchedulerStart(), you can make start() and stop() in TaskScheduler return the Runnable. Then, in MyScheduler, cast it to SchedulerRunnable to get a reference to its schedulerInterface.
If you do not want the Runnable returned as part of the public interface, you can create protected methods, e.g. Runnable doStart() and Runnable doStop(), which subclasses can override and which void start() and void stop() call.
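A rough sketch of that second variant could look like the following. BaseScheduler, NotifyingScheduler, and LabeledTask are hypothetical names chosen for this illustration, and the actual scheduling is left out so that the hook structure stays visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HookDemo {
    // Hypothetical base class: the public void methods delegate to protected hooks.
    static class BaseScheduler {
        private final Map<String, Runnable> tasks = new HashMap<>();

        void setTask(String uid, Runnable runnable) {
            tasks.put(uid, runnable);
        }

        public void start(String uid) {
            doStart(uid); // the returned Runnable stays out of the public API
        }

        public void stop(String uid) {
            doStop(uid);
        }

        // Hooks return the affected runnable, or null for an unknown uid.
        protected Runnable doStart(String uid) {
            return tasks.get(uid); // real code would schedule it here
        }

        protected Runnable doStop(String uid) {
            return tasks.remove(uid); // real code would cancel it here
        }
    }

    // Runnable that carries its own callback target (here just a label).
    static class LabeledTask implements Runnable {
        final String label;

        LabeledTask(String label) {
            this.label = label;
        }

        @Override
        public void run() { }
    }

    static class NotifyingScheduler extends BaseScheduler {
        final List<String> events = new ArrayList<>();

        @Override
        protected Runnable doStart(String uid) {
            Runnable r = super.doStart(uid);
            if (r instanceof LabeledTask) {
                events.add("start:" + ((LabeledTask) r).label);
            }
            return r;
        }

        @Override
        protected Runnable doStop(String uid) {
            Runnable r = super.doStop(uid);
            if (r instanceof LabeledTask) {
                events.add("stop:" + ((LabeledTask) r).label);
            }
            return r;
        }
    }

    static List<String> demo() {
        NotifyingScheduler s = new NotifyingScheduler();
        s.setTask("1", new LabeledTask("apple"));
        s.setTask("2", new LabeledTask("orange"));
        s.start("1");
        s.start("2");
        s.stop("1");
        s.stop("missing"); // unknown uid: the hook returns null and no event is recorded
        return s.events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because each task carries its own callback target, the subclass notifies the right one per UID without keeping any shared field.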
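Other issues
Concurrency
You are using a HashMap for TaskScheduler's schedulerModels. It is not thread-safe. That is fine if you do not intend to access it from multiple threads. Otherwise, you may run into race conditions and memory visibility issues. You should use a ConcurrentHashMap and its atomic operations such as computeIfPresent() or computeIfAbsent() instead of put.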
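As a small illustration of those atomic operations, the sketch below lets four threads update the same entry through computeIfPresent(). The class name AtomicUpdateDemo and the counter it stores are assumptions made for the example; the point is only that each read-modify-write step happens atomically.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AtomicUpdateDemo {
    // Four threads bump the same entry; computeIfPresent makes each
    // read-modify-write step atomic, so no increment is lost.
    static int demo() {
        ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();
        hits.computeIfAbsent("1", uid -> 0);  // inserts only because the key is missing
        hits.computeIfAbsent("1", uid -> 99); // no effect: the key is already present

        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    hits.computeIfPresent("1", (uid, old) -> old + 1);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return hits.get("1");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 4 threads x 1000 increments
    }
}
```

With a plain HashMap and a separate get() followed by put(), some of those increments could be lost when threads interleave.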
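Resource management
When you replace an existing task with a new task that has the same UID, you neither stop its executor service nor cancel the currently running task. As a result, you leak threads and the previous runnable keeps running.
You create a new single-thread executor service for every task. This makes the number of threads in use potentially unbounded and makes it hard to give any guarantees about the application's resource consumption. Normally you would use a thread pool with a fixed number of threads that are reused across tasks.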
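Both points can be sketched together: one bounded pool shared by all tasks, and the previous ScheduledFuture cancelled whenever a UID is reused. The class name SharedPoolDemo, the pool size of 4, and the millisecond interval are arbitrary choices for this example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SharedPoolDemo {
    // One bounded pool for all tasks; the size 4 is an arbitrary choice for this sketch.
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(4);
    private final ConcurrentMap<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    // Schedules runnable under uid and cancels whatever was scheduled under that uid before.
    // Returns the cancelled future, or null if the uid was new.
    ScheduledFuture<?> schedule(String uid, Runnable runnable, long intervalMillis) {
        ScheduledFuture<?> next =
                pool.scheduleWithFixedDelay(runnable, 0, intervalMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = futures.put(uid, next);
        if (previous != null) {
            previous.cancel(true); // stop the replaced task so it does not keep running
        }
        return previous;
    }

    void shutdown() {
        pool.shutdownNow(); // release the pool's threads when the scheduler is no longer needed
    }

    static boolean demo() {
        SharedPoolDemo d = new SharedPoolDemo();
        try {
            d.schedule("1", () -> { }, 50);
            ScheduledFuture<?> old = d.schedule("1", () -> { }, 50);
            return old != null && old.isCancelled();
        } finally {
            d.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The thread count stays fixed no matter how many UIDs are registered, and replacing a UID leaves only one live task behind.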
Again, I recommend reading the book "Java Concurrency in Practice" to understand these problems and the patterns for solving them.
Full solution
After our chat, here is the solution I would suggest. I have mocked all the classes and interfaces that were not specified.
import org.jetbrains.annotations.NotNull;

import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {

    private static class MySchedulerInterface implements SchedulerInterface {
        private final ProfileRequest request;

        public MySchedulerInterface(ProfileRequest req1) {
            this.request = req1;
        }

        @Override
        public void onSuccess(String response) {
            System.out.println("onSuccess:[" + request + "]" + response);
        }

        @Override
        public void onFailure(Exception exception) {
            System.out.println("onFailure:[" + request + "]" + exception);
        }

        @Override
        public void onSchedulerStop(String uid) {
            System.out.println("onSchedulerStop:[" + request + "] - " + uid);
        }

        @Override
        public void onSchedulerStart(String uid) {
            System.out.println("onSchedulerStart:[" + request + "] - " + uid);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ApiInterface api = new ApiInterface();
        ProfileRequest req1 = new ProfileRequest("1", "apple");
        ProfileRequest req2 = new ProfileRequest("2", "orange");
        ProfileRequest req3 = new ProfileRequest("3", "peach");
        ProfileRequest req11 = new ProfileRequest("1", "pineapple");
        MyScheduler scheduler = new MyScheduler();
        scheduler.createScheduler(api, req1, new MySchedulerInterface(req1));
        scheduler.createScheduler(api, req2, new MySchedulerInterface(req2));
        scheduler.createScheduler(api, req3, new MySchedulerInterface(req3));
        System.out.println("Created 3 tasks");
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Starting 3 tasks");
        scheduler.start("1");
        scheduler.start("2");
        scheduler.start("3");
        System.out.println("Started 3 tasks");
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Replacing task 1...");
        scheduler.createScheduler(api, req11, new MySchedulerInterface(req11));
        System.out.println("Replaced task 1.");
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping 3 tasks...");
        scheduler.stop("1");
        scheduler.stop("2");
        scheduler.stop("3");
        System.out.println("The end.");
    }
}
class ProfileRequest {
    private final String uid;
    private final String value;

    public ProfileRequest(String uid, String value) {
        this.uid = uid;
        this.value = value;
    }

    public String getUid() {
        return uid;
    }

    public String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return new StringJoiner(", ", ProfileRequest.class.getSimpleName() + "[", "]")
                .add("uid='" + uid + "'")
                .add("value='" + value + "'")
                .toString();
    }
}

class ApiInterface {
    public void createBookmark(ProfileRequest request, Callback<String> stringCallback) {
        stringCallback.onSuccess("SUCCESS: I'm done with " + request);
    }
}

interface SchedulerInterface {
    void onSuccess(String response);
    void onFailure(Exception exception);
    void onSchedulerStop(String uid);
    void onSchedulerStart(String uid);
}

interface Callback<T> {
    void onSuccess(@NotNull T response);
    void onFailure(@NotNull Exception exception);
}
class MyScheduler extends TaskScheduler {

    private final int DEFAULT_SCHEDULER_INTERVAL = 2; // seconds

    public MyScheduler() {
    }

    public void createScheduler(@NotNull ApiInterface apiInterface,
                                @NotNull ProfileRequest request,
                                @NotNull SchedulerInterface schedulerInterface) {
        super.setTask(new SchedulerRunnable(apiInterface, request, schedulerInterface), request.getUid(), DEFAULT_SCHEDULER_INTERVAL);
    }

    @Override
    public ScheduledTask doStart(@NotNull String uid) {
        final ScheduledTask task = super.doStart(uid);
        if (task != null) {
            final SchedulerRunnable runnable = (SchedulerRunnable) task.runnable;
            runnable.schedulerInterface.onSchedulerStart(uid);
        }
        return task;
    }

    @Override
    protected ScheduledTask doStop(@NotNull String uid) {
        final ScheduledTask task = super.doStop(uid);
        if (task != null) {
            final SchedulerRunnable runnable = (SchedulerRunnable) task.runnable;
            runnable.schedulerInterface.onSchedulerStop(uid);
        }
        return task;
    }

    private class SchedulerRunnable implements Runnable {
        private final ApiInterface apiInterface;
        private final ProfileRequest request;
        private final SchedulerInterface schedulerInterface;

        SchedulerRunnable(ApiInterface apiInterface, ProfileRequest request, SchedulerInterface schedulerInterface) {
            this.apiInterface = apiInterface;
            this.request = request;
            this.schedulerInterface = schedulerInterface;
        }

        @Override
        public void run() {
            apiInterface.createBookmark(request, new Callback<String>() {
                @Override
                public void onSuccess(@NotNull String response) {
                    schedulerInterface.onSuccess(response);
                }

                @Override
                public void onFailure(@NotNull Exception exception) {
                    schedulerInterface.onFailure(exception);
                }
            });
        }
    }
}
class SchedulerModel {
    ScheduledExecutorService executorService;
    Runnable runnable;
    int interval;
}

class TaskScheduler {

    static class ScheduledTask {
        String uid;
        Runnable runnable;
        int interval;
        ScheduledFuture<?> future;

        ScheduledTask(String uid, Runnable runnable, int interval, ScheduledFuture<?> future) {
            this.uid = uid;
            this.runnable = runnable;
            this.interval = interval;
            this.future = future;
        }

        void dispose() {
            if (future != null) {
                future.cancel(true);
            }
        }

        boolean isScheduled() {
            return future != null;
        }
    }

    private ConcurrentMap<String, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>();
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

    TaskScheduler() {
    }

    /**
     * Method is used to initialize scheduler task and time delays
     *
     * @param runnable Represents a command that can be executed
     * @param interval The time interval for execution of code
     */
    void setTask(Runnable runnable, String uid, int interval) {
        AtomicBoolean requiresRestart = new AtomicBoolean(false);
        final ScheduledTask task = scheduledTasks.compute(uid, (id, oldTask) -> {
            ScheduledTask newTask = new ScheduledTask(uid, runnable, interval, null);
            if (oldTask != null) {
                // cancel the task being replaced and remember whether it was running
                oldTask.dispose();
                requiresRestart.set(oldTask.isScheduled());
            }
            return newTask;
        });
        if (requiresRestart.get()) {
            start(uid);
        }
    }

    public void start(@NotNull String uid) {
        doStart(uid);
    }

    public void stop(@NotNull String uid) {
        doStop(uid);
    }

    protected ScheduledTask doStart(@NotNull String uid) {
        final ScheduledTask scheduledTask = scheduledTasks.computeIfPresent(uid, (id, oldTask) -> {
            ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
                    oldTask.runnable, 0, oldTask.interval, TimeUnit.SECONDS);
            ScheduledTask newTask = new ScheduledTask(oldTask.uid, oldTask.runnable, oldTask.interval, future);
            return newTask;
        });
        return scheduledTask;
    }

    protected ScheduledTask doStop(@NotNull String uid) {
        final ScheduledTask task = scheduledTasks.remove(uid);
        if (task != null) { // unknown uid: nothing to cancel
            task.dispose();
        }
        return task;
    }
}