Crystal 将线程池背后的想法转换为 Fibers/spawn
Crystal convert the idea behind Thread pool to Fibers/spawn
我很难学习 Fibers\coroutines 背后的想法和 Crystal 中的实现。
我希望这是问这个问题的正确地方,我会完全接受 "not here" 的回答:)
这是我在Ruby中处理多线程的常用方式:
threads = []
max_threads = 10
loop do
begin
threads << Thread.new do
helper_method(1,2,3,4)
end
rescue Exception => e
puts "Error Starting thread"
end
begin
threads = threads.select { |t| t.alive? ? true : (t.join; false) }
while threads.size >= max_threads
puts 'Got Maximum threads'
sleep 1
threads = threads.select { |t| t.alive? ? true : (t.join; false) }
end
rescue Exception => e
puts e
end
end
通过这种方式,我打开了一个新的线程,通常是传入连接或其他一些东西,将线程添加到线程数组,然后检查我没有比我想要的更多的线程。
使用 spawn\channels\fibers 等实现 Crystal 中类似内容的好方法是什么?
像这样:
require "socket"
ch = Channel(TCPSocket).new
10.times do
spawn do
loop do
socket = ch.receive
socket.puts "Hi!"
socket.close
end
end
end
server = TCPServer.new(1234)
loop do
socket = server.accept
ch.send socket
end
此代码将预生成 10 个纤维来处理请求。该通道是无缓冲的,因此如果任何光纤都无法连接,连接将不会排队。
您无法复制线程的工作方式。 spawn
不是 return 协程对象,也没有办法 join
协程。
然而我们可以打开一个通道来在协程和池管理器之间进行通信。这个管理器可以 运行 在它自己的协程中或者是主协程——这将阻止进程退出。
这是一个工作示例,使用 worker(&block)
方法生成协程,并打开一个通道 return 它的状态(失败或终止),以及 pool(&block)
方法,该方法将保留此类工作人员池并从结果通道中读取以了解协程的状态,并不断产生新的协程。
def worker(&block)
result = UnbufferedChannel(Exception?).new
::spawn do
begin
block.call
rescue ex
result.send(ex)
else
result.send(nil)
end
end
result
end
def pool(size, &block)
counter = 0
results = [] of UnbufferedChannel(Exception?)
loop do
while counter < size
counter += 1
puts "spawning worker"
results << worker(&block)
end
result = Channel.select(results)
counter -= 1
results.delete(result)
if ex = result.receive
puts "ERROR: #{ex.message}"
else
puts "worker terminated"
end
end
end
pool(5) do
loop { helper_method(1, 2, 3, 4) }
end
我很难学习 Fibers\coroutines 背后的想法和 Crystal 中的实现。
我希望这是问这个问题的正确地方,我会完全接受 "not here" 的回答:)
这是我在Ruby中处理多线程的常用方式:
threads = []
max_threads = 10
loop do
begin
threads << Thread.new do
helper_method(1,2,3,4)
end
rescue Exception => e
puts "Error Starting thread"
end
begin
threads = threads.select { |t| t.alive? ? true : (t.join; false) }
while threads.size >= max_threads
puts 'Got Maximum threads'
sleep 1
threads = threads.select { |t| t.alive? ? true : (t.join; false) }
end
rescue Exception => e
puts e
end
end
通过这种方式,我打开了一个新的线程,通常是传入连接或其他一些东西,将线程添加到线程数组,然后检查我没有比我想要的更多的线程。
使用 spawn\channels\fibers 等实现 Crystal 中类似内容的好方法是什么?
像这样:
require "socket"
ch = Channel(TCPSocket).new
10.times do
spawn do
loop do
socket = ch.receive
socket.puts "Hi!"
socket.close
end
end
end
server = TCPServer.new(1234)
loop do
socket = server.accept
ch.send socket
end
此代码将预生成 10 个纤维来处理请求。该通道是无缓冲的,因此如果任何光纤都无法连接,连接将不会排队。
您无法复制线程的工作方式。 spawn
不是 return 协程对象,也没有办法 join
协程。
然而我们可以打开一个通道来在协程和池管理器之间进行通信。这个管理器可以 运行 在它自己的协程中或者是主协程——这将阻止进程退出。
这是一个工作示例,使用 worker(&block)
方法生成协程,并打开一个通道 return 它的状态(失败或终止),以及 pool(&block)
方法,该方法将保留此类工作人员池并从结果通道中读取以了解协程的状态,并不断产生新的协程。
def worker(&block)
result = UnbufferedChannel(Exception?).new
::spawn do
begin
block.call
rescue ex
result.send(ex)
else
result.send(nil)
end
end
result
end
def pool(size, &block)
counter = 0
results = [] of UnbufferedChannel(Exception?)
loop do
while counter < size
counter += 1
puts "spawning worker"
results << worker(&block)
end
result = Channel.select(results)
counter -= 1
results.delete(result)
if ex = result.receive
puts "ERROR: #{ex.message}"
else
puts "worker terminated"
end
end
end
pool(5) do
loop { helper_method(1, 2, 3, 4) }
end