Haskell streaming - 如何将原始流与结果流合并
Haskell streaming - how to merge original stream with result stream
使用 Haskell-streaming,我可以轻松地对流进行分组并计算每组的总和。
>>> S.print $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
[-1,-2]
[3,4,5]
[-6]
>>> S.print $S.map sum $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
-3
12
-6
如何让函数 myfn
生成以顺序敏感方式合并上述两者的流? IE。我希望得到
的结果流
>>> myfn $ each [-1,-2,3,4,5,-6]
-1:> -2:> -3:> 3:> 4:> 5:> 12:> -6:> -6:> ()
解决方案涉及使 mapped
的函数参数同时累加列表 和 一次计算总和。
这可以通过 store
I think, but I find the streaming sinks from foldl 更易于使用来完成。他们的 Applicative
实例让我们可以从更简单的 Fold
构建复合 Fold
:
foo :: Monad m
=> (Int -> Int -> Bool)
-> Stream (Of Int) m ()
-> Stream (Of Int) m ()
foo p =
flip S.for (\(xs,total) -> S.each xs *> S.yield total)
. mapped (L.purely S.fold $ (,) <$> L.list <*> L.sum)
. S.groupBy p
其中 L.purely
, L.list
and L.sum
来自 "foldl".
最后的润色是从 mapped
中取出每一对 ([Int],Int)
,并使用 for
.
将其替换为子流
投入使用:
*Main> S.print $ foo (\x y -> x*y>0) $ S.each [-1,-2,3,4,5,-6]
编辑: 想想看,之前的解决方案是有缺陷的。我们只对 streamed 结果感兴趣,但我们在将其发送到下游之前使用 S.toList
或 L.list
在内存中累积每个单独的组。但是,如果一组恰好大于机器中的可用内存怎么办?
这里有一个完美流式传输并且对每个组的大小无动于衷的解决方案:
foo :: Monad m
=> (Int -> Int -> Bool)
-> Stream (Of Int) m ()
-> Stream (Of Int) m ()
foo p =
concats
. S.maps (S.store (\s -> do (total :> r) <- L.purely S.fold L.sum s
S.yield total
return r))
. S.groupBy p
发生了什么变化?首先,我们使用 maps
instead of mapped
,因为现在我们要转换子组流,而不是 return 在基础 monad 中生成结果。
对于每个子组流,我们使用 store
在不破坏流的情况下执行求和折叠。然后我们获取折叠的结果并将其追加回流中,同时还注意按照 maps
.
的要求保留原始 return 值
剩下的唯一步骤是使用 concats
重新加入子组。
使用 Haskell-streaming,我可以轻松地对流进行分组并计算每组的总和。
>>> S.print $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
[-1,-2]
[3,4,5]
[-6]
>>> S.print $S.map sum $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
-3
12
-6
如何让函数 myfn
生成以顺序敏感方式合并上述两者的流? IE。我希望得到
>>> myfn $ each [-1,-2,3,4,5,-6]
-1:> -2:> -3:> 3:> 4:> 5:> 12:> -6:> -6:> ()
解决方案涉及使 mapped
的函数参数同时累加列表 和 一次计算总和。
这可以通过 store
I think, but I find the streaming sinks from foldl 更易于使用来完成。他们的 Applicative
实例让我们可以从更简单的 Fold
构建复合 Fold
:
foo :: Monad m
=> (Int -> Int -> Bool)
-> Stream (Of Int) m ()
-> Stream (Of Int) m ()
foo p =
flip S.for (\(xs,total) -> S.each xs *> S.yield total)
. mapped (L.purely S.fold $ (,) <$> L.list <*> L.sum)
. S.groupBy p
其中 L.purely
, L.list
and L.sum
来自 "foldl".
最后的润色是从 mapped
中取出每一对 ([Int],Int)
,并使用 for
.
投入使用:
*Main> S.print $ foo (\x y -> x*y>0) $ S.each [-1,-2,3,4,5,-6]
编辑: 想想看,之前的解决方案是有缺陷的。我们只对 streamed 结果感兴趣,但我们在将其发送到下游之前使用 S.toList
或 L.list
在内存中累积每个单独的组。但是,如果一组恰好大于机器中的可用内存怎么办?
这里有一个完美流式传输并且对每个组的大小无动于衷的解决方案:
foo :: Monad m
=> (Int -> Int -> Bool)
-> Stream (Of Int) m ()
-> Stream (Of Int) m ()
foo p =
concats
. S.maps (S.store (\s -> do (total :> r) <- L.purely S.fold L.sum s
S.yield total
return r))
. S.groupBy p
发生了什么变化?首先,我们使用 maps
instead of mapped
,因为现在我们要转换子组流,而不是 return 在基础 monad 中生成结果。
对于每个子组流,我们使用 store
在不破坏流的情况下执行求和折叠。然后我们获取折叠的结果并将其追加回流中,同时还注意按照 maps
.
剩下的唯一步骤是使用 concats
重新加入子组。