cyclops-react:ReactiveSeq 上没有批处理功能?

cyclops-react: No batching functions on ReactiveSeq?

使用 cyclops-react 1.0.0-RC3,我尝试使用批处理在 cyclops-react streams user guide 上重新创建示例。我发现 ReactiveSeq 中缺少一些方法,包括 batchBySizewindowByTime.

我确实在 StreamUtils 上找到了这些方法,它们按预期工作,但看起来不像用户指南中的示例那么流畅...

来自用户指南...

// Example 19. Batch by size example
ReactiveSeq.of(1,2,3,4,5, 6)
  .map(n-> n==6? sleep(1) : n)
  .batchBySize(4) // this function seems to be missing...
  .toList()

我可以做什么...

import com.aol.cyclops.control.ReactiveSeq;
// ...
StreamUtils.batchBySize(
    ReactiveSeq.of(1, 2, 3, 4, 5, 6)
        .map(n -> TestUtils.mayBeSlow(n)),
    4)
    .collect(Collectors.toList());

您可以在 testBatchingSlidingWindowing 方法测试 class StreamsTest.java

中的工作 JUnit 中看到我的代码

我应该在 ReactiveSeq 上找到 batchBySizewindowByTime 还是使用 StreamUtils 的正确方式?

改为使用分组。它适用于所有 cyclops-react Traversable 类型(例如 ListX、SetX、QueueX、DequeX、ReactiveSeq 等)。所以你的例子会变成

ReactiveSeq.of(1,2,3,4,5, 6)
           .map(n-> n==6? sleep(1) : n)
           .grouped(4) 
           .toList()

groupedXXX 运算符的作用类似于 batchByXXX 和 windowByXXX,通过扩展的 Collection 类型提供对分组数据的访问,该类型本身具有所有可遍历和可折叠的运算符。

例如加倍,例如群组成员/批处理列表

 ReactiveSeq.of(1,2,3,4,5, 6)
           .map(n-> n==6? sleep(1) : n)
           .grouped(4) 
           .map(list-> list.map(i->i*2))
           .toList() 

您还可以使用 groupedT,其中 returns 一个 ListTransformer。 ListTransformers 允许您像操作未嵌套的结构一样操作嵌套结构。

例如加倍,例如使用 groupedT

的组成员/批处理列表
ReactiveSeq.of(1,2,3,4,5, 6)
           .map(n-> n==6? sleep(1) : n)
           .groupedT(4) 
           .map(i->i*2);

并将 ListTransformer 转换回列表流

ListTSeq<Integer> listT = ReactiveSeq.of(1,2,3,4,5, 6)
                                     .map(n-> n==6? sleep(1) : n)
                                     .groupedT(4);

ReactiveSeq<ListX<Integer>> nested = listT.toNestedListX()
                                          .stream();