cwadding cwadding - 1 month ago 5
Ruby Question

How to test that a block is called within a thread?

I am working on wrapping the ruby-mqtt gem into a class which implements a subscribe and a publish method. The subscribe method connects to the server and listens in a separate thread because this call is blocking.

module PubSub
  class MQTT
    attr_accessor :host, :port, :username, :password

    def initialize(params = {})
      params.each do |attr, value|
        self.public_send("#{attr}=", value)
      end if params
      super()
    end

    def connection_options
      {
        remote_host: self.host,
        remote_port: self.port,
        username: self.username,
        password: self.password,
      }
    end

    def subscribe(name, &block)
      channel = name
      connect_opts = connection_options
      code_block = block
      ::Thread.new do
        ::MQTT::Client.connect(connect_opts) do |c|
          c.get(channel) do |topic, message|
            puts "channel: #{topic} data: #{message.inspect}"
            code_block.call topic, message
          end
        end
      end
    end

    def publish(channel = nil, data)
      ::MQTT::Client.connect(connection_options) do |c|
        c.publish(channel, data)
      end
    end

  end
end


I have written an RSpec test for the class, but it does not pass.

mqtt = ::PubSub::MQTT.new({host: "localhost", port: 1883})
block = lambda { |channel, data| puts "channel: #{channel} data: #{data.inspect}" }
block.should_receive(:call).with("channel", {"some" => "data"})
thr = mqtt.subscribe("channel", &block)
mqtt.publish("channel", {"some" => "data"})


When I run the following ruby-mqtt example, I have no problems at all.

require 'uri'
require 'mqtt'

uri = URI.parse ENV['CLOUDMQTT_URL'] || 'mqtt://localhost:1883'
conn_opts = {
  remote_host: uri.host,
  remote_port: uri.port,
  username: uri.user,
  password: uri.password,
}

# Subscribe example
Thread.new do
  puts conn_opts
  MQTT::Client.connect(conn_opts) do |c|
    # The block will be called when messages arrive on the topic
    c.get('test') do |topic, message|
      puts "#{topic}: #{message}"
    end
  end
end

# Publish example
puts conn_opts
MQTT::Client.connect(conn_opts) do |c|
  # publish a message to the topic 'test' once per second
  loop do
    c.publish('test', 'Hello World')
    sleep 1
  end
end


So my question is: what am I doing wrong when I simply create a class and separate out the publish and subscribe logic? My guess is that it has something to do with threading in the method call, but I can't figure it out. Any help is much appreciated.

UPDATE

I believe I know why the test is not passing: when I pass a lambda to subscribe and expect it to receive a call, it will not have received that call by the time the method returns, and not before publish is called. So I would like to rephrase the question: How do I test that a block is called within a thread? If someone answers "you don't", then the question becomes: How do you test that a block is being called in an infinite loop, as in the example of calling get within the ruby-mqtt gem?

Answer

The RSpec expectations machinery will work fine with threads, as evidenced by the following example, which passes:

def foo(&block)
  block.call(42)
end

describe "" do
  it "" do
    l = lambda {}
    expect(l).to receive(:call).with(42)
    Thread.new { foo(&l) }.join
  end
end

The join waits for the thread to finish before the example ends, so the block has already been called by the time RSpec verifies the expectation.
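Note that join only helps when the thread actually terminates. For a subscriber that loops forever, as get with a block does in the question, join would never return. One possible approach, sketched here with the standard library only, is to have the block push what it receives onto a Queue and let the test wait on that queue with a timeout. The fake_subscribe method and the source queue are hypothetical stand-ins for the real subscribe method and the broker; they are not part of ruby-mqtt.

```ruby
require "timeout"

# Hypothetical stand-in for a blocking subscriber like the one in the question:
# it starts a thread that loops forever and passes each message to the block.
def fake_subscribe(source, &block)
  Thread.new do
    loop do
      topic, message = source.pop # blocks until something is "published"
      block.call(topic, message)
    end
  end
end

source   = Queue.new # plays the role of the broker (assumption for this sketch)
received = Queue.new # hands results from the subscriber thread back to the test

thr = fake_subscribe(source) { |topic, message| received << [topic, message] }
source << ["channel", { "some" => "data" }]

# Wait for the callback, but give up after 2 seconds instead of hanging forever.
result = Timeout.timeout(2) { received.pop }

# The loop never ends on its own, so stop the thread explicitly before joining.
thr.kill
thr.join
```

In a spec, result could then be checked with an ordinary expectation such as expect(result).to eq(["channel", {"some" => "data"}]). If the block is never called, Timeout::Error is raised and the example fails rather than blocking the test run.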
