基于超时的 BufferedWriter 刷新
Timeout-based BufferedWriter flush
我正在使用默认大小为 8192 个字符的 BufferedWriter
将行写入本地文件。使用 BufferedReader
readLine 方法从套接字输入流中读取行,阻塞 I/O。
平均行长度为 50 个字符。这一切都运行良好且足够快(每秒超过 100 万行)但是如果客户端停止写入,当前存储在 BufferedWriter
缓冲区中的行将不会刷新到磁盘。事实上,在客户端恢复写入或连接关闭之前,缓冲的字符不会被刷新到磁盘。这转化为客户端传输时间线与该线提交到文件的时间之间的延迟,因此长尾延迟会增加。
有没有办法在超时时刷新不完整的 BufferedWriter
缓冲区,例如100 毫秒内?
@copeg 是对的——在每一行之后刷新它。时间段刷新很容易,但是只有一半的记录无法进行有什么意义?
您可以在此处应用 Observer、Manager 和 Factory 模式,并让一个中央 BufferedWriterManager 生成您的 BufferedWriters 并维护一个活动实例列表。内部线程可能会定期唤醒并刷新活动实例。这也可能是弱引用的机会,因此不需要您的消费者显式释放对象。相反,GC 将完成这项工作,而您的管理器只需要处理其内部引用变为 null 的情况(即,当所有强引用被删除时)。
像这样的事情怎么样?这不是 真实的 BufferedWriter
,而是 Writer
。它的工作原理是定期检查 底层 的最后一个写入器,希望是无缓冲的写入器,然后在 BufferedWriter
超过超时时间时刷新
。
public class PeriodicFlushingBufferedWriter extends Writer {
protected final MonitoredWriter monitoredWriter;
protected final BufferedWriter writer;
protected final long timeout;
protected final Thread thread;
public PeriodicFlushingBufferedWriter(Writer out, long timeout) {
this(out, 8192, timeout);
}
public PeriodicFlushingBufferedWriter(Writer out, int sz, final long timeout) {
monitoredWriter = new MonitoredWriter(out);
writer = new BufferedWriter(monitoredWriter, sz);
this.timeout = timeout;
thread = new Thread(new Runnable() {
@Override
public void run() {
long deadline = System.currentTimeMillis() + timeout;
while (!Thread.interrupted()) {
try {
Thread.sleep(Math.max(deadline - System.currentTimeMillis(), 0));
} catch (InterruptedException e) {
return;
}
synchronized (PeriodicFlushingBufferedWriter.this) {
if (Thread.interrupted()) {
return;
}
long lastWrite = monitoredWriter.getLastWrite();
if (System.currentTimeMillis() - lastWrite >= timeout) {
try {
writer.flush();
} catch (IOException e) {
}
}
deadline = lastWrite + timeout;
}
}
}
});
thread.start();
}
@Override
public synchronized void write(char[] cbuf, int off, int len) throws IOException {
this.writer.write(cbuf, off, len);
}
@Override
public synchronized void flush() throws IOException {
this.writer.flush();
}
@Override
public synchronized void close() throws IOException {
try {
thread.interrupt();
} finally {
this.writer.close();
}
}
private static class MonitoredWriter extends FilterWriter {
protected final AtomicLong lastWrite = new AtomicLong();
protected MonitoredWriter(Writer out) {
super(out);
}
@Override
public void write(int c) throws IOException {
lastWrite.set(System.currentTimeMillis());
super.write(c);
}
@Override
public void write(char[] cbuf, int off, int len) throws IOException {
lastWrite.set(System.currentTimeMillis());
super.write(cbuf, off, len);
}
@Override
public void write(String str, int off, int len) throws IOException {
lastWrite.set(System.currentTimeMillis());
super.write(str, off, len);
}
@Override
public void flush() throws IOException {
lastWrite.set(System.currentTimeMillis());
super.flush();
}
public long getLastWrite() {
return this.lastWrite.get();
}
}
}
不要尝试这种复杂的方案,太难了。只需在构造 BufferedWriter.
时指定缓冲区大小即可减少缓冲区大小,直到找到所需的性能和延迟之间的平衡。
我正在使用默认大小为 8192 个字符的 BufferedWriter
将行写入本地文件。使用 BufferedReader
readLine 方法从套接字输入流中读取行,阻塞 I/O。
平均行长度为 50 个字符。这一切都运行良好且足够快(每秒超过 100 万行)但是如果客户端停止写入,当前存储在 BufferedWriter
缓冲区中的行将不会刷新到磁盘。事实上,在客户端恢复写入或连接关闭之前,缓冲的字符不会被刷新到磁盘。这转化为客户端传输时间线与该线提交到文件的时间之间的延迟,因此长尾延迟会增加。
有没有办法在超时时刷新不完整的 BufferedWriter
缓冲区,例如100 毫秒内?
@copeg 是对的——在每一行之后刷新它。时间段刷新很容易,但是只有一半的记录无法进行有什么意义?
您可以在此处应用 Observer、Manager 和 Factory 模式,并让一个中央 BufferedWriterManager 生成您的 BufferedWriters 并维护一个活动实例列表。内部线程可能会定期唤醒并刷新活动实例。这也可能是弱引用的机会,因此不需要您的消费者显式释放对象。相反,GC 将完成这项工作,而您的管理器只需要处理其内部引用变为 null 的情况(即,当所有强引用被删除时)。
像这样的事情怎么样?这不是 真实的 BufferedWriter
,而是 Writer
。它的工作原理是定期检查 底层 的最后一个写入器,希望是无缓冲的写入器,然后在 BufferedWriter
超过超时时间时刷新
public class PeriodicFlushingBufferedWriter extends Writer {
protected final MonitoredWriter monitoredWriter;
protected final BufferedWriter writer;
protected final long timeout;
protected final Thread thread;
public PeriodicFlushingBufferedWriter(Writer out, long timeout) {
this(out, 8192, timeout);
}
public PeriodicFlushingBufferedWriter(Writer out, int sz, final long timeout) {
monitoredWriter = new MonitoredWriter(out);
writer = new BufferedWriter(monitoredWriter, sz);
this.timeout = timeout;
thread = new Thread(new Runnable() {
@Override
public void run() {
long deadline = System.currentTimeMillis() + timeout;
while (!Thread.interrupted()) {
try {
Thread.sleep(Math.max(deadline - System.currentTimeMillis(), 0));
} catch (InterruptedException e) {
return;
}
synchronized (PeriodicFlushingBufferedWriter.this) {
if (Thread.interrupted()) {
return;
}
long lastWrite = monitoredWriter.getLastWrite();
if (System.currentTimeMillis() - lastWrite >= timeout) {
try {
writer.flush();
} catch (IOException e) {
}
}
deadline = lastWrite + timeout;
}
}
}
});
thread.start();
}
@Override
public synchronized void write(char[] cbuf, int off, int len) throws IOException {
this.writer.write(cbuf, off, len);
}
@Override
public synchronized void flush() throws IOException {
this.writer.flush();
}
@Override
public synchronized void close() throws IOException {
try {
thread.interrupt();
} finally {
this.writer.close();
}
}
private static class MonitoredWriter extends FilterWriter {
protected final AtomicLong lastWrite = new AtomicLong();
protected MonitoredWriter(Writer out) {
super(out);
}
@Override
public void write(int c) throws IOException {
lastWrite.set(System.currentTimeMillis());
super.write(c);
}
@Override
public void write(char[] cbuf, int off, int len) throws IOException {
lastWrite.set(System.currentTimeMillis());
super.write(cbuf, off, len);
}
@Override
public void write(String str, int off, int len) throws IOException {
lastWrite.set(System.currentTimeMillis());
super.write(str, off, len);
}
@Override
public void flush() throws IOException {
lastWrite.set(System.currentTimeMillis());
super.flush();
}
public long getLastWrite() {
return this.lastWrite.get();
}
}
}
不要尝试这种复杂的方案,太难了。只需在构造 BufferedWriter.
时指定缓冲区大小即可减少缓冲区大小,直到找到所需的性能和延迟之间的平衡。