如何捕获线程中的错误,然后在所有线程完成后重新抛出该错误?

How do I catch an error from a thread and then re-throw that error when all the threads have completed?

我正在使用 Rails 5。我有这个 gem 用于管理线程 ...

gem 'concurrent-ruby'

我注意到,如果我的一个线程抛出错误,它就会被吞没,而我永远不会发现它。我在控制台中试过这个

pool = Concurrent::FixedThreadPool.new(1)
  # => #<Concurrent::FixedThreadPool:0x007fe3585ab368 @__lock__=#<Thread::Mutex:0x007fe3585ab0c0>, @__condition__=#<Thread::ConditionVariable:0x007fe3585ab098>, @min_length=1, @max_length=1, @idletime=60, @max_queue=0, @fallback_policy=:abort, @auto_terminate=true, @pool=[], @ready=[], @queue=[], @scheduled_task_count=0, @completed_task_count=0, @largest_length=0, @ruby_pid=23932, @gc_interval=30, @next_gc_time=252232.13299, @StopEvent=#<Concurrent::Event:0x007fe3585aaf30 @__lock__=#<Thread::Mutex:0x007fe3585aaeb8>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aae90>, @set=false, @iteration=0>, @StoppedEvent=#<Concurrent::Event:0x007fe3585aadc8 @__lock__=#<Thread::Mutex:0x007fe3585aad78>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aad50>, @set=false, @iteration=0>> 
nums.each do |num|
  pool.post do
    if num == 1
      asdfasdf
    end
  end
end
  # => [1, 2, 3] 
pool.shutdown             # => true 
pool.wait_for_termination # => true 

我想知道,如果我的池中的一个线程抛出错误,我可以在所有线程完成后抛出异常,从而停止我的程序。如果 none 个线程抛出错误,那么我可以继续发生的任何事情。

在上面,你会注意到我故意造成一个应该导致错误的条件,但我从来没有发现它,因为我猜线程池正在吞噬异常的输出。

如果您需要built-in 异常处理,您应该使用higher-level 抽象而不是直接使用线程池。参考这个comment from the author of concurrent-ruby:

Most applications should not use thread pools directly. Thread pools are a low-level abstraction meant for internal use. All of the high-level abstractions in this library (Promise, Actor, etc.) all post jobs to the global thread pool and all provide exception handling. Simply pick the abstraction that best fits your use case and use it.

If you feel the need to configure your own thread pool rather than use the global thread pool, you can still use the high-level abstractions. They all support an :executor option which allows you to inject your custom thread pool. You can then use the exception handling provided by the high-level abstraction.

这是使用 Promise 抽象的示例的变体。一旦线程池引发异常,这将 re-raise 异常:

require 'concurrent'
pool = Concurrent::FixedThreadPool.new(1)
promises = (1..10).map do |num|
  Concurrent::Promise.execute(executor: pool) do
    if num == 1
      asdfasdf
    else
      num
    end
  end
end
promises.map(&:value!)

# NameError: undefined local variable or method `asdfasdf' for main:Object
#     from (irb):57:in `block (2 levels) in irb_binding'
#     [...]

到re-raise一个异常只有在所有线程都完成之后(不是在第一个异常时立即),你可以用[=16替换promises.map(&:value!) =].

要在没有 re-raising 的情况下将异常存储在收集结果中,您可以执行类似 promises.map { |p| p.value || p.reason }:

的操作
# => [#<NameError: undefined local variable or method `asdfasdf' for main:Object>, 2, 3, 4, 5, 6, 7, 8, 9, 10]

最后请注意,只有1个线程的固定线程池将在单个线程上顺序执行所有任务。要并行执行它们(在具有 10 个线程的池上),请将 thread-pool 初始化程序更改为 pool = Concurrent::FixedThreadPool.new(10).

回答你的问题 - 没有实际的方法作为库 explicitly silences exceptions 并且没有配置。

一种可能的解决方法是手动捕获异常:

error = nil
pool = Concurrent::FixedThreadPool.new(1)
numbers.each do |number|
  pool.post do
    begin
      some_dangerous_action(number)
    rescue Exception => e
      error = e
      raise # still let the gem do its thing
    end
  end
end

pool.shutdown
pool.wait_for_termination

raise error if error