Ruby - 迭代时使用多线程(producer/consumer 模型)出现意外结果
Ruby - Unexpected results using multi-threading (producer/consumer model) when iterating
注意:我选择使用线程来解析 DNS 名称,但任何类型的类似操作都可能重现相同的行为。
当我尝试将我的(以前工作的)代码从标准的单线程执行转移到多线程时,我收到了意外的结果。具体来说,我的代码遍历哈希数组并向数组中的每个哈希添加一个 key/value 对。
我遇到的问题似乎来自正在创建新 key/value 对的 dns_cname.map
循环。而不是具有正确值的 "external_dns_entry"
键(即 result.name.to_s
包含由 DNS 解析的名称),我得到的是 url_nameserver_mapping
中许多其他服务器之一的名称.
我感觉 DNS 解析是在线程可用时发生的,哈希更新的顺序是乱序的,但我什至不知道如何开始跟踪这样的问题。
有问题的结果:针对服务器 1 的 DNS 解析 运行 映射到服务器 17。同样,服务器 17 映射到服务器 99,等等。其余的哈希保持不变。
非常感谢任何帮助。首先十分感谢!
这是我的代码未启用多线程时(工作正常):
url_nameserver_mapping = { "server1" => "dallasdns.dns.com",
"server2" => "portlanddns.dns.com",
"server3" => "losangelesdns.dns.com" }
# Parse the JSON string response from the API into a valid Ruby Hash
# The net/http GET request is not shown here for brevity but it was stored in 'response'
unsorted_urls = JSON.parse(response.body)
# Sort (not sure this is relevant)
# I left it since my data is being populated to the Hash incorrectly (w/ threading enabled)
url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"]}
url_nameserver_mapping.each do |server,location|
dns = Resolv::DNS.new(:nameserver => ['8.8.8.8'])
dns_cname = dns.getresources(server, Resolv::DNS::Resource::IN::CNAME)
dns_cname.map do |result|
# Create a new key/value for each Hash in url_properties Array
# Occurs if the server compared matches the value of url['server'] key
url_properties.each do |url|
url["external_dns_entry"] = result.name.to_s if url['server'] == server
end
end
end
我按照 https://blog.engineyard.cm/2013/ruby-concurrency 的指南实施了 producer/consumer 线程模型。
这是我改编的代码启用多线程时(不工作):
require 'thread'
require 'monitor'
thread_count = 8
threads = Array.new(thread_count)
producer_queue = SizedQueue.new(thread_count)
threads.extend(MonitorMixin)
threads_available = threads.new_cond
sysexit = false
url_nameserver_mapping = { "server1" => "dallasdns.dns.com",
"server2" => "portlanddns.dns.com",
"server3" => "losangelesdns.dns.com" }
unsorted_urls = JSON.parse(response.body)
url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"]}
####################
##### Consumer #####
####################
consumer_thread = Thread.new do
loop do
break if sysexit && producer_queue.length == 0
found_index = nil
threads.synchronize do
threads_available.wait_while do
threads.select { |thread| thread.nil? ||
thread.status == false ||
thread["finished"].nil? == false}.length == 0
end
# Get the index of the available thread
found_index = threads.rindex { |thread| thread.nil? ||
thread.status == false ||
thread["finished"].nil? == false }
end
@domain = producer_queue.pop
threads[found_index] = Thread.new(@domain) do
dns = Resolv::DNS.new(:nameserver => ['8.8.8.8'])
dns_cname = dns.getresources(@domain, Resolv::DNS::Resource::IN::CNAME)
dns_cname.map do |result|
url_properties.each do |url|
url["external_dns_entry"] = result.name.to_s if url['server'] == @domain
end
end
Thread.current["finished"] = true
# Notify the consumer that another batch of work has been completed
threads.synchronize { threads_available.signal }
end
end
end
####################
##### Producer #####
####################
producer_thread = Thread.new do
url_nameserver_mapping.each do |server,location|
producer_queue << server
threads.synchronize do
threads_available.signal
end
end
sysexit = true
end
# Join on both the producer and consumer threads so the main thread doesn't exit
producer_thread.join
consumer_thread.join
# Join on the child processes to allow them to finish
threads.each do |thread|
thread.join unless thread.nil?
end
@domain
由所有线程共享 - 这种共享是您问题的根源:当它通过从队列中弹出下一个工作单元进行更新时,您的所有线程都会看到该更改。你可以通过做
来避免这个问题
Thread.new(producer_queue.pop) do |domain|
#domain isn't shared with anyone (as long as there
#is no local variable called domain in the enclosing scope
end
与您的问题无关,但这似乎是一种真正过度设计的方法。提前启动一堆消费者线程并让它们直接从工作队列中读取要容易得多。
注意:我选择使用线程来解析 DNS 名称,但任何类型的类似操作都可能重现相同的行为。
当我尝试将我的(以前工作的)代码从标准的单线程执行转移到多线程时,我收到了意外的结果。具体来说,我的代码遍历哈希数组并向数组中的每个哈希添加一个 key/value 对。
我遇到的问题似乎来自正在创建新 key/value 对的 dns_cname.map
循环。而不是具有正确值的 "external_dns_entry"
键(即 result.name.to_s
包含由 DNS 解析的名称),我得到的是 url_nameserver_mapping
中许多其他服务器之一的名称.
我感觉 DNS 解析是在线程可用时发生的,哈希更新的顺序是乱序的,但我什至不知道如何开始跟踪这样的问题。
有问题的结果:针对服务器 1 的 DNS 解析 运行 映射到服务器 17。同样,服务器 17 映射到服务器 99,等等。其余的哈希保持不变。
非常感谢任何帮助。首先十分感谢!
这是我的代码未启用多线程时(工作正常):
url_nameserver_mapping = { "server1" => "dallasdns.dns.com",
"server2" => "portlanddns.dns.com",
"server3" => "losangelesdns.dns.com" }
# Parse the JSON string response from the API into a valid Ruby Hash
# The net/http GET request is not shown here for brevity but it was stored in 'response'
unsorted_urls = JSON.parse(response.body)
# Sort (not sure this is relevant)
# I left it since my data is being populated to the Hash incorrectly (w/ threading enabled)
url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"]}
url_nameserver_mapping.each do |server,location|
dns = Resolv::DNS.new(:nameserver => ['8.8.8.8'])
dns_cname = dns.getresources(server, Resolv::DNS::Resource::IN::CNAME)
dns_cname.map do |result|
# Create a new key/value for each Hash in url_properties Array
# Occurs if the server compared matches the value of url['server'] key
url_properties.each do |url|
url["external_dns_entry"] = result.name.to_s if url['server'] == server
end
end
end
我按照 https://blog.engineyard.cm/2013/ruby-concurrency 的指南实施了 producer/consumer 线程模型。
这是我改编的代码启用多线程时(不工作):
require 'thread'
require 'monitor'
thread_count = 8
threads = Array.new(thread_count)
producer_queue = SizedQueue.new(thread_count)
threads.extend(MonitorMixin)
threads_available = threads.new_cond
sysexit = false
url_nameserver_mapping = { "server1" => "dallasdns.dns.com",
"server2" => "portlanddns.dns.com",
"server3" => "losangelesdns.dns.com" }
unsorted_urls = JSON.parse(response.body)
url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"]}
####################
##### Consumer #####
####################
consumer_thread = Thread.new do
loop do
break if sysexit && producer_queue.length == 0
found_index = nil
threads.synchronize do
threads_available.wait_while do
threads.select { |thread| thread.nil? ||
thread.status == false ||
thread["finished"].nil? == false}.length == 0
end
# Get the index of the available thread
found_index = threads.rindex { |thread| thread.nil? ||
thread.status == false ||
thread["finished"].nil? == false }
end
@domain = producer_queue.pop
threads[found_index] = Thread.new(@domain) do
dns = Resolv::DNS.new(:nameserver => ['8.8.8.8'])
dns_cname = dns.getresources(@domain, Resolv::DNS::Resource::IN::CNAME)
dns_cname.map do |result|
url_properties.each do |url|
url["external_dns_entry"] = result.name.to_s if url['server'] == @domain
end
end
Thread.current["finished"] = true
# Notify the consumer that another batch of work has been completed
threads.synchronize { threads_available.signal }
end
end
end
####################
##### Producer #####
####################
producer_thread = Thread.new do
url_nameserver_mapping.each do |server,location|
producer_queue << server
threads.synchronize do
threads_available.signal
end
end
sysexit = true
end
# Join on both the producer and consumer threads so the main thread doesn't exit
producer_thread.join
consumer_thread.join
# Join on the child processes to allow them to finish
threads.each do |thread|
thread.join unless thread.nil?
end
@domain
由所有线程共享 - 这种共享是您问题的根源:当它通过从队列中弹出下一个工作单元进行更新时,您的所有线程都会看到该更改。你可以通过做
Thread.new(producer_queue.pop) do |domain|
#domain isn't shared with anyone (as long as there
#is no local variable called domain in the enclosing scope
end
与您的问题无关,但这似乎是一种真正过度设计的方法。提前启动一堆消费者线程并让它们直接从工作队列中读取要容易得多。