同步环境中的反应式编程:性能与依赖管理

Reactive Programming in synchronous environment: performance vs dependency management

我知道反应式编程在异步环境(网络请求,或繁重的 IO/multi-threading/background 任务)中大放异彩。但是,在同步世界中,我发现反应式编程仍然有很大的好处,可以从程序员。我正在使用 C# 编写一个像电子表格一样的桌面应用程序:大量输入,对这些输入和输出进行计算。我正在使用 RX.net 并享受它给我的免费依赖管理的好处:当输入更改时,我不需要知道什么计算需要重做,什么 ui 需要更新。但是,随着涉及更多 synchronous/sequential 计算,使用 observable 对性能的影响变得更大。考虑这两种编码方式:

  private static void async_world()
  {
     Subject<string> a_ob = new Subject<string>();
     IObservable<string> A_ob = a_ob.Select(str =>
     {
        return my_to_upper(str);
     });
     IObservable<string> AA_ob = A_ob.Select(str => $"{str}{str}");
     IObservable<string> AAA_ob = A_ob.Select(str => $"{str}{str}{str}");
     IObservable<string> AA_AAA_ob = Observable.CombineLatest(AA_ob, AAA_ob,
     (AA, AAA) =>
     {
        return $"{AA}_{AAA}";
     });

     AA_AAA_ob.Subscribe(str => Console.Out.WriteLine(str));
     a_ob.OnNext("a");
  }

  private static void sync_world()
  {
     Subject<string> a_ob = new Subject<string>();

     IObservable<string> result_ob = a_ob.Select(str =>
     {
        var upper = my_to_upper(str);

        var AA = $"{upper}{upper}";
        var AAA = $"{upper}{upper}{upper}";

        return $"{AA}_{AAA}";
     });

     result_ob.Subscribe(str => Console.Out.WriteLine(str));
     a_ob.OnNext("a");
  }

假设 my_to_upper() 是一个缓慢的过程:

  private static string my_to_upper(string str)
  {
     Console.Out.WriteLine($"{str}.ToUpper...");
     for (int i = 0; i < 1000000; i++)
     {
        for (int j = 0; j < 2000; j++)
        {

        }
     }
     Console.Out.WriteLine($"{str}.ToUpper...done");
     return str.ToUpper();
  }

对于async_world(),与sync_world()相比,my_to_upper()执行了两次。当数据到达时(在每次 onNext 调用时)A_ob 执行计算和 "cache" my_to_upper() 的结果并将其传递给 AA_ob 和 AA 会很好A_ob

所以我的问题是:这是我们必须做出的权衡:让计算机自动为我们管理效率低下的依赖关系,或者手动管理依赖关系以获得更好的性能。

可以 'cache' 各种 Publish 重载的结果:

这是一个发布的异步发布形式 + RefCount:

private static void async_world_publish_refcount()
{
    Subject<string> a_ob = new Subject<string>();
    IObservable<string> A_ob = a_ob
        .Select(str => my_to_upper(str)) //same function call as async_world_original, just removed braces.
        .Publish()
        .RefCount();

    IObservable<string> AA_ob = A_ob.Select(str => $"{str}{str}");
    IObservable<string> AAA_ob = A_ob.Select(str => $"{str}{str}{str}");
    IObservable<string> AA_AAA_ob = Observable.CombineLatest(AA_ob, AAA_ob,
    (AA, AAA) =>
    {
        return $"{AA}_{AAA}";
    });

    AA_AAA_ob.Subscribe(str => Console.Out.WriteLine(str));
    a_ob.OnNext("a");
}

这是一个没有 RefCount 的已发布表单:

private static void async_world_publish_only()
{
    Subject<string> a_ob = new Subject<string>();
    IObservable<string> AA_AAA_ob = a_ob
        .Select(str => my_to_upper(str)) //same function call as async_world_original, just removed braces.
        .Publish(A_ob => 
            Observable.CombineLatest(
                A_ob.Select(str => $"{str}{str}"),
                A_ob.Select(str => $"{str}{str}{str}"),
                (AA, AAA) => $"{AA}_{AAA}"
            )
        );

    AA_AAA_ob.Subscribe(str => Console.Out.WriteLine(str));
    a_ob.OnNext("a");
}

如果您是功能性的、响应式的东西的粉丝,那么不带 RefCount 形式的发布可能是首选:它有效地缓存了一个可观察对象的结果值,并使它们在您必须生成一个有限的 lambda 范围内可用选择器来决定如何处理它。如有必要,您可以嵌套 Publish lambda 以访问多个 "cached" 可观察值。

Publish + RefCount,相反,它促进了声明式、迭代式的流水线。一般不太推荐。

您可以阅读更多关于 PublishRefCount 和热与冷可观察量 here