以 Servant 和 mtl 风格流式传输

Stream with Servant and mtl style

我使用 servant 定义了以下端点:

type ServiceAPI = "maintenance" :> Get '[PlainText] Text

myServer ::
     MonadIO m
  => MonadLog m
  => MonadMetrics m
  => MonadRandom m
  => Config
  -> Client
  -> ServerT ServiceAPI m
myServer cfg client = ...

有时内容太大,无法立即返回,导致HTTP请求超时。我想将此服务转换为某种基于流的响应。类似于:

type ServiceAPI = "maintenance" :> StreamGet NewlineFraming PlainText (SourceIO Text) -- or SourceT m Text

但是,我不明白/弄清楚如何更新 myServer 以与流媒体 SourceIO (SourceT m) 很好地播放。我相信它不能与 SourceIO 一起使用,因为 type SourceIO = SourceT IO 并且这里我们有一些其他的 monad 堆栈。

看来你需要一个SourceT m Text。查看Servant.Types.SourceT,定义为

newtype SourceT m a = SourceT
    { unSourceT :: forall b. (StepT m a -> m b) -> m b
    }

所以,我们给定一个消费者函数StepT m Text -> m b,我们需要向它传递一个StepT m Text

我们可能会问,Servant为什么不直接require StepT m Text,而不是这个continuation-passing定义呢?答案是连续传递定义允许您插入类似 bracket 的操作,例如,在开头打开文件并确保文件在流式传输完成后关闭。

我在您的签名中看到的一个可能的问题是约束不支持类似括号的操作。你需要像 MonadUnliftIO or MonadMask 这样的东西。 MonadIO 不够。

假设你的 monad 有一个 MonadUnliftIO 的实例,那么你可以像这样流式传输一个 Text 文件:

 {-# LANGUAGE ScopedTypeVariables #-}
import Control.Monad.IO.Unlift -- from "unliftio-core"
import Data.Text
import Data.Text.IO
import Servant.API
import Servant.Server
import Servant.Types.SourceT
import System.FilePath
import System.IO

serveText :: forall m. MonadUnliftIO m => FilePath -> SourceT m Text
serveText filePath = SourceT $ \consumer ->
  withRunInIO $ \unlift ->
    withFile filePath ReadMode $ \handle -> do
      let steps :: StepT m Text
          steps =
            Effect
              ( do
                  eof <- liftIO $ hIsEOF handle
                  if eof
                    then pure Stop
                    else do
                      line <- liftIO $ Data.Text.IO.hGetLine handle
                      pure (Yield line steps) -- recurse for more lines
              )
      -- we get down to IO to satisfy the signature of withFile,
      -- the withRunInIO brings us back to m
      unlift (consumer steps)

(小心 hGetLine 的天真使用,因为它使用系统的默认编码。在实践中,像 streamDecodeUtf8 这样的编码可能会更好。)

一些注意事项:

  • 只有 ReaderT 同构的 monad 可以是 MonadUnliftIO 的实例,而你的可能不符合这种模式。也许是 MonadMask.
  • 或许,您可以尝试使用 resourcet 包,而不是使用经典的 bracketwithField
  • 有适用于流行流媒体库的适配器,使您不必直接定义 StepT

一如既往,答案取决于您尝试输入的方式。 我们想要 return 一个流,为此我们更新 ServiceAPI 的类型如下:

type ServiceAPI = "maintenance" :> StreamGet NewlineFraming PlainText (SourceIO Text)

myServer 的类型保持不变。由于我是 servant 的新手,所以我没有意识到我们想要 return 是 SourceT IO Text 在任何 myServer 计算的主体中。 为了更好地查看类型,我们假设 myServer 主体调用函数 myFunc 如下,其中 myFunc 在 returning Text[Text] 之前(此解决方案只需最少的重构工作即可适用于两者)。

myServer ::
     MonadIO m
  => MonadLog m
  => MonadMetrics m
  => MonadRandom m
  => Config
  -> ServerT ServiceAPI m
myServer cfg = myFunc cfg

myFunc ::
     MonadIO m
  => MonadLog m
  => Config
  -> m (SourceT IO Text)
myFunc cfg client = do
  ...
  let ls = _ :: [Text] -- some [Text]
  pure $ source ls

实现是我们不希望myFunc到returnSourceT m Text而是m (SourceT IO Text)。缺少的部分之一是 source :: [a] -> SourceT n a 并且我们想要 n = IO,这样我们就可以 return SourceIO = SourceT IO.

(我的错误是将 myFunc 的 return 类型理解为 SourceT m Text,而实际上应该是 m (SourceT IO Text),类型检查器是不允许这样做)