haskell 管道 - 如何在字节串管道上重复执行 takeWhile 操作?

haskell pipes - how to repeatedly perform a takeWhile operation on a bytestring pipe?

我想做的是使用 takeWhile 按某个字符拆分字节串。

import qualified Data.ByteString.Internal as BS (c2w, w2c)
import Pipes
import Pipes.ByteString as PB
import Pipes.GZip
import Pipes.Prelude as PP
import System.IO

newline = BS.c2w '\n'

splitter = PB.takeWhile (\myWord -> myWord /= newline)

myPipe fileHandle = PP.toListM $ decompress fileProducer >-> splitter
  where
    fileProducer = PB.fromHandle fileHandle       

run = do
  dat <- withFile "somefile.blob" ReadMode myPipe
  pure dat

这让我得到了第一行,但我真正想要的是有效地将每个块一次生成一个换行符。我该怎么做?

pipes-bytestringpipes-group 的排列使得重复打破 Producer ByteString m r 会产生 FreeT (Producer ByteString m) m rFreeT 在这里可以理解为 A_Succession_Of,因此结果可以认为是 'a succession of bytestring-producer segments returning an r'。这样,如果其中一个段的长度为 10 GB,则 we still have streaming rather than a 10 gigabyte strict bytestring

在我看来,您想在换行符上打破字节串生成器,但我不知道您是否想保留换行符。如果您将它们扔掉,这与使用 view PB.lines 拆分字节串生产者相同,然后将每个从属生产者连接成一个严格的字节串 - 单独的行。我在下面写成 accumLines。它很简单,但是使用 Lens.view 将奇特的 PB.lines 镜头变成了常规函数。 (许多操作在 pipes-bytestring 中被写为镜头,因为它们可以重新用于其他目的,尤其是生产者解析 pipes 的那种。)

import Pipes
import qualified Pipes.Prelude as P
import Pipes.ByteString as PB
import qualified Pipes.Group as PG
import Pipes.GZip

import qualified Data.ByteString.Internal as BS (c2w, w2c)

import System.IO
import Lens.Simple (view) -- or Control.Lens or whatever
import Data.Monoid

main = run >>= mapM_ print

myPipe fileHandle = P.toListM $ accumLines (decompress fileProducer)
  where
    fileProducer = PB.fromHandle fileHandle

run = do
  dat <- withFile "a.gz" ReadMode myPipe
  pure dat

-- little library additions

accumLines :: Monad m => Producer ByteString m r -> Producer ByteString m r
accumLines = mconcats . view PB.lines 

accumSplits :: Monad m => Char -> Producer ByteString m r -> Producer ByteString m r
accumSplits c  = mconcats . view (PB.splits (BS.c2w c)) 

-- this is convenient, but the operations above could 
-- be more rationally implemented using e.g. BL.fromChunks and toListM 
mconcats :: (Monad m, Monoid b) => FreeT (Producer b m) m r -> Producer b m r
mconcats = PG.folds (<>) mempty id

理想情况下,您不会在每个换行符处编写新的字节串。是否必须取决于您要对这些线条做什么。

@Michael 的回答很好。我只想说明一些 此处的使用模式。

(.lhs 在 http://lpaste.net/165352 可用)

先导入几个:

 {-# LANGUAGE OverloadedStrings, NoMonomorphismRestriction #-}

 import Pipes
 import qualified Pipes.Prelude as PP
 import qualified Pipes.Group as PG
 import qualified Pipes.ByteString as PB
 import qualified Pipes.GZip as GZip
 import qualified Data.ByteString as BS
 import Lens.Family (view, over)
 import Control.Monad
 import System.IO

如果您查看 Pipes.ByteString 和 Pipes.GZip 中的函数 你会看到它们都属于以下类型模式:

  1. 生产者... -> FreeT(生产者...)...
  2. FreeT(制作人...)... -> 制作人...
  3. Lens'(制作人...)(FreeT(制作人...)...)
  4. 制作人... -> 制作人...

每个类别中的函数示例:

  1. PB.words
  2. PG.concats
  3. PB.linesPB.chunksOfPB.splits、...
  4. GZip.compressGZip.decompress

以下是如何使用 PB.words 将输入流拆分为单词:

 prod = yield "this is\na test\nof the pipes\nprocessing\nsystem"

 t1 = runEffect $ (PG.concats . PB.words) prod >-> PP.print

使用类型 3 的函数 -- 例如PB.lines,只需使用 view Lens'得到类型1的函数然后与PG.concats:

组合
 t2a = runEffect $ (PG.concats . view PB.lines) prod >-> PP.print

 t2b h = (PG.concats . view PB.lines) (PB.fromHandle h) >-> PP.print

 run2 = withFile "input" ReadMode (runEffect . t2b)

Producer -> Producer函数,使用普通函数应用即可:

 t3 h = GZip.decompress (PB.fromHandle h) >-> PP.print

 run3 = withFile "input.gz" ReadMode (runEffect . t3)

 t4 h = GZip.decompress (PB.fromHandle h) >-> PP.map BS.length >-> PP.print

 run4 = withFile "big.gz" ReadMode (runEffect . t4)

为了先解压再按行拆分,我们嵌套函数 申请:

 t5 h = (PG.concats . view PB.lines) ( GZip.decompress (PB.fromHandle h) )
          >-> PP.map BS.length >-> PP.print

 run5 = withFile "input.gz" ReadMode (runEffect . t5)