Haskell、通道、STM、线程、消息传递
Haskell, Channels, STM, -threaded, Message Passing
我正在尝试使用 channels/STM 来实现 Haskell 中的消息传递。也许这是一个糟糕的主意,并且有更好的方法 implement/use 消息传递 Haskell。如果是这种情况,请告诉我;然而,我的探索开启了一些关于并发 Haskell.
的基本问题
我听说过关于 STM 的好消息,尤其是 Haskell 中的实施。由于它支持读取和写入,并且具有一些安全优势,我认为可以从这里开始。这提出了我最大的问题:does
msg <- atomically $ readTChan chan
其中 chan 是 TChan Int,导致等待通道上有值的等待?
考虑以下程序:
p chan = do
atomically $ writeTChan chan 1
atomically $ writeTChan chan 2
q chan = do
msg1 <- atomically $ readTChan chan
msg2 <- atomically $ readTChan chan
-- for testing purposes
putStrLn $ show msg1
putStrLn $ show msg2
main = do
chan <- atomically $ newTChan
p chan
q chan
用 ghc --make -threaded 编译它,然后 运行 程序,实际上你得到 1 后跟 2 打印到控制台。现在,假设我们
main = do
chan <- atomically $ newTChan
forkIO $ p chan
forkIO $ q chan
相反。现在,如果我们使用-threaded,它要么不打印任何内容,要么打印 1,或者 1 后跟 2 到终端;但是,如果您不使用 -threaded 进行编译,它总是打印 1 后跟 2。 问题 2:-threaded 和非 -threaded 之间有什么区别?我想它们并不是真正 运行 并发的东西,它们只是 运行 一个接一个。这与下文一致。
现在,在我的想法中,如果我同时拥有 p 和 q 运行ning;即我 forkIO 了他们,他们应该能够以相反的顺序 运行。假设
main = do
chan <- atomically newTChan
forkIO $ q chan
forkIO $ p chan
现在,如果我在没有 -threaded 的情况下编译它,我永远不会将任何内容打印到控制台。如果我用 -threaded 编译,我有时会这样做。虽然,1 后跟 2 的情况非常罕见——通常只有 1 或什么都没有。我也用 Control.Concurrent.Chan 试过了,得到了一致的结果。
第二个大问题:channels和fork怎么玩,上面的程序是怎么回事?
好歹我也不能这么天真地用STM来模拟消息传递。也许 Cloud Haskell 是解决这些问题的一个选项——我真的不知道。任何有关如何使消息传递缺少序列化的信息 ~~> 写入套接字 ~~> 从套接字读取 ~~> 反序列化将不胜感激。
不,你的想法是对的 - 这就是 TChan
的用途 - 你只是错过了 forkIO
的一个小问题:
问题是您的主线程不会等待使用 forkIO
(see here for reference)
创建的线程终止
所以如果我使用参考文献中给出的提示:
import Control.Concurrent
import Control.Concurrent.STM
p :: Num a => TChan a -> IO ()
p chan = do
atomically $ writeTChan chan 1
atomically $ writeTChan chan 2
q chan = do
msg1 <- atomically $ readTChan chan
msg2 <- atomically $ readTChan chan
-- for testing purposes
putStrLn $ show msg1
putStrLn $ show msg2
main :: IO ()
main = do
children <- newMVar []
chan <- atomically $ newTChan
_ <- forkChild children $ p chan
_ <- forkChild children $ q chan
waitForChildren children
return ()
waitForChildren :: MVar [MVar ()] -> IO ()
waitForChildren children = do
cs <- takeMVar children
case cs of
[] -> return ()
m:ms -> do
putMVar children ms
takeMVar m
waitForChildren children
forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
forkChild children io = do
mvar <- newEmptyMVar
childs <- takeMVar children
putMVar children (mvar:childs)
forkFinally io (\_ -> putMVar mvar ())
它按预期工作:
d:/Temp $ ghc --make -threaded tchan.hs
[1 of 1] Compiling Main ( tchan.hs, tchan.o )
Linking tchan.exe ...
d:/Temp $ ./tchan.exe
1
2
d:/Temp $
当然,如果您将调用切换到 p
和 q
,它当然会继续工作
我正在尝试使用 channels/STM 来实现 Haskell 中的消息传递。也许这是一个糟糕的主意,并且有更好的方法 implement/use 消息传递 Haskell。如果是这种情况,请告诉我;然而,我的探索开启了一些关于并发 Haskell.
的基本问题我听说过关于 STM 的好消息,尤其是 Haskell 中的实施。由于它支持读取和写入,并且具有一些安全优势,我认为可以从这里开始。这提出了我最大的问题:does
msg <- atomically $ readTChan chan
其中 chan 是 TChan Int,导致等待通道上有值的等待?
考虑以下程序:
p chan = do
atomically $ writeTChan chan 1
atomically $ writeTChan chan 2
q chan = do
msg1 <- atomically $ readTChan chan
msg2 <- atomically $ readTChan chan
-- for testing purposes
putStrLn $ show msg1
putStrLn $ show msg2
main = do
chan <- atomically $ newTChan
p chan
q chan
用 ghc --make -threaded 编译它,然后 运行 程序,实际上你得到 1 后跟 2 打印到控制台。现在,假设我们
main = do
chan <- atomically $ newTChan
forkIO $ p chan
forkIO $ q chan
相反。现在,如果我们使用-threaded,它要么不打印任何内容,要么打印 1,或者 1 后跟 2 到终端;但是,如果您不使用 -threaded 进行编译,它总是打印 1 后跟 2。 问题 2:-threaded 和非 -threaded 之间有什么区别?我想它们并不是真正 运行 并发的东西,它们只是 运行 一个接一个。这与下文一致。
现在,在我的想法中,如果我同时拥有 p 和 q 运行ning;即我 forkIO 了他们,他们应该能够以相反的顺序 运行。假设
main = do
chan <- atomically newTChan
forkIO $ q chan
forkIO $ p chan
现在,如果我在没有 -threaded 的情况下编译它,我永远不会将任何内容打印到控制台。如果我用 -threaded 编译,我有时会这样做。虽然,1 后跟 2 的情况非常罕见——通常只有 1 或什么都没有。我也用 Control.Concurrent.Chan 试过了,得到了一致的结果。
第二个大问题:channels和fork怎么玩,上面的程序是怎么回事?
好歹我也不能这么天真地用STM来模拟消息传递。也许 Cloud Haskell 是解决这些问题的一个选项——我真的不知道。任何有关如何使消息传递缺少序列化的信息 ~~> 写入套接字 ~~> 从套接字读取 ~~> 反序列化将不胜感激。
不,你的想法是对的 - 这就是 TChan
的用途 - 你只是错过了 forkIO
的一个小问题:
问题是您的主线程不会等待使用 forkIO
(see here for reference)
所以如果我使用参考文献中给出的提示:
import Control.Concurrent
import Control.Concurrent.STM
p :: Num a => TChan a -> IO ()
p chan = do
atomically $ writeTChan chan 1
atomically $ writeTChan chan 2
q chan = do
msg1 <- atomically $ readTChan chan
msg2 <- atomically $ readTChan chan
-- for testing purposes
putStrLn $ show msg1
putStrLn $ show msg2
main :: IO ()
main = do
children <- newMVar []
chan <- atomically $ newTChan
_ <- forkChild children $ p chan
_ <- forkChild children $ q chan
waitForChildren children
return ()
waitForChildren :: MVar [MVar ()] -> IO ()
waitForChildren children = do
cs <- takeMVar children
case cs of
[] -> return ()
m:ms -> do
putMVar children ms
takeMVar m
waitForChildren children
forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
forkChild children io = do
mvar <- newEmptyMVar
childs <- takeMVar children
putMVar children (mvar:childs)
forkFinally io (\_ -> putMVar mvar ())
它按预期工作:
d:/Temp $ ghc --make -threaded tchan.hs
[1 of 1] Compiling Main ( tchan.hs, tchan.o )
Linking tchan.exe ...
d:/Temp $ ./tchan.exe
1
2
d:/Temp $
当然,如果您将调用切换到 p
和 q
,它当然会继续工作