管道中的错误处理

Error handling in pipes

背景故事

我有许多数据文件,每个文件都包含一个数据记录列表(每行一个)。 与 CSV 类似,但差异很大,以至于我更愿意编写自己的解析器而不是使用 CSV 库。 出于这个问题的目的,我将使用一个简化的数据文件,每行只包含一个数字:

1
2
3
error
4

如您所见,文件可能包含格式错误的数据,在这种情况下,整个文件都应被视为格式错误。

我想做的那种数据处理可以用地图和折叠来表达。 所以,我认为这将是学习如何使用 pipes 库的好机会。

{-# LANGUAGE NoMonomorphismRestriction #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE FlexibleContexts #-}

import           Control.Monad.Except
import           Pipes ((>->))
import qualified Pipes as P
import qualified Pipes.Prelude as P
import qualified Pipes.Safe as P
import qualified System.IO as IO

首先,我在文本文件中创建了行生成器。 这与 Pipes.Safe.

文档中的示例非常相似
getLines = do
    P.bracket (IO.openFile "data.txt" IO.ReadMode) IO.hClose P.fromHandle

接下来,我需要一个函数来解析每一行。 正如我之前提到的,这可能会失败,我将用 Either.

表示
type ErrMsg = String

parseNumber :: String -> Either ErrMsg Integer
parseNumber s = case reads s of
                  [(n, "")] -> Right n
                  _         -> Left $ "Parse Error: \"" ++ s ++ "\""

为简单起见,作为第一步,我想将所有数据记录收集到一个记录列表中。 最直接的方法是将所有行通过解析器进行管道传输,然后将整个内容收集到一个列表中。

readNumbers1 :: IO [Either ErrMsg Integer]
readNumbers1 = P.runSafeT $ P.toListM $
    getLines >-> P.map parseNumber

不幸的是,这会创建一个记录列表。 但是,如果文件包含一条错误记录,则整个文件都应该被认为是错误的。 我真正想要的是记录列表中的任何一个。 当然,我可以只使用 sequence 转置 列表。

readNumbers2 :: IO (Either ErrMsg [Integer])
readNumbers2 = sequence <$> readNumbers1

但是,即使第一行已经格式错误,这也会读取整个文件。 这些文件可能很大,我有很多,所以,如果读取在第一个错误时停止就更好了。

问题

我的问题 是如何实现的。 如何中止第一个格式错误的记录的解析?

到目前为止我得到了什么

我的第一个想法是使用 Either ErrMsgP.mapM 的 monad 实例而不是 P.map。 由于我们正在从一个文件中读取,我们的 monad 堆栈中已经有 IOSafeT,所以,我想我需要 ExceptT 才能将错误处理放入该 monad 堆栈中。 这就是我卡住的地方。 我尝试了很多不同的组合,但最终总是被类型检查器大吼大叫。 以下是我能得到的最接近它编译

readNumbers3 = P.runSafeT $ runExceptT $ P.toListM $
    getLines >-> P.mapM (ExceptT . return . parseNumber)

readNumbers3 的推断类型为

*Main> :t readNumbers3
readNumbers3
  :: (MonadIO m, P.MonadSafe (ExceptT ErrMsg (P.SafeT m)),
      P.MonadMask m, P.Base (ExceptT ErrMsg (P.SafeT m)) ~ IO) =>
     m (Either ErrMsg [Integer])

看起来很接近我想要的:

readNumbers3 :: IO (Either ErrMsg [Integer])

但是,一旦我尝试实际执行该操作,我就会在 ghci 中收到以下错误消息:

*Main> readNumbers3

<interactive>:7:1:
    Couldn't match expected type ‘IO’
                with actual type ‘P.Base (ExceptT ErrMsg (P.SafeT m0))’
    The type variable ‘m0’ is ambiguous
    In the first argument of ‘print’, namely ‘it’
    In a stmt of an interactive GHCi command: print it

如果我尝试应用以下类型签名:

readNumbers3 :: IO (Either ErrMsg [Integer])

然后我收到以下错误消息:

error.hs:108:5:
    Couldn't match expected type ‘IO’
                with actual type ‘P.Base (ExceptT ErrMsg (P.SafeT IO))’
    In the first argument of ‘(>->)’, namely ‘getLines’
    In the second argument of ‘($)’, namely
      ‘getLines >-> P.mapM (ExceptT . return . parseNumber)’
    In the second argument of ‘($)’, namely
      ‘P.toListM $ getLines >-> P.mapM (ExceptT . return . parseNumber)’
Failed, modules loaded: none.

放在一边

将错误处理转移到管道的基本 monad 中的另一个动机是,如果我不必在 maps 和 folds 中处理任何一个,它将使进一步的数据处理变得更加容易。

这是解决问题的增量方法。

遵循 Tekmo 在 this SO answer 中的建议 我们的目标是在以下 monad 中运行:

ExceptT String (Pipe a b m) r

我们从导入和 parseNumber 的定义开始:

import           Control.Monad.Except
import           Pipes ((>->))
import qualified Pipes as P
import qualified Pipes.Prelude as P

parseNumber :: String -> Either String Integer
parseNumber s = case reads s of
                  [(n, "")] -> Right n
                  _         -> Left $ "Parse Error: \"" ++ s ++ "\""

这是我们将用作输入的 IO-monad 中的普通字符串生成器:

p1 :: P.Producer String IO ()
p1 = P.stdinLn >-> P.takeWhile (/= "quit")

要将其提升到 ExceptT monad,我们只需使用 lift:

p2 :: ExceptT String (P.Producer String IO) ()
p2 = lift p1

这是在 ExceptT monad 中将字符串转换为整数的管道段:

p4 :: ExceptT String (P.Pipe String Integer IO) a
p4 = forever $ 
       do s <- lift P.await
          case parseNumber s of
            Left e  -> throwError e
            Right n -> lift $ P.yield n

可能可以更组合地编写,但为了清楚起见,我将其保留得非常明确。

接下来我们将 p2 和 p4 连接在一起。结果也在 ExceptT monad 中。

-- join together p2 and p4
p7 :: ExceptT String (P.Producer Integer IO) ()
p7 = ExceptT $ runExceptT p2 >-> runExceptT p4

Tekmo 的 SO 回答建议为此创建一个新的运算符。

最后,我们可以使用toListM'到运行这条管道。 (我在这里包含了 toListM' 的定义,因为它没有出现在我安装的 Pipes.Prelude 版本中)

p8 :: IO ([Integer], Either String ())
p8 = toListM' $ runExceptT p7

toListM' :: Monad m => P.Producer a m r -> m ([a], r)
toListM' = P.fold' step begin done
  where
    step x a = x . (a:)
    begin = id
    done x = x []

p8 工作原理示例:

ghci> p8
4
5
6
quit
([4,5,6],Right ())

ghci> p8
5
asd
([5],Left "Parse Error: \"asd\"")

更新

您可以像这样概括 parseNumber 来简化代码:

parseNumber' :: (MonadError [Char] m) => String -> m Integer
parseNumber' s = case reads s of
                   [(n, "")] -> return n
                   _         -> throwError $ "Parse Error: \"" ++ s ++ "\""

那么p4可以写成:

p4' :: ExceptT String (P.Pipe String Integer IO) a
p4' = forever $ lift P.await >>= parseNumber' >>= lift . P.yield