Haskell streaming - 如何将原始流与结果流合并

Haskell streaming - how to merge original stream with result stream

使用 Haskell-streaming,我可以轻松地对流进行分组并计算每组的总和。

>>> S.print $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
[-1,-2]
[3,4,5]
[-6]

>>> S.print $S.map sum $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
-3
12
-6

如何让函数 myfn 生成以顺序敏感方式合并上述两者的流? IE。我希望得到

的结果流
>>> myfn $ each [-1,-2,3,4,5,-6]
-1:> -2:> -3:> 3:> 4:> 5:> 12:> -6:> -6:> ()

解决方案涉及使 mapped 的函数参数同时累加列表 一次计算总和。

这可以通过 store I think, but I find the streaming sinks from foldl 更易于使用来完成。他们的 Applicative 实例让我们可以从更简单的 Fold 构建复合 Fold

foo :: Monad m 
    => (Int -> Int -> Bool) 
    -> Stream (Of Int) m ()
    -> Stream (Of Int) m ()
foo p = 
      flip S.for (\(xs,total) -> S.each xs *> S.yield total)
    . mapped (L.purely S.fold $ (,) <$> L.list <*> L.sum)
    . S.groupBy p

其中 L.purely, L.list and L.sum 来自 "foldl".

最后的润色是从 mapped 中取出每一对 ([Int],Int),并使用 for.

将其替换为子流

投入使用:

*Main> S.print $ foo (\x y -> x*y>0) $ S.each [-1,-2,3,4,5,-6]

编辑: 想想看,之前的解决方案是有缺陷的。我们只对 streamed 结果感兴趣,但我们在将其发送到下游之前使用 S.toListL.list 在内存中累积每个单独的组。但是,如果一组恰好大于机器中的可用内存怎么办?

这里有一个完美流式传输并且对每个组的大小无动于衷的解决方案:

foo :: Monad m 
    => (Int -> Int -> Bool) 
    -> Stream (Of Int) m ()
    -> Stream (Of Int) m ()
foo p = 
      concats
    . S.maps (S.store (\s -> do (total :> r) <- L.purely S.fold L.sum s
                                S.yield total
                                return r))
    . S.groupBy p

发生了什么变化?首先,我们使用 maps instead of mapped,因为现在我们要转换子组流,而不是 return 在基础 monad 中生成结果。

对于每个子组流,我们使用 store 在不破坏流的情况下执行求和折叠。然后我们获取折叠的结果并将其追加回流中,同时还注意按照 maps.

的要求保留原始 return 值

剩下的唯一步骤是使用 concats 重新加入子组。