Java: how to create a thread pool for stream operations
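I would like to control thread execution when using streams, by running them on a thread pool of my own.
Currently I have a list of strings and a list of actions: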
List<String> mylist = new ArrayList<>(Arrays.asList("1", "2", "3", "4")); // holds the strings
List<Action> actions = new ArrayList<>(); // holds functions that manipulate the strings from mylist
Each action has a work method that takes a string from mylist:
Stream<String> stream = mylist.parallelStream();
stream = stream.flatMap(s-> actions.stream().map(ac -> ac.work(str)));
r = stream.collect(Collectors.toList());
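Everything works, but I have no control over the thread pool. I know I can use a ForkJoinPool, as in this example, but I could not find a way to apply it to my case.
For instance, this does not work: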
ForkJoinPool customThreadPool = new ForkJoinPool(4);
r= customThreadPool.submit(
() -> mylist.parallelStream().flatMap(s-> actions.stream().map(ac -> ac.work(str))).collect(Collectors.toList()));
It gives me this error:
java: incompatible types: no instance(s) of type variable(s) T,R,A,capture#1 of ?,T exist so that java.util.concurrent.ForkJoinTask<T> conforms to java.util.List<java.lang.String>
Thanks.
The code compiles and runs fine once the coding error (str => s) is fixed.
Common pool
// Setup with dummy actions for testing which thread executes the action
List<String> mylist = new ArrayList<>(Arrays.asList("1","2","3","4")); //that holds the strings
List<Action> actions = new ArrayList<>(Arrays.asList(
s -> { s += "x"; System.out.println(Thread.currentThread().getName() + ": " + s); return s; },
s -> { s += "y"; System.out.println(Thread.currentThread().getName() + ": " + s); return s; }
));
// Using common pool
Stream<String> stream = mylist.parallelStream();
stream = stream.flatMap(s -> actions.stream().map(ac -> ac.work(s)));
List<String> r = stream.collect(Collectors.toList());
System.out.println(r);
Output
ForkJoinPool.commonPool-worker-7: 1x
ForkJoinPool.commonPool-worker-3: 2x
ForkJoinPool.commonPool-worker-3: 2y
main: 3x
ForkJoinPool.commonPool-worker-5: 4x
main: 3y
ForkJoinPool.commonPool-worker-7: 1y
ForkJoinPool.commonPool-worker-5: 4y
[1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]
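The snippets in this answer rely on an Action type that the question never shows. As a minimal sketch, assuming Action is a functional interface with a single work(String) method (the interface body, the class name CommonPoolDemo, and the helper applyAll are hypothetical), the common-pool variant could be made self-contained like this:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class CommonPoolDemo {

    // Hypothetical stand-in for the Action type from the question:
    // a single work method that maps one string to another.
    @FunctionalInterface
    interface Action {
        String work(String s);
    }

    // Applies every action to every input string with a parallel stream.
    // Without further setup this runs on the common ForkJoinPool,
    // with the calling thread helping out.
    static List<String> applyAll(List<String> inputs, List<Action> actions) {
        return inputs.parallelStream()
                .flatMap(s -> actions.stream().map(ac -> ac.work(s)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> mylist = Arrays.asList("1", "2", "3", "4");
        List<Action> actions = Arrays.asList(s -> s + "x", s -> s + "y");
        // The collected list keeps the encounter order of mylist,
        // even though the actions may execute in any order.
        System.out.println(applyAll(mylist, actions));
    }
}
```

The order of the printed thread names in the output above varies from run to run, while the collected list does not, because collect preserves the encounter order of the source list.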
Custom pool
ForkJoinPool customThreadPool = new ForkJoinPool(4);
ForkJoinTask<List<String>> task = customThreadPool.submit(
() -> mylist.parallelStream().flatMap(s -> actions.stream().map(ac -> ac.work(s))).collect(Collectors.toList()));
System.out.println(task.get());
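Note that submit() returns a ForkJoinTask rather than the list itself, which is why the result is read with task.get(). If the compiler still complains as described in the question, help it choose the correct overload of submit() by casting the lambda expression on line 3: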
(Callable<List<String>>) () -> mylist.parallelStream().flatMap(s -> actions.stream().map(ac -> ac.work(s))).collect(Collectors.toList()));
Output
ForkJoinPool-1-worker-3: 3x
ForkJoinPool-1-worker-1: 1x
ForkJoinPool-1-worker-1: 1y
ForkJoinPool-1-worker-5: 2x
ForkJoinPool-1-worker-7: 4x
ForkJoinPool-1-worker-7: 4y
ForkJoinPool-1-worker-5: 2y
ForkJoinPool-1-worker-3: 3y
[1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]
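For completeness, here is a hedged sketch of the custom-pool variant as one self-contained class. The Action interface, the class name CustomPoolDemo, and the helper applyAllInPool are assumptions made for illustration. Declaring the lambda as a Callable variable sidesteps the overload question, and the pool is shut down once the result is available. Running the stream on the submitting pool relies on how parallel streams are currently implemented, not on a guarantee documented in the Stream API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

final class CustomPoolDemo {

    // Hypothetical stand-in for the Action type from the question.
    @FunctionalInterface
    interface Action {
        String work(String s);
    }

    // Runs the parallel stream from inside a dedicated ForkJoinPool,
    // so its worker threads execute the pipeline.
    static List<String> applyAllInPool(int parallelism, List<String> inputs, List<Action> actions) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // Typing the lambda as Callable selects submit(Callable<T>) unambiguously.
            Callable<List<String>> job = () -> inputs.parallelStream()
                    .flatMap(s -> actions.stream().map(ac -> ac.work(s)))
                    .collect(Collectors.toList());
            // submit() returns a ForkJoinTask; get() blocks until the list is ready.
            return pool.submit(job).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for the stream", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Stream pipeline failed", e.getCause());
        } finally {
            pool.shutdown(); // release the worker threads
        }
    }

    public static void main(String[] args) {
        List<String> mylist = Arrays.asList("1", "2", "3", "4");
        List<Action> actions = Arrays.asList(s -> s + "x", s -> s + "y");
        System.out.println(applyAllInPool(4, mylist, actions));
    }
}
```

Creating a pool per call keeps the sketch short; a long-lived application would more likely create the pool once and reuse it.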
Single-threaded
Stream<String> stream = mylist.stream();
stream = stream.flatMap(s -> actions.stream().map(ac -> ac.work(s)));
List<String> r = stream.collect(Collectors.toList());
System.out.println(r);
Output
main: 1x
main: 1y
main: 2x
main: 2y
main: 3x
main: 3y
main: 4x
main: 4y
[1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]
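Instead of reading thread names off the console, the same check can be done programmatically. The following is only a sketch under stated assumptions: the class ThreadUsageDemo and the helper threadsUsed are hypothetical, and plain suffix strings stand in for the actions. It collects the names of the threads that executed the mapping step, so a sequential stream should report only the calling thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ThreadUsageDemo {

    // Runs the pipeline on the given stream and returns the names of
    // all threads that executed the mapping step.
    static Set<String> threadsUsed(Stream<String> source, List<String> suffixes) {
        // Thread-safe set, because a parallel stream adds from several threads.
        Set<String> names = ConcurrentHashMap.newKeySet();
        source.flatMap(s -> suffixes.stream().map(suffix -> {
                    names.add(Thread.currentThread().getName());
                    return s + suffix;
                }))
                .collect(Collectors.toList());
        return names;
    }

    public static void main(String[] args) {
        List<String> mylist = Arrays.asList("1", "2", "3", "4");
        List<String> suffixes = Arrays.asList("x", "y");
        // Sequential: only the calling thread does the work.
        System.out.println("sequential: " + threadsUsed(mylist.stream(), suffixes));
        // Parallel: the set of threads depends on the machine and the run.
        System.out.println("parallel:   " + threadsUsed(mylist.parallelStream(), suffixes));
    }
}
```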