Haskell、通道、STM、线程、消息传递

Haskell, Channels, STM, -threaded, Message Passing

我正在尝试使用 channels/STM 来实现 Haskell 中的消息传递。也许这是一个糟糕的主意,并且有更好的方法 implement/use 消息传递 Haskell。如果是这种情况,请告诉我;然而,我的探索开启了一些关于并发 Haskell.

的基本问题

我听说过关于 STM 的好消息,尤其是 Haskell 中的实施。由于它支持读取和写入,并且具有一些安全优势,我认为可以从这里开始。这提出了我最大的问题:does

msg <- atomically $ readTChan chan

其中 chan 是 TChan Int,导致等待通道上有值的等待?

考虑以下程序:

p chan = do
    atomically $ writeTChan chan 1
    atomically $ writeTChan chan 2

q chan = do
    msg1 <- atomically $ readTChan chan 
    msg2 <- atomically $ readTChan chan
    -- for testing purposes
    putStrLn $ show msg1 
    putStrLn $ show msg2

main = do
    chan <- atomically $ newTChan
    p chan
    q chan

用 ghc --make -threaded 编译它,然后 运行 程序,实际上你得到 1 后跟 2 打印到控制台。现在,假设我们

main = do 
    chan <- atomically $ newTChan
    forkIO $ p chan 
    forkIO $ q chan

相反。现在,如果我们使用-threaded,它要么不打印任何内容,要么打印 1,或者 1 后跟 2 到终端;但是,如果您不使用 -threaded 进行编译,它总是打印 1 后跟 2。 问题 2:-threaded 和非 -threaded 之间有什么区别?我想它们并不是真正 运行 并发的东西,它们只是 运行 一个接一个。这与下文一致。

现在,在我的想法中,如果我同时拥有 p 和 q 运行ning;即我 forkIO 了他们,他们应该能够以相反的顺序 运行。假设

main = do
    chan <- atomically newTChan
    forkIO $ q chan
    forkIO $ p chan

现在,如果我在没有 -threaded 的情况下编译它,我永远不会将任何内容打印到控制台。如果我用 -threaded 编译,我有时会这样做。虽然,1 后跟 2 的情况非常罕见——通常只有 1 或什么都没有。我也用 Control.Concurrent.Chan 试过了,得到了一致的结果。

第二个大问题:channels和fork怎么玩,上面的程序是怎么回事?

好歹我也不能这么天真地用STM来模拟消息传递。也许 Cloud Haskell 是解决这些问题的一个选项——我真的不知道。任何有关如何使消息传递缺少序列化的信息 ~~> 写入套接字 ~~> 从套接字读取 ~~> 反序列化将不胜感激。

不,你的想法是对的 - 这就是 TChan 的用途 - 你只是错过了 forkIO 的一个小问题:

问题是您的主线程不会等待使用 forkIO (see here for reference)

创建的线程终止

所以如果我使用参考文献中给出的提示

import Control.Concurrent
import Control.Concurrent.STM

p :: Num a => TChan a -> IO ()
p chan = do
    atomically $ writeTChan chan 1
    atomically $ writeTChan chan 2

q chan = do
    msg1 <- atomically $ readTChan chan 
    msg2 <- atomically $ readTChan chan
    -- for testing purposes
    putStrLn $ show msg1 
    putStrLn $ show msg2

main :: IO ()
main = do
    children <- newMVar []
    chan <- atomically $ newTChan
    _ <- forkChild children $ p chan
    _ <- forkChild children $ q chan
    waitForChildren children
    return ()

waitForChildren :: MVar [MVar ()] -> IO ()
waitForChildren children = do
  cs <- takeMVar children
  case cs of
    []   -> return ()
    m:ms -> do
      putMVar children ms
      takeMVar m
      waitForChildren children

forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
forkChild children io = do
  mvar <- newEmptyMVar
  childs <- takeMVar children
  putMVar children (mvar:childs)
  forkFinally io (\_ -> putMVar mvar ())

它按预期工作:

d:/Temp $ ghc --make -threaded tchan.hs
[1 of 1] Compiling Main             ( tchan.hs, tchan.o )
Linking tchan.exe ...
d:/Temp $ ./tchan.exe 
1
2
d:/Temp $

当然,如果您将调用切换到 pq,它当然会继续工作