无限的有效行动

Infinite stream of effectful actions

我想将无限的字节流解析为无限的 Haskell 数据流。每个字节都是从网络中读取的,因此它们被包装到 IO monad 中。

更具体地说,我有一个 [IO(ByteString)] 类型的无限流。另一方面,我有一个纯解析函数 parse :: [ByteString] -> [Object](其中 Object 是 Haskell 数据类型)

有没有办法将我无限的 monad 流插入到我的解析函数中?

例如,是否可以编写类型为 [IO(ByteString)] -> IO [ByteString] 的函数,以便我在 monad 中使用我的函数 parse

问题

一般来说,为了正确排序 IO 操作并使其行为可预测,每个操作都需要在下一个操作 运行 之前完全完成。在 do 块中,这意味着这有效:

main = do
    sequence (map putStrLn ["This","action","will","complete"])
    putStrLn "before we get here"

但不幸的是,如果最后的 IO 操作很重要,这将不起作用:

dontRunMe = do
    putStrLn "This is a problem when an action is"
    sequence (repeat (putStrLn "infinite"))
    putStrLn "<not printed>"

因此,即使 sequence 可以专门化为正确的类型签名:

sequence :: [IO a] -> IO [a]

它无法在无穷无尽的 IO 操作列表上按预期工作。 定义这样的序列没有问题:

badSeq :: IO [Char]
badSeq = sequence (repeat (return '+'))

但是任何执行 IO 操作的尝试(例如,通过尝试打印结果列表的头部)都会挂起:

main = (head <$> badSeq) >>= print

只需要结果的部分也没关系。在整个 sequence 完成之前,您不会从 IO monad 中得到任何东西(所以 "never" 如果列表是无限的)。

"Lazy IO" 解决方案

如果您想从部分完成的 IO 操作中获取数据,您需要对其进行明确说明并使用听起来很可怕的 Haskell 逃生口 unsafeInterleaveIO。此函数采用 IO 操作并 "defers" 它以便在需要该值之前不会实际执行。

这通常是不安全的原因是现在有意义的 IO 操作,如果在稍后的时间点实际执行,可能意味着不同的东西。举个简单的例子,truncates/removes 文件的 IO 操作如果在 before 执行与 after 更新文件内容时会产生截然不同的效果写好了!

无论如何,你想在这里做的是写一个懒惰的版本 sequence:

import System.IO.Unsafe (unsafeInterleaveIO)

lazySequence :: [IO a] -> IO [a]
lazySequence [] = return []  -- oops, not infinite after all
lazySequence (m:ms) = do
  x <- m
  xs <- unsafeInterleaveIO (lazySequence ms)
  return (x:xs)

这里的重点是,当执行一个lazySequence infstream动作时,它实际上会只执行第一个动作;剩余的操作将包含在一个延迟的 IO 操作中,该操作在需要返回列表的第二个和后续元素之前不会真正执行。

这适用于虚假 IO 操作:

> take 5 <$> lazySequence (repeat (return ('+'))
"+++++"
>

(如果您将 lazySequence 替换为 sequence,它将挂起)。它也适用于真正的 IO 操作:

> lns <- lazySequence (repeat getLine)
<waits for first line of input, then returns to prompt>
> print (head lns)
<prints whatever you entered>
> length (head (tail lns))  -- force next element
<waits for second line of input>
<then shows length of your second line before prompt>
>

无论如何,使用 lazySequence 的定义和类型:

parse :: [ByteString] -> [Object]
input :: [IO ByteString]

你写应该没有问题:

outputs :: IO [Object]
outputs = parse <$> lazySequence inputs

然后随心所欲地使用它:

main = do
    objs <- outputs
    mapM_ doSomethingWithObj objs

使用管道

尽管上述惰性 IO 机制非常简单明了,但由于资源管理问题、space 泄漏的脆弱性(其中一个小的更改到你的代码炸毁了内存占用),以及异常处理的问题。

一个解决方案是 conduit 库。另一个是pipes。两者都是精心设计的流媒体库,可以支持无限流。

对于 conduit,如果你有一个为每个字节字符串创建一个对象的解析函数,例如:

parse1 :: ByteString -> Object
parse1 = ...

然后给出:

inputs :: [IO ByteString]
inputs = ...

useObject :: Object -> IO ()
useObject = ...

管道看起来像:

import Conduit

main :: IO ()
main = runConduit $  mapM_ yieldM inputs
                  .| mapC parse1
                  .| mapM_C useObject

鉴于您的解析函数具有签名:

parse :: [ByteString] -> [Object]

我很确定您不能将它与管道直接集成(或者至少不能以任何不会放弃使用管道的所有好处的方式)。您需要重写它,使其在使用字节字符串和生成对象的方式上对管道友好。