管道:将两个来源合二为一

Conduit: Join two sources into one

我有两个管道源 A 和 B,我想将它们合并为一个产生:

data Result = Left Int | Value Int | Right Int

merge :: Monad m => Source m Int -> Source m Int -> Source Result
merge a b = undefined

比如:

我尝试用 ZipSource 实现它,例如:

getZipSource (ZipSource (a =$= CL.map Left) <* ZipSource (b =$= CL.map Right))

但我不知道如何让它在来源之间交替(当我做两个 awaits 时)以及如何以我上面描述的方式处理剩菜。

我也看了sequenceSources,但似乎也没有帮助。

可以用 Conduit 构建类似的东西吗?

一个具体的例子是:

预期输出为:

runConduit $ merge (CL.sourceList [10, 20, 30]) (CL.sourceList [6, 4, 20]) $$ CL.take 10

Value 6    -- 10-6  = 4,  6 yielded, 4 goes back to "a"
Value 4    -- 4-4   = 0,  both values are fully consumed
Value 20   -- 20-20 = 0,  both values are fully consumed
Left 30    -- "b" has no values, but "a" still yielding

[更新] 到目前为止,我发现的最好方法是编写类似于 zipSources 的内容,将其内部结构调整为:

go (Done ()) (HaveOutput src close y) = HaveOutput (go (Done ()) src) close (Nothing, Just y)
go (HaveOutput src close x) (Done ()) = HaveOutput (go src (Done ())) close (Just x, Nothing)

这是正确的方法吗?

我最后这样做了:

data MergedValue a v b = BackL a v | MergedValue v | BackR v b
data JoinResult a v b = LeftoverL a | JoinValue v | LeftoverR b

joinSources :: Monad m
            => (a -> b -> MergedValue a v b)
            -> Source m a
            -> Source m b
            -> Source m (JoinResult a v b)
joinSources f as bs =
  go (newResumableSource as) (newResumableSource bs)
  where
    go ras rbs = do
      (ras', ma) <- lift $ ras $$++ await
      (rbs', mb) <- lift $ rbs $$++ await
      case (ma, mb) of
        (Nothing, Nothing) -> pure ()
        (Nothing, Just b)  -> yield (LeftoverR b) >> go ras' rbs'
        (Just a,  Nothing) -> yield (LeftoverL a) >> go ras' rbs'
        (Just a,  Just b)  -> case f a b of
          BackL x v -> do
            yield (JoinValue v)
            (nxt, _) <- lift $ ras' $$++ leftover x
            go nxt rbs'
          BackR v x -> do
            yield (JoinValue v)
            (nxt, _) <- lift $ rbs' $$++ leftover x
            go ras' nxt
          MergedValue v -> yield (JoinValue v) >> go ras' rbs'