Spliterator 并行排序顺序
Spliterator parallel sort order
我正在实现一个分页拆分器(在 Java 中),它应该允许并行访问。
我有以下测试用例(测试在 Groovy 中与 Spock):
def 'parallel, two pages'()
{
when: 'a sorted range from 0 to 6'
def fetcher = new IntegerRangePageFetcher(6)
and: 'a spliterator with a page size of 5'
def spliterator = new PagedSpliterator(fetcher, 5)
and: 'a stream with the given range is collected to a list'
def result = StreamSupport
.stream(spliterator, true)
.collect(Collectors.toList())
then: 'the sort order is obeyed'
expect result, contains(0, 1, 2, 3, 4, 5)
}
此测试用例失败并出现以下错误:
Condition not satisfied:
expect result, contains(0, 1, 2, 3, 4, 5)
| |
false [5, 0, 1, 2, 3, 4]
Expected: iterable containing [<0>, <1>, <2>, <3>, <4>, <5>]
but: item 0: was <5>
拆分器具有 characteristics()
return IMMUTABLE | ORDERED | SIZED | SUBSIZED | NONNULL;
代码在我不使用并行时有效。所以我不明白 ORDERED
:
- 如果设置了,流框架是否应该保证顺序,并且在使用并行生成的块时应该对结果进行排序?如果是,为什么不对我的情况进行排序?
- 或者我的
trySplit
实现是否有错误,必须按照给定的顺序拆分? (目前我在打开的页面中间拆分,0-mid 留在当前拆分器,mid-end 进入新创建的拆分器)
- 或者我应该在
collect()
之前调用 sort()
因为框架根本不保证任何顺序?
--- 根据反馈更新---
感谢您的回答,我的代码中有两个逻辑错误。首先请求的片段:
@Override
public Spliterator<T> trySplit()
{
// first query
if (pageIterator == null) {
pageIterator = pageFetcher.fetchNextPage(paginationInfo);
}
// delegate split decision
var newPaginationInfo = paginationInfo.split();
if (newPaginationInfo == null) {
log.info("* Spliterator returns null");
return null;
}
// now we split
var newSpliterator = new PagedSpliterator<>(pageFetcher, newPaginationInfo);
return newSpliterator;
}
public PaginationInfo split()
{
// when open range or nothing left we don't split
if ((endElementIndex == -1) || !hasNextPage()) {
return null;
}
// calculate the splitting position
var firstHalfPages = (getEndPageIndex() - getNextPageIndex()) / 2;
var midElementIndex = (getNextPageIndex() + firstHalfPages) * pageSize;
// create an additional PaginationInfo and set the ranges according to the split position
var newPaginationInfo = new PaginationInfo(this);
newPaginationInfo.firstElementOnNextPageIndex = midElementIndex;
newPaginationInfo.nextElementIndex = midElementIndex;
endElementIndex = midElementIndex;
return newPaginationInfo;
}
第一个错误:
新创建的 Spliterator 设置为后半范围而不是第一个。我在文档中读到了前缀,但对我来说感觉很笨拙。我按页面大小拆分以具有多个并行请求。一开始(第一个 spliterator 实例)我必须获取第一页以获取页面和元素计数器。所以为了解决顺序问题,我必须把从第一个拆分器获取的数据分发给第二个拆分器来遵守顺序,这对我来说感觉很奇怪而且不直观。
第二个错误:
// first query
if (pageIterator == null) {
pageIterator = pageFetcher.fetchNextPage(paginationInfo);
}
所有后续创建的拆分器将收到来自框架的 estimateSize()
和 trySplit()
调用。在此调用期间,目前我获取了一个页面,但这会阻止并行性,获取必须在 tryAdvance()
调用的后期进行。
我会实施这些更改,然后再回复您。
是的,您的 trySplit 中存在错误。 Spliterator.trySplit 的文档指定如果您具有 ORDERED 特征,则返回的拆分器必须包含元素的前缀。切换返回的 Spliterator 和拆分器的剩余内容。
来自 trySplit
的文档:
If this Spliterator is ORDERED, the returned Spliterator must cover a strict prefix of the elements.
您的实施:
... 0-mid stays at the current spliterator, mid-end goes in the newly created spliterator
您可以从这里连接正确的点。
我正在实现一个分页拆分器(在 Java 中),它应该允许并行访问。
我有以下测试用例(测试在 Groovy 中与 Spock):
def 'parallel, two pages'()
{
when: 'a sorted range from 0 to 6'
def fetcher = new IntegerRangePageFetcher(6)
and: 'a spliterator with a page size of 5'
def spliterator = new PagedSpliterator(fetcher, 5)
and: 'a stream with the given range is collected to a list'
def result = StreamSupport
.stream(spliterator, true)
.collect(Collectors.toList())
then: 'the sort order is obeyed'
expect result, contains(0, 1, 2, 3, 4, 5)
}
此测试用例失败并出现以下错误:
Condition not satisfied:
expect result, contains(0, 1, 2, 3, 4, 5)
| |
false [5, 0, 1, 2, 3, 4]
Expected: iterable containing [<0>, <1>, <2>, <3>, <4>, <5>]
but: item 0: was <5>
拆分器具有 characteristics()
return IMMUTABLE | ORDERED | SIZED | SUBSIZED | NONNULL;
代码在我不使用并行时有效。所以我不明白 ORDERED
:
- 如果设置了,流框架是否应该保证顺序,并且在使用并行生成的块时应该对结果进行排序?如果是,为什么不对我的情况进行排序?
- 或者我的
trySplit
实现是否有错误,必须按照给定的顺序拆分? (目前我在打开的页面中间拆分,0-mid 留在当前拆分器,mid-end 进入新创建的拆分器) - 或者我应该在
collect()
之前调用sort()
因为框架根本不保证任何顺序?
--- 根据反馈更新---
感谢您的回答,我的代码中有两个逻辑错误。首先请求的片段:
@Override
public Spliterator<T> trySplit()
{
// first query
if (pageIterator == null) {
pageIterator = pageFetcher.fetchNextPage(paginationInfo);
}
// delegate split decision
var newPaginationInfo = paginationInfo.split();
if (newPaginationInfo == null) {
log.info("* Spliterator returns null");
return null;
}
// now we split
var newSpliterator = new PagedSpliterator<>(pageFetcher, newPaginationInfo);
return newSpliterator;
}
public PaginationInfo split()
{
// when open range or nothing left we don't split
if ((endElementIndex == -1) || !hasNextPage()) {
return null;
}
// calculate the splitting position
var firstHalfPages = (getEndPageIndex() - getNextPageIndex()) / 2;
var midElementIndex = (getNextPageIndex() + firstHalfPages) * pageSize;
// create an additional PaginationInfo and set the ranges according to the split position
var newPaginationInfo = new PaginationInfo(this);
newPaginationInfo.firstElementOnNextPageIndex = midElementIndex;
newPaginationInfo.nextElementIndex = midElementIndex;
endElementIndex = midElementIndex;
return newPaginationInfo;
}
第一个错误:
新创建的 Spliterator 设置为后半范围而不是第一个。我在文档中读到了前缀,但对我来说感觉很笨拙。我按页面大小拆分以具有多个并行请求。一开始(第一个 spliterator 实例)我必须获取第一页以获取页面和元素计数器。所以为了解决顺序问题,我必须把从第一个拆分器获取的数据分发给第二个拆分器来遵守顺序,这对我来说感觉很奇怪而且不直观。
第二个错误:
// first query
if (pageIterator == null) {
pageIterator = pageFetcher.fetchNextPage(paginationInfo);
}
所有后续创建的拆分器将收到来自框架的 estimateSize()
和 trySplit()
调用。在此调用期间,目前我获取了一个页面,但这会阻止并行性,获取必须在 tryAdvance()
调用的后期进行。
我会实施这些更改,然后再回复您。
是的,您的 trySplit 中存在错误。 Spliterator.trySplit 的文档指定如果您具有 ORDERED 特征,则返回的拆分器必须包含元素的前缀。切换返回的 Spliterator 和拆分器的剩余内容。
来自 trySplit
的文档:
If this Spliterator is ORDERED, the returned Spliterator must cover a strict prefix of the elements.
您的实施:
... 0-mid stays at the current spliterator, mid-end goes in the newly created spliterator
您可以从这里连接正确的点。