Haskell 流 - 如何在复制后将 1 个流分成 2 个?
Haskell streaming - how to separate 1 stream into 2 after copy?
在haskellstreaming里面有一个例子copy
>>> (S.toList . mapped S.toList . chunksOf 5) $ (S.toList . mapped S.toList . chunksOf 3) $ S.copy $ each [1..10]
[[1,2,3,4,5],[6,7,8,9,10]] :> ([[1,2,3],[4,5,6],[7,8,9],[10]] :> ())
是否可以将其分成两个 "clean" 流,以便打印以下结果?
>>>S.print stream1
[[1,2,3,4,5],[6,7,8,9,10]]
>>>S.print stream2
[[1,2,3],[4,5,6],[7,8,9],[10]]
注意上面的结果中不再有':>'。更一般地说,我不确定 'simplify' 来自 m
或 (Of a)
的嵌套流(或流的流)是否存在 Stream (Of a) m r
部分的函数
f1::Stream (Of a) (Stream (Of b) m) r -> Stream (Of b) m r
f2::Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) m r
f3::Stream (Stream (Of a) m) r -> Stream (Of a) m r
[更新]
这个问题的背景是我正在寻找惯用的方法来多次重用底层流。流是从数据库中提取的,IO 可能很昂贵。我还想获得对中间流的引用,以便更好地构建我的代码。一些模拟代码:
my-stream-fn = do
original_stream <- pull_from_database
let (o1, s1) = calc_moving_average $ S.copy original_stream
(o2, s2) = calc_max $ S.copy o1
(o3, s3) = calc_min $ S.copy o2
S.print $ S.zipWith3 (\x y z-> (x, y, z)) s1 s2 s3
我希望o1 o2和o3与original_stream和pull_from_database完全一样,IO操作只在original_stream被拉动时完成一次
f1 = S.effects @(Stream (Of _) _)
:: Monad m
=> Stream (Of a) (Stream (Of b) m) r
-> Stream (Of b) r
f2 = hoist @(Stream (Of _)) S.effects
:: Monad m
=> Stream (Of a) (Stream (Of b) m) r
-> Stream (Of a) m r
(为清楚起见重命名了类型变量,请参阅 effects
的文档),并且 f3
不进行实物检查。
感觉就像你在试图击败流媒体的点。您构建管道,从源到接收器,然后 运行 它 - 关键是没有(隐含的)中间值积累。您的问题有点松散,因此无法准确回答,但是如果您希望 运行 第一个流的所有效果,然后是第二个流的所有效果,那么您必须愿意存储(代表的计算the) 第二个流,直到第一个流完成影响 => 你已经积累了第二个流(因此并没有真正流式传输它)。因此,为什么 S.copy
是为交错效果而设计的。比照。 this github issue.
[响应更新]
我认为让您感到困惑的部分原因是您使用的是纯流,并且在没有效果的情况下,限制的动机就不那么明显了。使用管道组件的标识符,而不是部分结果。同样在您的示例中,您应该组合折叠,例如。
import qualified Control.Foldl as L
import qualified Streaming.Prelude as S
myStreamFn =
let movingAvg n = {-# ... #-}
combinedAcc = (,,) <$> L.minimum <*> L.maximum <*> movingAvg 10
in S.print
$ L.purely S.fold combinedAcc
$ pullFromDatabase
您可能要考虑的另一个函数是 S.store
,例如
myStreamFn
= pullFromDatabase
& S.store S.maximum
& S.store (L.purely S.fold L.minimum)
& S.store movingAvg
& S.print
在haskellstreaming里面有一个例子copy
>>> (S.toList . mapped S.toList . chunksOf 5) $ (S.toList . mapped S.toList . chunksOf 3) $ S.copy $ each [1..10]
[[1,2,3,4,5],[6,7,8,9,10]] :> ([[1,2,3],[4,5,6],[7,8,9],[10]] :> ())
是否可以将其分成两个 "clean" 流,以便打印以下结果?
>>>S.print stream1
[[1,2,3,4,5],[6,7,8,9,10]]
>>>S.print stream2
[[1,2,3],[4,5,6],[7,8,9],[10]]
注意上面的结果中不再有':>'。更一般地说,我不确定 'simplify' 来自 m
或 (Of a)
的嵌套流(或流的流)是否存在 Stream (Of a) m r
f1::Stream (Of a) (Stream (Of b) m) r -> Stream (Of b) m r
f2::Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) m r
f3::Stream (Stream (Of a) m) r -> Stream (Of a) m r
[更新]
这个问题的背景是我正在寻找惯用的方法来多次重用底层流。流是从数据库中提取的,IO 可能很昂贵。我还想获得对中间流的引用,以便更好地构建我的代码。一些模拟代码:
my-stream-fn = do
original_stream <- pull_from_database
let (o1, s1) = calc_moving_average $ S.copy original_stream
(o2, s2) = calc_max $ S.copy o1
(o3, s3) = calc_min $ S.copy o2
S.print $ S.zipWith3 (\x y z-> (x, y, z)) s1 s2 s3
我希望o1 o2和o3与original_stream和pull_from_database完全一样,IO操作只在original_stream被拉动时完成一次
f1 = S.effects @(Stream (Of _) _)
:: Monad m
=> Stream (Of a) (Stream (Of b) m) r
-> Stream (Of b) r
f2 = hoist @(Stream (Of _)) S.effects
:: Monad m
=> Stream (Of a) (Stream (Of b) m) r
-> Stream (Of a) m r
(为清楚起见重命名了类型变量,请参阅 effects
的文档),并且 f3
不进行实物检查。
感觉就像你在试图击败流媒体的点。您构建管道,从源到接收器,然后 运行 它 - 关键是没有(隐含的)中间值积累。您的问题有点松散,因此无法准确回答,但是如果您希望 运行 第一个流的所有效果,然后是第二个流的所有效果,那么您必须愿意存储(代表的计算the) 第二个流,直到第一个流完成影响 => 你已经积累了第二个流(因此并没有真正流式传输它)。因此,为什么 S.copy
是为交错效果而设计的。比照。 this github issue.
[响应更新]
我认为让您感到困惑的部分原因是您使用的是纯流,并且在没有效果的情况下,限制的动机就不那么明显了。使用管道组件的标识符,而不是部分结果。同样在您的示例中,您应该组合折叠,例如。
import qualified Control.Foldl as L
import qualified Streaming.Prelude as S
myStreamFn =
let movingAvg n = {-# ... #-}
combinedAcc = (,,) <$> L.minimum <*> L.maximum <*> movingAvg 10
in S.print
$ L.purely S.fold combinedAcc
$ pullFromDatabase
您可能要考虑的另一个函数是 S.store
,例如
myStreamFn
= pullFromDatabase
& S.store S.maximum
& S.store (L.purely S.fold L.minimum)
& S.store movingAvg
& S.print