反应式服务。分组和缓存流
Reactive services. Grouping and caching streams
新:带有测试的完整源代码现在位于 https://github.com/bboyle1234/ReactiveTest
假设我们有一个视图状态对象,它可以通过小的局部视图更改事件进行更新。以下是总视图、增量视图更新事件和构建总视图的累加器函数 Update
的一些示例模型:
interface IDeviceView : ICloneable {
Guid DeviceId { get; }
}
class DeviceTotalView : IDeviceView {
public Guid DeviceId { get; set; }
public int Voltage { get; set; }
public int Currents { get; set; }
public object Clone() => this.MemberwiseClone();
}
class DeviceVoltagesUpdateView : IDeviceView {
public Guid DeviceId { get; set; }
public int Voltage { get; set; }
public object Clone() => this.MemberwiseClone();
}
class DeviceCurrentsUpdateView : IDeviceView {
public Guid DeviceId { get; set; }
public int Current { get; set; }
public object Clone() => this.MemberwiseClone();
}
class DeviceUpdateEvent {
public DeviceTotalView View;
public IDeviceView LastUpdate;
}
static DeviceUpdateEvent Update(DeviceUpdateEvent previousUpdate, IDeviceView update) {
if (update.DeviceId != previousUpdate.View.DeviceId) throw new InvalidOperationException("Device ids do not match (numskull exception).");
var view = (DeviceTotalView)previousUpdate.View.Clone();
switch (update) {
case DeviceVoltagesUpdateView x: {
view.Voltage = x.Voltage;
break;
}
case DeviceCurrentsUpdateView x: {
view.Currents = x.Current;
break;
}
}
return new DeviceUpdateEvent { View = view, LastUpdate = update };
}
接下来,假设我们已经有一个可注入服务,能够为所有设备生成可观察的小更新事件流,并且我们想要创建一个服务,为单个设备生成聚合视图流.
这是我们要创建的服务的接口:
interface IDeviceService {
/// <summary>
/// Gets an observable that produces aggregated update events for the device with the given deviceId.
/// On subscription, the most recent event is immediately pushed to the subscriber.
/// There can be multiple subscribers.
/// </summary>
IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId);
}
如何使用 System.Reactive v4
库中针对 .netstandard2.0
的响应式扩展来实现此接口及其要求?这是我的带注释的锅炉代码,这是我所能得到的。
class DeviceService : IDeviceService {
readonly IObservable<IDeviceView> Source;
public DeviceService(IObservable<IDeviceView> source) { // injected parameter
/// When injected here, "source" is cold in the sense that it won't produce events until the first time it is subscribed.
/// "source" will throw an exception if its "Subscribe" method is called more than once as it is intended to have only one observer and
/// be read all the way from the beginning.
Source = source;
/// Callers of the "Subscribe" method below will expect data to be preloaded and will expect to be immediately delivered the most
/// recent event. So we need to immediately subscribe to "source" and start preloading the aggregate streams.
/// I'm assuming there is going to need to be a groupby to split the stream by device id.
var groups = source.GroupBy(x => x.DeviceId);
/// Now somehow we need to perform the aggregrate function on each grouping.
/// And create an observable that immediately delivers the most recent aggregated event when "Subscribe" is called below.
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
/// How do we implement this? The observable that we return must be pre-loaded with the latest update
throw new NotImplementedException();
}
}
你在那个要点中有一些奇怪的代码。这是我的工作:
public class DeviceService : IDeviceService, IDisposable
{
readonly IObservable<IDeviceView> Source;
private readonly Dictionary<Guid, IObservable<DeviceUpdateEvent>> _updateStreams = new Dictionary<Guid, IObservable<DeviceUpdateEvent>>();
private readonly IObservable<(Guid, IObservable<DeviceUpdateEvent>)> _groupedStream;
private readonly CompositeDisposable _disposable = new CompositeDisposable();
public DeviceService(IObservable<IDeviceView> source)
{
Source = source;
_groupedStream = source
.GroupBy(v => v.DeviceId)
.Select(o => (o.Key, o
.Scan(new DeviceUpdateEvent { View = DeviceTotalView.GetInitialView(o.Key), LastUpdate = null }, (lastTotalView, newView) => lastTotalView.Update(newView))
.Replay(1)
.RefCount()
));
var groupSubscription = _groupedStream.Subscribe(t =>
{
_updateStreams[t.Item1] = t.Item2;
_disposable.Add(t.Item2.Subscribe());
});
_disposable.Add(groupSubscription);
}
public void Dispose()
{
_disposable.Dispose();
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId)
{
/// How do we implement this? The observable that we return must be pre-loaded with the latest update
if(this._updateStreams.ContainsKey(deviceId))
return this._updateStreams[deviceId];
return _groupedStream
.Where(t => t.Item1 == deviceId)
.Select(t => t.Item2)
.Switch();
}
}
这里的肉是_groupedStream
块。正如您所说,您按 DeviceId 分组,然后使用 Scan
更新状态。我还将 Update
移动到静态 class 并使其成为扩展方法。你需要一个初始状态,所以我修改了你的 DeviceTotalView
class 来获得它。相应修改:
public class DeviceTotalView : IDeviceView
{
public Guid DeviceId { get; set; }
public int Voltage { get; set; }
public int Currents { get; set; }
public object Clone() => this.MemberwiseClone();
public static DeviceTotalView GetInitialView(Guid deviceId)
{
return new DeviceTotalView
{
DeviceId = deviceId,
Voltage = 0,
Currents = 0
};
}
}
接下来,.Replay(1).Refcount()
用于记住最近的更新,然后在订阅时提供更新。然后,我们将所有这些子 observables 填充到字典中,以便在方法调用时轻松检索。虚拟订阅 (_disposable.Add(t.Item2.Subscribe())
) 是 Replay
工作所必需的。
如果有对尚未更新的 DeviceId 的早期请求,我们订阅 _groupedStream
它将等待第一次更新,生成该 ID 的可观察值,然后 .Switch
订阅那个子对象。
然而,所有这一切都未能针对您的测试代码,我猜是因为 ConnectableObservableForAsyncProducerConsumerQueue
class。我不想调试它,因为我不建议这样做。一般来说,不建议混合使用 TPL 和 Rx 代码。他们解决的问题在很大程度上是重叠的,并且彼此妨碍。所以我修改了你的测试代码,用重播主题替换了那个可连接的可观察队列。
我还为早期请求添加了测试用例(在该设备的更新到达之前):
DeviceUpdateEvent deviceView1 = null;
DeviceUpdateEvent deviceView2 = null;
DeviceUpdateEvent deviceView3 = null;
var subject = new ReplaySubject<IDeviceView>();
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 });
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 });
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 });
var service = new DeviceService(subject);
service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x);
service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x);
service.GetDeviceStream(id3).Subscribe(x => deviceView3 = x);
/// I believe there is no need to pause here because the Subscribe method calls above
/// block until the events have all been pushed into the subscribers above.
Assert.AreEqual(deviceView1.View.DeviceId, id1);
Assert.AreEqual(deviceView2.View.DeviceId, id2);
Assert.AreEqual(deviceView1.View.Voltage, 2);
Assert.AreEqual(deviceView2.View.Voltage, 100);
Assert.IsNull(deviceView3);
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 });
Assert.AreEqual(deviceView2.View.Voltage, 101);
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id3, Voltage = 101 });
Assert.AreEqual(deviceView3.View.DeviceId, id3);
Assert.AreEqual(deviceView3.View.Voltage, 101);
这很好,可以 运行 没有异步。
此外,作为一般提示,我建议使用 Microsoft.Reactive.Testing
包对 Rx 代码进行单元测试,而不是时间间隔测试。
非常感谢@Shlomo 的上述回答。
接受的答案中给出的实现,虽然对我来说是一个神奇的教育,但有几个问题也需要依次解决。第一个是 threadrace 问题,第二个是系统中有大量设备时的性能问题。我最终解决了 threadrace 并通过这个修改后的实现显着提高了性能:
在构造函数中,分组和扫描的设备流直接订阅到 BehaviorSubject
,它实现了 Replay(1).RefCount()
所需的功能,可以立即通知新订阅者流中的最新值。
在 GetDeviceStream
方法中,我们继续使用字典查找来查找设备流,如果字典中不存在则创建预加载的 BehaviorSubject
。我们已经删除了上述问题中先前实现中存在的 Where
搜索。使用 where 搜索导致线程争用问题,该问题已通过使分组流可重播来解决。这导致了指数性能问题。将其替换为 FirstOrDefault
可将时间减少一半,然后将其完全删除以支持 GetCreate
字典技术可提供完美的性能 O(1) 而不是 O(n2)。
GetCreateSubject
使用 Lazy
代理对象作为字典值,因为 ConcurrentDictionary
有时可以为单个键多次调用 Create
方法。向字典提供 Lazy
可确保 Value
属性 仅在其中一个懒人上调用,因此每个设备仅创建一个 BehaviorSubject
。
class DeviceService : IDeviceService, IDisposable {
readonly CompositeDisposable _disposable = new CompositeDisposable();
readonly ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>> _streams = new ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>>();
BehaviorSubject<DeviceUpdateEvent> GetCreateSubject(Guid deviceId) {
return _streams.GetOrAdd(deviceId, Create).Value;
Lazy<BehaviorSubject<DeviceUpdateEvent>> Create(Guid id) {
return new Lazy<BehaviorSubject<DeviceUpdateEvent>>(() => {
var subject = new BehaviorSubject<DeviceUpdateEvent>(DeviceUpdateEvent.GetInitialView(deviceId));
_disposable.Add(subject);
return subject;
});
}
}
public DeviceService(IConnectableObservable<IDeviceView> source) {
_disposable.Add(source
.GroupBy(x => x.DeviceId)
.Subscribe(deviceStream => {
_disposable.Add(deviceStream
.Scan(DeviceUpdateEvent.GetInitialView(deviceStream.Key), DeviceUtils.Update)
.Subscribe(GetCreateSubject(deviceStream.Key)));
}));
_disposable.Add(source.Connect());
}
public void Dispose() {
_disposable.Dispose();
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
return GetCreateSubject(deviceId).AsObservable();
}
}
[TestMethod]
public async Task Test2() {
var input = new AsyncProducerConsumerQueue<IDeviceView>();
var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input);
var service = new DeviceService(source);
var ids = Enumerable.Range(0, 100000).Select(i => Guid.NewGuid()).ToArray();
var idsRemaining = ids.ToHashSet();
var t1 = Task.Run(async () => {
foreach (var id in ids) {
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id, Voltage = 1 });
}
});
var t2 = Task.Run(() => {
foreach (var id in ids) {
service.GetDeviceStream(id).Subscribe(x => idsRemaining.Remove(x.View.DeviceId));
}
});
await Task.WhenAll(t1, t2);
var sw = Stopwatch.StartNew();
while (idsRemaining.Count > 0) {
if (sw.Elapsed.TotalSeconds > 600) throw new Exception("Failed");
await Task.Delay(100);
}
}
在此处查看整个问题源代码和测试代码:https://github.com/bboyle1234/ReactiveTest
新:带有测试的完整源代码现在位于 https://github.com/bboyle1234/ReactiveTest
假设我们有一个视图状态对象,它可以通过小的局部视图更改事件进行更新。以下是总视图、增量视图更新事件和构建总视图的累加器函数 Update
的一些示例模型:
interface IDeviceView : ICloneable {
Guid DeviceId { get; }
}
class DeviceTotalView : IDeviceView {
public Guid DeviceId { get; set; }
public int Voltage { get; set; }
public int Currents { get; set; }
public object Clone() => this.MemberwiseClone();
}
class DeviceVoltagesUpdateView : IDeviceView {
public Guid DeviceId { get; set; }
public int Voltage { get; set; }
public object Clone() => this.MemberwiseClone();
}
class DeviceCurrentsUpdateView : IDeviceView {
public Guid DeviceId { get; set; }
public int Current { get; set; }
public object Clone() => this.MemberwiseClone();
}
class DeviceUpdateEvent {
public DeviceTotalView View;
public IDeviceView LastUpdate;
}
static DeviceUpdateEvent Update(DeviceUpdateEvent previousUpdate, IDeviceView update) {
if (update.DeviceId != previousUpdate.View.DeviceId) throw new InvalidOperationException("Device ids do not match (numskull exception).");
var view = (DeviceTotalView)previousUpdate.View.Clone();
switch (update) {
case DeviceVoltagesUpdateView x: {
view.Voltage = x.Voltage;
break;
}
case DeviceCurrentsUpdateView x: {
view.Currents = x.Current;
break;
}
}
return new DeviceUpdateEvent { View = view, LastUpdate = update };
}
接下来,假设我们已经有一个可注入服务,能够为所有设备生成可观察的小更新事件流,并且我们想要创建一个服务,为单个设备生成聚合视图流.
这是我们要创建的服务的接口:
interface IDeviceService {
/// <summary>
/// Gets an observable that produces aggregated update events for the device with the given deviceId.
/// On subscription, the most recent event is immediately pushed to the subscriber.
/// There can be multiple subscribers.
/// </summary>
IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId);
}
如何使用 System.Reactive v4
库中针对 .netstandard2.0
的响应式扩展来实现此接口及其要求?这是我的带注释的锅炉代码,这是我所能得到的。
class DeviceService : IDeviceService {
readonly IObservable<IDeviceView> Source;
public DeviceService(IObservable<IDeviceView> source) { // injected parameter
/// When injected here, "source" is cold in the sense that it won't produce events until the first time it is subscribed.
/// "source" will throw an exception if its "Subscribe" method is called more than once as it is intended to have only one observer and
/// be read all the way from the beginning.
Source = source;
/// Callers of the "Subscribe" method below will expect data to be preloaded and will expect to be immediately delivered the most
/// recent event. So we need to immediately subscribe to "source" and start preloading the aggregate streams.
/// I'm assuming there is going to need to be a groupby to split the stream by device id.
var groups = source.GroupBy(x => x.DeviceId);
/// Now somehow we need to perform the aggregrate function on each grouping.
/// And create an observable that immediately delivers the most recent aggregated event when "Subscribe" is called below.
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
/// How do we implement this? The observable that we return must be pre-loaded with the latest update
throw new NotImplementedException();
}
}
你在那个要点中有一些奇怪的代码。这是我的工作:
public class DeviceService : IDeviceService, IDisposable
{
readonly IObservable<IDeviceView> Source;
private readonly Dictionary<Guid, IObservable<DeviceUpdateEvent>> _updateStreams = new Dictionary<Guid, IObservable<DeviceUpdateEvent>>();
private readonly IObservable<(Guid, IObservable<DeviceUpdateEvent>)> _groupedStream;
private readonly CompositeDisposable _disposable = new CompositeDisposable();
public DeviceService(IObservable<IDeviceView> source)
{
Source = source;
_groupedStream = source
.GroupBy(v => v.DeviceId)
.Select(o => (o.Key, o
.Scan(new DeviceUpdateEvent { View = DeviceTotalView.GetInitialView(o.Key), LastUpdate = null }, (lastTotalView, newView) => lastTotalView.Update(newView))
.Replay(1)
.RefCount()
));
var groupSubscription = _groupedStream.Subscribe(t =>
{
_updateStreams[t.Item1] = t.Item2;
_disposable.Add(t.Item2.Subscribe());
});
_disposable.Add(groupSubscription);
}
public void Dispose()
{
_disposable.Dispose();
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId)
{
/// How do we implement this? The observable that we return must be pre-loaded with the latest update
if(this._updateStreams.ContainsKey(deviceId))
return this._updateStreams[deviceId];
return _groupedStream
.Where(t => t.Item1 == deviceId)
.Select(t => t.Item2)
.Switch();
}
}
这里的肉是_groupedStream
块。正如您所说,您按 DeviceId 分组,然后使用 Scan
更新状态。我还将 Update
移动到静态 class 并使其成为扩展方法。你需要一个初始状态,所以我修改了你的 DeviceTotalView
class 来获得它。相应修改:
public class DeviceTotalView : IDeviceView
{
public Guid DeviceId { get; set; }
public int Voltage { get; set; }
public int Currents { get; set; }
public object Clone() => this.MemberwiseClone();
public static DeviceTotalView GetInitialView(Guid deviceId)
{
return new DeviceTotalView
{
DeviceId = deviceId,
Voltage = 0,
Currents = 0
};
}
}
接下来,.Replay(1).Refcount()
用于记住最近的更新,然后在订阅时提供更新。然后,我们将所有这些子 observables 填充到字典中,以便在方法调用时轻松检索。虚拟订阅 (_disposable.Add(t.Item2.Subscribe())
) 是 Replay
工作所必需的。
如果有对尚未更新的 DeviceId 的早期请求,我们订阅 _groupedStream
它将等待第一次更新,生成该 ID 的可观察值,然后 .Switch
订阅那个子对象。
然而,所有这一切都未能针对您的测试代码,我猜是因为 ConnectableObservableForAsyncProducerConsumerQueue
class。我不想调试它,因为我不建议这样做。一般来说,不建议混合使用 TPL 和 Rx 代码。他们解决的问题在很大程度上是重叠的,并且彼此妨碍。所以我修改了你的测试代码,用重播主题替换了那个可连接的可观察队列。
我还为早期请求添加了测试用例(在该设备的更新到达之前):
DeviceUpdateEvent deviceView1 = null;
DeviceUpdateEvent deviceView2 = null;
DeviceUpdateEvent deviceView3 = null;
var subject = new ReplaySubject<IDeviceView>();
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 });
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 });
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 });
var service = new DeviceService(subject);
service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x);
service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x);
service.GetDeviceStream(id3).Subscribe(x => deviceView3 = x);
/// I believe there is no need to pause here because the Subscribe method calls above
/// block until the events have all been pushed into the subscribers above.
Assert.AreEqual(deviceView1.View.DeviceId, id1);
Assert.AreEqual(deviceView2.View.DeviceId, id2);
Assert.AreEqual(deviceView1.View.Voltage, 2);
Assert.AreEqual(deviceView2.View.Voltage, 100);
Assert.IsNull(deviceView3);
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 });
Assert.AreEqual(deviceView2.View.Voltage, 101);
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id3, Voltage = 101 });
Assert.AreEqual(deviceView3.View.DeviceId, id3);
Assert.AreEqual(deviceView3.View.Voltage, 101);
这很好,可以 运行 没有异步。
此外,作为一般提示,我建议使用 Microsoft.Reactive.Testing
包对 Rx 代码进行单元测试,而不是时间间隔测试。
非常感谢@Shlomo 的上述回答。
接受的答案中给出的实现,虽然对我来说是一个神奇的教育,但有几个问题也需要依次解决。第一个是 threadrace 问题,第二个是系统中有大量设备时的性能问题。我最终解决了 threadrace 并通过这个修改后的实现显着提高了性能:
在构造函数中,分组和扫描的设备流直接订阅到 BehaviorSubject
,它实现了 Replay(1).RefCount()
所需的功能,可以立即通知新订阅者流中的最新值。
在 GetDeviceStream
方法中,我们继续使用字典查找来查找设备流,如果字典中不存在则创建预加载的 BehaviorSubject
。我们已经删除了上述问题中先前实现中存在的 Where
搜索。使用 where 搜索导致线程争用问题,该问题已通过使分组流可重播来解决。这导致了指数性能问题。将其替换为 FirstOrDefault
可将时间减少一半,然后将其完全删除以支持 GetCreate
字典技术可提供完美的性能 O(1) 而不是 O(n2)。
GetCreateSubject
使用 Lazy
代理对象作为字典值,因为 ConcurrentDictionary
有时可以为单个键多次调用 Create
方法。向字典提供 Lazy
可确保 Value
属性 仅在其中一个懒人上调用,因此每个设备仅创建一个 BehaviorSubject
。
class DeviceService : IDeviceService, IDisposable {
readonly CompositeDisposable _disposable = new CompositeDisposable();
readonly ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>> _streams = new ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>>();
BehaviorSubject<DeviceUpdateEvent> GetCreateSubject(Guid deviceId) {
return _streams.GetOrAdd(deviceId, Create).Value;
Lazy<BehaviorSubject<DeviceUpdateEvent>> Create(Guid id) {
return new Lazy<BehaviorSubject<DeviceUpdateEvent>>(() => {
var subject = new BehaviorSubject<DeviceUpdateEvent>(DeviceUpdateEvent.GetInitialView(deviceId));
_disposable.Add(subject);
return subject;
});
}
}
public DeviceService(IConnectableObservable<IDeviceView> source) {
_disposable.Add(source
.GroupBy(x => x.DeviceId)
.Subscribe(deviceStream => {
_disposable.Add(deviceStream
.Scan(DeviceUpdateEvent.GetInitialView(deviceStream.Key), DeviceUtils.Update)
.Subscribe(GetCreateSubject(deviceStream.Key)));
}));
_disposable.Add(source.Connect());
}
public void Dispose() {
_disposable.Dispose();
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
return GetCreateSubject(deviceId).AsObservable();
}
}
[TestMethod]
public async Task Test2() {
var input = new AsyncProducerConsumerQueue<IDeviceView>();
var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input);
var service = new DeviceService(source);
var ids = Enumerable.Range(0, 100000).Select(i => Guid.NewGuid()).ToArray();
var idsRemaining = ids.ToHashSet();
var t1 = Task.Run(async () => {
foreach (var id in ids) {
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id, Voltage = 1 });
}
});
var t2 = Task.Run(() => {
foreach (var id in ids) {
service.GetDeviceStream(id).Subscribe(x => idsRemaining.Remove(x.View.DeviceId));
}
});
await Task.WhenAll(t1, t2);
var sw = Stopwatch.StartNew();
while (idsRemaining.Count > 0) {
if (sw.Elapsed.TotalSeconds > 600) throw new Exception("Failed");
await Task.Delay(100);
}
}
在此处查看整个问题源代码和测试代码:https://github.com/bboyle1234/ReactiveTest