使用 takeWhile 时 StreamEx 并行性是否应该起作用?
Should StreamEx parallelism work when using takeWhile?
我有一个像这样创建的流:
StreamEx.generate(new MySupplier<List<Entity>>())
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.. // more stuff
我只需添加 parallel
:
即可将其更改为并行工作
StreamEx.generate(new MySupplier<List<Entity>>())
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.parallel()
.. // more stuff
但我还想添加一个takeWhile
条件来停止流:
StreamEx.generate(new MySupplier<List<Entity>>())
.takeWhile(not(List::isEmpty))
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.parallel()
.. // more stuff
但是一旦我添加 takeWhile
流似乎就变成了连续的(至少它只由一个线程处理)。根据 takeWhile
的 javadoc,如果我理解正确,应该使用并行流。是我做错了什么还是设计使然?
与普通 Stream 一样 API 如果某些东西并行工作,并不意味着它工作效率很高。 javadoc 指出:
While this operation is quite cheap for sequential stream, it can be quite expensive on parallel pipelines.
其实你想使用takeWhile
无序流,可以专门优化,但目前没有优化,所以这可以被认为是一个缺陷。我会尝试解决这个问题(我是 StreamEx 的作者)。
更新:已在 0.6.5 版本中修复
我有一个像这样创建的流:
StreamEx.generate(new MySupplier<List<Entity>>())
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.. // more stuff
我只需添加 parallel
:
StreamEx.generate(new MySupplier<List<Entity>>())
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.parallel()
.. // more stuff
但我还想添加一个takeWhile
条件来停止流:
StreamEx.generate(new MySupplier<List<Entity>>())
.takeWhile(not(List::isEmpty))
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.parallel()
.. // more stuff
但是一旦我添加 takeWhile
流似乎就变成了连续的(至少它只由一个线程处理)。根据 takeWhile
的 javadoc,如果我理解正确,应该使用并行流。是我做错了什么还是设计使然?
与普通 Stream 一样 API 如果某些东西并行工作,并不意味着它工作效率很高。 javadoc 指出:
While this operation is quite cheap for sequential stream, it can be quite expensive on parallel pipelines.
其实你想使用takeWhile
无序流,可以专门优化,但目前没有优化,所以这可以被认为是一个缺陷。我会尝试解决这个问题(我是 StreamEx 的作者)。
更新:已在 0.6.5 版本中修复