Haskell 流 - 如何在复制后将 1 个流分成 2 个?

Haskell streaming - how to separate 1 stream into 2 after copy?

在haskellstreaming里面有一个例子copy

>>> (S.toList . mapped S.toList . chunksOf 5) $  (S.toList . mapped S.toList . chunksOf 3) $ S.copy $ each [1..10]
[[1,2,3,4,5],[6,7,8,9,10]] :> ([[1,2,3],[4,5,6],[7,8,9],[10]] :> ())

是否可以将其分成两个 "clean" 流,以便打印以下结果?

>>>S.print stream1
[[1,2,3,4,5],[6,7,8,9,10]]
>>>S.print stream2
[[1,2,3],[4,5,6],[7,8,9],[10]]

注意上面的结果中不再有':>'。更一般地说,我不确定 'simplify' 来自 m(Of a) 的嵌套流(或流的流)是否存在 Stream (Of a) m r

部分的函数
f1::Stream (Of a) (Stream (Of b) m) r -> Stream (Of b) m r
f2::Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) m r
f3::Stream (Stream (Of a) m) r -> Stream (Of a) m r

[更新]

这个问题的背景是我正在寻找惯用的方法来多次重用底层流。流是从数据库中提取的,IO 可能很昂贵。我还想获得对中间流的引用,以便更好地构建我的代码。一些模拟代码:

my-stream-fn = do
  original_stream <- pull_from_database 
  let  (o1, s1) = calc_moving_average $ S.copy original_stream
       (o2, s2) = calc_max $ S.copy o1
       (o3, s3) = calc_min $ S.copy o2
  S.print $ S.zipWith3 (\x y z-> (x, y, z)) s1 s2 s3

我希望o1 o2和o3与original_stream和pull_from_database完全一样,IO操作只在original_stream被拉动时完成一次

f1 = S.effects @(Stream (Of _) _) 
  :: Monad m 
  => Stream (Of a) (Stream (Of b) m) r 
  -> Stream (Of b) r
f2 = hoist @(Stream (Of _)) S.effects
  :: Monad m
  => Stream (Of a) (Stream (Of b) m) r
  -> Stream (Of a) m r

(为清楚起见重命名了类型变量,请参阅 effects 的文档),并且 f3 不进行实物检查。

感觉就像你在试图击败流媒体的。您构建管道,从源到接收器,然后 运行 它 - 关键是没有(隐含的)中间值积累。您的问题有点松散,因此无法准确回答,但是如果您希望 运行 第一个流的所有效果,然后是第二个流的所有效果,那么您必须愿意存储(代表的计算the) 第二个流,直到第一个流完成影响 => 你已经积累了第二个流(因此并没有真正流式传输它)。因此,为什么 S.copy 是为交错效果而设计的。比照。 this github issue.

[响应更新]

我认为让您感到困惑的部分原因是您使用的是纯流,并且在没有效果的情况下,限制的动机就不那么明显了。使用管道组件的标识符,而不是部分结果。同样在您的示例中,您应该组合折叠,例如。

import qualified Control.Foldl as L
import qualified Streaming.Prelude as S

myStreamFn =
  let movingAvg n = {-# ... #-}
      combinedAcc = (,,) <$> L.minimum <*> L.maximum <*> movingAvg 10
  in  S.print 
   $  L.purely S.fold combinedAcc 
   $  pullFromDatabase

您可能要考虑的另一个函数是 S.store,例如

myStreamFn 
  = pullFromDatabase
  & S.store S.maximum
  & S.store (L.purely S.fold L.minimum)
  & S.store movingAvg
  & S.print