流的 GroupBy by agreggateId(Haskell / 并发流)
GroupBy of stream by agreggateId (Haskell / concurrency streaming)
Context :我正在 CQRS 中实现一个应用程序,我正在尝试优化命令的处理(1 个流基本上聚合 Id)...
问题:我想要第一个流接收所有命令并通过它们在不同的聚合 ID 分派这些命令线程:
1) 聚合中的命令以序列化方式处理
2)聚合独立(并行)处理它们的命令。
解决方案:我正在尝试通过聚合 ID 对流执行 groupBy 基本上......为了帮助一点,我简化示例如下:
module Sandbox where
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
import Control.Monad.IO.Class (MonadIO(..))
main :: IO ()
main = do
runStream $ parallely $ S.fromList getAggregateIds |& S.mapM (\x -> do
threadId <- myThreadId
liftIO $ putStrLn $ (show threadId) ++ " value " ++ (show x))
getAggregateIds :: [Integer]
getAggregateIds = [1..3] <> [1..3]
因此此脚本显示以下结果:
ThreadId 17 value 1
ThreadId 15 value 2
ThreadId 19 value 3
ThreadId 13 value 1
ThreadId 16 value 3
ThreadId 18 value 2
我期待的是类似的东西(没有特殊订单只是 x 总是在同一个线程 x1 上处理):
ThreadId X1 value X
ThreadId Y1 value Y
ThreadId Z1 value Z
ThreadId X1 value X
ThreadId Y1 value Y
ThreadId Z1 value Z
谢谢!!
在上面的代码中,parallely
决定为列表 getAggregateIds
中的每个元素创建一个 Haskell 线程,即 [1,2,3,1,2,3]
。
parallely
不关心列表中是否存在重复元素:它只是为每个元素启动一个线程。
原则上,parallely
只能分配少量的 Haskell 线程并在以后重用它们(可能用于相同的重复 ID,或另一个 ID),但不会有性能这样做的好处。实际上,这里的关键部分是正在分配 Haskell 线程,而不是 OS 线程,
Haskell 线程非常轻量级,它们使用的内存非常少,因此创建和处置它们的成本非常低。尝试重用它们可能会导致性能下降。
此外,Haskell 运行时可以在单个 OS 线程中执行多个 Haskell 线程。通常,运行时会保留一小部分 OS 线程,并将 Haskell 线程映射到这些线程。由于 OS 线程不是轻量级 OS 线程确实在 Haskell 线程之间重用。
最后,请注意 ThreadId
是 Haskell 线程的名称,而不是 OS 线程的名称,因此没有重复使用这些 ID 是正常的。
Context :我正在 CQRS 中实现一个应用程序,我正在尝试优化命令的处理(1 个流基本上聚合 Id)...
问题:我想要第一个流接收所有命令并通过它们在不同的聚合 ID 分派这些命令线程:
1) 聚合中的命令以序列化方式处理
2)聚合独立(并行)处理它们的命令。
解决方案:我正在尝试通过聚合 ID 对流执行 groupBy 基本上......为了帮助一点,我简化示例如下:
module Sandbox where
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
import Control.Monad.IO.Class (MonadIO(..))
main :: IO ()
main = do
runStream $ parallely $ S.fromList getAggregateIds |& S.mapM (\x -> do
threadId <- myThreadId
liftIO $ putStrLn $ (show threadId) ++ " value " ++ (show x))
getAggregateIds :: [Integer]
getAggregateIds = [1..3] <> [1..3]
因此此脚本显示以下结果:
ThreadId 17 value 1
ThreadId 15 value 2
ThreadId 19 value 3
ThreadId 13 value 1
ThreadId 16 value 3
ThreadId 18 value 2
我期待的是类似的东西(没有特殊订单只是 x 总是在同一个线程 x1 上处理):
ThreadId X1 value X
ThreadId Y1 value Y
ThreadId Z1 value Z
ThreadId X1 value X
ThreadId Y1 value Y
ThreadId Z1 value Z
谢谢!!
在上面的代码中,parallely
决定为列表 getAggregateIds
中的每个元素创建一个 Haskell 线程,即 [1,2,3,1,2,3]
。
parallely
不关心列表中是否存在重复元素:它只是为每个元素启动一个线程。
原则上,parallely
只能分配少量的 Haskell 线程并在以后重用它们(可能用于相同的重复 ID,或另一个 ID),但不会有性能这样做的好处。实际上,这里的关键部分是正在分配 Haskell 线程,而不是 OS 线程,
Haskell 线程非常轻量级,它们使用的内存非常少,因此创建和处置它们的成本非常低。尝试重用它们可能会导致性能下降。
此外,Haskell 运行时可以在单个 OS 线程中执行多个 Haskell 线程。通常,运行时会保留一小部分 OS 线程,并将 Haskell 线程映射到这些线程。由于 OS 线程不是轻量级 OS 线程确实在 Haskell 线程之间重用。
最后,请注意 ThreadId
是 Haskell 线程的名称,而不是 OS 线程的名称,因此没有重复使用这些 ID 是正常的。