带节流的无序 F# AsyncSeq.mapParallel
Unordered F# AsyncSeq.mapParallel with throttling
我正在使用 F#,手头有一个 AsyncSeq<Async<'t>>。每个项目的处理耗时各不相同,而且其中的 I/O 受到速率限制。
我想并行运行所有操作,然后把结果作为 AsyncSeq<'t> 沿处理链继续传递,以便对它们做进一步处理,最后用 AsyncSeq.fold 将它们汇总成最终结果。
以下 AsyncSeq 操作几乎能满足我的需求:
- mapAsyncParallel:能并行执行,但并行度不受限制(而且我不需要保持顺序)。
- iterAsyncParallelThrottled:能并行执行并可限制最大并行度,但无法返回结果(同样,我也不需要保持顺序)。
我真正需要的是 mapAsyncParallelThrottled。更准确地说,这个操作应当叫做 mapAsyncParallelThrottledUnordered。
我考虑过的方案:
- 使用 mapAsyncParallel,但在函数内部用 Semaphore 限制并行度。这在并发性方面可能不是最优的,而且由于需要缓冲结果以便重新排序,还会带来额外开销。
- 使用 iterAsyncParallelThrottled,在结果到达时于锁的保护下把它们折叠进累加器(类似这种写法)。但我并不需要排序,所以这也不是最优方案。
- 自己构建所需的功能:枚举源序列,并通过 AsyncSeqSrc 发出结果(类似这种写法)。我可以维护一组用 Async.StartAsTask 启动、正在运行的任务;每当 Task.WaitAny 返回一个已完成的结果,就调用 AsyncSeqSrc.put 输出它,然后继续启动新任务,直到达到 maxDegreeOfParallelism。
我肯定是漏掉了某个简单的答案——有没有更好的办法?
如果没有,希望有人能帮我检查一下方案 3,无论结论是可行还是不可行!
我也愿意先用 AsyncSeq.toAsyncEnum 转换,再用 IAsyncEnumerable 上的方法实现同样的效果(如果有这样的方法的话);但如果可以避免,我不太想引入 TPL Dataflow 或 Rx(我已经在 SO 上广泛搜索过,但没有找到答案……)。
如果我理解了你的需求,那么下面这种写法应该可行。它实际上是把无序的 iter 与通道(Channel)结合起来,从而实现映射。
/// Maps `mapper` over `source`, running at most `boundedAmount` invocations
/// concurrently (via AsyncSeq.iterAsyncParallelThrottled). Each result is
/// yielded as soon as it is written to the channel, i.e. in completion order,
/// not source order.
/// Assumes `System.Threading.Channels` and `FSharp.Control` are opened.
let mapAsyncParallelBoundedUnordered boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
    let! ct = Async.CancellationToken
    let channel = Channel.CreateUnbounded<'u>()
    let handle s = async {
        let! result = mapper s
        // WriteAsync returns a ValueTask, which an async block cannot `do!` directly.
        do! channel.Writer.WriteAsync(result, ct).AsTask() |> Async.AwaitTask }
    let! _ =
        async {
            try
                do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
                channel.Writer.TryComplete() |> ignore
            with e ->
                // If this is skipped, a failing mapper/source leaves the channel open
                // and the reader below waits forever. Completing with the exception
                // makes ReadAllAsync surface it to the consumer instead.
                channel.Writer.TryComplete(e) |> ignore
        }
        |> Async.StartChild
    // Items are already-computed results (mapper was awaited in `handle`), so yield them as-is.
    yield! channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum
}
对上面的代码稍作修改(例如改用子任务),就可以让它既保持顺序,又限制并行度。
/// Maps `mapper` over `source` and yields results in source order.
/// Each invocation is started as a child and queued in a bounded channel of
/// capacity `boundedAmount`, so the producer pauses while the queue is full.
/// NOTE(review): the effective number of running mappers can exceed
/// `boundedAmount` by up to two. One is started and waiting to be written;
/// one has been dequeued and is being awaited by the reader.
/// Assumes `System.Threading.Channels` and `FSharp.Control` are opened.
let mapAsyncParallelBounded boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
    let! ct = Async.CancellationToken
    let channel = Channel.CreateBounded<Async<'u>>(BoundedChannelOptions(boundedAmount))
    let enqueue s = async {
        let! pending = mapper s |> Async.StartChild
        // WriteAsync returns a ValueTask, which an async block cannot `do!` directly.
        do! channel.Writer.WriteAsync(pending, ct).AsTask() |> Async.AwaitTask }
    let! _ =
        async {
            try
                do! source |> AsyncSeq.iterAsync enqueue
                // Signal end-of-stream; otherwise ReadAllAsync below never terminates.
                channel.Writer.TryComplete() |> ignore
            with e ->
                // Forward producer failures to the reader instead of hanging it.
                channel.Writer.TryComplete(e) |> ignore
        }
        |> Async.StartChild
    for pending in channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum do
        // Awaiting children in queue order is what preserves source order.
        let! result = pending
        yield result
}
下面是我用来验证 @akara 出色答案的测试代码:
#r "nuget:FSharp.Control.AsyncSeq"
open FSharp.Control
module AsyncSeqEx =
    open System.Threading.Channels

    /// Maps `mapper` over `source` with at most `boundedAmount` invocations in
    /// flight (via AsyncSeq.iterAsyncParallelThrottled). Results are yielded in
    /// completion order through an unbounded channel.
    let mapAsyncParallelBoundedUnordered boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
        let! ct = Async.CancellationToken
        let channel : Channel<'u> = Channel.CreateUnbounded()
        let handle req = async {
            let! res = mapper req
            // WriteAsync returns a ValueTask; convert it so the async block can await it.
            do! let t = channel.Writer.WriteAsync(res, ct) in t.AsTask() |> Async.AwaitTask }
        let! _ = Async.StartChild <| async {
            try
                do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
                channel.Writer.TryComplete() |> ignore
            with e ->
                // The child's result is discarded above. Without this, a failure would
                // leave the channel open and the consumer would wait forever.
                channel.Writer.TryComplete(e) |> ignore }
        yield! channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum
    }
