WebSocket and EventMachine timeout and error recovery

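Using puma, faye-websocket-ruby and eventmachine, I am trying to implement a WebSocket server that is extended to support channels using redis.rb. Each client supplies a channel through a route that is currently under development: "/C#{random number}". All of this logic needs to reside in the server, because the clients will be microprocessor-based Python systems that do not support higher-level libraries.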
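Because every client names its channel through the URL path, the server has to turn a URL such as `ws://myserver.herokuapp.com/C123` into `C123`. A minimal stdlib sketch, assuming channel names always take the `C` plus digits form described above; the helper name `channel_from_url` and the validation rule are assumptions, not part of the original code:

```ruby
require 'uri'

# Hypothetical helper: extract the channel name from a WebSocket URL and
# accept only the assumed "/C<digits>" form; anything else yields nil.
CHANNEL_PATTERN = /\AC\d+\z/

def channel_from_url(url)
  path = URI.parse(url).path.to_s
  channel = path.sub(%r{\A/}, '') # drop the leading slash
  CHANNEL_PATTERN.match(channel) ? channel : nil
rescue URI::InvalidURIError
  nil
end

puts channel_from_url("ws://localhost:3000/C42").inspect
puts channel_from_url("ws://localhost:3000/chat").inspect
```

Rejecting unexpected paths up front keeps a malformed client from opening a Redis subscription on an arbitrary channel name.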

My code uses ruby-websockets-chat-demo as its starting point. One major change is configuring it to support multiple channels during the WebSocket "on open".

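The code works when everything is running normally. However, often when a client drops, the server hangs until it is restarted. I am trying to fix that, but so far without success. Initially, Heroku would throw an H12 timeout. I have implemented rack-timeout. I have tried rescuing the timeouts within the server, but those rescues never fire. I have implemented an "on error" event in the server, but it never triggers. Most of the time the server simply goes away until it is restarted. The clients should fend for themselves, but I need the server to recover and carry on.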
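The backend's `on :open` handler starts each Redis subscription with `Thread.new`. In Ruby, an exception raised inside such a thread does not propagate to the code that spawned it: on Ruby 2.2 the thread simply dies quietly (Ruby 2.5 and later print a warning through `report_on_exception`). This minimal stdlib sketch uses a stand-in `raise` rather than a real Redis call to show that the failure only surfaces when something calls `join`. Setting `Thread.abort_on_exception = true` while debugging is one blunt way to make such failures visible.

```ruby
# Stand-in for the Redis subscriber thread spawned in ws.on :open
worker = Thread.new do
  raise IOError, "subscriber connection lost" # e.g. Redis dropped the connection
end

sleep 0.1 # the spawning code carries on; nothing is raised here
puts "spawner still running; worker alive? #{worker.alive?}"

caught = nil
begin
  worker.join # join (or value) is what re-raises the thread's exception
rescue IOError => e
  caught = e
end
puts "surfaced via join: #{caught.message}"
```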

config.ru:

require './app'
require './middlewares/myserver_backend'
require 'rack-timeout'
use Rack::Timeout, service_timeout: 20, wait_timeout: 30, wait_overtime: 60, service_past_wait: false
use Myserver::MyserverBackend
run Myserver::App
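Rack::Timeout, and the `begin/rescue` inside the backend's `call`, can only observe what happens while `call` is still running. For a WebSocket request, `call` returns `ws.rack_response` as soon as the `on` handlers are registered; the handlers themselves run later on the EventMachine reactor. The stdlib sketch below imitates that shape with a plain lambda (no Rack, Faye or EventMachine involved; `handle_request` and `PENDING` are illustrative names) to show that an error raised in a deferred callback never passes through the original rescue.

```ruby
# Callbacks registered during the "request" and fired later, the way the
# reactor fires ws.on handlers after call has already returned.
PENDING = []

def handle_request
  begin
    PENDING << -> { raise IOError, "client dropped" }
    :async_response_returned # like returning ws.rack_response
  rescue IOError
    :rescued_in_call # never reached: nothing raises during registration
  end
end

result = handle_request

late_error = begin
  PENDING.each(&:call) # stand-in for the reactor running the handler later
  nil
rescue IOError => e
  e
end

puts result
puts late_error.message
```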

Rack middleware "backend":

%w(faye/websocket thread redis json erb uri).each { |m| require m }
module Myserver
  class MyserverBackend
    KEEPALIVE_TIME = ENV['KEEPALIVE_TIME']

    def initialize(app)
      @app = app
      @clients = []
      @uri = URI.parse(ENV["REDISCLOUD_URL"])
      @redis = Redis.new(host: @uri.host, port: @uri.port, password: @uri.password)
    end

    def call(env)
      begin
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
          ws.on :open do |event|
            channel = URI.parse(event.target.url).path[1..-1] # "/C123" -> "C123"
            Thread.new do
              redis_sub = Redis.new(host: @uri.host, port: @uri.port, password: @uri.password)
              redis_sub.subscribe(channel) do |on|
                on.message do |message_channel, message|
                  puts "MyserverBackend>> Redis     message received on channel:#{message_channel}; Message is:#{message};"
                  @clients.each { |clients_ws, clients_channel| clients_ws.send(message) if clients_channel == message_channel }
                end
              end
            end
            @clients << [ws, channel]
            @clients.each do |clients_ws, clients_channel|
              puts "MyserverBackend>> Client:#{clients_ws.object_id}; Channel:#{clients_channel};"
            end
          end

          ws.on :message do |event|
            @clients.each do |clients_ws, clients_channel|
              if clients_ws == ws
                puts "MyserverBackend>> Websocket message received on channel:#{clients_channel}; Message is:#{event.data};"
                @redis.publish(clients_channel, sanitize(event.data))
              end
            end
          end

          ws.on :close do |event|
            # Close all channels for this client first
            # ws gives a channel which we use to identify it here, but we're closing all of those that are open
            @clients.each { |clients_ws, clients_channel| @redis.unsubscribe(clients_channel) if clients_ws == ws }
            @clients.delete_if { |clients_ws, clients_channel| clients_ws == ws }
            channel = URI.parse(event.target.url).path[1..-1]
            puts "MyserverBackend>> Websocket closure for:#{channel}; Event code:#{event.code} Event reason:#{event.reason};"
            ws = nil
          end

          ws.on :error do |event|
            puts "Error raised:#{event.message}; ws:#{ws.object_id};"
            ws.close unless ws.nil?
          end

          # Return async Rack response
          ws.rack_response

        else
          @app.call(env)
        end

      rescue Rack::Timeout::RequestTimeoutError, Rack::Timeout::RequestExpiryError => exception
        puts "Exception raised:#{exception}; ws:#{ws.object_id};"
        ws.close(4999, 'Request timeout') unless ws.nil? # Faye close(code, reason); 4000-4999 are application codes
        # ensure is executed immediately so it doesn't help...
      end
    end

    private
    def sanitize(message)
      json = JSON.parse(message)
      json.each { |key, value| json[key] = ERB::Util.html_escape(value) }
      JSON.generate(json)
    end
  end
end
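The middleware's `sanitize` calls `JSON.parse` directly inside `ws.on :message`, so a client that sends malformed data makes it raise inside an EventMachine callback, and an uncaught exception there can stop the reactor. Whether that is what takes this server down is not established, but it is cheap to rule out. A sketch of a variant, assuming dropping bad messages (returning `nil`) is acceptable; `safe_sanitize` is a hypothetical name:

```ruby
require 'json'
require 'erb'

# Like the middleware's sanitize, but returns nil instead of raising when
# the payload is not a JSON object.
def safe_sanitize(message)
  json = JSON.parse(message)
  return nil unless json.is_a?(Hash)
  json.each { |key, value| json[key] = ERB::Util.html_escape(value) }
  JSON.generate(json)
rescue JSON::ParserError
  nil
end

puts safe_sanitize('{"msg":"<b>hi</b>"}')
puts safe_sanitize('not json').inspect
```

In the `on :message` handler one would then publish only when the result is non-nil.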

Sinatra "frontend":

# https://github.com/heroku-examples/ruby-websockets-chat-demo
require 'rubygems'
require 'bundler'
require 'sinatra/base'
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

module Myserver
  class App < Sinatra::Base
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = ENV['RACK_ENV'] == "production" ? "wss://" : "ws://" # inside a route, `env` is Sinatra's Rack env hash
      erb :"application.js"
    end
  end
end
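In the Sinatra app, `env` is assigned at the top level of the file, but `module` and `class` bodies start a new local-variable scope, so that value never reaches the route. Inside `get "/assets/js/application.js"`, `env` instead resolves to Sinatra's `env` accessor (the Rack environment hash), so comparing it with `"production"` is always false and the script is always rendered with `ws://`. The scoping rule can be checked with plain Ruby; `ScopeDemo` is an illustrative name:

```ruby
# Top-level local, like `env` computed before `module Myserver`
env = 'production'

module ScopeDemo
  class App
    # `module` and `class` bodies open a fresh local-variable scope,
    # so the top-level `env` is not visible here.
    SEES_TOP_LEVEL_ENV = binding.local_variable_defined?(:env)
  end
end

puts ScopeDemo::App::SEES_TOP_LEVEL_ENV
```

Reading `ENV['RACK_ENV']` inside the route avoids relying on the outer local.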

Test client:

# https://github.com/faye/faye-websocket-ruby/issues/52
# https://github.com/faye/faye-websocket-ruby
%w(bundler/setup faye/websocket eventmachine json).each { |m| require m }
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
class ClientWs

  def self.em_run
    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    EM.run do

      uri = 'myserver.herokuapp.com'
      #uri = 'localhost' if env == 'development'
      channel = "C#{rand(999999999999).to_s}"
      url = uri == 'localhost' ? "ws://#{uri}:3000/#{channel}" : "ws://#{uri}/#{channel}"
      @ws = Faye::WebSocket::Client.new(url)
      start = Time.now
      count ||= 0

      timer = EventMachine.add_periodic_timer(5+rand(5)) {
        count += 1
        send({'PING': channel, 'COUNT': count.to_s})
      }

      @ws.on :open do |event|
        puts "{'OPEN':#{channel}}"
        ClientWs.send({'OPEN': channel})
      end

      @ws.on :message do |event|
        @ip_address ||= Addrinfo.ip(URI.parse(event.target.url).host).ip_address
        begin
          parsed = JSON.parse event.data
        rescue => e
          puts ">>>> [Error! Failed to parse JSON]"
          puts ">>>> [#{e.message}]"
          puts ">>>> #{event.data}"
        end
        puts ">> #{@ip_address}:#{channel}:#{event.data};"
      end

      @ws.on :close do |event|
        timer.cancel 
        stop = Time.now - start
        puts "#{stop} seconds;"
        p [:close, event.code, event.reason]
        ws = nil
        ClientWs.em_run
      end
    end
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end

end
ClientWs.em_run
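The test client reconnects by calling `ClientWs.em_run` straight from `on :close`, so against a stalled server it retries immediately after every failed handshake. A common alternative is exponential backoff. The hypothetical `reconnect_delay` helper below only computes the wait; wiring it in would need an attempt counter (an assumption, not in the original client) and a call such as `EM.add_timer(delay) { ClientWs.em_run }`.

```ruby
# Hypothetical helper: seconds to wait before reconnect attempt `attempt`
# (0-based). The delay doubles each time, is capped at `cap`, and gets up to
# `jitter` extra random seconds so many clients do not reconnect in lockstep.
def reconnect_delay(attempt, base: 1.0, cap: 60.0, jitter: 0.0)
  delay = [cap, base * (2**attempt)].min
  delay + rand * jitter
end

# Possible use inside the client's on :close (attempts is an assumed counter):
#   EM.add_timer(reconnect_delay(attempts, jitter: 1.0)) { ClientWs.em_run }

(0..7).each { |n| puts "attempt #{n}: #{reconnect_delay(n)}s" }
```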

Gemfile.lock:

GEM
  remote: https://rubygems.org/
  specs:
    activesupport (4.2.5.1)
      i18n (~> 0.7)
      json (~> 1.7, >= 1.7.7)
      minitest (~> 5.1)
      thread_safe (~> 0.3, >= 0.3.4)
      tzinfo (~> 1.1)
    eventmachine (1.2.0.1-x86-mingw32)
    faye-websocket (0.10.4)
      eventmachine (>= 0.12.0)
      websocket-driver (>= 0.5.1)
    i18n (0.7.0)
    json (1.8.3)
    json_pure (1.8.3)
    minitest (5.9.0)
    multi_json (1.12.1)
    oj (2.16.1)
    permessage_deflate (0.1.3)
    progressbar (0.21.0)
    puma (3.4.0)
    rack (1.6.4)
    rack-protection (1.5.3)
      rack
    rack-timeout (0.4.2)
    rake (11.2.2)
    redis (3.3.0)
    rollbar (2.11.5)
      multi_json
    sinatra (1.4.7)
      rack (~> 1.5)
      rack-protection (~> 1.4)
      tilt (>= 1.3, < 3)
    thread_safe (0.3.5)
    tilt (2.0.5)
    tzinfo (1.2.2)
      thread_safe (~> 0.1)
    websocket-driver (0.6.4)
      websocket-extensions (>= 0.1.0)
    websocket-extensions (0.1.2)

PLATFORMS
  x86-mingw32

DEPENDENCIES
  activesupport (= 4.2.5.1)
  bundler
  faye-websocket
  json_pure
  oj (~> 2.16.0)
  permessage_deflate
  progressbar
  puma
  rack
  rack-timeout
  rake
  redis (>= 3.2.0)
  rollbar
  sinatra

RUBY VERSION
   ruby 2.2.4p230

BUNDLED WITH
   1.12.5

What the client sees when it tries to connect to the stalled server:

ruby client.rb
20.098119 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]
20.07921 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]
20.075731 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]   

config/puma.rb:

env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
if env.nil? || env == 'development' || env == 'test'
  concurrency = 0  # Set to zero to ensure single mode, not clustered mode
  max_threads = 1
end
# WEB_CONCURRENCY and RAILS_MAX_THREADS == 1 in Heroku for now.
concurrency ||= (ENV['WEB_CONCURRENCY'] || 2)
max_threads ||= (ENV['RAILS_MAX_THREADS'] || 5)
worker_timeout 15
workers Integer(concurrency)
threads_count = Integer(max_threads)
threads threads_count, threads_count

#preload_app!

rackup      DefaultRackup
port        ENV['PORT']     || 3000
environment ENV['RACK_ENV'] || 'development'

What I needed to do was complete the server's "on close" event. It needed to clean everything up and then restart itself, which it was not doing.

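However, I don't like this as the final answer. The question is: why does the server shut up shop, terminate and restart just because a client dropped? Isn't there a cleaner way to sweep away the debris of a failed client?

Follow-up: this fix does answer this particular question; either way, completing onclose solved the stated problem. A further enhancement moved the client's WebSocket events onto threads, in addition to the Redis events, so that onclose closes only the client and not the server.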

The new event handler is:

  ws.on :close do |event|
    if @debug
      puts "MyserverBackend>> Close entered.  Last error:#{$!.class}:#{$!.to_s};Module:#{$0};Line:#{$.};"
      Array($@).each { |backtrace| puts backtrace } # $@ is nil when no exception is pending
      exit
    end
    # Unsubscribe every tracked channel, ignoring the error redis-rb raises
    # when this connection was never subscribed
    @clients.each do |clients_ws, clients_channel|
      begin
        @redis.unsubscribe(clients_channel)
      rescue RuntimeError => exception
        raise unless exception.to_s == "Can't unsubscribe if not subscribed."
        false
      end
    end
    @clients.delete_if { |clients_ws, clients_channel| clients_ws == ws }
    channel = URI.parse(event.target.url).path[1..-1]
    puts "MyserverBackend>> Websocket closure for:#{channel}; Event code:#{event.code} Event reason:#{event.reason};"
    ws = nil
    app = Myserver::App
    myserver = MyserverBackend.new(app)
    myserver
  end
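In the code above, each Redis subscription lives on its own `redis_sub` connection inside the thread started in `on :open`, while `@redis` is the publishing connection, which never subscribed. That is why `@redis.unsubscribe` raises "Can't unsubscribe if not subscribed." and has to be rescued. One cleaner way to sweep up a failed client, sketched here under assumptions, is to record each client's channel and subscriber thread together and tear down only that entry in `on :close`. `ClientRegistry` is a hypothetical class, and `Thread#kill` is a blunt stand-in for whatever shutdown of that client's own Redis connection fits the real code; the sketch uses only the standard library and does not call faye-websocket or redis-rb.

```ruby
require 'thread'

# Hypothetical registry: one entry per WebSocket, holding its channel and
# the thread parked in that client's own Redis subscription.
class ClientRegistry
  Entry = Struct.new(:channel, :subscriber)

  def initialize
    @entries = {}
    @lock = Mutex.new
  end

  def add(ws, channel, subscriber)
    @lock.synchronize { @entries[ws] = Entry.new(channel, subscriber) }
  end

  # Stops and forgets only this client's subscriber; other clients are untouched.
  def remove(ws)
    entry = @lock.synchronize { @entries.delete(ws) }
    return nil unless entry
    entry.subscriber.kill # stand-in for closing that client's Redis connection
    entry.subscriber.join
    entry
  end

  def sockets_for(channel)
    @lock.synchronize do
      @entries.select { |_ws, entry| entry.channel == channel }.keys
    end
  end

  def size
    @lock.synchronize { @entries.size }
  end
end

# Usage with stand-ins: symbols play the sockets, and each "subscriber" is
# a thread blocked indefinitely, like one sitting in subscribe.
parked = -> { Thread.new { Queue.new.pop } }
registry = ClientRegistry.new
registry.add(:ws_a, "C1", parked.call)
registry.add(:ws_b, "C2", parked.call)

removed = registry.remove(:ws_a)
puts "removed #{removed.channel}; #{registry.size} client(s) left"
```

Because `remove` touches only one entry, a dropped client cannot disturb the subscriptions of the others, and there is no need to rebuild the middleware in `on :close`.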