在下一步中使用结果流式传输管道
Streaming pipeline using result in next step
我正在使用 streaming
包。
我想通过保留常量内存,将 S.store
定义的一个步骤的结果用作管道中后续步骤的参数。 myStream
从文件加载和解析。
我有一个类型检查的示例:
import qualified Streaming.Prelude as S
import qualified Data.Map.Strict as M
data A = MkA deriving (Show)
insertA :: MonadIO m => S.Stream (S.Of A) m r -> m (M.Map String Int)
insertA = undefined
insertB :: MonadIO m => M.Map String Int -> S.Stream (S.Of A) m r -> m Int
insertB = undefined
myStream :: S.Stream (S.Of A) IO r
myStream = undefined
run :: IO ()
run =
myStream
& S.store insertA
& insertB M.empty
& print
然而,& insertB M.empty
行采用的是一张空地图,但我想使用来自 insertA
函数的上一步中的地图。
insertB
函数然后使用此 Map 进行查找。
我能想到的解决方法如下:
run :: IO ()
run =
myStream
& S.store insertA
& ( \e -> do
resultMap <- S.effects e
insertB resultMap e
)
& print
问题
这是否会在常量内存中保留像 运行 这样的流媒体优势?
它是如何在后台解决这个问题的,因为流需要作为一个整体进行处理才能获得 Map
?它多次传递相同的流 - 从文件加载它 2 次以保留常量内存?
如果是这种情况(加载文件 2 次),如果流的源不是来自解析文件而是来自某些只能读取一次的数据流怎么办?
对于这个问题,是否还有其他优雅的解决方案同时具有流式传输的优势,其中管道中的下一步需要使用前一步的结果?
这里建议的代码有问题:
resultMap <- S.effects e
insertB resultMap e
问题是您“运行宁”相同流两次,即usually problematic for IO
-based streams。
例如,假设 myStream
从文件句柄中读取。当我们为第二遍调用 insertB
时,effects
已经达到 end-of-file!从句柄中的任何进一步读取都不会 return 任何数据。
当然,我们可以用两个不同的流读取同一个文件两次。这保留了流式传输,但需要两次传递。
应该注意的是,对于某些具有 built-in 资源管理的基础 monad,例如 resourcet,您 可以 运行相同的 Stream
值两次,因为流代码足够“智能”,可以在每次流 运行.
时分配和释放底层资源
例如Stream
type present in linear-base supports the function readFile
的版本:
readFile :: FilePath -> Stream (Of Text) RIO ()
其中 return 一个 Stream
在 resource-aware IO
工作。
也就是说,我不喜欢在流式传输管道中隐藏这种重复读取文件的行为,这让我感到困惑。
我正在使用 streaming
包。
我想通过保留常量内存,将 S.store
定义的一个步骤的结果用作管道中后续步骤的参数。 myStream
从文件加载和解析。
我有一个类型检查的示例:
import qualified Streaming.Prelude as S
import qualified Data.Map.Strict as M
data A = MkA deriving (Show)
insertA :: MonadIO m => S.Stream (S.Of A) m r -> m (M.Map String Int)
insertA = undefined
insertB :: MonadIO m => M.Map String Int -> S.Stream (S.Of A) m r -> m Int
insertB = undefined
myStream :: S.Stream (S.Of A) IO r
myStream = undefined
run :: IO ()
run =
myStream
& S.store insertA
& insertB M.empty
& print
然而,& insertB M.empty
行采用的是一张空地图,但我想使用来自 insertA
函数的上一步中的地图。
insertB
函数然后使用此 Map 进行查找。
我能想到的解决方法如下:
run :: IO ()
run =
myStream
& S.store insertA
& ( \e -> do
resultMap <- S.effects e
insertB resultMap e
)
& print
问题
这是否会在常量内存中保留像 运行 这样的流媒体优势?
它是如何在后台解决这个问题的,因为流需要作为一个整体进行处理才能获得 Map
?它多次传递相同的流 - 从文件加载它 2 次以保留常量内存?
如果是这种情况(加载文件 2 次),如果流的源不是来自解析文件而是来自某些只能读取一次的数据流怎么办?
对于这个问题,是否还有其他优雅的解决方案同时具有流式传输的优势,其中管道中的下一步需要使用前一步的结果?
这里建议的代码有问题:
resultMap <- S.effects e
insertB resultMap e
问题是您“运行宁”相同流两次,即usually problematic for IO
-based streams。
例如,假设 myStream
从文件句柄中读取。当我们为第二遍调用 insertB
时,effects
已经达到 end-of-file!从句柄中的任何进一步读取都不会 return 任何数据。
当然,我们可以用两个不同的流读取同一个文件两次。这保留了流式传输,但需要两次传递。
应该注意的是,对于某些具有 built-in 资源管理的基础 monad,例如 resourcet,您 可以 运行相同的 Stream
值两次,因为流代码足够“智能”,可以在每次流 运行.
例如Stream
type present in linear-base supports the function readFile
的版本:
readFile :: FilePath -> Stream (Of Text) RIO ()
其中 return 一个 Stream
在 resource-aware IO
工作。
也就是说,我不喜欢在流式传输管道中隐藏这种重复读取文件的行为,这让我感到困惑。