为什么这个订阅会阻塞主线程?
Why does this subscription block the main thread?
我正在尝试使用 RxRuby,并希望将从 tcp 套接字接收的数据转换为可以使用的流。以下代码 "works" 从套接字接收到的数据是流式传输的,即当新数据到达时,我收到 "data = " 消息,但订阅块。如果我在 subscription =
语句之后添加任何代码,它不会执行,直到我关闭套接字。我本以为需要一个循环来阻止程序立即完成。
require 'rx'
require 'socket'
Thread.abort_on_exception=true
class StellariumInterface
attr_accessor :server, :client, :goto_stream
def initialize(host:, port:)
@host = host
@port = port
@server = TCPServer.new host, port
@goto_stream = nil
@client = nil
end
def accept
puts "connecting"
self.client = server.accept
puts "connected"
end
def listen
self.goto_stream = Rx::Observable.create do |observer|
l = Thread.new do
loop do
raw_data = client.recvfrom(1000)
break if raw_data.first.empty?
data = raw_data.first.unpack('ssqLl')
p data
observer.on_next(data)
sleep 0.1
end
observer.on_completed
end
l.join
end
end
end
source = StellariumInterface.new host: 'localhost', port: 10001
source.accept
source.listen
subscription = source.goto_stream.subscribe(
lambda { |x| puts "data = #{x}" },
lambda { |x| puts "error "},
lambda { puts "stream done "}
)
recvfrom
是一个阻塞操作,即使它在一个新的线程中,当join
发生时,这意味着主线程要等待新创建的线程.如果删除 join
它应该允许线程保持独立并且循环线程不会阻塞主线程。
我正在尝试使用 RxRuby,并希望将从 tcp 套接字接收的数据转换为可以使用的流。以下代码 "works" 从套接字接收到的数据是流式传输的,即当新数据到达时,我收到 "data = " 消息,但订阅块。如果我在 subscription =
语句之后添加任何代码,它不会执行,直到我关闭套接字。我本以为需要一个循环来阻止程序立即完成。
require 'rx'
require 'socket'
Thread.abort_on_exception=true
class StellariumInterface
attr_accessor :server, :client, :goto_stream
def initialize(host:, port:)
@host = host
@port = port
@server = TCPServer.new host, port
@goto_stream = nil
@client = nil
end
def accept
puts "connecting"
self.client = server.accept
puts "connected"
end
def listen
self.goto_stream = Rx::Observable.create do |observer|
l = Thread.new do
loop do
raw_data = client.recvfrom(1000)
break if raw_data.first.empty?
data = raw_data.first.unpack('ssqLl')
p data
observer.on_next(data)
sleep 0.1
end
observer.on_completed
end
l.join
end
end
end
source = StellariumInterface.new host: 'localhost', port: 10001
source.accept
source.listen
subscription = source.goto_stream.subscribe(
lambda { |x| puts "data = #{x}" },
lambda { |x| puts "error "},
lambda { puts "stream done "}
)
recvfrom
是一个阻塞操作,即使它在一个新的线程中,当join
发生时,这意味着主线程要等待新创建的线程.如果删除 join
它应该允许线程保持独立并且循环线程不会阻塞主线程。