Return results to the caller with a throttling queue

Building on this snippet and answer, would it be possible to return results to the caller from the throttling queue? I've tried PostAndAsyncReply to receive the reply on a channel, but it throws an error if I pipe it with Enqueue. Here's the code.

I'd appreciate a solution based on vanilla F# core, built around the queue or mailbox design pattern.

Question

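The goal is to call a function asynchronously under a throttle (at most 3 at a time), passing it each item from an array, wait until the whole queue/array has been processed while collecting all the results, and then return the results to the caller. (Returning the results to the caller is the part that is still pending here.)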

Callee code

// Message type used by the agent - contains queueing
// of work items and notification of completion
type ThrottlingAgentMessage =
  | Completed
  | Enqueue of Async<unit>

/// Represents an agent that runs operations concurrently. When the number
/// of concurrent operations exceeds 'limit', they are queued and processed later
let throttlingAgent limit =
    MailboxProcessor.Start(fun inbox ->
    async {
      // The agent body is not executing in parallel,
      // so we can safely use mutable queue & counter
      let queue = System.Collections.Generic.Queue<Async<unit>>()
      let running = ref 0

      while true do

        // Enqueue new work items or decrement the counter
        // of how many tasks are running in the background
        let! msg = inbox.Receive()
        match msg with
        | Completed -> decr running
        | Enqueue w -> queue.Enqueue(w)

        // If we have less than limit & there is some work to
        // do, then start the work in the background!
        while running.Value < limit && queue.Count > 0 do
          let work = queue.Dequeue()
          incr running
          do! // When the work completes, send 'Completed'
              // back to the agent to free a slot
              async {
                do! work
                inbox.Post(Completed)
              }
              |> Async.StartChild
              |> Async.Ignore
    })


let requestDetailAsync (url: string) : Async<Result<string, Error>> =
     async {
       Console.WriteLine ("Simulating request " + url)
       try
           do! Async.Sleep(1000) // let's say each request takes about a second
           return Ok (url + ":body...")
       with :? WebException as e ->
           return Error {Code = "500"; Message = "Internal Server Error"; Status = HttpStatusCode.InternalServerError}
     }

let requestMasterAsync() : Async<Result<System.Collections.Concurrent.ConcurrentBag<_>, Error>> =
    async {
        let urls = [|
                    "http://www.example.com/1";
                    "http://www.example.com/2";
                    "http://www.example.com/3";
                    "http://www.example.com/4";
                    "http://www.example.com/5";
                    "http://www.example.com/6";
                    "http://www.example.com/7";
                    "http://www.example.com/8";
                    "http://www.example.com/9";
                    "http://www.example.com/10";
                |]

        let results = System.Collections.Concurrent.ConcurrentBag<_>()
        let agent = throttlingAgent 3

        for url in urls do
            async {
                let! res = requestDetailAsync url
                results.Add res
            }
            |> Enqueue
            |> agent.Post

        return Ok results
    }

Caller code

[<TestMethod>]
member this.TestRequestMasterAsync() =
    match Entity.requestMasterAsync() |> Async.RunSynchronously with
    | Ok result -> Console.WriteLine result
    | Error error -> Console.WriteLine error

You can use Hopac.Streams. With a tool like that it is trivial:

open Hopac
open Hopac.Stream
open System

let requestDetailAsync url = async {
   Console.WriteLine ("Simulating request " + url)
   try
       do! Async.Sleep(1000) // let's say each request takes about a second
       return Ok (url + ":body...")
   with e ->
       return Error e
 }

let requestMasterAsync() : Stream<Result<string,exn>> =
    [| "http://www.example.com/1"
       "http://www.example.com/2"
       "http://www.example.com/3"
       "http://www.example.com/4"
       "http://www.example.com/5"
       "http://www.example.com/6"
       "http://www.example.com/7"
       "http://www.example.com/8"
       "http://www.example.com/9"
       "http://www.example.com/10" |]
    |> Stream.ofSeq
    |> Stream.mapPipelinedJob 3 (requestDetailAsync >> Job.fromAsync)

requestMasterAsync()
|> Stream.iterFun (printfn "%A")
|> queue //prints all results asynchronously

let allResults : Result<string,exn> list = 
    requestMasterAsync()
    |> Stream.foldFun (fun results cur -> cur::results ) []
    |> run //fold stream into list synchronously

Added: if you want to use only vanilla FSharp.Core with a mailbox, try this:

type ThrottlingAgentMessage =
  | Completed
  | Enqueue of Async<unit>

let inline (>>=) x f = async.Bind(x, f)
let inline (>>-) x f = async.Bind(x, f >> async.Return)

let throttlingAgent limit =
    let agent = MailboxProcessor.Start(fun inbox ->
        let queue = System.Collections.Generic.Queue<Async<unit>>()

        let startWork work = 
            work
            >>- fun _ -> inbox.Post Completed
            |> Async.StartChild |> Async.Ignore

        let rec loop curWorkers =
            inbox.Receive()
            >>= function
            | Completed when queue.Count > 0 -> 
                queue.Dequeue() |> startWork
                >>= fun _ -> loop curWorkers
            | Completed -> 
                loop (curWorkers - 1)
            | Enqueue w when curWorkers < limit ->
                w |> startWork
                >>= fun _ -> loop (curWorkers + 1)
            | Enqueue w ->
                queue.Enqueue w
                loop curWorkers

        loop 0)
    Enqueue >> agent.Post

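This is pretty much the same logic, but slightly optimized so that the queue is not used when there is spare work capacity (just start the work and don't bother with queue/dequeue).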

throttlingAgent is a function int -> Async<unit> -> unit, because we don't want the client to bother with our internal ThrottlingAgentMessage type.

Use it like this:

let throttler = throttlingAgent 3

for url in urls do
    async {
        let! res = requestDetailAsync url
        results.Add res
    }
    |> throttler
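
The loop above only posts the work; it does not wait for it, so a caller that returns right after the loop may see an empty or partial result set. One possible way to hand the results back, sketched here under assumptions rather than as the definitive approach, is to pair each work item with a TaskCompletionSource and await all of them. The helper name runThrottled is hypothetical, the agent is a compact restatement of the one above so that the snippet runs on its own, and the 200 ms delay and generated URLs are placeholders for the simulated request.

```fsharp
open System.Threading.Tasks

type ThrottlingAgentMessage =
    | Completed
    | Enqueue of Async<unit>

// Compact restatement of the agent above: at most 'limit' work items run at once.
let throttlingAgent limit =
    let agent = MailboxProcessor.Start(fun inbox ->
        let queue = System.Collections.Generic.Queue<Async<unit>>()
        // Start one work item in the background and free its slot when it ends.
        let startWork (work: Async<unit>) =
            async {
                try
                    do! work
                finally
                    inbox.Post Completed
            }
            |> Async.Start
        let rec loop running = async {
            let! msg = inbox.Receive()
            match msg with
            | Completed when queue.Count > 0 ->
                startWork (queue.Dequeue())
                return! loop running
            | Completed ->
                return! loop (running - 1)
            | Enqueue w when running < limit ->
                startWork w
                return! loop (running + 1)
            | Enqueue w ->
                queue.Enqueue w
                return! loop running }
        loop 0)
    Enqueue >> agent.Post

// Hypothetical helper: posts 'work' to the throttler immediately and returns
// an async that completes with the result of 'work' once it has been run.
let runThrottled (throttler: Async<unit> -> unit) (work: Async<'T>) : Async<'T> =
    let tcs = TaskCompletionSource<'T>()
    async {
        try
            let! res = work
            tcs.SetResult res
        with e ->
            tcs.SetException e
    }
    |> throttler
    Async.AwaitTask tcs.Task

// Simulated request; the delay is an assumption made for the example.
let requestDetailAsync (url: string) : Async<Result<string, exn>> =
    async {
        try
            do! Async.Sleep 200
            return Ok (url + ":body...")
        with e ->
            return Error e
    }

let requestMasterAsync () : Async<Result<string, exn>[]> =
    async {
        let urls = [| for i in 1 .. 10 -> sprintf "http://www.example.com/%d" i |]
        let throttler = throttlingAgent 3
        // Array.map posts every work item right away; the agent decides when each starts.
        let pending = urls |> Array.map (requestDetailAsync >> runThrottled throttler)
        // Wait for every reply; results come back in the same order as 'urls'.
        return! pending |> Async.Parallel
    }

requestMasterAsync ()
|> Async.RunSynchronously
|> Array.iter (printfn "%A")
```

Async.Parallel here only awaits the completion sources, so it does not add concurrency of its own; the throttling is still done entirely by the agent. The ConcurrentBag is no longer needed, since each result travels back through its own completion source.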