我还把同样的代码改写为使用 AsyncSeqSrc 而不是通道,似乎同样可行,性能也相当:
/// AsyncSeqSrc-based reimplementation of the channel version. It maps `mapper`
/// over `source` with at most `boundedAmount` invocations in flight (via
/// AsyncSeq.iterAsyncParallelThrottled) and yields results in completion order.
let mapAsyncParallelBoundedUnordered2 boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
    let output = AsyncSeqSrc.create ()
    // Subscribe before the producer starts: toAsyncSeq only observes values put
    // after it is called, so subscribing later could drop early results.
    let results = AsyncSeqSrc.toAsyncSeq output
    let handle req = async { let! res = mapper req in AsyncSeqSrc.put res output }
    let! _ = Async.StartChild <| async {
        try
            do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
            AsyncSeqSrc.close output
        with e ->
            // Surface producer failures to the consumer instead of leaving the source open forever.
            AsyncSeqSrc.error e output }
    yield! results
}
下面这个基于 AsyncSeq.mapAsyncParallel 的实现,性能似乎与上述两者相近:
module Async =
    /// Runs the computations in `f` via Async.Parallel, with at most `dop` running at once.
    let parallelThrottled dop f =
        Async.Parallel(f, maxDegreeOfParallelism = dop)

    /// Async-friendly wrapper over a SemaphoreSlim with `max` initial slots.
    type Semaphore(max) =
        let gate = new System.Threading.SemaphoreSlim(max)

        /// Waits for a slot, honouring the ambient async cancellation token.
        member _.Await() =
            async {
                let! token = Async.CancellationToken
                do! gate.WaitAsync(token) |> Async.AwaitTask
            }

        /// Returns a slot; the previous count reported by SemaphoreSlim is discarded.
        member _.Release() =
            gate.Release() |> ignore

    /// Wraps `f` so that at most `degreeOfParallelism` calls of the returned
    /// function run at the same time. One semaphore is shared by all calls.
    let throttle degreeOfParallelism f =
        let gate = Semaphore degreeOfParallelism
        let guarded x =
            async {
                do! gate.Await()
                // Release only after a successful Await, even if `f x` throws.
                try
                    return! f x
                finally
                    gate.Release()
            }
        guarded
