在 concurrent-ruby 线程池中处理异常
Handle exceptions in concurrent-ruby thread pool
如何处理并发-ruby线程池(http://ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html)中的异常?
示例:
pool = Concurrent::FixedThreadPool.new(5)
pool.post do
raise 'something goes wrong'
end
# how to rescue this exception here
更新:
这是我的代码的简化版本:
def process
pool = Concurrent::FixedThreadPool.new(5)
products.each do |product|
new_product = generate_new_product
pool.post do
store_in_db(new_product) # here exception is raised, e.g. connection to db failed
end
end
pool.shutdown
pool.wait_for_terminaton
end
所以我想要实现的是在出现任何异常时停止处理(中断循环)。
这个异常也在更高级别的应用程序中被拯救并且执行了一些清理工作(比如将模型状态设置为失败并发送一些通知)。
可能有更好的方法,但这确实有效。您需要更改 wait_for_pool_to_finish
内的错误处理。
def process
pool = Concurrent::FixedThreadPool.new(10)
errors = Concurrent::Array.new
10_000.times do
pool.post do
begin
# do the work
rescue StandardError => e
errors << e
end
end
end
wait_for_pool_to_finish(pool, errors)
end
private
def wait_for_pool_to_finish(pool, errors)
pool.shutdown
until pool.shutdown?
if errors.any?
pool.kill
fail errors.first
end
sleep 1
end
pool.wait_for_termination
end
以下回答来自jdantonio 来自这里https://github.com/ruby-concurrency/concurrent-ruby/issues/616
”
大多数应用程序不应该直接使用线程池。线程池是一种低级抽象,供内部使用。该库中的所有高级抽象(Promise、Actor 等)都 post 作业到全局线程池,并且都提供异常处理。只需选择最适合您的用例的抽象并使用它。
如果您觉得需要配置自己的线程池而不是使用全局线程池,您仍然可以使用高级抽象。它们都支持 :executor 选项,允许您注入自定义线程池。然后,您可以使用高级抽象提供的异常处理。
如果您绝对坚持 post 将作业直接发送到线程池而不是使用我们的高级抽象(我强烈反对),那么只需创建一个作业包装器。您可以在我们所有的高级抽象、Rails ActiveJob、Sucker Punch 和其他使用我们的线程池的库中找到作业包装器的示例。
那么用 Promises 实现怎么样?
http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html
在您的情况下,它看起来像这样:
promises = []
products.each do |product|
new_product = generate_new_prodcut
promises << Concurrent::Promise.execute do
store_in_db(new_product)
end
end
# .value will wait for the Thread to finish.
# The ! means, that all exceptions will be propagated to the main thread
# .zip will make one Promise which contains all other promises.
Concurrent::Promise.zip(*promises).value!
我创建了一个问题 #634。并发线程池可以毫无问题地支持abortable worker。
require "concurrent"
Concurrent::RubyThreadPoolExecutor.class_eval do
# Inspired by "ns_kill_execution".
def ns_abort_execution aborted_worker
@pool.each do |worker|
next if worker == aborted_worker
worker.kill
end
@pool = [aborted_worker]
@ready.clear
stopped_event.set
nil
end
def abort_worker worker
synchronize do
ns_abort_execution worker
end
nil
end
def join
shutdown
# We should wait for stopped event.
# We couldn't use timeout.
stopped_event.wait nil
@pool.each do |aborted_worker|
# Rubinius could receive an error from aborted thread's "join" only.
# MRI Ruby doesn't care about "join".
# It will receive error anyway.
# We can "raise" error in aborted thread and than "join" it from this thread.
# We can "join" aborted thread from this thread and than "raise" error in aborted thread.
# The order of "raise" and "join" is not important. We will receive target error anyway.
aborted_worker.join
end
@pool.clear
nil
end
class AbortableWorker < self.const_get :Worker
def initialize pool
super
@thread.abort_on_exception = true
end
def run_task pool, task, args
begin
task.call *args
rescue StandardError => error
pool.abort_worker self
raise error
end
pool.worker_task_completed
nil
end
def join
@thread.join
nil
end
end
self.send :remove_const, :Worker
self.const_set :Worker, AbortableWorker
end
class MyError < StandardError; end
pool = Concurrent::FixedThreadPool.new 5
begin
pool.post do
sleep 1
puts "we shouldn't receive this message"
end
pool.post do
puts "raising my error"
raise MyError
end
pool.join
rescue MyError => error
puts "received my error, trace: \n#{error.backtrace.join("\n")}"
end
sleep 2
输出:
raising my error
received my error, trace:
...
此补丁适用于任何版本的 MRI Ruby 和 Rubinius。 JRuby 不工作,我不在乎。如果你想支持 JRuby executor,请打补丁。应该很简单。
如何处理并发-ruby线程池(http://ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html)中的异常?
示例:
pool = Concurrent::FixedThreadPool.new(5)
pool.post do
raise 'something goes wrong'
end
# how to rescue this exception here
更新:
这是我的代码的简化版本:
def process
pool = Concurrent::FixedThreadPool.new(5)
products.each do |product|
new_product = generate_new_product
pool.post do
store_in_db(new_product) # here exception is raised, e.g. connection to db failed
end
end
pool.shutdown
pool.wait_for_terminaton
end
所以我想要实现的是在出现任何异常时停止处理(中断循环)。
这个异常也在更高级别的应用程序中被拯救并且执行了一些清理工作(比如将模型状态设置为失败并发送一些通知)。
可能有更好的方法,但这确实有效。您需要更改 wait_for_pool_to_finish
内的错误处理。
def process
pool = Concurrent::FixedThreadPool.new(10)
errors = Concurrent::Array.new
10_000.times do
pool.post do
begin
# do the work
rescue StandardError => e
errors << e
end
end
end
wait_for_pool_to_finish(pool, errors)
end
private
def wait_for_pool_to_finish(pool, errors)
pool.shutdown
until pool.shutdown?
if errors.any?
pool.kill
fail errors.first
end
sleep 1
end
pool.wait_for_termination
end
以下回答来自jdantonio 来自这里https://github.com/ruby-concurrency/concurrent-ruby/issues/616
” 大多数应用程序不应该直接使用线程池。线程池是一种低级抽象,供内部使用。该库中的所有高级抽象(Promise、Actor 等)都 post 作业到全局线程池,并且都提供异常处理。只需选择最适合您的用例的抽象并使用它。
如果您觉得需要配置自己的线程池而不是使用全局线程池,您仍然可以使用高级抽象。它们都支持 :executor 选项,允许您注入自定义线程池。然后,您可以使用高级抽象提供的异常处理。
如果您绝对坚持 post 将作业直接发送到线程池而不是使用我们的高级抽象(我强烈反对),那么只需创建一个作业包装器。您可以在我们所有的高级抽象、Rails ActiveJob、Sucker Punch 和其他使用我们的线程池的库中找到作业包装器的示例。
那么用 Promises 实现怎么样? http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html 在您的情况下,它看起来像这样:
promises = []
products.each do |product|
new_product = generate_new_prodcut
promises << Concurrent::Promise.execute do
store_in_db(new_product)
end
end
# .value will wait for the Thread to finish.
# The ! means, that all exceptions will be propagated to the main thread
# .zip will make one Promise which contains all other promises.
Concurrent::Promise.zip(*promises).value!
我创建了一个问题 #634。并发线程池可以毫无问题地支持abortable worker。
require "concurrent"
Concurrent::RubyThreadPoolExecutor.class_eval do
# Inspired by "ns_kill_execution".
def ns_abort_execution aborted_worker
@pool.each do |worker|
next if worker == aborted_worker
worker.kill
end
@pool = [aborted_worker]
@ready.clear
stopped_event.set
nil
end
def abort_worker worker
synchronize do
ns_abort_execution worker
end
nil
end
def join
shutdown
# We should wait for stopped event.
# We couldn't use timeout.
stopped_event.wait nil
@pool.each do |aborted_worker|
# Rubinius could receive an error from aborted thread's "join" only.
# MRI Ruby doesn't care about "join".
# It will receive error anyway.
# We can "raise" error in aborted thread and than "join" it from this thread.
# We can "join" aborted thread from this thread and than "raise" error in aborted thread.
# The order of "raise" and "join" is not important. We will receive target error anyway.
aborted_worker.join
end
@pool.clear
nil
end
class AbortableWorker < self.const_get :Worker
def initialize pool
super
@thread.abort_on_exception = true
end
def run_task pool, task, args
begin
task.call *args
rescue StandardError => error
pool.abort_worker self
raise error
end
pool.worker_task_completed
nil
end
def join
@thread.join
nil
end
end
self.send :remove_const, :Worker
self.const_set :Worker, AbortableWorker
end
class MyError < StandardError; end
pool = Concurrent::FixedThreadPool.new 5
begin
pool.post do
sleep 1
puts "we shouldn't receive this message"
end
pool.post do
puts "raising my error"
raise MyError
end
pool.join
rescue MyError => error
puts "received my error, trace: \n#{error.backtrace.join("\n")}"
end
sleep 2
输出:
raising my error
received my error, trace:
...
此补丁适用于任何版本的 MRI Ruby 和 Rubinius。 JRuby 不工作,我不在乎。如果你想支持 JRuby executor,请打补丁。应该很简单。