在流式传输管道中将 ResourceT 与支架组合
Combining ResourceT with bracket in a streaming pipeline
这里是我的代码的简化:
import Database.PostgreSQL.Simple (Connection)
import qualified Streaming.Prelude as S
import Streaming.ByteString.Char8 as C
import Streaming.Zip (gunzip)
import Streaming
main :: IO ()
main = do
res <- runResourceT $ calculateA myLinesStream
return ()
type MyLinesStream m r = S.Stream (S.Of String) m r
connect :: IO Connection
connect = undefined
close :: Connection -> IO ()
close = undefined
calculateA :: MonadIO m => MyLinesStream m r -> m ()
calculateA stream = liftIO (bracket connect close (go stream))
where
go :: MonadIO m => MyLinesStream m r -> Connection -> m ()
go stream conn = stream & S.length_ >>= liftIO . print
myLinesStream :: (MonadIO m, MonadResource m) => MyLinesStream m ()
myLinesStream = do
S.each ["1.zip", "2.zip"]
& S.mapM (\fileName -> C.readFile fileName & gunzip)
& S.mconcat
& C.lines
& mapsM (S.toList . C.unpack)
& void
go stream
上的以下行存在类型错误:
calculateA stream = liftIO (bracket connect close (go stream))
错误说:
Couldn't match type ‘m’ with ‘IO’
‘m’ is a rigid type variable bound by
the type signature for:
calculateA :: forall (m :: * -> *) r.
MonadIO m =>
MyLinesStream m r -> m ()
Expected type: Connection -> IO ()
Actual type: Connection -> m ()
问题
- 如何使此代码进行类型检查并使其在
calculateA
函数中释放资源时仍然安全?
- 我正在使用
C.readFile
读取多个文件,然后将其包装在 runResourceT
中。这会正确释放所有文件句柄吗?
- 构图好不好? (请注意,我需要
calculateA
函数与 myLinesStream
分开)
问题是您试图将 bracket
与过于笼统的 monad 一起使用。 bracket
有签名:
bracket :: IO a -> (a -> IO b) -> (a -> IO c) -> IO c
它将IO
个动作作为参数。但是,您传递给 bracket
的 go
函数需要在由 calculateA
的调用者选择的通用基础 monad m
中工作(您稍后将该 monad 设为 ResourceT IO
在 main
).
来自 base 和 ResourceT
的 bracket
不能很好地混合。相反,您需要转向 resourcet 包中的特殊函数,例如 allocate
and release
,并使用它们来定义一个辅助函数,例如:
bracketStream :: (Functor f, MonadResource m)
=> IO a
-> (a -> IO ())
-> (a -> Stream f m b)
-> Stream f m b
bracketStream alloc free inside = do
(key, seed) <- lift (allocate alloc free)
r <- inside seed
release key
pure r
它是如何工作的?如果你有一个 X
s 的流,它会在流的开头添加一个分配操作(注册在异常终止的情况下调用的相应清理操作,如异常)并且它还会添加一个显式调用流耗尽时的清理操作:
(allocate+register cleanup) X X X ... X (cleanup)
您写道:
I'm reading multiple files using C.readFile and then wrapping it
inside runResourceT. Will this properly release all the file handles?
是的。使用 ResourceT
,当执行显式清理操作时,或者当我们使用 runResourceT
“退出”ResourceT
时(可能异常,有例外),资源将被释放。
因此,如果我们读取 X 流后跟 Y 流,我们将:
(allocate+register cleanup) X X X ... X (cleanup) (allocate+register cleanup) Y Y Y ... Y (cleanup)
即在分配产生Ys的资源之前释放产生Xs的资源
这里是我的代码的简化:
import Database.PostgreSQL.Simple (Connection)
import qualified Streaming.Prelude as S
import Streaming.ByteString.Char8 as C
import Streaming.Zip (gunzip)
import Streaming
main :: IO ()
main = do
res <- runResourceT $ calculateA myLinesStream
return ()
type MyLinesStream m r = S.Stream (S.Of String) m r
connect :: IO Connection
connect = undefined
close :: Connection -> IO ()
close = undefined
calculateA :: MonadIO m => MyLinesStream m r -> m ()
calculateA stream = liftIO (bracket connect close (go stream))
where
go :: MonadIO m => MyLinesStream m r -> Connection -> m ()
go stream conn = stream & S.length_ >>= liftIO . print
myLinesStream :: (MonadIO m, MonadResource m) => MyLinesStream m ()
myLinesStream = do
S.each ["1.zip", "2.zip"]
& S.mapM (\fileName -> C.readFile fileName & gunzip)
& S.mconcat
& C.lines
& mapsM (S.toList . C.unpack)
& void
go stream
上的以下行存在类型错误:
calculateA stream = liftIO (bracket connect close (go stream))
错误说:
Couldn't match type ‘m’ with ‘IO’
‘m’ is a rigid type variable bound by
the type signature for:
calculateA :: forall (m :: * -> *) r.
MonadIO m =>
MyLinesStream m r -> m ()
Expected type: Connection -> IO ()
Actual type: Connection -> m ()
问题
- 如何使此代码进行类型检查并使其在
calculateA
函数中释放资源时仍然安全? - 我正在使用
C.readFile
读取多个文件,然后将其包装在runResourceT
中。这会正确释放所有文件句柄吗? - 构图好不好? (请注意,我需要
calculateA
函数与myLinesStream
分开)
问题是您试图将 bracket
与过于笼统的 monad 一起使用。 bracket
有签名:
bracket :: IO a -> (a -> IO b) -> (a -> IO c) -> IO c
它将IO
个动作作为参数。但是,您传递给 bracket
的 go
函数需要在由 calculateA
的调用者选择的通用基础 monad m
中工作(您稍后将该 monad 设为 ResourceT IO
在 main
).
来自 base 和 ResourceT
的 bracket
不能很好地混合。相反,您需要转向 resourcet 包中的特殊函数,例如 allocate
and release
,并使用它们来定义一个辅助函数,例如:
bracketStream :: (Functor f, MonadResource m)
=> IO a
-> (a -> IO ())
-> (a -> Stream f m b)
-> Stream f m b
bracketStream alloc free inside = do
(key, seed) <- lift (allocate alloc free)
r <- inside seed
release key
pure r
它是如何工作的?如果你有一个 X
s 的流,它会在流的开头添加一个分配操作(注册在异常终止的情况下调用的相应清理操作,如异常)并且它还会添加一个显式调用流耗尽时的清理操作:
(allocate+register cleanup) X X X ... X (cleanup)
您写道:
I'm reading multiple files using C.readFile and then wrapping it inside runResourceT. Will this properly release all the file handles?
是的。使用 ResourceT
,当执行显式清理操作时,或者当我们使用 runResourceT
“退出”ResourceT
时(可能异常,有例外),资源将被释放。
因此,如果我们读取 X 流后跟 Y 流,我们将:
(allocate+register cleanup) X X X ... X (cleanup) (allocate+register cleanup) Y Y Y ... Y (cleanup)
即在分配产生Ys的资源之前释放产生Xs的资源