R_G R_G - 4 months ago 19
Ruby Question

Running multiple copies of Ruby script using thread/load only yields one running copy

I am developing a server and have written a client script for testing it. The client script works well if I open several command processors and start it in each one with "ruby client.rb". To make this more efficient, I am trying to write a parent script, "threads.rb", that can start as many clients as I want using threads. All of the threads start, but only the last client effectively runs when launched through threads.rb.

client.rb does access Redis, and I use a mutex there to prevent collisions. I have no lock on the rest of the client, nor have I done anything else to prevent issues within it. Since client.rb runs fine as separate copies in multiple command processors, I'd like to be able to launch those copies from threads.rb instead.

threads.rb is:

# Launches several copies of the test client, one per thread.
#
# NOTE(review): every thread lives in the same Ruby process, so each `load`
# re-evaluates the same ClientWs class, and client.rb keeps its socket in the
# class-level @ws. The copies therefore share that state; separate processes
# (e.g. Kernel#spawn) avoid this.
threads = 4
the_threads = (1..threads).map do |number|
  puts "Creating thread #{number}"
  # Pass the number in as a thread argument so each thread gets its own copy.
  Thread.new(number) do |client_number|
    puts "Running Client #{client_number};\n"
    load "./samples/client.rb"
  end
end

# Keep the launcher alive until every client thread has finished; without
# this the main thread can reach the end of the script and end the workers.
the_threads.each(&:join)


client.rb

%w(rubygems bundler faye/websocket eventmachine json redis redis-classy redis-mutex).each { |m| require m }
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
# Test client: claims a free lockbox unit in Redis, opens a WebSocket to the
# server for it and pings the server on a periodic timer.
#
# The socket is stored in the class-level instance variable @ws, so a Ruby
# process tracks one connection at a time.
class ClientWs
  # Starts the EventMachine reactor, claims a unit and connects.
  #
  # Exits the process when the Redis mutex cannot be obtained. When the
  # socket closes, the claimed Redis key is deleted and em_run is called
  # again to reconnect.
  def self.em_run
    EM.run do
      puts "EM is running."
      uri = 'localhost'
      redis_uri = URI.parse("redis://127.0.0.1:6379")
      redis_options = { host: redis_uri.host, port: redis_uri.port, password: redis_uri.password }
      # Two separate connections: one for this client, one for RedisClassy.
      redis = Redis.new(**redis_options)
      RedisClassy.redis = Redis.new(**redis_options)

      # NOTE(review): block/sleep are presumably the lock wait and polling
      # intervals in seconds -- confirm against the redis-mutex docs.
      mutex = RedisMutex.new(:device, block: 0.5, sleep: 0.5)
      unless mutex.lock
        puts 'RedisMutex failed to obtain mutex lock.'
        exit
      end

      begin
        lockbox, unit, channel = claim_free_unit(redis)
        puts "Connecting with:#{unit}; Using channel:#{channel};"
        url = uri == 'localhost' ? "ws://#{uri}:3000/#{channel}" : "ws://#{uri}/#{channel}"
        puts url
        @ws = Faye::WebSocket::Client.new(url)
      ensure
        # Release the lock even if claiming or connecting raised, so other
        # clients are not blocked on it.
        mutex.unlock
      end

      # Monotonic clock: the connection duration must not be affected by
      # wall-clock adjustments.
      started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      count = 0

      # Alternate between the two ping message shapes on each tick.
      timer = EventMachine.add_periodic_timer(5 + rand(5)) do
        count += 1
        ClientWs.send(count.even? ? { ECHO: { CMD: 'PING' } } : { CMD: 'PING' })
        Thread.pass
      end

      @ws.on :open do |_event|
        ClientWs.send({ OPEN: lockbox })
      end

      @ws.on :message do |event|
        @ip_address ||= Addrinfo.ip(URI.parse(event.target.url).host).ip_address
        begin
          # Parsed only to validate the payload; the raw data is what gets logged.
          JSON.parse(event.data)
        rescue StandardError => e
          puts ">>>> [Error! Failed to parse JSON]"
          puts ">>>> [#{e.message}]"
          puts ">>>> #{event.data}"
        end
        puts ">> #{@ip_address}:#{channel}:#{event.data};"
        #@ws.close
      end

      @ws.on :close do |event|
        timer.cancel
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
        redis.del(unit)
        puts "#{elapsed} seconds;"
        p [:close, event.code, event.reason]
        # Drop the closed socket before reconnecting.
        @ws = nil
        ClientWs.em_run
      end
    end
  end

  # Sends a message over the current socket as JSON.
  #
  # NOTE: this shadows Object#send on the class itself; use __send__ for
  # dynamic dispatch on ClientWs.
  #
  # @param message [Hash, Object] a Hash is sent as-is; anything else is
  #   wrapped as {payload: message}
  def self.send(message)
    payload = message.is_a?(Hash) ? message : { payload: message }
    @ws.send(payload.to_json)
  end

  # Finds the first unit whose Redis status key is unset and marks it
  # 'Offline'. Candidates are tried in the order Lbox_1 Display, Lbox_1
  # Reader, Lbox_2 Display, ... with the channel number counting up from 101.
  #
  # @param redis [#get, #set] Redis connection used to probe and claim keys
  # @return [Array(String, String, String)] lockbox name, status key, channel
  def self.claim_free_unit(redis)
    device = 0
    channel = 100
    type = 'Reader'
    loop do
      channel += 1
      type = type == 'Display' ? 'Reader' : 'Display'
      device += 1 if type == 'Display'
      lockbox = "Lbox_#{device}"
      unit = "#{lockbox}.#{type}.Status"
      next if redis.get(unit)

      redis.set(unit, 'Offline')
      prefix = type == 'Reader' ? 'RDR' : 'DSP'
      return [lockbox, unit, "#{prefix}:#{channel}"]
    end
  end
  private_class_method :claim_free_unit
end
# Top-level call: a client is started every time this file is run or loaded.
ClientWs.em_run

R_G R_G
Answer

I converted it to use Kernel#spawn in lieu of threads.

# Starts each client in its own Ruby process and collects the child PIDs.
# `pids` (the number of clients to start) is defined outside this snippet.

# Change directory once up front: the relative client path below is resolved
# against it, and it does not vary between iterations.
Dir.chdir("D:/projects/wsserver/")

the_pids = (1..pids).map do |pid|
  puts "Running Client #{pid};\n"
  # RbConfig.ruby is the path of the interpreter running this script.
  spawn(RbConfig.ruby, "./samples/client.rb")
end