Return results to the caller with a throttling queue
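Building on this snippet and answer, would it be possible to return results to the caller from the throttling queue? I've tried PostAndAsyncReply to receive the reply on a channel, but it throws an error if I pipe it through Enqueue. Here is the code.
I'd appreciate a solution based on vanilla F# core, built around the queue or mailbox design patterns.
Question
The goal is to call a function asynchronously under a throttle (at most 3 at a time), passing in each item from an array, wait until the whole queue/array has completed while collecting all the results, and then return the results to the caller. (Returning the results to the caller is the part that is still pending here.)
Callee code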
open System
open System.Net

// Message type used by the agent - contains queueing
// of work items and notification of completion
type ThrottlingAgentMessage =
    | Completed
    | Enqueue of Async<unit>

/// Represents an agent that runs operations concurrently. When the number
/// of concurrent operations exceeds 'limit', they are queued and processed later
let throttlingAgent limit =
    MailboxProcessor.Start(fun inbox ->
        async {
            // The agent body is not executing in parallel,
            // so we can safely use mutable queue & counter
            let queue = System.Collections.Generic.Queue<Async<unit>>()
            let running = ref 0
            while true do
                // Enqueue new work items or decrement the counter
                // of how many tasks are running in the background
                let! msg = inbox.Receive()
                match msg with
                | Completed -> decr running
                | Enqueue w -> queue.Enqueue(w)
                // If we have less than limit & there is some work to
                // do, then start the work in the background!
                while running.Value < limit && queue.Count > 0 do
                    let work = queue.Dequeue()
                    incr running
                    do! // When the work completes, send 'Completed'
                        // back to the agent to free a slot
                        async {
                            do! work
                            inbox.Post(Completed)
                        }
                        |> Async.StartChild
                        |> Async.Ignore
        })

// Assumed definition: the post does not show the Error type,
// this shape is inferred from the record expression below.
type Error = { Code: string; Message: string; Status: HttpStatusCode }

let requestDetailAsync (url: string) : Async<Result<string, Error>> =
    async {
        Console.WriteLine ("Simulating request " + url)
        try
            do! Async.Sleep(1000) // let's say each request takes about a second
            return Ok (url + ":body...")
        with :? WebException as e ->
            return Error {Code = "500"; Message = "Internal Server Error"; Status = HttpStatusCode.InternalServerError}
    }

let requestMasterAsync() : Async<Result<System.Collections.Concurrent.ConcurrentBag<_>, Error>> =
    async {
        let urls = [|
            "http://www.example.com/1";
            "http://www.example.com/2";
            "http://www.example.com/3";
            "http://www.example.com/4";
            "http://www.example.com/5";
            "http://www.example.com/6";
            "http://www.example.com/7";
            "http://www.example.com/8";
            "http://www.example.com/9";
            "http://www.example.com/10";
        |]
        let results = System.Collections.Concurrent.ConcurrentBag<_>()
        let agent = throttlingAgent 3
        for url in urls do
            async {
                let! res = requestDetailAsync url
                results.Add res
            }
            |> Enqueue
            |> agent.Post
        return Ok results
    }
Caller code
[<TestMethod>]
member this.TestRequestMasterAsync() =
    match Entity.requestMasterAsync() |> Async.RunSynchronously with
    | Ok result -> Console.WriteLine result
    | Error error -> Console.WriteLine error
You can use Hopac.Streams. With a tool like that it is trivial:
open Hopac
open Hopac.Stream
open System

let requestDetailAsync url = async {
    Console.WriteLine ("Simulating request " + url)
    try
        do! Async.Sleep(1000) // let's say each request takes about a second
        return Ok (url + ":body...")
    with e ->
        return Error e
}

let requestMasterAsync() : Stream<Result<string,exn>> =
    [| "http://www.example.com/1"
       "http://www.example.com/2"
       "http://www.example.com/3"
       "http://www.example.com/4"
       "http://www.example.com/5"
       "http://www.example.com/6"
       "http://www.example.com/7"
       "http://www.example.com/8"
       "http://www.example.com/9"
       "http://www.example.com/10" |]
    |> Stream.ofSeq
    |> Stream.mapPipelinedJob 3 (requestDetailAsync >> Job.fromAsync)

requestMasterAsync()
|> Stream.iterFun (printfn "%A")
|> queue //prints all results asynchronously

let allResults : Result<string,exn> list =
    requestMasterAsync()
    |> Stream.foldFun (fun results cur -> cur::results ) []
    |> run //fold stream into list synchronously
Added
If you want to use only vanilla FSharp.Core with a mailbox, try this:
type ThrottlingAgentMessage =
    | Completed
    | Enqueue of Async<unit>

let inline (>>=) x f = async.Bind(x, f)
let inline (>>-) x f = async.Bind(x, f >> async.Return)

let throttlingAgent limit =
    let agent = MailboxProcessor.Start(fun inbox ->
        let queue = System.Collections.Generic.Queue<Async<unit>>()

        let startWork work =
            work
            >>- fun _ -> inbox.Post Completed
            |> Async.StartChild |> Async.Ignore

        let rec loop curWorkers =
            inbox.Receive()
            >>= function
            | Completed when queue.Count > 0 ->
                queue.Dequeue() |> startWork
                >>= fun _ -> loop curWorkers
            | Completed ->
                loop (curWorkers - 1)
            | Enqueue w when curWorkers < limit ->
                w |> startWork
                >>= fun _ -> loop (curWorkers + 1)
            | Enqueue w ->
                queue.Enqueue w
                loop curWorkers
        loop 0)
    Enqueue >> agent.Post
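This is pretty much the same logic, but slightly optimized so that the queue is not used when there is free worker capacity (the work is simply started, without bothering to enqueue/dequeue).
throttlingAgent is a function int -> Async<unit> -> unit, because we don't want the client to bother with our internal ThrottlingAgentMessage type.
Use it like this: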
let throttler = throttlingAgent 3

for url in urls do
    async {
        let! res = requestDetailAsync url
        results.Add res
    }
    |> throttler
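Note that the loop above only posts work to the agent. Post does not block, so the caller still has to wait for completion before reading the results. One possible way to close that gap, sketched here under assumptions rather than taken from the original answer, is to wrap each work item so that its outcome is handed back through a TaskCompletionSource, and then to await all wrappers with Async.Parallel. The names runThrottled, fakeRequest and requestAllAsync are hypothetical, and the throttler is a compact variant with the same int -> Async<unit> -> unit shape, repeated so the block runs on its own.

```fsharp
open System
open System.Collections.Generic
open System.Threading.Tasks

type ThrottlingAgentMessage =
    | Completed
    | Enqueue of Async<unit>

// Compact throttler with the same shape as above: int -> Async<unit> -> unit
let throttlingAgent limit =
    let agent =
        MailboxProcessor.Start(fun inbox ->
            let queue = Queue<Async<unit>>()

            // Start the work in the background and always free the slot afterwards.
            // The work items built by runThrottled below never throw.
            let startWork work =
                async {
                    try
                        do! work
                    finally
                        inbox.Post Completed
                }
                |> Async.Start

            let rec loop running =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Completed when queue.Count > 0 ->
                        startWork (queue.Dequeue())
                        return! loop running
                    | Completed -> return! loop (running - 1)
                    | Enqueue work when running < limit ->
                        startWork work
                        return! loop (running + 1)
                    | Enqueue work ->
                        queue.Enqueue work
                        return! loop running
                }

            loop 0)

    Enqueue >> agent.Post

// Hypothetical helper: runs 'work' through the throttler and
// gives its result (or its exception) back to the caller.
let runThrottled (throttler: Async<unit> -> unit) (work: Async<'T>) : Async<'T> =
    async {
        let tcs = TaskCompletionSource<Choice<'T, exn>>(TaskCreationOptions.RunContinuationsAsynchronously)

        // The throttled item captures the outcome instead of throwing
        async {
            let! outcome = Async.Catch work
            tcs.SetResult outcome
        }
        |> throttler

        // Wait here until the agent has actually run the item
        let! outcome = Async.AwaitTask tcs.Task

        match outcome with
        | Choice1Of2 value -> return value
        | Choice2Of2 error -> return raise error
    }

// Hypothetical stand-in for requestDetailAsync
let fakeRequest (url: string) : Async<Result<string, exn>> =
    async {
        do! Async.Sleep 100
        return Ok (url + ":body...")
    }

// At most 3 requests run at a time; results come back in input order
let requestAllAsync (urls: string[]) : Async<Result<string, exn>[]> =
    let throttler = throttlingAgent 3

    urls
    |> Array.map (fun url -> runThrottled throttler (fakeRequest url))
    |> Async.Parallel

let results =
    [| for i in 1 .. 10 -> sprintf "http://www.example.com/%d" i |]
    |> requestAllAsync
    |> Async.RunSynchronously

results |> Array.iter (printfn "%A")
```

With this shape the shared ConcurrentBag is no longer needed, because Async.Parallel collects the values into an array. All ten wrappers start immediately, but each one only waits on its TaskCompletionSource until the agent gives its work item a slot.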