Neel Kamal - 10 days ago
PHP Question

Error implementing a message queue using Redis: error when using BRPOP

I am trying to build a message queue using Redis.
Whenever a client sends new data, it is added to a list.

Here is the code for it:

$client->lPush("my_queue", $data);


There is a separate script, slave.php, which pops the newly arrived data and processes it.
The code for slave.php:

while (true) {
    // Block until a message arrives; a timeout of 0 means wait indefinitely
    list($queue, $message) = $client->brPop(["my_queue"], 0);

    /*
    Logic to process the data
    */
}


I have modified the Apache startup script so that slave.php starts and stops with Apache. This works well, but after a few minutes brPop stops listening and fails with an error message like this:

Uncaught exception 'Predis\Connection\ConnectionException' with message 'Error while reading line from the server [tcp://127.0.0.1:6379]' in /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php:139
Stack trace:
#0 /var/www/api/lib/predis-0.8/lib/Predis/Connection/StreamConnection.php(205): Predis\Connection\AbstractConnection->onConnectionError('Error while rea...')
#1 /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php(128): Predis\Connection\StreamConnection->read()
#2 /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php(120): Predis\Connection\AbstractConnection->readResponse(Object(Predis\Command\ListPopLastBlocking))
#3 /var/www/api/lib/predis-0.8/lib/Predis/Client.php(227): Predis\Connection\AbstractConnection->executeCommand(Object(Predis\Command\ListPopLastBlocking))
#4 /var/www/api/lib/slave.php(7): Predis\Client->__call('brPop', Array)
#5 /var/www/api/lib/slave.php(7): Predis\Client->brPop(Array, 0)
#6 {main}
thrown in /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php on line 139


According to the documentation, if the list is empty, BLPOP/BRPOP blocks the connection until another client performs an LPUSH or RPUSH operation against one of the keys.
But this is not happening in my case.
Once brPop blocks the connection, it doesn't listen again, even when new data arrives in the list.

What changes should I make to get this working?

Answer

It's working for me now, but I am not sure whether this is the right way to do it. I now catch the exception and call the function recursively when the connection fails. My new slave.php looks like this:

function process_data()
{
    try {
        $client = new \Predis\Client();

        require_once("logger.php");

        while (true) {
            list($queue, $message) = $client->brPop(["bookmark_queue"], 0);
            // logic
        }
    } catch (Exception $ex) {
        $error = $ex->getMessage();
        log_error($error, "slave.php");
        process_data(); // reconnect by calling the function recursively (each failure adds a stack frame)
    }
}
process_data(); // call the function
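
A possible alternative, offered as a sketch rather than a confirmed fix: the exception is thrown while reading from the socket, which is consistent with a client-side read timeout expiring while BRPOP is still blocked (PHP's default_socket_timeout is typically 60 seconds). Assuming Predis accepts a read_write_timeout connection parameter and treats a non-positive value as "no timeout", the client can be told to wait indefinitely. The reconnect can also be done in a loop instead of by recursion, so repeated failures do not deepen the call stack. The names make_blocking_client and consume, and the $maxErrors and $retryDelay parameters, are hypothetical helpers for illustration only, and Predis is assumed to be autoloaded already.

```php
<?php
// Sketch only: make_blocking_client() and consume() are hypothetical helpers,
// not part of Predis.

/**
 * Builds a Predis client for a long-lived blocking consumer.
 * Assumption: a non-positive read_write_timeout disables the client-side
 * socket read timeout, so BRPOP with timeout 0 can wait indefinitely.
 */
function make_blocking_client()
{
    return new \Predis\Client([
        'scheme' => 'tcp',
        'host' => '127.0.0.1',
        'port' => 6379,
        'read_write_timeout' => -1,
    ]);
}

/**
 * Pops messages from $queue and passes each one to $handle.
 * Reconnects in a loop (not by recursion) after any exception,
 * including one thrown by $handle.
 *
 * $connect    callable returning an object with a brPop($keys, $timeout) method
 * $handle     callable invoked with each message
 * $maxErrors  stop after this many consecutive errors (null = retry forever)
 * $retryDelay seconds to wait before reconnecting
 *
 * Returns the number of consecutive errors that ended the loop.
 */
function consume($queue, callable $connect, callable $handle, $maxErrors = null, $retryDelay = 1)
{
    $errors = 0;
    while ($maxErrors === null || $errors < $maxErrors) {
        try {
            $client = $connect();
            while (true) {
                $reply = $client->brPop([$queue], 0);
                if ($reply === null) {
                    continue; // nothing was popped; block again
                }
                list(, $message) = $reply; // reply is [queue name, message]
                $handle($message);
                $errors = 0; // a processed message resets the error count
            }
        } catch (Exception $ex) {
            $errors++;
            error_log('consumer: ' . $ex->getMessage());
            sleep($retryDelay); // avoid a tight reconnect loop
        }
    }
    return $errors;
}
```

To run it against the queue from the question, call consume('my_queue', 'make_blocking_client', $handler), where $handler is whatever callable processes one message. Leaving $maxErrors as null retries forever; a finite value lets the script exit so the startup script can restart it.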