Cloud Haskell hanging forever when sending messages to ManagedProcess

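Problem

Hello! I'm writing a simple Server - Worker program in Cloud Haskell. The problem is that when I try to create a ManagedProcess, my example hangs forever after the server discovery step, even when using callTimeout (which should give up after 100 ms). The code is very simple, but I cannot find anything wrong with it.

I've also posted this question on the mailing list, but from what I know of the SO community, I can get an answer a lot faster here. If I get an answer from the mailing list, I'll post it here as well.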

Source Code

Worker.hs:

{-# LANGUAGE DeriveDataTypeable        #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE DeriveGeneric             #-}
{-# LANGUAGE TemplateHaskell           #-}

module Main where

import Network.Transport     (EndPointAddress(EndPointAddress))
import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Platform hiding (__remoteTable)
import Control.Distributed.Process.Platform.Async
import Control.Distributed.Process.Platform.ManagedProcess
import Control.Distributed.Process.Platform.Time
import Control.Distributed.Process.Platform.Timer (sleep)
import Control.Distributed.Process.Closure (mkClosure, remotable)
import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Distributed.Process.Node hiding (call)
import Control.Concurrent (threadDelay)
import GHC.Generics (Generic)
import Data.Binary (Binary) 
import Data.Typeable (Typeable)
import Data.ByteString.Char8 (pack)
import System.Environment    (getArgs)

import qualified Server as Server

main = do
  [host, port, serverAddr] <- getArgs

  Right transport <- createTransport host port defaultTCPParameters
  node <- newLocalNode transport initRemoteTable

  let addr = EndPointAddress (pack serverAddr)
      srvID = NodeId addr

  _ <- forkProcess node $ do
    sid <- discoverServer srvID
    liftIO $ putStrLn "x"
    liftIO $ print sid
    r <- callTimeout sid (Server.Add 5 6) 100 :: Process (Maybe Double)
    liftIO $ putStrLn "x"
    liftIO $ threadDelay (10 * 1000 * 1000)


  threadDelay (10 * 1000 * 1000)
  return ()


discoverServer srvID = do
  whereisRemoteAsync srvID "serverPID"
  reply <- expectTimeout 100 :: Process (Maybe WhereIsReply)
  case reply of
    Just (WhereIsReply _ msid) -> case msid of
      Just sid -> return sid
      Nothing  -> discoverServer srvID
    Nothing                    -> discoverServer srvID

Server.hs:

{-# LANGUAGE DeriveDataTypeable        #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE DeriveGeneric             #-}
{-# LANGUAGE TemplateHaskell           #-}

module Server where

import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Platform hiding (__remoteTable)
import Control.Distributed.Process.Platform.Async
import Control.Distributed.Process.Platform.ManagedProcess
import Control.Distributed.Process.Platform.Time
import Control.Distributed.Process.Platform.Timer (sleep)
import Control.Distributed.Process.Closure (mkClosure, remotable)
import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Distributed.Process.Node hiding (call)
import Control.Concurrent (threadDelay)
import GHC.Generics (Generic)
import Data.Binary (Binary) 
import Data.Typeable (Typeable)


data Add = Add Double Double
  deriving (Typeable, Generic)
instance Binary Add

launchServer :: Process ProcessId
launchServer = spawnLocal $ serve () (statelessInit Infinity) server >> return () where
  server = statelessProcess { apiHandlers            = [ handleCall_ (\(Add x y) -> liftIO (putStrLn "!") >> return (x + y)) ]
                            , unhandledMessagePolicy = Drop
                            }


main = do
  Right transport <- createTransport "127.0.0.1" "8080" defaultTCPParameters
  node <- newLocalNode transport initRemoteTable
  _ <- forkProcess node $ do
    self <- getSelfPid
    register "serverPID" self

    liftIO $ putStrLn "x"
    mid <- launchServer
    liftIO $ putStrLn "y"
    r <- call mid (Add 5 6) :: Process Double
    liftIO $ print r
    liftIO $ putStrLn "z"
    liftIO $ threadDelay (10 * 1000 * 1000)
    liftIO $ putStrLn "z2"

  threadDelay (10 * 1000 * 1000)
  return ()

We can run them as follows:

runhaskell Server.hs
runhaskell Worker.hs 127.0.0.2 8080 127.0.0.1:8080:0

Results

When we run the programs, we get the following results:

From the server:

x
y
!
11.0 -- this one shows that inside the same process we were able to use the "call" function
z
-- waiting - all the output above came from tests inside the server; now it waits for external messages

From the worker:

x
pid://127.0.0.1:8080:0:10 -- this is the process id of the server obtained with whereisRemoteAsync
-- waiting forever on the "callTimeout sid (Server.Add 5 6) 100" code!

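As a side note: I've found that sending messages with send (from Control.Distributed.Process) and receiving them with expect works fine. But sending them with call (from Control.Distributed.Process.Platform) and trying to receive them with the ManagedProcess API handlers hangs the call forever (even when using callTimeout!).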

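Answer

Your client is getting an exception, which you cannot easily observe because you are running your client inside forkProcess. If you want to do that, it is fine, but then you need to monitor or link to that process. In this case, simply using runProcess would be much simpler. If you do that, you'll see that you get this exception: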

Worker.hs: trying to call fromInteger for a TimeInterval. Cannot guess units

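callTimeout does not take an Integer; it takes a TimeInterval, which is constructed with the functions in the Time module. TimeInterval is a pseudo-Num: it has a Num instance, so the literal 100 type-checks, but it does not actually seem to support fromInteger. I'd consider that a bug, or at least bad form (in Haskell), but in any case the way to fix your code is simply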

r <- callTimeout sid (Server.Add 5 6) (milliSeconds 100) :: Process (Maybe Double)
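
To see why a bare literal compiles but then fails at runtime, here is a minimal sketch that uses only base. Interval, TimeUnit, millis, toMicros, and viaMicros are hypothetical stand-ins made up for illustration; they are not the library's definitions, and the real TimeInterval may well be implemented differently.

```haskell
import Control.Exception (ErrorCall, evaluate, try)

-- Hypothetical stand-ins for illustration; not the library's real definitions.
data TimeUnit = Micros | Millis | Seconds
  deriving (Show, Eq)

data Interval = Interval TimeUnit Int
  deriving (Show, Eq)

-- Explicit constructor that names the unit, analogous in spirit to milliSeconds.
millis :: Int -> Interval
millis = Interval Millis

-- Normalise an interval to microseconds.
toMicros :: Interval -> Int
toMicros (Interval Micros n)  = n
toMicros (Interval Millis n)  = n * 1000
toMicros (Interval Seconds n) = n * 1000000

-- Lift a function on microsecond counts to intervals.
viaMicros :: (Int -> Int) -> Interval -> Interval
viaMicros f = Interval Micros . f . toMicros

-- The instance exists, so a numeric literal is accepted by the type checker,
-- but a bare number carries no unit, so fromInteger can only fail when forced.
instance Num Interval where
  a + b         = Interval Micros (toMicros a + toMicros b)
  a - b         = Interval Micros (toMicros a - toMicros b)
  _ * _         = error "Interval: (*) has no meaningful unit"
  abs           = viaMicros abs
  negate        = viaMicros negate
  signum _      = error "Interval: signum has no meaningful unit"
  fromInteger _ = error "Interval: cannot guess units for a bare literal"

main :: IO ()
main = do
  -- Unit given explicitly: works.
  print (toMicros (millis 100))
  -- Bare literal: type-checks as an Interval, then throws once it is evaluated.
  r <- try (evaluate (toMicros 100)) :: IO (Either ErrorCall Int)
  putStrLn (either (\_ -> "bare literal 100 failed at runtime") show r)
```

Running this prints 100000 and then the failure message. The same thing happens with callTimeout in the worker: the literal is accepted at compile time, and the error only surfaces when the interval is actually evaluated inside the forked process.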

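To fix the problem with the client calling the server, you need to register the pid of the server process you spawned, rather than the pid of the main process you spawned it from. That is, change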

self <- getSelfPid
register "serverPID" self

liftIO $ putStrLn "x"
mid <- launchServer
liftIO $ putStrLn "y"

to

mid <- launchServer
register "serverPID" mid
liftIO $ putStrLn "y"