使用 Haskell 管道合并源
Merging sources using Haskell Conduit
是否可以在 Conduit 中构建一个函数(比如 zipC2)来转换以下源:
series1 = yieldMany [2, 4, 6, 8, 16 :: Int]
series2 = yieldMany [1, 5, 6 :: Int]
变成一个会产生以下对的(在这里显示为列表):
[(Nothing, Just 1), (Just 2, Just 1), (Just 4, Just 1), (Just 4, Just 5), (Just 6, Just 6), (Just 8, Just 6), (Just 16, Just 6)]
它将通过以下方式调用比较函数:
runConduitPure ( zipC2 (<=) series1 series1 .| sinkList )
之前的版本中曾经有一个mergeSources
函数,做的事情比较类似(虽然没有记忆效应),但在最近的版本(1.3.1)中消失了。
说明函数的工作原理:
这个想法是采用 2 个来源 A(生成值 a)和 B(生成值 b).
然后我们生成对:
如果a < b我们首先构建(只是一个,没有)
如果 b < a 它将产生 (Nothing, Just b)
如果 a == b 我们更新双方,我们产生 (Just a, Just b)
源中未更新的值不会被消耗,而是用于下一轮比较。仅使用更新的值。
然后我们根据 A 和 B 的相对值不断更新对。
换句话说:如果 a < b,我们更新对的左侧,如果 b < a,则更新右侧,或双方如果 a == b。任何未使用的值都保留在内存中以供下一轮比较。
我已经成功创建了你的 zipC2
函数:
import Data.Ord
import Conduit
import Control.Monad
zipC2Def :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> (Maybe a, Maybe a) -> ConduitT () (Maybe a, Maybe a) m ()
zipC2Def f c1 c2 (s1, s2) = do
ma <- c1 .| peekC
mb <- c2 .| peekC
case (ma, mb) of
(Just a, Just b) ->
case (f a b, f b a) of
(True, True) -> do
yield (ma, mb)
zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, mb)
(_, True) -> do
yield (s1, mb)
zipC2Def f c1 (c2 .| drop1) (s1, mb)
(True, _) -> do
yield (ma, s2)
zipC2Def f (c1 .| drop1) c2 (ma, s2)
_ ->
zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, s2)
(Just a, Nothing) -> do
yield (ma, s2)
zipC2Def f (c1 .| drop1) c2 (ma, s2)
(Nothing, Just b) -> do
yield (s1, mb)
zipC2Def f c1 (c2 .| drop1) (s1, mb)
_ -> return ()
where
drop1 = dropC 1 >> takeWhileC (const True)
zipC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (Maybe a, Maybe a) m ()
zipC2 f c1 c2 = zipC2Def f c1 c2 (Nothing, Nothing)
main :: IO ()
main =
let
series1 = yieldMany [2, 4, 6, 8, 16 :: Int] :: ConduitT () Int Identity ()
series2 = yieldMany [1, 5, 6 :: Int] :: ConduitT () Int Identity ()
in
putStrLn $ show $ runConduitPure $
(zipC2 (<=) series1 series2)
.| sinkList
输出:
[(Nothing,Just 1),(Just 2,Just 1),(Just 4,Just 1),(Just 4,Just 5),(Just 6,Just 6),(Just 8,Just 6),(Just 16,Just 6)]
下面的代码按预期工作(我调用了函数 mergeSort):
module Data.Conduit.Merge where
import Prelude (Monad, Bool, Maybe(..), Show, Eq)
import Prelude (otherwise, return)
import Prelude (($))
import Conduit (ConduitT)
import Conduit (evalStateC, mapC, yield, await)
import Conduit ((.|))
import Control.Monad.State (get, put, lift)
import Control.Monad.Trans.State.Strict (StateT)
import qualified Data.Conduit.Internal as CI
-- | Takes two sources and merges them.
-- This comes from https://github.com/luispedro/conduit-algorithms made available thanks to Luis Pedro Coelho.
mergeC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () a m ()
mergeC2 comparator (CI.ConduitT s1) (CI.ConduitT s2) = CI.ConduitT $ processMergeC2 comparator s1 s2
processMergeC2 :: Monad m => (a -> a -> Bool)
-> ((() -> CI.Pipe () () a () m ()) -> CI.Pipe () () a () m ()) -- s1 ConduitT () a m ()
-> ((() -> CI.Pipe () () a () m ()) -> CI.Pipe () () a () m ()) -- s2 ConduitT () a m ()
-> ((() -> CI.Pipe () () a () m b ) -> CI.Pipe () () a () m b ) -- rest ConduitT () a m ()
processMergeC2 comparator s1 s2 rest = go (s1 CI.Done) (s2 CI.Done)
where
go s1''@(CI.HaveOutput s1' v1) s2''@(CI.HaveOutput s2' v2) -- s1''@ and s2''@ simply name the pattern expressions
| comparator v1 v2 = CI.HaveOutput (go s1' s2'') v1
| otherwise = CI.HaveOutput (go s1'' s2') v2
go s1'@CI.Done{} (CI.HaveOutput s v) = CI.HaveOutput (go s1' s) v
go (CI.HaveOutput s v) s1'@CI.Done{} = CI.HaveOutput (go s s1') v
go CI.Done{} CI.Done{} = rest ()
go (CI.PipeM p) left = do
next <- lift p
go next left
go right (CI.PipeM p) = do
next <- lift p
go right next
go (CI.NeedInput _ next) left = go (next ()) left
go right (CI.NeedInput _ next) = go right (next ())
go (CI.Leftover next ()) left = go next left
go right (CI.Leftover next ()) = go right next
data MergeTag = LeftItem | RightItem deriving (Show, Eq)
data TaggedItem a = TaggedItem MergeTag a deriving (Show, Eq)
mergeTag :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (TaggedItem a) m ()
mergeTag func series1 series2 = mergeC2 (tagSort func) taggedSeries1 taggedSeries2
where
taggedSeries1 = series1 .| mapC (\item -> TaggedItem LeftItem item)
taggedSeries2 = series2 .| mapC (\item -> TaggedItem RightItem item)
tagSort :: (a -> a -> Bool) -> TaggedItem a -> TaggedItem a -> Bool
tagSort f (TaggedItem _ item1) (TaggedItem _ item2) = f item1 item2
type StateMergePair a = (Maybe a, Maybe a)
pairTagC :: (Monad m) => ConduitT (TaggedItem a) (StateMergePair a) (StateT (StateMergePair a) m) ()
pairTagC = do
input <- await
case input of
Nothing -> return ()
Just taggedItem -> do
stateMergePair <- lift get
let outputState = updateStateMergePair taggedItem stateMergePair
lift $ put outputState
yield outputState
pairTagC
updateStateMergePair :: TaggedItem a -> StateMergePair a -> StateMergePair a
updateStateMergePair (TaggedItem tag item) (Just leftItem, Just rightItem) = case tag of
LeftItem -> (Just item, Just rightItem)
RightItem -> (Just leftItem, Just item)
updateStateMergePair (TaggedItem tag item) (Nothing, Just rightItem) = case tag of
LeftItem -> (Just item, Just rightItem)
RightItem -> (Nothing, Just item)
updateStateMergePair (TaggedItem tag item) (Just leftItem, Nothing) = case tag of
LeftItem -> (Just item, Nothing)
RightItem -> (Just leftItem, Just item)
updateStateMergePair (TaggedItem tag item) (Nothing, Nothing) = case tag of
LeftItem -> (Just item, Nothing)
RightItem -> (Nothing, Just item)
pairTag :: (Monad m) => ConduitT (TaggedItem a) (StateMergePair a) m ()
pairTag = evalStateC (Nothing, Nothing) pairTagC
mergeSort :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (StateMergePair a) m ()
mergeSort func series1 series2 = mergeTag func series1 series2 .| pairTag
我借用了https://github.com/luispedro/conduit-algorithms的mergeC2函数...
我只是Haskell的初学者,所以代码肯定不是最优的。
是否可以在 Conduit 中构建一个函数(比如 zipC2)来转换以下源:
series1 = yieldMany [2, 4, 6, 8, 16 :: Int]
series2 = yieldMany [1, 5, 6 :: Int]
变成一个会产生以下对的(在这里显示为列表):
[(Nothing, Just 1), (Just 2, Just 1), (Just 4, Just 1), (Just 4, Just 5), (Just 6, Just 6), (Just 8, Just 6), (Just 16, Just 6)]
它将通过以下方式调用比较函数:
runConduitPure ( zipC2 (<=) series1 series1 .| sinkList )
之前的版本中曾经有一个mergeSources
函数,做的事情比较类似(虽然没有记忆效应),但在最近的版本(1.3.1)中消失了。
说明函数的工作原理: 这个想法是采用 2 个来源 A(生成值 a)和 B(生成值 b).
然后我们生成对:
如果a < b我们首先构建(只是一个,没有)
如果 b < a 它将产生 (Nothing, Just b)
如果 a == b 我们更新双方,我们产生 (Just a, Just b)
源中未更新的值不会被消耗,而是用于下一轮比较。仅使用更新的值。
然后我们根据 A 和 B 的相对值不断更新对。
换句话说:如果 a < b,我们更新对的左侧,如果 b < a,则更新右侧,或双方如果 a == b。任何未使用的值都保留在内存中以供下一轮比较。
我已经成功创建了你的 zipC2
函数:
import Data.Ord
import Conduit
import Control.Monad
zipC2Def :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> (Maybe a, Maybe a) -> ConduitT () (Maybe a, Maybe a) m ()
zipC2Def f c1 c2 (s1, s2) = do
ma <- c1 .| peekC
mb <- c2 .| peekC
case (ma, mb) of
(Just a, Just b) ->
case (f a b, f b a) of
(True, True) -> do
yield (ma, mb)
zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, mb)
(_, True) -> do
yield (s1, mb)
zipC2Def f c1 (c2 .| drop1) (s1, mb)
(True, _) -> do
yield (ma, s2)
zipC2Def f (c1 .| drop1) c2 (ma, s2)
_ ->
zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, s2)
(Just a, Nothing) -> do
yield (ma, s2)
zipC2Def f (c1 .| drop1) c2 (ma, s2)
(Nothing, Just b) -> do
yield (s1, mb)
zipC2Def f c1 (c2 .| drop1) (s1, mb)
_ -> return ()
where
drop1 = dropC 1 >> takeWhileC (const True)
zipC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (Maybe a, Maybe a) m ()
zipC2 f c1 c2 = zipC2Def f c1 c2 (Nothing, Nothing)
main :: IO ()
main =
let
series1 = yieldMany [2, 4, 6, 8, 16 :: Int] :: ConduitT () Int Identity ()
series2 = yieldMany [1, 5, 6 :: Int] :: ConduitT () Int Identity ()
in
putStrLn $ show $ runConduitPure $
(zipC2 (<=) series1 series2)
.| sinkList
输出:
[(Nothing,Just 1),(Just 2,Just 1),(Just 4,Just 1),(Just 4,Just 5),(Just 6,Just 6),(Just 8,Just 6),(Just 16,Just 6)]
下面的代码按预期工作(我调用了函数 mergeSort):
module Data.Conduit.Merge where
import Prelude (Monad, Bool, Maybe(..), Show, Eq)
import Prelude (otherwise, return)
import Prelude (($))
import Conduit (ConduitT)
import Conduit (evalStateC, mapC, yield, await)
import Conduit ((.|))
import Control.Monad.State (get, put, lift)
import Control.Monad.Trans.State.Strict (StateT)
import qualified Data.Conduit.Internal as CI
-- | Takes two sources and merges them.
-- This comes from https://github.com/luispedro/conduit-algorithms made available thanks to Luis Pedro Coelho.
mergeC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () a m ()
mergeC2 comparator (CI.ConduitT s1) (CI.ConduitT s2) = CI.ConduitT $ processMergeC2 comparator s1 s2
processMergeC2 :: Monad m => (a -> a -> Bool)
-> ((() -> CI.Pipe () () a () m ()) -> CI.Pipe () () a () m ()) -- s1 ConduitT () a m ()
-> ((() -> CI.Pipe () () a () m ()) -> CI.Pipe () () a () m ()) -- s2 ConduitT () a m ()
-> ((() -> CI.Pipe () () a () m b ) -> CI.Pipe () () a () m b ) -- rest ConduitT () a m ()
processMergeC2 comparator s1 s2 rest = go (s1 CI.Done) (s2 CI.Done)
where
go s1''@(CI.HaveOutput s1' v1) s2''@(CI.HaveOutput s2' v2) -- s1''@ and s2''@ simply name the pattern expressions
| comparator v1 v2 = CI.HaveOutput (go s1' s2'') v1
| otherwise = CI.HaveOutput (go s1'' s2') v2
go s1'@CI.Done{} (CI.HaveOutput s v) = CI.HaveOutput (go s1' s) v
go (CI.HaveOutput s v) s1'@CI.Done{} = CI.HaveOutput (go s s1') v
go CI.Done{} CI.Done{} = rest ()
go (CI.PipeM p) left = do
next <- lift p
go next left
go right (CI.PipeM p) = do
next <- lift p
go right next
go (CI.NeedInput _ next) left = go (next ()) left
go right (CI.NeedInput _ next) = go right (next ())
go (CI.Leftover next ()) left = go next left
go right (CI.Leftover next ()) = go right next
data MergeTag = LeftItem | RightItem deriving (Show, Eq)
data TaggedItem a = TaggedItem MergeTag a deriving (Show, Eq)
mergeTag :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (TaggedItem a) m ()
mergeTag func series1 series2 = mergeC2 (tagSort func) taggedSeries1 taggedSeries2
where
taggedSeries1 = series1 .| mapC (\item -> TaggedItem LeftItem item)
taggedSeries2 = series2 .| mapC (\item -> TaggedItem RightItem item)
tagSort :: (a -> a -> Bool) -> TaggedItem a -> TaggedItem a -> Bool
tagSort f (TaggedItem _ item1) (TaggedItem _ item2) = f item1 item2
type StateMergePair a = (Maybe a, Maybe a)
pairTagC :: (Monad m) => ConduitT (TaggedItem a) (StateMergePair a) (StateT (StateMergePair a) m) ()
pairTagC = do
input <- await
case input of
Nothing -> return ()
Just taggedItem -> do
stateMergePair <- lift get
let outputState = updateStateMergePair taggedItem stateMergePair
lift $ put outputState
yield outputState
pairTagC
updateStateMergePair :: TaggedItem a -> StateMergePair a -> StateMergePair a
updateStateMergePair (TaggedItem tag item) (Just leftItem, Just rightItem) = case tag of
LeftItem -> (Just item, Just rightItem)
RightItem -> (Just leftItem, Just item)
updateStateMergePair (TaggedItem tag item) (Nothing, Just rightItem) = case tag of
LeftItem -> (Just item, Just rightItem)
RightItem -> (Nothing, Just item)
updateStateMergePair (TaggedItem tag item) (Just leftItem, Nothing) = case tag of
LeftItem -> (Just item, Nothing)
RightItem -> (Just leftItem, Just item)
updateStateMergePair (TaggedItem tag item) (Nothing, Nothing) = case tag of
LeftItem -> (Just item, Nothing)
RightItem -> (Nothing, Just item)
pairTag :: (Monad m) => ConduitT (TaggedItem a) (StateMergePair a) m ()
pairTag = evalStateC (Nothing, Nothing) pairTagC
mergeSort :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (StateMergePair a) m ()
mergeSort func series1 series2 = mergeTag func series1 series2 .| pairTag
我借用了https://github.com/luispedro/conduit-algorithms的mergeC2函数...
我只是Haskell的初学者,所以代码肯定不是最优的。