如何在 RX 中有条件地缓冲

How to conditionally buffer in RX

我正在尝试将日志文件作为流加载,这样我就可以通过网络获得实时日志裁剪器。

它应该从日志文件中加载最新的历史行,然后为保存的每个新行触发更新。

为了减少 signalR 流量,我使用 RX 缓冲区将它们分成很多 100 行,但是这在加载初始文件内容时是一个问题 - 这可能是 100k 行。以 100 个为一组加载这个太慢了。初始文件内容应作为一条消息发送。

我真正想要的是首先在 Observable 上发送一个标记,其中包含迄今为止文件的全部内容,然后从那时起,为新行写入触发缓冲更新。但我不确定如何将初始内容作为单个滴答通过,然后从那时起缓冲。

到目前为止我的代码

var watcherSubject = new ReplaySubject<LogTailMessage>()
var watcher = new logFileWatcher(logFileLocation)
new TaskFactory().StartNew(() => watcher.StartFileWatch(data => watcherSubject.OnNext(data), CancellationToken.None));

Stream = watcherSubject
    .Buffer(TimeSpan.FromMilliseconds(500), 100)
    .Where(d => d != null)
    .Replay()
    .RefCount();

更新解决方案

var initialFileLines = watcher.GetInitialData();

new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));

Stream = watcherSubject.Buffer(TimeSpan.FromMilliseconds(500), 100)
    .StartWith(initialFileLines)
    .Replay()
    .RefCount();

使用StartWith:

var originalFileLines = new List<LogTailMessage>(); //Initialize with file contents.

Stream = watcherSubject
    .Buffer(TimeSpan.FromMilliseconds(500), 100)
    .Where(d => d != null)
    .StartWith(originalFileLines)
    .Replay()
    .RefCount();

更新:我不确定为什么 StartWith 不能可靠地工作。你能用模拟的例子修改答案吗?

.Concat 应该可以,但我认为这基本上就是 StartWith 应该做的。:

Stream = Observable.Return(originalFileLines).Concat(
    watcherSubject
        .Buffer(TimeSpan.FromMilliseconds(500), 100)
        .Where(d => d != null)
        .Replay()
        .RefCount()
   );