如何在 RX 中有条件地缓冲
How to conditionally buffer in RX
我正在尝试将日志文件作为流加载,这样我就可以通过网络获得实时日志裁剪器。
它应该从日志文件中加载最新的历史行,然后为保存的每个新行触发更新。
为了减少 signalR 流量,我使用 RX 缓冲区将它们分成很多 100 行,但是这在加载初始文件内容时是一个问题 - 这可能是 100k 行。以 100 个为一组加载这个太慢了。初始文件内容应作为一条消息发送。
我真正想要的是首先在 Observable 上发送一个标记,其中包含迄今为止文件的全部内容,然后从那时起,为新行写入触发缓冲更新。但我不确定如何将初始内容作为单个滴答通过,然后从那时起缓冲。
到目前为止我的代码
var watcherSubject = new ReplaySubject<LogTailMessage>()
var watcher = new logFileWatcher(logFileLocation)
new TaskFactory().StartNew(() => watcher.StartFileWatch(data => watcherSubject.OnNext(data), CancellationToken.None));
Stream = watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 100)
.Where(d => d != null)
.Replay()
.RefCount();
更新解决方案
var initialFileLines = watcher.GetInitialData();
new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));
Stream = watcherSubject.Buffer(TimeSpan.FromMilliseconds(500), 100)
.StartWith(initialFileLines)
.Replay()
.RefCount();
使用StartWith
:
var originalFileLines = new List<LogTailMessage>(); //Initialize with file contents.
Stream = watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 100)
.Where(d => d != null)
.StartWith(originalFileLines)
.Replay()
.RefCount();
更新:我不确定为什么 StartWith
不能可靠地工作。你能用模拟的例子修改答案吗?
.Concat
应该可以,但我认为这基本上就是 StartWith
应该做的。:
Stream = Observable.Return(originalFileLines).Concat(
watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 100)
.Where(d => d != null)
.Replay()
.RefCount()
);
我正在尝试将日志文件作为流加载,这样我就可以通过网络获得实时日志裁剪器。
它应该从日志文件中加载最新的历史行,然后为保存的每个新行触发更新。
为了减少 signalR 流量,我使用 RX 缓冲区将它们分成很多 100 行,但是这在加载初始文件内容时是一个问题 - 这可能是 100k 行。以 100 个为一组加载这个太慢了。初始文件内容应作为一条消息发送。
我真正想要的是首先在 Observable 上发送一个标记,其中包含迄今为止文件的全部内容,然后从那时起,为新行写入触发缓冲更新。但我不确定如何将初始内容作为单个滴答通过,然后从那时起缓冲。
到目前为止我的代码
var watcherSubject = new ReplaySubject<LogTailMessage>()
var watcher = new logFileWatcher(logFileLocation)
new TaskFactory().StartNew(() => watcher.StartFileWatch(data => watcherSubject.OnNext(data), CancellationToken.None));
Stream = watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 100)
.Where(d => d != null)
.Replay()
.RefCount();
更新解决方案
var initialFileLines = watcher.GetInitialData();
new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));
Stream = watcherSubject.Buffer(TimeSpan.FromMilliseconds(500), 100)
.StartWith(initialFileLines)
.Replay()
.RefCount();
使用StartWith
:
var originalFileLines = new List<LogTailMessage>(); //Initialize with file contents.
Stream = watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 100)
.Where(d => d != null)
.StartWith(originalFileLines)
.Replay()
.RefCount();
更新:我不确定为什么 StartWith
不能可靠地工作。你能用模拟的例子修改答案吗?
.Concat
应该可以,但我认为这基本上就是 StartWith
应该做的。:
Stream = Observable.Return(originalFileLines).Concat(
watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 100)
.Where(d => d != null)
.Replay()
.RefCount()
);