module AsyncSeq =
    open FSharp.Control

    /// AsyncSeq.mapAsyncParallel combined with Async.throttle. At most
    /// `degreeOfParallelism` invocations of `f` run concurrently, and ordering
    /// follows whatever AsyncSeq.mapAsyncParallel guarantees.
    /// NOTE(review): only the running work is bounded. mapAsyncParallel may still
    /// start one (waiting) computation per element; confirm against its implementation.
    let mapAsyncParallelThrottled degreeOfParallelism (f: 't -> Async<'u>) : AsyncSeq<'t> -> AsyncSeq<'u> =
        f
        |> Async.throttle degreeOfParallelism
        |> AsyncSeq.mapAsyncParallel
测试代码:
// Benchmark: 10,000 random sleeps (10-99 ms) mapped with a throttle of `dop`.
// It compares measured wall time against the ideal (sum of sleeps / dop).
let dop = 10
let r = System.Random()
let durations = Array.init 10000 (fun _ -> r.Next(10, 100))

let work =
    // Each work item sleeps for its duration and echoes it back.
    let pause (ms: int) =
        async {
            do! Async.Sleep ms
            return ms
        }
    durations
    |> AsyncSeq.ofSeq
    |> AsyncSeq.mapAsyncParallelThrottled dop pause

// `work` is lazy; the timed region covers only the actual enumeration.
let start = System.Diagnostics.Stopwatch.StartNew()
let results = Async.RunSynchronously(AsyncSeq.toArrayAsync work)
let timeTaken = start.ElapsedMilliseconds

let totalTimeTaken = results |> Array.sum
let expectedWallTime = float totalTimeTaken / float dop
let overhead = timeTaken - int64 expectedWallTime

/// Calls `x.ToString(format)` on any type exposing that member.
let inline stringf format (x : ^a) =
    (^a : (member ToString : string -> string) (x, format))
/// Formats a number with thousands separators.
let inline sep x = stringf "N0" x

