Haskell 中的并发 HTTP 请求
Concurrent HTTP requests in Haskell
我有一组函数,旨在从 Asana API 构建子任务树。为此,我有一个相当简单的模块,称为“Asana.hs”,其最重要的两个功能是使用 Network.HTTP.Simple
执行请求的功能:
getTasksForProject :: String -> String -> IO [Task]
getTasksForProject token projectId = getFromAsana token $ "projects/" ++ projectId ++ "/tasks"
getSubtasks :: String -> String -> IO [Task]
getSubtasks token taskId = getFromAsana token $ "tasks/" ++ taskId ++ "/subtasks"
问题是当我想构建一个包含我必须完成的所有任务的图表时:
- 获取任务列表
- 迭代这些任务以获得它们的子任务
- 递归
例如,我有这些函数来构建节点和边的“图”:
type TaskGraph = ([Task], [Edge])
merge :: TaskGraph -> TaskGraph -> TaskGraph
merge (aTasks, aEdges) (bTasks, bEdges) = (aTasks ++ bTasks, aEdges ++ bEdges)
makeEdge :: Relation -> Task -> Task -> Edge
makeEdge rel parent child = Edge rel (taskId parent) (taskId child)
rFetchTaskGraph :: String -> Task -> IO TaskGraph
rFetchTaskGraph token task = do
subtasks <- getSubtasks token $ taskId task
let edges = map (makeEdge Subtask task) subtasks
foldr merge ([task], edges) <$> mapM (rFetchTaskGraph token) subtasks
这非常慢,因为据我所知,它会按顺序发出每个 HTTP 请求。如果我在类似 Javascript 中这样做,Promises 将允许我急切地执行所有计算,但将请求排队,因此仅在请求完成时才解析相关的 Promise,但将并行性集中到某种连接池管理器。
如何在Haskell中提高效率?我有一些想法:
- 也许我需要创建一个新的 Monad 来表示这个池化资源访问?
- 我可以急切地计算整个列表吗(当然,在我可以的范围内,因为有些请求只有在其他请求的结果出现后才能知道 return)?
- 我需要显式使用线程吗?
而不是
mapM (rFetchTaskGraph token) subtasks
使用
mapConcurrently (rFetchTaskGraph token) subtasks
其中 mapConcurrently
is from the async 图书馆。
但是,在发出并发 HTTP 请求时,应该小心地限制它们,以免压垮远程服务器或被它禁止。进行节流的一种简单方法是使用 semaphore, as described in .
对 rFetchTaskGraph
的所有调用进行门控
因为 rFetchTaskGraph
是递归的,它应该接受信号量作为参数,以便将其传递给它的子调用:
rFetchTaskGraph :: QSem -> String -> Task -> IO TaskGraph
rFetchTaskGraph sem token task =
bracket_
(waitQSem sem)
(signalQSem sem)
(do
subtasks <- getSubtasks token $ taskId task
let edges = map (makeEdge Subtask task) subtasks
foldr merge ([task], edges) <$> mapConcurrently (rFetchTaskGraph sem token) subtasks)
更全面的解决方案将涉及线程池 and/or concurrent queues.
编辑: 我认为前面的代码在实践中可能会导致死锁,因为临界区的范围太大了。这样的东西应该会更好:
rFetchTaskGraph sem token task = do
subtasks <- bracket_ (waitQSem sem) (signalQSem sem) $ getSubtasks token $ taskId task
let edges = map (makeEdge Subtask task) subtasks
foldr merge ([task], edges) <$> mapConcurrently (rFetchTaskGraph sem token) subtasks
也就是说,将临界区限制为实际的 HTTP 请求。
我有一组函数,旨在从 Asana API 构建子任务树。为此,我有一个相当简单的模块,称为“Asana.hs”,其最重要的两个功能是使用 Network.HTTP.Simple
执行请求的功能:
getTasksForProject :: String -> String -> IO [Task]
getTasksForProject token projectId = getFromAsana token $ "projects/" ++ projectId ++ "/tasks"
getSubtasks :: String -> String -> IO [Task]
getSubtasks token taskId = getFromAsana token $ "tasks/" ++ taskId ++ "/subtasks"
问题是当我想构建一个包含我必须完成的所有任务的图表时:
- 获取任务列表
- 迭代这些任务以获得它们的子任务
- 递归
例如,我有这些函数来构建节点和边的“图”:
type TaskGraph = ([Task], [Edge])
merge :: TaskGraph -> TaskGraph -> TaskGraph
merge (aTasks, aEdges) (bTasks, bEdges) = (aTasks ++ bTasks, aEdges ++ bEdges)
makeEdge :: Relation -> Task -> Task -> Edge
makeEdge rel parent child = Edge rel (taskId parent) (taskId child)
rFetchTaskGraph :: String -> Task -> IO TaskGraph
rFetchTaskGraph token task = do
subtasks <- getSubtasks token $ taskId task
let edges = map (makeEdge Subtask task) subtasks
foldr merge ([task], edges) <$> mapM (rFetchTaskGraph token) subtasks
这非常慢,因为据我所知,它会按顺序发出每个 HTTP 请求。如果我在类似 Javascript 中这样做,Promises 将允许我急切地执行所有计算,但将请求排队,因此仅在请求完成时才解析相关的 Promise,但将并行性集中到某种连接池管理器。
如何在Haskell中提高效率?我有一些想法:
- 也许我需要创建一个新的 Monad 来表示这个池化资源访问?
- 我可以急切地计算整个列表吗(当然,在我可以的范围内,因为有些请求只有在其他请求的结果出现后才能知道 return)?
- 我需要显式使用线程吗?
而不是
mapM (rFetchTaskGraph token) subtasks
使用
mapConcurrently (rFetchTaskGraph token) subtasks
其中 mapConcurrently
is from the async 图书馆。
但是,在发出并发 HTTP 请求时,应该小心地限制它们,以免压垮远程服务器或被它禁止。进行节流的一种简单方法是使用 semaphore, as described in
rFetchTaskGraph
的所有调用进行门控
因为 rFetchTaskGraph
是递归的,它应该接受信号量作为参数,以便将其传递给它的子调用:
rFetchTaskGraph :: QSem -> String -> Task -> IO TaskGraph
rFetchTaskGraph sem token task =
bracket_
(waitQSem sem)
(signalQSem sem)
(do
subtasks <- getSubtasks token $ taskId task
let edges = map (makeEdge Subtask task) subtasks
foldr merge ([task], edges) <$> mapConcurrently (rFetchTaskGraph sem token) subtasks)
更全面的解决方案将涉及线程池 and/or concurrent queues.
编辑: 我认为前面的代码在实践中可能会导致死锁,因为临界区的范围太大了。这样的东西应该会更好:
rFetchTaskGraph sem token task = do
subtasks <- bracket_ (waitQSem sem) (signalQSem sem) $ getSubtasks token $ taskId task
let edges = map (makeEdge Subtask task) subtasks
foldr merge ([task], edges) <$> mapConcurrently (rFetchTaskGraph sem token) subtasks
也就是说,将临界区限制为实际的 HTTP 请求。