ruby 块内的赛璐珞异步不起作用

Celluloid async inside ruby blocks does not work

尝试在我的工作示例中实施 Celluloid async 似乎表现出奇怪的行为。

这里是我的代码

 class Indefinite
    include Celluloid

      def run!
         loop do 
           [1].each do |i|
             async.on_background
           end
         end
      end 


       def on_background
         puts "Running in background" 
       end
   end

   Indefinite.new.run!

但是当我 运行 上面的代码时,我从来没有看到 puts "运行 in Background"

但是,如果我 sleep 代码似乎可以工作。

class Indefinite
   include Celluloid

    def run! 
      loop do 
        [1].each do |i|
          async.on_background
        end
        sleep 0.5
      end 
    end


   def on_background
     puts "Running in background" 
   end
 end

 Indefinite.new.run!

有什么想法吗?为什么在上述两种情况下会有如此不同。

谢谢。

线程如何与 Celluloid 一起工作

Celluloid 没有为每个异步任务创建新线程。它有一个线程池,它在其中运行每个任务,包括同步任务和异步任务。关键是库将 run! 函数视为同步任务,并在与异步任务相同的上下文中执行它。

默认情况下,Celluloid 在单个线程中运行所有内容,使用队列系统为以后安排异步任务。它仅在需要时创建新线程。

除此之外,Celluloid 覆盖了 sleep 功能。这意味着每次您在扩展 Celluloid class 的 class 中调用 sleep 时,库将检查其池中是否有非休眠线程。 在您的情况下,您第一次调用 sleep 0.5 时,它将创建一个新线程来执行队列中的异步任务,而第一个线程正在休眠。

所以在你的第一个例子中,只有一个 Celluloid 线程是 运行,执行循环。在你的第二个例子中,两个 Celluloid 线程是 运行,第一个执行循环并在每次迭代时休眠,另一个执行后台任务。

例如,您可以更改第一个示例以执行有限次数的迭代:

def run! 
  (0..100).each do
    [1].each do |i|
      async.on_background
    end
  end
  puts "Done!"
end

当使用这个run!函数时,你会看到Done!打印在所有Running in background之前,这意味着Celluloid完成了run!函数的执行在同一线程中启动异步任务之前。

您的主循环控制着 actor/application 的线程。

您的程序所做的只是产生后台进程,但永远不会 运行 启动它们。您在循环中需要 sleep 纯粹是为了让后台线程得到关注。

像这里那样让无条件循环产生无限后台进程通常不是一个好主意。应该有一个延迟,或者在那里放一个条件语句......否则你只会有一个无限循环产生永远不会被调用的东西。

这样想:如果你把 puts "looping" 放在你的循环中,而你没有看到 Running in the background ...你会一遍又一遍地看到 looping .


方法 #1:使用 everyafter 块。

解决此问题的最佳方法是不要在 loop 中使用 sleep,而是使用 afterevery 块,如下所示:

every(0.1) {
    on_background
}

或者最重要的是,如果您想确保进程 运行 在再次 运行 之前完成,请改用 after

def run_method
    @running ||= false
    unless @running
        @running = true
        on_background
        @running = false
    end
    after(0.1) { run_method }
 end

使用 loopasync 不是一个好主意,除非有某种流量控制完成,或者像 @server.accept 这样的阻塞过程......否则它将无缘无故地拉动 100% 的 CPU 核心。

顺便说一下,您也可以使用 now_and_everynow_and_after...这会立即 运行 阻止,然后再次 运行在你想要的时间之后。

此要点中显示了使用 every


我认为的理想情况:

这是一个粗略但立即可用的示例:


require 'celluloid/current'

class Indefinite
  include Celluloid

  INTERVAL = 0.5
  ONE_AT_A_TIME = true

  def self.run!
    puts "000a Instantiating."
    indefinite = new
    indefinite.run
    puts "000b Running forever:"
    sleep
  end

  def initialize
    puts "001a Initializing."
    @mutex = Mutex.new if ONE_AT_A_TIME
    @running = false
    puts "001b Interval: #{INTERVAL}"
  end

  def run
    puts "002a Running."
    unless ONE_AT_A_TIME && @running
      if ONE_AT_A_TIME
        @mutex.synchronize {
          puts "002b Inside lock."
          @running = true
          on_background
          @running = false
        }
      else
        puts "002b Without lock."
        on_background
      end
    end
    puts "002c Setting new timer."
    after(INTERVAL) { run }
  end


  def on_background
    if ONE_AT_A_TIME
      puts "003 Running background processor in foreground."
    else
      puts "003 Running in background"
    end
  end
