批处理文件流 reader 以发送到 RX

Batching up a filestream reader to send to RX

我想将文件的内容逐行加载到流中,然后坐在那里每秒观察文件中的新条目 - 所以一个实时文件 reader 将输出通过管道传输到接收.

我通过一次读取一行来实现这一点,如果 readline() 上有数据,则通过传入的操作回调,在调用者中,将数据放在 RX 订阅者的 ReplaySubject 上.

问题是在 RX 流上一次只发回一行。我想将它们分批处理,这样它就不会回电,直到您说要发回 10 件物品,或者过了一定时间 - 例如 5-10 秒。

我的回调是一个数据集合,现在我已经将它硬编码为集合中的 return 单个项目,因为我不知道如何进行基于时间的批处理。

任何人都可以建议如何实现这一目标吗?

到目前为止我的代码

public void StartFileWatcher(Action<LogTailMessage[]> callbackAction, CancellationToken cancellationToken)
        {
            var wh = new AutoResetEvent(false);
            var fsw = new FileSystemWatcher(_path)
            {
                Filter = _file,
                EnableRaisingEvents = true
            };
            fsw.Changed += (s, e) => wh.Set();

            var lineNumber = 1;
            var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            using (var sr = new StreamReader(fs))
            {
                while (!cancellationToken.IsCancellationRequested && !_isCancelled)
                {
                    var s = sr.ReadLine();
                    if (s != null)
                    {
                        //todo - batch these up so we only call back once we have 10 items, or if a certain amount of time has passed, send what we have
                        callbackAction(new [] {new LogTailMessage(lineNumber, s)});
                        lineNumber++;
                    }
                    else
                        wh.WaitOne(1000);
                }
            }
        }

更新:缓冲液

var watcherSubject = new ReplaySubject<LogTailMessage>();

            var watcher = new LogFileWatcher(path, filename);

            new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));

            Stream = watcherSubject
                .Buffer(TimeSpan.FromMilliseconds(500), 20)
                .Where(d => d != null)
                .Replay()
                .RefCount();

和文件观察者

public void StartFileWatcher(Action<LogTailMessage> callbackAction, CancellationToken cancellationToken)
        {
            var wh = new AutoResetEvent(false);
            var fsw = new FileSystemWatcher(_path)
            {
                Filter = _file,
                EnableRaisingEvents = true
            };
            fsw.Changed += (s, e) => wh.Set();

            var fileName = Path.Combine(_path, _file);

            var startLine = GetFileStartLine(fileName);

            var lineNumber = 1;
            var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            using (var sr = new StreamReader(fs))
            {
                while (!cancellationToken.IsCancellationRequested && !_isCancelled)
                {
                    var s = sr.ReadLine();
                    if (s != null)
                    {
                        if (lineNumber >= startLine)
                            callbackAction(new LogTailMessage(lineNumber, s));

                        lineNumber++;
                    }
                    else
                    {
                        wh.WaitOne(1000);
                    }
                }
            }
        }

您可以在主题上使用 Buffer

var subject = ReplaySubject<LogTailMessage>();
StartFileWatcher(a => a.ToList().ForEach(ltm => subject.OnNext(ltm)), CancellationToken.None);
bufferedSubject = subject.Buffer (TimeSpan.FromSeconds(5), 10);

您在原始代码中做了很多实际上不需要的工作,并且您正在创建未被清理的一次性用品和事件处理程序。

您真的可以在几个 observable 中完成所有事情。

首先,您需要观察文件中的变化。方法如下:

IObservable<Unit> fileSystemWatcherChanges =
    Observable
        .Using(() =>
            new FileSystemWatcher(_path)
            {
                Filter = _file,
                EnableRaisingEvents = true
            },
            fsw =>
                Observable
                    .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                        h => fsw.Changed += h, h => fsw.Changed -= h)
                    .Select(x => Unit.Default));

现在您需要打开一个流并在每次文件更改时从流中读取:

IObservable<LogTailMessage> messages =
    Observable
        .Using(
            () => new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite),
            fs =>
                Observable
                    .Using(
                        () => new StreamReader(fs),
                        sr =>
                            fileSystemWatcherChanges
                                .StartWith(Unit.Default)
                                .Select(x =>
                                    Observable
                                        .Defer(() => Observable.FromAsync(() => sr.ReadLineAsync()))
                                        .Repeat()
                                        .TakeUntil(w => w == null))
                                .Merge()
                                .Where(w => w != null)))
        .Select((x, n) => new LogTailMessage(n, x));

IObservable<IList<LogTailMessage>> buffered =
    messages
        .Buffer(TimeSpan.FromSeconds(5), 10);

我在我的电脑上对此进行了测试,我相信它会提供您需要的结果。

这是一个完整的 Rx 管道,所以如果您像 IDisposable subscription = buffered.Subscribe(); 一样订阅,然后调用 subscription.Dispose();,那么它会自行清理。

而且它会避开主题。