Java 8 中的异步方法后跟并行执行的方法
Async method followed by a parallelly executed method in Java 8
在花了一天时间学习 java 并发 API 之后,我仍然不太明白如何在 CompletableFuture 和 ExecutorService 类 的帮助下创建以下功能:
当我在我的 REST 端点上收到请求时,我需要:
- 启动一个异步任务(包括DB查询,过滤等),最后会给我一个StringURL的列表
- 与此同时,用 HTTP OK 回复 REST 调用者,表明已收到请求,我正在处理它
- 异步任务完成后,我需要将 HTTP 请求(带有负载,REST 调用者给我)发送到我从作业中获得的 URLs。最多 URL 的数量大约为 100,因此我需要这些并行发生。
- 理想情况下,我有一些同步计数器来计算有多少 http 请求是 success/fail,我可以将此信息发送回 REST 调用者(URL 我需要发送它请求负载中提供了返回)。
我已经为这些编写了构建块(方法如:getMatchingObjectsFromDB(callerPayload)、getURLs(resultOfgetMachingObjects)、sendHttpRequest(Url、methodType) 等...) ,我只是不太明白如何将第 1 步和第 3 步联系在一起。我会在第 1 步使用 CompletableFuture.supplyAsync()
,然后我需要 CompletableFuture.thenComponse
方法来开始第 3 步,但我不清楚如何使用此 API 实现并行性。不过,使用 ExecutorService executor = Executors.newWorkStealingPool();
非常直观,它根据可用的处理能力创建线程池,并且可以通过 invokeAll()
方法提交任务。
如何同时使用 CompletableFuture
和 ExecutorService
?或者我如何保证使用 CompletableFuture
并行执行任务列表?演示代码片段将不胜感激。谢谢。
您应该使用 join()
等待所有线程完成。
创建 Map<String, Boolean> result
来存储您的请求结果。
在您的控制器中:
public void yourControllerMethod() {
CompletableFuture.runAsync(() -> yourServiceMethod());
}
为您服务:
// Execute your logic to get List<String> urls
List<CompletableFuture> futures = urls.stream().map(v ->
CompletableFuture.supplyAsync(url -> requestUrl(url))
.thenAcceptAsync(requestResult -> result.put(url, true or false))
).collect(toList()); // You have list of completeable future here
然后使用.join()
等待所有线程(记住你的服务已经在它自己的线程中执行)
CompletableFuture.allOf(futures).join();
然后你可以通过访问result
地图
来确定是哪一个success/fail
编辑
请post你的程序代码,以便其他人也能理解你。
我已经阅读了您的代码,这里是需要修改的地方:
When this for loop was not commented out, the receiver webserver got
the same request twice,
I dont understand the purpose of this for loop.
对不起,我在之前的回答中没有清理干净。这只是我脑子里的一个临时想法,我最后忘了删除 :D
只需将其从您的代码中删除
// allOf() only accepts arrays, so the List needed to be converted
/* The code never gets over this part (I know allOf() is a blocking call), even long after when the receiver got the HTTP request
with the correct payload. I'm not sure yet where exactly the code gets stuck */
您的地图应该是 ConcurrentHashMap
因为您稍后要同时修改它。
Map<String, Boolean> result = new ConcurrentHashMap<>();
如果您的代码仍然无法按预期工作,我建议删除 parallelStream()
部分。
CompletableFuture
和 parallelStream
使用公共 forkjoin 池。我认为游泳池已经用完了。
并且您应该为您的 CompletableFuture
:
创建自己的池
Executor pool = Executors.newFixedThreadPool(10);
并使用该池执行您的请求:
CompletableFuture.supplyAsync(YOURTASK, pool).thenAcceptAsync(Yourtask, pool)
为了完成这里是代码的相关部分,经过清理和测试(感谢 Mạnh Quyết Nguyễn):
休息控制器class:
@POST
@Path("publish")
public Response publishEvent(PublishEvent eventPublished) {
/*
Payload verification, etc.
*/
//First send the event to the right subscribers, then send the resulting hashmap<String url, Boolean subscriberGotTheRequest> back to the publisher
CompletableFuture.supplyAsync(() -> EventHandlerService.propagateEvent(eventPublished)).thenAccept(map -> {
if (eventPublished.getDeliveryCompleteUri() != null) {
String callbackUrl = Utility
.getUri(eventPublished.getSource().getAddress(), eventPublished.getSource().getPort(), eventPublished.getDeliveryCompleteUri(), isSecure,
false);
try {
Utility.sendRequest(callbackUrl, "POST", map);
} catch (RuntimeException e) {
log.error("Callback after event publishing failed at: " + callbackUrl);
e.printStackTrace();
}
}
});
//return OK while the event publishing happens in async
return Response.status(Status.OK).build();
}
服务class:
private static List<EventFilter> getMatchingEventFilters(PublishEvent pe) {
//query the database, filter the results based on the method argument
}
private static boolean sendRequest(String url, Event event) {
//send the HTTP request to the given URL, with the given Event payload, return true if the response is positive (status code starts with 2), false otherwise
}
static Map<String, Boolean> propagateEvent(PublishEvent eventPublished) {
// Get the event relevant filters from the DB
List<EventFilter> filters = getMatchingEventFilters(eventPublished);
// Create the URLs from the filters
List<String> urls = new ArrayList<>();
for (EventFilter filter : filters) {
String url;
try {
boolean isSecure = filter.getConsumer().getAuthenticationInfo() != null;
url = Utility.getUri(filter.getConsumer().getAddress(), filter.getPort(), filter.getNotifyUri(), isSecure, false);
} catch (ArrowheadException | NullPointerException e) {
e.printStackTrace();
continue;
}
urls.add(url);
}
Map<String, Boolean> result = new ConcurrentHashMap<>();
Stream<CompletableFuture> stream = urls.stream().map(url -> CompletableFuture.supplyAsync(() -> sendRequest(url, eventPublished.getEvent()))
.thenAcceptAsync(published -> result.put(url, published)));
CompletableFuture.allOf(stream.toArray(CompletableFuture[]::new)).join();
log.info("Event published to " + urls.size() + " subscribers.");
return result;
}
调试这个比平时更难一些,有时代码会神奇地停止。为了解决这个问题,我只将代码部分放入绝对必要的异步任务中,并且我确保任务中的代码使用线程安全的东西。另外我一开始是个笨蛋,我在 EventHandlerService.class
里面的方法使用了 synchronized
关键字,这导致 Service class 方法里面的 CompletableFuture 没有执行,因为它使用了一个默认线程池。
A piece of logic marked with synchronized becomes a synchronized block, allowing only one thread to execute at any given time.
在花了一天时间学习 java 并发 API 之后,我仍然不太明白如何在 CompletableFuture 和 ExecutorService 类 的帮助下创建以下功能:
当我在我的 REST 端点上收到请求时,我需要:
- 启动一个异步任务(包括DB查询,过滤等),最后会给我一个StringURL的列表
- 与此同时,用 HTTP OK 回复 REST 调用者,表明已收到请求,我正在处理它
- 异步任务完成后,我需要将 HTTP 请求(带有负载,REST 调用者给我)发送到我从作业中获得的 URLs。最多 URL 的数量大约为 100,因此我需要这些并行发生。
- 理想情况下,我有一些同步计数器来计算有多少 http 请求是 success/fail,我可以将此信息发送回 REST 调用者(URL 我需要发送它请求负载中提供了返回)。
我已经为这些编写了构建块(方法如:getMatchingObjectsFromDB(callerPayload)、getURLs(resultOfgetMachingObjects)、sendHttpRequest(Url、methodType) 等...) ,我只是不太明白如何将第 1 步和第 3 步联系在一起。我会在第 1 步使用 CompletableFuture.supplyAsync()
,然后我需要 CompletableFuture.thenComponse
方法来开始第 3 步,但我不清楚如何使用此 API 实现并行性。不过,使用 ExecutorService executor = Executors.newWorkStealingPool();
非常直观,它根据可用的处理能力创建线程池,并且可以通过 invokeAll()
方法提交任务。
如何同时使用 CompletableFuture
和 ExecutorService
?或者我如何保证使用 CompletableFuture
并行执行任务列表?演示代码片段将不胜感激。谢谢。
您应该使用 join()
等待所有线程完成。
创建 Map<String, Boolean> result
来存储您的请求结果。
在您的控制器中:
public void yourControllerMethod() {
CompletableFuture.runAsync(() -> yourServiceMethod());
}
为您服务:
// Execute your logic to get List<String> urls
List<CompletableFuture> futures = urls.stream().map(v ->
CompletableFuture.supplyAsync(url -> requestUrl(url))
.thenAcceptAsync(requestResult -> result.put(url, true or false))
).collect(toList()); // You have list of completeable future here
然后使用.join()
等待所有线程(记住你的服务已经在它自己的线程中执行)
CompletableFuture.allOf(futures).join();
然后你可以通过访问result
地图
编辑
请post你的程序代码,以便其他人也能理解你。
我已经阅读了您的代码,这里是需要修改的地方:
When this for loop was not commented out, the receiver webserver got the same request twice, I dont understand the purpose of this for loop.
对不起,我在之前的回答中没有清理干净。这只是我脑子里的一个临时想法,我最后忘了删除 :D
只需将其从您的代码中删除
// allOf() only accepts arrays, so the List needed to be converted /* The code never gets over this part (I know allOf() is a blocking call), even long after when the receiver got the HTTP request
with the correct payload. I'm not sure yet where exactly the code gets stuck */
您的地图应该是 ConcurrentHashMap
因为您稍后要同时修改它。
Map<String, Boolean> result = new ConcurrentHashMap<>();
如果您的代码仍然无法按预期工作,我建议删除 parallelStream()
部分。
CompletableFuture
和 parallelStream
使用公共 forkjoin 池。我认为游泳池已经用完了。
并且您应该为您的 CompletableFuture
:
Executor pool = Executors.newFixedThreadPool(10);
并使用该池执行您的请求:
CompletableFuture.supplyAsync(YOURTASK, pool).thenAcceptAsync(Yourtask, pool)
为了完成这里是代码的相关部分,经过清理和测试(感谢 Mạnh Quyết Nguyễn):
休息控制器class:
@POST
@Path("publish")
public Response publishEvent(PublishEvent eventPublished) {
/*
Payload verification, etc.
*/
//First send the event to the right subscribers, then send the resulting hashmap<String url, Boolean subscriberGotTheRequest> back to the publisher
CompletableFuture.supplyAsync(() -> EventHandlerService.propagateEvent(eventPublished)).thenAccept(map -> {
if (eventPublished.getDeliveryCompleteUri() != null) {
String callbackUrl = Utility
.getUri(eventPublished.getSource().getAddress(), eventPublished.getSource().getPort(), eventPublished.getDeliveryCompleteUri(), isSecure,
false);
try {
Utility.sendRequest(callbackUrl, "POST", map);
} catch (RuntimeException e) {
log.error("Callback after event publishing failed at: " + callbackUrl);
e.printStackTrace();
}
}
});
//return OK while the event publishing happens in async
return Response.status(Status.OK).build();
}
服务class:
private static List<EventFilter> getMatchingEventFilters(PublishEvent pe) {
//query the database, filter the results based on the method argument
}
private static boolean sendRequest(String url, Event event) {
//send the HTTP request to the given URL, with the given Event payload, return true if the response is positive (status code starts with 2), false otherwise
}
static Map<String, Boolean> propagateEvent(PublishEvent eventPublished) {
// Get the event relevant filters from the DB
List<EventFilter> filters = getMatchingEventFilters(eventPublished);
// Create the URLs from the filters
List<String> urls = new ArrayList<>();
for (EventFilter filter : filters) {
String url;
try {
boolean isSecure = filter.getConsumer().getAuthenticationInfo() != null;
url = Utility.getUri(filter.getConsumer().getAddress(), filter.getPort(), filter.getNotifyUri(), isSecure, false);
} catch (ArrowheadException | NullPointerException e) {
e.printStackTrace();
continue;
}
urls.add(url);
}
Map<String, Boolean> result = new ConcurrentHashMap<>();
Stream<CompletableFuture> stream = urls.stream().map(url -> CompletableFuture.supplyAsync(() -> sendRequest(url, eventPublished.getEvent()))
.thenAcceptAsync(published -> result.put(url, published)));
CompletableFuture.allOf(stream.toArray(CompletableFuture[]::new)).join();
log.info("Event published to " + urls.size() + " subscribers.");
return result;
}
调试这个比平时更难一些,有时代码会神奇地停止。为了解决这个问题,我只将代码部分放入绝对必要的异步任务中,并且我确保任务中的代码使用线程安全的东西。另外我一开始是个笨蛋,我在 EventHandlerService.class
里面的方法使用了 synchronized
关键字,这导致 Service class 方法里面的 CompletableFuture 没有执行,因为它使用了一个默认线程池。
A piece of logic marked with synchronized becomes a synchronized block, allowing only one thread to execute at any given time.