F# AsyncSeq- mke asyncSeq 如何迭代返回的顺序值?
F# AsyncSeq- How do a mke asyncSeq Iterate in the order values are returned?
我在使用 AsyncSeq 时遇到一些问题,它并行运行一些任务,然后迭代这些并行任务的结果,一次只执行一个任务。我认为这对于 AsyncSeq 来说应该是完美的,但是因为它按照序列的初始顺序进行迭代,所以它不会在任务进入时执行任务。
很难解释。只看这个小模型可能很容易:
//completes after the given time
let randomwait time=
async{
printfn "started waiting : %i"time
do! Async.Sleep(time*1000);
printfn "waited %i" time
return time
}
//Creates 10 tasks in decending order of time taken to complete: 10s,9s 8s, etc
let stream=
asyncSeq{
for i=10 downto 1 do
let waitTime= i
yield randomwait waitTime
}
let run =
let task=
stream
|> AsyncSeq.mapAsyncParallel id // This runs all our randomWait tasks at once
|>AsyncSeq. 1(fun time ->async{ printfn "printing for time : %i" time})
Async.RunSynchronously task
我希望代码在每次打印之间延迟一秒输出以下内容。
Printing for Time: 1
Printing for Time: 2
etc etc
然而,由于迭代顺序不是按照之前并行任务的完成顺序设置的,所以结果是倒退的,并且在第一个 10 秒任务完成后立即打印出来。
Printing for Time: 10
Printing for Time: 9
etc etc
非常感谢任何帮助。如有必要,我很乐意使用其他解决方案,任何允许并行然后一次迭代的解决方案。
代码中重要的关键操作是 mapAsyncParallel
。这会遍历输入的异步序列,启动所有任务,然后按照它们启动的顺序产生结果。
该操作不会等待所有任务完成,但它只会在产生所有 N-1 个较早任务的结果后才产生第 N 个任务的结果。
以下示例流比您的示例更好地展示了行为:
let stream=
asyncSeq {
for waitTime = 5 downto 1 do
yield randomwait waitTime
for waitTime = 10 to 15 do
yield randomwait waitTime
}
如果您以此为示例,您的代码将等待 5 秒,然后将打印“打印时间”5、4、3、2、1(因为它必须等待 5 秒才能执行第一个任务完成,与此同时,剩余的 4 个完成),但随后它将再等待 5 秒并打印“打印时间”6,等待 1 秒,打印 7,等待 1 秒,打印 8 等
如果您仅将 mapAsyncParalle
替换为 mapAsync
,则代码将按顺序 运行 任务并(按顺序)等待每个任务完成。那么你将不会并行发生任何事情并且等待时间会更长。
为了做(我认为)你想做的事,最好的选择是从使用 AsyncSeq<T>
切换到使用 Observable<T>
。异步序列是顺序的并保留元素的顺序。 Observable 不会这样做。使用 FSharp.Control.Reactive 库,您可以:
let task=
stream
|> AsyncSeq.toObservable
|> Observable.bind Observable.ofAsync
|> Observable.iter (fun time -> printfn "printing for time : %i" time)
Observable.wait task |> ignore
这里,bind
操作接受一个 observable,对于每个产生的值,它启动一个新的 observable(在我们的例子中,它只产生一个结果),但随后它按顺序收集所有结果他们到达的地方,所以你首先得到 1
的结果,即使这是作为第五个元素开始的。
非常感谢 Tomas Petricek。
这是我的最终结果:
let observables= schedulesGrouped|>List.map(fun (schedules,groupName)->
printfn "Setting up observables for group: %s" groupName
schedules
|>AsyncSeq.toObservable
|>Observable.bind Observable.ofAsync
|>Observable.iter(fun transferTask ->
Async.Start( processTask groupName transferTask))
)
let outPut=observables|>Observable.mergeSeq
outPut|>Observable.wait
对您最初的想法进行了一些调整,因为我想 运行 一个在迭代中部分异步但本质上仍然相同的任务。
非常感谢您的帮助。
我在使用 AsyncSeq 时遇到一些问题,它并行运行一些任务,然后迭代这些并行任务的结果,一次只执行一个任务。我认为这对于 AsyncSeq 来说应该是完美的,但是因为它按照序列的初始顺序进行迭代,所以它不会在任务进入时执行任务。 很难解释。只看这个小模型可能很容易:
//completes after the given time
let randomwait time=
async{
printfn "started waiting : %i"time
do! Async.Sleep(time*1000);
printfn "waited %i" time
return time
}
//Creates 10 tasks in decending order of time taken to complete: 10s,9s 8s, etc
let stream=
asyncSeq{
for i=10 downto 1 do
let waitTime= i
yield randomwait waitTime
}
let run =
let task=
stream
|> AsyncSeq.mapAsyncParallel id // This runs all our randomWait tasks at once
|>AsyncSeq. 1(fun time ->async{ printfn "printing for time : %i" time})
Async.RunSynchronously task
我希望代码在每次打印之间延迟一秒输出以下内容。
Printing for Time: 1
Printing for Time: 2
etc etc
然而,由于迭代顺序不是按照之前并行任务的完成顺序设置的,所以结果是倒退的,并且在第一个 10 秒任务完成后立即打印出来。
Printing for Time: 10
Printing for Time: 9
etc etc
非常感谢任何帮助。如有必要,我很乐意使用其他解决方案,任何允许并行然后一次迭代的解决方案。
代码中重要的关键操作是 mapAsyncParallel
。这会遍历输入的异步序列,启动所有任务,然后按照它们启动的顺序产生结果。
该操作不会等待所有任务完成,但它只会在产生所有 N-1 个较早任务的结果后才产生第 N 个任务的结果。
以下示例流比您的示例更好地展示了行为:
let stream=
asyncSeq {
for waitTime = 5 downto 1 do
yield randomwait waitTime
for waitTime = 10 to 15 do
yield randomwait waitTime
}
如果您以此为示例,您的代码将等待 5 秒,然后将打印“打印时间”5、4、3、2、1(因为它必须等待 5 秒才能执行第一个任务完成,与此同时,剩余的 4 个完成),但随后它将再等待 5 秒并打印“打印时间”6,等待 1 秒,打印 7,等待 1 秒,打印 8 等
如果您仅将 mapAsyncParalle
替换为 mapAsync
,则代码将按顺序 运行 任务并(按顺序)等待每个任务完成。那么你将不会并行发生任何事情并且等待时间会更长。
为了做(我认为)你想做的事,最好的选择是从使用 AsyncSeq<T>
切换到使用 Observable<T>
。异步序列是顺序的并保留元素的顺序。 Observable 不会这样做。使用 FSharp.Control.Reactive 库,您可以:
let task=
stream
|> AsyncSeq.toObservable
|> Observable.bind Observable.ofAsync
|> Observable.iter (fun time -> printfn "printing for time : %i" time)
Observable.wait task |> ignore
这里,bind
操作接受一个 observable,对于每个产生的值,它启动一个新的 observable(在我们的例子中,它只产生一个结果),但随后它按顺序收集所有结果他们到达的地方,所以你首先得到 1
的结果,即使这是作为第五个元素开始的。
非常感谢 Tomas Petricek。 这是我的最终结果:
let observables= schedulesGrouped|>List.map(fun (schedules,groupName)->
printfn "Setting up observables for group: %s" groupName
schedules
|>AsyncSeq.toObservable
|>Observable.bind Observable.ofAsync
|>Observable.iter(fun transferTask ->
Async.Start( processTask groupName transferTask))
)
let outPut=observables|>Observable.mergeSeq
outPut|>Observable.wait
对您最初的想法进行了一些调整,因为我想 运行 一个在迭代中部分异步但本质上仍然相同的任务。
非常感谢您的帮助。