
Using Python RLocks across multiple independent processes

I am working on a Django project that uses Celery to schedule some long-running tasks. Both Django and Celery run in completely independent processes and need a way to coordinate access to the database. I'd like to use Python's multiprocessing.RLock class (or equivalent), because I need the lock to be reentrant.

My question is: how do I give the separate processes access to the RLock?

The two best solutions I've found (posix_ipc module and fcntl) are limited to Unix-based systems, and we'd like to avoid restricting ourselves to that.

Is there a cross-platform way to share the locks between processes without having a common ancestor process?

Answer

I ended up using RabbitMQ as a way to create distributed locks. Details on how to do this can be found on RabbitMQ's blog: https://www.rabbitmq.com/blog/2014/02/19/distributed-semaphores-with-rabbitmq/.

In short, you create a RabbitMQ queue for the lock and publish a single message to it. To acquire the lock, run a basic_get (non-blocking) or basic_consume (blocking) on the queue. This removes the message from the queue, so no other consumer can acquire the lock while you hold it. Once your work is finished, sending a negative acknowledgement causes RabbitMQ to requeue the message, allowing the next consumer in line to proceed.

Unfortunately, this doesn't allow for reentrant locks.

The blog post linked above gives Java code for doing this. Figuring out how to translate it into Python/Pika was annoying enough that I thought I should post some example code here.

To produce the lock:

import pika

# One-time setup: declare the queue and publish the single message that acts
# as the lock token. Run this once, before any consumers try to take the lock.
with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
    channel = connection.channel()
    channel.queue_declare(queue='LockQueue')
    channel.basic_publish(exchange='', routing_key='LockQueue', body='Lock')
    channel.close()

To acquire the lock:

import pika
import time

def callback(ch, method, properties, body):
    print("Got lock")

    # Simulate the work that needs the lock.
    for i in range(5, 0, -1):
        print("Tick {}".format(i))
        time.sleep(1)

    print("Releasing lock")
    # A negative ack (with requeue, the default) puts the message back on the
    # queue so the next consumer can acquire the lock.
    ch.basic_nack(delivery_tag=method.delivery_tag)
    ch.close()  # Close the channel to continue on with normal processing. Without this, `callback` will immediately re-acquire the lock.

with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
    channel = connection.channel()

    channel.queue_declare(queue='LockQueue')
    channel.basic_qos(prefetch_count=1)  # Hold at most one unacknowledged message (the lock) at a time.
    # With pika >= 1.0 the callback is passed as on_message_callback; older
    # versions used basic_consume(callback, queue='LockQueue').
    channel.basic_consume(queue='LockQueue', on_message_callback=callback)

    print("Waiting for lock")
    channel.start_consuming()
    print("Task completed")

EDIT: The above example is the bare minimum to get something working. If you decide to use RabbitMQ for locking, I highly encourage you to build it on top of the asynchronous consumer example in the Pika docs, which shows how to properly handle unexpected disconnects from RabbitMQ.
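
Until you get to that point, a rough stopgap is to wrap the blocking consumer above in a reconnect loop. The sketch below is just an illustration, not the Pika-recommended pattern: it assumes pika >= 1.0, reuses the callback and LockQueue from the example above, and the acquire_lock_forever name is made up for this example.

import time

import pika
import pika.exceptions

def acquire_lock_forever(on_message):
    # Keep trying to consume the lock message, reconnecting if the broker drops the connection.
    while True:
        try:
            connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            channel = connection.channel()
            channel.queue_declare(queue='LockQueue')
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(queue='LockQueue', on_message_callback=on_message)
            channel.start_consuming()  # Returns once the callback closes the channel.
            connection.close()
            return
        except pika.exceptions.AMQPConnectionError:
            time.sleep(1)  # Broker unreachable or connection lost; retry shortly.

Calling acquire_lock_forever(callback) would then replace the bare with-block in the consumer example.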
