使用带有回复通道的 MailboxProcessor 来创建按顺序 return 值的有限代理
Use a MailboxProcessor with reply-channel to create limited agents that return values in order
基本上,我想将以下更改为有限线程解决方案,因为在我的情况下,计算列表太大,产生了太多线程,我想用更少的线程来试验和测量性能。
// the trivial approach (and largely my current situation)
let doWork() =
[1 .. 10]
|> List.map (fun i -> async {
do! Async.Sleep (100 * i) // longest thread will run 1 sec
return i * i // some complex calculation returning a certain type
})
|> Async.Parallel
|> Async.RunSynchronously // works, total wall time 1s
我的新方法,此代码是 borrowed/inspired by this online snippet from Tomas Petricek(我测试过,它有效,但我需要它 return 一个值,而不是单位)。
type LimitAgentMessage =
| Start of Async<int> * AsyncReplyChannel<int>
| Finished
let threadingLimitAgent limit = MailboxProcessor.Start(fun inbox -> async {
let queue = System.Collections.Generic.Queue<_>()
let count = ref 0
while true do
let! msg = inbox.Receive()
match msg with
| Start (work, reply) -> queue.Enqueue((work, reply))
| Finished -> decr count
if count.Value < limit && queue.Count > 0 then
incr count
let work, reply = queue.Dequeue()
// Start it in a thread pool (on background)
Async.Start(async {
let! x = work
do! async {reply.Reply x }
inbox.Post(Finished)
})
})
// given a synchronous list of tasks, run each task asynchronously,
// return calculated values in original order
let worker lst =
// this doesn't work as expected, it waits for each reply
let agent = threadingLimitAgent 10
lst
|> List.map(fun x ->
agent.PostAndReply(
fun replyChannel -> Start(x, replyChannel)))
现在,有了这个,原始代码将变成:
let doWork() =
[1 .. 10]
|> List.map (fun i -> async {
do! Async.Sleep (100 * i) // longest thread will run 1 sec
return i * i // some complex calculation returning a certain type
})
|> worker // worker is not working (correct output, runs 5.5s)
总而言之,输出是正确的(它 确实 计算并传播回回复),但它在(有限的一组)线程中不这样做。
我玩了一会儿,但我想我错过了明显的东西(此外,谁知道呢,有人可能喜欢 return 计算的有限线程邮箱处理器的想法按顺序)。
问题是对 agent.PostAndReply
的调用。 PostAndReply
将阻塞直到工作完成。在 List.map
内调用它会导致工作按顺序执行。一种解决方案是使用不会阻塞的 PostAndAsyncReply
并且 returns 你使用异步句柄来获取结果。
let worker lst =
let agent = threadingLimitAgent 10
lst
|> List.map(fun x ->
agent.PostAndAsyncReply(
fun replyChannel -> Start(x, replyChannel)))
|> Async.Parallel
let doWork() =
[1 .. 10]
|> List.map (fun i -> async {
do! Async.Sleep (100 * i)
return i * i
})
|> worker
|> Async.RunSynchronously
这当然只是一种可能的解决方案(取回所有异步句柄并并行等待它们)。
基本上,我想将以下更改为有限线程解决方案,因为在我的情况下,计算列表太大,产生了太多线程,我想用更少的线程来试验和测量性能。
// the trivial approach (and largely my current situation)
let doWork() =
[1 .. 10]
|> List.map (fun i -> async {
do! Async.Sleep (100 * i) // longest thread will run 1 sec
return i * i // some complex calculation returning a certain type
})
|> Async.Parallel
|> Async.RunSynchronously // works, total wall time 1s
我的新方法,此代码是 borrowed/inspired by this online snippet from Tomas Petricek(我测试过,它有效,但我需要它 return 一个值,而不是单位)。
type LimitAgentMessage =
| Start of Async<int> * AsyncReplyChannel<int>
| Finished
let threadingLimitAgent limit = MailboxProcessor.Start(fun inbox -> async {
let queue = System.Collections.Generic.Queue<_>()
let count = ref 0
while true do
let! msg = inbox.Receive()
match msg with
| Start (work, reply) -> queue.Enqueue((work, reply))
| Finished -> decr count
if count.Value < limit && queue.Count > 0 then
incr count
let work, reply = queue.Dequeue()
// Start it in a thread pool (on background)
Async.Start(async {
let! x = work
do! async {reply.Reply x }
inbox.Post(Finished)
})
})
// given a synchronous list of tasks, run each task asynchronously,
// return calculated values in original order
let worker lst =
// this doesn't work as expected, it waits for each reply
let agent = threadingLimitAgent 10
lst
|> List.map(fun x ->
agent.PostAndReply(
fun replyChannel -> Start(x, replyChannel)))
现在,有了这个,原始代码将变成:
let doWork() =
[1 .. 10]
|> List.map (fun i -> async {
do! Async.Sleep (100 * i) // longest thread will run 1 sec
return i * i // some complex calculation returning a certain type
})
|> worker // worker is not working (correct output, runs 5.5s)
总而言之,输出是正确的(它 确实 计算并传播回回复),但它在(有限的一组)线程中不这样做。
我玩了一会儿,但我想我错过了明显的东西(此外,谁知道呢,有人可能喜欢 return 计算的有限线程邮箱处理器的想法按顺序)。
问题是对 agent.PostAndReply
的调用。 PostAndReply
将阻塞直到工作完成。在 List.map
内调用它会导致工作按顺序执行。一种解决方案是使用不会阻塞的 PostAndAsyncReply
并且 returns 你使用异步句柄来获取结果。
let worker lst =
let agent = threadingLimitAgent 10
lst
|> List.map(fun x ->
agent.PostAndAsyncReply(
fun replyChannel -> Start(x, replyChannel)))
|> Async.Parallel
let doWork() =
[1 .. 10]
|> List.map (fun i -> async {
do! Async.Sleep (100 * i)
return i * i
})
|> worker
|> Async.RunSynchronously
这当然只是一种可能的解决方案(取回所有异步句柄并并行等待它们)。