非阻塞ruby数据处理或方法调用

non-blocking ruby data processing or method call

我有一个 ruby 脚本可以从串口读取数据。 该数据可能是一些表示特定协议数据报的原始二进制字符串(我现在正在尝试 XBee API)。

必须在很长的 运行 方法调用中处理此数据,例如:

  1. 阅读连载
  2. 解析二进制数据报
  3. 解析负载
  4. 转换值(即:来自时间戳、线性回归等的日期)
  5. 转换为JSON
  6. 插入数据库

数据收入频率快于我的处理能力。我需要做这样的事情:

loop do
  begin
    res = @xbee.getresponse
    return_super_fast_and_work_that_in_the_background res
  rescue => e
    puts e #append to some log here or something
  end
end

所以,我能想到的是,我可能需要收集相当数量的这些数据报,然后批量处理所有这些数据报。

但是我无法想象如何实现这样的方法:

#return_super_fast_and_work_in_the_background()

我能找到的所有示例都与非阻塞 IO 或网络任务以及 Eventmachine 有关。

我身边有 redis,在这里可能会派上用场,并且可以在这个脚本之外启动另一个脚本。 (实际上我有一个 sinatra api 连接到数据库,还有一个 pubsub/websocket 等待在两者之间使用以在新数据到来时通知)

如有任何建议,我们将不胜感激!

每次获得更多数据时,您都可以分拆一个线程。

def do_work res
  # parse, transform, insert, etc.
end

def read_loop
  loop do
    begin
      res = @xbee.getresponse
      Thread.new(res, &method(:do_work))
    rescue => e
      # ...
    end
  end
end

如果您的 do_work 方法涉及一些公共资源(日志、数据库、标准输出等),您将需要使用 Mutex 保护该资源以防止不同的线程步进在彼此身上。另请注意,Ruby 并非真正的多线程,因此虽然这将实现您快速返回以获取更多数据的目标,但它实际上不会提供处理加速。