使用 postgresql-simple 创建流式管道源
Creating a streaming Conduit Source with postgresql-simple
postgresql-simple
提供流式查询的函数,例如
fold
:: (FromRow row, ToRow params)
=> Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a
我想创建一个充分利用流媒体的管道源。
mySource :: (FromRow row, Monad m) => Source m row
不幸的是,因为 IO
出现在 fold
中的逆变位置(我认为?),我真的很难处理这些类型。以下类型检查,但在产生值之前折叠整个流。
getConduit :: Connection -> IO (C.ConduitM () Event IO ())
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo
where
foo :: C.ConduitM () Event IO () -> Event -> IO (C.ConduitM () Event IO ())
foo cond evt = pure (cond >> C.yield evt)
任何关于如何实现它的指示都将不胜感激!谢谢!
一个(不太好)的方法来解决这个问题
- 创建一个新的
TMChan
来接收行
- 设置
foreach_
仅将行转储到该频道
- 终于用
stm-conduit
制作出频道的来源
我没有办法测试这个off-hand,但下面的方法应该可行
import Conduit
import Database.PostgreSQL.Simple (foreach_)
import Data.Conduit.TMChan (sourceTMChan)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, atomically)
mySource :: (FromRow row, MonadIO m) => Connection -> Query -> IO (Source m row)
mySource connection query = do
chan <- newTMChanIO
forEach_ connection query (atomically . writeTMChan chan)
pure (sourceTMChan chan)
如果我们有 forEach_ :: (MonadIO m, FromRow r) => Connection -> Query -> (r -> m ()) -> m ()
这可能会更容易...
这里是对 Alec 上面的编译和运行的修改。 mkPgSource
是 Alec 在其 post.
末尾提到的通用函数
import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.FromRow
import Database.PostgreSQL.Simple.ToRow
import Control.Monad.IO.Class (MonadIO)
import Data.Conduit.TMChan (sourceTMChan)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan,
closeTMChan, TMChan)
import GHC.Conc (atomically, forkIO)
import Conduit
--closes the channel after action is done to terminate the source
mkPgSource :: (MonadIO m, FromRow r) => ((r -> IO ()) -> IO ()) -> IO (Source m r)
mkPgSource action = do
chan <- newTMChanIO
_ <- forkIO $ do action $ atomically . (writeTMChan chan)
atomically $ closeTMChan chan
pure $ sourceTMChan chan
sourceQuery :: (ToRow params, FromRow r, MonadIO m) =>
Connection -> Query -> params -> IO (Source m r)
sourceQuery conn q params = mkPgSource $ forEach conn q params
sourceQuery_ :: (FromRow r, MonadIO m) => Connection -> Query -> IO
(Source m r)
sourceQuery_ conn q = mkPgSource $ forEach_ conn q
postgresql-simple
提供流式查询的函数,例如
fold
:: (FromRow row, ToRow params)
=> Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a
我想创建一个充分利用流媒体的管道源。
mySource :: (FromRow row, Monad m) => Source m row
不幸的是,因为 IO
出现在 fold
中的逆变位置(我认为?),我真的很难处理这些类型。以下类型检查,但在产生值之前折叠整个流。
getConduit :: Connection -> IO (C.ConduitM () Event IO ())
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo
where
foo :: C.ConduitM () Event IO () -> Event -> IO (C.ConduitM () Event IO ())
foo cond evt = pure (cond >> C.yield evt)
任何关于如何实现它的指示都将不胜感激!谢谢!
一个(不太好)的方法来解决这个问题
- 创建一个新的
TMChan
来接收行 - 设置
foreach_
仅将行转储到该频道 - 终于用
stm-conduit
制作出频道的来源
我没有办法测试这个off-hand,但下面的方法应该可行
import Conduit
import Database.PostgreSQL.Simple (foreach_)
import Data.Conduit.TMChan (sourceTMChan)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, atomically)
mySource :: (FromRow row, MonadIO m) => Connection -> Query -> IO (Source m row)
mySource connection query = do
chan <- newTMChanIO
forEach_ connection query (atomically . writeTMChan chan)
pure (sourceTMChan chan)
如果我们有 forEach_ :: (MonadIO m, FromRow r) => Connection -> Query -> (r -> m ()) -> m ()
这可能会更容易...
这里是对 Alec 上面的编译和运行的修改。 mkPgSource
是 Alec 在其 post.
import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.FromRow
import Database.PostgreSQL.Simple.ToRow
import Control.Monad.IO.Class (MonadIO)
import Data.Conduit.TMChan (sourceTMChan)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan,
closeTMChan, TMChan)
import GHC.Conc (atomically, forkIO)
import Conduit
--closes the channel after action is done to terminate the source
mkPgSource :: (MonadIO m, FromRow r) => ((r -> IO ()) -> IO ()) -> IO (Source m r)
mkPgSource action = do
chan <- newTMChanIO
_ <- forkIO $ do action $ atomically . (writeTMChan chan)
atomically $ closeTMChan chan
pure $ sourceTMChan chan
sourceQuery :: (ToRow params, FromRow r, MonadIO m) =>
Connection -> Query -> params -> IO (Source m r)
sourceQuery conn q params = mkPgSource $ forEach conn q params
sourceQuery_ :: (FromRow r, MonadIO m) => Connection -> Query -> IO
(Source m r)
sourceQuery_ conn q = mkPgSource $ forEach_ conn q