Parallelize a for loop and populate multiple data structures
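I have a for loop that I want to parallelize. In the code below, I iterate over my outermost for loop and put entries into several data structures, and it works fine. All of these data structures have getters in the same class, which other classes use to fetch the details once this for loop has finished. The data structures I am populating are info, itemToNumberMapping, catToValueHolder, tasksByCategory, catHolder, and itemIds.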
// want to parallelize this for loop
for (Task task : tasks) {
    if (task.getCategories().isEmpty() || task.getEventList() == null
            || task.getMetaInfo() == null) {
        continue;
    }
    String itemId = task.getEventList().getId();
    String categoryId = task.getCategories().get(0).getId();
    Processor fp = new Processor(siteId, itemId, categoryId, poolType);
    Map<String, Integer> holder = fp.getDataHolder();
    if (!holder.isEmpty()) {
        for (Map.Entry<String, Integer> entry : holder.entrySet()) {
            info.putIfAbsent(entry.getKey(), entry.getValue());
        }
        List<Integer> values = new ArrayList<>();
        for (String key : holder.keySet()) {
            values.add(info.get(key));
        }
        itemToNumberMapping.put(itemId, StringUtils.join(values, ","));
        catToValueHolder.put(categoryId, StringUtils.join(values, ","));
    }
    Category cat = getCategory(task, holder.isEmpty());
    tasksByCategory.add(cat);
    LinkedList<String> ids = getCategoryIds(task);
    catHolder.put(categoryId, ids.getLast());
    itemIds.add(itemId);
}
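I know how to parallelize a for loop in general, as in the example below. What confuses me is that in my case there is no single result object like the output in that example. I populate multiple data structures as I iterate, so how do I parallelize my outermost for loop and still populate all of them?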
private final ExecutorService service = Executors.newFixedThreadPool(10);

List<Future<Output>> futures = new ArrayList<Future<Output>>();
for (final Input input : inputs) {
    Callable<Output> callable = new Callable<Output>() {
        public Output call() throws Exception {
            Output output = new Output();
            // process your input here and compute the output
            return output;
        }
    };
    futures.add(service.submit(callable));
}
service.shutdown();

List<Output> outputs = new ArrayList<Output>();
for (Future<Output> future : futures) {
    outputs.add(future.get());
}
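Update:
The for loop I am parallelizing sits inside a do-while loop, and the do-while loop runs while number is less than or equal to pages, so maybe I am not doing this right. The do-while loop runs until all pages are processed. For each page there is a for loop that I am trying to parallelize, and the way I have set it up throws a RejectedExecutionException.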
private void check() {
    String endpoint = "some_url";
    int number = 1;
    int pages = 0;
    do {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 1; i <= retryCount; i++) {
            try {
                HttpEntity<String> requestEntity =
                    new HttpEntity<String>(getBody(number), getHeader());
                ResponseEntity<String> responseEntity =
                    HttpClient.getInstance().getClient()
                        .exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
                String jsonInput = responseEntity.getBody();
                Process response = objectMapper.readValue(jsonInput, Process.class);
                pages = (int) response.getPaginationResponse().getTotalPages();
                List<Task> tasks = response.getTasks();
                if (pages <= 0 || tasks.isEmpty()) {
                    continue;
                }
                // want to parallelize this for loop
                for (Task task : tasks) {
                    Callable<Void> c = new Callable<Void>() {
                        // Callable<Void> must declare Void and return null
                        public Void call() {
                            if (!task.getCategories().isEmpty() && task.getEventList() != null
                                    && task.getMetaInfo() != null) {
                                // my code here
                            }
                            return null;
                        }
                    };
                    executorService.submit(c);
                }
                // is this at right place? because I am getting rejectedexecutionexception
                executorService.shutdown();
                number++;
                break;
            } catch (Exception ex) {
                // log exception
            }
        }
    } while (number <= pages);
}
Your parallel code does not have to return anything. Just take the body of the outer loop and create one task per item, like this:
for (Task task : tasks) {
    Callable<Void> c = new Callable<Void>() {
        public Void call() {
            // skip incomplete tasks, as the `continue` did in the sequential loop
            if (task.getCategories().isEmpty() || task.getEventList() == null || task.getMetaInfo() == null) {
                return null;
            }
            // ... rest of code here
            return null; // Callable<Void> has nothing meaningful to return
        }
    };
    executorService.submit(c);
}
// wait for executor service, check for exceptions or whatever else you want to do here
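Once several threads write to the same structures, those structures need to be thread-safe, and the executor must still be accepting work when each page is submitted (submitting to an executor after shutdown() is what raises RejectedExecutionException). Below is a minimal sketch of one way to arrange this, not the asker's actual code: SimpleTask, processPage, and the pool size of 4 are hypothetical stand-ins, and only two of the question's structures (catHolder and itemIds) are shown. It assumes the pool is created once, reused for every page, and shut down only after the last page.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelPopulate {

    // Hypothetical stand-in for the question's Task type.
    public static final class SimpleTask {
        final String itemId;
        final String categoryId;

        public SimpleTask(String itemId, String categoryId) {
            this.itemId = itemId;
            this.categoryId = categoryId;
        }
    }

    // Thread-safe versions of two of the shared structures from the question.
    // Note: ConcurrentHashMap rejects null keys and null values.
    public static final Map<String, String> catHolder = new ConcurrentHashMap<>();
    public static final Set<String> itemIds = ConcurrentHashMap.newKeySet();

    // Runs one page of tasks in parallel and blocks until all of them finish.
    public static void processPage(ExecutorService pool, List<SimpleTask> tasks)
            throws Exception {
        List<Callable<Void>> jobs = new ArrayList<>();
        for (SimpleTask task : tasks) {
            jobs.add(() -> {
                if (task.itemId == null || task.categoryId == null) {
                    return null; // skip incomplete tasks
                }
                catHolder.put(task.categoryId, task.itemId);
                itemIds.add(task.itemId);
                return null;
            });
        }
        // invokeAll returns only after every job has completed.
        for (Future<Void> done : pool.invokeAll(jobs)) {
            done.get(); // rethrows a job's failure as ExecutionException
        }
    }

    public static void main(String[] args) throws Exception {
        // Created once, outside the paging loop.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<List<SimpleTask>> pages = Arrays.asList(
                Arrays.asList(new SimpleTask("item-1", "cat-1"),
                              new SimpleTask(null, "cat-2")),
                Arrays.asList(new SimpleTask("item-3", "cat-3")));
            for (List<SimpleTask> page : pages) {
                processPage(pool, page);
            }
        } finally {
            pool.shutdown(); // only after the last page has been submitted
        }
        System.out.println(itemIds.size() + " items, "
            + catHolder.size() + " categories");
    }
}
```

One caveat with this design: writes that depended on iteration order in the sequential loop, such as two tasks calling put with the same categoryId, complete in an unpredictable order when run in parallel. If the order matters, returning a small result object from each Callable and merging the results on the calling thread, as in the Future<Output> example from the question, may be the safer choice.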