响应式扩展,包装第三方 API 事件 - 零拷贝?

Reactive Extension, wrap third party API event - zero copy?

我有一个第三方 API 我没有来源。我像这样实例化一个事件回调:

using namespace API; // This is where APIClient lives

namespace TestApiClientUI
{
    public partial class Form1 : Form
    {
        APIClient apiClient = new APICLient();
        apiClient.QuoteUpdated += api_ClientUpdated;

        private void api_ClientUpdated(object sender, string s, double b, double a)
        {
        }
    }
}

如何将其包装到 Rx Observable.FromEvent?

另外,有没有办法让包装副本的开销尽可能少(零副本)?

让我们定义一个结构来存储事件参数。

struct QuoteUpdate
{
    public string S { get; private set; }
    public double B { get; private set; }
    public double A { get; private set; }
    public QuoteUpdate(string s, double b, double a) : this()
    {
        S = s;
        B = b;
        A = a;
    }
}

现在我们可以定义一个IObservable<QuoteUpdate>如下。

var quoteUpdates = Observable.FromEvent<ApiQuoteHandler, QuoteUpdate>(
    emit => (_, s, b, a) => emit(new QuoteUpdate(s, b, a)),
    handler => apiClient.QuoteUpdated += handler,
    handler => apiClient.QuoteUpdated -= handler);

第一个 lambda 定义了从 Action<QuoteUpdate>ApiQuoteHandler 的映射。该操作称为 emit,实际上是向我们定义的可观察对象的订阅者广播一个值。将 emit(value); 想象成 foreach (var subscriber in subscribers) { subscriber.OnNext(value); }.

我们需要第一个映射的原因是因为实际的底层事件只知道如何订阅和取消订阅 ApiQuoteHandler 个实例。第二个和第三个 lambda 的类型为 Action<ApiQuoteHandler。它们分别在订阅和取消订阅时调用。

IObserver<QuoteUpdate> observer 订阅我们定义的 IObservable<QuoteUpdate> 时,会发生以下情况:

  1. observer.OnNext(类型 Action<QuoteUpdate>)作为变量 emit 传递到我们的第一个 lambda 中。这将创建一个 ApiQuoteHandler 来包装 observer.
  2. 然后 ApiQuoteHandler 作为变量 handler 传递给我们的第二个 lambda。 handler 订阅了 apiClient 上的 QuoteUpdated 事件。
  3. 从我们 IObservable<QuoteUpdate> 上的 Subscribe 方法返回的 IDisposable 包含对 ApiQuoteHandler 的引用,该引用由我们的三个 lambda 中的第一个创建。
  4. 稍后处理此 IDisposable 时,将使用与之前相同的 ApiQuoteHandler 调用第三个 lambda,取消订阅基础事件。

因为某些 T 需要单个 IObservable<T> 但您有事件参数 (s, b, a),所以您必须定义一些结构或 class 来存储这三个值。我不会担心复制 string 引用和两个 double 值的成本。