在 AMQP 回调之间共享上下文
Sharing a context between AMQP callbacks
有一个关于 Haskell RabbitMQ 用法的简单教程,我在其中获取了这段代码
main :: IO ()
main = do
conn <- openConnection "127.0.0.1" "/" "guest" "guest"
ch <- openChannel conn
declareQueue ch newQueue {queueName = "hello",
queueAutoDelete = False,
queueDurable = False}
putStrLn " [*] Waiting for messages. To exit press CTRL+C"
consumeMsgs ch "hello" NoAck deliveryHandler
-- waits for keypresses
getLine
closeConnection conn
deliveryHandler :: (Message, Envelope) -> IO ()
deliveryHandler (msg, metadata) =
BL.putStrLn $ " [x] Received " <> msgBody msg
它简单地解释了如何从队列中获取消息并使用回调对其进行处理。
有一件事可能很容易解决,但我很难理解如何在回调中添加一些可变上下文,因此每次运行函数时它都可以更改它。简单的说就是如何计算队列顺序中的消息个数。我发现一个可能的解决方案是 State monad,是吗?
第二个问题 - 所有这些回调是否并行处理?如果不是,如何并行处理它们并在不发生数据竞争的情况下保持可变上下文?
如果您计划并行处理多条消息(在同一个 Haskell 进程中),我将从 MVar 开始以保持共享状态。
MVar基本上是一个带锁的共享变量,合理的接口。在简单的情况下(如计数器),这足以防止数据竞争。它是共享内存上较低级别 (IORef) 和较高级别 (STM) 抽象之间的中间地带。我认为它是最容易理解的,我将它用于所有初始原型制作。
我不了解 RabbitMQ 库,所以无法回答你的第二个问题,关于消息是否并行处理。
基于@bergey 的回答 - 您可以创建可变引用,例如 IORef 或 MVar。这些引用可以使用部分函数应用程序传递给您的处理程序。以下是输入但未经测试的代码。
main :: IO ()
main = do
conn <- openConnection "127.0.0.1" "/" "guest" "guest"
ch <- openChannel conn
ref <- newMVar 0
注意上面的ref
和Control.Concurrent.MVar
的生成函数newMVar
。
declareQueue ch newQueue {queueName = "hello",
queueAutoDelete = False,
queueDurable = False}
putStrLn " [*] Waiting for messages. To exit press CTRL+C"
consumeMsgs ch "hello" NoAck (deliveryHandler ref)
看看我们如何通过函数应用程序将 ref 传递给 deliveryHandler
。
-- waits for keypresses
getLine
closeConnection conn
deliveryHandler :: MVar Int -> (Message, Envelope) -> IO ()
deliveryHandler ref (msg, metadata) =
BL.putStrLn $ " [x] Received " <> msgBody msg
withMVar' ref $ \val ->
do print val
pure (val + 1)
最后,我们可以使用 Control.Concurrent.MVar
中的函数来处理 ref,获取旧值并根据需要用新值替换它。
有一个关于 Haskell RabbitMQ 用法的简单教程,我在其中获取了这段代码
main :: IO ()
main = do
conn <- openConnection "127.0.0.1" "/" "guest" "guest"
ch <- openChannel conn
declareQueue ch newQueue {queueName = "hello",
queueAutoDelete = False,
queueDurable = False}
putStrLn " [*] Waiting for messages. To exit press CTRL+C"
consumeMsgs ch "hello" NoAck deliveryHandler
-- waits for keypresses
getLine
closeConnection conn
deliveryHandler :: (Message, Envelope) -> IO ()
deliveryHandler (msg, metadata) =
BL.putStrLn $ " [x] Received " <> msgBody msg
它简单地解释了如何从队列中获取消息并使用回调对其进行处理。
有一件事可能很容易解决,但我很难理解如何在回调中添加一些可变上下文,因此每次运行函数时它都可以更改它。简单的说就是如何计算队列顺序中的消息个数。我发现一个可能的解决方案是 State monad,是吗?
第二个问题 - 所有这些回调是否并行处理?如果不是,如何并行处理它们并在不发生数据竞争的情况下保持可变上下文?
如果您计划并行处理多条消息(在同一个 Haskell 进程中),我将从 MVar 开始以保持共享状态。
MVar基本上是一个带锁的共享变量,合理的接口。在简单的情况下(如计数器),这足以防止数据竞争。它是共享内存上较低级别 (IORef) 和较高级别 (STM) 抽象之间的中间地带。我认为它是最容易理解的,我将它用于所有初始原型制作。
我不了解 RabbitMQ 库,所以无法回答你的第二个问题,关于消息是否并行处理。
基于@bergey 的回答 - 您可以创建可变引用,例如 IORef 或 MVar。这些引用可以使用部分函数应用程序传递给您的处理程序。以下是输入但未经测试的代码。
main :: IO ()
main = do
conn <- openConnection "127.0.0.1" "/" "guest" "guest"
ch <- openChannel conn
ref <- newMVar 0
注意上面的ref
和Control.Concurrent.MVar
的生成函数newMVar
。
declareQueue ch newQueue {queueName = "hello",
queueAutoDelete = False,
queueDurable = False}
putStrLn " [*] Waiting for messages. To exit press CTRL+C"
consumeMsgs ch "hello" NoAck (deliveryHandler ref)
看看我们如何通过函数应用程序将 ref 传递给 deliveryHandler
。
-- waits for keypresses
getLine
closeConnection conn
deliveryHandler :: MVar Int -> (Message, Envelope) -> IO ()
deliveryHandler ref (msg, metadata) =
BL.putStrLn $ " [x] Received " <> msgBody msg
withMVar' ref $ \val ->
do print val
pure (val + 1)
最后,我们可以使用 Control.Concurrent.MVar
中的函数来处理 ref,获取旧值并根据需要用新值替换它。