end

Indefinite.run!
puts "004 End of application."

这将是它的输出,如果 ONE_AT_A_TIMEtrue:

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
000b Running forever:
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.

如果 ONE_AT_A_TIMEfalse:

,这将是它的输出
000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
000b Running forever:
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.

您需要 "evented" 多于 "threaded" 才能正确发布任务并保持范围和状态,而不是在 threads/actors 之间发布命令...这就是 everyafter 块提供。除此之外,无论哪种方式都是很好的做法,即使您没有要处理的 Global Interpreter Lock 也是如此,因为在您的示例中,您似乎没有在处理阻塞过程。如果你有一个阻塞过程,那么一定会有一个无限循环。但是由于您最终会在处理一个任务之前产生无限数量的后台任务,因此您需要使用 sleep 就像您的问题开始时一样,或者完全使用不同的策略,并使用 everyafter 这就是 Celluloid 本身鼓励您在处理任何类型的套接字上的数据时进行操作的方式。


方法 #2:使用递归方法调用。

这刚刚出现在 Google 组中。下面的示例代码实际上将允许执行其他任务,即使它是一个无限循环。

这种方法不太理想,因为它可能会产生更多的开销,产生一系列纤程。

def work
    # ...
    async.work
end

问题 #2:ThreadFiber 行为。

第二个问题是为什么以下会起作用:loop { Thread.new { puts "Hello" } }

这会产生无限数量的进程线程,这些线程由 RVM 直接管理。即使您正在使用的 RVM 中有一个 Global Interpreter Lock... 这仅意味着没有使用 green threads,这是由操作系统本身提供的...而是处理这些由过程本身。进程 运行 的 CPU 调度程序每个 Thread 本身,毫不犹豫。在这个例子中,Thread 运行 很快就死了。

async任务相比,使用了Fiber。所以发生的事情是这样的,在默认情况下:

  1. 进程开始。
  2. Actor 实例化。
  3. 方法调用调用循环。
  4. 循环调用 async 方法。
  5. async方法添加任务到邮箱。
  6. 邮箱未被调用,循环继续。
  7. 另一个 async 任务已添加到邮箱。
  8. 这将无限继续下去。

以上是因为循环方法本身是一个 Fiber 调用,它永远不会被挂起(除非调用 sleep!)因此添加到邮箱的附加任务是从不调用新的 FiberFiber 的行为不同于 Thread。这是一个很好的参考 material 讨论差异:


问题 #3:CelluloidCelluloid::ZMQ 行为。

第三个问题是为什么 include Celluloid 的行为不同于 Celluloid::ZMQ ...

这是因为 Celluloid::ZMQ 使用基于反应堆的事件邮箱,而 Celluloid 使用基于条件变量的邮箱。

阅读更多关于流水线和执行模式的信息:

这就是两个例子的区别。如果您对这些邮箱的行为方式有其他疑问,请随时在 Google Group 上 post ...您面临的主要动态是 GILFiber vs. Thread vs. Reactor 行为。

您可以在此处阅读有关反应器模式的更多信息:

并在此处查看 Celluloid::ZMQ 使用的具体反应器:

所以在事件邮箱场景中发生的事情是,当 sleep 被命中时,这是一个阻塞调用,导致反应器移动到邮箱中的下一个任务。

而且,这对您的情况来说是独一无二的,Celluloid::ZMQ 使用的特定反应器正在使用一个永恒的 C 库……特别是 0MQ 库。该反应器在您的应用程序外部,其行为与 Celluloid::IOCelluloid 本身不同,这也是行为发生与您预期不同的原因。

多核支持选择

如果维护状态和范围对您来说不重要,如果您使用不限于一个操作系统线程的 jRubyRubinius,而不是使用具有Global Interpreter Lock,您可以实例化多个 actor 并在 actor 之间同时发出 async 调用。

