为什么我的 Rx.NET observable 似乎产生了整个序列两次?
Why does my Rx.NET observable appear to produce its entire sequence twice?
我有一个随机失败的单元测试,我无法解释。这涉及使用 Rx.NET 的可观察序列和我为转换序列所做的扩展方法。首先,让我展示一下测试是如何失败的:
Machine.Specifications.SpecificationException: Expected: System.Collections.Generic.List`1[System.Int32]:
{
[8],
[10],
[11]
}
But was: System.Collections.Generic.List`1[System.Int32]:
{
[8],
[10],
[11],
[8],
[10],
[11]
}
好的,所以你看,我得到了整个序列两次而不是一次。这是测试:
[Subject(typeof(ObservableExtensions), "Shutter Current Readings")]
internal class when_a_shutter_current_reading_is_received
{
Establish context = () => source = "Z8\nZ10\nZ11\n".ToObservable();
Because of = () => source
.ShutterCurrentReadings().Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
static List<int> elementHistory = new List<int>();
static List<int> expectedElements = new List<int> {8, 10, 11};
static IObservable<char> source;
}
SubscribeAndWaitForCompletion()
是一个扩展方法,定义如下:
public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
{
var sequenceComplete = new ManualResetEvent(false);
var subscription = sequence.Subscribe(
onNext: observer,
onCompleted: () => sequenceComplete.Set()
);
sequenceComplete.WaitOne();
subscription.Dispose();
sequenceComplete.Dispose();
}
你会注意到那里有一个 .Trace()
调用,另一个在扩展方法中,这会通过 NLog 生成关于可观察序列的日志记录,这是跟踪输出:
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Subscribe()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Subscribe()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(8)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(8)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(10)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(10)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(11)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(11)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnCompleted()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnCompleted()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Dispose()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Dispose()
Child test failed
这正是我所期望的。我从我的扩展方法内部获得一个跟踪输出,然后在扩展方法外部的转换序列上获得另一个跟踪输出。正如预期的那样,序列中的每个元素恰好流经系统一次。然而,我在测试中 两次 捕获了整个序列。
我最好提供扩展方法,这样我们就可以看到它的作用。这是:
public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
{
const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
var shutterCurrentRegex =
new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
var shutterCurrentValues = from buffer in buffers
let message = new string(buffer.ToArray())
let patternMatch = shutterCurrentRegex.Match(message)
where patternMatch.Success
let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
select shutterCurrent;
return shutterCurrentValues.Trace("ShutterCurrent");
}
所以这里的目的是从数据流中挑选出电流传感器的读数。读数的格式为 Znn(文字 'Z' 后跟一个或两个十进制数字,然后是换行符。扩展方法将原始输入字符序列转换为表示当前读数的整数序列。过滤器使用 Rx Buffer
运算符缓冲它认为可能是有效传感器读数的字符。当看到 'Z' 字符时打开缓冲区,当看到非数字字符时关闭缓冲区。这是通过匹配和双重检查在正则表达式中解析,然后如果结果通过所有,则将其转换为整数并在输出序列中发出。
谁能看出为什么我的结果中可能会出现双重数据?
更新:与调查相关的附加代码。
public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
Predicate<char> bufferOpening, Predicate<char> bufferClosing)
{
return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
}
Trace
扩展方法可在 NuGet 包 TA.ASCOM.ReactiveCommunications
(我的一个)中找到,但这是来源:
public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
{
var log = LogManager.GetLogger(name);
var id = 0;
return Observable.Create<TSource>(observer =>
{
var idClosure = ++id;
Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
trace("Subscribe", "");
var disposable = source.Subscribe(
v =>
{
trace("OnNext", v);
observer.OnNext(v);
},
e =>
{
trace("OnError", "");
observer.OnError(e);
},
() =>
{
trace("OnCompleted", "");
observer.OnCompleted();
});
return () =>
{
trace("Dispose", "");
disposable.Dispose();
};
});
}
我怀疑我可能从其他人那里复制了这段代码,但我似乎没有记下是谁。
编辑:
这是一种在 LinqPad 中模拟问题的方法,无需使用 MSpec/NChrunch (?) 运行ner:
void Main()
{
//static initializers
List<int> expectedElements = new List<int> { 8, 10, 11 };
List<int> elementHistory = new List<int>();
IObservable<char> source;
//simulated continuous running of MSpec test
for (int i = 0; i < 20; i++)
{
//establish
source = "Z8\nZ10\nZ11\n".ToObservable();
//because
source
.ShutterCurrentReadings()
.Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
//it
elementHistory.Dump(i.ToString()); //Linqpad
if(elementHistory.Count > 3)
throw new Exception("Assert.ShouldNotHappen");
}
}
public static class Extensions
{
public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
{
const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
var shutterCurrentRegex =
new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
var shutterCurrentValues = from buffer in buffers
let message = new string(buffer.ToArray())
let patternMatch = shutterCurrentRegex.Match(message)
where patternMatch.Success
let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
select shutterCurrent;
return shutterCurrentValues.Trace("ShutterCurrent");
}
public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
{
var sequenceComplete = new ManualResetEvent(false);
var subscription = sequence.Subscribe(
onNext: observer,
onCompleted: () => sequenceComplete.Set()
);
sequenceComplete.WaitOne();
subscription.Dispose();
sequenceComplete.Dispose();
}
public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
{
var log = LogManager.GetLogger(name);
var id = 0;
return Observable.Create<TSource>(observer =>
{
var idClosure = ++id;
Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
trace("Subscribe", "");
var disposable = source.Subscribe(
v =>
{
trace("OnNext", v);
observer.OnNext(v);
},
e =>
{
trace("OnError", "");
observer.OnError(e);
},
() =>
{
trace("OnCompleted", "");
observer.OnCompleted();
});
return () =>
{
trace("Dispose", "");
disposable.Dispose();
};
});
}
public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
Predicate<char> bufferOpening, Predicate<char> bufferClosing)
{
return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
}
}
这失败了,就像你的场景一样。
我最好的修复建议是将 elementHistory
的初始化移动到 Establish
步骤。您还可以将 source
变量从建立中移开,这样您的测试将如下所示:
internal class when_a_shutter_current_reading_is_received
{
Establish context = () => elementHistory = new List<int>();
Because of = () => "Z8\nZ10\nZ11\n".ToObservable()
.ShutterCurrentReadings()
.Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
static List<int> elementHistory;
static List<int> expectedElements = new List<int> { 8, 10, 11 };
}
您可能还想查看 Microsoft.Reactive.Testing
,它对 Rx 查询提供了一些更强大的测试,尽管它不像您的测试那样简单。
旧答案:
由于缺少 Trace
、ShouldEqual
和 BufferByPredicates
函数,我无法编译您的代码。如果它们来自外部来源,请记录来源。
我猜问题出在 BufferByPredicates
实现、Trace
实现、Publish
之后缺少 Connect
或静态 elementHistory
。
我最好的猜测是静态 elementHistory
:如果该测试同时 运行 两次,那么你有一个竞争条件,你可能会得到双重结果 (Establish
运行s 两次,然后 Because
运行s 两次,然后 It
将失败)。
我有一个随机失败的单元测试,我无法解释。这涉及使用 Rx.NET 的可观察序列和我为转换序列所做的扩展方法。首先,让我展示一下测试是如何失败的:
Machine.Specifications.SpecificationException: Expected: System.Collections.Generic.List`1[System.Int32]: { [8], [10], [11] } But was: System.Collections.Generic.List`1[System.Int32]: { [8], [10], [11], [8], [10], [11] }
好的,所以你看,我得到了整个序列两次而不是一次。这是测试:
[Subject(typeof(ObservableExtensions), "Shutter Current Readings")]
internal class when_a_shutter_current_reading_is_received
{
Establish context = () => source = "Z8\nZ10\nZ11\n".ToObservable();
Because of = () => source
.ShutterCurrentReadings().Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
static List<int> elementHistory = new List<int>();
static List<int> expectedElements = new List<int> {8, 10, 11};
static IObservable<char> source;
}
SubscribeAndWaitForCompletion()
是一个扩展方法,定义如下:
public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
{
var sequenceComplete = new ManualResetEvent(false);
var subscription = sequence.Subscribe(
onNext: observer,
onCompleted: () => sequenceComplete.Set()
);
sequenceComplete.WaitOne();
subscription.Dispose();
sequenceComplete.Dispose();
}
你会注意到那里有一个 .Trace()
调用,另一个在扩展方法中,这会通过 NLog 生成关于可观察序列的日志记录,这是跟踪输出:
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Subscribe() 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Subscribe() 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(8) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(8) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(10) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(10) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(11) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(11) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnCompleted() 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnCompleted() 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Dispose() 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Dispose() Child test failed
这正是我所期望的。我从我的扩展方法内部获得一个跟踪输出,然后在扩展方法外部的转换序列上获得另一个跟踪输出。正如预期的那样,序列中的每个元素恰好流经系统一次。然而,我在测试中 两次 捕获了整个序列。
我最好提供扩展方法,这样我们就可以看到它的作用。这是:
public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
{
const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
var shutterCurrentRegex =
new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
var shutterCurrentValues = from buffer in buffers
let message = new string(buffer.ToArray())
let patternMatch = shutterCurrentRegex.Match(message)
where patternMatch.Success
let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
select shutterCurrent;
return shutterCurrentValues.Trace("ShutterCurrent");
}
所以这里的目的是从数据流中挑选出电流传感器的读数。读数的格式为 Znn(文字 'Z' 后跟一个或两个十进制数字,然后是换行符。扩展方法将原始输入字符序列转换为表示当前读数的整数序列。过滤器使用 Rx Buffer
运算符缓冲它认为可能是有效传感器读数的字符。当看到 'Z' 字符时打开缓冲区,当看到非数字字符时关闭缓冲区。这是通过匹配和双重检查在正则表达式中解析,然后如果结果通过所有,则将其转换为整数并在输出序列中发出。
谁能看出为什么我的结果中可能会出现双重数据?
更新:与调查相关的附加代码。
public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
Predicate<char> bufferOpening, Predicate<char> bufferClosing)
{
return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
}
Trace
扩展方法可在 NuGet 包 TA.ASCOM.ReactiveCommunications
(我的一个)中找到,但这是来源:
public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
{
var log = LogManager.GetLogger(name);
var id = 0;
return Observable.Create<TSource>(observer =>
{
var idClosure = ++id;
Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
trace("Subscribe", "");
var disposable = source.Subscribe(
v =>
{
trace("OnNext", v);
observer.OnNext(v);
},
e =>
{
trace("OnError", "");
observer.OnError(e);
},
() =>
{
trace("OnCompleted", "");
observer.OnCompleted();
});
return () =>
{
trace("Dispose", "");
disposable.Dispose();
};
});
}
我怀疑我可能从其他人那里复制了这段代码,但我似乎没有记下是谁。
编辑:
这是一种在 LinqPad 中模拟问题的方法,无需使用 MSpec/NChrunch (?) 运行ner:
void Main()
{
//static initializers
List<int> expectedElements = new List<int> { 8, 10, 11 };
List<int> elementHistory = new List<int>();
IObservable<char> source;
//simulated continuous running of MSpec test
for (int i = 0; i < 20; i++)
{
//establish
source = "Z8\nZ10\nZ11\n".ToObservable();
//because
source
.ShutterCurrentReadings()
.Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
//it
elementHistory.Dump(i.ToString()); //Linqpad
if(elementHistory.Count > 3)
throw new Exception("Assert.ShouldNotHappen");
}
}
public static class Extensions
{
public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
{
const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
var shutterCurrentRegex =
new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
var shutterCurrentValues = from buffer in buffers
let message = new string(buffer.ToArray())
let patternMatch = shutterCurrentRegex.Match(message)
where patternMatch.Success
let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
select shutterCurrent;
return shutterCurrentValues.Trace("ShutterCurrent");
}
public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
{
var sequenceComplete = new ManualResetEvent(false);
var subscription = sequence.Subscribe(
onNext: observer,
onCompleted: () => sequenceComplete.Set()
);
sequenceComplete.WaitOne();
subscription.Dispose();
sequenceComplete.Dispose();
}
public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
{
var log = LogManager.GetLogger(name);
var id = 0;
return Observable.Create<TSource>(observer =>
{
var idClosure = ++id;
Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
trace("Subscribe", "");
var disposable = source.Subscribe(
v =>
{
trace("OnNext", v);
observer.OnNext(v);
},
e =>
{
trace("OnError", "");
observer.OnError(e);
},
() =>
{
trace("OnCompleted", "");
observer.OnCompleted();
});
return () =>
{
trace("Dispose", "");
disposable.Dispose();
};
});
}
public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
Predicate<char> bufferOpening, Predicate<char> bufferClosing)
{
return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
}
}
这失败了,就像你的场景一样。
我最好的修复建议是将 elementHistory
的初始化移动到 Establish
步骤。您还可以将 source
变量从建立中移开,这样您的测试将如下所示:
internal class when_a_shutter_current_reading_is_received
{
Establish context = () => elementHistory = new List<int>();
Because of = () => "Z8\nZ10\nZ11\n".ToObservable()
.ShutterCurrentReadings()
.Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
static List<int> elementHistory;
static List<int> expectedElements = new List<int> { 8, 10, 11 };
}
您可能还想查看 Microsoft.Reactive.Testing
,它对 Rx 查询提供了一些更强大的测试,尽管它不像您的测试那样简单。
旧答案:
由于缺少 Trace
、ShouldEqual
和 BufferByPredicates
函数,我无法编译您的代码。如果它们来自外部来源,请记录来源。
我猜问题出在 BufferByPredicates
实现、Trace
实现、Publish
之后缺少 Connect
或静态 elementHistory
。
我最好的猜测是静态 elementHistory
:如果该测试同时 运行 两次,那么你有一个竞争条件,你可能会得到双重结果 (Establish
运行s 两次,然后 Because
运行s 两次,然后 It
将失败)。