如何 运行 多个 ExecutorServices 发出请求并通过单个界面汇集响应?

How to run multiple ExecutorServices to make requests and funnel the response through a single interface?

这道题是架构问题,一直想不通

我有一个 TaskScheduler,它有 start() 和 stop() 等操作。 TaskScheduler 旨在不可知,我希望能够将任何 "Runnable"、"UID" 和服务应 运行 的 "Interval" 传递给它。这一切都被添加到哈希图中,因此如果您尝试传入具有相同 UID 的现有 运行nable,它将用新信息替换之前的 运行nable。

扩展TaskScheduler 是MyScheduler,它是特定于我要发出的请求的。在此示例中,我每 60 秒发出多个配置文件请求。为了跟踪哪个配置文件请求是哪个,我使用 UID 作为键。

然后我想从 MyScheduler 中冒出对应用程序级别的响应。这是我遇到问题的地方。我只能冒出最新调度程序的响应。因此,如果我创建调度程序 A 和调度程序 B,我将仅从调度程序 B 接收更新。同样,如果我创建调度程序 A-C,那么我将仅从调度程序 C 接收更新。

我知道这是为什么,MyScheduler 使用传递给它的最后一个请求。但是,我不知道解决这个问题的好的模式(方法)。

任务调度器class

public class TaskScheduler {

    private static Map<String, SchedulerModel> schedulerModels = new HashMap<>();

    TaskScheduler() {}

    private ScheduledFuture<?> start(@NotNull final SchedulerModel schedulerModel) {
        return schedulerModel.executorService.scheduleWithFixedDelay(schedulerModel.runnable, 0, schedulerModel.interval, TimeUnit.SECONDS);
    }

    /**
     * Method is used to onSchedulerStop executing tasks
     */
    private void shutdown(@NotNull SchedulerModel schedulerModel) {
        if (schedulerModel.executorService != null) {
            schedulerModel.executorService.shutdownNow();
            schedulerModel.executorService = null;
        }
    }

    /**
     * Method is used to initialize scheduler task and time delays
     *
     * @param runnable Represents a command that can be executed
     * @param interval The time interval for execution of code
     */
    void setTask(Runnable runnable, String uid, int interval) {
        SchedulerModel schedulerModel = new SchedulerModel();
        schedulerModel.executorService = Executors.newSingleThreadScheduledExecutor();
        schedulerModel.runnable = runnable;
        schedulerModel.interval = interval;
        schedulerModels.put(uid, schedulerModel);
    }

    public void stop(@NotNull String uid) {
        if (schedulerModels.get(uid) != null) {
            shutdown(schedulerModels.get(uid));
            schedulerModels.remove(uid);
        } else {
            // scheduler id not found
        }
    }


    public void start(@NotNull String uid) {
        if (schedulerModels.get(uid) != null) {
            start(schedulerModels.get(uid));
        } else {
            // scheduler id not found
        }
    }

}

MyScheduler(这个名字是临时的)

public class MyScheduler extends TaskScheduler {

    private final int DEFAULT_SCHEDULER_INTERVAL = 60; // seconds

    private ProfileRequest request;
    private ApiInterface apiInterface;
    private SchedulerInterface schedulerInterface;


    public MyScheduler() {}

    public void createScheduler(@NotNull ApiInterface apiInterface,
                               @NotNull ProfileRequest request,
                               @NotNull SchedulerInterface schedulerInterface) {

        this.apiInterface = apiInterface;
        this.request = request;
        this.schedulerInterface = schedulerInterface;
        super.setTask(new SchedulerRunnable(), request.getUid(), DEFAULT_SCHEDULER_INTERVAL);
    }

    public void start(@NotNull String uid) {
        start(uid); // start scheduler
        schedulerInterface.onSchedulerStart(uid); // send feedback to callback
    }

    public void stop(@NotNull String uid) {
        stop(uid); // stop scheduler
        schedulerInterface.onSchedulerStop(uid); // send feedback to callback
    }

    private class SchedulerRunnable implements Runnable {

        @Override
        public void run() {

            ApiClient.createBookmark(request, new Callback<Response>() {
                @Override
                public void onSuccess(@NotNull Response response) {
                    schedulerInterface.onSuccess(response);
                }

                @Override
                public void onFailure(@NotNull Exception exception) {
                    schedulerInterface.onFailure(exception);
                }
            });
        }
    }
}

尝试在应用级别实现此目标

    mProfileScheduler.createScheduler(apiInterface, request, new SchedulerInterface {

    Override
    public void onSuccess(Response response) {
       // problem so far is that I only get response from latest scheduler
    }

    Override
    public void onFailure(Exception exception) {}

    Override
    public void onSchedulerStop(String uid) {
       // pass back uid so that I know which profile scheduler was stopped
    }

    Override
    public void onSchedulerStart(String uid) {}
       // pass back uid so that I know which profile scheduler was started
    }
});