但我的拙见是,使用频率非常高的计时器会更好,例如我示例中的 0.0010.1,这对于所有意图和目的来说似乎都是瞬时的,但也允许 actor 线程有足够的时间来切换光纤和 运行 邮箱中的其他任务。

让我们做一个实验,稍微修改一下你的例子(我们修改它是因为这样我们得到相同的 "weird" 行为,同时让事情更清楚):

class Indefinite
  include Celluloid

  def run!
    (1..100).each do |i|
      async.on_background i
    end
    puts "100 requests sent from #{Actor.current.object_id}"
  end 

  def on_background(num)
    (1..100000000).each {}
    puts "message #{num} on #{Actor.current.object_id}" 
  end
end

Indefinite.new.run!
sleep

# =>
# 100 requests sent from 2084
# message 1 on 2084
# message 2 on 2084
# message 3 on 2084
# ...

你可以在任何Ruby解释器上运行它,使用CelluloidCelluloid::ZMQ,结果总是相同。另请注意,Actor.current.object_id 的输出在两种方法中都是相同的,这给了我们线索,即我们在实验中处理的是单个参与者。

所以ruby和Celluloid的实现没有太大区别,只要是这个实验就可以了。

让我们首先解决 为什么 这段代码的行为如此?

不难理解为什么会这样。 Celluloid 正在接收传入的请求并将它们保存在任务队列中以供适当的参与者使用。请注意,我们最初对 run! 的调用在队列的顶部。

Celluloid 然后处理这些任务,一次一个。如果恰好有阻塞调用或sleep调用,则根据documentation,将调用下一个任务,而不是等待当前任务完成。

请注意,在我们的实验中没有阻塞调用。这意味着,run! 方法将从头到尾执行,只有在完成后,每个 on_background 调用才会以完美的顺序调用。

它应该是这样工作的。

如果您在代码中添加 sleep 调用,它会通知 Celluloid,它应该开始处理队列中的下一个任务。因此,您在第二个示例中的行为。

现在让我们继续如何设计系统的部分,使其不依赖于sleep调用,这至少很奇怪。

实际上 Celluloid-ZMQ project 页面上有一个很好的例子。注意这个循环:

def run
  loop { async.handle_message @socket.read }
end

它做的第一件事是@socket.read。请注意,这是一个阻塞操作。因此,Celluloid 将处理队列中的下一条消息(如果有的话)。 @socket.read一响应,就会生成一个新的任务。但是在再次调用 @socket.read 之前不会执行此任务,从而阻止执行,并通知 Celluloid 处理队列中的下一个项目。

您可能会看到示例的不同之处。您没有阻止任何东西,因此没有给 Celluloid 处理队列的机会。

我们如何获得 Celluloid::ZMQ 示例中给出的行为?

第一个(在我看来,更好的)解决方案是进行实际的阻塞调用,例如 @socket.read

如果你的代码中没有阻塞调用,你仍然需要在后台处理事情,那么你应该考虑 Celluloid 提供的其他机制。

Celluloid 有多种选择。 可以使用 conditions, futures, notifications,或者只是在低级别调用 wait/signal,如本例所示:

class Indefinite
  include Celluloid

  def run!
    loop do
      async.on_background
      result = wait(:background) #=> 33
    end
  end 

  def on_background
    puts "background" 

    # notifies waiters, that they can continue
    signal(:background, 33)
  end
end

Indefinite.new.run!
sleep

# ...
# background
# background
# background
# ...

使用 sleep(0)Celluloid::ZMQ

我还注意到 working.rb 您在评论中提到的文件。它包含以下循环:

loop { [1].each { |i|  async.handle_message 'hello' } ; sleep(0) }

看起来它正在做正确的工作。实际上,运行在 jRuby 下显示它正在泄漏内存。为了让它更明显,尝试在 handle_message 正文中添加睡眠调用:

def handle_message(message)
  sleep 0.5
  puts "got message: #{message}"
end

高内存使用率可能与队列填充速度非常快,无法在给定时间内处理有关。会比较麻烦,如果handle_message比较费力,那就现在

sleep

的解决方案

我对 sleep 的解决方案持怀疑态度。它们可能需要大量内存,甚至会产生内存泄漏。并且不清楚应该将什么作为参数传递给 sleep 方法以及为什么。