Unordered F# AsyncSeq.mapParallel with throttling

I'm using F# and have an AsyncSeq<Async<'t>>. Each item takes a different amount of time to process, and the I/O involved is rate-limited.

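I want to run all the operations in parallel and then pass the results down the chain as an AsyncSeq<'t>, so that I can perform further operations on them and ultimately AsyncSeq.fold them into a final result.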

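The following AsyncSeq operations almost meet my needs:

  - mapAsyncParallel: runs in parallel, but the parallelism is unconstrained (and it preserves order, which I don't need)
  - iterAsyncParallelThrottled: runs in parallel with a maximum degree of parallelism, but doesn't let me return results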

What I really need is a mapAsyncParallelThrottled. More precisely, though, the operation I actually want would be named mapAsyncParallelThrottledUnordered.

Things I'm considering:

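  1. Use mapAsyncParallel, but have the function use a Semaphore to constrain the parallelism. This is probably not optimal in terms of concurrency, and also because of the buffering of results needed to reorder them.
  2. Use iterAsyncParallelThrottled and fold the results into an accumulator as they arrive, guarded by a lock, kinda like this; but I don't need the ordering, so it won't be optimal.
  3. Build what I need myself by enumerating the source and emitting results via an AsyncSeqSrc, like this. I'd probably keep a set of Async.StartAsTask tasks in flight, and each time Task.WaitAny gives me something to AsyncSeqSrc.put, start more tasks, up to maxDegreeOfParallelism.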
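For reference, option 3's loop can be sketched without AsyncSeq at all. This is a minimal, hypothetical helper (iterThrottledUnordered is not a library function), assuming a plain callback in place of AsyncSeqSrc.put, and Task.WhenAny in place of Task.WaitAny so that no thread blocks while waiting:

```fsharp
open System.Collections.Generic
open System.Threading.Tasks

/// Hypothetical helper (not part of AsyncSeq): keeps at most `dop` mapper tasks in flight
/// and hands each result to `onResult` as soon as it completes (completion order, not source order).
let iterThrottledUnordered (dop: int) (mapper: 't -> Async<'u>) (onResult: 'u -> unit) (source: seq<'t>) : Async<unit> =
    if dop < 1 then invalidArg "dop" "must be at least 1"
    async {
        let inFlight = List<Task<'u>>()
        use e = source.GetEnumerator()
        let more = ref true // ref cell: mutable locals can't be captured inside async { }
        while more.Value || inFlight.Count > 0 do
            // Top up to `dop` running tasks while the source still has items.
            while more.Value && inFlight.Count < dop do
                if e.MoveNext() then inFlight.Add(Async.StartAsTask(mapper e.Current))
                else more.Value <- false
            if inFlight.Count > 0 then
                // WhenAny (not WaitAny) so the wait itself doesn't block a thread.
                let! finished = Task.WhenAny<'u>(inFlight) |> Async.AwaitTask
                inFlight.Remove finished |> ignore
                // The task has completed, so .Result doesn't block; a faulted task rethrows here.
                onResult finished.Result
    }
```

Pointing onResult at an AsyncSeqSrc (or a channel writer) would give the mapAsyncParallelThrottledUnordered shape; cancellation and error reporting are deliberately left out of the sketch.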

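I must be missing a simple answer. Is there a better way?

Failing that, I'd love someone to sanity-check my option 3, in either direction!

I'm open to using AsyncSeq.toAsyncEnum and then IAsyncEnumerable methods to achieve the same result (if such methods exist), but would prefer not to get into TPL DataFlow or RX land if it can be avoided (I've done extensive SO searching, to no avail...).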

If I understand your requirements, something like this would work. It effectively combines the unordered iter with a channel to allow mapping.

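open FSharp.Control
open System.Threading.Channels

let mapAsyncParallelBoundedUnordered boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
    let! ct = Async.CancellationToken
    let channel : Channel<'u> = Channel.CreateUnbounded()

    // Producer: map up to boundedAmount items at a time, writing each result as it completes.
    let! _ =
        async {
            try
                do!
                    source
                    |> AsyncSeq.iterAsyncParallelThrottled boundedAmount (fun s -> async {
                        let! result = mapper s
                        // WriteAsync returns a ValueTask, which async { } can't bind directly.
                        do! channel.Writer.WriteAsync(result, ct).AsTask() |> Async.AwaitTask
                    })
                channel.Writer.Complete()
            with e ->
                // Complete with the error so the reader below fails instead of hanging.
                channel.Writer.Complete(e)
        }
        |> Async.StartChild

    // Consumer: items are plain results (not Asyncs), in completion order.
    for item in channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum do
        yield item
}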
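For anyone who wants to see the mechanism without the AsyncSeq package, here is a rough sketch of the same producer/consumer shape using only FSharp.Core and System.Threading.Channels. The helper name is hypothetical, and a fixed pool of worker loops draining a ConcurrentQueue stands in for iterAsyncParallelThrottled:

```fsharp
open System.Collections.Concurrent
open System.Threading.Channels

/// Hypothetical helper: `dop` worker loops share a queue of inputs, map each one and
/// write the result to an unbounded channel, so results arrive in completion order.
let mapUnorderedViaChannel (dop: int) (mapper: 't -> Async<'u>) (inputs: seq<'t>) : Async<'u list> =
    async {
        let queue = ConcurrentQueue<'t>(inputs)
        let channel = Channel.CreateUnbounded<'u>()
        let rec worker () = async {
            match queue.TryDequeue() with
            | true, x ->
                let! y = mapper x
                do! channel.Writer.WriteAsync(y).AsTask() |> Async.AwaitTask
                return! worker ()
            | _ -> () }
        // Producer: run the workers, then complete the writer (with the error, if any)
        // so that the reader below always terminates.
        let! producer =
            Async.StartChild(async {
                try
                    do! Async.Parallel [ for _ in 1 .. dop -> worker () ] |> Async.Ignore
                    channel.Writer.Complete()
                with e -> channel.Writer.Complete(e) })
        // Consumer: drain whatever is ready, then wait for more until the writer completes.
        let results = ResizeArray<'u>()
        let rec drain () = async {
            let! more = channel.Reader.WaitToReadAsync().AsTask() |> Async.AwaitTask
            if more then
                let rec takeReady () =
                    match channel.Reader.TryRead() with
                    | true, y -> results.Add y; takeReady ()
                    | _ -> ()
                takeReady ()
                return! drain () }
        do! drain ()
        do! producer
        return List.ofSeq results
    }
```

As with channel.Writer.Complete() in the answer, the writer must always be completed; completing it with the exception also makes a mapper failure surface on the reading side instead of leaving the reader waiting forever.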

With a small variation on the above (using child tasks), you can also make it ordered while keeping the parallelism bounded.

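let mapAsyncParallelBounded boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
    let! ct = Async.CancellationToken
    // The bounded channel holds started-but-unconsumed children; once it is full, WriteAsync
    // waits, which limits how far the producer can run ahead of the reader.
    let channel : Channel<Async<'u>> = Channel.CreateBounded(BoundedChannelOptions(boundedAmount))

    let! _ =
        async {
            try
                do!
                    source
                    |> AsyncSeq.iterAsync (fun s -> async {
                        // Start the mapping now; its child handle is queued in source order.
                        let! child = mapper s |> Async.StartChild
                        do! channel.Writer.WriteAsync(child, ct).AsTask() |> Async.AwaitTask
                    })
                // Without this, the consumer loop below would never finish.
                channel.Writer.Complete()
            with e -> channel.Writer.Complete(e)
        }
        |> Async.StartChild

    // Await the children in the order they were queued, so output order matches input order.
    for child in channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum do
        let! result = child
        yield result
}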
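The ordering trick (start work eagerly, but consume it in submission order) doesn't depend on the channel itself. A minimal sketch with plain Async, assuming a sliding window of at most `bound` outstanding children held in a queue (mapOrderedBounded is a hypothetical name):

```fsharp
open System.Collections.Generic

/// Hypothetical helper: start each mapping as a child as soon as the window has room,
/// but await the children oldest-first, so output order matches input order.
/// At most `bound` children are outstanding (started but not yet consumed).
let mapOrderedBounded (bound: int) (mapper: 't -> Async<'u>) (inputs: seq<'t>) : Async<'u list> =
    if bound < 1 then invalidArg "bound" "must be at least 1"
    async {
        let pending = Queue<Async<'u>>()
        let results = ResizeArray<'u>()
        for x in inputs do
            // Window full: wait for the oldest child (head of line), keeping input order.
            if pending.Count >= bound then
                let! y = pending.Dequeue()
                results.Add y
            let! child = Async.StartChild(mapper x)
            pending.Enqueue child
        // Drain whatever is still outstanding, again oldest first.
        while pending.Count > 0 do
            let! y = pending.Dequeue()
            results.Add y
        return List.ofSeq results
    }
```

As with the bounded channel, one slow item at the head of the window holds back everything behind it; that head-of-line blocking is the price of ordered output.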

Here's the test bed I used to validate @akara's excellent work:

#r "nuget:FSharp.Control.AsyncSeq"
open FSharp.Control
module AsyncSeqEx =

    open System.Threading.Channels

    let mapAsyncParallelBoundedUnordered boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
        let! ct = Async.CancellationToken
        let channel : Channel<'u> = Channel.CreateUnbounded()
        let handle req = async {
            let! res = mapper req
            do! channel.Writer.WriteAsync(res, ct).AsTask() |> Async.AwaitTask }
        let! _ = Async.StartChild <| async {
            try
                do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
                channel.Writer.Complete()
            // Propagate failures to the reader rather than leaving it waiting forever.
            with e -> channel.Writer.Complete(e) }
        yield! channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum
    }

I also ported the same code to use an AsyncSeqSrc instead of a channel, which also seems to work, with equivalent performance:

    // AsyncSeqSrc-based reimpl of the above
    let mapAsyncParallelBoundedUnordered2 boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
        let output = AsyncSeqSrc.create ()
        // Take the sequence before starting the producer: a sequence obtained from an
        // AsyncSeqSrc only observes items put after it was created, so early results could be lost.
        let results = AsyncSeqSrc.toAsyncSeq output
        let handle req = async {
            let! res = mapper req
            AsyncSeqSrc.put res output }
        let! _ = Async.StartChild <| async {
            do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
            AsyncSeqSrc.close output }
        yield! results
    }

The implementation below, which relies on AsyncSeq.mapAsyncParallel, seems to achieve performance similar to both:

module Async =

    let parallelThrottled dop f = Async.Parallel(f, maxDegreeOfParallelism = dop)
    type Semaphore(max) =
        let inner = new System.Threading.SemaphoreSlim(max)
        member _.Await() = async {
            let! ct = Async.CancellationToken
            return! inner.WaitAsync ct |> Async.AwaitTask }
        member _.Release() =
            inner.Release() |> ignore
    let throttle degreeOfParallelism f =
        let s = Semaphore degreeOfParallelism
        fun x -> async {
            do! s.Await()
            try return! f x
            finally s.Release() }

module AsyncSeq =

    open FSharp.Control

    // see 
    let mapAsyncParallelThrottled degreeOfParallelism (f: 't -> Async<'u>) : AsyncSeq<'t> -> AsyncSeq<'u> =
        let throttle = Async.throttle degreeOfParallelism
        AsyncSeq.mapAsyncParallel (throttle f)

Test bed:

let dop = 10
let r = System.Random()
let durations = Array.init 10000 (fun _ -> r.Next(10, 100))
let work =
    let sleep (x : int) = async {
        do! Async.Sleep x
        return x
    }
    AsyncSeq.ofSeq durations |> AsyncSeq.mapAsyncParallelThrottled dop sleep
let start = System.Diagnostics.Stopwatch.StartNew()
let results = work |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
let timeTaken = start.ElapsedMilliseconds
let totalTimeTaken = Array.sum results
let expectedWallTime = float totalTimeTaken / float dop
let overhead = timeTaken - int64 expectedWallTime
let inline stringf format (x : ^a) =
    (^a : (member ToString : string -> string) (x, format))
let inline sep x = stringf "N0" x
printfn $"Gross {sep totalTimeTaken}ms Threads {dop} Wall {sep timeTaken}ms overhead {sep overhead}ms ordered: {durations = results}"

Results:

Gross 544,873ms Threads 10 Wall 55,659ms overhead 1,172ms ordered: True
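The overhead figure follows directly from the test bed's own formula, expectedWallTime = totalTimeTaken / dop, which int64 then truncates:

```latex
\begin{aligned}
\text{overhead} &= T_{\text{wall}} - \left\lfloor \frac{T_{\text{gross}}}{\text{dop}} \right\rfloor \\
&= 55{,}659 - \left\lfloor \frac{544{,}873}{10} \right\rfloor \\
&= 55{,}659 - 54{,}487 = 1{,}172\ \text{ms}
\end{aligned}
```

That is roughly 2% above the ideal wall time for 10-way parallelism.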

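As things stand, for my use case, accepting unordered results doesn't seem to be a significant win compared with simply using mapAsyncParallel and letting the function argument self-govern to achieve the required throttling effect.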
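The self-governing approach can be sanity-checked without AsyncSeq. A minimal sketch, assuming only SemaphoreSlim: it restates the idea of the Async.throttle helper above (not its exact code) as a standalone function, so it can be driven by an unlimited Async.Parallel fan-out:

```fsharp
open System.Threading

/// Wraps `f` so that at most `dop` invocations run at once, however many callers fan out.
/// The semaphore is created once per throttle call and, as in the original helper, never disposed.
let throttle (dop: int) (f: 't -> Async<'u>) : 't -> Async<'u> =
    let gate = new SemaphoreSlim(dop)
    fun x -> async {
        let! ct = Async.CancellationToken
        // Wait for a free slot, honouring cancellation of the calling workflow.
        do! gate.WaitAsync(ct) |> Async.AwaitTask
        try return! f x
        finally gate.Release() |> ignore }
```

Wrapping once and reusing the result matters: `let limited = throttle 4 work` creates a single semaphore shared by every call to `limited`, whereas calling throttle per item would give each item its own gate and no limit at all.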