如何在 Haskell 中编写事件总线?
How to write an event bus in Haskell?
我正在努力解决 Haskell 中的一个设计问题,我似乎无法以优雅和令人满意的方式解决它。我有一个系统,其核心是基于 事件源 的概念:系统的状态是将一系列事件应用到初始状态的结果。有不同类型的事件,每种类型都通过类型族与系统的特定组件相关:
class Model a where
data Event a :: *
apply :: Event a -> a -> a
instance Model Foo where
data Event Foo = Foo Int
...
instance Model Bar where
data Event Bar = Bar String
...
目前系统是 100% 同步和耦合的,每个模型都可以访问所有其他模型的事件,这很快就会变得一团糟,所以我想通过引入 事件总线来解耦 Bus Events
这样我应该可以写出类似的东西
dispatch :: Bus Events -> Consumer (Event Foo) -> Bus Events
假设 Event Foo
和 Events
之间存在某种形式的子类型化或包含,将 Event Foo
的某些消费者附加到 Bus Events
。
然后我可以通过确保每个消费者 运行 在他们自己的线程中来添加异步性。
从系统的角度来看,这将允许我确保每个组件都是独立可打包的,从而将依赖性限制在所有事件的一个子集中。 Events
类型将在整个应用程序级别定义。
这个问题看起来与离散时间 FRP 看似相似,但我似乎无法理解它...
有没有人处理过类似的事情,如果有,怎么做的?
编辑:
我想出了以下代码,它没有使用 Source
但受到@Cirdec 的提议的极大启发:
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Reader
import qualified Data.Vector as V
type Handlers e = V.Vector (Handler e)
data EventBus e = EventBus { handlers :: Handlers e
, eventQueue :: TChan e
, eventThread :: MVar ThreadId
}
newBus :: IO (EventBus e)
newBus = do
chan <- newTChanIO
var <- newEmptyMVar
return $ EventBus V.empty chan var
addHandler :: Handler e -> EventBus e -> EventBus e
addHandler h b@EventBus{..} = b { handlers = V.snoc handlers h }
removeHandler :: Int -> EventBus e -> EventBus e
removeHandler idx b@EventBus{..} = b { handlers = let (h,t) = V.splitAt idx handlers
in h V.++ V.tail t }
startBus :: EventBus e -> IO (EventBus e)
startBus b@EventBus{..} = do
tid <- forkIO (runBus b)
putMVar eventThread tid
return b
runBus :: EventBus e -> IO ()
runBus b@EventBus{..} = do
_ <- takeMVar eventThread
forever $ do
e <- liftIO $ atomically $ readTChan eventQueue
v <- newTVarIO b
runReaderT (runEvents $ publish e) v
-- | A monad to handle pub/sub of events of type @e@
newtype Events e a = Events { runEvents :: ReaderT (TVar (EventBus e)) IO a }
deriving (Applicative, Functor, Monad, MonadIO, MonadReader (TVar (EventBus e)))
newtype Handler e = Handler { handle :: Events e () -- Unsubscription function
-> Events e (e -> Events e ()) -- what to do with events @e@
}
-- | Register a new @Handler e@ within given @Events e@ context
subscribe :: Handler e -> Events e ()
subscribe h = do
bus <- ask
liftIO $ atomically $ modifyTVar' bus (addHandler h)
unsubscribe :: Int -> Events e ()
unsubscribe idx = do
bus <- ask
liftIO $ atomically $ modifyTVar' bus (removeHandler idx)
publishBus :: EventBus e -> e -> IO ()
publishBus EventBus{..} = atomically . writeTChan eventQueue
publish :: e -> Events e ()
publish event = do
EventBus{..} <- ask >>= liftIO . atomically . readTVar
forM_ (zip (V.toList handlers) [0..]) (dispatch event)
dispatch :: e -> (Handler e, Int) -> Events e ()
dispatch event (Handler h, idx) = do
hdl <- h (unsubscribe idx)
hdl event
printer :: (Show s) => String -> Handler s
printer prefix = Handler ( \ _ -> return $ \ e -> liftIO (putStrLn $ prefix ++ show e))
携带 a
可以订阅的事件源具有以下类型
type Source m a = (a -> m ()) -> m (m ())
| | ^--- how to unsubscribe
| ^--- how to subscribe
^--- what to do when an `a` happens
事件的消费者或处理者天真地接受事件源并订阅它
type Handler m a = (Source m a ) -> m ()
= ((a -> m ()) -> m (m ())) -> m ()
^-- set up the consumer.
这有点令人费解,我们可以颠倒过来并为事件处理程序获得更好的表示:
type Handler m a = m () -> m (a -> m ())
| | ^-- what to do when an `a` happens
| ^-- set up the consumer
^-- how to unsubscribe
原始事件源使用起来有点棘手;订阅者可能想取消订阅以响应事件的发生,在这种情况下,他们需要递归地获取取消订阅的结果,以了解事件发生时要执行的操作。从更好的 Handler
定义开始,我们就没有这个问题了。事件源现在是接受事件处理程序并向其发布的东西。
type Source m a = (Handler m a ) -> m ()
= (m () -> m (a -> m ())) -> m ()
^-- how to subscribe
我正在努力解决 Haskell 中的一个设计问题,我似乎无法以优雅和令人满意的方式解决它。我有一个系统,其核心是基于 事件源 的概念:系统的状态是将一系列事件应用到初始状态的结果。有不同类型的事件,每种类型都通过类型族与系统的特定组件相关:
class Model a where
data Event a :: *
apply :: Event a -> a -> a
instance Model Foo where
data Event Foo = Foo Int
...
instance Model Bar where
data Event Bar = Bar String
...
目前系统是 100% 同步和耦合的,每个模型都可以访问所有其他模型的事件,这很快就会变得一团糟,所以我想通过引入 事件总线来解耦 Bus Events
这样我应该可以写出类似的东西
dispatch :: Bus Events -> Consumer (Event Foo) -> Bus Events
假设 Event Foo
和 Events
之间存在某种形式的子类型化或包含,将 Event Foo
的某些消费者附加到 Bus Events
。
然后我可以通过确保每个消费者 运行 在他们自己的线程中来添加异步性。
从系统的角度来看,这将允许我确保每个组件都是独立可打包的,从而将依赖性限制在所有事件的一个子集中。 Events
类型将在整个应用程序级别定义。
这个问题看起来与离散时间 FRP 看似相似,但我似乎无法理解它...
有没有人处理过类似的事情,如果有,怎么做的?
编辑:
我想出了以下代码,它没有使用 Source
但受到@Cirdec 的提议的极大启发:
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Reader
import qualified Data.Vector as V
type Handlers e = V.Vector (Handler e)
data EventBus e = EventBus { handlers :: Handlers e
, eventQueue :: TChan e
, eventThread :: MVar ThreadId
}
newBus :: IO (EventBus e)
newBus = do
chan <- newTChanIO
var <- newEmptyMVar
return $ EventBus V.empty chan var
addHandler :: Handler e -> EventBus e -> EventBus e
addHandler h b@EventBus{..} = b { handlers = V.snoc handlers h }
removeHandler :: Int -> EventBus e -> EventBus e
removeHandler idx b@EventBus{..} = b { handlers = let (h,t) = V.splitAt idx handlers
in h V.++ V.tail t }
startBus :: EventBus e -> IO (EventBus e)
startBus b@EventBus{..} = do
tid <- forkIO (runBus b)
putMVar eventThread tid
return b
runBus :: EventBus e -> IO ()
runBus b@EventBus{..} = do
_ <- takeMVar eventThread
forever $ do
e <- liftIO $ atomically $ readTChan eventQueue
v <- newTVarIO b
runReaderT (runEvents $ publish e) v
-- | A monad to handle pub/sub of events of type @e@
newtype Events e a = Events { runEvents :: ReaderT (TVar (EventBus e)) IO a }
deriving (Applicative, Functor, Monad, MonadIO, MonadReader (TVar (EventBus e)))
newtype Handler e = Handler { handle :: Events e () -- Unsubscription function
-> Events e (e -> Events e ()) -- what to do with events @e@
}
-- | Register a new @Handler e@ within given @Events e@ context
subscribe :: Handler e -> Events e ()
subscribe h = do
bus <- ask
liftIO $ atomically $ modifyTVar' bus (addHandler h)
unsubscribe :: Int -> Events e ()
unsubscribe idx = do
bus <- ask
liftIO $ atomically $ modifyTVar' bus (removeHandler idx)
publishBus :: EventBus e -> e -> IO ()
publishBus EventBus{..} = atomically . writeTChan eventQueue
publish :: e -> Events e ()
publish event = do
EventBus{..} <- ask >>= liftIO . atomically . readTVar
forM_ (zip (V.toList handlers) [0..]) (dispatch event)
dispatch :: e -> (Handler e, Int) -> Events e ()
dispatch event (Handler h, idx) = do
hdl <- h (unsubscribe idx)
hdl event
printer :: (Show s) => String -> Handler s
printer prefix = Handler ( \ _ -> return $ \ e -> liftIO (putStrLn $ prefix ++ show e))
携带 a
可以订阅的事件源具有以下类型
type Source m a = (a -> m ()) -> m (m ())
| | ^--- how to unsubscribe
| ^--- how to subscribe
^--- what to do when an `a` happens
事件的消费者或处理者天真地接受事件源并订阅它
type Handler m a = (Source m a ) -> m ()
= ((a -> m ()) -> m (m ())) -> m ()
^-- set up the consumer.
这有点令人费解,我们可以颠倒过来并为事件处理程序获得更好的表示:
type Handler m a = m () -> m (a -> m ())
| | ^-- what to do when an `a` happens
| ^-- set up the consumer
^-- how to unsubscribe
原始事件源使用起来有点棘手;订阅者可能想取消订阅以响应事件的发生,在这种情况下,他们需要递归地获取取消订阅的结果,以了解事件发生时要执行的操作。从更好的 Handler
定义开始,我们就没有这个问题了。事件源现在是接受事件处理程序并向其发布的东西。
type Source m a = (Handler m a ) -> m ()
= (m () -> m (a -> m ())) -> m ()
^-- how to subscribe