批处理文件流 reader 以发送到 RX
Batching up a filestream reader to send to RX
我想将文件的内容逐行加载到流中,然后坐在那里每秒观察文件中的新条目 - 所以一个实时文件 reader 将输出通过管道传输到接收.
我通过一次读取一行来实现这一点,如果 readline() 上有数据,则通过传入的操作回调,在调用者中,将数据放在 RX 订阅者的 ReplaySubject 上.
问题是在 RX 流上一次只发回一行。我想将它们分批处理,这样它就不会回电,直到您说要发回 10 件物品,或者过了一定时间 - 例如 5-10 秒。
我的回调是一个数据集合,现在我已经将它硬编码为集合中的 return 单个项目,因为我不知道如何进行基于时间的批处理。
任何人都可以建议如何实现这一目标吗?
到目前为止我的代码
public void StartFileWatcher(Action<LogTailMessage[]> callbackAction, CancellationToken cancellationToken)
{
var wh = new AutoResetEvent(false);
var fsw = new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
};
fsw.Changed += (s, e) => wh.Set();
var lineNumber = 1;
var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
using (var sr = new StreamReader(fs))
{
while (!cancellationToken.IsCancellationRequested && !_isCancelled)
{
var s = sr.ReadLine();
if (s != null)
{
//todo - batch these up so we only call back once we have 10 items, or if a certain amount of time has passed, send what we have
callbackAction(new [] {new LogTailMessage(lineNumber, s)});
lineNumber++;
}
else
wh.WaitOne(1000);
}
}
}
更新:缓冲液
var watcherSubject = new ReplaySubject<LogTailMessage>();
var watcher = new LogFileWatcher(path, filename);
new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));
Stream = watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 20)
.Where(d => d != null)
.Replay()
.RefCount();
和文件观察者
public void StartFileWatcher(Action<LogTailMessage> callbackAction, CancellationToken cancellationToken)
{
var wh = new AutoResetEvent(false);
var fsw = new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
};
fsw.Changed += (s, e) => wh.Set();
var fileName = Path.Combine(_path, _file);
var startLine = GetFileStartLine(fileName);
var lineNumber = 1;
var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
using (var sr = new StreamReader(fs))
{
while (!cancellationToken.IsCancellationRequested && !_isCancelled)
{
var s = sr.ReadLine();
if (s != null)
{
if (lineNumber >= startLine)
callbackAction(new LogTailMessage(lineNumber, s));
lineNumber++;
}
else
{
wh.WaitOne(1000);
}
}
}
}
您可以在主题上使用 Buffer
:
var subject = ReplaySubject<LogTailMessage>();
StartFileWatcher(a => a.ToList().ForEach(ltm => subject.OnNext(ltm)), CancellationToken.None);
bufferedSubject = subject.Buffer (TimeSpan.FromSeconds(5), 10);
您在原始代码中做了很多实际上不需要的工作,并且您正在创建未被清理的一次性用品和事件处理程序。
您真的可以在几个 observable 中完成所有事情。
首先,您需要观察文件中的变化。方法如下:
IObservable<Unit> fileSystemWatcherChanges =
Observable
.Using(() =>
new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
},
fsw =>
Observable
.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Changed += h, h => fsw.Changed -= h)
.Select(x => Unit.Default));
现在您需要打开一个流并在每次文件更改时从流中读取:
IObservable<LogTailMessage> messages =
Observable
.Using(
() => new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite),
fs =>
Observable
.Using(
() => new StreamReader(fs),
sr =>
fileSystemWatcherChanges
.StartWith(Unit.Default)
.Select(x =>
Observable
.Defer(() => Observable.FromAsync(() => sr.ReadLineAsync()))
.Repeat()
.TakeUntil(w => w == null))
.Merge()
.Where(w => w != null)))
.Select((x, n) => new LogTailMessage(n, x));
IObservable<IList<LogTailMessage>> buffered =
messages
.Buffer(TimeSpan.FromSeconds(5), 10);
我在我的电脑上对此进行了测试,我相信它会提供您需要的结果。
这是一个完整的 Rx 管道,所以如果您像 IDisposable subscription = buffered.Subscribe();
一样订阅,然后调用 subscription.Dispose();
,那么它会自行清理。
而且它会避开主题。
我想将文件的内容逐行加载到流中,然后坐在那里每秒观察文件中的新条目 - 所以一个实时文件 reader 将输出通过管道传输到接收.
我通过一次读取一行来实现这一点,如果 readline() 上有数据,则通过传入的操作回调,在调用者中,将数据放在 RX 订阅者的 ReplaySubject 上.
问题是在 RX 流上一次只发回一行。我想将它们分批处理,这样它就不会回电,直到您说要发回 10 件物品,或者过了一定时间 - 例如 5-10 秒。
我的回调是一个数据集合,现在我已经将它硬编码为集合中的 return 单个项目,因为我不知道如何进行基于时间的批处理。
任何人都可以建议如何实现这一目标吗?
到目前为止我的代码
public void StartFileWatcher(Action<LogTailMessage[]> callbackAction, CancellationToken cancellationToken)
{
var wh = new AutoResetEvent(false);
var fsw = new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
};
fsw.Changed += (s, e) => wh.Set();
var lineNumber = 1;
var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
using (var sr = new StreamReader(fs))
{
while (!cancellationToken.IsCancellationRequested && !_isCancelled)
{
var s = sr.ReadLine();
if (s != null)
{
//todo - batch these up so we only call back once we have 10 items, or if a certain amount of time has passed, send what we have
callbackAction(new [] {new LogTailMessage(lineNumber, s)});
lineNumber++;
}
else
wh.WaitOne(1000);
}
}
}
更新:缓冲液
var watcherSubject = new ReplaySubject<LogTailMessage>();
var watcher = new LogFileWatcher(path, filename);
new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));
Stream = watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 20)
.Where(d => d != null)
.Replay()
.RefCount();
和文件观察者
public void StartFileWatcher(Action<LogTailMessage> callbackAction, CancellationToken cancellationToken)
{
var wh = new AutoResetEvent(false);
var fsw = new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
};
fsw.Changed += (s, e) => wh.Set();
var fileName = Path.Combine(_path, _file);
var startLine = GetFileStartLine(fileName);
var lineNumber = 1;
var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
using (var sr = new StreamReader(fs))
{
while (!cancellationToken.IsCancellationRequested && !_isCancelled)
{
var s = sr.ReadLine();
if (s != null)
{
if (lineNumber >= startLine)
callbackAction(new LogTailMessage(lineNumber, s));
lineNumber++;
}
else
{
wh.WaitOne(1000);
}
}
}
}
您可以在主题上使用 Buffer
:
var subject = ReplaySubject<LogTailMessage>();
StartFileWatcher(a => a.ToList().ForEach(ltm => subject.OnNext(ltm)), CancellationToken.None);
bufferedSubject = subject.Buffer (TimeSpan.FromSeconds(5), 10);
您在原始代码中做了很多实际上不需要的工作,并且您正在创建未被清理的一次性用品和事件处理程序。
您真的可以在几个 observable 中完成所有事情。
首先,您需要观察文件中的变化。方法如下:
IObservable<Unit> fileSystemWatcherChanges =
Observable
.Using(() =>
new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
},
fsw =>
Observable
.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Changed += h, h => fsw.Changed -= h)
.Select(x => Unit.Default));
现在您需要打开一个流并在每次文件更改时从流中读取:
IObservable<LogTailMessage> messages =
Observable
.Using(
() => new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite),
fs =>
Observable
.Using(
() => new StreamReader(fs),
sr =>
fileSystemWatcherChanges
.StartWith(Unit.Default)
.Select(x =>
Observable
.Defer(() => Observable.FromAsync(() => sr.ReadLineAsync()))
.Repeat()
.TakeUntil(w => w == null))
.Merge()
.Where(w => w != null)))
.Select((x, n) => new LogTailMessage(n, x));
IObservable<IList<LogTailMessage>> buffered =
messages
.Buffer(TimeSpan.FromSeconds(5), 10);
我在我的电脑上对此进行了测试,我相信它会提供您需要的结果。
这是一个完整的 Rx 管道,所以如果您像 IDisposable subscription = buffered.Subscribe();
一样订阅,然后调用 subscription.Dispose();
,那么它会自行清理。
而且它会避开主题。