学习 Ruby 线程 - 线程完成时触发事件
Learning Ruby threading - trigger an event when thread finishes
我是多线程的新手,我正在寻找一些帮助来理解线程完成时执行某些操作的惯用方式,例如更新进度条。在下面的例子中,我有几个项目列表和例程来对每个项目进行一些“解析”。我计划为每个列表设置一个进度条,因此我希望能够让每个列表的解析例程更新已完成项目的百分比。我看到的唯一“触发”点是在项的 sleepy 方法(正在线程化的方法)末尾的 puts 语句。捕获完成的普遍接受的策略是什么,特别是当操作范围超出线程中的方法 运行 时?
谢谢!
# frozen_string_literal: true
require 'concurrent'
$stdout.sync = true
class TheList
attr_reader :items
def initialize(list_id, n_items)
@id = list_id
@items = []
n_items.times { |n| @items << Item.new(@id, n) }
end
def parse_list(pool)
@items.each do |item|
pool.post { item.sleepy(rand(3..8)) }
end
end
end
class Item
attr_reader :id
def initialize (list_id, item_id)
@id = item_id
@list_id = list_id
end
def sleepy(seconds)
sleep(seconds)
# This puts statement signifies the end of the method threaded
puts "List ID: #{@list_id} item ID:#{@id} slept for #{seconds} seconds"
end
end
lists = []
5.times do |i|
lists << TheList.new(i, rand(5..10))
end
pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count)
lists.each do |list|
list.parse_list(pool)
end
pool.shutdown
pool.wait_for_termination
问题不在于“知道线程何时结束”,而是如何在没有竞争条件的情况下更新共享进度条。
为了解释这个问题:假设你有一个中心变量 ThreadList#progress_var
,并且作为每个线程的最后一行,你用 +=
递增了它。这会引入竞争条件,因为两个线程可以同时执行操作(并且可能会覆盖彼此的结果)。
为了解决这个问题,典型的方法是使用 Mutex 如果您正在学习多线程,这是理解的基本概念。
实际实现起来并不难:
require 'mutex'
class ThreadList
def initialize
@semaphore = Mutex.new
@progress_bar = 0
end
def increment_progress_bar(amount)
@semaphore.synchronize do
@progress_bar += amount
end
end
end
由于那个 @semaphore.synchronize
块,您现在可以安全地从线程调用此 increment_progress_bar
方法,而没有竞争条件的风险。
我是多线程的新手,我正在寻找一些帮助来理解线程完成时执行某些操作的惯用方式,例如更新进度条。在下面的例子中,我有几个项目列表和例程来对每个项目进行一些“解析”。我计划为每个列表设置一个进度条,因此我希望能够让每个列表的解析例程更新已完成项目的百分比。我看到的唯一“触发”点是在项的 sleepy 方法(正在线程化的方法)末尾的 puts 语句。捕获完成的普遍接受的策略是什么,特别是当操作范围超出线程中的方法 运行 时?
谢谢!
# frozen_string_literal: true
require 'concurrent'
$stdout.sync = true
class TheList
attr_reader :items
def initialize(list_id, n_items)
@id = list_id
@items = []
n_items.times { |n| @items << Item.new(@id, n) }
end
def parse_list(pool)
@items.each do |item|
pool.post { item.sleepy(rand(3..8)) }
end
end
end
class Item
attr_reader :id
def initialize (list_id, item_id)
@id = item_id
@list_id = list_id
end
def sleepy(seconds)
sleep(seconds)
# This puts statement signifies the end of the method threaded
puts "List ID: #{@list_id} item ID:#{@id} slept for #{seconds} seconds"
end
end
lists = []
5.times do |i|
lists << TheList.new(i, rand(5..10))
end
pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count)
lists.each do |list|
list.parse_list(pool)
end
pool.shutdown
pool.wait_for_termination
问题不在于“知道线程何时结束”,而是如何在没有竞争条件的情况下更新共享进度条。
为了解释这个问题:假设你有一个中心变量 ThreadList#progress_var
,并且作为每个线程的最后一行,你用 +=
递增了它。这会引入竞争条件,因为两个线程可以同时执行操作(并且可能会覆盖彼此的结果)。
为了解决这个问题,典型的方法是使用 Mutex 如果您正在学习多线程,这是理解的基本概念。
实际实现起来并不难:
require 'mutex'
class ThreadList
def initialize
@semaphore = Mutex.new
@progress_bar = 0
end
def increment_progress_bar(amount)
@semaphore.synchronize do
@progress_bar += amount
end
end
end
由于那个 @semaphore.synchronize
块,您现在可以安全地从线程调用此 increment_progress_bar
方法,而没有竞争条件的风险。