如何分离 IObservable 和 IObserver
How to Separate IObservable and IObserver
更新:查看底部的示例
我需要在 类 之间发消息。发布者将无限循环,调用一些方法来获取数据,然后将调用的结果传递给 OnNext
。可以有很多订阅者,但应该只有一个 IObservable 和一个 long-运行 任务。这是一个实现。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
[TestMethod]
public async Task RunMessagingAsync()
{
var subject = new Subject<string>();
//Create a class and inject the subject as IObserver
new Publisher(subject);
//Create a class and inject the subject as IObservable
new Subscriber(subject, 1.ToString());
new Subscriber(subject, 2.ToString());
new Subscriber(subject, 3.ToString());
//Run the loop for 3 seconds
await Task.Delay(3000);
}
class Publisher
{
public Publisher(IObserver<string> observer)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data, publish it with OnNext and wait 500 milliseconds
observer.OnNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
}
}
输出:
Name: 1 Message: Hi
Name: 2 Message: Hi
Name: 3 Message: Hi
Name: 1 Message: Hi
Name: 2 Message: Hi
Name: 3 Message: Hi
这很好用。请注意,只有一个 IObserver
发送消息,但所有订阅都会接收消息。 但是,如何将 IObservable
和 IObserver
分开? 它们粘在一起成为 Subject
。这是另一种方法。
[TestMethod]
public async Task RunMessagingAsync2()
{
var observers = new List<IObserver<string>>();
var observable = Observable.Create(
(IObserver<string> observer) =>
{
observers.Add(observer);
Task.Run(async () =>
{
while (true)
{
try
{
observer.OnNext(GetSomeData());
}
catch (Exception ex)
{
observer.OnError(ex);
}
await Task.Delay(500);
}
});
return Disposable.Create(() => { });
});
//Create a class and inject the subject as IObservable
new Subscriber(observable);
new Subscriber(observable);
//Run the loop for 10 seconds
await Task.Delay(10000);
Assert.IsTrue(ReferenceEquals(observers[0], observers[1]));
}
这里的问题是这会创建两个单独的 Task
和两个单独的 IObserver
。每个订阅都会创建一个新的 IObserver。您可以确认,因为这里的 Assert
失败了。这对我来说真的没有任何意义。根据我对响应式编程的理解,我不希望这里的 Subscribe
方法每次都创建一个新的 IObserver
。查看 this gist. It is a slight modification of the Observable.Create example。它显示了 Subscribe 方法如何在每次调用时创建一个 IObserver。 如何在不使用 Subject
的情况下实现第一个示例的功能?
这是另一种完全不使用 Reactive UI 的方法...如果需要,您可以从发布者那里创建 Subject
,但这不是必需的。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
class Publisher
{
public Publisher(Action<string> onNext)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data, publish it with OnNext and wait 500 milliseconds
onNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
//Listen for OnNext and write to the debug window when it happens
public void ReceiveMessage(string message) => Debug.WriteLine(message);
}
[TestMethod]
public async Task RunMessagingAsync()
{
//Create a class and inject the subject as IObservable
var subscriber = new Subscriber();
//Create a class and inject the subject as IObserver
new Publisher(subscriber.ReceiveMessage);
//Run the loop for 10 seconds
await Task.Delay(10000);
}
}
}
最后,我要补充一点,ReactiveUI 曾经有一个 MessageBus class。我不确定它是否被删除,但不再推荐。他们建议我们改用什么?
工作示例
这个版本是正确的。我想我现在唯一要问的是 我如何用 Observable.Create
做同样的事情? Observable.Create
的问题是它为每个订阅运行操作。这不是预期的功能。这里的长运行任务无论有多少订阅都只运行一次
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace UnitTestProject1
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
internal class BasicObservable<T> : IObservable<T>
{
List<IObserver<T>> _observers = new List<IObserver<T>>();
public BasicObservable(
Func<T> getData,
TimeSpan? interval = null,
CancellationToken cancellationToken = default
) =>
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(interval ?? new TimeSpan(0, 0, 1));
var data = getData();
_observers.ForEach(o => o.OnNext(data));
}
catch (Exception ex)
{
_observers.ForEach(o => o.OnError(ex));
}
}
_observers.ForEach(o => o.OnCompleted());
}, cancellationToken);
public IDisposable Subscribe(IObserver<T> observer)
{
_observers.Add(observer);
return Disposable.Create(observer, (o) => _observers.Remove(o));
}
}
public static class ObservableExtensions
{
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,
CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData, default, cancellationToken);
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,
TimeSpan? interval = null,
CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData, interval, cancellationToken);
}
[TestClass]
public class UnitTest1
{
string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
Func<string> getData = GetData;
var publisher = getData.CreateObservable(cancellationToken);
new Subscriber(publisher, "One");
new Subscriber(publisher, "Two");
for (var i = 0; true; i++)
{
if (i >= 5)
{
cancellationSource.Cancel();
}
await Task.Delay(1000);
}
}
}
}
首先你必须熟悉"cold" and "hot" observables. Here is the definition from the Introduction to RX的理论。
- Cold 是被动序列,并根据请求(订阅时)开始生成通知。
- 热门 是活跃的序列,无论订阅如何都会产生通知。
你想要的是热可观察对象,问题是 Observable.Create
方法创建了冷可观察对象。但是您可以使用 Publish
运算符使任何 observable 变热。该运算符提供了一种方法,可以让多个独立的观察者共享一个单一的底层订阅。示例:
int index = 0;
var coldObservable = Observable.Create<int>(observer =>
{
_ = Task.Run(async () =>
{
while (true)
{
observer.OnNext(++index);
await Task.Delay(1000);
}
});
return Disposable.Empty;
});
IConnectableObservable<int> hotObservable = coldObservable.Publish();
hotObservable.Connect(); // Causes the start of the loop
hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));
调用 hotObservable.Connect
方法时订阅由 Observable.Create
创建的 coldObservable
,然后该单个订阅生成的所有通知将传播到 [=] 的所有订阅者18=].
输出:
Observer A received #1
Observer B received #1
Observer A received #2
Observer B received #2
Observer A received #3
Observer B received #3
Observer A received #4
Observer B received #4
Observer A received #5
Observer B received #5
Observer A received #6
Observer B received #6
...
重要提示: 上面示例的目的是演示 Publish
运算符,而不是作为高质量 RX 代码的示例。它的一个问题是,通过在连接到源之后订阅观察者,理论上可能不会将第一个通知发送给部分或所有观察者,因为它可能在他们订阅之前创建。换句话说,存在竞争条件。
有另一种方法可以管理 IConnectableObservable
的生命周期,运算符 RefCount
:
Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
var hotObservable = coldObservable.Publish().RefCount();
这样您就不需要手动 Connect
了。连接在第一次订阅时自动发生,并在最后一次取消订阅时自动释放。
感谢上面的回答,我最终得到了想要的结果,而无需实施IObservable
。西奥多是对的。答案是使用 Publish()
方法将 IObservable
转换为热。
我写了一篇关于这个的文章here
虽然这可行,但 Enigmativity 上面的答案要好得多。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Observables
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
observable.Subscribe(s => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
[TestClass]
public class UnitTest1
{
static string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
var coldObservable = Observable.Create<string>(observer =>
{
_ = Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
var data = GetData();
observer.OnNext(data);
await Task.Delay(1000);
}
}, cancellationToken);
return Disposable.Empty;
});
var publisher = coldObservable.Publish();
var connection = publisher.Connect();
new Subscriber(publisher, "One");
new Subscriber(publisher, "Two");
for (var i = 0; i < 5; i++)
{
if (i == 4)
{
cancellationSource.Cancel();
}
await Task.Delay(1000);
}
connection.Dispose();
}
}
}
我将此添加为答案,因为我觉得 Christian 在他的答案中发布的代码很危险,因为它混合了 Tasks 和 Rx,并且存在竞争条件。
这是解决大部分问题的替代方法:
public class UnitTest1
{
private string GetData() => "Hi";
private IDisposable Subscriber(IObservable<string> observable, string name) =>
observable.Subscribe(s => Debug.WriteLine($"Name: {name} Message: {s}"));
public async Task Messaging()
{
var coldObservable =
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
.Select(_ => GetData());
var publisher = coldObservable.Publish();
var subscriptions =
new CompositeDisposable(
Subscriber(publisher, "One"),
Subscriber(publisher, "Two"),
publisher.Connect());
await Task.Delay(TimeSpan.FromSeconds(5.0));
subscriptions.Dispose();
}
}
不过,更好的是,我会考虑这样做:
public class UnitTest1
{
private string GetData() => "Hi";
private IObservable<string> Subscriber(IObservable<string> observable, string name) =>
observable.Select(s => $"Name: {name} Message: {s}");
public async Task Messaging()
{
var coldObservable =
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
.Select(_ => GetData())
.Do(_ => Debug.WriteLine("Called GetData()"))
.Publish(published =>
Observable
.Merge(
Subscriber(published, "One"),
Subscriber(published, "Two")))
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
.Do(x => Debug.WriteLine(x));
await coldObservable;
}
}
始终最好使用 Rx 的内置运算符,而不是任务的混合方法。
更新:查看底部的示例
我需要在 类 之间发消息。发布者将无限循环,调用一些方法来获取数据,然后将调用的结果传递给 OnNext
。可以有很多订阅者,但应该只有一个 IObservable 和一个 long-运行 任务。这是一个实现。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
[TestMethod]
public async Task RunMessagingAsync()
{
var subject = new Subject<string>();
//Create a class and inject the subject as IObserver
new Publisher(subject);
//Create a class and inject the subject as IObservable
new Subscriber(subject, 1.ToString());
new Subscriber(subject, 2.ToString());
new Subscriber(subject, 3.ToString());
//Run the loop for 3 seconds
await Task.Delay(3000);
}
class Publisher
{
public Publisher(IObserver<string> observer)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data, publish it with OnNext and wait 500 milliseconds
observer.OnNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
}
}
输出:
Name: 1 Message: Hi
Name: 2 Message: Hi
Name: 3 Message: Hi
Name: 1 Message: Hi
Name: 2 Message: Hi
Name: 3 Message: Hi
这很好用。请注意,只有一个 IObserver
发送消息,但所有订阅都会接收消息。 但是,如何将 IObservable
和 IObserver
分开? 它们粘在一起成为 Subject
。这是另一种方法。
[TestMethod]
public async Task RunMessagingAsync2()
{
var observers = new List<IObserver<string>>();
var observable = Observable.Create(
(IObserver<string> observer) =>
{
observers.Add(observer);
Task.Run(async () =>
{
while (true)
{
try
{
observer.OnNext(GetSomeData());
}
catch (Exception ex)
{
observer.OnError(ex);
}
await Task.Delay(500);
}
});
return Disposable.Create(() => { });
});
//Create a class and inject the subject as IObservable
new Subscriber(observable);
new Subscriber(observable);
//Run the loop for 10 seconds
await Task.Delay(10000);
Assert.IsTrue(ReferenceEquals(observers[0], observers[1]));
}
这里的问题是这会创建两个单独的 Task
和两个单独的 IObserver
。每个订阅都会创建一个新的 IObserver。您可以确认,因为这里的 Assert
失败了。这对我来说真的没有任何意义。根据我对响应式编程的理解,我不希望这里的 Subscribe
方法每次都创建一个新的 IObserver
。查看 this gist. It is a slight modification of the Observable.Create example。它显示了 Subscribe 方法如何在每次调用时创建一个 IObserver。 如何在不使用 Subject
的情况下实现第一个示例的功能?
这是另一种完全不使用 Reactive UI 的方法...如果需要,您可以从发布者那里创建 Subject
,但这不是必需的。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
class Publisher
{
public Publisher(Action<string> onNext)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data, publish it with OnNext and wait 500 milliseconds
onNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
//Listen for OnNext and write to the debug window when it happens
public void ReceiveMessage(string message) => Debug.WriteLine(message);
}
[TestMethod]
public async Task RunMessagingAsync()
{
//Create a class and inject the subject as IObservable
var subscriber = new Subscriber();
//Create a class and inject the subject as IObserver
new Publisher(subscriber.ReceiveMessage);
//Run the loop for 10 seconds
await Task.Delay(10000);
}
}
}
最后,我要补充一点,ReactiveUI 曾经有一个 MessageBus class。我不确定它是否被删除,但不再推荐。他们建议我们改用什么?
工作示例
这个版本是正确的。我想我现在唯一要问的是 我如何用 Observable.Create
做同样的事情? Observable.Create
的问题是它为每个订阅运行操作。这不是预期的功能。这里的长运行任务无论有多少订阅都只运行一次
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace UnitTestProject1
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
internal class BasicObservable<T> : IObservable<T>
{
List<IObserver<T>> _observers = new List<IObserver<T>>();
public BasicObservable(
Func<T> getData,
TimeSpan? interval = null,
CancellationToken cancellationToken = default
) =>
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(interval ?? new TimeSpan(0, 0, 1));
var data = getData();
_observers.ForEach(o => o.OnNext(data));
}
catch (Exception ex)
{
_observers.ForEach(o => o.OnError(ex));
}
}
_observers.ForEach(o => o.OnCompleted());
}, cancellationToken);
public IDisposable Subscribe(IObserver<T> observer)
{
_observers.Add(observer);
return Disposable.Create(observer, (o) => _observers.Remove(o));
}
}
public static class ObservableExtensions
{
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,
CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData, default, cancellationToken);
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,
TimeSpan? interval = null,
CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData, interval, cancellationToken);
}
[TestClass]
public class UnitTest1
{
string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
Func<string> getData = GetData;
var publisher = getData.CreateObservable(cancellationToken);
new Subscriber(publisher, "One");
new Subscriber(publisher, "Two");
for (var i = 0; true; i++)
{
if (i >= 5)
{
cancellationSource.Cancel();
}
await Task.Delay(1000);
}
}
}
}
首先你必须熟悉"cold" and "hot" observables. Here is the definition from the Introduction to RX的理论。
- Cold 是被动序列,并根据请求(订阅时)开始生成通知。
- 热门 是活跃的序列,无论订阅如何都会产生通知。
你想要的是热可观察对象,问题是 Observable.Create
方法创建了冷可观察对象。但是您可以使用 Publish
运算符使任何 observable 变热。该运算符提供了一种方法,可以让多个独立的观察者共享一个单一的底层订阅。示例:
int index = 0;
var coldObservable = Observable.Create<int>(observer =>
{
_ = Task.Run(async () =>
{
while (true)
{
observer.OnNext(++index);
await Task.Delay(1000);
}
});
return Disposable.Empty;
});
IConnectableObservable<int> hotObservable = coldObservable.Publish();
hotObservable.Connect(); // Causes the start of the loop
hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));
调用 hotObservable.Connect
方法时订阅由 Observable.Create
创建的 coldObservable
,然后该单个订阅生成的所有通知将传播到 [=] 的所有订阅者18=].
输出:
Observer A received #1
Observer B received #1
Observer A received #2
Observer B received #2
Observer A received #3
Observer B received #3
Observer A received #4
Observer B received #4
Observer A received #5
Observer B received #5
Observer A received #6
Observer B received #6
...
重要提示: 上面示例的目的是演示 Publish
运算符,而不是作为高质量 RX 代码的示例。它的一个问题是,通过在连接到源之后订阅观察者,理论上可能不会将第一个通知发送给部分或所有观察者,因为它可能在他们订阅之前创建。换句话说,存在竞争条件。
有另一种方法可以管理 IConnectableObservable
的生命周期,运算符 RefCount
:
Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
var hotObservable = coldObservable.Publish().RefCount();
这样您就不需要手动 Connect
了。连接在第一次订阅时自动发生,并在最后一次取消订阅时自动释放。
感谢上面的回答,我最终得到了想要的结果,而无需实施IObservable
。西奥多是对的。答案是使用 Publish()
方法将 IObservable
转换为热。
我写了一篇关于这个的文章here
虽然这可行,但 Enigmativity 上面的答案要好得多。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Observables
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
observable.Subscribe(s => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
[TestClass]
public class UnitTest1
{
static string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
var coldObservable = Observable.Create<string>(observer =>
{
_ = Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
var data = GetData();
observer.OnNext(data);
await Task.Delay(1000);
}
}, cancellationToken);
return Disposable.Empty;
});
var publisher = coldObservable.Publish();
var connection = publisher.Connect();
new Subscriber(publisher, "One");
new Subscriber(publisher, "Two");
for (var i = 0; i < 5; i++)
{
if (i == 4)
{
cancellationSource.Cancel();
}
await Task.Delay(1000);
}
connection.Dispose();
}
}
}
我将此添加为答案,因为我觉得 Christian 在他的答案中发布的代码很危险,因为它混合了 Tasks 和 Rx,并且存在竞争条件。
这是解决大部分问题的替代方法:
public class UnitTest1
{
private string GetData() => "Hi";
private IDisposable Subscriber(IObservable<string> observable, string name) =>
observable.Subscribe(s => Debug.WriteLine($"Name: {name} Message: {s}"));
public async Task Messaging()
{
var coldObservable =
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
.Select(_ => GetData());
var publisher = coldObservable.Publish();
var subscriptions =
new CompositeDisposable(
Subscriber(publisher, "One"),
Subscriber(publisher, "Two"),
publisher.Connect());
await Task.Delay(TimeSpan.FromSeconds(5.0));
subscriptions.Dispose();
}
}
不过,更好的是,我会考虑这样做:
public class UnitTest1
{
private string GetData() => "Hi";
private IObservable<string> Subscriber(IObservable<string> observable, string name) =>
observable.Select(s => $"Name: {name} Message: {s}");
public async Task Messaging()
{
var coldObservable =
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
.Select(_ => GetData())
.Do(_ => Debug.WriteLine("Called GetData()"))
.Publish(published =>
Observable
.Merge(
Subscriber(published, "One"),
Subscriber(published, "Two")))
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
.Do(x => Debug.WriteLine(x));
await coldObservable;
}
}
始终最好使用 Rx 的内置运算符,而不是任务的混合方法。