printfn $"Gross {sep totalTimeTaken}ms Threads {dop} Wall {sep timeTaken}ms overhead {sep overhead}ms ordered: {durations = results}"
结果:
Gross 544,873ms Threads 10 Wall 55,659ms overhead 1,172ms ordered: True
就目前来看,对我的用例而言,允许结果无序并没有带来明显优势;直接使用 mapAsyncParallel,并让传入的函数自行节流(self-govern)以达到所需的节流效果,就已经足够了。
我正在使用 F#,手头有一个 AsyncSeq<Async<'t>>。每个项目的处理耗时各不相同,而且其中的 I/O 受到速率限制。
我想并行运行所有操作,然后把结果作为 AsyncSeq<'t> 沿处理链继续传递,以便对它们做进一步处理,最后用 AsyncSeq.fold 将它们汇总成最终结果。
以下 AsyncSeq 操作几乎能满足我的需求:
- mapAsyncParallel:能并行执行,但并行度不受限制(而且我不需要保持顺序)。
- iterAsyncParallelThrottled:能并行执行并可限制最大并行度,但无法返回结果(同样,我也不需要保持顺序)。
我真正需要的是 mapAsyncParallelThrottled。更准确地说,这个操作应当叫做 mapAsyncParallelThrottledUnordered。
我考虑过的方案:
- 使用 mapAsyncParallel,但在函数内部用 Semaphore 限制并行度。这在并发性方面可能不是最优的,而且由于需要缓冲结果以便重新排序,还会带来额外开销。
- 使用 iterAsyncParallelThrottled,在结果到达时于锁的保护下把它们折叠进累加器(类似这种写法)。但我并不需要排序,所以这也不是最优方案。
- 自己构建所需的功能:枚举源序列,并通过 AsyncSeqSrc 发出结果(类似这种写法)。我可以维护一组用 Async.StartAsTask 启动、正在运行的任务;每当 Task.WaitAny 返回一个已完成的结果,就调用 AsyncSeqSrc.put 输出它,然后继续启动新任务,直到达到 maxDegreeOfParallelism。
我肯定是漏掉了某个简单的答案——有没有更好的办法?
如果没有,希望有人能帮我检查一下方案 3,无论结论是可行还是不可行!
我也愿意先用 AsyncSeq.toAsyncEnum 转换,再用 IAsyncEnumerable 上的方法实现同样的效果(如果有这样的方法的话);但如果可以避免,我不太想引入 TPL Dataflow 或 Rx(我已经在 SO 上广泛搜索过,但没有找到答案……)。
如果我理解了你的需求,那么下面这种写法应该可行。它实际上是把无序的 iter 与通道(Channel)结合起来,从而实现映射。
/// Maps `mapper` over `source`, running at most `boundedAmount` invocations
/// concurrently (via AsyncSeq.iterAsyncParallelThrottled). Each result is
/// yielded as soon as it is written to the channel, i.e. in completion order,
/// not source order.
/// Assumes `System.Threading.Channels` and `FSharp.Control` are opened.
let mapAsyncParallelBoundedUnordered boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
    let! ct = Async.CancellationToken
    let channel = Channel.CreateUnbounded<'u>()
    let handle s = async {
        let! result = mapper s
        // WriteAsync returns a ValueTask, which an async block cannot `do!` directly.
        do! channel.Writer.WriteAsync(result, ct).AsTask() |> Async.AwaitTask }
    let! _ =
        async {
            try
                do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
                channel.Writer.TryComplete() |> ignore
            with e ->
                // If this is skipped, a failing mapper/source leaves the channel open
                // and the reader below waits forever. Completing with the exception
                // makes ReadAllAsync surface it to the consumer instead.
                channel.Writer.TryComplete(e) |> ignore
        }
        |> Async.StartChild
    // Items are already-computed results (mapper was awaited in `handle`), so yield them as-is.
    yield! channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum
}
对上面的代码稍作修改(例如改用子任务),就可以让它既保持顺序,又限制并行度。
/// Maps `mapper` over `source` and yields results in source order.
/// Each invocation is started as a child and queued in a bounded channel of
/// capacity `boundedAmount`, so the producer pauses while the queue is full.
/// NOTE(review): the effective number of running mappers can exceed
/// `boundedAmount` by up to two. One is started and waiting to be written;
/// one has been dequeued and is being awaited by the reader.
/// Assumes `System.Threading.Channels` and `FSharp.Control` are opened.
let mapAsyncParallelBounded boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
    let! ct = Async.CancellationToken
    let channel = Channel.CreateBounded<Async<'u>>(BoundedChannelOptions(boundedAmount))
    let enqueue s = async {
        let! pending = mapper s |> Async.StartChild
        // WriteAsync returns a ValueTask, which an async block cannot `do!` directly.
        do! channel.Writer.WriteAsync(pending, ct).AsTask() |> Async.AwaitTask }
    let! _ =
        async {
            try
                do! source |> AsyncSeq.iterAsync enqueue
                // Signal end-of-stream; otherwise ReadAllAsync below never terminates.
                channel.Writer.TryComplete() |> ignore
            with e ->
                // Forward producer failures to the reader instead of hanging it.
                channel.Writer.TryComplete(e) |> ignore
        }
        |> Async.StartChild
    for pending in channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum do
        // Awaiting children in queue order is what preserves source order.
        let! result = pending
        yield result
}
下面是我用来验证 @akara 出色答案的测试代码:
#r "nuget:FSharp.Control.AsyncSeq"
open FSharp.Control
module AsyncSeqEx =
    open System.Threading.Channels

    /// Maps `mapper` over `source` with at most `boundedAmount` invocations in
    /// flight (via AsyncSeq.iterAsyncParallelThrottled). Results are yielded in
    /// completion order through an unbounded channel.
    let mapAsyncParallelBoundedUnordered boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
        let! ct = Async.CancellationToken
        let channel : Channel<'u> = Channel.CreateUnbounded()
        let handle req = async {
            let! res = mapper req
            // WriteAsync returns a ValueTask; convert it so the async block can await it.
            do! let t = channel.Writer.WriteAsync(res, ct) in t.AsTask() |> Async.AwaitTask }
        let! _ = Async.StartChild <| async {
            try
                do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
                channel.Writer.TryComplete() |> ignore
            with e ->
                // The child's result is discarded above. Without this, a failure would
                // leave the channel open and the consumer would wait forever.
                channel.Writer.TryComplete(e) |> ignore }
        yield! channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum
    }
