Haskell 中的并发 HTTP 请求

Concurrent HTTP requests in Haskell

我有一组函数,旨在从 Asana API 构建子任务树。为此,我有一个相当简单的模块,称为“Asana.hs”,其最重要的两个功能是使用 Network.HTTP.Simple 执行请求的功能:

getTasksForProject :: String -> String -> IO [Task]
getTasksForProject token projectId = getFromAsana token $ "projects/" ++ projectId ++ "/tasks"

getSubtasks :: String -> String -> IO [Task]
getSubtasks token taskId = getFromAsana token $ "tasks/" ++ taskId ++ "/subtasks"

问题是当我想构建一个包含我必须完成的所有任务的图表时:

  1. 获取任务列表
  2. 迭代这些任务以获得它们的子任务
  3. 递归

例如,我有这些函数来构建节点和边的“图”:

type TaskGraph = ([Task], [Edge])

merge :: TaskGraph -> TaskGraph -> TaskGraph
merge (aTasks, aEdges) (bTasks, bEdges) = (aTasks ++ bTasks, aEdges ++ bEdges)

makeEdge :: Relation -> Task -> Task -> Edge
makeEdge rel parent child = Edge rel (taskId parent) (taskId child)

rFetchTaskGraph :: String -> Task -> IO TaskGraph
rFetchTaskGraph token task = do
  subtasks <- getSubtasks token $ taskId task
  let edges = map (makeEdge Subtask task) subtasks
  foldr merge ([task], edges) <$> mapM (rFetchTaskGraph token) subtasks

这非常慢,因为据我所知,它会按顺序发出每个 HTTP 请求。如果我在类似 Javascript 中这样做,Promises 将允许我急切地执行所有计算,但将请求排队,因此仅在请求完成时才解析相关的 Promise,但将并行性集中到某种连接池管理器。

如何在Haskell中提高效率?我有一些想法:

  1. 也许我需要创建一个新的 Monad 来表示这个池化资源访问?
  2. 我可以急切地计算整个列表吗(当然,在我可以的范围内,因为有些请求只有在其他请求的结果出现后才能知道 return)?
  3. 我需要显式使用线程吗?

而不是

mapM (rFetchTaskGraph token) subtasks

使用

mapConcurrently (rFetchTaskGraph token) subtasks

其中 mapConcurrently is from the async 图书馆。

但是,在发出并发 HTTP 请求时,应该小心地限制它们,以免压垮远程服务器或被它禁止。进行节流的一种简单方法是使用 semaphore, as described in .

rFetchTaskGraph 的所有调用进行门控

因为 rFetchTaskGraph 是递归的,它应该接受信号量作为参数,以便将其传递给它的子调用:

rFetchTaskGraph :: QSem -> String -> Task -> IO TaskGraph
rFetchTaskGraph sem token task = 
    bracket_ 
      (waitQSem sem) 
      (signalQSem sem)
      (do
        subtasks <- getSubtasks token $ taskId task
        let edges = map (makeEdge Subtask task) subtasks
        foldr merge ([task], edges) <$> mapConcurrently (rFetchTaskGraph sem token) subtasks)

更全面的解决方案将涉及线程池 and/or concurrent queues.

编辑: 我认为前面的代码在实践中可能会导致死锁,因为临界区的范围太大了。这样的东西应该会更好:

rFetchTaskGraph sem token task = do
       subtasks <- bracket_ (waitQSem sem) (signalQSem sem) $ getSubtasks token $ taskId task
       let edges = map (makeEdge Subtask task) subtasks
       foldr merge ([task], edges) <$> mapConcurrently (rFetchTaskGraph sem token) subtasks 

也就是说,将临界区限制为实际的 HTTP 请求。