为什么在 Celluloid 中会出现“Celluloid::Condition signaled spuriously”错误?

Why is the error `Celluloid::Condition signaled spuriously` caused in Celluloid?

我正在尝试等待异步代码的条件,这里是代码片段:

condition = Celluloid::Condition.new   
Rails.logger.debug "Sending RPC request to #{subject}"
NATS.start uri: ENV['NATS_SERVER'] do 
  Rails.logger.debug "Connected to #{ENV['NATS_SERVER']}"
  sid = NATS.request(subject,msg) do |response|
    Rails.logger.debug "Assigning response"
    condition.signal response
    NATS.stop
  end
  NATS.timeout(sid, 1) do
    NATS.stop
    condition.signal ASYNC_ERROR
    raise "One second timeout waiting for a NATS RPC reply from #{subject}"
  end
end

result = condition.wait
if result = ASYNC_ERROR
  raise "Error in RPC call"
else
  return result
end

我得到了异常 Celluloid::Condition signaled spuriously,但没有额外的信息,我也不太明白为什么会这样,https://github.com/celluloid/celluloid/wiki/Conditions 也没有提供更多信息。

为什么会这样,我该如何解决?

你的条件不是在演员的背景下运作。请注意,在示例中,使用了一个演员。可以避免使用 actor 上下文,但这是非常不同的,并且是您收到的错误的根源。

如果您不想在 actor 中实现它,如示例中所示,这里是您可以在没有 actor 的情况下实现的方法:

使用Celluloid::Future(即使嵌套的async调用在里面)

nats = Celluloid::Future.new {
    blocker = Queue.new
    Rails.logger.debug "Sending RPC request to #{subject}"
    NATS.start uri: ENV['NATS_SERVER'] do 
        Rails.logger.debug "Connected to #{ENV['NATS_SERVER']}"
        sid = NATS.request(subject,msg) do |response|
            Rails.logger.debug "Assigning response"
            NATS.stop
            blocker << response
        end
        NATS.timeout(sid, 1) do
            NATS.stop
            blocker << nil
            #de "One second timeout waiting for a NATS RPC reply from #{subject}"
        end
    end
    blocker.pop || raise ASYNC_ERROR
}

begin
    result = nats.value
rescue ASYNC_ERROR
    raise "Error in RPC call"
rescue => ex
    #de Other exception
else
    return result
end

以上是实现 async 响应收集的松散示例,带有异常处理。这是许多可能方法中的一个例子。

并使用条件完全按照您提出的问题来回答您的问题。

演员范围内:

class Nats
  include Celluloid

  def initialize
    @condition = Celluloid::Condition.new
  end

  def start
    Rails.logger.debug "Sending RPC request to #{subject}"
    NATS.start uri: ENV['NATS_SERVER'] do 
      Rails.logger.debug "Connected to #{ENV['NATS_SERVER']}"
      sid = NATS.request(subject,msg) do |response|
        Rails.logger.debug "Assigning response"
        @condition.signal response
        NATS.stop
      end
      NATS.timeout(sid, 1) do
        NATS.stop
        @condition.signal ASYNC_ERROR
        raise "One second timeout waiting for a NATS RPC reply from #{subject}"
      end
    end
  end

  def value
    @condition.wait
  end
end

nats = Nats.new
nats.async.start

result = nats.value
if result = ASYNC_ERROR
  raise "Error in RPC call"
else
  return result
end

这甚至没有经过测试,但如果您不打算像我的其他答案那样使用 Future,则应该向您展示基本方法。