你遇到这个问题是因为 schedulerInterfaceMyScheduler 的成员。 因此它在所有任务之间共享,并在每个新任务提交后被覆盖。

这里的解决方案是让 schedulerInterface 成为 SchedulerRunnable 的成员:


private class SchedulerRunnable implements Runnable {
  private SchedulerInterface schedulerInterface;

  SchedulerRunnable(SchedulerInterface schedulerInterface) {
    this.schedulerInterface = schedulerInterface;
  }
}

为了调用 onSchedulerStop()onSchedulerStart() 你可以在 TaskScheduler return Runnable 中创建 start()stop .然后在 MyTaskScheduler 中将其转换为 SchedulerRunnable 以获得对 schedulerInterface.

的引用

如果您不希望 Runnable 作为 public 接口的一部分被 return 编辑,您可以创建受保护的方法,例如Runnable doStart()Runnable doStop 可以被 void start()void stop() 覆盖和调用。


其他问题

并发

您正在为 TaskScheduler schedulerModels 使用 HashMap。它不是 thread-safe。 如果您不打算从多个线程访问它,这是可以的。 否则,您可能会遇到竞争条件和内存可见性问题。

您应该使用 ConcurrentHashMap 及其原子操作,如 computeIfPresent()computeIfAbsent() 而不是 put

资源管理

  1. 当您用具有相同 UID 的新任务替换现有任务时,您既不会停止其执行程序服务,也不会取消当前 运行 任务。因此你将泄漏线程并且之前的可运行将保持 运行。

  2. 您为每个任务创建一个新的 SingleThreadExecutorService。这使得使用的线程数量可能不受限制,并且很难对应用程序资源消耗做出任何保证。通常你会使用一个线程池,它有固定数量的线程,在任务之间重复使用。

再次,我建议阅读 "Java Concurrency in Practice" 书籍以了解这些问题和解决这些问题的模式。


完整解决方案

聊天后,这是我建议的解决方案。 我已经模拟了所有未指定的 类 和接口。

import org.jetbrains.annotations.NotNull;

import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {

    private static class MySchedulerInterface implements SchedulerInterface {
        private final ProfileRequest request;

        public MySchedulerInterface(ProfileRequest req1) {
            this.request = req1;
        }

        @Override
        public void onSuccess(String response) {
            System.out.println("onSuccess:[" + request + "]" + response);
        }

        @Override
        public void onFailure(Exception exception) {
            System.out.println("onFailure:[" + request + "]" + exception);
        }

        @Override
        public void onSchedulerStop(String uid) {
            System.out.println("onSchedulerStop:[" + request + "] - " + uid);
        }

        @Override
        public void onSchedulerStart(String uid) {
            System.out.println("onSchedulerStart:[" + request + "] - " + uid);
        }
    }

    public static void main(String[] args) throws InterruptedException {

        ApiInterface api = new ApiInterface();

        ProfileRequest req1 = new ProfileRequest("1", "apple");
        ProfileRequest req2 = new ProfileRequest("2", "orange");
        ProfileRequest req3 = new ProfileRequest("3", "peach");
        ProfileRequest req11 = new ProfileRequest("1", "pineapple");

        MyScheduler scheduler = new MyScheduler();
        scheduler.createScheduler(api, req1, new MySchedulerInterface(req1));
        scheduler.createScheduler(api, req2, new MySchedulerInterface(req2));
        scheduler.createScheduler(api, req3, new MySchedulerInterface(req3));

        System.out.println("Created 3 tasks");
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Starting 3 tasks");
        scheduler.start("1");
        scheduler.start("2");
        scheduler.start("3");
        System.out.println("Started 3 tasks");

        TimeUnit.SECONDS.sleep(10);

        System.out.println("Replacing task 1...");
        scheduler.createScheduler(api, req11, new MySchedulerInterface(req11));
        System.out.println("Replaced task 1.");

        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping 3 tasks...");
        scheduler.stop("1");
        scheduler.stop("2");
        scheduler.stop("3");
        System.out.println("The end.");
    }
}

class ProfileRequest {
    private final String uid;
    private final String value;

    public ProfileRequest(String uid, String value) {
        this.uid = uid;
        this.value = value;
    }

    public String getUid() {
        return uid;
    }

    public String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return new StringJoiner(", ", ProfileRequest.class.getSimpleName() + "[", "]")
                .add("uid='" + uid + "'")
                .add("value='" + value + "'")
                .toString();
    }
}

