并行化 for 循环并填充多个数据结构

parallelize a for loop and populate multiple data structures

我有一个要并行化的 for 循环。在我下面的代码中,我迭代了我最外层的 for 循环并将条目放入各种数据结构中并且它工作正常。所有这些数据结构在同一个 class 中都有一个 getter,我稍后会用它来获取所有细节,一旦在这个 for 循环中从其他 class 完成所有操作。我正在填充 infoitemToNumberMappingcatToValueHoldertasksByCategorycatHolderitemIds 数据结构,它们也有 getters还有。

  // want to parallelize this for loop
  for (Task task : tasks) {
    if (task.getCategories().isEmpty() || task.getEventList() == null
        || task.getMetaInfo() == null) {
      continue;
    }
    String itemId = task.getEventList().getId();
    String categoryId = task.getCategories().get(0).getId();
    Processor fp = new Processor(siteId, itemId, categoryId, poolType);
    Map<String, Integer> holder = fp.getDataHolder();
    if (!holder.isEmpty()) {
      for (Map.Entry<String, Integer> entry : holder.entrySet()) {
        info.putIfAbsent(entry.getKey(), entry.getValue());
      }
      List<Integer> values = new ArrayList<>();
      for (String key : holder.keySet()) {
        values.add(info.get(key));
      }
      itemToNumberMapping.put(itemId, StringUtils.join(values, ","));
      catToValueHolder.put(categoryId, StringUtils.join(values, ","));
    }
    Category cat = getCategory(task, holder.isEmpty());
    tasksByCategory.add(cat);
    LinkedList<String> ids = getCategoryIds(task);
    catHolder.put(categoryId, ids.getLast());
    itemIds.add(itemId);
  }

现在我知道如何并行化一个 for 循环,如下例所示,但令人困惑的是 - 在我的例子中,我没有像下面示例中的 output 这样的对象。在我的例子中,我有多个数据结构,我通过迭代 for 循环来填充它们,所以我很困惑如何并行化我的最外层 for 循环并仍然填充所有这些数据结构?

private final ExecutorService service = Executors.newFixedThreadPool(10);

List<Future<Output>> futures = new ArrayList<Future<Output>>();
for (final Input input : inputs) {
  Callable<Output> callable = new Callable<Output>() {
    public Output call() throws Exception {
      Output output = new Output();
      // process your input here and compute the output
      return output;
    }
  };
  futures.add(service.submit(callable));
}

service.shutdown();

List<Output> outputs = new ArrayList<Output>();
for (Future<Output> future : futures) {
  outputs.add(future.get());
}

更新:-

我正在并行化 do while 循环内的 for 循环和我的 do while 循环 运行s,直到 number 小于或等于 pages。所以也许我做得不对。因为我的 do while 循环将 运行 直到所有页面都完成,并且对于每个页面,我有一个 for 循环,我试图并行化以及我设置它的方式,它给出 rejectedexecutionexception

  private void check() {
    String endpoint = "some_url";
    int number = 1;
    int pages = 0;
    do {
      ExecutorService executorService = Executors.newFixedThreadPool(10);
      for (int i = 1; i <= retryCount; i++) {
        try {
          HttpEntity<String> requestEntity =
              new HttpEntity<String>(getBody(number), getHeader());
          ResponseEntity<String> responseEntity =
              HttpClient.getInstance().getClient()
                  .exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
          String jsonInput = responseEntity.getBody();
          Process response = objectMapper.readValue(jsonInput, Process.class);
          pages = (int) response.getPaginationResponse().getTotalPages();
          List<Task> tasks = response.getTasks();
          if (pages <= 0 || tasks.isEmpty()) {
            continue;
          }
          // want to parallelize this for loop
          for (Task task : tasks) {
            Callable<Void> c = new Callable<>() {
              public void call() {
                if (!task.getCategories().isEmpty() && task.getEventList() != null
                    && task.getMetaInfo() != null) {
                    // my code here
                }
              }
            };
            executorService.submit(c);
          }
          // is this at right place? because I am getting rejectedexecutionexception
          executorService.shutdown();
          number++;
          break;
        } catch (Exception ex) {
          // log exception
        }
      }
    } while (number <= pages);
  }

您不必从并行代码中输出一些东西。您只需获取外部循环的主体并为每个项目创建一个任务,如下所示:

for (Task task : tasks) {
   Callable<Void> c = new Callable<>() {
      public void call() {
         if (task.getCategories().isEmpty() || task.getEventList() == null || task.getMetaInfo() == null) {
               // ... rest of code here
          }
       }
    };
    executorService.submit(c);
 }

// wait for executor service, check for exceptions or whatever else you want to do here