管道来源取决于 MVar
Conduit Source depending on MVar
我正在为一些订阅队列并将所有到达消息放入 MVar 的客户端实现管道源。
问题是我无法通过管道源从那个 MVar 读取到 yield
那些消息,因为它在运行时报告异常:thread blocked indefinitely in an MVar operation
mqttSource :: (Monad m, MonadIO m, MonadResource m) => MqttOptions -> Source m String
mqttSource MqttOptions {..} = do
bracketP mkConsumer cleanConsumer runHandler
where
mkConsumer = do
chan <- liftIO $ newEmptyMVar
client <- liftIO.hookToChan $ chan
return (chan, client)
cleanConsumer (_, client) =
liftIO.disconnectClient $ client
runHandler (chan, client) = do
newMsg <- liftIO $ readMVar chan
yield newMsg
runHandler (chan, client)
(hookToChan
只是告诉客户端使用这个函数订阅队列:\topic msg -> putMVar chan (show msg)
)
感谢 Cirdec 提出的意见,我已经设法解决了这个问题。
问题是我在同一线程中生成客户端。
hookToChan
负责这样做,我在同一个线程上订阅队列。我刚刚在 hookToChan
函数中添加了一个 forkIO
,问题就消失了。
我正在为一些订阅队列并将所有到达消息放入 MVar 的客户端实现管道源。
问题是我无法通过管道源从那个 MVar 读取到 yield
那些消息,因为它在运行时报告异常:thread blocked indefinitely in an MVar operation
mqttSource :: (Monad m, MonadIO m, MonadResource m) => MqttOptions -> Source m String
mqttSource MqttOptions {..} = do
bracketP mkConsumer cleanConsumer runHandler
where
mkConsumer = do
chan <- liftIO $ newEmptyMVar
client <- liftIO.hookToChan $ chan
return (chan, client)
cleanConsumer (_, client) =
liftIO.disconnectClient $ client
runHandler (chan, client) = do
newMsg <- liftIO $ readMVar chan
yield newMsg
runHandler (chan, client)
(hookToChan
只是告诉客户端使用这个函数订阅队列:\topic msg -> putMVar chan (show msg)
)
感谢 Cirdec 提出的意见,我已经设法解决了这个问题。
问题是我在同一线程中生成客户端。
hookToChan
负责这样做,我在同一个线程上订阅队列。我刚刚在 hookToChan
函数中添加了一个 forkIO
,问题就消失了。