faye-websocket 保持直到块完成

faye-websocket holding until block finishes

当我通过 faye-websocket 服务器作为中间件发送消息时,我发现消息直到块完成后才发送。

这是我试过的几个例子:

无线程

require 'faye/websocket'
require 'eventmachine'
require 'json'

Faye::WebSocket.load_adapter('thin')

module SocketTest
  class Websocket

    def initialize(app)
      @app     = app
    end

    def long_function()
      sleep 20
      "foo"
    end    

    def call(env)
      EM.run {
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: 15 })
          ws.on :open do |event|
            response = {
              :responseCode => 100,
              :message => "Connection opened"
            }
            $logger.info "< #{response.to_json}"
            ws.send(response.to_json)
          end

          ws.on :message do |event|
            response = {
              :responseCode => 100,
              :message => "Received request, running slow function"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            long_function_result = long_function()
            response = {
              :responseCode => 200,
              :message => "Long function ran, result is #{long_function_result}"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            ws.close()
          end
          ws.on :close do |event|
            $logger.info "< CLOSE"
            ws = nil
          end

          # Return async Rack response
          ws.rack_response

        else
          @app.call(env)
        end
      }
    end    
  end
end

Chrome输出

5:25:09pm   WebSocket Connection Established
5:25:09pm   {"request":"test"}
5:25:09pm   {"responseCode":100,"message":"Connection opened"}
5:25:30pm   {"responseCode":100,"message":"Received request, running slow function"}
5:25:30pm   {"responseCode":200,"message":"Long function ran, result is foo"}
5:25:30pm   Connection Close Frame
5:25:30pm   Connection Close Frame

控制台输出

I, [2020-01-15T17:25:10.006006 #25394]  INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:25:10.024170 #25394]  INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:25:30.034885 #25394]  INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}I, [2020-01-15T06:25:30.189606 #25394]  INFO -- : < CLOSE

正如我们在上面看到的,"Received request, running slow function" 的记录器输出是即时的 (17:25:09),但是需要额外的 20 秒才能将响应发送到客户端。我注意到 PING/PONG 消息也有同样的事情 - 在阻塞 sleep 完成之前,WebSocket 服务器什么也没有。

我也尝试过这样的修改版本:

正在穿线

# Just the call function, all other parts are the same
    def call(env)
      EM.run {
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: 15 })
          ws.on :open do |event|
            response = {
              :responseCode => 100,
              :message => "Connection opened"
            }
            $logger.info "< #{response.to_json}"
            ws.send(response.to_json)
          end

          ws.on :message do |event|
            response = {
              :responseCode => 100,
              :message => "Received request, running slow function"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            long_function = Thread.new { long_function() }
            until (long_function.alive? == false) do
              response = {
                :responseCode => 100,
                :message => "Waiting for long function to complete"
              }
              ws.send(response.to_json)
              $logger.info "< #{response.to_json}"
              sleep 5

            end
            response = {
              :responseCode => 200,
              :message => "Long function ran, result is #{long_function.value}"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            ws.close()
          end
          ws.on :close do |event|
            $logger.info "< CLOSE"
            ws = nil
          end

          # Return async Rack response
          ws.rack_response

        else
          @app.call(env)
        end
      }
    end    

Chrome输出

5:34:33pm   WebSocket Connection Established
5:34:33pm   {"request":"test"}
5:34:33pm   {"responseCode":100,"message":"Connection opened"}
5:34:53pm   {"responseCode":100,"message":"Received request, running slow function"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":200,"message":"Long function ran, result is foo"}
5:34:53pm   Connection Close Frame
5:34:53pm   Connection Close Frame

控制台输出

I, [2020-01-15T17:34:33.473295 #25729]  INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:34:33.489433 #25729]  INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:34:33.489638 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:38.490995 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:43.491927 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:48.497281 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:53.499446 #25729]  INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}
I, [2020-01-15T17:34:53.632997 #25729]  INFO -- : < CLOSE

结果还是一样,虽然 $logger 对象在等待线程结果的同时继续运行(向我表明我们不再阻塞),但 WebSocket 似乎仍在等待它的缓冲区。我怎样才能强制退出这个缓冲区?

使用 @max_pleaner 建议的 add_timer EventMachine 函数,我能够构建代码的工作版本,并通过内部回调函数来创建循环:

require 'faye/websocket'
require 'eventmachine'
require 'json'

Faye::WebSocket.load_adapter('thin')

module SocketTest
  class Websocket

    def initialize(app)
      @app     = app
    end

    def call(env)
      EM.run {
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: 15 })
          ws.on :open do |event|
            response = {
              :responseCode => 100,
              :message => "Connection opened"
            }
            $logger.info "< #{response.to_json}"
            ws.send(response.to_json)
          end

          ws.on :message do |event|
            response = {
              :responseCode => 100,
              :message => "Received request, running slow function"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            total_runs = 4
            def long_function(ws, count, total_runs)
              if count > total_runs then
                # Error out
                puts "Reached full count, exiting"
                ws.close()
                return
              end
              # Logic here
              if count == 3 then
                response = {
                  :responseCode => 200,
                  :message => "Long function ran"
                }
                ws.send(response.to_json)
                $logger.info "< #{response.to_json}"
                ws.close()
              else
                response = {
                  :responseCode => 100,
                  :message => "Waiting for long function to complete"
                }
                ws.send(response.to_json)
                $logger.info "< #{response.to_json}"
                EventMachine.add_timer(5) {
                  long_function(ws, count + 1, total_runs)
                }
              end
            end
            long_function(ws, 1, total_runs)
          end
          ws.on :close do |event|
            $logger.info "< CLOSE"
            ws = nil
          end

          # Return async Rack response
          ws.rack_response

        else
          @app.call(env)
        end
      }
    end    
  end
end