并行流排序时遇到Order wrong
Encounter Order wrong when sorting a parallel stream
我有一个 Record
class:
public class Record implements Comparable<Record>
{
private String myCategory1;
private int myCategory2;
private String myCategory3;
private String myCategory4;
private int myValue1;
private double myValue2;
public Record(String category1, int category2, String category3, String category4,
int value1, double value2)
{
myCategory1 = category1;
myCategory2 = category2;
myCategory3 = category3;
myCategory4 = category4;
myValue1 = value1;
myValue2 = value2;
}
// Getters here
}
我创建了一个包含很多记录的大列表。只有第二个和第五个值 i / 10000
和 i
稍后分别由 getter getCategory2()
和 getValue1()
使用。
List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
list.add(new Record("A", i / 10000, "B", "C", i, (double) i / 100 + 1));
}
请注意,前 10,000 条记录的 category2
为 0
,然后接下来的 10,000 条记录为 1
,依此类推,而 value1
值依次为 0-114999 .
我创建了一个 Stream
,它既是 parallel
又是 sorted
。
Stream<Record> stream = list.stream()
.parallel()
.sorted(
//(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
)
//.parallel()
;
我有一个 ForkJoinPool
维护 8
个线程,这是我 PC 上的内核数。
ForkJoinPool pool = new ForkJoinPool(8);
我用这个技巧described here to submit a stream processing task to my own ForkJoinPool
instead of the common ForkJoinPool
。
List<Record> output = pool.submit(() ->
stream.collect(Collectors.toList()
)).get();
我预计并行 sorted
操作会遵守流的遇到顺序,并且它会是 稳定的 排序,因为 Spliterator
ArrayList
返回的是 ORDERED
.
但是,按顺序打印结果 List
output
的元素的简单代码表明情况并非如此。
for (Record record : output)
{
System.out.println(record.getValue1());
}
输出,压缩:
0
1
2
3
...
69996
69997
69998
69999
71875 // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000 // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062 // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999
output
的 size()
是 115000
,所有元素似乎都在那里,只是顺序略有不同。
所以我写了一些检查代码来查看 sort
是否稳定。如果它是稳定的,那么所有 value1
值都应该保持有序。此代码验证订单,打印任何差异。
int prev = -1;
boolean verified = true;
for (Record record : output)
{
int curr = record.getValue1();
if (prev != -1)
{
if (prev + 1 != curr)
{
System.out.println("Warning: " + prev + " followed by " + curr + "!");
verified = false;
}
}
prev = curr;
}
System.out.println("Verified: " + verified);
输出:
Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false
如果我执行以下任一操作,这种情况仍然存在:
将 ForkJoinPool
替换为 ThreadPoolExecutor
。
ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
直接处理Stream
,使用普通的ForkJoinPool
。
List<Record> output = stream.collect(Collectors.toList());
在之后调用parallel()
我调用sorted
.
Stream<Record> stream = list.stream().sorted().parallel();
调用 parallelStream()
而不是 stream().parallel()
。
Stream<Record> stream = list.parallelStream().sorted();
使用 Comparator
排序。注意这个排序标准和我定义的Comparable
接口的"natural"顺序是不一样的,虽然从头开始的结果已经是有序的,但结果应该还是一样的。
Stream<Record> stream = list.stream().parallel().sorted(
(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
);
如果我不在 Stream
上执行以下操作之一,我只能得到它来保留相遇顺序:
- 不要打电话给
parallel()
。
- 不要调用
sorted
的任何重载。
有趣的是,没有排序的 parallel()
保留了顺序。
以上两种情况,输出都是:
Verified: true
我的 Java 版本是 1.8.0_05。这个异常也是occurs on Ideone,好像是运行 Java 8u25.
更新
我已经将 JDK 升级到撰写本文时的最新版本 1.8。0_45,问题没有改变。
问题
结果 List
(output
) 中的记录顺序是否乱序,因为排序不稳定,因为遇到顺序未保留,或其他原因?
如何确保在创建并行流并对其排序时保留遇到顺序?
看起来 Arrays.parallelSort
在某些情况下不稳定。好眼力。流并行排序是根据 Arrays.parallelSort
实现的,因此它也会影响流。这是一个简化的例子:
public class StableSortBug {
static final int SIZE = 50_000;
static class Record implements Comparable<Record> {
final int sortVal;
final int seqNum;
Record(int i1, int i2) { sortVal = i1; seqNum = i2; }
@Override
public int compareTo(Record other) {
return Integer.compare(this.sortVal, other.sortVal);
}
}
static Record[] genArray() {
Record[] array = new Record[SIZE];
Arrays.setAll(array, i -> new Record(i / 10_000, i));
return array;
}
static boolean verify(Record[] array) {
return IntStream.range(1, array.length)
.allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum);
}
public static void main(String[] args) {
Record[] array = genArray();
System.out.println(verify(array));
Arrays.sort(array);
System.out.println(verify(array));
Arrays.parallelSort(array);
System.out.println(verify(array));
}
}
在我的机器上(2 个核心 x 2 个线程)打印如下:
true
true
false
当然要打印3次true
。这是在当前 JDK 9 个开发版本上。考虑到您的尝试,如果它出现在迄今为止的所有 JDK 8 个版本中,我不会感到惊讶。奇怪的是,减小大小或除数会改变行为。 20,000 的大小和 10,000 的除数是稳定的,50,000 的大小和 1,000 的除数也是稳定的。看起来问题与足够大的 运行 值比较相等与平行分割大小有关。
OpenJDK 问题 JDK-8076446 涵盖了这个错误。
我有一个 Record
class:
public class Record implements Comparable<Record>
{
private String myCategory1;
private int myCategory2;
private String myCategory3;
private String myCategory4;
private int myValue1;
private double myValue2;
public Record(String category1, int category2, String category3, String category4,
int value1, double value2)
{
myCategory1 = category1;
myCategory2 = category2;
myCategory3 = category3;
myCategory4 = category4;
myValue1 = value1;
myValue2 = value2;
}
// Getters here
}
我创建了一个包含很多记录的大列表。只有第二个和第五个值 i / 10000
和 i
稍后分别由 getter getCategory2()
和 getValue1()
使用。
List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
list.add(new Record("A", i / 10000, "B", "C", i, (double) i / 100 + 1));
}
请注意,前 10,000 条记录的 category2
为 0
,然后接下来的 10,000 条记录为 1
,依此类推,而 value1
值依次为 0-114999 .
我创建了一个 Stream
,它既是 parallel
又是 sorted
。
Stream<Record> stream = list.stream()
.parallel()
.sorted(
//(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
)
//.parallel()
;
我有一个 ForkJoinPool
维护 8
个线程,这是我 PC 上的内核数。
ForkJoinPool pool = new ForkJoinPool(8);
我用这个技巧described here to submit a stream processing task to my own ForkJoinPool
instead of the common ForkJoinPool
。
List<Record> output = pool.submit(() ->
stream.collect(Collectors.toList()
)).get();
我预计并行 sorted
操作会遵守流的遇到顺序,并且它会是 稳定的 排序,因为 Spliterator
ArrayList
返回的是 ORDERED
.
但是,按顺序打印结果 List
output
的元素的简单代码表明情况并非如此。
for (Record record : output)
{
System.out.println(record.getValue1());
}
输出,压缩:
0
1
2
3
...
69996
69997
69998
69999
71875 // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000 // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062 // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999
output
的 size()
是 115000
,所有元素似乎都在那里,只是顺序略有不同。
所以我写了一些检查代码来查看 sort
是否稳定。如果它是稳定的,那么所有 value1
值都应该保持有序。此代码验证订单,打印任何差异。
int prev = -1;
boolean verified = true;
for (Record record : output)
{
int curr = record.getValue1();
if (prev != -1)
{
if (prev + 1 != curr)
{
System.out.println("Warning: " + prev + " followed by " + curr + "!");
verified = false;
}
}
prev = curr;
}
System.out.println("Verified: " + verified);
输出:
Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false
如果我执行以下任一操作,这种情况仍然存在:
将
ForkJoinPool
替换为ThreadPoolExecutor
。ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
直接处理
Stream
,使用普通的ForkJoinPool
。List<Record> output = stream.collect(Collectors.toList());
在之后调用
parallel()
我调用sorted
.Stream<Record> stream = list.stream().sorted().parallel();
调用
parallelStream()
而不是stream().parallel()
。Stream<Record> stream = list.parallelStream().sorted();
使用
Comparator
排序。注意这个排序标准和我定义的Comparable
接口的"natural"顺序是不一样的,虽然从头开始的结果已经是有序的,但结果应该还是一样的。Stream<Record> stream = list.stream().parallel().sorted( (r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2()) );
如果我不在 Stream
上执行以下操作之一,我只能得到它来保留相遇顺序:
- 不要打电话给
parallel()
。 - 不要调用
sorted
的任何重载。
有趣的是,没有排序的 parallel()
保留了顺序。
以上两种情况,输出都是:
Verified: true
我的 Java 版本是 1.8.0_05。这个异常也是occurs on Ideone,好像是运行 Java 8u25.
更新
我已经将 JDK 升级到撰写本文时的最新版本 1.8。0_45,问题没有改变。
问题
结果 List
(output
) 中的记录顺序是否乱序,因为排序不稳定,因为遇到顺序未保留,或其他原因?
如何确保在创建并行流并对其排序时保留遇到顺序?
看起来 Arrays.parallelSort
在某些情况下不稳定。好眼力。流并行排序是根据 Arrays.parallelSort
实现的,因此它也会影响流。这是一个简化的例子:
public class StableSortBug {
static final int SIZE = 50_000;
static class Record implements Comparable<Record> {
final int sortVal;
final int seqNum;
Record(int i1, int i2) { sortVal = i1; seqNum = i2; }
@Override
public int compareTo(Record other) {
return Integer.compare(this.sortVal, other.sortVal);
}
}
static Record[] genArray() {
Record[] array = new Record[SIZE];
Arrays.setAll(array, i -> new Record(i / 10_000, i));
return array;
}
static boolean verify(Record[] array) {
return IntStream.range(1, array.length)
.allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum);
}
public static void main(String[] args) {
Record[] array = genArray();
System.out.println(verify(array));
Arrays.sort(array);
System.out.println(verify(array));
Arrays.parallelSort(array);
System.out.println(verify(array));
}
}
在我的机器上(2 个核心 x 2 个线程)打印如下:
true
true
false
当然要打印3次true
。这是在当前 JDK 9 个开发版本上。考虑到您的尝试,如果它出现在迄今为止的所有 JDK 8 个版本中,我不会感到惊讶。奇怪的是,减小大小或除数会改变行为。 20,000 的大小和 10,000 的除数是稳定的,50,000 的大小和 1,000 的除数也是稳定的。看起来问题与足够大的 运行 值比较相等与平行分割大小有关。
OpenJDK 问题 JDK-8076446 涵盖了这个错误。