faye-websocket 保持直到块完成
faye-websocket holding until block finishes
当我通过 faye-websocket 服务器作为中间件发送消息时,我发现消息直到块完成后才发送。
这是我试过的几个例子:
无线程
require 'faye/websocket'
require 'eventmachine'
require 'json'
Faye::WebSocket.load_adapter('thin')
module SocketTest
class Websocket
def initialize(app)
@app = app
end
def long_function()
sleep 20
"foo"
end
def call(env)
EM.run {
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: 15 })
ws.on :open do |event|
response = {
:responseCode => 100,
:message => "Connection opened"
}
$logger.info "< #{response.to_json}"
ws.send(response.to_json)
end
ws.on :message do |event|
response = {
:responseCode => 100,
:message => "Received request, running slow function"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
long_function_result = long_function()
response = {
:responseCode => 200,
:message => "Long function ran, result is #{long_function_result}"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
ws.close()
end
ws.on :close do |event|
$logger.info "< CLOSE"
ws = nil
end
# Return async Rack response
ws.rack_response
else
@app.call(env)
end
}
end
end
end
Chrome输出
5:25:09pm WebSocket Connection Established
5:25:09pm {"request":"test"}
5:25:09pm {"responseCode":100,"message":"Connection opened"}
5:25:30pm {"responseCode":100,"message":"Received request, running slow function"}
5:25:30pm {"responseCode":200,"message":"Long function ran, result is foo"}
5:25:30pm Connection Close Frame
5:25:30pm Connection Close Frame
控制台输出
I, [2020-01-15T17:25:10.006006 #25394] INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:25:10.024170 #25394] INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:25:30.034885 #25394] INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}I, [2020-01-15T06:25:30.189606 #25394] INFO -- : < CLOSE
正如我们在上面看到的,"Received request, running slow function" 的记录器输出是即时的 (17:25:09),但是需要额外的 20 秒才能将响应发送到客户端。我注意到 PING/PONG 消息也有同样的事情 - 在阻塞 sleep
完成之前,WebSocket 服务器什么也没有。
我也尝试过这样的修改版本:
正在穿线
# Just the call function, all other parts are the same
def call(env)
EM.run {
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: 15 })
ws.on :open do |event|
response = {
:responseCode => 100,
:message => "Connection opened"
}
$logger.info "< #{response.to_json}"
ws.send(response.to_json)
end
ws.on :message do |event|
response = {
:responseCode => 100,
:message => "Received request, running slow function"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
long_function = Thread.new { long_function() }
until (long_function.alive? == false) do
response = {
:responseCode => 100,
:message => "Waiting for long function to complete"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
sleep 5
end
response = {
:responseCode => 200,
:message => "Long function ran, result is #{long_function.value}"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
ws.close()
end
ws.on :close do |event|
$logger.info "< CLOSE"
ws = nil
end
# Return async Rack response
ws.rack_response
else
@app.call(env)
end
}
end
Chrome输出
5:34:33pm WebSocket Connection Established
5:34:33pm {"request":"test"}
5:34:33pm {"responseCode":100,"message":"Connection opened"}
5:34:53pm {"responseCode":100,"message":"Received request, running slow function"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":200,"message":"Long function ran, result is foo"}
5:34:53pm Connection Close Frame
5:34:53pm Connection Close Frame
控制台输出
I, [2020-01-15T17:34:33.473295 #25729] INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:34:33.489433 #25729] INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:34:33.489638 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:38.490995 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:43.491927 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:48.497281 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:53.499446 #25729] INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}
I, [2020-01-15T17:34:53.632997 #25729] INFO -- : < CLOSE
结果还是一样,虽然 $logger
对象在等待线程结果的同时继续运行(向我表明我们不再阻塞),但 WebSocket 似乎仍在等待它的缓冲区。我怎样才能强制退出这个缓冲区?
使用 @max_pleaner 建议的 add_timer EventMachine 函数,我能够构建代码的工作版本,并通过内部回调函数来创建循环:
require 'faye/websocket'
require 'eventmachine'
require 'json'
Faye::WebSocket.load_adapter('thin')
module SocketTest
class Websocket
def initialize(app)
@app = app
end
def call(env)
EM.run {
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: 15 })
ws.on :open do |event|
response = {
:responseCode => 100,
:message => "Connection opened"
}
$logger.info "< #{response.to_json}"
ws.send(response.to_json)
end
ws.on :message do |event|
response = {
:responseCode => 100,
:message => "Received request, running slow function"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
total_runs = 4
def long_function(ws, count, total_runs)
if count > total_runs then
# Error out
puts "Reached full count, exiting"
ws.close()
return
end
# Logic here
if count == 3 then
response = {
:responseCode => 200,
:message => "Long function ran"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
ws.close()
else
response = {
:responseCode => 100,
:message => "Waiting for long function to complete"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
EventMachine.add_timer(5) {
long_function(ws, count + 1, total_runs)
}
end
end
long_function(ws, 1, total_runs)
end
ws.on :close do |event|
$logger.info "< CLOSE"
ws = nil
end
# Return async Rack response
ws.rack_response
else
@app.call(env)
end
}
end
end
end
当我通过 faye-websocket 服务器作为中间件发送消息时,我发现消息直到块完成后才发送。
这是我试过的几个例子:
无线程
require 'faye/websocket'
require 'eventmachine'
require 'json'
Faye::WebSocket.load_adapter('thin')
module SocketTest
class Websocket
def initialize(app)
@app = app
end
def long_function()
sleep 20
"foo"
end
def call(env)
EM.run {
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: 15 })
ws.on :open do |event|
response = {
:responseCode => 100,
:message => "Connection opened"
}
$logger.info "< #{response.to_json}"
ws.send(response.to_json)
end
ws.on :message do |event|
response = {
:responseCode => 100,
:message => "Received request, running slow function"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
long_function_result = long_function()
response = {
:responseCode => 200,
:message => "Long function ran, result is #{long_function_result}"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
ws.close()
end
ws.on :close do |event|
$logger.info "< CLOSE"
ws = nil
end
# Return async Rack response
ws.rack_response
else
@app.call(env)
end
}
end
end
end
Chrome输出
5:25:09pm WebSocket Connection Established
5:25:09pm {"request":"test"}
5:25:09pm {"responseCode":100,"message":"Connection opened"}
5:25:30pm {"responseCode":100,"message":"Received request, running slow function"}
5:25:30pm {"responseCode":200,"message":"Long function ran, result is foo"}
5:25:30pm Connection Close Frame
5:25:30pm Connection Close Frame
控制台输出
I, [2020-01-15T17:25:10.006006 #25394] INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:25:10.024170 #25394] INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:25:30.034885 #25394] INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}I, [2020-01-15T06:25:30.189606 #25394] INFO -- : < CLOSE
正如我们在上面看到的,"Received request, running slow function" 的记录器输出是即时的 (17:25:09),但是需要额外的 20 秒才能将响应发送到客户端。我注意到 PING/PONG 消息也有同样的事情 - 在阻塞 sleep
完成之前,WebSocket 服务器什么也没有。
我也尝试过这样的修改版本:
正在穿线
# Just the call function, all other parts are the same
def call(env)
EM.run {
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: 15 })
ws.on :open do |event|
response = {
:responseCode => 100,
:message => "Connection opened"
}
$logger.info "< #{response.to_json}"
ws.send(response.to_json)
end
ws.on :message do |event|
response = {
:responseCode => 100,
:message => "Received request, running slow function"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
long_function = Thread.new { long_function() }
until (long_function.alive? == false) do
response = {
:responseCode => 100,
:message => "Waiting for long function to complete"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
sleep 5
end
response = {
:responseCode => 200,
:message => "Long function ran, result is #{long_function.value}"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
ws.close()
end
ws.on :close do |event|
$logger.info "< CLOSE"
ws = nil
end
# Return async Rack response
ws.rack_response
else
@app.call(env)
end
}
end
Chrome输出
5:34:33pm WebSocket Connection Established
5:34:33pm {"request":"test"}
5:34:33pm {"responseCode":100,"message":"Connection opened"}
5:34:53pm {"responseCode":100,"message":"Received request, running slow function"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":200,"message":"Long function ran, result is foo"}
5:34:53pm Connection Close Frame
5:34:53pm Connection Close Frame
控制台输出
I, [2020-01-15T17:34:33.473295 #25729] INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:34:33.489433 #25729] INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:34:33.489638 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:38.490995 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:43.491927 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:48.497281 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:53.499446 #25729] INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}
I, [2020-01-15T17:34:53.632997 #25729] INFO -- : < CLOSE
结果还是一样,虽然 $logger
对象在等待线程结果的同时继续运行(向我表明我们不再阻塞),但 WebSocket 似乎仍在等待它的缓冲区。我怎样才能强制退出这个缓冲区?
使用 @max_pleaner 建议的 add_timer EventMachine 函数,我能够构建代码的工作版本,并通过内部回调函数来创建循环:
require 'faye/websocket'
require 'eventmachine'
require 'json'
Faye::WebSocket.load_adapter('thin')
module SocketTest
class Websocket
def initialize(app)
@app = app
end
def call(env)
EM.run {
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: 15 })
ws.on :open do |event|
response = {
:responseCode => 100,
:message => "Connection opened"
}
$logger.info "< #{response.to_json}"
ws.send(response.to_json)
end
ws.on :message do |event|
response = {
:responseCode => 100,
:message => "Received request, running slow function"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
total_runs = 4
def long_function(ws, count, total_runs)
if count > total_runs then
# Error out
puts "Reached full count, exiting"
ws.close()
return
end
# Logic here
if count == 3 then
response = {
:responseCode => 200,
:message => "Long function ran"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
ws.close()
else
response = {
:responseCode => 100,
:message => "Waiting for long function to complete"
}
ws.send(response.to_json)
$logger.info "< #{response.to_json}"
EventMachine.add_timer(5) {
long_function(ws, count + 1, total_runs)
}
end
end
long_function(ws, 1, total_runs)
end
ws.on :close do |event|
$logger.info "< CLOSE"
ws = nil
end
# Return async Rack response
ws.rack_response
else
@app.call(env)
end
}
end
end
end