基于无限序列的 Observable 动画图表

Animate chart with Observable based on infinite sequence

我在使用 FSharp.ChartingFSharp.Control.Reactive[=73= 创建我的一些数据的动画可视化时遇到问题].

本质上,我有一个无限的点生成器。我的真实生成器比下面的例子更复杂,但是这个简化的例子重现了这个问题:

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

move 函数的类型为 (int * int) list -> seq<(int * int) list>。对于每次迭代,它将输入列表中的所有点平移 1,以便这些点向上和向右移动。

我想在 LiveChart.Point 上展示这个。 move 生成的序列可以转换为合适的 Observable,但它本身 运行 相当快,所以我先放慢一点:

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }

这使我能够从点序列中创建一个 Observable:

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable

我还可以看到,如果我打印到控制台,它会起作用:

obs.Subscribe (fun x -> printfn "%O" x)

通过打印到控制台,也很清楚这会阻塞执行环境;例如,如果您将脚本发送到 F# Interactive (FSI),它会继续打印,您必须取消评估或重置会话才能停止它。

我的理论是,这是因为 obs 与执行环境 运行 在同一个线程上。

如果我尝试用它制作 LiveChart.Point,也会发生同样的情况:

let lc = obs |> LiveChart.Point
lc.ShowChart()

如果我将其发送给 FSI,则什么也不会发生(不显示图表),并且 FSI 会阻止。

这似乎与我的理论一致,即观察者 运行与图表在同一线程上。

如何让观察者 运行 在不同的线程上?

我发现 Observable.observeOn,它需要一个 IScheduler。浏览MSDN,我发现了NewThreadScheduler, ThreadPoolScheduler, and TaskPoolScheduler,它们都实现了IScheduler。这些类的名字听起来很有前途,但是我找不到它们!

根据文档,它们都在 System.Reactive.dll 中定义,但是虽然我拥有 FSharp.Control.Reactive 的所有依赖项,但我在任何地方都没有该程序集。搜索互联网也没有透露从哪里得到它。

它是 Reactive Extensions 的旧版本还是新版本?我的方向是否正确?

如何可视化 LiveChart 上的无限点序列?


这是重现问题的完整脚本:

#r @"../packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"../packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"../packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"../packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"
#r @"../packages/FSharp.Charting.0.90.12/lib/net40/FSharp.Charting.dll"
#r "System.Windows.Forms.DataVisualization"

open System
open FSharp.Control.Reactive
open FSharp.Charting

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable

//obs.Subscribe (fun x -> printfn "%O" x)

let lc = obs |> LiveChart.Point
lc.ShowChart()

已安装的 NuGet 包:

Id                                  Versions
--                                  --------
FSharp.Charting                     {0.90.12}
FSharp.Control.Reactive             {3.2.0}
FSharp.Core                         {3.1.2}
Rx-Core                             {2.2.5}
Rx-Interfaces                       {2.2.5}
Rx-Linq                             {2.2.5}

调度程序在 System.Reactive.PlatformServices.dll 中定义(由 Rx-PlatformServices 包安装)。

我在这里找到了它们: https://github.com/Reactive-Extensions/Rx.NET/tree/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency

例如使用新线程调度器:(observable).ObserveOn(System.Reactive.Concurrency.NewThreadScheduler.Default)

诀窍是使用 subscribeOn。 来自 introtorx.com 在 subscribeOn 和 observeOn:

One pitfall I want to point out here is, the first few times I used these overloads, I was confused as to what they actually do. You should use the SubscribeOn method to describe how you want any warm-up and background processing code to be scheduled. For example, if you were to use SubscribeOn with Observable.Create, the delegate passed to the Create method would be run on the specified scheduler.

The ObserveOn method is used to declare where you want your notifications to be scheduled to. I would suggest the ObserveOn method is most useful when working with STA systems, most commonly UI applications.

下面的完整脚本:

#r @"packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"packages/Rx-PlatformServices.2.2.5/lib/net45/System.Reactive.PlatformServices.dll"
#r @"packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"
#r @"packages/FSharp.Charting.0.90.12/lib/net40/FSharp.Charting.dll"
#r "System.Windows.Forms.DataVisualization"

open System
open FSharp.Control.Reactive
open FSharp.Charting
open System.Reactive.Concurrency

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable
    |> Observable.subscribeOn NewThreadScheduler.Default 

let lc = obs |> LiveChart.Point
lc.ShowChart()

通常在 UI 应用程序中,您会将 subscribeOn 和 observeOn 配对以确保结果在 UI 线程上返回。这里似乎不需要,因为看起来图表会为您处理这个问题(对我有用)。