流的 GroupBy by agreggateId(Haskell / 并发流)

GroupBy of stream by agreggateId (Haskell / concurrency streaming)

Context :我正在 CQRS 中实现一个应用程序,我正在尝试优化命令的处理(1 个流基本上聚合 Id)...

问题:我想要第一个流接收所有命令并通过它们在不同的聚合 ID 分派这些命令线程:

1) 聚合中的命令以序列化方式处理
2)聚合独立(并行)处理它们的命令。

解决方案:我正在尝试通过聚合 ID 对流执行 groupBy 基本上......为了帮助一点,我简化示例如下:

module Sandbox where

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
import Control.Monad.IO.Class (MonadIO(..))

main :: IO ()
main = do
         runStream $ parallely $ S.fromList getAggregateIds |& S.mapM (\x -> do
            threadId <- myThreadId
            liftIO $ putStrLn $ (show threadId) ++ "  value " ++ (show x))


getAggregateIds :: [Integer]
getAggregateIds = [1..3] <> [1..3]

因此此脚本显示以下结果:

ThreadId 17  value 1
ThreadId 15  value 2
ThreadId 19  value 3
ThreadId 13  value 1
ThreadId 16  value 3
ThreadId 18  value 2

我期待的是类似的东西(没有特殊订单只是 x 总是在同一个线程 x1 上处理):

ThreadId X1  value X
ThreadId Y1  value Y
ThreadId Z1  value Z
ThreadId X1  value X
ThreadId Y1  value Y
ThreadId Z1  value Z

谢谢!!

在上面的代码中,parallely 决定为列表 getAggregateIds 中的每个元素创建一个 Haskell 线程,即 [1,2,3,1,2,3]parallely 不关心列表中是否存在重复元素:它只是为每个元素启动一个线程。

原则上,parallely 只能分配少量的 Haskell 线程并在以后重用它们(可能用于相同的重复 ID,或另一个 ID),但不会有性能这样做的好处。实际上,这里的关键部分是正在分配 Haskell 线程,而不是 OS 线程,

Haskell 线程非常轻量级,它们使用的内存非常少,因此创建和处置它们的成本非常低。尝试重用它们可能会导致性能下降。

此外,Haskell 运行时可以在单个 OS 线程中执行多个 Haskell 线程。通常,运行时会保留一小部分 OS 线程,并将 Haskell 线程映射到这些线程。由于 OS 线程不是轻量级 OS 线程确实在 Haskell 线程之间重用。

最后,请注意 ThreadId 是 Haskell 线程的名称,而不是 OS 线程的名称,因此没有重复使用这些 ID 是正常的。