异步等待 n 件事发生
Asynchronously wait for n things to happen
我正在寻找类似 Semaphore
的东西,但在释放所有插槽后解决。
像这样:
use semaphore = new SemaphoreSlim(0,100)
anEvent.add(fun _ -> semaphore.Release(1) |> ignore);
async {
do! thingThatCausesAnEventToFire100Times()
//where 100 is the available slots instead of the timeout.
let! thingsHappened = semaphore.WaitAsync(100) |> Async.AwaitTask
thingsHappened |> should be True
}
听起来像是 MailboxProcessor 的工作。这个怎么样:
type SemaphoreCommand =
|Release
|Wait of AsyncReplyChannel<unit>
let semaphore slots =
Agent.Start
<| fun inbox ->
let rec loop c (w:AsyncReplyChannel<unit> list) =
async {
let! command = inbox.Receive()
match command with
|Release -> if (c + 1) = slots then w |> List.iter(fun t -> t.Reply())
else return! loop (c + 1) w
|Wait a -> return! loop c (a::w)
}
loop 0 []
let slotWaiter = semaphore 100
//Events will fill up slots
Release |> slotWaiter.Post
Release |> slotWaiter.Post
async{
//Wait for all slots to be filled
do! slotWaiter.PostAndAsyncReply(fun t -> Wait t)
//All slots filled - continue
}
我不会处理这样的情况,即您可能在所有槽都填满时尚未注册 AsyncReplyChannel,或者在所有槽都填满后重置,但这是相当微不足道的,我将把它留作练习reader :)
我正在寻找类似 Semaphore
的东西,但在释放所有插槽后解决。
像这样:
use semaphore = new SemaphoreSlim(0,100)
anEvent.add(fun _ -> semaphore.Release(1) |> ignore);
async {
do! thingThatCausesAnEventToFire100Times()
//where 100 is the available slots instead of the timeout.
let! thingsHappened = semaphore.WaitAsync(100) |> Async.AwaitTask
thingsHappened |> should be True
}
听起来像是 MailboxProcessor 的工作。这个怎么样:
type SemaphoreCommand =
|Release
|Wait of AsyncReplyChannel<unit>
let semaphore slots =
Agent.Start
<| fun inbox ->
let rec loop c (w:AsyncReplyChannel<unit> list) =
async {
let! command = inbox.Receive()
match command with
|Release -> if (c + 1) = slots then w |> List.iter(fun t -> t.Reply())
else return! loop (c + 1) w
|Wait a -> return! loop c (a::w)
}
loop 0 []
let slotWaiter = semaphore 100
//Events will fill up slots
Release |> slotWaiter.Post
Release |> slotWaiter.Post
async{
//Wait for all slots to be filled
do! slotWaiter.PostAndAsyncReply(fun t -> Wait t)
//All slots filled - continue
}
我不会处理这样的情况,即您可能在所有槽都填满时尚未注册 AsyncReplyChannel,或者在所有槽都填满后重置,但这是相当微不足道的,我将把它留作练习reader :)