我们如何将 play.libs.concurrent.HttpExecutionContext 传递给 Java 8 中的并行流?

How can we pass play.libs.concurrent.HttpExecutionContext to parallel stream in Java 8?

在 play 框架项目控制器中,我正在使用 forEach() 处理一个对象列表,它运行良好。

List<Post> posts = repository.getPosts();
posts.forEach(post -> {
    //...some processing

    anyFunc(); //<-- internally uses HttpExecutionContext

    //...further processing
});

但是当我尝试使用 parallelStream() 并行处理这些对象列表以提高性能时,我在并行流中丢失了 HttpExecutionContext 实例。

List<Post> posts = repository.getPosts();
posts.parallelStream().forEach(post -> {
    //...some processing

    anyFunc(); //<-- not able to use HttpExecutionContext now

    //...further processing
});

我无法将 HttpExecutionContext 作为参数传递给 anyFunc。 有什么方法可以 pass/set parallelStream() 中的 HttpExecutionContext?

使用HttpExecutionContext.execute

public class HomeController extends Controller {
    @Inject HttpExecutionContext ec;

    public Result index() {
        // The data to parallel processing
        List<String> list = List.of("Item 1", "Item 2", "Item 3","Item 4", "Item 5", "Item 6", "Item 7", "Item 8");

        // Make a Stream. The `parallelStream` is not used because 
        // `current.execute` will make it run in parallel.
        Stream<String> listInParralel = list.stream(); 

        // The current executor with the HTTP context. 
        Executor current = ec.current();

        System.out.println("START");
        listInParralel.forEach(item -> {
          current.execute(()-> {
            // request().uri() internally uses HttpExecutionContext
            System.out.println("item: " + item + " in "  +  request().uri()  + "(" + Thread.currentThread().getName() + ")");
          });
        });

        // Results
        /*
        START
        item: Item 7 in /(application-akka.actor.default-dispatcher-9)
        item: Item 5 in /(application-akka.actor.default-dispatcher-7)
        item: Item 3 in /(application-akka.actor.default-dispatcher-5)
        item: Item 1 in /(application-akka.actor.default-dispatcher-6)
        item: Item 6 in /(application-akka.actor.default-dispatcher-8)
        item: Item 4 in /(application-akka.actor.default-dispatcher-2)
        item: Item 2 in /(application-akka.actor.default-dispatcher-4)
        item: Item 8 in /(application-akka.actor.default-dispatcher-9)
        */

        return ok("Done");
    }

}

不过,我更喜欢缓存 HTTP 数据,然后在并行处理中使用它们。不喜欢打扰 HttpExecutionContext:

public class HomeController extends Controller {
    @Inject HttpExecutionContext ec;

    public Result index() {
        // The data to parallel processing
        List<String> list = List.of("Item 1", "Item 2", "Item 3","Item 4", "Item 5", "Item 6", "Item 7", "Item 8");
        Stream<String> listInParralel = list.parallelStream(); 

        // Take all that you need from the HttpExecutionContext.  
        String uri = request().uri();

        System.out.println("START");
        listInParralel.forEach(item -> {
            // use pre cached HTTP context data, liek `uri`
            System.out.println("item: " + item + " in "  +  uri  + "(" + Thread.currentThread().getName() + ")");
        });

        // Results
        /*
        START
        item: Item 1 in /(ForkJoinPool.commonPool-worker-7)
        item: Item 8 in /(ForkJoinPool.commonPool-worker-3)
        item: Item 7 in /(ForkJoinPool.commonPool-worker-15)
        item: Item 4 in /(ForkJoinPool.commonPool-worker-9)
        item: Item 3 in /(ForkJoinPool.commonPool-worker-13)
        item: Item 2 in /(ForkJoinPool.commonPool-worker-5)
        item: Item 5 in /(ForkJoinPool.commonPool-worker-11)
        item: Item 6 in /(application-akka.actor.default-dispatcher-4)
        */

        return ok("Done");
    }

}