使用变量 Period 将 IEnumerable 转换为 IObservable
Convert IEnumerable to IObservable with variable Period
我正在使用 Rx 消耗 3 轴加速度计数据。我需要设置一些单元测试。数据帧进入速度很快,帧之间的中值时间跨度为 80 毫秒,但有时会达到 120 毫秒。此外,它永远不会正好是 80 毫秒,而是在该范围内徘徊。
所以我创建了一个 class 来订阅 IObservable 并将数据帧按顺序记录在 .csv 文件中。 .csv 文件中的一个字段是帧的开始时间,第一帧的开始时间 = 0.0。
现在我想读取该文件并再次流式传输它以进行测试和开发。我想使用 StartTime 字段作为我在测试时触发任何给定加速度计帧的时间表。
我看了这个问题的答案,Scheduling a IEnumerable periodically with .NET reactive extensions
但它似乎只能解决恒定的时间跨度。
问题:
在 Rx 框架中是否已经有一种规范且首选的方式来安排以不规则(但已知)的间隔推送帧,或者我应该以某种方式自己滚动?
编辑 2:
我对一些简单的东西感兴趣:
IObservable<T> AsObservable(
IEnumerable<T> source, Func<T, TimeSpan> getTimeDelta)
{
var retVal = ColdObservableVaryingTime();
foreach(var frame in source)
{
retVal.AddScheduled(getTimeDelta, frame);
}
return retVal;
}
编辑 1:我在这个问题中所说的 "frames",Rx 文档调用 TState。
为此,您可以使用 TestScheduler(获取 Rx-Testing Nuget 包)。使用此 class 作为测试中的 IScheduler 实现将允许您安排序列,因为测试调度程序允许您通过说明何时应推送每个项目来创建可观察的序列。然后,您可以 'play' 调度程序或提前到某个时间等,看看发生了什么。请注意,在此设置中,时间是虚拟化的,为了简单起见,TestScheduler 将 Ticks 作为时间进程的单位进行处理。
RX 网站介绍有一个很好的 RX 测试部分:http://www.introtorx.com/content/v1.0.10621.0/16_TestingRx.html
这是您的另一个选择。 Observable.Generate
方法非常强大,可用于生成非常复杂的值序列。
这是您可以做到的。
因此,从这样的 CSV 文件开始:
var csvLines = new []
{
"A,0",
"B,3",
"C,4",
"D,6",
};
(可以读成 var csvLines = File.ReadAllLines(@"...");
。)
然后您可以解析以下行:
var parsed =
(
from line in csvLines
let parts = line.Split(',')
select new
{
item = parts[0],
seconds = int.Parse(parts[1])
}
).ToArray();
然后确定 CSV 中每行之间的秒数 offset
:
var data =
parsed
.Zip(parsed.Skip(1), (p0, p1) => new
{
p1.item,
offset = p1.seconds - p0.seconds,
})
.ToArray();
现在您可以创建可观察对象了:
var observable =
Observable
.Generate(
0,
n => n < data.Length,
n => n + 1,
n => data[n].item,
n => TimeSpan.FromSeconds(data[n].offset))
.StartWith(parsed[0].item);
当我运行通过这段代码时,我从我的源文件中得到了正确的计时。
我已经根据您在下面评论中的 class 定义更新了我的代码:
IEnumerable<AccelerometerFrame_raw> frames = ...;
var data =
frames
.Zip(frames.Skip(1), (f0, f1) => new
{
f1,
offset = f1.TimeStampSeconds - f0.TimeStampSeconds,
})
.ToArray();
IObservable<AccelerometerFrame_raw> observable =
Observable
.Generate(
0,
n => n < data.Length,
n => n + 1,
n => data[n].f1,
n => TimeSpan.FromSeconds(data[n].offset))
.StartWith(frames.First());
James Lucas 提供的答案很好地指明了正确的方向,但成功的答案涉及更多。
用 csv 文件中的值填充我的 IEnumerable 后,我必须填充
的数组
Recorded<Notification<AccelerometerFrame_raw>>
然后我将该数组作为参数传递给调度程序 CreateColdAbservable。完成此操作后,冷的 Observable 就坐在那里等待启动。在我的特定情况下,代码如下所示:
private TestScheduler sched {get; set;}
public IObservable<AccelerometerFrame_raw> DataStream { get; protected set; }
ctor()
{
DataStream = SetupDeviceStream();
}
private IObservable<AccelerometerFrame_raw> SetupDeviceStream()
{
var framesArray =
new Recorded<Notification<AccelerometerFrame_raw>>[allFrames.Count+1];
int i = 0;
long timeStamp = 0;
foreach(var item in allFrames)
{
timeStamp += (long) (item.TimeStampSeconds * 1000.0);
framesArray[i] = new Recorded<Notification<AccelerometerFrame_raw>>(
timeStamp,
Notification.CreateOnNext(item));
i++;
}
framesArray[i] = new Recorded<Notification<AccelerometerFrame_raw>>(
timeStamp + 10,
Notification.CreateOnCompleted<AccelerometerFrame_raw>());
sched = new TestScheduler();
var stream =sched.CreateColdObservable(framesArray);
return stream;
}
//// 当我准备启动冷可观察对象时,我调用
sched.Start();
将所有这些放在一起的帮助来自
(菲尔哈克)http://haacked.com/archive/2014/03/10/master-time-with-reactive-extensions/
和
https://msdn.microsoft.com/en-us/library/hh229343(v=vs.103).aspx
还需要安装另一个Nuget包:
http://www.nuget.org/packages/reactiveui-testing/
正如 Phil Haak 在链接博客中所说 post、"Unfortunately, [TestScheduler] is a bit of a pain to use as-is which is why Paul Betts took it upon himself to write some useful TestScheduler extension methods"
我正在使用 Rx 消耗 3 轴加速度计数据。我需要设置一些单元测试。数据帧进入速度很快,帧之间的中值时间跨度为 80 毫秒,但有时会达到 120 毫秒。此外,它永远不会正好是 80 毫秒,而是在该范围内徘徊。
所以我创建了一个 class 来订阅 IObservable 并将数据帧按顺序记录在 .csv 文件中。 .csv 文件中的一个字段是帧的开始时间,第一帧的开始时间 = 0.0。
现在我想读取该文件并再次流式传输它以进行测试和开发。我想使用 StartTime 字段作为我在测试时触发任何给定加速度计帧的时间表。
我看了这个问题的答案,Scheduling a IEnumerable periodically with .NET reactive extensions 但它似乎只能解决恒定的时间跨度。
问题: 在 Rx 框架中是否已经有一种规范且首选的方式来安排以不规则(但已知)的间隔推送帧,或者我应该以某种方式自己滚动?
编辑 2: 我对一些简单的东西感兴趣:
IObservable<T> AsObservable(
IEnumerable<T> source, Func<T, TimeSpan> getTimeDelta)
{
var retVal = ColdObservableVaryingTime();
foreach(var frame in source)
{
retVal.AddScheduled(getTimeDelta, frame);
}
return retVal;
}
编辑 1:我在这个问题中所说的 "frames",Rx 文档调用 TState。
为此,您可以使用 TestScheduler(获取 Rx-Testing Nuget 包)。使用此 class 作为测试中的 IScheduler 实现将允许您安排序列,因为测试调度程序允许您通过说明何时应推送每个项目来创建可观察的序列。然后,您可以 'play' 调度程序或提前到某个时间等,看看发生了什么。请注意,在此设置中,时间是虚拟化的,为了简单起见,TestScheduler 将 Ticks 作为时间进程的单位进行处理。
RX 网站介绍有一个很好的 RX 测试部分:http://www.introtorx.com/content/v1.0.10621.0/16_TestingRx.html
这是您的另一个选择。 Observable.Generate
方法非常强大,可用于生成非常复杂的值序列。
这是您可以做到的。
因此,从这样的 CSV 文件开始:
var csvLines = new []
{
"A,0",
"B,3",
"C,4",
"D,6",
};
(可以读成 var csvLines = File.ReadAllLines(@"...");
。)
然后您可以解析以下行:
var parsed =
(
from line in csvLines
let parts = line.Split(',')
select new
{
item = parts[0],
seconds = int.Parse(parts[1])
}
).ToArray();
然后确定 CSV 中每行之间的秒数 offset
:
var data =
parsed
.Zip(parsed.Skip(1), (p0, p1) => new
{
p1.item,
offset = p1.seconds - p0.seconds,
})
.ToArray();
现在您可以创建可观察对象了:
var observable =
Observable
.Generate(
0,
n => n < data.Length,
n => n + 1,
n => data[n].item,
n => TimeSpan.FromSeconds(data[n].offset))
.StartWith(parsed[0].item);
当我运行通过这段代码时,我从我的源文件中得到了正确的计时。
我已经根据您在下面评论中的 class 定义更新了我的代码:
IEnumerable<AccelerometerFrame_raw> frames = ...;
var data =
frames
.Zip(frames.Skip(1), (f0, f1) => new
{
f1,
offset = f1.TimeStampSeconds - f0.TimeStampSeconds,
})
.ToArray();
IObservable<AccelerometerFrame_raw> observable =
Observable
.Generate(
0,
n => n < data.Length,
n => n + 1,
n => data[n].f1,
n => TimeSpan.FromSeconds(data[n].offset))
.StartWith(frames.First());
James Lucas 提供的答案很好地指明了正确的方向,但成功的答案涉及更多。
用 csv 文件中的值填充我的 IEnumerable 后,我必须填充
的数组Recorded<Notification<AccelerometerFrame_raw>>
然后我将该数组作为参数传递给调度程序 CreateColdAbservable。完成此操作后,冷的 Observable 就坐在那里等待启动。在我的特定情况下,代码如下所示:
private TestScheduler sched {get; set;}
public IObservable<AccelerometerFrame_raw> DataStream { get; protected set; }
ctor()
{
DataStream = SetupDeviceStream();
}
private IObservable<AccelerometerFrame_raw> SetupDeviceStream()
{
var framesArray =
new Recorded<Notification<AccelerometerFrame_raw>>[allFrames.Count+1];
int i = 0;
long timeStamp = 0;
foreach(var item in allFrames)
{
timeStamp += (long) (item.TimeStampSeconds * 1000.0);
framesArray[i] = new Recorded<Notification<AccelerometerFrame_raw>>(
timeStamp,
Notification.CreateOnNext(item));
i++;
}
framesArray[i] = new Recorded<Notification<AccelerometerFrame_raw>>(
timeStamp + 10,
Notification.CreateOnCompleted<AccelerometerFrame_raw>());
sched = new TestScheduler();
var stream =sched.CreateColdObservable(framesArray);
return stream;
}
//// 当我准备启动冷可观察对象时,我调用
sched.Start();
将所有这些放在一起的帮助来自
(菲尔哈克)http://haacked.com/archive/2014/03/10/master-time-with-reactive-extensions/
和 https://msdn.microsoft.com/en-us/library/hh229343(v=vs.103).aspx
还需要安装另一个Nuget包:
http://www.nuget.org/packages/reactiveui-testing/
正如 Phil Haak 在链接博客中所说 post、"Unfortunately, [TestScheduler] is a bit of a pain to use as-is which is why Paul Betts took it upon himself to write some useful TestScheduler extension methods"