为什么使用 Spliterators 创建的流没有被并行处理?
Why stream created with Spliterators is not being processed in parallel?
这可能是非常基本的,但我不是 Java 人。这是我的处理代码,它只是打印和休眠:
private static void myProcessings(int value)
{
System.out.println("Processing " + value);
try
{
Thread.sleep(2000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Finished processing " + value);
}
现在,这个并行流似乎并行工作了:
IntStream iit = IntStream.rangeClosed(1,3);
iit.parallel().forEach(Main::myProcessings);
// output:
// Processing 2
// Processing 1
// Processing 3
// Finished processing 3
// Finished processing 2
// Finished processing 1
但是这个(由 Iterator 制成)不会:
static class MyIter implements Iterator<Integer>
{
private int max;
private int current;
public MyIter(int maxVal)
{
max = maxVal;
current = 1;
}
@Override
public boolean hasNext()
{
return current <= max;
}
@Override
public Integer next()
{
return current++;
}
}
MyIter it = new MyIter(3);
StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), true)
.forEach(Main::myProcessings);
// output:
// Processing 1
// Finished processing 1
// Processing 2
// Finished processing 2
// Processing 3
// Finished processing 3
我在自定义 Iterator 版本中做错了什么? (我用的是Java8)
一种方法是估计流的大小:
Spliterators.spliterator(it, 3, 0);
数字(此处为 3)不必非常精确,但如果您给出 10000,则实际大小为 3 时将只使用一个线程。如果您给出 10,则将使用多个线程, 即使大小为 3.
估计值(在我的示例中为 3)用于确定批次的大小(在移动到下一个线程之前发送到一个线程的任务数)。如果您提供了一个很大的估计数量并且只提交了几个任务,那么它们可能会全部分组并 运行 在第一个线程上,而不会向第二个线程发送任何内容。
Spliterators.spliteratorUnknownSize()
实施存在缺陷。我在 Java 19 中修复了它,请参阅 JDK-8280915。由于 19-ea+19-1283 早期访问构建问题不再重现,您的代码在没有明确大小规范的情况下正确并行化:
Processing 2
Processing 3
Processing 1
Finished processing 3
Finished processing 1
Finished processing 2
这可能是非常基本的,但我不是 Java 人。这是我的处理代码,它只是打印和休眠:
private static void myProcessings(int value)
{
System.out.println("Processing " + value);
try
{
Thread.sleep(2000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Finished processing " + value);
}
现在,这个并行流似乎并行工作了:
IntStream iit = IntStream.rangeClosed(1,3);
iit.parallel().forEach(Main::myProcessings);
// output:
// Processing 2
// Processing 1
// Processing 3
// Finished processing 3
// Finished processing 2
// Finished processing 1
但是这个(由 Iterator 制成)不会:
static class MyIter implements Iterator<Integer>
{
private int max;
private int current;
public MyIter(int maxVal)
{
max = maxVal;
current = 1;
}
@Override
public boolean hasNext()
{
return current <= max;
}
@Override
public Integer next()
{
return current++;
}
}
MyIter it = new MyIter(3);
StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), true)
.forEach(Main::myProcessings);
// output:
// Processing 1
// Finished processing 1
// Processing 2
// Finished processing 2
// Processing 3
// Finished processing 3
我在自定义 Iterator 版本中做错了什么? (我用的是Java8)
一种方法是估计流的大小:
Spliterators.spliterator(it, 3, 0);
数字(此处为 3)不必非常精确,但如果您给出 10000,则实际大小为 3 时将只使用一个线程。如果您给出 10,则将使用多个线程, 即使大小为 3.
估计值(在我的示例中为 3)用于确定批次的大小(在移动到下一个线程之前发送到一个线程的任务数)。如果您提供了一个很大的估计数量并且只提交了几个任务,那么它们可能会全部分组并 运行 在第一个线程上,而不会向第二个线程发送任何内容。
Spliterators.spliteratorUnknownSize()
实施存在缺陷。我在 Java 19 中修复了它,请参阅 JDK-8280915。由于 19-ea+19-1283 早期访问构建问题不再重现,您的代码在没有明确大小规范的情况下正确并行化:
Processing 2
Processing 3
Processing 1
Finished processing 3
Finished processing 1
Finished processing 2