重复读取 Ruby IO 直到读取 X 字节,Y 秒过去,或 EOF,以先到者为准
repeatedly read Ruby IO until X bytes have been read, Y seconds have elapsed, or EOF, whichever comes first
我想将日志从 IO 管道转发到 API。理想情况下,不会超过例如10 秒的延迟(因此查看日志的人不会不耐烦)。
实现此目的的一种天真的方法是使用 IO.each_byte
并在每个字节可用时立即将其发送到 API,但每个字节处理请求的开销会导致额外的延迟.
IO#each(limit)
也接近我想要的,但是如果限制是 50 kB 并且在 10 秒后,只读取了 20 kB,我想继续发送那 20 kB 而无需等待更多的。如何同时应用时间和大小限制?
一种天真的方法是使用 IO#each_byte
enumerator。
人为的、未经测试的示例:
enum = io.each_byte
now = Time.now
res = while Time.now - now < 20 do
begin
send_byte enum.next
rescue e => StopIteration
# no more data
break :closed
end
end
puts "NO MORE DATA" if res == :closed
这是我最终得到的结果。更简单的解决方案仍然值得赞赏!
def read_chunks(io, byte_interval: 200 * 1024, time_interval: 5)
buffer = last = nil
reset = lambda do
buffer = ''
last = Time.now
end
reset.call
mutex = Mutex.new
cv = ConditionVariable.new
[
lambda do
IO.select [io]
mutex.synchronize do
begin
chunk = io.readpartial byte_interval
buffer.concat chunk
rescue EOFError
raise StopIteration
ensure
cv.signal
end
end
end,
lambda do
mutex.synchronize do
until io.eof? || Time.now > (last + time_interval) || buffer.length > byte_interval
cv.wait mutex, time_interval
end
unless buffer.empty?
buffer_io = StringIO.new buffer
yield buffer_io.read byte_interval until buffer_io.eof?
reset.call
end
raise StopIteration if io.eof?
end
end,
].map do |function|
Thread.new { loop { function.call } }
end.each(&:join)
end
我想将日志从 IO 管道转发到 API。理想情况下,不会超过例如10 秒的延迟(因此查看日志的人不会不耐烦)。
实现此目的的一种天真的方法是使用 IO.each_byte
并在每个字节可用时立即将其发送到 API,但每个字节处理请求的开销会导致额外的延迟.
IO#each(limit)
也接近我想要的,但是如果限制是 50 kB 并且在 10 秒后,只读取了 20 kB,我想继续发送那 20 kB 而无需等待更多的。如何同时应用时间和大小限制?
一种天真的方法是使用 IO#each_byte
enumerator。
人为的、未经测试的示例:
enum = io.each_byte
now = Time.now
res = while Time.now - now < 20 do
begin
send_byte enum.next
rescue e => StopIteration
# no more data
break :closed
end
end
puts "NO MORE DATA" if res == :closed
这是我最终得到的结果。更简单的解决方案仍然值得赞赏!
def read_chunks(io, byte_interval: 200 * 1024, time_interval: 5)
buffer = last = nil
reset = lambda do
buffer = ''
last = Time.now
end
reset.call
mutex = Mutex.new
cv = ConditionVariable.new
[
lambda do
IO.select [io]
mutex.synchronize do
begin
chunk = io.readpartial byte_interval
buffer.concat chunk
rescue EOFError
raise StopIteration
ensure
cv.signal
end
end
end,
lambda do
mutex.synchronize do
until io.eof? || Time.now > (last + time_interval) || buffer.length > byte_interval
cv.wait mutex, time_interval
end
unless buffer.empty?
buffer_io = StringIO.new buffer
yield buffer_io.read byte_interval until buffer_io.eof?
reset.call
end
raise StopIteration if io.eof?
end
end,
].map do |function|
Thread.new { loop { function.call } }
end.each(&:join)
end