Java 8 中的异步方法后跟并行执行的方法

Async method followed by a parallelly executed method in Java 8

在花了一天时间学习 java 并发 API 之后,我仍然不太明白如何在 CompletableFuture 和 ExecutorService 类 的帮助下创建以下功能:

当我在我的 REST 端点上收到请求时,我需要:

  1. 启动一个异步任务(包括DB查询,过滤等),最后会给我一个StringURL的列表
  2. 与此同时,用 HTTP OK 回复 REST 调用者,表明已收到请求,我正在处理它
  3. 异步任务完成后,我需要将 HTTP 请求(带有负载,REST 调用者给我)发送到我从作业中获得的 URLs。最多 URL 的数量大约为 100,因此我需要这些并行发生。
  4. 理想情况下,我有一些同步计数器来计算有多少 http 请求是 success/fail,我可以将此信息发送回 REST 调用者(URL 我需要发送它请求负载中提供了返回)。

我已经为这些编写了构建块(方法如:getMatchingObjectsFromDB(callerPayload)、getURLs(resultOfgetMachingObjects)、sendHttpRequest(Url、methodType) 等...) ,我只是不太明白如何将第 1 步和第 3 步联系在一起。我会在第 1 步使用 CompletableFuture.supplyAsync(),然后我需要 CompletableFuture.thenComponse 方法来开始第 3 步,但我不清楚如何使用此 API 实现并行性。不过,使用 ExecutorService executor = Executors.newWorkStealingPool(); 非常直观,它根据可用的处理能力创建线程池,并且可以通过 invokeAll() 方法提交任务。

如何同时使用 CompletableFutureExecutorService?或者我如何保证使用 CompletableFuture 并行执行任务列表?演示代码片段将不胜感激。谢谢。

您应该使用 join() 等待所有线程完成。

创建 Map<String, Boolean> result 来存储您的请求结果。

在您的控制器中:

public void yourControllerMethod() {

  CompletableFuture.runAsync(() -> yourServiceMethod());
}

为您服务:

// Execute your logic to get List<String> urls

List<CompletableFuture> futures = urls.stream().map(v -> 
 CompletableFuture.supplyAsync(url -> requestUrl(url))
           .thenAcceptAsync(requestResult -> result.put(url, true or false))
).collect(toList()); // You have list of completeable future here

然后使用.join()等待所有线程(记住你的服务已经在它自己的线程中执行)

CompletableFuture.allOf(futures).join();

然后你可以通过访问result地图

来确定是哪一个success/fail

编辑

请post你的程序代码,以便其他人也能理解你。

我已经阅读了您的代码,这里是需要修改的地方:

When this for loop was not commented out, the receiver webserver got the same request twice, I dont understand the purpose of this for loop.

对不起,我在之前的回答中没有清理干净。这只是我脑子里的一个临时想法,我最后忘了删除 :D

只需将其从您的代码中删除

// allOf() only accepts arrays, so the List needed to be converted /* The code never gets over this part (I know allOf() is a blocking call), even long after when the receiver got the HTTP request

   with the correct payload. I'm not sure yet where exactly the code gets stuck */

您的地图应该是 ConcurrentHashMap 因为您稍后要同时修改它。

Map<String, Boolean> result = new ConcurrentHashMap<>();

如果您的代码仍然无法按预期工作,我建议删除 parallelStream() 部分。

CompletableFutureparallelStream 使用公共 forkjoin 池。我认为游泳池已经用完了。

并且您应该为您的 CompletableFuture:

创建自己的池
Executor pool = Executors.newFixedThreadPool(10);

并使用该池执行您的请求:

CompletableFuture.supplyAsync(YOURTASK, pool).thenAcceptAsync(Yourtask, pool)

为了完成这里是代码的相关部分,经过清理和测试(感谢 Mạnh Quyết Nguyễn):

休息控制器class:

@POST
@Path("publish")
public Response publishEvent(PublishEvent eventPublished) {
    /*
        Payload verification, etc.
    */

    //First send the event to the right subscribers, then send the resulting hashmap<String url, Boolean subscriberGotTheRequest> back to the publisher
    CompletableFuture.supplyAsync(() -> EventHandlerService.propagateEvent(eventPublished)).thenAccept(map -> {
      if (eventPublished.getDeliveryCompleteUri() != null) {
        String callbackUrl = Utility
            .getUri(eventPublished.getSource().getAddress(), eventPublished.getSource().getPort(), eventPublished.getDeliveryCompleteUri(), isSecure,
                    false);
        try {
          Utility.sendRequest(callbackUrl, "POST", map);
        } catch (RuntimeException e) {
          log.error("Callback after event publishing failed at: " + callbackUrl);
          e.printStackTrace();
        }
      }
    });

    //return OK while the event publishing happens in async
    return Response.status(Status.OK).build();
}

服务class:

private static List<EventFilter> getMatchingEventFilters(PublishEvent pe) {
    //query the database, filter the results based on the method argument
}

private static boolean sendRequest(String url, Event event) {
    //send the HTTP request to the given URL, with the given Event payload, return true if the response is positive (status code starts with 2), false otherwise
}

static Map<String, Boolean> propagateEvent(PublishEvent eventPublished) {
    // Get the event relevant filters from the DB
    List<EventFilter> filters = getMatchingEventFilters(eventPublished);
    // Create the URLs from the filters
    List<String> urls = new ArrayList<>();
    for (EventFilter filter : filters) {
      String url;
      try {
        boolean isSecure = filter.getConsumer().getAuthenticationInfo() != null;
        url = Utility.getUri(filter.getConsumer().getAddress(), filter.getPort(), filter.getNotifyUri(), isSecure, false);
      } catch (ArrowheadException | NullPointerException e) {
        e.printStackTrace();
        continue;
      }
      urls.add(url);
    }

    Map<String, Boolean> result = new ConcurrentHashMap<>();
    Stream<CompletableFuture> stream = urls.stream().map(url -> CompletableFuture.supplyAsync(() -> sendRequest(url, eventPublished.getEvent()))
                                                                                 .thenAcceptAsync(published -> result.put(url, published)));
    CompletableFuture.allOf(stream.toArray(CompletableFuture[]::new)).join();
    log.info("Event published to " + urls.size() + " subscribers.");
    return result;
}

调试这个比平时更难一些,有时代码会神奇地停止。为了解决这个问题,我只将代码部分放入绝对必要的异步任务中,并且我确保任务中的代码使用线程安全的东西。另外我一开始是个笨蛋,我在 EventHandlerService.class 里面的方法使用了 synchronized 关键字,这导致 Service class 方法里面的 CompletableFuture 没有执行,因为它使用了一个默认线程池。

A piece of logic marked with synchronized becomes a synchronized block, allowing only one thread to execute at any given time.