我如何修改 IObservable<char> 以便我收集字符直到一段时间内没有字符?
How can I modify an IObservable<char> such that I collect characters until there have been no characters for a period of time?
我想编写一个接受 IObvservable<char>
并生成 IObservable<string>
的 Rx 查询。应缓冲字符串,直到在指定时间内没有生成任何字符。
数据源是一个串行端口,我从中捕获了 DataReceived
事件,并从中生成了一个 IObservable<char>
。我处理的协议基本上是基于字符的,但它的实现不是很一致,所以我需要以各种不同的方式观察字符流。在某些情况下,有一个响应结束终止符(但不是换行符),在一种情况下,我得到一个长度未知的字符串,我知道它已经全部到达的唯一方法是在几百毫秒内没有其他任何东西到达.这就是我要解决的问题。
我发现了
var result = source.Buffer(TimeSpan.FromMilliseconds(200))
.Select(s=>new string(s.ToArray()));
Buffer(TimeSpan)
几乎是我所需要的,但不完全是。我需要在每次新字符到达时重置计时器,以便仅在自上一个字符以来经过足够的时间后才生成缓冲区。
拜托,任何人都可以提供有关如何实现此目标的建议吗?
[更新]
当我在等待答案时,我想出了一个我自己的解决方案,它基本上重新发明了 Throttle:
public virtual IObservable<string> BufferUntilQuiescentFor(IObservable<char> source, TimeSpan quietTime)
{
var shared = source.Publish().RefCount();
var timer = new Timer(quietTime.TotalMilliseconds);
var bufferCloser = new Subject<Unit>();
// Hook up the timer's Elapsed event so that it notifies the bufferCloser sequence
timer.Elapsed += (sender, args) =>
{
timer.Stop();
bufferCloser.OnNext(Unit.Default); // close the buffer
};
// Whenever the shared source sequence produces a value, reset the timer, which will prevent the buffer from closing.
shared.Subscribe(value =>
{
timer.Stop();
timer.Start();
});
// Finally, return the buffered sequence projected into IObservable<string>
var sequence = shared.Buffer(() => bufferCloser).Select(s=>new string(s.ToArray()));
return sequence;
}
我没有正确理解 Throttle
,我认为它的行为与实际不同 - 现在我已经用 'marble diagram' 向我解释了它并且我理解正确,我相信它实际上是一个比我想出的更优雅的解决方案(我也没有测试我的代码)。不过这是一个有趣的练习 ;-)
这是否满足您的需求?
var result =
source
.Publish(hot =>
hot.Buffer(() =>
hot.Throttle(TimeSpan.FromMilliseconds(200))))
.Select(s => new string(s.ToArray()));
这一切都归功于 Enigmativity - 我只是在这里重复它以配合我添加的解释。
var dueTime = TimeSpan.FromMilliseconds(200);
var result = source
.Publish(o => o.Buffer(() => o.Throttle(dueTime)))
.Select(cs => new string(cs.ToArray()));
其工作原理如下图所示(其中dueTime
对应时间的三划线):
source: -----h--el--l--o----wo-r--l-d---|
throttled: ------------------o------------d|
buffer[0]: -----h--el--l--o--|
buffer[1]: -wo-r--l-d--|
result: ------------------"hello"------"world"
使用 Publish
只是为了确保 Buffer
和 Throttle
共享对基础 source
的单一订阅。来自 Throttle
的文档:
Ignores the values from an observable sequence which are followed by another value before due time...
正在使用的 Buffer
的重载需要一个 "buffer closings." 序列,每次序列发出一个值时,当前缓冲区结束并开始下一个缓冲区。
我想编写一个接受 IObvservable<char>
并生成 IObservable<string>
的 Rx 查询。应缓冲字符串,直到在指定时间内没有生成任何字符。
数据源是一个串行端口,我从中捕获了 DataReceived
事件,并从中生成了一个 IObservable<char>
。我处理的协议基本上是基于字符的,但它的实现不是很一致,所以我需要以各种不同的方式观察字符流。在某些情况下,有一个响应结束终止符(但不是换行符),在一种情况下,我得到一个长度未知的字符串,我知道它已经全部到达的唯一方法是在几百毫秒内没有其他任何东西到达.这就是我要解决的问题。
我发现了
var result = source.Buffer(TimeSpan.FromMilliseconds(200))
.Select(s=>new string(s.ToArray()));
Buffer(TimeSpan)
几乎是我所需要的,但不完全是。我需要在每次新字符到达时重置计时器,以便仅在自上一个字符以来经过足够的时间后才生成缓冲区。
拜托,任何人都可以提供有关如何实现此目标的建议吗?
[更新] 当我在等待答案时,我想出了一个我自己的解决方案,它基本上重新发明了 Throttle:
public virtual IObservable<string> BufferUntilQuiescentFor(IObservable<char> source, TimeSpan quietTime)
{
var shared = source.Publish().RefCount();
var timer = new Timer(quietTime.TotalMilliseconds);
var bufferCloser = new Subject<Unit>();
// Hook up the timer's Elapsed event so that it notifies the bufferCloser sequence
timer.Elapsed += (sender, args) =>
{
timer.Stop();
bufferCloser.OnNext(Unit.Default); // close the buffer
};
// Whenever the shared source sequence produces a value, reset the timer, which will prevent the buffer from closing.
shared.Subscribe(value =>
{
timer.Stop();
timer.Start();
});
// Finally, return the buffered sequence projected into IObservable<string>
var sequence = shared.Buffer(() => bufferCloser).Select(s=>new string(s.ToArray()));
return sequence;
}
我没有正确理解 Throttle
,我认为它的行为与实际不同 - 现在我已经用 'marble diagram' 向我解释了它并且我理解正确,我相信它实际上是一个比我想出的更优雅的解决方案(我也没有测试我的代码)。不过这是一个有趣的练习 ;-)
这是否满足您的需求?
var result =
source
.Publish(hot =>
hot.Buffer(() =>
hot.Throttle(TimeSpan.FromMilliseconds(200))))
.Select(s => new string(s.ToArray()));
这一切都归功于 Enigmativity - 我只是在这里重复它以配合我添加的解释。
var dueTime = TimeSpan.FromMilliseconds(200);
var result = source
.Publish(o => o.Buffer(() => o.Throttle(dueTime)))
.Select(cs => new string(cs.ToArray()));
其工作原理如下图所示(其中dueTime
对应时间的三划线):
source: -----h--el--l--o----wo-r--l-d---|
throttled: ------------------o------------d|
buffer[0]: -----h--el--l--o--|
buffer[1]: -wo-r--l-d--|
result: ------------------"hello"------"world"
使用 Publish
只是为了确保 Buffer
和 Throttle
共享对基础 source
的单一订阅。来自 Throttle
的文档:
Ignores the values from an observable sequence which are followed by another value before due time...
正在使用的 Buffer
的重载需要一个 "buffer closings." 序列,每次序列发出一个值时,当前缓冲区结束并开始下一个缓冲区。