管道来源取决于 MVar

Conduit Source depending on MVar

我正在为一些订阅队列并将所有到达消息放入 MVar 的客户端实现管道源。

问题是我无法通过管道源从那个 MVar 读取到 yield 那些消息,因为它在运行时报告异常:thread blocked indefinitely in an MVar operation

mqttSource :: (Monad m, MonadIO m, MonadResource m) => MqttOptions -> Source m String
mqttSource MqttOptions {..} = do
  bracketP mkConsumer cleanConsumer runHandler
 where
  mkConsumer = do
    chan <- liftIO $ newEmptyMVar
    client <- liftIO.hookToChan $ chan
    return (chan, client)

  cleanConsumer (_, client) =
    liftIO.disconnectClient $ client

  runHandler (chan, client) = do
    newMsg <- liftIO $ readMVar chan
    yield newMsg
    runHandler (chan, client)

(hookToChan 只是告诉客户端使用这个函数订阅队列:\topic msg -> putMVar chan (show msg))

感谢 Cirdec 提出的意见,我已经设法解决了这个问题。

问题是我在同一线程中生成客户端。

hookToChan 负责这样做,我在同一个线程上订阅队列。我刚刚在 hookToChan 函数中添加了一个 forkIO,问题就消失了。