我还把同样的代码改写为使用 AsyncSeqSrc 而不是通道,似乎同样可行,性能也相当:
/// AsyncSeqSrc-based reimplementation of the channel version. It maps `mapper`
/// over `source` with at most `boundedAmount` invocations in flight (via
/// AsyncSeq.iterAsyncParallelThrottled) and yields results in completion order.
let mapAsyncParallelBoundedUnordered2 boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
    let output = AsyncSeqSrc.create ()
    // Subscribe before the producer starts: toAsyncSeq only observes values put
    // after it is called, so subscribing later could drop early results.
    let results = AsyncSeqSrc.toAsyncSeq output
    let handle req = async { let! res = mapper req in AsyncSeqSrc.put res output }
    let! _ = Async.StartChild <| async {
        try
            do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
            AsyncSeqSrc.close output
        with e ->
            // Surface producer failures to the consumer instead of leaving the source open forever.
            AsyncSeqSrc.error e output }
    yield! results
}
下面这个基于 AsyncSeq.mapAsyncParallel 的实现,性能似乎与上述两者相近:
module Async =
    /// Runs the computations in `f` via Async.Parallel, with at most `dop` running at once.
    let parallelThrottled dop f =
        Async.Parallel(f, maxDegreeOfParallelism = dop)

    /// Async-friendly wrapper over a SemaphoreSlim with `max` initial slots.
    type Semaphore(max) =
        let gate = new System.Threading.SemaphoreSlim(max)

        /// Waits for a slot, honouring the ambient async cancellation token.
        member _.Await() =
            async {
                let! token = Async.CancellationToken
                do! gate.WaitAsync(token) |> Async.AwaitTask
            }

        /// Returns a slot; the previous count reported by SemaphoreSlim is discarded.
        member _.Release() =
            gate.Release() |> ignore

    /// Wraps `f` so that at most `degreeOfParallelism` calls of the returned
    /// function run at the same time. One semaphore is shared by all calls.
    let throttle degreeOfParallelism f =
        let gate = Semaphore degreeOfParallelism
        let guarded x =
            async {
                do! gate.Await()
                // Release only after a successful Await, even if `f x` throws.
                try
                    return! f x
                finally
                    gate.Release()
            }
        guarded
module AsyncSeq =
    open FSharp.Control

    /// AsyncSeq.mapAsyncParallel combined with Async.throttle. At most
    /// `degreeOfParallelism` invocations of `f` run concurrently, and ordering
    /// follows whatever AsyncSeq.mapAsyncParallel guarantees.
    /// NOTE(review): only the running work is bounded. mapAsyncParallel may still
    /// start one (waiting) computation per element; confirm against its implementation.
    let mapAsyncParallelThrottled degreeOfParallelism (f: 't -> Async<'u>) : AsyncSeq<'t> -> AsyncSeq<'u> =
        f
        |> Async.throttle degreeOfParallelism
        |> AsyncSeq.mapAsyncParallel
测试代码:
// Benchmark: 10,000 random sleeps (10-99 ms) mapped with a throttle of `dop`.
// It compares measured wall time against the ideal (sum of sleeps / dop).
let dop = 10
let r = System.Random()
let durations = Array.init 10000 (fun _ -> r.Next(10, 100))

let work =
    // Each work item sleeps for its duration and echoes it back.
    let pause (ms: int) =
        async {
            do! Async.Sleep ms
            return ms
        }
    durations
    |> AsyncSeq.ofSeq
    |> AsyncSeq.mapAsyncParallelThrottled dop pause

// `work` is lazy; the timed region covers only the actual enumeration.
let start = System.Diagnostics.Stopwatch.StartNew()
let results = Async.RunSynchronously(AsyncSeq.toArrayAsync work)
let timeTaken = start.ElapsedMilliseconds

let totalTimeTaken = results |> Array.sum
let expectedWallTime = float totalTimeTaken / float dop
let overhead = timeTaken - int64 expectedWallTime

/// Calls `x.ToString(format)` on any type exposing that member.
let inline stringf format (x : ^a) =
    (^a : (member ToString : string -> string) (x, format))
/// Formats a number with thousands separators.
let inline sep x = stringf "N0" x

printfn $"Gross {sep totalTimeTaken}ms Threads {dop} Wall {sep timeTaken}ms overhead {sep overhead}ms ordered: {durations = results}"
结果:
Gross 544,873ms Threads 10 Wall 55,659ms overhead 1,172ms ordered: True
就目前来看,对我的用例而言,允许结果无序并没有带来明显优势;直接使用 mapAsyncParallel,并让传入的函数自行节流(self-govern)以达到所需的节流效果,就已经足够了。