使用 attoparsec 对解析后的数据进行操作
Operating on parsed data with attoparsec
背景
我使用 attoparsec 编写了一个日志文件解析器。我所有的小型解析器都成功了,组合的最终解析器也是如此。我已经用 tests 确认了这一点。但是我在使用解析后的流执行操作时遇到了麻烦。
我试过的
我首先尝试将成功解析的输入传递给一个函数。但是似乎得到的只是 Done ()
,我认为这意味着此时日志文件已被消耗。
prepareStats :: Result Log -> IO ()
prepareStats r =
case r of
Fail _ _ _ -> putStrLn $ "Parsing failed"
Done _ parsedLog -> putStrLn "Success" -- This now has a [LogEntry] array. Do something with it.
main :: IO ()
main = do
[f] <- getArgs
logFile <- B.readFile (f :: FilePath)
let results = parseOnly parseLog logFile
putStrLn "TBC"
我想做什么
我想在使用输入时从日志文件中积累一些统计信息。例如,我正在解析响应代码,我想计算有多少个 2** 响应和多少个 4/5** 响应。我正在解析作为 Ints 返回的每个响应的字节数,我想有效地求和这些(听起来像 foldl'
?)。我定义了这样的数据类型:
data Stats = Stats {
successfulRequestsPerMinute :: Int
, failingRequestsPerMinute :: Int
, meanResponseTime :: Int
, megabytesPerMinute :: Int
} deriving Show
我想在解析输入时不断更新它。但是在我消费时执行操作的部分是我卡住的地方。到目前为止,print
是我成功将输出传递给的唯一函数,它通过在打印输出之前返回 Done
来表明解析成功。
我的主要解析器如下所示:
parseLogEntry :: Parser LogEntry
parseLogEntry = do
ip <- logItem
_ <- char ' '
logName <- logItem
_ <- char ' '
user <- logItem
_ <- char ' '
time <- datetimeLogItem
_ <- char ' '
firstLogLine <- quotedLogItem
_ <- char ' '
finalRequestStatus <- intLogItem
_ <- char ' '
responseSizeB <- intLogItem
_ <- char ' '
timeToResponse <- intLogItem
return $ LogEntry ip logName user time firstLogLine finalRequestStatus responseSizeB timeToResponse
type Log = [LogEntry]
parseLog :: Parser Log
parseLog = many $ parseLogEntry <* endOfLine
期望的结果
我想将每个已解析的行传递给将更新上述数据类型的函数。理想情况下,我希望它的内存效率非常高,因为它将对大文件进行操作。
你必须让你的单元解析单个日志条目而不是日志条目列表。
它并不漂亮,但这里有一个如何交错解析和处理的示例:
(取决于 bytestring
、attoparsec
和 mtl
)
{-# LANGUAGE NoMonomorphismRestriction, FlexibleContexts #-}
import qualified Data.ByteString.Char8 as BS
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Attoparsec.ByteString.Char8 hiding (takeWhile)
import Data.Char
import Control.Monad.State.Strict
aWord :: Parser BS.ByteString
aWord = skipSpace >> A.takeWhile isAlphaNum
getNext :: MonadState [a] m => m (Maybe a)
getNext = do
xs <- get
case xs of
[] -> return Nothing
(y:ys) -> put ys >> return (Just y)
loop iresult =
case iresult of
Fail _ _ msg -> error $ "parse failed: " ++ msg
Done x' aword -> do lift $ process aword; loop (parse aWord x')
Partial _ -> do
mx <- getNext
case mx of
Just y -> loop (feed iresult y)
Nothing -> case feed iresult BS.empty of
Fail _ _ msg -> error $ "parse failed: " ++ msg
Done x' aword -> do lift $ process aword; return ()
Partial _ -> error $ "partial returned" -- probably can't happen
process :: Show a => a -> IO ()
process w = putStrLn $ "got a word: " ++ show w
theWords = map BS.pack [ "this is a te", "st of the emergency ", "broadcasting sys", "tem"]
main = runStateT (loop (Partial (parse aWord))) theWords
备注:
- 我们一次解析一个
aWord
,每个词识别后调用process
。
- 当 return 是
Partial
时,使用 feed
为解析器提供更多输入。
- 当没有更多输入时,向解析器提供一个空字符串。
- 当
Done
为return时,处理识别出的词并继续parse aWord
。
getNext
只是获取下一个输入单元的单子函数的示例。将其替换为您自己的版本 - 即从文件中读取下一行的内容。
更新
这是@dfeuer 建议的使用 parseWith
的解决方案:
noMoreInput = fmap null get
loop2 x = do
iresult <- parseWith (fmap (fromMaybe BS.empty) getNext) aWord x
case iresult of
Fail _ _ msg -> error $ "parse failed: " ++ msg
Done x' aword -> do lift $ process aword;
if BS.null x'
then do b <- noMoreInput
if b then return ()
else loop2 x'
else loop2 x'
Partial _ -> error $ "huh???" -- this really can't happen
main2 = runStateT (loop2 BS.empty) theWords
如果每个日志条目正好是一行,这里有一个更简单的解决方案:
do loglines <- fmap BS.lines $ BS.readfile "input-file.log"
foldl' go initialStats loglines
where
go stats logline =
case parseOnly yourParser logline of
Left e -> error $ "oops: " ++ e
Right r -> let stats' = ... combine r with stats ...
in stats'
基本上您只是逐行读取文件并在每一行上调用 parseOnly
并累积结果。
这是使用流媒体库正确完成的
main = do
f:_ <- getArgs
withFile f ReadMode $ \h -> do
result <- foldStream $ streamProcess $ streamHandle h
print result
where
streamHandle = undefined
streamProcess = undefined
foldStream = undefined
任何流媒体库都可以填补空白,例如
import qualified Pipes.Prelude as P
import Pipes
import qualified Pipes.ByteString as PB
import Pipes.Group (folds)
import qualified Control.Foldl as L
import Control.Lens (view) -- or import Lens.Simple (view), or whatever
streamHandle = Pipes.ByteStream.fromHandle :: Handle -> Producer ByteString IO ()
在那种情况下,我们可能会进一步分工:
streamProcess :: Producer ByteString m r -> Producer LogEntry m r
streamProcess p = streamLines p >-> lineParser
streamLines :: Producer ByteString m r -> Producer ByteString m r
streamLines p = L.purely fold L.list (view (Pipes.ByteString.lines p)) >-> P.map B.toStrict
lineParser :: Pipe ByteString LogEntry m r
lineParser = P.map (parseOnly line_parser) >-> P.concat -- concat removes lefts
(这有点费力,因为管道对累积行和内存通常是明智的挑剔:我们只是试图获得单个严格字节串行的生产者,然后将其转换为已解析行的生产者,并且然后抛出错误的解析,如果有的话。使用 io-streams 或管道,事情将基本相同,并且特定步骤会更容易。)
我们现在可以弃牌 Producer LogEntry IO ()
。这可以使用 Pipes.Prelude.fold
显式完成,这会产生严格的左折叠。在这里我们只复制 user5402
的结构
foldStream str = P.fold go initial_stats id
where
go stats_till_now new_entry = undefined
如果你习惯了foldl
库的使用和使用L.purely fold some_fold
将fold应用到Producer,那么你可以为你的LogEntries out构建Control.Foldl.Fold
s随心所欲地在不同的请求中使用组件和插槽。
如果你使用 pipes-attoparsec
并在你的解析器中包含换行位,那么你可以只写
handleToLogEntries :: Handle -> Producer LogEntry IO ()
handleToLogEntries h = void $ parsed my_line_parser (fromHandle h) >-> P.concat
并更直接地获得Producer LogEntry IO ()
。 (但是,这种超简单的编写方式会在错误的解析处停止;首先按行划分比使用 attoparsec 识别换行符更快。)这对于 io-streams 也非常简单,您可以编写类似
import qualified System.IO.Streams as Streams
io :: Handle -> IO ()
io h = do
bytes <- Streams.handleToInputStream h
log_entries <- Streams.parserToInputStream my_line_parser bytes
fold_result <- Stream.fold go initial_stats log_entries
print fold_result
或保持上面的结构:
where
streamHandle = Streams.handleToInputStream
streamProcess io_bytes =
io_bytes >>= Streams.parserToInputStream my_line_parser
foldStream io_logentries =
log_entries >>= Stream.fold go initial_stats
无论如何,my_line_parser
应该 return 一个 Maybe LogEntry
并且应该识别换行符。
背景
我使用 attoparsec 编写了一个日志文件解析器。我所有的小型解析器都成功了,组合的最终解析器也是如此。我已经用 tests 确认了这一点。但是我在使用解析后的流执行操作时遇到了麻烦。
我试过的
我首先尝试将成功解析的输入传递给一个函数。但是似乎得到的只是 Done ()
,我认为这意味着此时日志文件已被消耗。
prepareStats :: Result Log -> IO ()
prepareStats r =
case r of
Fail _ _ _ -> putStrLn $ "Parsing failed"
Done _ parsedLog -> putStrLn "Success" -- This now has a [LogEntry] array. Do something with it.
main :: IO ()
main = do
[f] <- getArgs
logFile <- B.readFile (f :: FilePath)
let results = parseOnly parseLog logFile
putStrLn "TBC"
我想做什么
我想在使用输入时从日志文件中积累一些统计信息。例如,我正在解析响应代码,我想计算有多少个 2** 响应和多少个 4/5** 响应。我正在解析作为 Ints 返回的每个响应的字节数,我想有效地求和这些(听起来像 foldl'
?)。我定义了这样的数据类型:
data Stats = Stats {
successfulRequestsPerMinute :: Int
, failingRequestsPerMinute :: Int
, meanResponseTime :: Int
, megabytesPerMinute :: Int
} deriving Show
我想在解析输入时不断更新它。但是在我消费时执行操作的部分是我卡住的地方。到目前为止,print
是我成功将输出传递给的唯一函数,它通过在打印输出之前返回 Done
来表明解析成功。
我的主要解析器如下所示:
parseLogEntry :: Parser LogEntry
parseLogEntry = do
ip <- logItem
_ <- char ' '
logName <- logItem
_ <- char ' '
user <- logItem
_ <- char ' '
time <- datetimeLogItem
_ <- char ' '
firstLogLine <- quotedLogItem
_ <- char ' '
finalRequestStatus <- intLogItem
_ <- char ' '
responseSizeB <- intLogItem
_ <- char ' '
timeToResponse <- intLogItem
return $ LogEntry ip logName user time firstLogLine finalRequestStatus responseSizeB timeToResponse
type Log = [LogEntry]
parseLog :: Parser Log
parseLog = many $ parseLogEntry <* endOfLine
期望的结果
我想将每个已解析的行传递给将更新上述数据类型的函数。理想情况下,我希望它的内存效率非常高,因为它将对大文件进行操作。
你必须让你的单元解析单个日志条目而不是日志条目列表。
它并不漂亮,但这里有一个如何交错解析和处理的示例:
(取决于 bytestring
、attoparsec
和 mtl
)
{-# LANGUAGE NoMonomorphismRestriction, FlexibleContexts #-}
import qualified Data.ByteString.Char8 as BS
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Attoparsec.ByteString.Char8 hiding (takeWhile)
import Data.Char
import Control.Monad.State.Strict
aWord :: Parser BS.ByteString
aWord = skipSpace >> A.takeWhile isAlphaNum
getNext :: MonadState [a] m => m (Maybe a)
getNext = do
xs <- get
case xs of
[] -> return Nothing
(y:ys) -> put ys >> return (Just y)
loop iresult =
case iresult of
Fail _ _ msg -> error $ "parse failed: " ++ msg
Done x' aword -> do lift $ process aword; loop (parse aWord x')
Partial _ -> do
mx <- getNext
case mx of
Just y -> loop (feed iresult y)
Nothing -> case feed iresult BS.empty of
Fail _ _ msg -> error $ "parse failed: " ++ msg
Done x' aword -> do lift $ process aword; return ()
Partial _ -> error $ "partial returned" -- probably can't happen
process :: Show a => a -> IO ()
process w = putStrLn $ "got a word: " ++ show w
theWords = map BS.pack [ "this is a te", "st of the emergency ", "broadcasting sys", "tem"]
main = runStateT (loop (Partial (parse aWord))) theWords
备注:
- 我们一次解析一个
aWord
,每个词识别后调用process
。 - 当 return 是
Partial
时,使用feed
为解析器提供更多输入。 - 当没有更多输入时,向解析器提供一个空字符串。
- 当
Done
为return时,处理识别出的词并继续parse aWord
。 getNext
只是获取下一个输入单元的单子函数的示例。将其替换为您自己的版本 - 即从文件中读取下一行的内容。
更新
这是@dfeuer 建议的使用 parseWith
的解决方案:
noMoreInput = fmap null get
loop2 x = do
iresult <- parseWith (fmap (fromMaybe BS.empty) getNext) aWord x
case iresult of
Fail _ _ msg -> error $ "parse failed: " ++ msg
Done x' aword -> do lift $ process aword;
if BS.null x'
then do b <- noMoreInput
if b then return ()
else loop2 x'
else loop2 x'
Partial _ -> error $ "huh???" -- this really can't happen
main2 = runStateT (loop2 BS.empty) theWords
如果每个日志条目正好是一行,这里有一个更简单的解决方案:
do loglines <- fmap BS.lines $ BS.readfile "input-file.log"
foldl' go initialStats loglines
where
go stats logline =
case parseOnly yourParser logline of
Left e -> error $ "oops: " ++ e
Right r -> let stats' = ... combine r with stats ...
in stats'
基本上您只是逐行读取文件并在每一行上调用 parseOnly
并累积结果。
这是使用流媒体库正确完成的
main = do
f:_ <- getArgs
withFile f ReadMode $ \h -> do
result <- foldStream $ streamProcess $ streamHandle h
print result
where
streamHandle = undefined
streamProcess = undefined
foldStream = undefined
任何流媒体库都可以填补空白,例如
import qualified Pipes.Prelude as P
import Pipes
import qualified Pipes.ByteString as PB
import Pipes.Group (folds)
import qualified Control.Foldl as L
import Control.Lens (view) -- or import Lens.Simple (view), or whatever
streamHandle = Pipes.ByteStream.fromHandle :: Handle -> Producer ByteString IO ()
在那种情况下,我们可能会进一步分工:
streamProcess :: Producer ByteString m r -> Producer LogEntry m r
streamProcess p = streamLines p >-> lineParser
streamLines :: Producer ByteString m r -> Producer ByteString m r
streamLines p = L.purely fold L.list (view (Pipes.ByteString.lines p)) >-> P.map B.toStrict
lineParser :: Pipe ByteString LogEntry m r
lineParser = P.map (parseOnly line_parser) >-> P.concat -- concat removes lefts
(这有点费力,因为管道对累积行和内存通常是明智的挑剔:我们只是试图获得单个严格字节串行的生产者,然后将其转换为已解析行的生产者,并且然后抛出错误的解析,如果有的话。使用 io-streams 或管道,事情将基本相同,并且特定步骤会更容易。)
我们现在可以弃牌 Producer LogEntry IO ()
。这可以使用 Pipes.Prelude.fold
显式完成,这会产生严格的左折叠。在这里我们只复制 user5402
foldStream str = P.fold go initial_stats id
where
go stats_till_now new_entry = undefined
如果你习惯了foldl
库的使用和使用L.purely fold some_fold
将fold应用到Producer,那么你可以为你的LogEntries out构建Control.Foldl.Fold
s随心所欲地在不同的请求中使用组件和插槽。
如果你使用 pipes-attoparsec
并在你的解析器中包含换行位,那么你可以只写
handleToLogEntries :: Handle -> Producer LogEntry IO ()
handleToLogEntries h = void $ parsed my_line_parser (fromHandle h) >-> P.concat
并更直接地获得Producer LogEntry IO ()
。 (但是,这种超简单的编写方式会在错误的解析处停止;首先按行划分比使用 attoparsec 识别换行符更快。)这对于 io-streams 也非常简单,您可以编写类似
import qualified System.IO.Streams as Streams
io :: Handle -> IO ()
io h = do
bytes <- Streams.handleToInputStream h
log_entries <- Streams.parserToInputStream my_line_parser bytes
fold_result <- Stream.fold go initial_stats log_entries
print fold_result
或保持上面的结构:
where
streamHandle = Streams.handleToInputStream
streamProcess io_bytes =
io_bytes >>= Streams.parserToInputStream my_line_parser
foldStream io_logentries =
log_entries >>= Stream.fold go initial_stats
无论如何,my_line_parser
应该 return 一个 Maybe LogEntry
并且应该识别换行符。