为什么我的 Rx.NET observable 似乎产生了整个序列两次?

Why does my Rx.NET observable appear to produce its entire sequence twice?

我有一个随机失败的单元测试,我无法解释。这涉及使用 Rx.NET 的可观察序列和我为转换序列所做的扩展方法。首先,让我展示一下测试是如何失败的:

Machine.Specifications.SpecificationException:   Expected: System.Collections.Generic.List`1[System.Int32]:
{
  [8],
  [10],
  [11]
}

  But was:  System.Collections.Generic.List`1[System.Int32]:
{
  [8],
  [10],
  [11],
  [8],
  [10],
  [11]
}

好的,所以你看,我得到了整个序列两次而不是一次。这是测试:

[Subject(typeof(ObservableExtensions), "Shutter Current Readings")]
internal class when_a_shutter_current_reading_is_received
    {
    Establish context = () => source = "Z8\nZ10\nZ11\n".ToObservable();
    Because of = () => source
        .ShutterCurrentReadings().Trace("Unbelievable")
        .SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
    It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
    static List<int> elementHistory = new List<int>();
    static List<int> expectedElements = new List<int> {8, 10, 11};
    static IObservable<char> source;

    }

SubscribeAndWaitForCompletion()是一个扩展方法,定义如下:

public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
    {
    var sequenceComplete = new ManualResetEvent(false);
    var subscription = sequence.Subscribe(
        onNext: observer,
        onCompleted: () => sequenceComplete.Set()
        );
    sequenceComplete.WaitOne();
    subscription.Dispose();
    sequenceComplete.Dispose();
    }

你会注意到那里有一个 .Trace() 调用,另一个在扩展方法中,这会通过 NLog 生成关于可观察序列的日志记录,这是跟踪输出:

20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Subscribe()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Subscribe()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(8)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(8)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(10)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(10)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(11)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(11)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnCompleted()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnCompleted()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Dispose()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Dispose()
Child test failed

这正是我所期望的。我从我的扩展方法内部获得一个跟踪输出,然后在扩展方法外部的转换序列上获得另一个跟踪输出。正如预期的那样,序列中的每个元素恰好流经系统一次。然而,我在测试中 两次 捕获了整个序列。

我最好提供扩展方法,这样我们就可以看到它的作用。这是:

    public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
        {
        const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
        var shutterCurrentRegex =
            new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
        var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
        var shutterCurrentValues = from buffer in buffers
                                   let message = new string(buffer.ToArray())
                                   let patternMatch = shutterCurrentRegex.Match(message)
                                   where patternMatch.Success
                                   let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
                                   select shutterCurrent;
        return shutterCurrentValues.Trace("ShutterCurrent");
        }

所以这里的目的是从数据流中挑选出电流传感器的读数。读数的格式为 Znn(文字 'Z' 后跟一个或两个十进制数字,然后是换行符。扩展方法将原始输入字符序列转换为表示当前读数的整数序列。过滤器使用 Rx Buffer 运算符缓冲它认为可能是有效传感器读数的字符。当看到 'Z' 字符时打开缓冲区,当看到非数字字符时关闭缓冲区。这是通过匹配和双重检查在正则表达式中解析,然后如果结果通过所有,则将其转换为整数并在输出序列中发出。

谁能看出为什么我的结果中可能会出现双重数据?

更新:与调查相关的附加代码。

    public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
        Predicate<char> bufferOpening, Predicate<char> bufferClosing)
        {
        return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
        }

Trace 扩展方法可在 NuGet 包 TA.ASCOM.ReactiveCommunications(我的一个)中找到,但这是来源:

    public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
        {
        var log = LogManager.GetLogger(name);
        var id = 0;
        return Observable.Create<TSource>(observer =>
            {
            var idClosure = ++id;
            Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
            trace("Subscribe", "");
            var disposable = source.Subscribe(
                v =>
                    {
                    trace("OnNext", v);
                    observer.OnNext(v);
                    },
                e =>
                    {
                    trace("OnError", "");
                    observer.OnError(e);
                    },
                () =>
                    {
                    trace("OnCompleted", "");
                    observer.OnCompleted();
                    });
            return () =>
                {
                trace("Dispose", "");
                disposable.Dispose();
                };
            });
        }

