Nick Miller - 3 months ago
MySQL Question

Python 2.7 - Boolean check fails while multiprocessing

I have a script that retrieves a list of active 'jobs' from a MySQL table and then instantiates my main script once per active job using the multiprocessing library. My multiprocessing script has a function that checks if a given job has been claimed by another thread. It does this by checking if a particular column in the DB table is/is not NULL. The DB query returns a single item tuple:

    def check_if_job_claimed():
        job_claimed = cursor.fetchone()  # Returns (claim_id,) for claimed jobs, and (None,) for unclaimed jobs
        if job_claimed:
            print "This job has already been claimed by another thread."

When I run this function without the multiprocessing portion, the claim check works just fine. But when I try to run the jobs in parallel, the claim check treats every (None,) tuple as truthy, so the function concludes the job has already been claimed.

I have tried adjusting the number of concurrent processes the pool uses, but the claim check still doesn't work, even when I set the number of processes to 1. I have also tried playing around with the if statement to see if I could make it work that way:

    if job_claimed == True:
    if job_claimed == (None,):
    # etc.

No luck though.

Is anybody aware of something in the multiprocessing library that would prevent my claim checking function from properly interpreting the job_claimed tuple? Maybe there's something wrong with my code?


I ran some truthiness tests on the job_claimed variable in debug mode. Here are the results of those tests:

    (Pdb) job_claimed
    (None,)
    (Pdb) len(job_claimed)
    1
    (Pdb) job_claimed == True
    False
    (Pdb) job_claimed == False
    False
    (Pdb) job_claimed[0]
    (Pdb) job_claimed[0] == True
    False
    (Pdb) job_claimed[0] == False
    False
    (Pdb) any(job_claimed)
    False
    (Pdb) all(job_claimed)
    False
    (Pdb) job_claimed is not True
    True
    (Pdb) job_claimed is not False
    True


As requested:

    with open('Resource_File.txt', 'r') as f:
        creds = eval(f.read())

    connection = mysql.connector.connect(user=creds["mysql_user"], password=creds["mysql_pw"],
                                         host=creds["mysql_host"], database=creds["mysql_db"],
                                         use_pure=False, buffered=True)

    def check_if_job_claimed(job_id):
        cursor = connection.cursor()
        thread_id_query = "SELECT Thread_Id FROM jobs WHERE Job_ID='{}';".format(job_id)
        cursor.execute(thread_id_query)
        job_claimed = cursor.fetchone()
        job_claimed = job_claimed[0]
        if job_claimed:
            print "This job has already been claimed by another thread. Moving on to next job..."
            return False
        thread_id = socket.gethostname() + ':' + str(random.randint(0, 1000))
        claim_job = "UPDATE jobs SET Thread_Id = '{}' WHERE Job_ID = '{}';".format(thread_id, job_id)
        cursor.execute(claim_job)
        connection.commit()
        print "Job is now claimed"
        return True

    def call_the_queen(dict_of_job_attributes):
        if check_if_job_claimed(dict_of_job_attributes['job_id']):
            instance = OM(dict_of_job_attributes)  # <-- Create instance of my target class

    #multiprocessing code
    import multiprocessing as mp
    if __name__ == '__main__':
        active_jobs = get_active_jobs()
        pool = mp.Pool(processes=4)
        pool.map(call_the_queen, active_jobs)


Any non-empty tuple (or list, string, or other container) evaluates to True; it doesn't matter whether the items inside it are themselves truthy. To test the contents instead, use any(iterable) or all(iterable) to check whether any or all of the items evaluate to True.
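For example, with the (None,) row from the question:

```python
row = (None,)           # what cursor.fetchone() returns for an unclaimed job

print(bool(row))        # True: a one-element tuple is truthy regardless of contents
print(any(row))         # False: no item in the tuple is truthy
print(row[0] is None)   # True: test the column value itself, not the tuple
```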

However, based on your edits, your problem is likely caused by using a global connection object across multiple processes.

Instead, each process should create its own connection:

    def check_if_job_claimed(job_id):
        connection = mysql.connector.connect(user=creds["mysql_user"], password=creds["mysql_pw"],
                                             host=creds["mysql_host"], database=creds["mysql_db"],
                                             use_pure=False, buffered=True)

You could also try using connection pooling, but I'm not sure whether that works across processes; it would probably require switching to threads instead.
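If reconnecting inside every call is too slow, another sketch is to open one connection per worker via a Pool initializer. Here connect() is a hypothetical stand-in for the mysql.connector.connect(...) call from the question, so the example runs without a database:

```python
import multiprocessing as mp
import os

connection = None  # each worker process fills in its own copy


def connect():
    # Hypothetical stand-in for mysql.connector.connect(user=creds["mysql_user"], ...);
    # swap in the real call in your script.
    return "connection-in-pid-%d" % os.getpid()


def init_worker():
    # Runs once in every worker process, right after it starts.
    global connection
    connection = connect()


def check_job(job_id):
    return connection  # the worker uses its own process-local connection


if __name__ == '__main__':
    pool = mp.Pool(processes=2, initializer=init_worker)
    print(set(pool.map(check_job, range(8))))  # at most 2 distinct connections
    pool.close()
    pool.join()
```

Each worker keeps its connection for its whole lifetime, so you pay the connection cost once per process instead of once per job.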

Also, I would move all the code under if __name__ == '__main__': into a function. You generally want to avoid polluting the global namespace when using multiprocessing, because when Python creates a new process it tries to copy the global state into it. That can lead to odd bugs: global variables no longer share state (each process has its own copy), and an object may fail to serialize, or lose information when it is reconstructed in the new process.
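A minimal sketch of that refactor, with stand-ins for get_active_jobs() and call_the_queen() so it runs on its own:

```python
import multiprocessing as mp


def get_active_jobs():
    # Stand-in for the real MySQL query in the question.
    return [{'job_id': i} for i in range(4)]


def call_the_queen(job):
    # Stand-in worker; the real one would claim the job and run OM(job).
    return job['job_id']


def main():
    # All setup lives here instead of at module level, so a child process
    # importing this module doesn't re-run it or drag extra globals along.
    pool = mp.Pool(processes=2)
    results = pool.map(call_the_queen, get_active_jobs())
    pool.close()
    pool.join()
    return results


if __name__ == '__main__':
    print(main())  # prints [0, 1, 2, 3]
```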