使用 Reactive 测试同时观察去重值和重复值

Reactive Test to observe both distinct and duplicate values

我写了下面的测试来观察 Distinct 操作符的行为:

/// <summary>
/// Checks which notifications <c>Distinct()</c> forwards from a hot test sequence.
/// </summary>
public class Test: ReactiveTest {
    [Fact]
    public void Observe_distint_nonDistinc() {
        var testScheduler = new TestScheduler();

        // Hot sequence in which "a" is produced three times (ticks 100, 200 and 221).
        var letters = testScheduler.CreateHotObservable(
            OnNext(100, "a"),
            OnNext(110, "b"),
            OnNext(200, "a"),
            OnNext(220, "c"),
            OnNext(221, "a")
        );
        var distinctObserver = testScheduler.CreateObserver<string>();

        letters
            .Distinct()
            .Subscribe(distinctObserver);
        testScheduler.AdvanceBy(1000);

        // Only the first occurrence of each value is expected to be recorded.
        distinctObserver.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(110, "b"),
            OnNext(220, "c"));
    }
}

测试可以通过,但是我不确定如何同时观察重复项。我尝试了 Publish 和 CombineLatest 的一些组合,但我觉得它们不值得一提。我的 duplicate 流应该只包含 2 个元素:OnNext(200,"a") 和 OnNext(221,"a")。

这是一个完整的解决方案:

/// <summary>
/// Observes the distinct values of a published hot sequence and, from the same
/// connection, the values that occur more than once.
/// </summary>
[Fact]
public void ObserveDistinctNonDistinct()
{
    var testScheduler = new TestScheduler();

    // Publish() lets both queries below share one subscription to the hot source;
    // nothing flows until Connect() is called.
    var published = testScheduler
        .CreateHotObservable(
            OnNext(100, "a"),
            OnNext(110, "b"),
            OnNext(200, "a"),
            OnNext(220, "c"),
            OnNext(221, "a"))
        .Publish();

    var distinctObserver = testScheduler.CreateObserver<string>();
    var duplicateObserver = testScheduler.CreateObserver<string>();

    published
        .Distinct()
        .Subscribe(distinctObserver);

    // Group equal values, keep a running count per group, and project the group key
    // whenever that count exceeds one. The window boundary sequence never signals.
    var repeatedKeys =
        from value in published
        group value by value
        into occurrences
        from seenSoFar in occurrences
            .Window(Observable.Never<string>())
            .SelectMany(window => window.Scan(0, (total, _) => total + 1))
        where seenSoFar > 1
        select occurrences.Key;

    // The trailing Distinct() reports each repeated key only once.
    repeatedKeys
        .Distinct()
        .Subscribe(duplicateObserver);

    published.Connect();
    testScheduler.AdvanceBy(1000);

    distinctObserver.Messages.AssertEqual(
        OnNext(100, "a"),
        OnNext(110, "b"),
        OnNext(220, "c"));
    duplicateObserver.Messages.AssertEqual(OnNext(200, "a"));
}

它会在每个重复值第二次出现时发出该值;由于末尾的 Distinct(),同一个值之后的重复不会再次发出(因此上面的断言只期望 OnNext(200, "a"))。

使用方法语法:

// Method-syntax form of the duplicate query: group equal values, count the
// occurrences inside each group, and emit the group key once the count exceeds one.
source
    .GroupBy(value => value)
    .SelectMany(occurrences =>
        occurrences
            .Window(Observable.Never<string>())
            .SelectMany(window => window.Scan(0, (total, _) => total + 1))
            .Where(seenSoFar => seenSoFar > 1)
            .Select(_ => occurrences.Key))
    // Report each repeated key only once.
    .Distinct()
    .Subscribe(nonDistinctResults);

在找到更快的实现方式之前,我先贴出我自定义的 CollectDuplicates 扩展方法。测试现在按预期通过了。

/// <summary>
/// Observes distinct values and duplicate values of the same published hot sequence,
/// using <c>Distinct()</c> and the <c>CollectDuplicates()</c> extension respectively.
/// </summary>
public class Test : ReactiveTest {
    [Fact]
    public void Observe_distint_nonDistinc() {
        var testScheduler = new TestScheduler();

        // Publish() shares a single subscription to the hot source between both observers.
        var published = testScheduler
            .CreateHotObservable(
                OnNext(100, "a"),
                OnNext(110, "b"),
                OnNext(200, "a"),
                OnNext(220, "c"),
                OnNext(221, "a"))
            .Publish();

        var distinctObserver = testScheduler.CreateObserver<string>();
        var duplicateObserver = testScheduler.CreateObserver<string>();

        published
            .Distinct()
            .Subscribe(distinctObserver);
        published
            .CollectDuplicates()
            .Subscribe(duplicateObserver);

        // Values only start flowing once the published sequence is connected.
        published.Connect();
        testScheduler.AdvanceBy(1000);

        distinctObserver.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(110, "b"),
            OnNext(220, "c"));

        // Every repeated occurrence is expected, not just the first repeat.
        duplicateObserver.Messages.AssertEqual(
            OnNext(200, "a"),
            OnNext(221, "a"));
    }
}

/// <summary>
/// Extension methods for observable sequences.
/// </summary>
public static class RxEx{
    /// <summary>
    /// Returns a sequence containing every element of <paramref name="source"/> that
    /// has already been seen earlier in the same subscription, compared with the
    /// default equality comparer. The first occurrence of a value is not emitted;
    /// each later occurrence is emitted at the moment it arrives.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to scan for repeated elements.</param>
    /// <returns>
    /// A sequence of the repeated occurrences. Errors and completion of
    /// <paramref name="source"/> are forwarded to the subscriber.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<TSource> CollectDuplicates<TSource>(this IObservable<TSource> source) {
        if (source == null) {
            throw new ArgumentNullException(nameof(source));
        }

        // Everything happens inside Create so that the source is subscribed only when
        // the result is subscribed, and each subscriber gets its own "seen" set and
        // its own disposable. Disposing one subscription does not affect the others.
        return Observable.Create<TSource>(observer => {
            var seen = new HashSet<TSource>();

            return source.Subscribe(
                value => {
                    // Add returns false when the value is already in the set,
                    // i.e. this occurrence is a duplicate. Null elements are handled.
                    if (!seen.Add(value)) {
                        observer.OnNext(value);
                    }
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }
}