R_G - 5 months ago
Ruby Question

WebSocket and EventMachine timeout and error recovery

Using puma, faye-websocket-ruby and EventMachine, I am trying to implement a WebSocket server that is extended to support channels using redis-rb. Each client supplies its channel in the URL path; during development the route is "/C#{random number}". All of this logic has to live in the server, because the clients will be microprocessor-based Python systems that cannot run higher-level libraries.

I used ruby-websockets-chat-demo as a starting point. The main change is that each WebSocket subscribes to its own channel in the "on open" handler, so the server supports multiple channels.

The code works under normal conditions. However, when a client drops, the server often hangs until it is restarted, and so far I have not been able to fix this. Initially, Heroku would throw an H12 (request timeout) error, so I added rack-timeout. I've tried rescuing timeouts within the server, but those rescues never fire. I've also implemented an "on error" handler in the server, but it never fires either. Most often the server simply stops responding until it is restarted. The client should fend for itself, but I need the server to recover and continue.
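Part of the explanation may be where the `ws.on` handlers run: on EventMachine's reactor thread, long after the handshake. As far as I can tell from the faye-websocket documentation, its `:error` event reports connection and protocol failures, not Ruby exceptions raised by your own handler code. An exception that escapes a handler can therefore stop the reactor, which would match a server that stays down until restarted. EventMachine's `EM.error_handler` is a process-wide alternative. Below is a minimal, standard-library-only sketch of guarding each handler body individually. `guarded` and `FakeSocket` are hypothetical names; `FakeSocket` stands in for a `Faye::WebSocket`.

```ruby
require 'stringio'

# Hypothetical helper: runs one handler body and keeps any exception from
# escaping into the event loop. The error is logged and only the offending
# socket is closed; every other client keeps running.
def guarded(ws, label, log = $stderr)
  yield
rescue StandardError => e
  log.puts "#{label} failed for ws:#{ws.object_id}: #{e.class}: #{e.message}"
  ws.close unless ws.nil?
  nil
end

# Hypothetical stand-in for a Faye::WebSocket that only records close calls.
class FakeSocket
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

log = StringIO.new
ws = FakeSocket.new

# In the server: ws.on :message do |event| guarded(ws, 'message') { ... } end
result = guarded(ws, 'message', log) { raise ArgumentError, 'bad payload' }

puts result.inspect # prints "nil"
puts ws.closed      # prints "true"
```

When a handler body raises, `guarded` returns `nil` and closes only that socket. A body that completes normally passes its return value through unchanged.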

config.ru:

require './app'
require './middlewares/myserver_backend'
require 'rack-timeout'
use Rack::Timeout, service_timeout: 20, wait_timeout: 30, wait_overtime: 60, service_past_wait: false
use Myserver::MyserverBackend
run Myserver::App
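Rack::Timeout bounds only the time until `call` returns, and so does the `rescue` at the end of the backend's `call` method. For a WebSocket upgrade, `call` returns almost immediately with `ws.rack_response`, and the open, message and close handlers run later on the EventMachine reactor. The sketch below illustrates why a failure in one of those later handlers is invisible to both. It uses the standard library's `Timeout` in place of Rack::Timeout and a hypothetical `DeferredReactor` queue in place of EventMachine.

```ruby
require 'timeout'

# Hypothetical stand-in for the EventMachine reactor: handlers registered
# during a request are queued and only run after the request has returned.
class DeferredReactor
  def initialize
    @queue = []
  end

  def register(&handler)
    @queue << handler
  end

  # Runs queued handlers, collecting (not raising) their exceptions.
  def drain
    errors = []
    until @queue.empty?
      begin
        @queue.shift.call
      rescue StandardError => e
        errors << e
      end
    end
    errors
  end
end

reactor = DeferredReactor.new
caught_during_call = nil

begin
  # Plays the role of Rack::Timeout plus the rescue inside call:
  # both only cover the time until the response is returned.
  Timeout.timeout(1) do
    reactor.register { raise 'client dropped' } # like ws.on :close
    :rack_response                              # like ws.rack_response
  end
rescue StandardError => e
  caught_during_call = e
end

late_errors = reactor.drain
puts caught_during_call.inspect         # prints "nil"
puts late_errors.map(&:message).inspect # prints ["client dropped"]
```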


Rack middleware "backend":

%w(faye/websocket thread redis json erb uri).each { |m| require m }

module Myserver
  class MyserverBackend
    # Ping interval in seconds. ENV values are strings (or nil, which disables pings),
    # so convert explicitly; 15 is the value used by the chat demo.
    KEEPALIVE_TIME = Integer(ENV.fetch('KEEPALIVE_TIME', 15))

    def initialize(app)
      @app = app
      @clients = []
      @uri = URI.parse(ENV["REDISCLOUD_URL"])
      @redis = Redis.new(host: @uri.host, port: @uri.port, password: @uri.password)
    end

    def call(env)
      begin
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

          ws.on :open do |event|
            # The channel is the request path without its leading "/", e.g. "C123".
            channel = URI.parse(event.target.url).path[1..-1]
            Thread.new do
              # Each client gets its own subscriber connection and thread.
              redis_sub = Redis.new(host: @uri.host, port: @uri.port, password: @uri.password)
              redis_sub.subscribe(channel) do |on|
                on.message do |message_channel, message|
                  puts "MyserverBackend>> Redis message received on channel:#{message_channel}; Message is:#{message};"
                  @clients.each { |clients_ws, clients_channel| clients_ws.send(message) if clients_channel == message_channel }
                end
              end
            end
            @clients << [ws, channel]
            @clients.each do |clients_ws, clients_channel|
              puts "MyserverBackend>> Client:#{clients_ws.object_id}; Channel:#{clients_channel};"
            end
          end

          ws.on :message do |event|
            @clients.each do |clients_ws, clients_channel|
              if clients_ws == ws
                puts "MyserverBackend>> Websocket message received on channel:#{clients_channel}; Message is:#{event.data};"
                @redis.publish(clients_channel, sanitize(event.data))
              end
            end
          end

          ws.on :close do |event|
            # Close all channels for this client first.
            # ws identifies the client here; all of its open channels are closed.
            @clients.each { |clients_ws, clients_channel| @redis.unsubscribe(clients_channel) if clients_ws == ws }
            @clients.delete_if { |clients_ws, clients_channel| clients_ws == ws }
            channel = URI.parse(event.target.url).path[1..-1]
            puts "MyserverBackend>> Websocket closure for:#{channel}; Event code:#{event.code} Event reason:#{event.reason};"
            ws = nil
          end

          ws.on :error do |event|
            puts "Error raised:#{event.message}; ws:#{ws.object_id};"
            ws.close unless ws.nil?
          end

          # Return async Rack response
          ws.rack_response
        else
          @app.call(env)
        end
      rescue Rack::Timeout::RequestTimeoutError, Rack::Timeout::RequestExpiryError => exception
        puts "Exception raised:#{exception}; ws:#{ws.object_id};"
        ws.close(4999, 'Request timeout') unless ws.nil?
        # ensure is executed immediately so it doesn't help...
      end
    end

    private

    def sanitize(message)
      json = JSON.parse(message)
      json.each { |key, value| json[key] = ERB::Util.html_escape(value) }
      JSON.generate(json)
    end
  end
end
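The `on :message` handler passes whatever the client sends to `sanitize`, and `JSON.parse` raises `JSON::ParserError` for anything that is not valid JSON. If that exception is raised on the reactor thread, it is another way a single misbehaving client could take the server down. Below is a sketch of a variant that rejects bad input instead of raising. The name `safe_sanitize` is hypothetical, and returning `nil` for rejected input is a design choice, not something from the original code.

```ruby
require 'json'
require 'erb'

# Hypothetical hardened variant of the backend's sanitize: returns nil
# instead of raising when the payload is not a JSON object.
def safe_sanitize(message)
  json = JSON.parse(message)
  return nil unless json.is_a?(Hash)
  escaped = json.map { |key, value| [key, ERB::Util.html_escape(value)] }.to_h
  JSON.generate(escaped)
rescue JSON::ParserError, TypeError # TypeError covers non-String input
  nil
end

puts safe_sanitize('{"PING":"C42","COUNT":"3"}') # prints {"PING":"C42","COUNT":"3"}
puts safe_sanitize('<b>not json</b>').inspect    # prints nil
```

In the handler, this would become `payload = safe_sanitize(event.data)` followed by `@redis.publish(clients_channel, payload) if payload`, so bad input is dropped rather than published.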


The Sinatra "frontend":

# https://github.com/heroku-examples/ruby-websockets-chat-demo
require 'rubygems'
require 'bundler'
require 'sinatra/base'
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
# RUN_ENV is a name chosen here: unlike a top-level local variable,
# a constant is visible inside the class body below.
RUN_ENV = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

module Myserver
  class App < Sinatra::Base
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      # Inside a route, `env` is Sinatra's Rack request hash, so use the constant.
      @scheme = RUN_ENV == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end
end
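The frontend chooses `wss://` or `ws://` from the environment name. Ruby's `class` and `module` keywords start a fresh local-variable scope, so a local variable assigned at the top level of app.rb cannot be read inside `Myserver::App`. Inside a Sinatra route, `env` resolves to the request's Rack environment hash instead, and that hash never equals "production". A constant does cross that boundary. The sketch below uses hypothetical names (`app_env`, `RUN_ENV`, `Demo`) to show the difference with plain Ruby.

```ruby
# Hypothetical names for illustration: app_env is a top-level local,
# RUN_ENV a constant holding the same value.
app_env = 'production'
RUN_ENV = app_env

class Demo
  # Class bodies are a new scope: the top-level local is not defined here.
  LOCAL_VISIBLE = !defined?(app_env).nil?

  def self.scheme
    # Constants fall back to top-level lookup, so this one is found.
    RUN_ENV == 'production' ? 'wss://' : 'ws://'
  end
end

puts Demo::LOCAL_VISIBLE # prints "false"
puts Demo.scheme         # prints "wss://"
```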


The test client:

# https://github.com/faye/faye-websocket-ruby/issues/52
# https://github.com/faye/faye-websocket-ruby
%w(bundler/setup faye/websocket eventmachine json socket).each { |m| require m }
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }

class ClientWs
  def self.em_run
    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    EM.run do
      uri = 'myserver.herokuapp.com'
      #uri = 'localhost' if env == 'development'
      channel = "C#{rand(999999999999)}"
      url = uri == 'localhost' ? "ws://#{uri}:3000/#{channel}" : "ws://#{uri}/#{channel}"
      @ws = Faye::WebSocket::Client.new(url)
      start = Time.now
      count = 0

      # Send a PING every 5 to 9 seconds.
      timer = EventMachine.add_periodic_timer(5 + rand(5)) do
        count += 1
        send_message({'PING': channel, 'COUNT': count.to_s})
      end

      @ws.on :open do |event|
        puts "{'OPEN':#{channel}}"
        ClientWs.send_message({'OPEN': channel})
      end

      @ws.on :message do |event|
        # Addrinfo comes from the 'socket' library.
        @ip_address ||= Addrinfo.ip(URI.parse(event.target.url).host).ip_address
        begin
          JSON.parse(event.data) # only checks that the payload is valid JSON
        rescue => e
          puts ">>>> [Error! Failed to parse JSON]"
          puts ">>>> [#{e.message}]"
          puts ">>>> #{event.data}"
        end
        puts ">> #{@ip_address}:#{channel}:#{event.data};"
      end

      @ws.on :close do |event|
        timer.cancel
        stop = Time.now - start
        puts "#{stop} seconds;"
        p [:close, event.code, event.reason]
        @ws = nil
        ClientWs.em_run # reconnect on a new channel
      end
    end
  end

  # Named send_message so it does not override Object#send.
  def self.send_message(message)
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

ClientWs.em_run


The Gemfile.lock:

GEM
  remote: https://rubygems.org/
  specs:
    activesupport (4.2.5.1)
      i18n (~> 0.7)
      json (~> 1.7, >= 1.7.7)
      minitest (~> 5.1)
      thread_safe (~> 0.3, >= 0.3.4)
      tzinfo (~> 1.1)
    eventmachine (1.2.0.1-x86-mingw32)
    faye-websocket (0.10.4)
      eventmachine (>= 0.12.0)
      websocket-driver (>= 0.5.1)
    i18n (0.7.0)
    json (1.8.3)
    json_pure (1.8.3)
    minitest (5.9.0)
    multi_json (1.12.1)
    oj (2.16.1)
    permessage_deflate (0.1.3)
    progressbar (0.21.0)
    puma (3.4.0)
    rack (1.6.4)
    rack-protection (1.5.3)
      rack
    rack-timeout (0.4.2)
    rake (11.2.2)
    redis (3.3.0)
    rollbar (2.11.5)
      multi_json
    sinatra (1.4.7)
      rack (~> 1.5)
      rack-protection (~> 1.4)
      tilt (>= 1.3, < 3)
    thread_safe (0.3.5)
    tilt (2.0.5)
    tzinfo (1.2.2)
      thread_safe (~> 0.1)
    websocket-driver (0.6.4)
      websocket-extensions (>= 0.1.0)
    websocket-extensions (0.1.2)

PLATFORMS
  x86-mingw32

DEPENDENCIES
  activesupport (= 4.2.5.1)
  bundler
  faye-websocket
  json_pure
  oj (~> 2.16.0)
  permessage_deflate
  progressbar
  puma
  rack
  rack-timeout
  rake
  redis (>= 3.2.0)
  rollbar
  sinatra

RUBY VERSION
   ruby 2.2.4p230

BUNDLED WITH
   1.12.5


What the client sees when it tries to connect to the stalled server:

ruby client.rb
20.098119 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]
20.07921 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]
20.075731 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]


config/puma.rb:

env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
if env.nil? || env == 'development' || env == 'test'
  concurrency = 0 # Set to zero to ensure single mode, not clustered mode
  max_threads = 1
end
# WEB_CONCURRENCY and RAILS_MAX_THREADS == 1 in Heroku for now.
concurrency ||= (ENV['WEB_CONCURRENCY'] || 2)
max_threads ||= (ENV['RAILS_MAX_THREADS'] || 5)
worker_timeout 15
workers Integer(concurrency)
threads_count = Integer(max_threads)
threads threads_count, threads_count

#preload_app!

rackup DefaultRackup
port ENV['PORT'] || 3000
environment ENV['RACK_ENV'] || 'development'

R_G
Answer

What I needed to do was complete the server's "on close" handler. It needed to clean everything up and then restart itself, which it was not doing.

However, I don't like this as the final answer. The real question is why the server shuts down, terminates and restarts just because a client dropped. Isn't there a cleaner way to sweep away the detritus of a failed client?

Follow-up: this fix does answer this particular question, in that completing "on close" resolved the stated problem. Further enhancements ran the client's WebSocket events in threads, as the Redis events already were, so that "on close" closes only the client and not the server.

The new handler is:

  ws.on :close do |event|
    if @debug
      # Debugging only: print the last exception, then stop the whole process.
      puts "MyserverBackend>> Close entered.  Last error:#{$!.class}:#{$!.to_s};Module:#{$0};Line:#{$.};"
      ($@ || []).each { |backtrace| puts backtrace }
      exit
    end
    @clients.each do |clients_ws, clients_channel|
      begin
        @redis.unsubscribe(clients_channel)
      rescue RuntimeError => exception
        # @redis is the publishing connection and is not subscribed, so this
        # particular error is expected; re-raise anything else.
        raise unless exception.message == "Can't unsubscribe if not subscribed."
      end
    end
    @clients.delete_if { |clients_ws, clients_channel| clients_ws == ws }
    channel = URI.parse(event.target.url).path[1..-1]
    puts "MyserverBackend>> Websocket closure for:#{channel}; Event code:#{event.code} Event reason:#{event.reason};"
    ws = nil
    # Event handler return values are ignored, so this new instance is not picked up by Rack.
    app = Myserver::App
    myserver = MyserverBackend.new(app)
    myserver
  end
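This sketch addresses the closing question of a cleaner way to sweep away a failed client. In the original backend, `@redis` is the publishing connection and never subscribes to anything. Calling `@redis.unsubscribe` in "on close" therefore raises the RuntimeError "Can't unsubscribe if not subscribed." that the handler above rescues, and when raised on the EventMachine reactor, that error plausibly explains the hang. The sketch does per-client cleanup that avoids the shared connection altogether, under these assumptions:

- `ClientRegistry` is a hypothetical replacement for `@clients`.
- Each client keeps a reference to the thread running its own `redis_sub.subscribe` loop.
- Closing a client kills only that thread.

The demo uses only the standard library, with symbols standing in for sockets and sleeping threads standing in for subscriptions.

```ruby
require 'thread'

# Hypothetical replacement for the backend's @clients array: one entry per
# WebSocket with its channel and the thread that owns its Redis subscription.
class ClientRegistry
  Entry = Struct.new(:ws, :channel, :subscriber)

  def initialize
    @entries = []
    # The reactor thread and the subscriber threads both read this list.
    @lock = Mutex.new
  end

  def add(ws, channel, subscriber)
    @lock.synchronize { @entries << Entry.new(ws, channel, subscriber) }
  end

  # Returns a copy, so callers can send messages without holding the lock.
  def sockets_for(channel)
    @lock.synchronize { @entries.select { |e| e.channel == channel }.map(&:ws) }
  end

  # For ws.on :close: removes only this client's entries and stops their
  # subscriber threads; the shared publishing connection is never touched.
  def remove(ws)
    gone = @lock.synchronize do
      mine, @entries = @entries.partition { |e| e.ws.equal?(ws) }
      mine
    end
    gone.each { |e| e.subscriber.kill }
    gone.size
  end

  def size
    @lock.synchronize { @entries.size }
  end
end

# Stand-ins: symbols for sockets, sleeping threads for blocking redis_sub.subscribe calls.
registry = ClientRegistry.new
sub_a = Thread.new { sleep }
sub_b = Thread.new { sleep }
registry.add(:ws_a, 'C1', sub_a)
registry.add(:ws_b, 'C2', sub_b)

registry.remove(:ws_a)
sub_a.join # returns once the killed thread has finished

puts registry.sockets_for('C1').inspect # prints []
puts registry.sockets_for('C2').inspect # prints [:ws_b]
```

Wiring it in would mean three changes:

- In "on open": `registry.add(ws, channel, Thread.new { ... })`.
- In the Redis message callback: `registry.sockets_for(message_channel).each { |s| s.send(message) }`.
- In "on close": `registry.remove(ws)`.

Killing a thread runs its `ensure` blocks, so an `ensure` inside the subscriber thread is a reasonable place to close that thread's own Redis connection. The subscriber threads also call `ws.send` directly, as the chat demo does; wrapping that call in `EM.next_tick` would run it on the reactor thread instead.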