重复读取 Ruby IO 直到读取 X 字节,Y 秒过去,或 EOF,以先到者为准

repeatedly read Ruby IO until X bytes have been read, Y seconds have elapsed, or EOF, whichever comes first

我想将日志从 IO 管道转发到 API。理想情况下,不会超过例如10 秒的延迟(因此查看日志的人不会不耐烦)。

实现此目的的一种天真的方法是使用 IO.each_byte 并在每个字节可用时立即将其发送到 API,但每个字节处理请求的开销会导致额外的延迟.

IO#each(limit) 也接近我想要的,但是如果限制是 50 kB 并且在 10 秒后,只读取了 20 kB,我想继续发送那 20 kB 而无需等待更多的。如何同时应用时间和大小限制?

一种天真的方法是使用 IO#each_byte enumerator

人为的、未经测试的示例:

enum = io.each_byte
now = Time.now
res = while Time.now - now < 20 do
  begin
    send_byte enum.next
  rescue e => StopIteration
    # no more data
    break :closed
  end 
end
puts "NO MORE DATA" if res == :closed

这是我最终得到的结果。更简单的解决方案仍然值得赞赏!

def read_chunks(io, byte_interval: 200 * 1024, time_interval: 5)
  buffer = last = nil
  reset = lambda do
    buffer = ''
    last = Time.now
  end
  reset.call
  mutex = Mutex.new
  cv = ConditionVariable.new
  [
    lambda do
      IO.select [io]
      mutex.synchronize do
        begin
          chunk = io.readpartial byte_interval
          buffer.concat chunk
        rescue EOFError
          raise StopIteration
        ensure
          cv.signal
        end
      end
    end,
    lambda do
      mutex.synchronize do
        until io.eof? || Time.now > (last + time_interval) || buffer.length > byte_interval
          cv.wait mutex, time_interval
        end
        unless buffer.empty?
          buffer_io = StringIO.new buffer
          yield buffer_io.read byte_interval until buffer_io.eof?
          reset.call
        end
        raise StopIteration if io.eof?
      end
    end,
  ].map do |function|
    Thread.new { loop { function.call } }
  end.each(&:join)
end