harvey_slash harvey_slash - 4 months ago 29
PHP Question

Ratchet PHP and long running tasks

I am using Ratchet (PHP). I am starting it like this:

$loop = \React\EventLoop\Factory::create();
$webSock = new \React\Socket\Server($loop);

$webSock->listen($this->port, $this->host);

$webServer = new \Ratchet\Server\IoServer(
    new \Ratchet\Http\HttpServer(
        new \Ratchet\WebSocket\WsServer(
            new PusherServer()
        )
    ),
    $webSock
);

return $loop;


Now, in the onMessage() method of my PusherServer class (which implements MessageComponentInterface), I want to perform a long, blocking task. It will be an HTTP request which could take up to ten seconds to complete.

How do I make onMessage() free to handle other requests while the previous HTTP request is executing? I cannot use pthreads, as I don't have access to change the PHP version that I have been given (which is thread safe).

Answer

This is exactly the problem you have to avoid when doing anything within your event loop: the work can't be blocking, because nothing else event-driven (someone subscribing, another message arriving, anything at all) can be handled until it has finished.

This is more of an architecture problem, and once you've figured out the best way of doing it, it's about streamlining that and making sure it works for all of the tasks you need.

Ratchet provides a ZMQ binding - this is awesome because once you set it up, anything you receive on port 5555 will hit your event loop in onYourMethodName(), or whatever you want to call it!

With this in mind, you need to send the work that needs to be done off into a job queue, another process, or similar. (React has its child-process component, which I don't particularly like because it polls in userland, as opposed to interrupt-driven I/O like PHP's PCNTL extension.)

If you want to "just get it working", fire off the work that needs to be done, along with the connection id or another id so you know who the response needs to go back to, in a child process and when it's done send it out. This won't block!
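A minimal sketch of the "just get it working" route, assuming a Unix-like host where exec() is enabled. The script name worker.php, the helper names, and the payload fields are all hypothetical. Redirecting the output and appending & is what makes exec() return immediately instead of waiting for the child to finish.

```php
<?php
// Hypothetical helper: builds the shell command for a detached worker.
// worker.php and the payload keys are assumptions, not part of Ratchet.
function buildWorkerCommand(string $workerScript, int $connectionId, string $url): string
{
    // The worker needs to know what to fetch and who gets the answer.
    $payload = json_encode(['connection_id' => $connectionId, 'url' => $url]);

    // Redirect output and background the process so exec() does not wait.
    return sprintf(
        '%s %s %s > /dev/null 2>&1 &',
        escapeshellarg(PHP_BINARY),
        escapeshellarg($workerScript),
        escapeshellarg($payload)
    );
}

// Starts the worker and returns straight away; the event loop keeps running.
function spawnWorker(string $workerScript, int $connectionId, string $url): void
{
    exec(buildWorkerCommand($workerScript, $connectionId, $url));
}
```

From onMessage() you would call something like spawnWorker(__DIR__ . '/worker.php', $id, $url), where $id is whatever you use to identify the connection (Ratchet's tutorials use $from->resourceId).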

If you want to do it the better way, and I highly recommend looking into this and the architecture for it so that you can take this knowledge with you in your career when you approach an async problem like this again, adopt a 'fire-and-forget' approach. Fire what needs to be done into a job queue within your event loop, then forget about it.

Your job queue can do its work and, when it's done, fire the result back over ZMQ (port 5555 that's listening, remember), which can then send the data back to the client.


For an awesome talk on job queues, I highly recommend this one from PHPNW.

Final note: because you have this thing open and listening on port 5555 for data, you can send this data from anywhere. It can be inter-process communication, as in you have a Java app that sends data to port 5555, or literally anything. It's binding things together, but not coupling them, that is important in your architecture.


For an example of actually using ZMQ, they provide it all on this page here (as linked above), but I'll try and explain a little bit about what's going on.

$context = new \React\ZMQ\Context($loop);
$pull = $context->getSocket(\ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onYourMethodName'));

This part means that whenever anything sends data to port 5555 and the socket emits a "message" event (you can google the other events available instead of message), it'll call onYourMethodName on your $pusher object. It really is that simple: anything over 5555 hits $pusher->onYourMethodName().

As a result, you just need to create your method in the event handler now (next to onMessage(), onSubscribe() etc)... Again this is all mentioned on that page.

public function onYourMethodName($data) 
{
    /** You'll probably want to send the data in JSON format **/
    /** Imagine you get through a 'topic' in here... **/
    $data = json_decode($data, true);

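    /** You should already have stored the people who are connected, topics etc - see the tutorial **/
    /** If nobody is subscribed to this topic there is nobody to send to **/
    if (!isset($this->subscribedTopics[$data['topic']])) {
        return;
    }
    $topic = $this->subscribedTopics[$data['topic']];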

    /** Send the data out to everyone subscribed to this topic **/
    $topic->broadcast($data);
}

If you want to be able to send data to a specific user and not everyone, there are many ways of doing that. Take a look at this question for how I did it, but this was a while back now.

The only thing you need to do yourself now is, in your handler (in onMessage or whatever), actually put what needs to be done in the queue along with who it should be sent back to (the topic).
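A rough sketch of what "putting it in the queue" might look like. The JobQueue interface, the InMemoryQueue stand-in, and the job fields are hypothetical; a real setup would swap in a client for whichever queue server you choose, since an in-memory queue is only visible to the process that created it.

```php
<?php
// Hypothetical abstraction over whatever job queue you end up using.
interface JobQueue
{
    public function push(string $job): void;
}

// Stand-in for local experiments only: other processes cannot see it.
final class InMemoryQueue implements JobQueue
{
    /** @var string[] */
    public $jobs = [];

    public function push(string $job): void
    {
        $this->jobs[] = $job;
    }
}

// Call this from onMessage(): it records the work and returns immediately.
function enqueueHttpJob(JobQueue $queue, string $topic, string $url): void
{
    $queue->push(json_encode([
        'topic' => $topic, // who the result should go back to
        'url'   => $url,   // the slow HTTP request the worker should make
    ]));
}
```

The point of the design is that onMessage() only describes the work; it never performs the ten-second request itself.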

Once your worker has done its work and has the data, it'll need to call this to hit onYourMethodName() as shown above:

$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");

$socket->send(json_encode($data));

So here's what you need to do:

  • Get the first bit working with listening on port 5555
  • Create your method in your event handler etc
  • Don't worry about queues or anything just yet
  • Create a really simple PHP script containing the 4 lines of code above, and prove that when you run that script separately, it does indeed send the data into your event loop
  • Then worry about how to add it to your queue and get your workers to handle it
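Once the simple script works, a worker could look roughly like the sketch below. It assumes the ZMQ extension is installed and reuses the port and socket type from the four-line snippet above; the function names and the job fields (topic, url) are hypothetical.

```php
<?php
// Hypothetical helper: packages the worker's result for onYourMethodName().
// Assumes the body is valid UTF-8; json_encode() fails otherwise.
function buildResultMessage(string $topic, $body): string
{
    return json_encode([
        'topic' => $topic,
        'ok'    => $body !== false, // file_get_contents() returns false on failure
        'body'  => $body === false ? null : $body,
    ]);
}

// Runs one job. Blocking is fine here because this is not the event loop.
function runJob(string $job): void
{
    $data = json_decode($job, true);

    // The slow part: up to ten seconds in the question's case.
    $body = @file_get_contents($data['url']);

    // Same four lines as the test script, pushing to the listening socket.
    $context = new ZMQContext();
    $socket  = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
    $socket->connect('tcp://localhost:5555');
    $socket->send(buildResultMessage($data['topic'], $body));
}
```

Because the message carries the topic, onYourMethodName() has everything it needs to route the result without the worker knowing anything about websockets.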