在 haskell 管道中分叉流
Forking the streaming flow in haskell-pipes
我在通过带有 haskell 管道的管道引导流量时遇到问题。基本上,我分析了一堆文件,然后我必须
- 以人性化的方式将结果打印到终端
- 将结果编码为 JSON
所选路径取决于命令行选项。
在第二种情况下,我必须输出一个左括号,然后是每个传入值,后跟一个逗号,然后是一个右括号。当前 insertCommas
永远不会终止,因此永远不会输出右括号。
import Pipes
import Data.ByteString.Lazy as B
import Data.Aeson (encode)
insertCommas :: Consumer B.ByteString IO ()
insertCommas = do
first <- await
lift $ B.putStr first
for cat $ \obj -> lift $ do
putStr ","
B.putStr obj
jsonExporter :: Consumer (FilePath, AnalysisResult) IO ()
jsonExporter = do
lift $ putStr "["
P.map encode >-> insertCommas
lift $ putStr "]"
exportStream :: Config -> Consumer (FilePath, AnalysisResult) IO ()
exportStream conf =
case outputMode conf of
JSON -> jsonExporter
_ -> P.map (export conf) >-> P.stdoutLn
main :: IO ()
main = do
-- The first two lines are Docopt stuff, not relevant
args <- parseArgsOrExit patterns =<< getArgs
ins <- allFiles $ args `getAllArgs` argument "paths"
let conf = readConfig args
runEffect $ each ins
>-> P.mapM analyze
>-> P.map (filterResults conf)
>-> P.filter filterNulls
>-> exportStream conf
据我所知,消费者无法检测到流的结尾。为此,您需要使用 Pipes.Parser 并反转控件。
这是一个在字符串元素之间插入逗号的解析器:
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Parse (draw, evalStateT)
commify = do
lift $ putStrLn "["
m1 <- draw
case m1 of
Nothing -> lift $ putStrLn "]"
Just x1 -> do
lift $ putStrLn x1
let loop = do mx <- draw
case mx of
Nothing -> lift $ putStrLn "]"
Just x -> lift (putStr "," >> putStrLn x) >> loop
loop
test1 = evalStateT commify ( mapM_ yield (words "this is a test") )
test2 = evalStateT commify P.stdinLn
为了处理不同的输出格式,我可能会将这两种格式都设为解析器:
exportParser = do
mx <- draw
case mx of
Nothing -> return ()
Just x -> (lift $ putStrLn $ export x) >> exportParser
然后:
let parser = case outputMode of
JSON -> commify
_ -> exportParser
evalStateT parser (P.mapM analyze
>-> P.map (filterResults conf)
>-> P.filter filterNulls)
根据 foldAllM
,可能有一种更巧妙的方式来编写 exportParser
。您还可以使用 MaybeT
转换器来更简洁地编写 commify
解析器。我已经明确地把它们都写出来了,以使它们更容易理解。
我认为你应该 'commify' 使用管道组。有一个intercalates
,但是没有穿插,不过写起来也没什么大不了的。这种问题我觉得你应该远离Consumer端。
{-#LANGUAGE OverloadedStrings #-}
import Pipes
import qualified Pipes.Prelude as P
import qualified Data.ByteString.Lazy.Char8 as B
import Pipes.Group
import Lens.Simple -- or Control.Lens or Lens.Micro or anything with view/^.
import System.Environment
intersperse_ :: Monad m => a -> Producer a m r -> Producer a m r
intersperse_ a producer = intercalates (yield a) (producer ^. chunksOf 1)
main = do
args <- getArgs
let op prod = case args of
"json":_ -> yield "[" *> intersperse_ "," prod <* yield "]"
_ -> intersperse_ " " prod
runEffect $ op producer >-> P.mapM_ B.putStr
putStrLn ""
where
producer = mapM_ yield (B.words "this is a test")
哪个给我这个
>>> :main json
[this,is,a,test]
>>> :main ---
this is a test
我在通过带有 haskell 管道的管道引导流量时遇到问题。基本上,我分析了一堆文件,然后我必须
- 以人性化的方式将结果打印到终端
- 将结果编码为 JSON
所选路径取决于命令行选项。
在第二种情况下,我必须输出一个左括号,然后是每个传入值,后跟一个逗号,然后是一个右括号。当前 insertCommas
永远不会终止,因此永远不会输出右括号。
import Pipes
import Data.ByteString.Lazy as B
import Data.Aeson (encode)
insertCommas :: Consumer B.ByteString IO ()
insertCommas = do
first <- await
lift $ B.putStr first
for cat $ \obj -> lift $ do
putStr ","
B.putStr obj
jsonExporter :: Consumer (FilePath, AnalysisResult) IO ()
jsonExporter = do
lift $ putStr "["
P.map encode >-> insertCommas
lift $ putStr "]"
exportStream :: Config -> Consumer (FilePath, AnalysisResult) IO ()
exportStream conf =
case outputMode conf of
JSON -> jsonExporter
_ -> P.map (export conf) >-> P.stdoutLn
main :: IO ()
main = do
-- The first two lines are Docopt stuff, not relevant
args <- parseArgsOrExit patterns =<< getArgs
ins <- allFiles $ args `getAllArgs` argument "paths"
let conf = readConfig args
runEffect $ each ins
>-> P.mapM analyze
>-> P.map (filterResults conf)
>-> P.filter filterNulls
>-> exportStream conf
据我所知,消费者无法检测到流的结尾。为此,您需要使用 Pipes.Parser 并反转控件。
这是一个在字符串元素之间插入逗号的解析器:
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Parse (draw, evalStateT)
commify = do
lift $ putStrLn "["
m1 <- draw
case m1 of
Nothing -> lift $ putStrLn "]"
Just x1 -> do
lift $ putStrLn x1
let loop = do mx <- draw
case mx of
Nothing -> lift $ putStrLn "]"
Just x -> lift (putStr "," >> putStrLn x) >> loop
loop
test1 = evalStateT commify ( mapM_ yield (words "this is a test") )
test2 = evalStateT commify P.stdinLn
为了处理不同的输出格式,我可能会将这两种格式都设为解析器:
exportParser = do
mx <- draw
case mx of
Nothing -> return ()
Just x -> (lift $ putStrLn $ export x) >> exportParser
然后:
let parser = case outputMode of
JSON -> commify
_ -> exportParser
evalStateT parser (P.mapM analyze
>-> P.map (filterResults conf)
>-> P.filter filterNulls)
根据 foldAllM
,可能有一种更巧妙的方式来编写 exportParser
。您还可以使用 MaybeT
转换器来更简洁地编写 commify
解析器。我已经明确地把它们都写出来了,以使它们更容易理解。
我认为你应该 'commify' 使用管道组。有一个intercalates
,但是没有穿插,不过写起来也没什么大不了的。这种问题我觉得你应该远离Consumer端。
{-#LANGUAGE OverloadedStrings #-}
import Pipes
import qualified Pipes.Prelude as P
import qualified Data.ByteString.Lazy.Char8 as B
import Pipes.Group
import Lens.Simple -- or Control.Lens or Lens.Micro or anything with view/^.
import System.Environment
intersperse_ :: Monad m => a -> Producer a m r -> Producer a m r
intersperse_ a producer = intercalates (yield a) (producer ^. chunksOf 1)
main = do
args <- getArgs
let op prod = case args of
"json":_ -> yield "[" *> intersperse_ "," prod <* yield "]"
_ -> intersperse_ " " prod
runEffect $ op producer >-> P.mapM_ B.putStr
putStrLn ""
where
producer = mapM_ yield (B.words "this is a test")
哪个给我这个
>>> :main json
[this,is,a,test]
>>> :main ---
this is a test