将管道合并为一个
Merge conduits into one
我正在寻找可以执行类似于以下操作的函数:
merge :: MonadIO m => [Producer m a] -> Producer m a
我快速浏览了一下stm-conduit
,它看起来很相似,但我不确定它是否符合我的要求:
messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
where
loop = do
mmsg <- liftIO $ getMsg chan ack q
case mmsg of
Just (m, e) -> do
yield (m, e)
liftIO $ ackMsg chan (envDeliveryTag e) False
loop
Nothing -> loop
chan = fst $ amqpChan conn
如您所见,此管道生产者在生成消息后确认消息。在一个简单的 "single-threaded" 管道中它运行良好,消息到达接收器然后被确认。
然而 stm-conduit
这可能会改变,因为据我所知,生产者不会等待消息被接收器消费,它们会并行工作,消息可能是过早确认。
我对stm-conduit
的理解正确吗?
将单独的源合并为一个以获得良好的单流语义的方法是什么?
UPDATE:根据要求将代码更新为实际工作的 AMQP 示例(但可能有点嘈杂)。
更新 2:我认为我所追求的可能是管道来源的替代实例,因此我可以做类似 let src = src1 <|> src2
的事情。有可能吗?
看看 ZipSource
,它是一种新型包装器,其 Applicative
允许您以您想要的方式组合 Source
。
一旦有了 ZipSource
,就可以使用 zipSources
将 Traversable
(例如列表)中的 Source
组合成 Source
共 Traversable
秒。
与您想要的结果类型的唯一区别是它是一个 Source
超过 Traversable
个值,而不仅仅是一个值,但这应该不是什么大问题.
mergeSources
in stm-conduit
在后台维护着一个 TBMChannel
。您所有的 Sources / Producers 首先连接到 TBMChannel
,然后它将创建一个单一的 Source,尝试从通道 FIFO 中提取值。
使用mergeSources
时可以设置中间值TBMChannel
的边界。假设你将边界设置为 n,那么所有源产生的前 n 个值将立即转储到 TBMChannel
和 AmqpConn
,假设它没有在 AmqpConn
端被阻塞,并且您的消费者比来源慢(顺便说一句 AmqpConn
使用无界 Control.Concurrent.Chan
所以它不会阻塞)。之后 TBMChannel 已满,因此不再尝试向通道产生值的源被阻止。您的消费者从组合源中一个一个地获取价值,因此它在前 n 个元素之后是连续的。
要确保它从一开始就是连续的,您可以将边界设置为 1,但这可能会导致一些性能问题。
我正在寻找可以执行类似于以下操作的函数:
merge :: MonadIO m => [Producer m a] -> Producer m a
我快速浏览了一下stm-conduit
,它看起来很相似,但我不确定它是否符合我的要求:
messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
where
loop = do
mmsg <- liftIO $ getMsg chan ack q
case mmsg of
Just (m, e) -> do
yield (m, e)
liftIO $ ackMsg chan (envDeliveryTag e) False
loop
Nothing -> loop
chan = fst $ amqpChan conn
如您所见,此管道生产者在生成消息后确认消息。在一个简单的 "single-threaded" 管道中它运行良好,消息到达接收器然后被确认。
然而 stm-conduit
这可能会改变,因为据我所知,生产者不会等待消息被接收器消费,它们会并行工作,消息可能是过早确认。
我对stm-conduit
的理解正确吗?
将单独的源合并为一个以获得良好的单流语义的方法是什么?
UPDATE:根据要求将代码更新为实际工作的 AMQP 示例(但可能有点嘈杂)。
更新 2:我认为我所追求的可能是管道来源的替代实例,因此我可以做类似 let src = src1 <|> src2
的事情。有可能吗?
看看 ZipSource
,它是一种新型包装器,其 Applicative
允许您以您想要的方式组合 Source
。
一旦有了 ZipSource
,就可以使用 zipSources
将 Traversable
(例如列表)中的 Source
组合成 Source
共 Traversable
秒。
与您想要的结果类型的唯一区别是它是一个 Source
超过 Traversable
个值,而不仅仅是一个值,但这应该不是什么大问题.
mergeSources
in stm-conduit
在后台维护着一个 TBMChannel
。您所有的 Sources / Producers 首先连接到 TBMChannel
,然后它将创建一个单一的 Source,尝试从通道 FIFO 中提取值。
使用mergeSources
时可以设置中间值TBMChannel
的边界。假设你将边界设置为 n,那么所有源产生的前 n 个值将立即转储到 TBMChannel
和 AmqpConn
,假设它没有在 AmqpConn
端被阻塞,并且您的消费者比来源慢(顺便说一句 AmqpConn
使用无界 Control.Concurrent.Chan
所以它不会阻塞)。之后 TBMChannel 已满,因此不再尝试向通道产生值的源被阻止。您的消费者从组合源中一个一个地获取价值,因此它在前 n 个元素之后是连续的。
要确保它从一开始就是连续的,您可以将边界设置为 1,但这可能会导致一些性能问题。