class ApiInterface {
    public void createBookmark(ProfileRequest request, Callback<String> stringCallback) {
        stringCallback.onSuccess("SUCCESS: I'm done with " + request);
    }
}

interface SchedulerInterface {

    void onSuccess(String response);

    void onFailure(Exception exception);

    void onSchedulerStop(String uid);

    void onSchedulerStart(String uid);
}


interface Callback<T> {
    void onSuccess(@NotNull T response);

    void onFailure(@NotNull Exception exception);
}

class MyScheduler extends TaskScheduler {

    private final int DEFAULT_SCHEDULER_INTERVAL = 2; // seconds

    public MyScheduler() {
    }

    public void createScheduler(@NotNull ApiInterface apiInterface,
                                @NotNull ProfileRequest request,
                                @NotNull SchedulerInterface schedulerInterface) {
        super.setTask(new SchedulerRunnable(apiInterface, request, schedulerInterface), request.getUid(), DEFAULT_SCHEDULER_INTERVAL);
    }

    @Override
    public ScheduledTask doStart(@NotNull String uid) {
        final ScheduledTask task = super.doStart(uid);
        if (task != null) {
            final SchedulerRunnable runnable = (SchedulerRunnable) task.runnable;
            runnable.schedulerInterface.onSchedulerStart(uid);
        }
        return task;
    }

    @Override
    protected ScheduledTask doStop(@NotNull String uid) {
        final ScheduledTask task = super.doStop(uid);
        if (task != null) {
            final SchedulerRunnable runnable = (SchedulerRunnable) task.runnable;
            runnable.schedulerInterface.onSchedulerStop(uid);
        }
        return task;
    }

    private class SchedulerRunnable implements Runnable {

        private final ApiInterface apiInterface;
        private final ProfileRequest request;
        private final SchedulerInterface schedulerInterface;

        SchedulerRunnable(ApiInterface apiInterface, ProfileRequest request, SchedulerInterface schedulerInterface) {
            this.apiInterface = apiInterface;
            this.request = request;
            this.schedulerInterface = schedulerInterface;
        }

        @Override
        public void run() {
            apiInterface.createBookmark(request, new Callback<String>() {
                @Override
                public void onSuccess(@NotNull String response) {
                    schedulerInterface.onSuccess(response);
                }

                @Override
                public void onFailure(@NotNull Exception exception) {
                    schedulerInterface.onFailure(exception);
                }
            });
        }
    }
}


class SchedulerModel {
    ScheduledExecutorService executorService;
    Runnable runnable;
    int interval;
}


class TaskScheduler {

    static class ScheduledTask {
        String uid;
        Runnable runnable;
        int interval;
        ScheduledFuture<?> future;

        ScheduledTask(String uid, Runnable runnable, int interval, ScheduledFuture<?> future) {
            this.uid = uid;
            this.runnable = runnable;
            this.interval = interval;
            this.future = future;
        }

        void dispose() {
            if (future != null) {
                future.cancel(true);
            }
        }

        boolean isScheduled() {
            return future != null;
        }
    }

    private ConcurrentMap<String, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>();
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

    TaskScheduler() {
    }

    /**
     * Method is used to initialize scheduler task and time delays
     *
     * @param runnable Represents a command that can be executed
     * @param interval The time interval for execution of code
     */
    void setTask(Runnable runnable, String uid, int interval) {
        AtomicBoolean requiresRestart = new AtomicBoolean(false);
        final ScheduledTask task = scheduledTasks.compute(uid, (id, oldTask) -> {
            ScheduledTask newTask = new ScheduledTask(uid, runnable, interval, null);
            if (oldTask != null) {
                oldTask.dispose();
                requiresRestart.set(oldTask.isScheduled());
            }
            return newTask;
        });

        if (requiresRestart.get()) {
            start(uid);
        }
    }

    public void start(@NotNull String uid) {
        doStart(uid);
    }

    public void stop(@NotNull String uid) {
        doStop(uid);
    }

    protected ScheduledTask doStart(@NotNull String uid) {
        final ScheduledTask scheduledTask = scheduledTasks.computeIfPresent(uid, (id, oldTask) -> {
            ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
                    oldTask.runnable, 0, oldTask.interval, TimeUnit.SECONDS);
            ScheduledTask newTask = new ScheduledTask(oldTask.uid, oldTask.runnable, oldTask.interval, future);
            return newTask;
        });
        return scheduledTask;
    }

    protected ScheduledTask doStop(@NotNull String uid) {
        final ScheduledTask task = scheduledTasks.remove(uid);
        task.dispose();
        return task;
    }


}