Haskell 中多个发送者和接收者的消息传递并发

Message passing concurrency in Haskell with multiple senders and receivers

我正在尝试解决一个涉及多个发送方和接收方的问题,希望得到一些关于我的方法是否正确的反馈。

问题:我们有 N 个领导者和 M 个追随者,他们必须全部由单独的线程表示。每个人都是舞者,并且有一个关联的 "dance card" 与 8 种不同舞蹈的名称。每个领导者都必须询问下属是否会跳特定的舞蹈。追随者等待领导者的邀请,只有当他们还没有跳那支舞并且他们还没有同意与这位领导者跳另外 2 支舞时才接受。如果领导者收到他们的邀请被接受的回复,他们将继续努力为下一支舞争取一场比赛。否则,他们会继续尝试为同一舞蹈寻找匹配项。最后,领导者 "dance card" 会打印每支舞以及与他们共舞的追随者的 ID。

方法:我创建了两个函数:领导者和追随者。主要是,我使用 forkIO 调用领导者 n 次和追随者 m 次。但是,我 运行 关注如何保持状态(特别是 dancecard)的问题。我正在考虑创建一个类型 class "Dancer" 然后是它的两个实例:Leader 和 Follower。每个领导者和每个追随者都有一个唯一的 ID(从 1 到 N 或 M)。每个人还需要一个 mvar 作为自己的个人邮箱。领导者会以某种方式需要 "get" 追随者的 mvar 以便在其中放入一些东西,以便同一追随者可以将其取出并对邀请做出“是”或“否”的回答。关于dancecard,我认为最好合并state monad。例如,当领导者邀请追随者跳舞时,追随者应该能够查看他们的舞蹈卡并确认他们还没有舞伴。

哇,您已经有了一个类型类、两个实例和一个状态 monad,而且您甚至还没有确定 MVar 的类型!事情越来越复杂了。

我担心你可能掉进了 Haskell-as-Java 陷阱,你已经在脑海中想出了一个面向对象的解决方案,而你现在尝试将其直接转化为 Haskell,将您的舞者视为具有共享方法的有状态对象,这些对象包含在 "class" 中,等等

我建议采用不同的方法。舞者不是 "things";他们是任务。将它们实现为简单的函数,并使用参数传递和递归代替 "state",这是惯用的 Haskell.

的典型做法

剧透如下, 但这里有一个简单的方法来定义一个 "follower" 有一个 id,通过一对 [=] 响应请求46=] MVars,并使用递归核心循环维护舞蹈卡。请注意,Follower 数据类型不应该是 "follower object"(例如,它没有舞蹈卡);这只是记录 follower 中的 return 值的一种便捷方式,它用作 "handle" 用于识别跟随者任务并与之通信:

type LeaderId = Int
type FollowerId = Int
type Dance = Int

-- |A dance card for a follower with a list of dance/leader pairs.
data Card = Card { getCard :: [(Dance, LeaderId)] } deriving (Show)
emptyCard = Card []

-- |Follower handle giving its id and request/response MVars
data Follower =
  Follower { followerId :: FollowerId
           , request :: MVar (Dance, LeaderId)
           , response :: MVar Bool
           }

-- |Create a new follower task with given id.
follower :: FollowerId -> IO Follower
follower followerId_ = do
  req <- newEmptyMVar
  res <- newEmptyMVar
  let loop (Card xs) = do
        -- get next request
        (dance, leaderId_) <- takeMVar req
        case lookup dance xs of
          -- if dance is free and we haven't danced too often w/ this leader
          Nothing | length (filter ((==leaderId_) . snd) xs) < 2
                    -- then say yes and update dance card
                    -> do putMVar res True
                          loop (Card $ (dance, leaderId_) : xs)
          -- otherwise, refuse
          _         -> do putMVar res False
                          loop (Card xs)
  forkIO $ loop emptyCard
  return $ Follower followerId_ req res

您可以通过邀请他们跳舞来创建和测试几个追随者:

> f1 <- follower 1   -- follower #1
> f2 <- follower 2   -- follower #2
> putMVar (request f1) (1, 10) -- dance #1 w/ leader #10
> takeMVar (response f1)
True                           -- hooray!
> putMVar (request f1) (1, 14) -- dance #1 w/ leader #14
> takeMVar (response f1)
False                          -- wah! dance is taken
> putMVar (request f2) (1, 14) -- try different follower
> takeMVar (response f2)
True                           -- hooray!
>

请注意,既不能向这些特定的追随者询问他们的舞蹈卡,也不能告诉他们退出他们的无限循环。对于此应用程序,您不需要它(我们只需要来自领导者的舞蹈卡,并且在我们得到答案时不关心是否有一堆卡住的轻量级线程),但是您总是可以添加几个 MVars 如果你做到了。

同样,您应该能够将领导者实现为具有简单递归核心循环的函数。请注意,如果领导者试图按顺序填写其舞蹈卡,它实际上并不需要跟踪舞蹈卡——最后一张舞蹈卡(和 "core loop")只是一个 mapM 试图填补舞蹈空位 1 到 8。

您如何为领导者提供请求追随者跳舞的能力?那么,首先创建完整的追随者集,并将追随者句柄列表 ([Follower]) 作为参数传递给 leader 创建函数。怎么从领导那里拿回舞蹈卡? leader 函数应该 return 卡片的 MVar,main 函数可以 mapM takeMVar leadersDanceCards 获取完整的舞蹈卡片列表。