如何使用 Applicative 实现并发?

How to use Applicative for concurrency?

这是我之前 question. I copied the example below from Haxl

的后续

假设我正在从博客服务器获取数据以呈现一个博客页面,其中包含最近的帖子、热门帖子和帖子主题。

我有以下数据获取API:

val getRecent  : Server => Seq[Post] = ...
val getPopular : Server => Seq[Post] = ...
val getTopics  : Server => Seq[Topic] = ...

现在我需要组合它们来实现一个新功能getPageData

val getPageData: Server => (Seq[Post],  Seq[Post], Seq[Topic])

Haxl 建议使用新的 monad Fetch 使 API 可组合。

val getRecent  : Fetch[Seq[Posts]] = ...
val getPopular : Fetch[Seq[Posts]] = ...
val getTopics  : Fetch[Seq[Topic]] = ...

现在我可以用 monadic 组合定义我的 getPageData: Fetch[A]

val getPageData = for {
  recent  <- getRecent
  popular <- getPopular
  topics  <- getTopics
} yield (recent, popular, topics)

但它不会同时运行 getRecentgetPopulargetTopics

Haxl 建议使用 applicative 组合 <*> 来组合 "concurrent" 函数(即可以并发运行的函数)。所以我的问题是:

How to implement getPageData assuming Fetch[A] is an Applicative ?

我们需要做的就是放弃单子绑定 >>= 以支持应用 <*>。所以而不是

val getPageData = for {
  recent  <- getRecent
  popular <- getPopular
  topics  <- getTopics
} yield (recent, popular, topics)

我们会写类似的东西(在 Haskell 语法中;抱歉,我无法随心所欲地使用 Scala):

getPageData = makeTriple <$> getRecent <*> getPopular <*> getTopics
  where
    makeTriple x y z = (x, y, z)

但这是否有预期的效果取决于第二个问题!

How to implement Fetch as an Applicative but not a Monad ?

单子序列和应用序列之间的主要区别在于,单子序列可以依赖于单子值中的值,而应用序列 <*> 则不能。请注意上面 getPageData 的单子表达式如何在到达 getTopics 之前绑定名称 recentpopular。这些名称 可以 用于更改表达式的结构,例如在 recent 为空的情况下获取其他数据源。但是对于应用表达式,getRecentgetPopular 的结果不是表达式本身结构中的因素。 属性 允许我们同时触发应用表达式中的每个术语,因为我们静态地知道表达式的结构。

因此,使用上面的观察,以及显然是 Fetch 数据类型的特殊形状,我们可以为 <*> 提出一个合适的定义。我认为以下说明了总体思路:

data Fetch a = Fetch { runFetch :: IO a }

fetchF <*> fetchX = Fetch $ do
  -- Fire off both IOs concurrently.
  resultF <- async $ runFetch fetchF
  resultX <- async $ runFetch fetchX
  -- Wait for both results to be ready.
  f <- wait resultF
  x <- wait resultX
  return $ f x

为了比较,假设我们尝试通过并发评估进行单子绑定:

fetchF >>= fetchK = Fetch $ do
  resultF <- async $ runFetch fetchF
  -- Oh no, we need resultF in order to produce the next
  -- Fetch value! We just have to wait...
  f <- wait resultF
  fetchX <- async $ runFetch (fetchK f)
  x <- wait $ runFetch fetchX
  return $ f x