管道:将两个来源合二为一
Conduit: Join two sources into one
我有两个管道源 A 和 B,我想将它们合并为一个产生:
data Result = Left Int | Value Int | Right Int
merge :: Monad m => Source m Int -> Source m Int -> Source Result
merge a b = undefined
比如:
- 使用来自
a
和 b
的值
- 执行一些计算以产生
Value Int
- 作为计算结果,
a
或 b
可能有 leftover
- 当其中一个序列耗尽时,结果源应继续生成
Left
或 Right
值,(取决于哪个原始源仍然有值)直到两个源都耗尽
我尝试用 ZipSource
实现它,例如:
getZipSource (ZipSource (a =$= CL.map Left) <* ZipSource (b =$= CL.map Right))
但我不知道如何让它在来源之间交替(当我做两个 await
s 时)以及如何以我上面描述的方式处理剩菜。
我也看了sequenceSources
,但似乎也没有帮助。
可以用 Conduit 构建类似的东西吗?
一个具体的例子是:
- 有两个(假设已排序)
Int
个来源
- 从两者获取值并进行比较
- 产生
min
值,从最大值中减去它,然后将剩余的返回到它的流中
- 重复。
预期输出为:
runConduit $ merge (CL.sourceList [10, 20, 30]) (CL.sourceList [6, 4, 20]) $$ CL.take 10
Value 6 -- 10-6 = 4, 6 yielded, 4 goes back to "a"
Value 4 -- 4-4 = 0, both values are fully consumed
Value 20 -- 20-20 = 0, both values are fully consumed
Left 30 -- "b" has no values, but "a" still yielding
[更新]
到目前为止,我发现的最好方法是编写类似于 zipSources
的内容,将其内部结构调整为:
go (Done ()) (HaveOutput src close y) = HaveOutput (go (Done ()) src) close (Nothing, Just y)
go (HaveOutput src close x) (Done ()) = HaveOutput (go src (Done ())) close (Just x, Nothing)
这是正确的方法吗?
我最后这样做了:
data MergedValue a v b = BackL a v | MergedValue v | BackR v b
data JoinResult a v b = LeftoverL a | JoinValue v | LeftoverR b
joinSources :: Monad m
=> (a -> b -> MergedValue a v b)
-> Source m a
-> Source m b
-> Source m (JoinResult a v b)
joinSources f as bs =
go (newResumableSource as) (newResumableSource bs)
where
go ras rbs = do
(ras', ma) <- lift $ ras $$++ await
(rbs', mb) <- lift $ rbs $$++ await
case (ma, mb) of
(Nothing, Nothing) -> pure ()
(Nothing, Just b) -> yield (LeftoverR b) >> go ras' rbs'
(Just a, Nothing) -> yield (LeftoverL a) >> go ras' rbs'
(Just a, Just b) -> case f a b of
BackL x v -> do
yield (JoinValue v)
(nxt, _) <- lift $ ras' $$++ leftover x
go nxt rbs'
BackR v x -> do
yield (JoinValue v)
(nxt, _) <- lift $ rbs' $$++ leftover x
go ras' nxt
MergedValue v -> yield (JoinValue v) >> go ras' rbs'
我有两个管道源 A 和 B,我想将它们合并为一个产生:
data Result = Left Int | Value Int | Right Int
merge :: Monad m => Source m Int -> Source m Int -> Source Result
merge a b = undefined
比如:
- 使用来自
a
和b
的值
- 执行一些计算以产生
Value Int
- 作为计算结果,
a
或b
可能有leftover
- 当其中一个序列耗尽时,结果源应继续生成
Left
或Right
值,(取决于哪个原始源仍然有值)直到两个源都耗尽
我尝试用 ZipSource
实现它,例如:
getZipSource (ZipSource (a =$= CL.map Left) <* ZipSource (b =$= CL.map Right))
但我不知道如何让它在来源之间交替(当我做两个 await
s 时)以及如何以我上面描述的方式处理剩菜。
我也看了sequenceSources
,但似乎也没有帮助。
可以用 Conduit 构建类似的东西吗?
一个具体的例子是:
- 有两个(假设已排序)
Int
个来源 - 从两者获取值并进行比较
- 产生
min
值,从最大值中减去它,然后将剩余的返回到它的流中 - 重复。
预期输出为:
runConduit $ merge (CL.sourceList [10, 20, 30]) (CL.sourceList [6, 4, 20]) $$ CL.take 10
Value 6 -- 10-6 = 4, 6 yielded, 4 goes back to "a"
Value 4 -- 4-4 = 0, both values are fully consumed
Value 20 -- 20-20 = 0, both values are fully consumed
Left 30 -- "b" has no values, but "a" still yielding
[更新]
到目前为止,我发现的最好方法是编写类似于 zipSources
的内容,将其内部结构调整为:
go (Done ()) (HaveOutput src close y) = HaveOutput (go (Done ()) src) close (Nothing, Just y)
go (HaveOutput src close x) (Done ()) = HaveOutput (go src (Done ())) close (Just x, Nothing)
这是正确的方法吗?
我最后这样做了:
data MergedValue a v b = BackL a v | MergedValue v | BackR v b
data JoinResult a v b = LeftoverL a | JoinValue v | LeftoverR b
joinSources :: Monad m
=> (a -> b -> MergedValue a v b)
-> Source m a
-> Source m b
-> Source m (JoinResult a v b)
joinSources f as bs =
go (newResumableSource as) (newResumableSource bs)
where
go ras rbs = do
(ras', ma) <- lift $ ras $$++ await
(rbs', mb) <- lift $ rbs $$++ await
case (ma, mb) of
(Nothing, Nothing) -> pure ()
(Nothing, Just b) -> yield (LeftoverR b) >> go ras' rbs'
(Just a, Nothing) -> yield (LeftoverL a) >> go ras' rbs'
(Just a, Just b) -> case f a b of
BackL x v -> do
yield (JoinValue v)
(nxt, _) <- lift $ ras' $$++ leftover x
go nxt rbs'
BackR v x -> do
yield (JoinValue v)
(nxt, _) <- lift $ rbs' $$++ leftover x
go ras' nxt
MergedValue v -> yield (JoinValue v) >> go ras' rbs'