我怀疑我可能从其他人那里复制了这段代码,但我似乎没有记下是谁。

编辑:

这是一种在 LinqPad 中模拟问题的方法,无需使用 MSpec/NChrunch (?) 运行ner:

void Main()
{
    //static initializers
    List<int> expectedElements = new List<int> { 8, 10, 11 };
    List<int> elementHistory = new List<int>();
    IObservable<char> source;

    //simulated continuous running of MSpec test
    for (int i = 0; i < 20; i++)
    {

        //establish
        source = "Z8\nZ10\nZ11\n".ToObservable();

        //because
        source
            .ShutterCurrentReadings()
            .Trace("Unbelievable")
            .SubscribeAndWaitForCompletion(item => elementHistory.Add(item));

        //it
        elementHistory.Dump(i.ToString()); //Linqpad
        if(elementHistory.Count > 3)
            throw new Exception("Assert.ShouldNotHappen");
    }
}

public static class Extensions
{
    public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
    {
        const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
        var shutterCurrentRegex =
            new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
        var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
        var shutterCurrentValues = from buffer in buffers
                                   let message = new string(buffer.ToArray())
                                   let patternMatch = shutterCurrentRegex.Match(message)
                                   where patternMatch.Success
                                   let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
                                   select shutterCurrent;
        return shutterCurrentValues.Trace("ShutterCurrent");
    }

    public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
    {
        var sequenceComplete = new ManualResetEvent(false);
        var subscription = sequence.Subscribe(
            onNext: observer,
            onCompleted: () => sequenceComplete.Set()
            );
        sequenceComplete.WaitOne();
        subscription.Dispose();
        sequenceComplete.Dispose();
    }

    public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
    {
        var log = LogManager.GetLogger(name);
        var id = 0;
        return Observable.Create<TSource>(observer =>
            {
                var idClosure = ++id;
                Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
                trace("Subscribe", "");
                var disposable = source.Subscribe(
                    v =>
                        {
                            trace("OnNext", v);
                            observer.OnNext(v);
                        },
                    e =>
                        {
                            trace("OnError", "");
                            observer.OnError(e);
                        },
                    () =>
                        {
                            trace("OnCompleted", "");
                            observer.OnCompleted();
                        });
                return () =>
                    {
                        trace("Dispose", "");
                        disposable.Dispose();
                    };
            });
    }

    public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
        Predicate<char> bufferOpening, Predicate<char> bufferClosing)
    {
        return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
    }
}

这失败了,就像你的场景一样。

我最好的修复建议是将 elementHistory 的初始化移动到 Establish 步骤。您还可以将 source 变量从建立中移开,这样您的测试将如下所示:

internal class when_a_shutter_current_reading_is_received
{
    Establish context = () => elementHistory = new List<int>();
    Because of = () => "Z8\nZ10\nZ11\n".ToObservable()
        .ShutterCurrentReadings()
        .Trace("Unbelievable")
        .SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
    It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
    static List<int> elementHistory;
    static List<int> expectedElements = new List<int> { 8, 10, 11 };

}

您可能还想查看 Microsoft.Reactive.Testing,它对 Rx 查询提供了一些更强大的测试,尽管它不像您的测试那样简单。


旧答案:

由于缺少 TraceShouldEqualBufferByPredicates 函数,我无法编译您的代码。如果它们来自外部来源,请记录来源。

我猜问题出在 BufferByPredicates 实现、Trace 实现、Publish 之后缺少 Connect 或静态 elementHistory

我最好的猜测是静态 elementHistory:如果该测试同时 运行 两次,那么你有一个竞争条件,你可能会得到双重结果 (Establish 运行s 两次,然后 Because 运行s 两次,然后 It 将失败)。