保持状态的管道

Pipe that maintains state

我正在尝试使用 pipes.

计算大文件的滚动哈希值 (buzzhash)

目前我有这个。但是不知道如何写一个保持状态的管道。

import qualified Data.ByteString.Lazy as L
import Data.Word
import Data.Bits(xor,rotate)
import Data.Array
import Pipes
import Control.Monad.State.Strict
import Control.Monad(forever)

produceFromList (x:xs) = do 
  yield x
  produceFromList xs

buzzHash = do
  x <- await
  h <- lift $ get -- pull out previous value
  let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
  lift $ put h' -- save new value 
  yield h'

stdoutLn :: Consumer Word64 IO ()
stdoutLn = do 
  a <- await 
  lift $ print a

main = do 
  bs <- L.unpack `fmap` L.getContents
  runEffect $ produceFromList bs >-> buzzHash >-> stdoutLn

hashArrW8 :: Array Word8 Word64

如何让buzzHash保存上一个值并用于计算下一个值?初始状态值应为 0。

你快到了;你只需要 运行 状态。

main = do
  bs <- L.unpack `fmap` L.getContents
  flip execStateT 0 $ runEffect $ produceList bs >-> buzzHash >-> hoist lift stdoutLn

我假设你不想恢复状态,所以我使用 execStateT 而不是 runStateT

这里唯一的好奇是 stdoutLn 被标记为 Consumer Word64 IO () 。所以我使用 hoist lift 使其成为 Consumer Word64 (StateT Word64 IO) () 系列中的所有内容 a >-> b >-> c 必须在底层 monad 和 return 类型中一致。

这里有一些进一步的评论可能会节省您的时间。首先 produceFromListeach.

此外,您可以通过重新标记 stdoutLn:

来避免 hoist lift
stdoutLn :: MonadIO m => Consumer Word64 m ()
stdoutLn = do 
   a <- await 
   liftIO $ print a

但是这里有一些麻烦:你没有重复这个动作。这应该很明显是一个循环:

stdoutLn :: MonadIO m => Consumer Word64 m ()
stdoutLn = do 
   a <- await 
   liftIO $ print a
   stdoutLn

事实上这已经可以作为P.print,所以我们可以写成

import qualified Pipes.Prelude as P
main = do
  bs <- L.unpack `fmap` L.getContents
  flip execStateT 0 $ runEffect $ each bs >-> buzzHash >-> P.print

如果我理解你的话,buzzHash 也意味着无限期地重复:

buzzHash = do
  x <- await
  h <- lift $ get -- pull out previous value
  let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
  lift $ put h' -- save new value 
  yield h'
  buzzHash

(这是 forever buzzHash,我们在这里使用您的 buzzHash

最后,如果你

 import qualified Pipes.ByteString as PB
 import Control.Lens (view) -- (or Lens.Micro.MTL or Lens.Simple)

我们看到我们不需要惰性字节串 IO,它无论如何都不能正确流式传输。 Pipes.ByteString 已经有了我们想要的 unpack,打包成一个镜头,这样我们就可以在其他地方使用 B.unpack 的地方使用 view PB.unpack。所以最后我们可以写成

main = flip evalStateT 0 $ runEffect $ view PB.unpack PB.stdin >-> buzzHash >-> P.print

一旦它成为这种形式,我们就会看到我们没有使用管道的基础状态,除了 buzzHash,所以我们可以本地化这个

import Pipes.Lift (evalStateP) 
main =  runEffect $ view PB.unpack PB.stdin >-> evalStateP 0 buzzHash >-> P.print

或者,如果您愿意,可以重写

buzzHash' :: Monad m => Word64 -> Pipe Word8 Word64 m r
buzzHash' n = evalStateP n $ forever $ do
    x <- await
    h <- lift $ get -- pull out previous value
    let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
    lift $ put h' -- save new value 
    yield h'

然后你会写

main =  runEffect $ view PB.unpack PB.stdin >-> buzzHash' 0 >-> P.print