无限的有效行动
Infinite stream of effectful actions
我想将无限的字节流解析为无限的 Haskell 数据流。每个字节都是从网络中读取的,因此它们被包装到 IO monad 中。
更具体地说,我有一个 [IO(ByteString)]
类型的无限流。另一方面,我有一个纯解析函数 parse :: [ByteString] -> [Object]
(其中 Object
是 Haskell 数据类型)
有没有办法将我无限的 monad 流插入到我的解析函数中?
例如,是否可以编写类型为 [IO(ByteString)] -> IO [ByteString]
的函数,以便我在 monad 中使用我的函数 parse
?
问题
一般来说,为了正确排序 IO 操作并使其行为可预测,每个操作都需要在下一个操作 运行 之前完全完成。在 do 块中,这意味着这有效:
main = do
sequence (map putStrLn ["This","action","will","complete"])
putStrLn "before we get here"
但不幸的是,如果最后的 IO 操作很重要,这将不起作用:
dontRunMe = do
putStrLn "This is a problem when an action is"
sequence (repeat (putStrLn "infinite"))
putStrLn "<not printed>"
因此,即使 sequence
可以专门化为正确的类型签名:
sequence :: [IO a] -> IO [a]
它无法在无穷无尽的 IO 操作列表上按预期工作。 定义这样的序列没有问题:
badSeq :: IO [Char]
badSeq = sequence (repeat (return '+'))
但是任何执行 IO 操作的尝试(例如,通过尝试打印结果列表的头部)都会挂起:
main = (head <$> badSeq) >>= print
只需要结果的部分也没关系。在整个 sequence
完成之前,您不会从 IO monad 中得到任何东西(所以 "never" 如果列表是无限的)。
"Lazy IO" 解决方案
如果您想从部分完成的 IO 操作中获取数据,您需要对其进行明确说明并使用听起来很可怕的 Haskell 逃生口 unsafeInterleaveIO
。此函数采用 IO 操作并 "defers" 它以便在需要该值之前不会实际执行。
这通常是不安全的原因是现在有意义的 IO 操作,如果在稍后的时间点实际执行,可能意味着不同的东西。举个简单的例子,truncates/removes 文件的 IO 操作如果在 before 执行与 after 更新文件内容时会产生截然不同的效果写好了!
无论如何,你想在这里做的是写一个懒惰的版本 sequence
:
import System.IO.Unsafe (unsafeInterleaveIO)
lazySequence :: [IO a] -> IO [a]
lazySequence [] = return [] -- oops, not infinite after all
lazySequence (m:ms) = do
x <- m
xs <- unsafeInterleaveIO (lazySequence ms)
return (x:xs)
这里的重点是,当执行一个lazySequence infstream
动作时,它实际上会只执行第一个动作;剩余的操作将包含在一个延迟的 IO 操作中,该操作在需要返回列表的第二个和后续元素之前不会真正执行。
这适用于虚假 IO 操作:
> take 5 <$> lazySequence (repeat (return ('+'))
"+++++"
>
(如果您将 lazySequence
替换为 sequence
,它将挂起)。它也适用于真正的 IO 操作:
> lns <- lazySequence (repeat getLine)
<waits for first line of input, then returns to prompt>
> print (head lns)
<prints whatever you entered>
> length (head (tail lns)) -- force next element
<waits for second line of input>
<then shows length of your second line before prompt>
>
无论如何,使用 lazySequence
的定义和类型:
parse :: [ByteString] -> [Object]
input :: [IO ByteString]
你写应该没有问题:
outputs :: IO [Object]
outputs = parse <$> lazySequence inputs
然后随心所欲地使用它:
main = do
objs <- outputs
mapM_ doSomethingWithObj objs
使用管道
尽管上述惰性 IO 机制非常简单明了,但由于资源管理问题、space 泄漏的脆弱性(其中一个小的更改到你的代码炸毁了内存占用),以及异常处理的问题。
一个解决方案是 conduit
库。另一个是pipes
。两者都是精心设计的流媒体库,可以支持无限流。
对于 conduit
,如果你有一个为每个字节字符串创建一个对象的解析函数,例如:
parse1 :: ByteString -> Object
parse1 = ...
然后给出:
inputs :: [IO ByteString]
inputs = ...
useObject :: Object -> IO ()
useObject = ...
管道看起来像:
import Conduit
main :: IO ()
main = runConduit $ mapM_ yieldM inputs
.| mapC parse1
.| mapM_C useObject
鉴于您的解析函数具有签名:
parse :: [ByteString] -> [Object]
我很确定您不能将它与管道直接集成(或者至少不能以任何不会放弃使用管道的所有好处的方式)。您需要重写它,使其在使用字节字符串和生成对象的方式上对管道友好。
我想将无限的字节流解析为无限的 Haskell 数据流。每个字节都是从网络中读取的,因此它们被包装到 IO monad 中。
更具体地说,我有一个 [IO(ByteString)]
类型的无限流。另一方面,我有一个纯解析函数 parse :: [ByteString] -> [Object]
(其中 Object
是 Haskell 数据类型)
有没有办法将我无限的 monad 流插入到我的解析函数中?
例如,是否可以编写类型为 [IO(ByteString)] -> IO [ByteString]
的函数,以便我在 monad 中使用我的函数 parse
?
问题
一般来说,为了正确排序 IO 操作并使其行为可预测,每个操作都需要在下一个操作 运行 之前完全完成。在 do 块中,这意味着这有效:
main = do
sequence (map putStrLn ["This","action","will","complete"])
putStrLn "before we get here"
但不幸的是,如果最后的 IO 操作很重要,这将不起作用:
dontRunMe = do
putStrLn "This is a problem when an action is"
sequence (repeat (putStrLn "infinite"))
putStrLn "<not printed>"
因此,即使 sequence
可以专门化为正确的类型签名:
sequence :: [IO a] -> IO [a]
它无法在无穷无尽的 IO 操作列表上按预期工作。 定义这样的序列没有问题:
badSeq :: IO [Char]
badSeq = sequence (repeat (return '+'))
但是任何执行 IO 操作的尝试(例如,通过尝试打印结果列表的头部)都会挂起:
main = (head <$> badSeq) >>= print
只需要结果的部分也没关系。在整个 sequence
完成之前,您不会从 IO monad 中得到任何东西(所以 "never" 如果列表是无限的)。
"Lazy IO" 解决方案
如果您想从部分完成的 IO 操作中获取数据,您需要对其进行明确说明并使用听起来很可怕的 Haskell 逃生口 unsafeInterleaveIO
。此函数采用 IO 操作并 "defers" 它以便在需要该值之前不会实际执行。
这通常是不安全的原因是现在有意义的 IO 操作,如果在稍后的时间点实际执行,可能意味着不同的东西。举个简单的例子,truncates/removes 文件的 IO 操作如果在 before 执行与 after 更新文件内容时会产生截然不同的效果写好了!
无论如何,你想在这里做的是写一个懒惰的版本 sequence
:
import System.IO.Unsafe (unsafeInterleaveIO)
lazySequence :: [IO a] -> IO [a]
lazySequence [] = return [] -- oops, not infinite after all
lazySequence (m:ms) = do
x <- m
xs <- unsafeInterleaveIO (lazySequence ms)
return (x:xs)
这里的重点是,当执行一个lazySequence infstream
动作时,它实际上会只执行第一个动作;剩余的操作将包含在一个延迟的 IO 操作中,该操作在需要返回列表的第二个和后续元素之前不会真正执行。
这适用于虚假 IO 操作:
> take 5 <$> lazySequence (repeat (return ('+'))
"+++++"
>
(如果您将 lazySequence
替换为 sequence
,它将挂起)。它也适用于真正的 IO 操作:
> lns <- lazySequence (repeat getLine)
<waits for first line of input, then returns to prompt>
> print (head lns)
<prints whatever you entered>
> length (head (tail lns)) -- force next element
<waits for second line of input>
<then shows length of your second line before prompt>
>
无论如何,使用 lazySequence
的定义和类型:
parse :: [ByteString] -> [Object]
input :: [IO ByteString]
你写应该没有问题:
outputs :: IO [Object]
outputs = parse <$> lazySequence inputs
然后随心所欲地使用它:
main = do
objs <- outputs
mapM_ doSomethingWithObj objs
使用管道
尽管上述惰性 IO 机制非常简单明了,但由于资源管理问题、space 泄漏的脆弱性(其中一个小的更改到你的代码炸毁了内存占用),以及异常处理的问题。
一个解决方案是 conduit
库。另一个是pipes
。两者都是精心设计的流媒体库,可以支持无限流。
对于 conduit
,如果你有一个为每个字节字符串创建一个对象的解析函数,例如:
parse1 :: ByteString -> Object
parse1 = ...
然后给出:
inputs :: [IO ByteString]
inputs = ...
useObject :: Object -> IO ()
useObject = ...
管道看起来像:
import Conduit
main :: IO ()
main = runConduit $ mapM_ yieldM inputs
.| mapC parse1
.| mapM_C useObject
鉴于您的解析函数具有签名:
parse :: [ByteString] -> [Object]
我很确定您不能将它与管道直接集成(或者至少不能以任何不会放弃使用管道的所有好处的方式)。您需要重写它,使其在使用字节字符串和生成对象的方式上对管道友好。