Operating on parsed data with attoparsec

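Background

I've written a log file parser using attoparsec. All of my smaller parsers succeed, as does the composed final parser. I've confirmed this with tests. But I'm having trouble performing operations on the parsed stream.

What I've tried

I started by trying to pass the successfully parsed input to a function. But all I seem to get is Done (), which I presume means the log file has been consumed by that point.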

prepareStats :: Result Log -> IO ()
prepareStats r =
  case r of
    Fail _ _ _ -> putStrLn $ "Parsing failed"
    Done _ parsedLog -> putStrLn "Success" -- This now has a [LogEntry] array. Do something with it.

main :: IO ()
main = do
  [f] <- getArgs
  logFile <- B.readFile (f :: FilePath)
  let results = parseOnly parseLog logFile
  putStrLn "TBC"

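What I want to do

I want to accumulate some statistics from the log file as I consume the input. For example, I'm parsing response codes and I'd like to count how many 2** responses there were and how many 4/5** ones. I'm parsing the number of bytes each response returned as Ints, and I'd like to sum these efficiently (sounds like a foldl'?). I've defined a data type like this: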

data Stats = Stats {
    successfulRequestsPerMinute :: Int
  , failingRequestsPerMinute    :: Int
  , meanResponseTime            :: Int
  , megabytesPerMinute          :: Int
  } deriving Show

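I'd like to update this continuously as I parse the input. But performing operations as I consume the input is the part where I got stuck. So far print is the only function I've successfully passed output to, and it showed that parsing succeeds by returning Done before printing the output.

My main parsers look like this: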

parseLogEntry :: Parser LogEntry
parseLogEntry = do
  ip <- logItem
  _ <- char ' '
  logName <- logItem
  _ <- char ' '
  user <- logItem
  _ <- char ' '
  time <- datetimeLogItem
  _ <- char ' '
  firstLogLine <- quotedLogItem
  _ <- char ' '
  finalRequestStatus <- intLogItem
  _ <- char ' '
  responseSizeB <- intLogItem
  _ <- char ' '
  timeToResponse <- intLogItem
  return $ LogEntry ip logName user time firstLogLine finalRequestStatus responseSizeB timeToResponse

type Log = [LogEntry]

parseLog :: Parser Log
parseLog = many $ parseLogEntry <* endOfLine

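Desired outcome

I want to pass each parsed line to a function that will update the above data type. Ideally I want this to be very memory efficient, because it will be operating on large files.

You have to make your unit of parsing a single log entry rather than a list of log entries.

It's not pretty, but here is an example of how to interleave parsing and processing:

(Depends on bytestring, attoparsec and mtl.)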

{-# LANGUAGE NoMonomorphismRestriction, FlexibleContexts #-}

import qualified Data.ByteString.Char8 as BS
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Attoparsec.ByteString.Char8 hiding (takeWhile)
import Data.Char
import Control.Monad.State.Strict

aWord :: Parser BS.ByteString
aWord = skipSpace >> A.takeWhile isAlphaNum

getNext :: MonadState [a] m => m (Maybe a)
getNext = do
  xs <- get
  case xs of
    [] -> return Nothing
    (y:ys) -> put ys >> return (Just y)

loop iresult =
  case iresult of
    Fail _ _ msg  -> error $ "parse failed: " ++ msg
    Done x' aword -> do lift $ process aword; loop (parse aWord x')
    Partial _     -> do
      mx <- getNext
      case mx of
        Just y  -> loop (feed iresult y)
        Nothing -> case feed iresult BS.empty of
                     Fail _ _ msg  -> error $ "parse failed: " ++ msg
                     Done x' aword -> do lift $ process aword; return ()
                     Partial _     -> error $ "partial returned"  -- probably can't happen

process :: Show a => a -> IO ()
process w = putStrLn $ "got a word: " ++ show w

theWords = map BS.pack [ "this is a te", "st of the emergency ", "broadcasting sys", "tem"]


main = runStateT (loop (Partial (parse aWord))) theWords

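Notes:

  • We parse one aWord at a time and call process after each word is recognized.
  • Use feed to give the parser more input when it returns a Partial.
  • Feed the parser an empty string when there is no more input left.
  • When Done is returned, process the recognized word and continue with parse aWord.
  • getNext is just an example of a monadic function which gets the next unit of input. Replace it with your own version, i.e. something that reads the next line from a file.

Update

Here is a solution using parseWith, as suggested by @dfeuer: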

-- needs one more import at the top of the module: import Data.Maybe (fromMaybe)
noMoreInput = fmap null get

loop2 x = do
  iresult <- parseWith (fmap (fromMaybe BS.empty) getNext) aWord x
  case iresult of
    Fail _ _ msg  -> error $ "parse failed: " ++ msg
    Done x' aword -> do lift $ process aword;
                        if BS.null x'
                           then do b <- noMoreInput
                                   if b then return ()
                                        else loop2 x'
                           else loop2 x'
    Partial _     -> error $ "huh???" -- this really can't happen

main2 = runStateT (loop2 BS.empty) theWords
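
To read from a file rather than from a list held in State, the refill action passed to parseWith can pull chunks from a Handle. What follows is only a rough sketch of that idea: foldEntries, statusLine and sumStatuses are hypothetical names made up for illustration, the one-integer-per-line parser stands in for a real log entry parser, and the 32 KiB chunk size is arbitrary.

```haskell
{-# LANGUAGE BangPatterns #-}

import qualified Data.ByteString.Char8 as BS
import Data.Attoparsec.ByteString.Char8
  (IResult (..), Parser, decimal, endOfLine, parseWith)
import System.IO (Handle, IOMode (ReadMode), withFile)

-- Hypothetical helper: run one entry parser repeatedly over a handle,
-- folding each parsed value into a strictly evaluated accumulator.
foldEntries :: Parser a -> (s -> a -> s) -> s -> Handle -> IO (Either String s)
foldEntries entry step s0 h = go s0 BS.empty
  where
    -- hGetSome returns an empty string at end of file, which is what
    -- parseWith expects from its refill action.
    refill = BS.hGetSome h 32768

    go !acc leftover = do
      chunk <- if BS.null leftover then refill else return leftover
      if BS.null chunk
        then return (Right acc)            -- no input left: finished
        else do
          r <- parseWith refill entry chunk
          case r of
            Fail _ _ msg -> return (Left msg)
            Done rest x  -> go (step acc x) rest
            Partial _    -> return (Left "unexpected Partial from parseWith")

-- Stand-in for a real log entry parser: one integer per line.
statusLine :: Parser Int
statusLine = decimal <* endOfLine

-- Example use: add up every number in the file.
sumStatuses :: FilePath -> IO (Either String Int)
sumStatuses path = withFile path ReadMode (foldEntries statusLine (+) 0)
```

Only the current chunk and the accumulator are kept, so memory use should not grow with the size of the file. Note that this sketch reports a parse failure if the last line has no trailing newline.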

If each log entry is exactly one line, here is a simpler solution:

main = do
  loglines <- fmap BS.lines $ BS.readFile "input-file.log"
  -- foldl' comes from Data.List; print forces and shows the final stats
  print $ foldl' go initialStats loglines
  where
    go stats logline =
      case parseOnly yourParser logline of
        Left e  -> error $ "oops: " ++ e
        Right r -> let stats' = ... combine r with stats ...
                   in stats'

Basically you are just reading the file line by line, calling parseOnly on each line and accumulating the results.
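
The step that combines a parsed entry with the running stats is left open above. One possible shape for it is sketched here, under assumptions: Entry and Totals are hypothetical, cut-down stand-ins for the question's LogEntry and Stats, and the status ranges follow the 2** and 4/5** counts the question asks for. The strict fields together with foldl' are meant to keep the accumulator fully evaluated at each step.

```haskell
import Data.List (foldl')

-- Hypothetical, cut-down stand-in for the question's LogEntry.
data Entry = Entry
  { entryStatus :: !Int  -- HTTP status code
  , entryBytes  :: !Int  -- response size in bytes
  } deriving (Show, Eq)

-- Running totals; strict fields so no thunks pile up during the fold.
data Totals = Totals
  { okCount     :: !Int  -- 2xx responses
  , failedCount :: !Int  -- 4xx and 5xx responses
  , totalBytes  :: !Int  -- sum of response sizes
  } deriving (Show, Eq)

emptyTotals :: Totals
emptyTotals = Totals 0 0 0

-- One fold step: combine a parsed entry with the totals so far.
step :: Totals -> Entry -> Totals
step (Totals ok failed bytes) (Entry status size) =
  Totals (ok + isOk) (failed + isFailed) (bytes + size)
  where
    isOk     = if status >= 200 && status < 300 then 1 else 0
    isFailed = if status >= 400 && status < 600 then 1 else 0

-- Strict left fold over already-parsed entries.
summarize :: [Entry] -> Totals
summarize = foldl' step emptyTotals
```

In the line-by-line version, step would play the role of the combining expression inside go, applied to each successfully parsed entry.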

This is properly done with a streaming library:

main = do
  f:_ <- getArgs
  withFile f ReadMode $ \h -> do
       result <- foldStream $ streamProcess $ streamHandle h
       print result
 where
 streamHandle  = undefined
 streamProcess = undefined
 foldStream    = undefined

Any streaming library can fill in the blanks, e.g.

 import qualified Pipes.Prelude as P
 import Pipes
 import qualified Pipes.ByteString as PB
 import Pipes.Group (folds)
 import qualified Control.Foldl as L
 import Control.Lens (view) -- or import Lens.Simple (view), or whatever

 streamHandle = PB.fromHandle :: Handle -> Producer ByteString IO ()

In that case we might divide the labor further:

 streamProcess :: Monad m => Producer ByteString m r -> Producer LogEntry m r
 streamProcess p = streamLines p >-> lineParser

 -- regroup the incoming chunks so that each yielded ByteString is one whole line
 streamLines :: Monad m => Producer ByteString m r -> Producer ByteString m r
 streamLines p = L.purely folds L.mconcat (view PB.lines p)

 lineParser :: Monad m => Pipe ByteString LogEntry m r
 lineParser = P.map (parseOnly line_parser) >-> P.concat -- concat removes lefts

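(This is slightly laborious because pipes is sensibly fussy about accumulating lines, and about memory in general: we are just trying to get a producer of individual strict bytestring lines, then convert that into a producer of parsed lines, and then throw out bad parses, if there are any. With io-streams or conduit things will be basically the same, and this particular step will be easier.)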

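We are now in a position to fold over our Producer LogEntry IO (). This can be done explicitly with Pipes.Prelude.fold, which performs a strict left fold. Here we just copy the structure from user5402's answer:

 foldStream str = P.fold go initial_stats id str
  where
   go stats_till_now new_entry = undefined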

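If you get used to the foldl library and to applying a fold to a Producer with L.purely P.fold some_fold, then you can build Control.Foldl.Folds for your LogEntries out of components and slot in different requests as you please.

If you use pipes-attoparsec and include the newline in your parser, then you can just write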

 handleToLogEntries :: Handle -> Producer LogEntry IO ()
 handleToLogEntries h = void $ parsed my_line_parser (fromHandle h) >-> P.concat

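and get the Producer LogEntry IO () more directly. (This ultra-simple way of writing it will, however, stop at the first bad parse; splitting into lines first will also be faster than using attoparsec to recognize newlines.) This is very simple with io-streams too; you would write something like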

import qualified System.IO.Streams as Streams

io :: Handle -> IO ()
io h = do  
    bytes <- Streams.handleToInputStream h
    log_entries <- Streams.parserToInputStream my_line_parser bytes
    fold_result <- Streams.fold go initial_stats log_entries
    print fold_result

or, to keep to the structure above:

 where 
  streamHandle = Streams.handleToInputStream
  streamProcess io_bytes = 
      io_bytes >>= Streams.parserToInputStream my_line_parser
  foldStream io_logentries =
      io_logentries >>= Streams.fold go initial_stats

In any case, my_line_parser should return a Maybe LogEntry and should recognize the newline.
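
As a rough illustration of that last requirement, a parser of the following shape yields Nothing at end of input and otherwise one entry followed by its newline. This is a sketch under assumptions: Entry with its two fields is a hypothetical stand-in for the real LogEntry, and entry, maybeEntryLine and demo are names made up here. parserToInputStream treats a Nothing result as end of stream.

```haskell
import Control.Applicative ((<|>))
import qualified Data.ByteString.Char8 as BS
import Data.Attoparsec.ByteString.Char8
  (Parser, char, decimal, endOfInput, endOfLine, parseOnly)

-- Hypothetical two-field entry: status code and response size.
data Entry = Entry Int Int deriving (Show, Eq)

-- Parses e.g. "200 512" (without the newline).
entry :: Parser Entry
entry = Entry <$> decimal <* char ' ' <*> decimal

-- Nothing at end of input, otherwise one entry plus its newline.
maybeEntryLine :: Parser (Maybe Entry)
maybeEntryLine =
      (Nothing <$ endOfInput)
  <|> (Just <$> entry <* endOfLine)

-- Running the parser on a single complete line.
demo :: Either String (Maybe Entry)
demo = parseOnly maybeEntryLine (BS.pack "200 512\n")
```

A parser like maybeEntryLine would take the place of my_line_parser in the io-streams version.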