Reactive services. Grouping and caching streams

NEW: The complete source code, with tests, is now at https://github.com/bboyle1234/ReactiveTest

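Suppose we have a view state object that can be updated by small, partial view-change events. Here are some example models of the total view, the incremental view-update events, and the accumulator function Update that builds the total view: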

interface IDeviceView : ICloneable {
    Guid DeviceId { get; }
}

class DeviceTotalView : IDeviceView {
    public Guid DeviceId { get; set; }
    public int Voltage { get; set; }
    public int Currents { get; set; }
    public object Clone() => this.MemberwiseClone();
}

class DeviceVoltagesUpdateView : IDeviceView {
    public Guid DeviceId { get; set; }
    public int Voltage { get; set; }
    public object Clone() => this.MemberwiseClone();
}

class DeviceCurrentsUpdateView : IDeviceView {
    public Guid DeviceId { get; set; }
    public int Current { get; set; }
    public object Clone() => this.MemberwiseClone();
}

class DeviceUpdateEvent {
    public DeviceTotalView View;
    public IDeviceView LastUpdate;
}

static DeviceUpdateEvent Update(DeviceUpdateEvent previousUpdate, IDeviceView update) {
    if (update.DeviceId != previousUpdate.View.DeviceId) throw new InvalidOperationException("Device ids do not match (numskull exception).");
    var view = (DeviceTotalView)previousUpdate.View.Clone();
    switch (update) {
        case DeviceVoltagesUpdateView x: {
            view.Voltage = x.Voltage;
            break;
        }
        case DeviceCurrentsUpdateView x: {
            view.Currents = x.Current;
            break;
        }
    }
    return new DeviceUpdateEvent { View = view, LastUpdate = update };
}
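Because Update has the shape of an accumulator, (previous state, update) -> new state, a finite history of updates can be folded synchronously with LINQ's Enumerable.Aggregate; Rx's Scan applies the same kind of function but emits every intermediate state instead of only the last one. A minimal sketch, assuming a hypothetical FoldDemo class and a seed view whose fields start at zero:

```csharp
using System;
using System.Linq;

interface IDeviceView : ICloneable { Guid DeviceId { get; } }

class DeviceTotalView : IDeviceView {
    public Guid DeviceId { get; set; }
    public int Voltage { get; set; }
    public int Currents { get; set; }
    public object Clone() => MemberwiseClone();
}

class DeviceVoltagesUpdateView : IDeviceView {
    public Guid DeviceId { get; set; }
    public int Voltage { get; set; }
    public object Clone() => MemberwiseClone();
}

class DeviceCurrentsUpdateView : IDeviceView {
    public Guid DeviceId { get; set; }
    public int Current { get; set; }
    public object Clone() => MemberwiseClone();
}

class DeviceUpdateEvent {
    public DeviceTotalView View;
    public IDeviceView LastUpdate;
}

// Hypothetical helper class for this illustration.
static class FoldDemo {
    // Same accumulator as in the question: clones the previous view, never mutates it.
    public static DeviceUpdateEvent Update(DeviceUpdateEvent previousUpdate, IDeviceView update) {
        if (update.DeviceId != previousUpdate.View.DeviceId) throw new InvalidOperationException("Device ids do not match (numskull exception).");
        var view = (DeviceTotalView)previousUpdate.View.Clone();
        switch (update) {
            case DeviceVoltagesUpdateView x: view.Voltage = x.Voltage; break;
            case DeviceCurrentsUpdateView x: view.Currents = x.Current; break;
        }
        return new DeviceUpdateEvent { View = view, LastUpdate = update };
    }

    // Folds a finite history into the final aggregated event (Scan would emit each step).
    public static DeviceUpdateEvent FoldHistory(Guid id, params IDeviceView[] history) {
        // Assumed seed: an all-zero view with no previous update.
        var seed = new DeviceUpdateEvent { View = new DeviceTotalView { DeviceId = id }, LastUpdate = null };
        return history.Aggregate(seed, Update);
    }
}
```

For example, folding a voltage update of 1, a current update of 5 and a voltage update of 2 yields a view with Voltage 2 and Currents 5, because each step starts from a clone of the previous view.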

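Next, suppose we already have an injectable service that produces an observable stream of small update events for all devices, and we want to create a service that produces an aggregated view stream for a single device.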

Here is the interface of the service we want to create:

interface IDeviceService {
    /// <summary>
    /// Gets an observable that produces aggregated update events for the device with the given deviceId.
    /// On subscription, the most recent event is immediately pushed to the subscriber.
    /// There can be multiple subscribers.
    /// </summary>
    IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId);
}

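How can this interface and its requirements be implemented with the Reactive Extensions from the System.Reactive v4 library, targeting .netstandard2.0? Here is my annotated boilerplate code; this is as far as I could get.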

class DeviceService : IDeviceService {

    readonly IObservable<IDeviceView> Source;

    public DeviceService(IObservable<IDeviceView> source) { // injected parameter
        /// When injected here, "source" is cold in the sense that it won't produce events until the first time it is subscribed.
        /// "source" will throw an exception if its "Subscribe" method is called more than once as it is intended to have only one observer and 
        /// be read all the way from the beginning.
        Source = source;

        /// Callers of the "Subscribe" method below will expect data to be preloaded and will expect to be immediately delivered the most
        /// recent event. So we need to immediately subscribe to "source" and start preloading the aggregate streams.

        /// I'm assuming there is going to need to be a groupby to split the stream by device id.
        var groups = source.GroupBy(x => x.DeviceId);
        /// Now somehow we need to perform the aggregrate function on each grouping.
        /// And create an observable that immediately delivers the most recent aggregated event when "Subscribe" is called below.
    }

    public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
        /// How do we implement this? The observable that we return must be pre-loaded with the latest update
        throw new NotImplementedException();
    }
}

You have some strange code in that gist. Here's what I got working:

public class DeviceService : IDeviceService, IDisposable
{

    readonly IObservable<IDeviceView> Source;
    private readonly Dictionary<Guid, IObservable<DeviceUpdateEvent>> _updateStreams = new Dictionary<Guid, IObservable<DeviceUpdateEvent>>();
    private readonly IObservable<(Guid, IObservable<DeviceUpdateEvent>)> _groupedStream;
    private readonly CompositeDisposable _disposable = new CompositeDisposable();

    public DeviceService(IObservable<IDeviceView> source)
    {
        Source = source;

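        // Split the source by device, fold each device's updates with Scan,
        // and cache the latest aggregate per device with Replay(1).RefCount().
        // Update is the question's accumulator, moved into a static class as an extension method.
        _groupedStream = source
            .GroupBy(v => v.DeviceId)
            .Select(o => (o.Key, o
                .Scan(new DeviceUpdateEvent { View = DeviceTotalView.GetInitialView(o.Key), LastUpdate = null }, (lastTotalView, newView) => lastTotalView.Update(newView))
                .Replay(1)
                .RefCount()
            ));

        var groupSubscription = _groupedStream.Subscribe(t =>
        {
            _updateStreams[t.Item1] = t.Item2;
            // Dummy subscription: keeps RefCount connected so Replay(1) caches values
            // before anyone calls GetDeviceStream.
            _disposable.Add(t.Item2.Subscribe());
        });
        _disposable.Add(groupSubscription);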
    }

    public void Dispose()
    {
        _disposable.Dispose();
    }

    public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId)
    {
        // The device has already produced an update: return its cached stream.
        if (this._updateStreams.ContainsKey(deviceId))
            return this._updateStreams[deviceId];
        // Early request: wait for the device's group to appear, then switch to its stream.
        return _groupedStream
            .Where(t => t.Item1 == deviceId)
            .Select(t => t.Item2)
            .Switch();
    }
}

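The meat here is the _groupedStream block. As you said, you group by DeviceId, then use Scan to update the state. I also moved Update into a static class and made it an extension method. You need an initial state, so I modified your DeviceTotalView class to provide one: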

public class DeviceTotalView : IDeviceView
{
    public Guid DeviceId { get; set; }
    public int Voltage { get; set; }
    public int Currents { get; set; }
    public object Clone() => this.MemberwiseClone();
    public static DeviceTotalView GetInitialView(Guid deviceId)
    {
        return new DeviceTotalView
        {
            DeviceId = deviceId,
            Voltage = 0,
            Currents = 0
        };
    }
}

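Next, .Replay(1).RefCount() remembers the most recent update and delivers it to each new subscriber. We then put all of these child observables into a dictionary so they are easy to retrieve when GetDeviceStream is called. The dummy subscription (_disposable.Add(t.Item2.Subscribe())) is required for Replay to work: RefCount only connects the underlying Replay while it has at least one subscriber, so without it, updates that arrive before the first real subscriber would not be cached.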
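To make concrete what Replay(1) contributes, here is a hypothetical, hand-rolled ReplayLast<T> built only on the BCL's IObservable<T> and IObserver<T> interfaces: it caches the latest value it receives and pushes that value to each new subscriber before any live values. It is a single-threaded sketch for illustration, not a replacement for Replay(1) or ReplaySubject<T>; the names ReplayLast and Recorder are assumptions, not part of Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration: remembers the most recent value and replays it on Subscribe.
// Not thread-safe, and it does not replay OnError/OnCompleted to late subscribers.
sealed class ReplayLast<T> : IObservable<T>, IObserver<T> {
    readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    bool _hasValue;
    T _last;

    public void OnNext(T value) {
        _last = value;          // cache first, so later subscribers see it
        _hasValue = true;
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    public void OnError(Exception error) { foreach (var o in _observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in _observers.ToArray()) o.OnCompleted(); }

    public IDisposable Subscribe(IObserver<T> observer) {
        _observers.Add(observer);
        // Replay the cached value so a late subscriber starts from the current state.
        if (_hasValue) observer.OnNext(_last);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    sealed class Unsubscriber : IDisposable {
        Action _dispose;
        public Unsubscriber(Action dispose) => _dispose = dispose;
        public void Dispose() { _dispose?.Invoke(); _dispose = null; }
    }
}

// Minimal observer that records every value it receives.
sealed class Recorder<T> : IObserver<T> {
    public readonly List<T> Values = new List<T>();
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

A subscriber that arrives after the values 1 and 2 were pushed receives 2 immediately, then any later values until it unsubscribes. In the answer's code, the dummy subscription plays the role of keeping such a cache "live" from the start.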

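If there is an early request for a DeviceId that has not been updated yet, we subscribe to _groupedStream, which waits for the first update, produces the observable for that ID, and then .Switch subscribes to that child observable.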

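However, all of this failed against your test code, I'm guessing because of the ConnectableObservableForAsyncProducerConsumerQueue class. I didn't want to debug it, because I wouldn't recommend that approach. In general, mixing TPL and Rx code is not recommended: the problems they solve overlap to a large extent, and they get in each other's way. So I modified your test code to replace that connectable observable queue with a ReplaySubject.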

I also added a test case for an early request (before any update for that device has arrived):

DeviceUpdateEvent deviceView1 = null;
DeviceUpdateEvent deviceView2 = null;
DeviceUpdateEvent deviceView3 = null;

var subject = new ReplaySubject<IDeviceView>();

var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();

subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 });
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 });
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 });

var service = new DeviceService(subject);

service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x);
service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x);
service.GetDeviceStream(id3).Subscribe(x => deviceView3 = x);

/// I believe there is no need to pause here because the Subscribe method calls above 
/// block until the events have all been pushed into the subscribers above.

Assert.AreEqual(deviceView1.View.DeviceId, id1);
Assert.AreEqual(deviceView2.View.DeviceId, id2);
Assert.AreEqual(deviceView1.View.Voltage, 2);
Assert.AreEqual(deviceView2.View.Voltage, 100);
Assert.IsNull(deviceView3);

subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 });
Assert.AreEqual(deviceView2.View.Voltage, 101);

subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id3, Voltage = 101 });
Assert.AreEqual(deviceView3.View.DeviceId, id3);
Assert.AreEqual(deviceView3.View.Voltage, 101);

This works, and it runs without any async code.

Also, as a general tip, I recommend unit testing Rx code with the Microsoft.Reactive.Testing package rather than with timed-interval tests.

Many thanks to @Shlomo for the answer above.

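The implementation given in the accepted answer, while a magical education for me, had a couple of issues that then needed solving in turn. The first was a thread-race problem, and the second was a performance problem when there are a large number of devices in the system. I ended up solving the thread race and significantly improving performance with this modified implementation: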

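In the constructor, the grouped and scanned device streams are subscribed directly to a BehaviorSubject, which provides the behaviour we needed from Replay(1).RefCount(): new subscribers are immediately notified of the latest value in the stream. Unlike Replay(1), a BehaviorSubject always holds a value, so subscribing to a device that has not produced any update yet delivers its initial view immediately.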

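In the GetDeviceStream method, we still use a dictionary lookup to find the device stream, creating a preloaded BehaviorSubject if it is not in the dictionary yet. We have removed the Where search from the previous implementation above. That Where search caused a thread-race problem, which had been solved by making the grouped stream replayable, but that in turn caused a quadratic performance problem, because each lookup replayed every group seen so far. Replacing Where with FirstOrDefault halved the time, and removing the search entirely in favour of the GetCreate dictionary technique gives ideal performance: O(1) per lookup instead of O(n), so O(n) rather than O(n²) across n devices.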

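GetCreateSubject uses Lazy proxy objects as the dictionary values, because ConcurrentDictionary.GetOrAdd can invoke the Create factory more than once for a single key when several threads race. Only one of the resulting Lazy instances is stored and returned to every caller, so the Value property is only evaluated on that one, and exactly one BehaviorSubject is created per device.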

class DeviceService : IDeviceService, IDisposable {

    readonly CompositeDisposable _disposable = new CompositeDisposable();
    readonly ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>> _streams = new ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>>();
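    BehaviorSubject<DeviceUpdateEvent> GetCreateSubject(Guid deviceId) {
        // GetOrAdd may call Create more than once for the same key under contention,
        // but only one Lazy is stored, so only one BehaviorSubject is built per device.
        return _streams.GetOrAdd(deviceId, Create).Value;
        Lazy<BehaviorSubject<DeviceUpdateEvent>> Create(Guid id) {
            return new Lazy<BehaviorSubject<DeviceUpdateEvent>>(() => {
                // DeviceUpdateEvent.GetInitialView is not shown in this post.
                var subject = new BehaviorSubject<DeviceUpdateEvent>(DeviceUpdateEvent.GetInitialView(id));
                _disposable.Add(subject);
                return subject;
            });
        }
    }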

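    public DeviceService(IConnectableObservable<IDeviceView> source) {
        // Wire up the per-device pipelines first, then Connect, so no event is missed.
        _disposable.Add(source
            .GroupBy(x => x.DeviceId)
            .Subscribe(deviceStream => {
                // Fold this device's updates and push every aggregate into its BehaviorSubject.
                // DeviceUtils.Update (not shown) plays the role of the question's Update accumulator.
                _disposable.Add(deviceStream
                    .Scan(DeviceUpdateEvent.GetInitialView(deviceStream.Key), DeviceUtils.Update)
                    .Subscribe(GetCreateSubject(deviceStream.Key)));
            }));
        _disposable.Add(source.Connect());
    }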

    public void Dispose() {
        _disposable.Dispose();
    }

    public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
        return GetCreateSubject(deviceId).AsObservable();
    }
}

[TestMethod]
public async Task Test2() {
    var input = new AsyncProducerConsumerQueue<IDeviceView>();
    var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input);
    var service = new DeviceService(source);

    var ids = Enumerable.Range(0, 100000).Select(i => Guid.NewGuid()).ToArray();
    var idsRemaining = ids.ToHashSet();
    // HashSet<T> is not thread-safe, and the callbacks below may run on more than one thread.
    var gate = new object();
    int Remaining() { lock (gate) return idsRemaining.Count; }
    var t1 = Task.Run(async () => {
        foreach (var id in ids) {
            await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id, Voltage = 1 });
        }
    });
    var t2 = Task.Run(() => {
        foreach (var id in ids) {
            service.GetDeviceStream(id).Subscribe(x => { lock (gate) idsRemaining.Remove(x.View.DeviceId); });
        }
    });
    await Task.WhenAll(t1, t2);
    var sw = Stopwatch.StartNew();
    while (Remaining() > 0) {
        if (sw.Elapsed.TotalSeconds > 600) throw new Exception("Failed");
        await Task.Delay(100);
    }
}
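The GetOrAdd plus Lazy<T> trick used in GetCreateSubject can be checked in isolation with nothing but the BCL. In this minimal sketch, a hypothetical LazyGetOrAddDemo class hammers one key from many parallel calls, and a plain object stands in for the BehaviorSubject; a counter records how many times the wrapped constructor actually runs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; object stands in for BehaviorSubject<DeviceUpdateEvent>.
static class LazyGetOrAddDemo {
    // Number of times the "expensive" value was really constructed.
    public static int Constructions;

    static readonly ConcurrentDictionary<string, Lazy<object>> Cache =
        new ConcurrentDictionary<string, Lazy<object>>();

    // GetOrAdd may run the factory lambda more than once under contention, but only one
    // Lazy<object> is stored and returned to every caller. Lazy<T>'s default mode
    // (ExecutionAndPublication) then runs the inner constructor at most once.
    public static object GetOrCreate(string key) =>
        Cache.GetOrAdd(key, k => new Lazy<object>(() => {
            Interlocked.Increment(ref Constructions);
            return new object();
        })).Value;

    // Calls GetOrCreate for one key from parallel workers and counts distinct instances.
    public static int DistinctInstancesFromParallelCalls(string key, int calls) {
        var results = new object[calls];
        Parallel.For(0, calls, i => results[i] = GetOrCreate(key));
        return results.Distinct().Count();
    }
}
```

Every caller ends up with the same instance and the constructor runs once. If the factory built the subject directly instead of wrapping it in Lazy, a losing thread's subject could be constructed and then discarded, and in the code above it would already have been added to _disposable.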

See the complete source code and test code for this question here: https://github.com/bboyle1234/ReactiveTest