在流式传输管道中将 ResourceT 与支架组合

Combining ResourceT with bracket in a streaming pipeline

这里是我的代码的简化:

import Database.PostgreSQL.Simple (Connection)
import qualified Streaming.Prelude as S
import Streaming.ByteString.Char8 as C
import Streaming.Zip (gunzip)
import Streaming

main :: IO ()
main = do
  res <- runResourceT $ calculateA myLinesStream
  return ()

type MyLinesStream m r = S.Stream (S.Of String) m r

connect :: IO Connection
connect = undefined

close :: Connection -> IO ()
close = undefined

calculateA :: MonadIO m => MyLinesStream m r -> m ()
calculateA stream = liftIO (bracket connect close (go stream))
  where
    go :: MonadIO m => MyLinesStream m r -> Connection -> m ()
    go stream conn = stream & S.length_ >>= liftIO . print

myLinesStream :: (MonadIO m, MonadResource m) => MyLinesStream m ()
myLinesStream = do
  S.each ["1.zip", "2.zip"]
    & S.mapM (\fileName -> C.readFile fileName & gunzip)
    & S.mconcat
    & C.lines
    & mapsM (S.toList . C.unpack)
    & void

go stream 上的以下行存在类型错误:

calculateA stream = liftIO (bracket connect close (go stream))

错误说:

Couldn't match type ‘m’ with ‘IO’
  ‘m’ is a rigid type variable bound by
    the type signature for:
      calculateA :: forall (m :: * -> *) r.
                    MonadIO m =>
                    MyLinesStream m r -> m ()
Expected type: Connection -> IO ()
    Actual type: Connection -> m ()

问题

  1. 如何使此代码进行类型检查并使其在 calculateA 函数中释放资源时仍然安全?
  2. 我正在使用 C.readFile 读取多个文件,然后将其包装在 runResourceT 中。这会正确释放所有文件句柄吗?
  3. 构图好不好? (请注意,我需要 calculateA 函数与 myLinesStream 分开)

问题是您试图将 bracket 与过于笼统的 monad 一起使用。 bracket 有签名:

bracket :: IO a -> (a -> IO b) -> (a -> IO c) -> IO c   

它将IO个动作作为参数。但是,您传递给 bracketgo 函数需要在由 calculateA 的调用者选择的通用基础 monad m 中工作(您稍后将该 monad 设为 ResourceT IOmain).

来自 baseResourceTbracket 不能很好地混合。相反,您需要转向 resourcet 包中的特殊函数,例如 allocate and release,并使用它们来定义一个辅助函数,例如:

bracketStream :: (Functor f, MonadResource m) 
              => IO a 
              -> (a -> IO ()) 
              -> (a -> Stream f m b) 
              -> Stream f m b
bracketStream alloc free inside = do
        (key, seed) <- lift (allocate alloc free)
        r <- inside seed
        release key
        pure r

它是如何工作的?如果你有一个 Xs 的流,它会在流的开头添加一个分配操作(注册在异常终止的情况下调用的相应清理操作,如异常)并且它还会添加一个显式调用流耗尽时的清理操作:

(allocate+register cleanup) X X X ... X (cleanup)

您写道:

I'm reading multiple files using C.readFile and then wrapping it inside runResourceT. Will this properly release all the file handles?

是的。使用 ResourceT,当执行显式清理操作时,或者当我们使用 runResourceT“退出”ResourceT 时(可能异常,有例外),资源将被释放。

因此,如果我们读取 X 流后跟 Y 流,我们将:

(allocate+register cleanup) X X X ... X (cleanup) (allocate+register cleanup) Y Y Y ... Y (cleanup)

即在分配产生Ys的资源之前释放产生Xs的资源