在 haskell 管道中分叉流

Forking the streaming flow in haskell-pipes

我在通过带有 haskell 管道的管道引导流量时遇到问题。基本上,我分析了一堆文件,然后我必须

  1. 以人性化的方式将结果打印到终端
  2. 将结果编码为 JSON

所选路径取决于命令行选项。
在第二种情况下,我必须输出一个左括号,然后是每个传入值,后跟一个逗号,然后是一个右括号。当前 insertCommas 永远不会终止,因此永远不会输出右括号。

import Pipes
import Data.ByteString.Lazy as B
import Data.Aeson (encode)

insertCommas :: Consumer B.ByteString IO ()
insertCommas = do
    first <- await
    lift $ B.putStr first
    for cat $ \obj -> lift $ do
        putStr ","
        B.putStr obj

jsonExporter :: Consumer (FilePath, AnalysisResult) IO ()
jsonExporter = do
    lift $ putStr "["
    P.map encode >-> insertCommas
    lift $ putStr "]"

exportStream :: Config -> Consumer (FilePath, AnalysisResult) IO ()
exportStream conf =
    case outputMode conf of
      JSON -> jsonExporter
      _    -> P.map (export conf) >-> P.stdoutLn

main :: IO ()
main = do
    -- The first two lines are Docopt stuff, not relevant
    args <- parseArgsOrExit patterns =<< getArgs
    ins  <- allFiles $ args `getAllArgs` argument "paths"
    let conf = readConfig args
    runEffect $ each ins
             >-> P.mapM analyze
             >-> P.map (filterResults conf)
             >-> P.filter filterNulls
             >-> exportStream conf

据我所知,消费者无法检测到流的结尾。为此,您需要使用 Pipes.Parser 并反转控件。

这是一个在字符串元素之间插入逗号的解析器:

import Pipes
import qualified Pipes.Prelude as P
import Pipes.Parse (draw, evalStateT)

commify = do
  lift $ putStrLn "["
  m1 <- draw
  case m1 of
    Nothing -> lift $ putStrLn "]"
    Just x1 -> do
      lift $ putStrLn x1
      let loop = do mx <- draw
                    case mx of
                      Nothing -> lift $ putStrLn "]"
                      Just x  -> lift (putStr "," >> putStrLn x) >> loop
      loop

test1 = evalStateT commify ( mapM_ yield (words "this is a test") )
test2 = evalStateT commify P.stdinLn

为了处理不同的输出格式,我可能会将这两种格式都设为解析器:

exportParser = do
  mx <- draw
  case mx of
    Nothing -> return ()
    Just x  -> (lift $ putStrLn $ export x) >> exportParser

然后:

let parser = case outputMode of
               JSON -> commify
               _    -> exportParser
evalStateT parser (P.mapM analyze
                      >-> P.map (filterResults conf)
                      >-> P.filter filterNulls)

根据 foldAllM,可能有一种更巧妙的方式来编写 exportParser。您还可以使用 MaybeT 转换器来更简洁地编写 commify 解析器。我已经明确地把它们都写出来了,以使它们更容易理解。

我认为你应该 'commify' 使用管道组。有一个intercalates,但是没有穿插,不过写起来也没什么大不了的。这种问题我觉得你应该远离Consumer端。

{-#LANGUAGE OverloadedStrings #-}
import Pipes
import qualified Pipes.Prelude as P
import qualified Data.ByteString.Lazy.Char8 as B
import Pipes.Group
import Lens.Simple  -- or Control.Lens or Lens.Micro or anything with view/^.
import System.Environment

intersperse_ :: Monad m => a -> Producer a m r -> Producer a m r
intersperse_ a producer = intercalates (yield a) (producer ^. chunksOf 1) 

main = do 
  args <- getArgs
  let op prod = case args of 
        "json":_ -> yield "[" *> intersperse_ "," prod <* yield "]"
        _        -> intersperse_ " " prod
  runEffect $ op producer >-> P.mapM_ B.putStr
  putStrLn ""
  where 
    producer = mapM_ yield (B.words "this is a test")

哪个给我这个

    >>> :main json
    [this,is,a,test]
    >>> :main ---
    this is a test