executing a method in parallel from a call method

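I have a library that customers use. They pass in a DataRequest object that contains a userid, a timeout and some other fields. I use this DataRequest object to build a URL, make an HTTP call with RestTemplate, and my service returns a JSON response. From that response I build a DataResponse object and return it to the customer.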

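Below is my DataClient class, which customers use by passing a DataRequest object to it. I use the timeout value the customer passes in the DataRequest to time out the request if it takes too long in the getSyncData method.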

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public DataResponse getSyncData(DataRequest key) {
        DataResponse response = null;
        Future<DataResponse> responseFuture = null;

        try {
            responseFuture = getAsyncData(key);
            response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true);
            // logging exception here               
        }

        return response;
    }   

    @Override
    public Future<DataResponse> getAsyncData(DataRequest key) {
        DataFetcherTask task = new DataFetcherTask(key, restTemplate);
        Future<DataResponse> future = service.submit(task);

        return future;
    }
}

DataFetcherTask class:

public class DataFetcherTask implements Callable<DataResponse> {

    private DataRequest key;
    private RestTemplate restTemplate;

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() throws Exception {
        // In a nutshell below is what I am doing here. 
        // 1. Make an url using DataRequest key.
        // 2. And then execute the url RestTemplate.
        // 3. Make a DataResponse object and return it.

        // I am calling this whole logic in call method as LogicA
    }
}

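As of now, my DataFetcherTask class is responsible for a single DataRequest key, as shown above.

Problem statement:

Now I have a small design change. The customer will pass a DataRequest object (keyA, for example) to my library. I will then make a new HTTP call to another service (which I am not doing in my current design), using the user ID present in that keyA object. That service returns a list of user IDs, and for each user ID returned in the response I will create another DataRequest object (keyB, keyC, keyD). I will then have a List<DataRequest> containing the keyB, keyC and keyD DataRequest objects. The list will hold at most three elements.

Now, for each DataRequest object in that List<DataRequest>, I want to execute the DataFetcherTask.call method above in parallel and then build a List<DataResponse> by adding the DataResponse for each key. So I will make three parallel calls to DataFetcherTask.call. The idea behind the parallel calls is to get the data for all of these (at most three) keys within the same global timeout value.

So my proposal is that the DataFetcherTask class returns a List<DataResponse> object instead of a DataResponse, and the signatures of the getSyncData and getAsyncData methods change accordingly. So here is the algorithm:

1. Make an HTTP call to the other service with the user ID from keyA and build the List<DataRequest> from the user IDs it returns.
2. Execute the request for each DataRequest in that list in parallel and collect the results into a List<DataResponse>.

This way I can apply the same global timeout to step 1 as well as step 2. If either step takes too long, we simply time out in the getSyncData method.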
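The two steps and the single timeout can be sketched in isolation as follows. This is only a minimal illustration under stated assumptions: plain strings stand in for DataRequest and DataResponse, `generateKeys` and `fetch` are hypothetical stand-ins for the two HTTP calls, and the pools are created per call purely to keep the example self-contained (a real client would reuse them).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class GlobalTimeoutSketch {

    // Step 1 stand-in (hypothetical): the real code would call the other service here.
    static List<String> generateKeys(String keyA) {
        return Arrays.asList(keyA + "-B", keyA + "-C", keyA + "-D");
    }

    // Step 2 stand-in (hypothetical): the real code would run LogicA here.
    static String fetch(String key, long workMillis) throws InterruptedException {
        Thread.sleep(workMillis);
        return "response:" + key;
    }

    /** Runs both steps inside one outer task and bounds them with a single timeout. */
    static List<String> getSyncData(final String keyA, final long workMillis, long timeoutMillis) {
        ExecutorService outer = Executors.newSingleThreadExecutor();
        final ExecutorService inner = Executors.newFixedThreadPool(3);
        try {
            Future<List<String>> all = outer.submit(new Callable<List<String>>() {
                @Override
                public List<String> call() throws Exception {
                    List<Future<String>> parts = new ArrayList<>();
                    for (final String key : generateKeys(keyA)) {        // step 1
                        parts.add(inner.submit(new Callable<String>() {  // step 2, in parallel
                            @Override
                            public String call() throws Exception {
                                return fetch(key, workMillis);
                            }
                        }));
                    }
                    List<String> responses = new ArrayList<>();
                    for (Future<String> part : parts) {
                        responses.add(part.get());
                    }
                    return responses;
                }
            });
            try {
                // One timeout covers key generation and all parallel fetches.
                return all.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException ex) {
                all.cancel(true);
                return Arrays.asList("CLIENT_TIMEOUT");
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            } catch (ExecutionException ex) {
                throw new IllegalStateException(ex.getCause());
            }
        } finally {
            // Stops any fetches that are still running after a timeout.
            outer.shutdownNow();
            inner.shutdownNow();
        }
    }
}
```

Note that cancelling the outer future does not by itself cancel the inner fetches; in this sketch they are stopped by shutting down the inner pool.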

DataFetcherTask class after the design change:

public class DataFetcherTask implements Callable<List<DataResponse>> {

    private DataRequest key;
    private RestTemplate restTemplate;
    // second executor here
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public List<DataResponse> call() throws Exception {
        List<DataRequest> keys = generateKeys();
        CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);

        int count = 0;
        for (final DataRequest key : keys) {
            comp.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    return performDataRequest(key);
                }
            });
            // track how many tasks were submitted so the loop below takes that many results
            count++;
        }

        List<DataResponse> responseList = new ArrayList<DataResponse>();
        while (count-- > 0) {
            Future<DataResponse> future = comp.take();
            responseList.add(future.get());
        }
        return responseList;
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys() {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in contructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }       

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
    }
}

Now my question is -

I have simplified the code so that the idea is clear.

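As already mentioned in the comments on your question, you can use Java's ForkJoin framework. This will save you the extra thread pool within your DataFetcherTask.

You simply need to use a ForkJoinPool in your DataClient and convert your DataFetcherTask into a RecursiveTask (one of the subtypes of ForkJoinTask). This lets you easily execute other subtasks in parallel.

So, after these modifications your code will look something like this:

DataFetcherTask

The DataFetcherTask is now a RecursiveTask which first generates the keys and then invokes a subtask for each generated key. These subtasks are executed in the same ForkJoinPool as the parent task.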

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {

  private final DataRequest key;
  private final RestTemplate restTemplate;

  public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
      this.key = key;
      this.restTemplate = restTemplate;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        // TODO - Handle exception properly
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }

    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }

    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
      List<DataRequest> keys = new ArrayList<>();
      // use key object which is passed in contructor to make HTTP call to another service
      // and then make List of DataRequest object and return keys.
      return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {

    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      // This will have all LogicA code here which is shown in my original design.
      // everything as it is same..
      return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
    }
  }

}
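To try the parent/subtask pattern above in isolation, here is a minimal runnable sketch. It assumes plain strings in place of DataRequest and DataResponse, and the class names (`ForkJoinFanOutSketch`, `ParentTask`, `LeafTask`) are hypothetical. It uses `join()` rather than `get()` to read the results, which avoids the checked exceptions once `invokeAll` has returned normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class ForkJoinFanOutSketch {

    /** Leaf task: stands in for one DataRequestTask. */
    static final class LeafTask extends RecursiveTask<String> {
        private final String key;

        LeafTask(String key) {
            this.key = key;
        }

        @Override
        protected String compute() {
            return "response:" + key; // the real code would run LogicA here
        }
    }

    /** Parent task: creates one leaf per key, runs them all, collects results in key order. */
    static final class ParentTask extends RecursiveTask<List<String>> {
        private final List<String> keys;

        ParentTask(List<String> keys) {
            this.keys = keys;
        }

        @Override
        protected List<String> compute() {
            List<LeafTask> leaves = new ArrayList<>(keys.size());
            for (String key : keys) {
                leaves.add(new LeafTask(key));
            }
            // Runs the leaves in the same pool; rethrows if any leaf failed.
            invokeAll(leaves);

            List<String> responses = new ArrayList<>(leaves.size());
            for (LeafTask leaf : leaves) {
                responses.add(leaf.join()); // already completed, so this does not block
            }
            return responses;
        }
    }

    static List<String> run(List<String> keys) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new ParentTask(keys));
        } finally {
            pool.shutdown();
        }
    }
}
```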

DataClient

The DataClient does not change much, apart from the new thread pool:

public class DataClient implements Client {

  private final RestTemplate restTemplate = new RestTemplate();
  // Replace the ExecutorService with a ForkJoinPool
  private final ForkJoinPool service = new ForkJoinPool(15);

  @Override
  public List<DataResponse> getSyncData(DataRequest key) {
      List<DataResponse> responsList = null;
      Future<List<DataResponse>> responseFuture = null;

      try {
          responseFuture = getAsyncData(key);
          responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
      } catch (TimeoutException | ExecutionException | InterruptedException ex) {
          responsList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
          responseFuture.cancel(true);
          // logging exception here
      }

      return responsList;
  }

  @Override
  public Future<List<DataResponse>> getAsyncData(DataRequest key) {
      DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
      return this.service.submit(task);
  }
}

Once you are on Java 8, you could consider changing the implementation to CompletableFutures. It would then look something like this:

DataClientCF

public class DataClientCF {

  private final RestTemplate restTemplate = new RestTemplate();
  private final ExecutorService executor = Executors.newFixedThreadPool(15);

  public List<DataResponse> getData(DataRequest initialKey) {
    return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
      .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
      .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
      .exceptionally(t -> { throw new RuntimeException(t); })
      .join();
  }

  private List<DataRequest> generateKeys(DataRequest key) {
    return new ArrayList<>();
  }

  private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
    return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
  }
}
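The CompletableFuture version above blocks with `join()` and so no longer applies the caller's timeout. One possible way to keep a global timeout, sketched here under assumptions, is to combine the per-key futures with `CompletableFuture.allOf` and wait on the combined future with `get(timeout, unit)`. Strings stand in for DataRequest and DataResponse, and `generateKeys`, `fetch` and the "CLIENT_TIMEOUT" marker are hypothetical placeholders.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

final class TimedCompletableFutureSketch {

    // Hypothetical stand-in for the call that produces keyB, keyC, keyD.
    static List<String> generateKeys(String keyA) {
        return Arrays.asList(keyA + "-B", keyA + "-C", keyA + "-D");
    }

    // Hypothetical stand-in for LogicA; sleeps to simulate a blocking HTTP call.
    static String fetch(String key, long workMillis) {
        try {
            Thread.sleep(workMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response:" + key;
    }

    static List<String> getData(String keyA, long workMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<List<String>> all = CompletableFuture
                .supplyAsync(() -> generateKeys(keyA), executor)
                .thenCompose(keys -> {
                    List<CompletableFuture<String>> parts = keys.stream()
                        .map(k -> CompletableFuture.supplyAsync(() -> fetch(k, workMillis), executor))
                        .collect(Collectors.toList());
                    // allOf completes once every part has completed, so join() below never blocks.
                    return CompletableFuture.allOf(parts.toArray(new CompletableFuture[0]))
                        .thenApply(ignored -> parts.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
                });
            // A single timeout covers key generation and all parallel fetches.
            return all.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return Arrays.asList("CLIENT_TIMEOUT");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdownNow(); // per-call pool only to keep the sketch self-contained
        }
    }
}
```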

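As mentioned in the comments, Guava's ListenableFutures would provide similar functionality for Java 7, but without lambdas they tend to get clumsy.

As far as I know, RestTemplate is blocking, and the JavaDoc for ForkJoinPool's ForkJoinTask says: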

Computations should avoid synchronized methods or blocks, and should minimize other blocking synchronization apart from joining other tasks or using synchronizers such as Phasers that are advertised to cooperate with fork/join scheduling. ...
Tasks should also not perform blocking IO,...

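Nesting one call inside another call is redundant.
And you don't need two executors. You can also return a partial result from getSyncData(DataRequest key). This can be done like this: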

DataClient.java

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public List<DataResponse> getSyncData(DataRequest key) {
        List<DataResponse> responseList = null;
        DataFetcherResult response = null;
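        try {
            response = getAsyncData(key);
            responseList = response.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response.cancel(true);
            responseList = response.getPartialResult();
        } catch (InterruptedException ex) {
            // restore the interrupt flag and return whatever has finished so far
            Thread.currentThread().interrupt();
            response.cancel(true);
            responseList = response.getPartialResult();
        } catch (ExecutionException ex) {
            // a request failed; return the results of the ones that completed
            responseList = response.getPartialResult();
        }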
        return responseList;
    }

    @Override
    public DataFetcherResult getAsyncData(DataRequest key) {
        List<DataRequest> keys = generateKeys(key);
        final List<Future<DataResponse>> responseList = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(keys.size());//assume keys is not null
        for (final DataRequest _key : keys) {
            responseList.add(service.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    DataResponse response = null;
                    try {
                        response = performDataRequest(_key);
                    } finally {
                        latch.countDown();
                        return response;
                    }
                }
            }));
        }
        return new DataFetcherResult(responseList, latch);
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys(DataRequest key) {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in contructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
        return null;
    }
}

DataFetcherResult.java

public class DataFetcherResult implements Future<List<DataResponse>> {
    final List<Future<DataResponse>> futures;
    final CountDownLatch latch;

    public DataFetcherResult(List<Future<DataResponse>> futures, CountDownLatch latch) {
        this.futures = futures;
        this.latch = latch;
    }

    //non-blocking
    public List<DataResponse> getPartialResult() {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            try {
                result.add(future.isDone() ? future.get() : null);
                //instead of null you can return new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR);
            } catch (InterruptedException | ExecutionException | CancellationException e) {
                e.printStackTrace();
                //CancellationException is unchecked and is thrown by get() on a cancelled future,
                //which is the normal case after DataFetcherResult.cancel(true) in getSyncData
                //you can handle them here and return DataResponse with corresponding DataErrorEnum and DataStatusEnum
            }
        }
        return result;
    }

    @Override
    public List<DataResponse> get() throws ExecutionException, InterruptedException {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            result.add(future.get());
        }
        return result;
    }

    @Override
    public List<DataResponse> get(long timeout, TimeUnit timeUnit)
            throws ExecutionException, InterruptedException, TimeoutException {
        if (latch.await(timeout, timeUnit)) {
            return get();
        }
        throw new TimeoutException();//or getPartialResult()
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.cancel(mayInterruptIfRunning);
        }
        return cancelled;
    }

    @Override
    public boolean isCancelled() {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.isCancelled();
        }
        return cancelled;
    }

    @Override
    public boolean isDone() {
        boolean done = true;
        for (Future<DataResponse> future : futures) {
            done &= future.isDone();
        }
        return done;
    }

    //and etc.
}

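I wrote it with a CountDownLatch and it looks good, but note that there is a nuance. You can get stuck for a short while in DataFetcherResult.get(long timeout, TimeUnit timeUnit), because the CountDownLatch is not synchronized with the state of the futures. It can happen that latch.getCount() == 0 while not all futures return future.isDone() == true yet: they have already passed latch.countDown() in the finally {} block of the Callable, but their internal state has not changed and is still NEW.
So calling get() inside get(long timeout, TimeUnit timeUnit) can cause a small delay.
A similar case is described here.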
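One possible way to close that gap, shown here only as a sketch and not as part of the code above, is to count the latch down from `FutureTask.done()` instead of from inside the Callable. `done()` is the protected hook that FutureTask invokes after the task has reached a done state (including cancellation), so `isDone()` is already true by the time the latch opens. The class names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

final class DoneHookLatchSketch {

    /** FutureTask that counts the latch down only after it has reached a done state. */
    static final class LatchedTask<V> extends FutureTask<V> {
        private final CountDownLatch latch;

        LatchedTask(Callable<V> callable, CountDownLatch latch) {
            super(callable);
            this.latch = latch;
        }

        @Override
        protected void done() {
            // Called after normal completion, failure, or cancellation.
            latch.countDown();
        }
    }

    /** Returns true if every task reports isDone() right after the latch opens. */
    static boolean allDoneWhenLatchOpens(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(taskCount);
        try {
            CountDownLatch latch = new CountDownLatch(taskCount);
            List<LatchedTask<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int n = i;
                LatchedTask<Integer> task = new LatchedTask<>(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        return n * n;
                    }
                }, latch);
                tasks.add(task);
                pool.execute(task); // a FutureTask is a Runnable, so execute() is enough
            }
            if (!latch.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            for (LatchedTask<Integer> task : tasks) {
                if (!task.isDone()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }
}
```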

The timed DataFetcherResult.get(...) can be rewritten using the futures' own future.get(long timeout, TimeUnit timeUnit), which lets you remove the CountDownLatch from the class.

public List<DataResponse> get(long timeout, TimeUnit timeUnit)
        throws ExecutionException, InterruptedException {
    List<DataResponse> result = new ArrayList<>(futures.size());
    // remaining time budget, shared by all futures
    long timeoutMs = timeUnit.toMillis(timeout);
    // set once any future has exceeded the remaining budget
    boolean timedOut = false;
    for (Future<DataResponse> future : futures) {
        long beforeGet = System.currentTimeMillis();
        try {
            if (!timedOut && timeoutMs > 0) {
                result.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
                timeoutMs -= System.currentTimeMillis() - beforeGet;
            } else {
                if (future.isDone()) {
                    result.add(future.get());
                } else {
                    //result.add(new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR)); ?
                }
            }
        } catch (TimeoutException e) {
            result.add(new DataResponse(DataErrorEnum.TIMEOUT, DataStatusEnum.ERROR));
            timedOut = true;
        }
        //you can also handle ExecutionException or CancellationException here
    }

    return result;
}

This code is given as an example and should be tested before being used in production, but it looks legit :)
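For comparison, the standard library's `ExecutorService.invokeAll(tasks, timeout, unit)` gives a similar "partial results within one global timeout" behaviour without a custom Future: when it returns, every future is done, and the ones that did not finish in time have been cancelled. The following is a minimal sketch under assumptions: strings stand in for DataResponse, and the "TIMEOUT" and "ERROR" markers as well as the `request` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class InvokeAllSketch {

    // Hypothetical stand-in for one performDataRequest call taking workMillis.
    static Callable<String> request(final String key, final long workMillis) {
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(workMillis);
                return "response:" + key;
            }
        };
    }

    /** Runs all requests in parallel and returns one entry per request, in order. */
    static List<String> fetchAll(List<Callable<String>> requests, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, requests.size()));
        List<String> results = new ArrayList<>(requests.size());
        try {
            // Blocks until all tasks finish or the timeout expires, whichever comes first.
            List<Future<String>> futures =
                pool.invokeAll(requests, timeoutMillis, TimeUnit.MILLISECONDS);
            for (Future<String> future : futures) {
                if (future.isCancelled()) {
                    results.add("TIMEOUT"); // did not finish within the global timeout
                } else {
                    try {
                        results.add(future.get()); // already done, returns immediately
                    } catch (ExecutionException e) {
                        results.add("ERROR");
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // per-call pool only to keep the sketch self-contained
        }
        return results;
    }
}
```

Called with two fast requests and one slow one, `fetchAll` returns the two responses plus a "TIMEOUT" marker for the slow key, which mirrors what getPartialResult() is meant to provide.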