Java 如何并行调用方法传递分块数据
Java how to call a method in parallel passing chunked data
我有一个 API 调用,一次最多只能接受 5 个子请求。所以如果进来的子请求超过 5 个,我需要先把它们分块,然后为了满足 SLA 并行发送这些分块。
下面是非并行的分块代码,我希望把其中的 invokeService(bulkRequestChunk) 调用并行化:
// Number of sub-requests placed in each bulk call.
int BULK_SUBREQUEST_SIZE = 5;
// infoRequestList size = 7
// Walk the request list one chunk at a time; each chunk is sent as its own bulk call,
// one after another on the calling thread.
for (int start = 0; start < numOfSubRequests; start += BULK_SUBREQUEST_SIZE) {
    // Clamp the end index so the final (possibly shorter) chunk stops at the end of the list.
    int end = Math.min(start + BULK_SUBREQUEST_SIZE, numOfSubRequests);
    List<InfoRequest> infoRequestChunk = infoRequestList.subList(start, end);

    BulkRequest bulkRequestChunk = new BulkRequest();
    bulkRequestChunk.setRequests(infoRequestChunk);

    // output list to capture the response of every chunk
    bulkResponseList.add(invokeService(bulkRequestChunk));
}
使用 ExecutorService,把每个分块作为一个任务提交给它并行运行:
// Fixed-size pool that runs the per-chunk service calls concurrently.
ExecutorService execServ = Executors.newFixedThreadPool(numThreads);
// Pool threads append to this list concurrently, so it has to be thread-safe: a bare
// ArrayList is not synchronized and can lose elements or throw when add() calls race.
// Responses are appended in completion order, which is not necessarily submission order.
final List<BulkResponse> bulkResponseList =
        java.util.Collections.synchronizedList(new ArrayList<BulkResponse>());
// Number of sub-requests placed in each bulk call.
int BULK_SUBREQUEST_SIZE = 5;
for (int i = 0; i < numOfSubRequests; i += BULK_SUBREQUEST_SIZE) {
    // Clamp the end index so the final (possibly shorter) chunk stops at the end of the list.
    int chunkEnd = Math.min(i + BULK_SUBREQUEST_SIZE, numOfSubRequests);
    List<InfoRequest> infoRequestChunk = infoRequestList.subList(i, chunkEnd);

    BulkRequest bulkRequestChunk = new BulkRequest();
    bulkRequestChunk.setRequests(infoRequestChunk);

    // One task per chunk; the lambda captures this iteration's request object.
    execServ.submit(() -> {
        BulkResponse bulkResponseChunk = invokeService(bulkRequestChunk);
        bulkResponseList.add(bulkResponseChunk);
    });
}
// Stop accepting new tasks, then block until every submitted chunk has finished.
execServ.shutdown();
try {
    execServ.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
    // The waiting thread was asked to stop: cancel outstanding chunk calls and restore the
    // interrupt flag so callers further up the stack can still observe the interruption.
    execServ.shutdownNow();
    Thread.currentThread().interrupt();
}
我有一个 API 调用,一次最多只能接受 5 个子请求。所以如果进来的子请求超过 5 个,我需要先把它们分块,然后为了满足 SLA 并行发送这些分块。
下面是非并行的分块代码,我希望把其中的 invokeService(bulkRequestChunk) 调用并行化:
// Number of sub-requests placed in each bulk call.
int BULK_SUBREQUEST_SIZE = 5;
// infoRequestList size = 7
// Walk the request list one chunk at a time; each chunk is sent as its own bulk call,
// one after another on the calling thread.
for (int start = 0; start < numOfSubRequests; start += BULK_SUBREQUEST_SIZE) {
    // Clamp the end index so the final (possibly shorter) chunk stops at the end of the list.
    int end = Math.min(start + BULK_SUBREQUEST_SIZE, numOfSubRequests);
    List<InfoRequest> infoRequestChunk = infoRequestList.subList(start, end);

    BulkRequest bulkRequestChunk = new BulkRequest();
    bulkRequestChunk.setRequests(infoRequestChunk);

    // output list to capture the response of every chunk
    bulkResponseList.add(invokeService(bulkRequestChunk));
}
使用 ExecutorService,把每个分块作为一个任务提交给它并行运行:
// Fixed-size pool that runs the per-chunk service calls concurrently.
ExecutorService execServ = Executors.newFixedThreadPool(numThreads);
// Pool threads append to this list concurrently, so it has to be thread-safe: a bare
// ArrayList is not synchronized and can lose elements or throw when add() calls race.
// Responses are appended in completion order, which is not necessarily submission order.
final List<BulkResponse> bulkResponseList =
        java.util.Collections.synchronizedList(new ArrayList<BulkResponse>());
// Number of sub-requests placed in each bulk call.
int BULK_SUBREQUEST_SIZE = 5;
for (int i = 0; i < numOfSubRequests; i += BULK_SUBREQUEST_SIZE) {
    // Clamp the end index so the final (possibly shorter) chunk stops at the end of the list.
    int chunkEnd = Math.min(i + BULK_SUBREQUEST_SIZE, numOfSubRequests);
    List<InfoRequest> infoRequestChunk = infoRequestList.subList(i, chunkEnd);

    BulkRequest bulkRequestChunk = new BulkRequest();
    bulkRequestChunk.setRequests(infoRequestChunk);

    // One task per chunk; the lambda captures this iteration's request object.
    execServ.submit(() -> {
        BulkResponse bulkResponseChunk = invokeService(bulkRequestChunk);
        bulkResponseList.add(bulkResponseChunk);
    });
}
// Stop accepting new tasks, then block until every submitted chunk has finished.
execServ.shutdown();
try {
    execServ.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
    // The waiting thread was asked to stop: cancel outstanding chunk calls and restore the
    // interrupt flag so callers further up the stack can still observe the interruption.
    execServ.shutdownNow();
    Thread.currentThread().interrupt();
}