使用 System.Reactive.Linq 进行间隔轮询
Using System.Reactive.Linq for polling at an interval
我花了几个小时梳理文档和教程,但无法弄清楚如何使用 ReactiveX 每隔一段时间轮询一次外部资源或与此相关的任何事情。下面是我编写的一些代码,用于每隔一段时间从 REST API 获取信息。
open System
open System.Reactive.Linq
module MyObservable =
let getResources =
async {
use client = new HttpClient()
let! response = client.GetStringAsync("http://localhost:8080/items") |> Async.AwaitTask
return response
} |> Async.StartAsTask
let getObservable (interval: TimeSpan) =
let f () = getResources.Result
Observable.Interval(interval)
|> Observable.map(fun _ -> f ())
为了对此进行测试,我尝试订阅 Observable 并等待五秒钟。它确实在五秒钟内每秒接收一些东西,但是 getResources
只被第一次调用,然后结果只在每个时间间隔使用。我如何修改它以在每个时间间隔进行 REST 调用,而不仅仅是一遍又一遍地使用第一次调用的结果?
let mutable res = Seq.empty
getObservable (new TimeSpan(0,0,1))
|> Observable.subscribe(fun (x: seq<string>) -> res <- res |> Seq.append x;)
|> ignore
Threading.Thread.Sleep(5000)
不要使用 Task
。 Tasks就是我们所说的“hot”,意思是如果你手上有一个Task
类型的值,说明这个任务已经是运行ning了,你也无能为力了.特别是,这意味着您无法重新启动它,或启动它的第二个实例。一旦创建了 Task
,就太晚了。
在您的特定情况下,这意味着 getResources
不是“开始任务的一种方式”,而只是“一项任务”。已经开始,已经运行ning.
如果你想每次都开始一个新任务,你有两个选择:
首先(更糟糕的选择),你可以让getResources
成为一个函数而不是一个值,你可以通过给它一个参数来做到这一点:
let getResources () =
async { ...
然后用那个参数调用它:
let f () = getResources().Result
这将在您每次调用 f()
时重新 运行 getResources
函数,每次都会创建一个新的 Task
并启动它。
其次(更好的选择),根本不要使用Task
。您正在创建一个非常好的 async
计算,然后将其转换为 Task
只是为了阻止获得其结果。为什么?您也可以阻止 async
的结果!
let getResources = async { ... }
let getObservable interval =
let f () = getResources |> Async.RunSynchronously
...
即使 getResources
不是一个函数,这仍然有效,因为 async
s 与 Task
s 不同,我们称之为“冷”。这意味着,如果你手上有一个async
,并不意味着它已经运行ning了。 async
与 Task
不同,它代表的不是“已经 运行ning” 的计算,而是“开始计算的一种方式”。一个推论是您可以从相同的 async
值多次启动它。
启动它的一种方法是通过 Async.RunSynchronously
,就像我在上面的示例中所做的那样。这不是最好的方法,因为它会阻塞当前线程直到计算完成,但这等同于您访问 Task.Result
属性 所做的,它也会阻塞直到 Task
完成了。
我花了几个小时梳理文档和教程,但无法弄清楚如何使用 ReactiveX 每隔一段时间轮询一次外部资源或与此相关的任何事情。下面是我编写的一些代码,用于每隔一段时间从 REST API 获取信息。
open System
open System.Reactive.Linq
module MyObservable =
let getResources =
async {
use client = new HttpClient()
let! response = client.GetStringAsync("http://localhost:8080/items") |> Async.AwaitTask
return response
} |> Async.StartAsTask
let getObservable (interval: TimeSpan) =
let f () = getResources.Result
Observable.Interval(interval)
|> Observable.map(fun _ -> f ())
为了对此进行测试,我尝试订阅 Observable 并等待五秒钟。它确实在五秒钟内每秒接收一些东西,但是 getResources
只被第一次调用,然后结果只在每个时间间隔使用。我如何修改它以在每个时间间隔进行 REST 调用,而不仅仅是一遍又一遍地使用第一次调用的结果?
let mutable res = Seq.empty
getObservable (new TimeSpan(0,0,1))
|> Observable.subscribe(fun (x: seq<string>) -> res <- res |> Seq.append x;)
|> ignore
Threading.Thread.Sleep(5000)
不要使用 Task
。 Tasks就是我们所说的“hot”,意思是如果你手上有一个Task
类型的值,说明这个任务已经是运行ning了,你也无能为力了.特别是,这意味着您无法重新启动它,或启动它的第二个实例。一旦创建了 Task
,就太晚了。
在您的特定情况下,这意味着 getResources
不是“开始任务的一种方式”,而只是“一项任务”。已经开始,已经运行ning.
如果你想每次都开始一个新任务,你有两个选择:
首先(更糟糕的选择),你可以让getResources
成为一个函数而不是一个值,你可以通过给它一个参数来做到这一点:
let getResources () =
async { ...
然后用那个参数调用它:
let f () = getResources().Result
这将在您每次调用 f()
时重新 运行 getResources
函数,每次都会创建一个新的 Task
并启动它。
其次(更好的选择),根本不要使用Task
。您正在创建一个非常好的 async
计算,然后将其转换为 Task
只是为了阻止获得其结果。为什么?您也可以阻止 async
的结果!
let getResources = async { ... }
let getObservable interval =
let f () = getResources |> Async.RunSynchronously
...
即使 getResources
不是一个函数,这仍然有效,因为 async
s 与 Task
s 不同,我们称之为“冷”。这意味着,如果你手上有一个async
,并不意味着它已经运行ning了。 async
与 Task
不同,它代表的不是“已经 运行ning” 的计算,而是“开始计算的一种方式”。一个推论是您可以从相同的 async
值多次启动它。
启动它的一种方法是通过 Async.RunSynchronously
,就像我在上面的示例中所做的那样。这不是最好的方法,因为它会阻塞当前线程直到计算完成,但这等同于您访问 Task.Result
属性 所做的,它也会阻塞直到 Task
完成了。