在多义词中使用 pooledMapConcurrentlyN

Using pooledMapConcurrentlyN in polysemy

我目前正在玩 Polysemy,重写我的一个小玩具项目以适应它。我偶然发现了一段使用 pooledMapConcurrentlyN 的代码,所以基本上是具有有限并发性的遍历的并行版本。

我可以将示例简化为:

foo :: Sem r Int
foo = do
  res <- pooledMapConcurrentlyN 3 action (["foo", "bar", "baz"] :: [String])
  pure $ sum res

action :: String -> Sem r Int 
action = pure. length

这无法编译,因为 MonadUnliftIO (Sem r) 没有实例。当我使用 traverse 时它确实可以编译,但我正在寻找并发版本。我不知道我现在应该走哪条路。

我看到以下选项:

data ParTraverse m a where
  TraverseP :: (Traversable t) => Int -> (a -> m b) -> t a -> ParTraverse m (t b)

我对 GADT 和 Polysemy 都不是很熟悉,所以我可能在这里遗漏了一些明显的东西。


编辑: 正如下面的答案所指出的,最合适的解决方案是将其建模为一种效果,并处理效果解释中的并发性,而不是业务逻辑.这意味着我正在寻找类似于上面 ParTraverse 效果的高阶效果 (?):

data ParTraverse m a where
  TraverseP :: (Traversable t) => (a -> m b) -> t a -> ParTraverse m (t b)

makeSem ''ParTraverse

parTraverseToIO :: (Member (Embed IO) r) => Sem (ParTraverse ': r) a -> Sem r a
parTraverseToIO = interpretH $ \case
  TraverseP f ta -> do
    _something

我不确定此类型签名是否正确(操作是否应具有类型 a -> Sem r btraverse 的签名对 [=25] 具有 Applicative 约束=],我该如何建模?)

那我会试试:

  1. 不要将并发作为业务逻辑中的效果
  2. 使用 pooledMapconcurrentlyIO + 嵌入你的解释器

所以你会有这样的东西

data GetThings m a where
  GetThings :: [InfoToFetchThing] -> GetThings m [Thing]

runGetThingsConcurrently :: Member (Embed IO) r => Sem (GetThings ': r) a -> Sem r a
runGetThingsConcurrently = interpret \case
  GetThings infos -> do
  ...
  embed $ pooledMapConcurrentlyIO 42 <fetch-action> infos

当然,你也可以自定义很多 - 使用 Traversable 而不是列表,将 <fetch-action> 作为参数传递给解释器,传递你想要的线程数作为给你解释器的论点等

编辑:由于要执行的操作也需要在 Sem r 中,而不是在 IO 中,您可以使用 withWeavingToFinal 来(可能)从 Sem r 中获取 IO,如link.

至于 ParTraverse 实现,这是我在 github 上针对 t 专门针对 [] 的版本的回复:

pooledMapConcurrently :: Member (Final IO) r => Int -> (a -> Sem r b) -> [a] -> Sem r [Maybe b]
pooledMapConcurrently num f ta =
  ...

data ParTraverse m a where
  TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]

makeSem ''ParTraverse

parTraverseToIO :: (Member (Final IO) r) => InterpreterFor ParTraverse r
parTraverseToIO =
  interpretH \case
   TraverseP f ta -> do
     taT <- traverse pureT ta
     fT <- bindT f
     tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
     ins <- getInspectorT
     pureT (catMaybes (inspect ins <$> catMaybes tb))

interpretH内部使用的组合器的一些解释,我们在Tactical环境中操作:

  • 由于我们正在处理一个函数 a -> m b,其中 m 在解释器中被实例化为 Sem rInitial,我们必须使用 bindT 来获取一个函数这类似于 f a -> Sem r (f b),其中 f 是解释器的单子状态。
  • 我们不能直接在 Sem rInitial 上 运行 pooledMapConcurrently,因为 Member (Final IO) 只给 r
  • ta 包含 f 的输入,但由于我们将其提升为期望 f a,我们还必须对 [=27] 的每个元素调用 pureT =], 使用 traverse 因为它是一个 monadic 动作。
  • bindT(和 runT)产生的函数产生的 Sems 仍然具有当前效果,ParTraverse,在头部,因为效果必须在包装的 Sem 中被解释(作为 a -> m b 传入)。这甚至允许为内部程序使用不同的解释器。在我们的例子中,我们再次简单地 运行 parTraverseToIO f 的结果。之后,我们必须将这个 Sem 提升回 Tactical 环境(这只是头部的另一个效果),所以我们使用 raise.
  • 由于我们提升的 f 产生了 f (Maybe b) 结果,我们需要解压它以获得正确的 return 类型。为此,我们可以使用检查器,它将 f 转换为 Maybe,给我们 Maybe (Maybe b),然后我们可以将其展平为列表。

为了完整起见,下面是 KingoftheHomeless 编写的 pooledMapConcurrently 的实现:

pooledMapConcurrently :: (Member (Final IO) r, Traversable t) => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
pooledMapConcurrently i f t = withWeavingToFinal $ \s wv ins ->
  (<$ s) <$> pooledMapConcurrentlyIO i (\a -> ins <$> wv (f a <$ s)) t