在多义词中使用 pooledMapConcurrentlyN
Using pooledMapConcurrentlyN in polysemy
我目前正在玩 Polysemy,重写我的一个小玩具项目以适应它。我偶然发现了一段使用 pooledMapConcurrentlyN
的代码,所以基本上是具有有限并发性的遍历的并行版本。
我可以将示例简化为:
foo :: Sem r Int
foo = do
res <- pooledMapConcurrentlyN 3 action (["foo", "bar", "baz"] :: [String])
pure $ sum res
action :: String -> Sem r Int
action = pure. length
这无法编译,因为 MonadUnliftIO (Sem r)
没有实例。当我使用 traverse
时它确实可以编译,但我正在寻找并发版本。我不知道我现在应该走哪条路。
我看到以下选项:
- 实施一个
MonadUnliftIO (Sem r)
实例。我看到在 this GitHub issue 中有一些关于 adding/implementing 这样的实例的讨论。但是,我不清楚这样做是否是个好主意。
- 使用
pooledMapConcurrentlyN
以外的东西给我一个等效的行为。我知道 par-dual package, but that would require a ParDual
instance. The parallel
包中的 parTraverse
也可以使解决方案成为可能,但我不熟悉它,所以我不知道它是否可能。
- 将平行导线建模为效果。我试过了,但我无法设法实现效果。我试过的效果定义如下所示:
data ParTraverse m a where
TraverseP :: (Traversable t) => Int -> (a -> m b) -> t a -> ParTraverse m (t b)
我对 GADT 和 Polysemy 都不是很熟悉,所以我可能在这里遗漏了一些明显的东西。
编辑: 正如下面的答案所指出的,最合适的解决方案是将其建模为一种效果,并处理效果解释中的并发性,而不是业务逻辑.这意味着我正在寻找类似于上面 ParTraverse
效果的高阶效果 (?):
data ParTraverse m a where
TraverseP :: (Traversable t) => (a -> m b) -> t a -> ParTraverse m (t b)
makeSem ''ParTraverse
parTraverseToIO :: (Member (Embed IO) r) => Sem (ParTraverse ': r) a -> Sem r a
parTraverseToIO = interpretH $ \case
TraverseP f ta -> do
_something
我不确定此类型签名是否正确(操作是否应具有类型 a -> Sem r b
?traverse
的签名对 [=25] 具有 Applicative
约束=],我该如何建模?)
那我会试试:
- 不要将并发作为业务逻辑中的效果
- 使用 pooledMapconcurrentlyIO + 嵌入你的解释器
所以你会有这样的东西
data GetThings m a where
GetThings :: [InfoToFetchThing] -> GetThings m [Thing]
runGetThingsConcurrently :: Member (Embed IO) r => Sem (GetThings ': r) a -> Sem r a
runGetThingsConcurrently = interpret \case
GetThings infos -> do
...
embed $ pooledMapConcurrentlyIO 42 <fetch-action> infos
当然,你也可以自定义很多 - 使用 Traversable
而不是列表,将 <fetch-action>
作为参数传递给解释器,传递你想要的线程数作为给你解释器的论点等
编辑:由于要执行的操作也需要在 Sem r 中,而不是在 IO 中,您可以使用 withWeavingToFinal 来(可能)从 Sem r 中获取 IO,如link.
至于 ParTraverse
实现,这是我在 github 上针对 t
专门针对 []
的版本的回复:
pooledMapConcurrently :: Member (Final IO) r => Int -> (a -> Sem r b) -> [a] -> Sem r [Maybe b]
pooledMapConcurrently num f ta =
...
data ParTraverse m a where
TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]
makeSem ''ParTraverse
parTraverseToIO :: (Member (Final IO) r) => InterpreterFor ParTraverse r
parTraverseToIO =
interpretH \case
TraverseP f ta -> do
taT <- traverse pureT ta
fT <- bindT f
tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
ins <- getInspectorT
pureT (catMaybes (inspect ins <$> catMaybes tb))
对interpretH
内部使用的组合器的一些解释,我们在Tactical
环境中操作:
- 由于我们正在处理一个函数
a -> m b
,其中 m
在解释器中被实例化为 Sem rInitial
,我们必须使用 bindT
来获取一个函数这类似于 f a -> Sem r (f b)
,其中 f
是解释器的单子状态。
- 我们不能直接在
Sem rInitial
上 运行 pooledMapConcurrently
,因为 Member (Final IO)
只给 r
。
ta
包含 f
的输入,但由于我们将其提升为期望 f a
,我们还必须对 [=27] 的每个元素调用 pureT
=], 使用 traverse
因为它是一个 monadic 动作。
- 由
bindT
(和 runT
)产生的函数产生的 Sem
s 仍然具有当前效果,ParTraverse
,在头部,因为效果必须在包装的 Sem
中被解释(作为 a -> m b
传入)。这甚至允许为内部程序使用不同的解释器。在我们的例子中,我们再次简单地 运行 parTraverseToIO
f
的结果。之后,我们必须将这个 Sem
提升回 Tactical
环境(这只是头部的另一个效果),所以我们使用 raise
.
- 由于我们提升的
f
产生了 f (Maybe b)
结果,我们需要解压它以获得正确的 return 类型。为此,我们可以使用检查器,它将 f
转换为 Maybe
,给我们 Maybe (Maybe b)
,然后我们可以将其展平为列表。
为了完整起见,下面是 KingoftheHomeless 编写的 pooledMapConcurrently
的实现:
pooledMapConcurrently :: (Member (Final IO) r, Traversable t) => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
pooledMapConcurrently i f t = withWeavingToFinal $ \s wv ins ->
(<$ s) <$> pooledMapConcurrentlyIO i (\a -> ins <$> wv (f a <$ s)) t
我目前正在玩 Polysemy,重写我的一个小玩具项目以适应它。我偶然发现了一段使用 pooledMapConcurrentlyN
的代码,所以基本上是具有有限并发性的遍历的并行版本。
我可以将示例简化为:
foo :: Sem r Int
foo = do
res <- pooledMapConcurrentlyN 3 action (["foo", "bar", "baz"] :: [String])
pure $ sum res
action :: String -> Sem r Int
action = pure. length
这无法编译,因为 MonadUnliftIO (Sem r)
没有实例。当我使用 traverse
时它确实可以编译,但我正在寻找并发版本。我不知道我现在应该走哪条路。
我看到以下选项:
- 实施一个
MonadUnliftIO (Sem r)
实例。我看到在 this GitHub issue 中有一些关于 adding/implementing 这样的实例的讨论。但是,我不清楚这样做是否是个好主意。 - 使用
pooledMapConcurrentlyN
以外的东西给我一个等效的行为。我知道 par-dual package, but that would require aParDual
instance. Theparallel
包中的parTraverse
也可以使解决方案成为可能,但我不熟悉它,所以我不知道它是否可能。 - 将平行导线建模为效果。我试过了,但我无法设法实现效果。我试过的效果定义如下所示:
data ParTraverse m a where
TraverseP :: (Traversable t) => Int -> (a -> m b) -> t a -> ParTraverse m (t b)
我对 GADT 和 Polysemy 都不是很熟悉,所以我可能在这里遗漏了一些明显的东西。
编辑: 正如下面的答案所指出的,最合适的解决方案是将其建模为一种效果,并处理效果解释中的并发性,而不是业务逻辑.这意味着我正在寻找类似于上面 ParTraverse
效果的高阶效果 (?):
data ParTraverse m a where
TraverseP :: (Traversable t) => (a -> m b) -> t a -> ParTraverse m (t b)
makeSem ''ParTraverse
parTraverseToIO :: (Member (Embed IO) r) => Sem (ParTraverse ': r) a -> Sem r a
parTraverseToIO = interpretH $ \case
TraverseP f ta -> do
_something
我不确定此类型签名是否正确(操作是否应具有类型 a -> Sem r b
?traverse
的签名对 [=25] 具有 Applicative
约束=],我该如何建模?)
那我会试试:
- 不要将并发作为业务逻辑中的效果
- 使用 pooledMapconcurrentlyIO + 嵌入你的解释器
所以你会有这样的东西
data GetThings m a where
GetThings :: [InfoToFetchThing] -> GetThings m [Thing]
runGetThingsConcurrently :: Member (Embed IO) r => Sem (GetThings ': r) a -> Sem r a
runGetThingsConcurrently = interpret \case
GetThings infos -> do
...
embed $ pooledMapConcurrentlyIO 42 <fetch-action> infos
当然,你也可以自定义很多 - 使用 Traversable
而不是列表,将 <fetch-action>
作为参数传递给解释器,传递你想要的线程数作为给你解释器的论点等
编辑:由于要执行的操作也需要在 Sem r 中,而不是在 IO 中,您可以使用 withWeavingToFinal 来(可能)从 Sem r 中获取 IO,如link.
至于 ParTraverse
实现,这是我在 github 上针对 t
专门针对 []
的版本的回复:
pooledMapConcurrently :: Member (Final IO) r => Int -> (a -> Sem r b) -> [a] -> Sem r [Maybe b]
pooledMapConcurrently num f ta =
...
data ParTraverse m a where
TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]
makeSem ''ParTraverse
parTraverseToIO :: (Member (Final IO) r) => InterpreterFor ParTraverse r
parTraverseToIO =
interpretH \case
TraverseP f ta -> do
taT <- traverse pureT ta
fT <- bindT f
tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
ins <- getInspectorT
pureT (catMaybes (inspect ins <$> catMaybes tb))
对interpretH
内部使用的组合器的一些解释,我们在Tactical
环境中操作:
- 由于我们正在处理一个函数
a -> m b
,其中m
在解释器中被实例化为Sem rInitial
,我们必须使用bindT
来获取一个函数这类似于f a -> Sem r (f b)
,其中f
是解释器的单子状态。 - 我们不能直接在
Sem rInitial
上 运行pooledMapConcurrently
,因为Member (Final IO)
只给r
。 ta
包含f
的输入,但由于我们将其提升为期望f a
,我们还必须对 [=27] 的每个元素调用pureT
=], 使用traverse
因为它是一个 monadic 动作。- 由
bindT
(和runT
)产生的函数产生的Sem
s 仍然具有当前效果,ParTraverse
,在头部,因为效果必须在包装的Sem
中被解释(作为a -> m b
传入)。这甚至允许为内部程序使用不同的解释器。在我们的例子中,我们再次简单地 运行parTraverseToIO
f
的结果。之后,我们必须将这个Sem
提升回Tactical
环境(这只是头部的另一个效果),所以我们使用raise
. - 由于我们提升的
f
产生了f (Maybe b)
结果,我们需要解压它以获得正确的 return 类型。为此,我们可以使用检查器,它将f
转换为Maybe
,给我们Maybe (Maybe b)
,然后我们可以将其展平为列表。
为了完整起见,下面是 KingoftheHomeless 编写的 pooledMapConcurrently
的实现:
pooledMapConcurrently :: (Member (Final IO) r, Traversable t) => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
pooledMapConcurrently i f t = withWeavingToFinal $ \s wv ins ->
(<$ s) <$> pooledMapConcurrentlyIO i (\a -> ins <$> wv (f a <$ s)) t