在 F# 中一次执行一个 Async Observable
Enforcing one Async Observable at a time in F#
我有一个可观察到的事物序列需要映射到
一个 C# 任务。这些 C# 任务不应 运行 并发,而应在一个之后
其他。基本上,我需要实现与此 C# 等效的 F#
问题:
Enforcing one async observable at a time
天真地翻译这段 C# 代码给出如下内容:
let run (idx:int) (delay:int) =
async {
sprintf "start: %i (%i)" idx delay |> System.Diagnostics.Trace.WriteLine
let! t = System.Threading.Tasks.Task.Delay(delay) |> Async.AwaitTask
sprintf "finish: %i" idx |> System.Diagnostics.Trace.WriteLine
t
}
Observable.generate (new Random()) (fun _ -> true) id (fun s -> s.Next(250, 500))
|> Observable.take 20
|> Observable.mapi(fun idx delay -> idx, delay)
|> Observable.bind(fun (idx, delay) -> Observable.ofAsync (run idx delay))
|> Observable.subscribe ignore
|> ignore
这没有按预期工作,因为我没有在任何地方等待结果。甚至有
一种在 F# 中执行此操作而不阻塞线程的方法,就像 C# 的 await 一样?
F# 中有一个方便的库,称为 AsyncSeq:
https://www.nuget.org/packages/FSharp.Control.AsyncSeq/
它与添加到 C# 8.0 中的 IAsyncEnumerable<T>
非常相似,这为您提供了一个很好的解决方案。
解决方案:
open System
open FSharp.Control
open FSharp.Control.Reactive
[<EntryPoint>]
let main _ =
let run (idx:int) (delay:int) =
async {
sprintf "start: %i (%i)" idx delay |> Console.WriteLine
do! Async.Sleep delay
sprintf "finish: %i" idx |> Console.WriteLine
}
Observable.generate (new Random()) (fun _ -> true) id (fun s -> s.Next(250, 500))
|> Observable.take 20
|> Observable.mapi(fun idx delay -> idx, delay)
|> AsyncSeq.ofObservableBuffered
|> AsyncSeq.iterAsync (fun (idx,delay) -> run idx delay)
|> Async.RunSynchronously
0
AsyncSeq.ofObservableBuffered
完成订阅您的 Observable
的工作,并作为 AsyncSeq
源,您可以在其上进行管道传输。最后我们调用 Async.RunSynchronously
将其启动并等待入口点线程。
注意:我还更新了 run
,因为它正在返回 Async<Async<unit>>
,我认为这不是故意的。
我有一个可观察到的事物序列需要映射到 一个 C# 任务。这些 C# 任务不应 运行 并发,而应在一个之后 其他。基本上,我需要实现与此 C# 等效的 F# 问题:
Enforcing one async observable at a time
天真地翻译这段 C# 代码给出如下内容:
let run (idx:int) (delay:int) =
async {
sprintf "start: %i (%i)" idx delay |> System.Diagnostics.Trace.WriteLine
let! t = System.Threading.Tasks.Task.Delay(delay) |> Async.AwaitTask
sprintf "finish: %i" idx |> System.Diagnostics.Trace.WriteLine
t
}
Observable.generate (new Random()) (fun _ -> true) id (fun s -> s.Next(250, 500))
|> Observable.take 20
|> Observable.mapi(fun idx delay -> idx, delay)
|> Observable.bind(fun (idx, delay) -> Observable.ofAsync (run idx delay))
|> Observable.subscribe ignore
|> ignore
这没有按预期工作,因为我没有在任何地方等待结果。甚至有 一种在 F# 中执行此操作而不阻塞线程的方法,就像 C# 的 await 一样?
F# 中有一个方便的库,称为 AsyncSeq: https://www.nuget.org/packages/FSharp.Control.AsyncSeq/
它与添加到 C# 8.0 中的 IAsyncEnumerable<T>
非常相似,这为您提供了一个很好的解决方案。
解决方案:
open System
open FSharp.Control
open FSharp.Control.Reactive
[<EntryPoint>]
let main _ =
let run (idx:int) (delay:int) =
async {
sprintf "start: %i (%i)" idx delay |> Console.WriteLine
do! Async.Sleep delay
sprintf "finish: %i" idx |> Console.WriteLine
}
Observable.generate (new Random()) (fun _ -> true) id (fun s -> s.Next(250, 500))
|> Observable.take 20
|> Observable.mapi(fun idx delay -> idx, delay)
|> AsyncSeq.ofObservableBuffered
|> AsyncSeq.iterAsync (fun (idx,delay) -> run idx delay)
|> Async.RunSynchronously
0
AsyncSeq.ofObservableBuffered
完成订阅您的 Observable
的工作,并作为 AsyncSeq
源,您可以在其上进行管道传输。最后我们调用 Async.RunSynchronously
将其启动并等待入口点线程。
注意:我还更新了 run
,因为它正在返回 Async<Async<unit>>
,我认为这不是故意的。