为流媒体库适配 Store 解码

Adapting Store decoding for streaming library

我正在尝试使用名为 decodeMessageBS 的函数来调整 Store encoding and decoding for Streaming. Store already implements a streaming decode

我尝试为 Streaming 执行 store 反序列化的基本实现,如下所示(暂时没有 bracket 以保持简单)。但是,popper 的逻辑似乎有问题,因为 decodeMessageBS 一直在抛出 PeekException:

{-# LANGUAGE  RankNTypes #-}
import Streaming.Prelude as S hiding (print,show)
import Data.IORef
import Streaming as S
import qualified Data.ByteString as BS (ByteString,empty,length)
import System.IO.ByteBuffer
import Data.Store
import Data.Store.Streaming

streamDecode :: forall a. (Store a) => ByteBuffer -> Stream (Of BS.ByteString) IO () -> Stream (Of a) IO ()
streamDecode bb inp = do
    ref <- lift $ newIORef inp 
    let popper = do
        r <- S.uncons =<< readIORef ref
        case r of
          Nothing -> return Nothing 
          Just (a,rest) -> writeIORef ref rest >> return (Just a)
    let go = do
          r <- lift $ decodeMessageBS bb $ popper
          lift $ print "Decoding"
          case r of 
            Nothing -> return ()
            Just msg -> (lift $ print "Message found") >> (S.yield . fromMessage $ msg) >> go
    go 

我可以用 decodeIOPortionWith 很好地解码我的测试文件 - 所以,问题似乎出在提供 decodeMessageBS 所需的逻辑上。将感谢有关 popper 逻辑错误的指示。

PeekException 的发生是因为 Store 在以流模式保存消息时使用了不同的格式,这与 Binary 不同。当使用 decodeMessageBS 函数时,它期望 Message 类型的包装器围绕 Store 数据。 decodeIOPortionWith 不需要 Message 包装器,因此可以很好地处理保存下来的 Store 数据。在我修复序列化以将数据保存为 Message 编码后,decodeMessageBS 在该数据上运行良好。