在下一步中使用结果流式传输管道

Streaming pipeline using result in next step

我正在使用 streaming 包。 我想通过保留常量内存,将 S.store 定义的一个步骤的结果用作管道中后续步骤的参数。 myStream 从文件加载和解析。

我有一个类型检查的示例:

import qualified Streaming.Prelude as S
import qualified Data.Map.Strict as M

data A = MkA deriving (Show)

insertA :: MonadIO m => S.Stream (S.Of A) m r -> m (M.Map String Int)
insertA = undefined

insertB :: MonadIO m => M.Map String Int -> S.Stream (S.Of A) m r -> m Int
insertB = undefined

myStream :: S.Stream (S.Of A) IO r
myStream = undefined

run :: IO ()
run =
  myStream
    & S.store insertA
    & insertB M.empty
    & print

然而,& insertB M.empty 行采用的是一张空地图,但我想使用来自 insertA 函数的上一步中的地图。 insertB 函数然后使用此 Map 进行查找。

我能想到的解决方法如下:

run :: IO ()
run =
  myStream
    & S.store insertA
    & ( \e -> do
          resultMap <- S.effects e
          insertB resultMap e
      )
    & print

问题

这是否会在常量内存中保留像 运行 这样的流媒体优势? 它是如何在后台解决这个问题的,因为流需要作为一个整体进行处理才能获得 Map?它多次传递相同的流 - 从文件加载它 2 次以保留常量内存?

如果是这种情况(加载文件 2 次),如果流的源不是来自解析文件而是来自某些只能读取一次的数据流怎么办?

对于这个问题,是否还有其他优雅的解决方案同时具有流式传输的优势,其中管道中的下一步需要使用前一步的结果?

这里建议的代码有问题:

  resultMap <- S.effects e
  insertB resultMap e

问题是您“运行宁”相同流两次,即usually problematic for IO-based streams

例如,假设 myStream 从文件句柄中读取。当我们为第二遍调用 insertB 时,effects 已经达到 end-of-file!从句柄中的任何进一步读取都不会 return 任何数据。

当然,我们可以用两个不同的流读取同一个文件两次。这保留了流式传输,但需要两次传递。


应该注意的是,对于某些具有 built-in 资源管理的基础 monad,例如 resourcet,您 可以 运行相同的 Stream 值两次,因为流代码足够“智能”,可以在每次流 运行.

时分配和释放底层资源

例如Stream type present in linear-base supports the function readFile的版本:

readFile :: FilePath -> Stream (Of Text) RIO ()

其中 return 一个 Streamresource-aware IO 工作。

也就是说,我不喜欢在流式传输管道中隐藏这种重复读取文件的行为,这让我感到困惑。