monad transformer中的分布式进程

Distributed Process in monad transformer

我正在考虑为所谓的 cloud-haskell 实现一个基于八卦的集群成员资格后端,或者它是 Distributed.Process.. 无论如何,我试图在没有 ioref 的情况下摆脱 handeling 状态或者MVars 而不是使用状态转换器并将 Process monad 放在底部,如下所示:

type ClusterT = StateT ClusterState
type Cluster a = ClusterT Process a

使用 Control.Distributed.Process.Lifted (https://hackage.haskell.org/package/distributed-process-lifted) 效果很好,您可以这样做:

mystatefulcomp :: Cluster ()
mystatefulcomp = do
   msg <- expect :: Cluster String
   old_state <- get
   say $ "My old state was " ++ (show old_state)
   put $ modifyState curr_state msg
   mystatefulcomp

main = do
   Right transport <- createTransport '127.0.0.1' '3000' (\n -> ('127.0.0.1', n) defaultTCPParameters
   node <- newLocalNode transport initRemoteTable
   runProcess node (evalStateT mystatefulcomp initialstate)
   where initialstate = ClusterState.empty

这工作得相当好,让我可以很好地构建我的程序,我可以保持我的状态功能并将它与 Cluster monad 一起线程化。

当我尝试使用 receiveWaitmatch 接收消息时,这一切都崩溃了。

让我们重写 statefulcomp 以使用 receiveWait

做其他事情
doSomethingWithString :: String -> Cluster ()
doSomethingWithString str = do
   s < get
   put $ modifyState s str    

mystatefulcomp :: Cluster ()
mystatefulcomp = do
   old_state <- get
   receiveWait [ match doSomthingWithString ]
   new_state <- get
   say $ "old state " ++ (show old_state) ++ " new " ++ (show new_state)

这不会起作用,因为 match 函数是 (a -> Process b) -> Match b 类型,但我们希望它是 (a -> Cluster b) -> Match b 类型。这是我如履薄冰的地方。据我了解,Control.Distributed.Process.Lifted rexposes Control.Distributed.Process 函数被提升到 tansformer 堆栈中,允许您使用 expectsay 之类的函数,但不会 rexposes matchmatchIf 等等..

我真的在努力寻找解决方法或重新实现 match 及其朋友的方法,使其成为 MonadProcess m => (a -> m b) -> Match b

任何见解都是有用的。

编辑

所以在一些摆弄之后,我想到了以下内容

doSomethingWithString :: String -> Cluster ()
doSomethingWithString str = do
   s < get
   put $ modifyState s str

doSomethingWithInt :: Int -> Cluster ()
...

mystatefulcomp :: Cluster ()
mystatefulcomp = do
   old_state <- get
   id =<< receiveWait [ match $ return . doSomethingWithString
                      , match $ return . doSomethingWithInt ]
   new_state <- get
   say $ "old state " ++ (show old_state) ++ " new " ++ (show new_state)

这很好用,但我仍然很好奇这个设计有多好

正如 Michael Snoyman 指出的 out in a series of blog posts(即 5 个链接),将 StateT 包裹在 IO 周围是个坏主意。您只是无意中发现了一个出现该问题的实例。

mystatefulcomp :: Cluster ()
mystatefulcomp = do
   old_state <- get
   receiveWait [ match doSomethingWithString ]
   new_state <- get

问题是如果 doSomethingWithString 抛出错误,最终会出现在 new_state 中。 old_statedoSomethingWithString 异常前的一些中间状态?你看,我们想知道的事实使得这种方法不亚于将状态存储在 IORefMVar.

除了有问题的语义,如果不重写 distributed-process 以在所有地方使用 MonadBaseControl,这甚至无法实现。这正是 distributed-process-lifted 无法交付的原因,因为它只是环绕了来自 distributed-process.

的原语

所以,我在这里要做的是传递一个 data Config = Config { clusterState :: MVar ClusterState } 环境(哦,看,Process 也这样做!)。可能 ReaderT 以一种理智的方式与 IO 交互,此外,您可以轻松地将 Process 的任意数量的嵌套事件提升到 ReaderT Config Process 自己。

重复 Michael 的博文中的信息:StateT 一般而言还不错(在纯转换器堆栈中),仅适用于我们以某种方式包装 IO 的情况。我鼓励你阅读这些帖子,它们对我来说非常鼓舞人心,所以它们又来了:

  1. https://www.fpcomplete.com/blog/2017/06/readert-design-pattern
  2. https://www.fpcomplete.com/blog/2017/06/understanding-resourcet
  3. https://www.fpcomplete.com/blog/2017/06/tale-of-two-brackets
  4. https://www.fpcomplete.com/blog/2017/07/announcing-new-unliftio-library
  5. https://www.fpcomplete.com/blog/2017/07/the-rio-monad