
Run Celery task for each row returned from MySQL query?

I've used Python before, but only for Flask applications, and I've never used Celery. After reading the docs and setting everything up (it works, as I've tested it with multiple workers), I'm trying to run a SQL query and, for each row that gets returned, send the row off to be processed by a Celery worker.

Below is a sample of the very basic code.

from celery import Celery
import MySQLdb

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def print_domain():
    db = MySQLdb.connect(host="localhost", user="DB_USER", passwd="DB_PASS", db="DB_NAME")
    cur = db.cursor()
    cur.execute("SELECT * FROM myTable")

    for row in cur.fetchall():
        print_query_result(row[0])

    db.close()

def print_query_result(result):
    print result


Basically it selects everything in the 'myTable' table and prints each row that gets returned. If I call the code using plain Python it works fine and prints all the data from the MySQL table. When I call it using .delay() to send it off to a worker, it only goes to one worker and only the top row in the database is printed.

I've been trying to read up on subtasks but I'm not sure if I'm going in the right direction with that.

In short, I want the following to happen, but I don't know where to start (there's a rough sketch of what I mean after the list). Has anyone got any ideas?


  • SQL query to select all rows in a table

  • Send each row/result to a worker to process some code

  • Return the code's result back into a database

  • Pick up the next item in the queue (if any)
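
To make that concrete, here's a rough sketch of what I'm aiming for. The id, domain and result columns and the process_row task are made-up names for illustration, not real code from my project:

from celery import Celery
import MySQLdb

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def dispatch_rows():
    # Step 1: select all rows and queue one job per row.
    db = MySQLdb.connect(host="localhost", user="DB_USER",
                         passwd="DB_PASS", db="DB_NAME")
    cur = db.cursor()
    cur.execute("SELECT id, domain FROM myTable")
    for row_id, domain in cur.fetchall():
        process_row.delay(row_id, domain)  # queued, not called directly
    db.close()

@app.task
def process_row(row_id, domain):
    # Step 2: each worker processes one row and writes the result back.
    result = domain.upper()  # stand-in for the real processing
    db = MySQLdb.connect(host="localhost", user="DB_USER",
                         passwd="DB_PASS", db="DB_NAME")
    cur = db.cursor()
    cur.execute("UPDATE myTable SET result = %s WHERE id = %s",
                (result, row_id))
    db.commit()
    db.close()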



Thanks in advance.

EDIT 1:

I've updated my code to use SQLAlchemy instead; the results come back the same as with my old query, which is fine.

from celery import Celery
from models import DBDomains

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def print_domain():
    query = DBDomains.query.all()
    for i in query:
        print i.domain
        print_query_result.s()

@app.task
def print_query_result():
    print "Received job"

print_domain.delay()


When I run the .py file, the worker returns:

[2016-08-02 02:08:40,881: INFO/MainProcess] Received task: tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de]
[2016-08-02 02:08:41,036: WARNING/Worker-3] result1
[2016-08-02 02:08:41,037: WARNING/Worker-3] result2
[2016-08-02 02:08:41,039: INFO/MainProcess] Task tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] succeeded in 0.154022816569s: None


As you can see, the worker gets 'result1' and 'result2' from the table I'm querying, but then it doesn't seem to execute the command in the subtask, which should just print "Received job".

UPDATE: It looks like the subtask has to have .delay() called on it, as per the Celery docs, so my code now looks like this and successfully distributes the jobs across the workers.

from celery import Celery
from models import DBDomains

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def print_domain():
    query = DBDomains.query.all()
    for i in query:
        subtask = print_query_result.s(i.domain)
        subtask.delay()

@app.task
def print_query_result(domain):
    print domain

print_domain.delay()
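
For reference, building the signature first is only needed if you want to pass it around (into a chain or group, for example); otherwise these all queue the same job:

# All three lines queue the same print_query_result job;
# .delay() is just shorthand for .apply_async().
print_query_result.s(i.domain).delay()
print_query_result.delay(i.domain)
print_query_result.apply_async((i.domain,))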

Answer

Whenever you call a task from within a task, you have to use subtasks: build a signature with .s() and dispatch it with .delay() (or .apply_async()) instead of calling the task like a normal function. Fortunately the syntax is easy.

from celery import Celery

app = Celery('tasks', broker='redis://127.0.0.1:6379/0')


@app.task
def print_domain():
    for x in range(20):
        # Build a signature and dispatch it; without .delay() nothing runs.
        print_query_result.s(x).delay()


@app.task
def print_query_result(result):
    print(result)

(Substitute the for x in range(20) loop with your query results.) If you watch the Celery output, you'll see the tasks being created and distributed across the workers.
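
If you'd rather queue the whole batch in one call, Celery's group primitive does the same fan-out; a sketch, again using the range stand-in:

from celery import group

@app.task
def print_domain():
    # Build one signature per item and queue them all at once;
    # the workers pick the jobs up in parallel.
    group(print_query_result.s(x) for x in range(20)).apply_async()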