如何分离 IObservable 和 IObserver

How to Separate IObservable and IObserver

更新:查看底部的示例

我需要在 类 之间发消息。发布者将无限循环,调用一些方法来获取数据,然后将调用的结果传递给 OnNext。可以有很多订阅者,但应该只有一个 IObservable 和一个 long-运行 任务。这是一个实现。

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            var subject = new Subject<string>();

            //Create a class and inject the subject as IObserver
            new Publisher(subject);

            //Create a class and inject the subject as IObservable
            new Subscriber(subject, 1.ToString());
            new Subscriber(subject, 2.ToString());
            new Subscriber(subject, 3.ToString());

            //Run the loop for 3 seconds
            await Task.Delay(3000);
        }

        class Publisher
        {
            public Publisher(IObserver<string> observer)
            {
                Task.Run(async () =>
                {
                    //Loop forever
                    while (true)
                    {
                        //Get some data, publish it with OnNext and wait 500 milliseconds
                        observer.OnNext(GetSomeData());
                        await Task.Delay(500);
                    }
                });
            }
        }

        class Subscriber
        {
            public string Name;

            //Listen for OnNext and write to the debug window when it happens
            public Subscriber(IObservable<string> observable, string name)
            {
                Name = name;
                var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
            }
        }
    }
}

输出:

Name: 1 Message: Hi

Name: 2 Message: Hi

Name: 3 Message: Hi

Name: 1 Message: Hi

Name: 2 Message: Hi

Name: 3 Message: Hi

这很好用。请注意,只有一个 IObserver 发送消息,但所有订阅都会接收消息。 但是,如何将 IObservableIObserver 分开? 它们粘在一起成为 Subject。这是另一种方法。

[TestMethod]
public async Task RunMessagingAsync2()
{
    var observers = new List<IObserver<string>>();

    var observable = Observable.Create(
    (IObserver<string> observer) =>
    {
        observers.Add(observer);

        Task.Run(async () =>
        {
            while (true)
            {
                try
                {
                    observer.OnNext(GetSomeData());
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }

                await Task.Delay(500);
            }
        });

        return Disposable.Create(() => { });
    });

    //Create a class and inject the subject as IObservable
    new Subscriber(observable);
    new Subscriber(observable);

    //Run the loop for 10 seconds
    await Task.Delay(10000);

    Assert.IsTrue(ReferenceEquals(observers[0], observers[1]));
}

这里的问题是这会创建两个单独的 Task 和两个单独的 IObserver。每个订阅都会创建一个新的 IObserver。您可以确认,因为这里的 Assert 失败了。这对我来说真的没有任何意义。根据我对响应式编程的理解,我不希望这里的 Subscribe 方法每次都创建一个新的 IObserver 。查看 this gist. It is a slight modification of the Observable.Create example。它显示了 Subscribe 方法如何在每次调用时创建一个 IObserver。 如何在不使用 Subject 的情况下实现第一个示例的功能?

这是另一种完全不使用 Reactive UI 的方法...如果需要,您可以从发布者那里创建 Subject,但这不是必需的。

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";
   
        class Publisher
        {
            public Publisher(Action<string> onNext)
            {
                Task.Run(async () =>
                {
                    //Loop forever
                    while (true)
                    {
                        //Get some data, publish it with OnNext and wait 500 milliseconds
                        onNext(GetSomeData());
                        await Task.Delay(500);
                    }
                });
            }
        }

        class Subscriber
        {
            //Listen for OnNext and write to the debug window when it happens
            public void ReceiveMessage(string message) => Debug.WriteLine(message);
        }

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            //Create a class and inject the subject as IObservable
            var subscriber = new Subscriber();

            //Create a class and inject the subject as IObserver
            new Publisher(subscriber.ReceiveMessage);

            //Run the loop for 10 seconds
            await Task.Delay(10000);
        }
    }
}

最后,我要补充一点,ReactiveUI 曾经有一个 MessageBus class。我不确定它是否被删除,但不再推荐。他们建议我们改用什么?

工作示例

这个版本是正确的。我想我现在唯一要问的是 我如何用 Observable.Create 做同样的事情? Observable.Create 的问题是它为每个订阅运行操作。这不是预期的功能。这里的长运行任务无论有多少订阅都只运行一次

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    class Subscriber
    {
        public string Name;

        //Listen for OnNext and write to the debug window when it happens
        public Subscriber(IObservable<string> observable, string name)
        {
            Name = name;
            var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
        }
    }

    internal class BasicObservable<T> : IObservable<T>
    {
        List<IObserver<T>> _observers = new List<IObserver<T>>();

        public BasicObservable(
            Func<T> getData,
            TimeSpan? interval = null,
            CancellationToken cancellationToken = default
            ) =>

            Task.Run(async () =>
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        await Task.Delay(interval ?? new TimeSpan(0, 0, 1));
                        var data = getData();
                        _observers.ForEach(o => o.OnNext(data));
                    }
                    catch (Exception ex)
                    {
                        _observers.ForEach(o => o.OnError(ex));
                    }
                }

                _observers.ForEach(o => o.OnCompleted());

            }, cancellationToken);

        public IDisposable Subscribe(IObserver<T> observer)
        {
            _observers.Add(observer);
            return Disposable.Create(observer, (o) => _observers.Remove(o));
        }
    }

    public static class ObservableExtensions
    {
        public static IObservable<T> CreateObservable<T>(
            this Func<T> getData,
            CancellationToken cancellationToken = default)
        => new BasicObservable<T>(getData, default, cancellationToken);

        public static IObservable<T> CreateObservable<T>(
            this Func<T> getData,
            TimeSpan? interval = null,
            CancellationToken cancellationToken = default)
        => new BasicObservable<T>(getData, interval, cancellationToken);
    }

    [TestClass]
    public class UnitTest1
    {
        string GetData() => "Hi";

        [TestMethod]
        public async Task Messaging()
        {
            var cancellationSource = new CancellationTokenSource();
            var cancellationToken = cancellationSource.Token;

            Func<string> getData = GetData;

            var publisher = getData.CreateObservable(cancellationToken);

            new Subscriber(publisher, "One");
            new Subscriber(publisher, "Two");

            for (var i = 0; true; i++)
            {
                if (i >= 5)
                {
                    cancellationSource.Cancel();
                }

                await Task.Delay(1000);
            }
        }
    }

}

