使用 System.Reactive.Linq 进行间隔轮询

Using System.Reactive.Linq for polling at an interval

我花了几个小时梳理文档和教程,但无法弄清楚如何使用 ReactiveX 每隔一段时间轮询一次外部资源或与此相关的任何事情。下面是我编写的一些代码,用于每隔一段时间从 REST API 获取信息。

open System
open System.Reactive.Linq

module MyObservable =
    let getResources = 
        async {
            use client = new HttpClient()
            let! response = client.GetStringAsync("http://localhost:8080/items") |> Async.AwaitTask
            return response
        } |> Async.StartAsTask

    let getObservable (interval: TimeSpan) = 
        let f () = getResources.Result
        Observable.Interval(interval) 
        |> Observable.map(fun _ -> f ()) 

为了对此进行测试,我尝试订阅 Observable 并等待五秒钟。它确实在五秒钟内每秒接收一些东西,但是 getResources 只被第一次调用,然后结果只在每个时间间隔使用。我如何修改它以在每个时间间隔进行 REST 调用,而不仅仅是一遍又一遍地使用第一次调用的结果?

let mutable res = Seq.empty
getObservable (new TimeSpan(0,0,1))
|> Observable.subscribe(fun (x: seq<string>) -> res <- res |> Seq.append x;) 
|> ignore
Threading.Thread.Sleep(5000)

不要使用 Task。 Tasks就是我们所说的“hot”,意思是如果你手上有一个Task类型的值,说明这个任务已经是运行ning了,你也无能为力了.特别是,这意味着您无法重新启动它,或启动它的第二个实例。一旦创建了 Task,就太晚了。

在您的特定情况下,这意味着 getResources 不是“开始任务的一种方式”,而只是“一项任务”。已经开始,已经运行ning.

如果你想每次都开始一个新任务,你有两个选择:

首先(更糟糕的选择),你可以让getResources成为一个函数而不是一个值,你可以通过给它一个参数来做到这一点:

let getResources () = 
    async { ...

然后用那个参数调用它:

let f () = getResources().Result

这将在您每次调用 f() 时重新 运行 getResources 函数,每次都会创建一个新的 Task 并启动它。

其次(更好的选择),根本不要使用Task。您正在创建一个非常好的 async 计算,然后将其转换为 Task 只是为了阻止获得其结果。为什么?您也可以阻止 async 的结果!

let getResources = async { ... }

let getObservable interval =
  let f () = getResources |> Async.RunSynchronously
  ...

即使 getResources 不是一个函数,这仍然有效,因为 asyncs 与 Tasks 不同,我们称之为“冷”。这意味着,如果你手上有一个async,并不意味着它已经运行ning了。 asyncTask 不同,它代表的不是“已经 运行ning” 的计算,而是“开始计算的一种方式”。一个推论是您可以从相同的 async 值多次启动它。

启动它的一种方法是通过 Async.RunSynchronously,就像我在上面的示例中所做的那样。这不是最好的方法,因为它会阻塞当前线程直到计算完成,但这等同于您访问 Task.Result 属性 所做的,它也会阻塞直到 Task 完成了。