Timeout-based BufferedWriter flush

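I am using a BufferedWriter with the default size of 8192 characters to write lines to a local file. The lines are read from a socket input stream with BufferedReader's readLine method, using blocking I/O.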

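The average line length is 50 characters. Everything works well and is fast enough (over 1 million lines per second), but if the client stops writing, the lines currently held in the BufferedWriter's buffer are not flushed to disk. In fact, the buffered characters are not flushed until the client resumes writing or the connection is closed. This introduces a delay between the time the client transmits a line and the time that line is committed to the file, which increases tail latency.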

Is there a way to flush an incomplete BufferedWriter buffer after a timeout, e.g. 100 ms?
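The setup described in the question can be sketched roughly as follows. This is a minimal illustration, not the asker's code: the class name LineRelay and its relay method are hypothetical, and the caller is assumed to wrap the socket's InputStream and the output file in a BufferedReader and BufferedWriter. Because nothing calls flush(), characters reach the underlying writer only when the buffer fills or the writer is closed, and while readLine() blocks on an idle client the partially filled buffer just waits.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;

// Hypothetical helper reproducing the question's setup: lines read with the
// blocking BufferedReader.readLine() are appended to a BufferedWriter.
final class LineRelay {
    private LineRelay() {}

    /**
     * Copies lines until end of stream and returns how many were copied.
     * No flush() is called, so data only reaches the underlying writer when
     * the buffer fills up (or when the caller flushes or closes the writer).
     */
    static long relay(BufferedReader in, BufferedWriter out) throws IOException {
        long count = 0;
        String line;
        while ((line = in.readLine()) != null) { // blocks while the client is idle
            out.write(line);
            out.write('\n'); // explicit '\n' instead of newLine() for a fixed format
            count++;
        }
        return count;
    }
}
```

With a StringReader standing in for the socket, the lines are all consumed but the target stays empty until an explicit flush().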

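@copeg is right: flush after every line. Flushing on a timer is easy, but what is the point of writing out half a record that nothing can process yet?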
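A minimal sketch of the "flush after every line" advice, assuming the same reader/writer wrapping as in the question (the class name FlushingLineRelay is hypothetical). Each complete line is handed to the underlying writer immediately, so nothing waits for the next line; the cost is one write to the underlying writer per line. Note that for a FileWriter, flush() hands the data to the operating system; it is not an fsync, and on-disk durability would additionally need something like FileDescriptor.sync() or FileChannel.force().

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;

// Hypothetical variant of the copy loop that flushes after every complete line.
final class FlushingLineRelay {
    private FlushingLineRelay() {}

    static long relay(BufferedReader in, BufferedWriter out) throws IOException {
        long count = 0;
        String line;
        while ((line = in.readLine()) != null) {
            out.write(line);
            out.write('\n');
            // Lowest latency: a whole record is pushed out as soon as it is written,
            // at the price of one underlying write (usually a system call) per line.
            out.flush();
            count++;
        }
        return count;
    }
}
```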

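You could apply the Observer, Manager and Factory patterns here: have a central BufferedWriterManager create your BufferedWriters and maintain a list of active instances. An internal thread could wake up periodically and flush the active instances. This may also be an opportunity to use weak references, so your consumers are not required to release the objects explicitly. Instead, the GC does the work, and your manager only needs to handle the case where one of its internal references becomes null (i.e., when all strong references to that writer have been dropped).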
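One possible shape for that manager is sketched below, assuming a ScheduledExecutorService as the "internal thread"; the create/flushAll/close API is hypothetical and only illustrates the idea. One caveat applies to the weak-reference part: BufferedWriter does not flush itself when it is garbage-collected, so if a consumer drops a writer without flushing or closing it, whatever is still in its buffer is lost.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Writer;
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a central manager that periodically flushes the
// BufferedWriters it created, holding them only through weak references.
final class BufferedWriterManager implements AutoCloseable {
    private final List<WeakReference<BufferedWriter>> active = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService flusher;

    BufferedWriterManager(long periodMillis) {
        flusher = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "buffered-writer-flusher");
            t.setDaemon(true); // do not keep the JVM alive just for flushing
            return t;
        });
        flusher.scheduleAtFixedRate(this::flushAll, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Factory method: creates a writer and registers it for periodic flushing. */
    BufferedWriter create(Writer out, int size) {
        BufferedWriter w = new BufferedWriter(out, size);
        active.add(new WeakReference<>(w));
        return w;
    }

    /** Flushes every live writer and forgets the ones that were collected. */
    void flushAll() {
        for (WeakReference<BufferedWriter> ref : active) { // snapshot iteration
            BufferedWriter w = ref.get();
            if (w == null) {
                active.remove(ref); // all strong references were dropped
                continue;
            }
            try {
                w.flush(); // BufferedWriter methods are synchronized internally
            } catch (IOException e) {
                active.remove(ref); // e.g. already closed: stop tracking it
            }
        }
    }

    @Override
    public void close() {
        flusher.shutdownNow();
    }
}
```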

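How about something like this? It is not a real BufferedWriter, but it is a Writer. It works by periodically checking when data was last written to the underlying (hopefully unbuffered) writer, and flushing the BufferedWriter once more than timeout milliseconds have passed since then.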

import java.io.BufferedWriter;
import java.io.FilterWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A Writer that buffers like BufferedWriter, but also flushes automatically
 * once nothing has reached the underlying writer for `timeout` milliseconds.
 */
public class PeriodicFlushingBufferedWriter extends Writer {

  // Sits between the buffer and `out` and records when data last passed through.
  protected final MonitoredWriter monitoredWriter;
  protected final BufferedWriter writer;

  protected final long timeout;
  protected final Thread thread;

  public PeriodicFlushingBufferedWriter(Writer out, long timeout) {
    this(out, 8192, timeout);
  }

  public PeriodicFlushingBufferedWriter(Writer out, int sz, final long timeout) {
    monitoredWriter = new MonitoredWriter(out);
    writer = new BufferedWriter(monitoredWriter, sz);

    this.timeout = timeout;

    thread = new Thread(new Runnable() {
      @Override
      public void run() {
        long deadline = System.currentTimeMillis() + timeout;
        while (!Thread.interrupted()) {
          try {
            // Sleep until the earliest moment a flush could become due.
            Thread.sleep(Math.max(deadline - System.currentTimeMillis(), 0));
          } catch (InterruptedException e) {
            return; // close() was called
          }

          // Same lock as write()/flush()/close(), so we never flush mid-write.
          synchronized (PeriodicFlushingBufferedWriter.this) {
            if (Thread.interrupted()) {
              return;
            }

            if (System.currentTimeMillis() - monitoredWriter.getLastWrite() >= timeout) {
              try {
                writer.flush(); // also updates lastWrite via MonitoredWriter.flush()
              } catch (IOException e) {
                // Ignored here: the data stays buffered, so a persistent error
                // should resurface on the caller's next flush() or close().
              }
            }

            // Re-read after a possible flush so the next deadline accounts for it.
            deadline = monitoredWriter.getLastWrite() + timeout;
          }
        }
      }
    }, "periodic-flusher");

    // Daemon, so a writer that is never closed does not keep the JVM alive.
    thread.setDaemon(true);
    thread.start();
  }

  @Override
  public synchronized void write(char[] cbuf, int off, int len) throws IOException {
    this.writer.write(cbuf, off, len);
  }

  @Override
  public synchronized void flush() throws IOException {
    this.writer.flush();
  }

  @Override
  public synchronized void close() throws IOException {
    try {
      thread.interrupt(); // stops the flusher thread
    } finally {
      this.writer.close(); // flushes remaining data and closes `out`
    }
  }

  // Pass-through writer that timestamps every write and flush reaching `out`.
  private static class MonitoredWriter extends FilterWriter {

    protected final AtomicLong lastWrite = new AtomicLong();

    protected MonitoredWriter(Writer out) {
      super(out);
    }

    @Override
    public void write(int c) throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.write(c);
    }

    @Override
    public void write(char[] cbuf, int off, int len) throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.write(cbuf, off, len);
    }

    @Override
    public void write(String str, int off, int len) throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.write(str, off, len);
    }

    @Override
    public void flush() throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.flush();
    }

    public long getLastWrite() {
      return this.lastWrite.get();
    }
  }
}

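Don't try such a complicated scheme; it is too hard. Just reduce the buffer size by specifying it when you construct the BufferedWriter. Keep reducing it until you find the balance between performance and latency that you need.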
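To put rough numbers on that trade-off: with 50-character lines, the default 8192-char buffer holds about 163 lines before it spills to the underlying writer, while a 1024-char buffer holds about 20. The sketch below uses a hypothetical helper, charsReachedSink, with a StringWriter standing in for the file, and counts how many characters reach the underlying writer without any explicit flush().

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;

// Hypothetical demo of the buffer-size knob: BufferedWriter(Writer, int) sets the size.
final class SmallBufferDemo {
    private SmallBufferDemo() {}

    /**
     * Writes `lines` lines of `lineLength` chars (including the trailing '\n')
     * through a BufferedWriter of `bufferSize` chars, and returns how many chars
     * reached the underlying writer before any explicit flush().
     */
    static int charsReachedSink(int bufferSize, int lineLength, int lines) throws IOException {
        StringWriter sink = new StringWriter();
        BufferedWriter out = new BufferedWriter(sink, bufferSize);
        String line = "x".repeat(lineLength - 1) + "\n";
        for (int i = 0; i < lines; i++) {
            out.write(line);
        }
        // No flush(): only whole-buffer spills have reached the sink so far.
        return sink.getBuffer().length();
    }
}
```

For 30 lines (1500 characters), nothing reaches the sink with an 8192-char buffer, while a 1024-char buffer has spilled once. Keep in mind that a smaller buffer caps how much data can be waiting, not how long it waits: if the client goes quiet, up to bufferSize - 1 characters can still sit in the buffer until the next write or flush.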