首先你必须熟悉"cold" and "hot" observables. Here is the definition from the Introduction to RX的理论。

  1. Cold 是被动序列,并根据请求(订阅时)开始生成通知。
  2. 热门 是活跃的序列,无论订阅如何都会产生通知。

你想要的是热可观察对象,问题是 Observable.Create 方法创建了冷可观察对象。但是您可以使用 Publish 运算符使任何 observable 变热。该运算符提供了一种方法,可以让多个独立的观察者共享一个单一的底层订阅。示例:

int index = 0;
var coldObservable = Observable.Create<int>(observer =>
{
    _ = Task.Run(async () =>
    {
        while (true)
        {
            observer.OnNext(++index);
            await Task.Delay(1000);
        }
    });
    return Disposable.Empty;
});

IConnectableObservable<int> hotObservable = coldObservable.Publish();
hotObservable.Connect(); // Causes the start of the loop

hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));

调用 hotObservable.Connect 方法时订阅由 Observable.Create 创建的 coldObservable,然后该单个订阅生成的所有通知将传播到 [=] 的所有订阅者18=].

输出:

Observer A received #1
Observer B received #1
Observer A received #2
Observer B received #2
Observer A received #3
Observer B received #3
Observer A received #4
Observer B received #4
Observer A received #5
Observer B received #5
Observer A received #6
Observer B received #6
...

重要提示: 上面示例的目的是演示 Publish 运算符,而不是作为高质量 RX 代码的示例。它的一个问题是,通过在连接到源之后订阅观察者,理论上可能不会将第一个通知发送给部分或所有观察者,因为它可能在他们订阅之前创建。换句话说,存在竞争条件。

有另一种方法可以管理 IConnectableObservable 的生命周期,运算符 RefCount:

Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.

var hotObservable = coldObservable.Publish().RefCount();

这样您就不需要手动 Connect 了。连接在第一次订阅时自动发生,并在最后一次取消订阅时自动释放。

感谢上面的回答,我最终得到了想要的结果,而无需实施IObservable。西奥多是对的。答案是使用 Publish() 方法将 IObservable 转换为热。

我写了一篇关于这个的文章here

虽然这可行,但 Enigmativity 上面的答案要好得多。

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Observables
{
    class Subscriber
    {
        public string Name;

        //Listen for OnNext and write to the debug window when it happens
        public Subscriber(IObservable<string> observable, string name)
        {
            Name = name;
            observable.Subscribe(s => Debug.WriteLine($"Name: {Name} Message: {s}"));
        }
    }

    [TestClass]
    public class UnitTest1
    {
        static string GetData() => "Hi";

        [TestMethod]
        public async Task Messaging()
        {
            var cancellationSource = new CancellationTokenSource();
            var cancellationToken = cancellationSource.Token;

            var coldObservable = Observable.Create<string>(observer =>
            {
                _ = Task.Run(async () =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var data = GetData();
                        observer.OnNext(data);
                        await Task.Delay(1000);
                    }
                }, cancellationToken);

                return Disposable.Empty;
            });

            var publisher = coldObservable.Publish();
            var connection = publisher.Connect();

            new Subscriber(publisher, "One");
            new Subscriber(publisher, "Two");

            for (var i = 0; i < 5; i++)
            {
                if (i == 4)
                {
                    cancellationSource.Cancel();
                }

                await Task.Delay(1000);
            }

            connection.Dispose();
        }
    }
}

我将此添加为答案,因为我觉得 Christian 在他的答案中发布的代码很危险,因为它混合了 Tasks 和 Rx,并且存在竞争条件。

这是解决大部分问题的替代方法:

public class UnitTest1
{
    private string GetData() => "Hi";
    
    private IDisposable Subscriber(IObservable<string> observable, string name) =>
        observable.Subscribe(s => Debug.WriteLine($"Name: {name} Message: {s}"));
    
    public async Task Messaging()
    {
        var coldObservable =
            Observable
                .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
                .Select(_ => GetData());
                
        var publisher = coldObservable.Publish();

        var subscriptions =
            new CompositeDisposable(
                Subscriber(publisher, "One"),
                Subscriber(publisher, "Two"),
                publisher.Connect());

        await Task.Delay(TimeSpan.FromSeconds(5.0));

        subscriptions.Dispose();
    }
}

不过,更好的是,我会考虑这样做:

public class UnitTest1
{
    private string GetData() => "Hi";
    
    private IObservable<string> Subscriber(IObservable<string> observable, string name) =>
        observable.Select(s => $"Name: {name} Message: {s}");
    
    public async Task Messaging()
    {
        var coldObservable =
            Observable
                .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
                .Select(_ => GetData())
                .Do(_ => Debug.WriteLine("Called GetData()"))
                .Publish(published =>
                    Observable
                        .Merge(
                            Subscriber(published, "One"),
                            Subscriber(published, "Two")))
                .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
                .Do(x => Debug.WriteLine(x));
    
        await coldObservable;
    }
}

始终最好使用 Rx 的内置运算符,而不是任务的混合方法。