将管道合并为一个

Merge conduits into one

我正在寻找可以执行类似于以下操作的函数:

merge :: MonadIO m => [Producer m a] -> Producer m a

我快速浏览了一下stm-conduit,它看起来很相似,但我不确定它是否符合我的要求:

messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
  where
    loop = do
      mmsg <- liftIO $ getMsg chan ack q
      case mmsg of
        Just (m, e) -> do
          yield (m, e)
          liftIO $ ackMsg chan (envDeliveryTag e) False
          loop
        Nothing     -> loop
    chan = fst $ amqpChan conn

如您所见,此管道生产者在生成消息后确认消息。在一个简单的 "single-threaded" 管道中它运行良好,消息到达接收器然后被确认。

然而 stm-conduit 这可能会改变,因为据我所知,生产者不会等待消息被接收器消费,它们会并行工作,消息可能是过早确认。

我对stm-conduit的理解正确吗?
将单独的源合并为一个以获得良好的单流语义的方法是什么?

UPDATE:根据要求将代码更新为实际工作的 AMQP 示例(但可能有点嘈杂)。

更新 2:我认为我所追求的可能是管道来源的替代实例,因此我可以做类似 let src = src1 <|> src2 的事情。有可能吗?

看看 ZipSource,它是一种新型包装器,其 Applicative 允许您以您想要的方式组合 Source

一旦有了 ZipSource,就可以使用 zipSourcesTraversable(例如列表)中的 Source 组合成 SourceTraversable 秒。

与您想要的结果类型的唯一区别是它是一个 Source 超过 Traversable 个值,而不仅仅是一个值,但这应该不是什么大问题.

mergeSources in stm-conduit 在后台维护着一个 TBMChannel。您所有的 Sources / Producers 首先连接到 TBMChannel,然后它将创建一个单一的 Source,尝试从通道 FIFO 中提取值。

使用mergeSources时可以设置中间值TBMChannel的边界。假设你将边界设置为 n,那么所有源产生的前 n 个值将立即转储到 TBMChannelAmqpConn,假设它没有在 AmqpConn 端被阻塞,并且您的消费者比来源慢(顺便说一句 AmqpConn 使用无界 Control.Concurrent.Chan 所以它不会阻塞)。之后 TBMChannel 已满,因此不再尝试向通道产生值的源被阻止。您的消费者从组合源中一个一个地获取价值,因此它在前 n 个元素之后是连续的。

要确保它从一开始就是连续的,您可以将边界设置为 1,但这可能会导致一些性能问题。