
Parallelization/multiprocessing of conditional for loop

I want to use multiprocessing in Python to speed up a while loop.

More specifically:

I have a matrix (samples * features). I want to select x subsets of samples whose values at a random subset of features are unequal to a certain value (-1 in this case).

My serial code:

import numpy as np
import pandas as pd

np.random.seed(43)
datafile = '...'
df = pd.read_csv(datafile, sep=" ", nrows = 89)

no_feat = 500
no_samp = 5
no_trees = 5
i = 0
iter = 0


samples = np.zeros((no_trees, no_samp))
features = np.zeros((no_trees, no_feat))

while i < no_trees:
    rand_feat = np.random.choice(df.shape[1], no_feat, replace=False)
    iter_order = np.random.choice(df.shape[0], df.shape[0], replace=False)

    samp_idx = []
    a = 0

    #--------------
    #how to run in parallel?

    for j in iter_order:
        pot_samp = df.iloc[j, rand_feat]
        if len(np.where(pot_samp == -1)[0]) == 0:
            samp_idx.append(j)
        if len(samp_idx) == no_samp:
            print a
            break
        a += 1

    #--------------

    if len(samp_idx) == no_samp:
        samples[i, :] = samp_idx
        features[i, :] = rand_feat
        i += 1
    iter += 1
    if iter > 1000:  #break if subsets cannot be found
        break


Searching for fitting samples is the potentially expensive part (the for loop over j), which in theory could be run in parallel. In some cases it is not necessary to iterate over all samples to find a large enough subset, which is why I break out of the loop as soon as the subset is large enough.

I am struggling to find an implementation that would allow me to check how many valid results have already been generated. Is that even possible?

I have used joblib before. If I understand correctly, it uses the pool methods of multiprocessing as a backend, which only works for separate tasks? I am thinking that queues might be helpful, but so far I have failed at implementing them.
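
For reference, this is roughly the pattern I have used with joblib so far. It is only a minimal sketch; check_sample and its body are placeholders, and every call is an independent task, so I see no obvious way to stop early once enough valid results have been collected:

from joblib import Parallel, delayed

def check_sample(j):
    # placeholder for the per-sample check; each call is fully independent
    return j * j

# all tasks are submitted up front and run to completion;
# there is no shared state between the workers
results = Parallel(n_jobs=4)(delayed(check_sample)(j) for j in range(100))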

Answer

I found a working solution. I decided to run the while loop in parallel and have the different processes interact over a shared counter. Furthermore, I vectorized the search for suitable samples.
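
Stripped of the sampling details, the shared-counter pattern looks like this. This is only a minimal sketch with made-up numbers and a random stand-in for the actual check; the full code is further down:

from multiprocessing import Pool, Lock, Value
import random

counter = Value('i', 0)   # number of valid results found by all workers so far
lock = Lock()

def init(shared_counter, shared_lock):
    # runs once in every worker process and exposes the shared objects there
    global counter, lock
    counter, lock = shared_counter, shared_lock

def work(target):
    found = []
    while counter.value < target:
        candidate = random.random()
        if candidate > 0.5:               # stand-in for "subset has no -1 values"
            with lock:
                counter.value += 1
            found.append(candidate)
    return found

if __name__ == '__main__':
    pool = Pool(4, initializer=init, initargs=(counter, lock))
    parts = pool.map(work, [100] * 4)     # every worker works towards the same target
    pool.close()
    pool.join()
    print sum(len(part) for part in parts)  # roughly 100, may overshoot slightly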

The vectorization yielded a ~300x speedup and running on 4 cores speeds up the computation ~twofold.
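
To illustrate what the vectorization replaces: instead of testing df.iloc[j, rand_feat] row by row, all rows are tested at once. A minimal sketch on made-up data (the -1 entries stand for missing values; the real code below uses np.where and np.in1d, which amounts to the same check):

import numpy as np

rng = np.random.RandomState(0)
data = rng.randint(0, 2, size=(89, 5000))    # toy samples x features matrix
data[rng.rand(89, 5000) < 0.001] = -1        # sprinkle in some missing values

rand_feat = rng.choice(data.shape[1], 500, replace=False)

# loop version: test one sample at a time
valid_loop = [j for j in range(data.shape[0])
              if not (data[j, rand_feat] == -1).any()]

# vectorized version: test all samples in one go
valid_vec = np.where((data[:, rand_feat] != -1).all(axis=1))[0]

assert valid_loop == list(valid_vec)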

First I tried to implement separate processes and put the results into a queue. It turns out queues aren't made for storing large amounts of data.

If anyone sees another bottleneck in this code, I would be glad if they pointed it out.

With my basically nonexistent knowledge about parallel computing I found it really hard to puzzle this together, especially since the examples on the internet are all very basic. I learnt a lot though =)

My code:

import numpy as np
import pandas as pd
import itertools
from multiprocessing import Pool, Lock, Value
from datetime import datetime


val = Value('i', 0)        # shared counter: sample subsets found so far
worker_ID = Value('i', 1)  # shared counter used to hand out worker IDs
lock = Lock()

def findSamp(no_trees, df, no_feat, no_samp):
    lock.acquire()
    print 'starting worker - {0}'.format(worker_ID.value)
    worker_ID.value +=1
    worker_ID_local = worker_ID.value
    lock.release()

    max_iter = 100000
    samp = []
    feat = []
    iter_outer = 0
    iter = 0
    while val.value < no_trees and iter_outer<max_iter:
        rand_feat = np.random.choice(df.shape[1], no_feat, replace=False)

        #get samples with random features from dataset;
        #find and select samples that don't have missing values in the random features
        samp_rand = df.iloc[:,rand_feat]
        nan_idx = np.unique(np.where(samp_rand == -1)[0])
        all_idx = np.arange(df.shape[0])
        notnan_bool = np.invert(np.in1d(all_idx, nan_idx))
        notnan_idx = np.where(notnan_bool == True)[0]

        if notnan_idx.shape[0] >= no_samp:
            #if enough samples for random feature subset, select no_samp samples randomly
            notnan_idx_rand = np.random.choice(notnan_idx, no_samp, replace=False)
            rand_feat_rand = rand_feat

            lock.acquire()
            val.value += 1
            #x = val.value
            lock.release()
            #print 'no of trees generated: {0}'.format(x)
            samp.append(notnan_idx_rand)
            feat.append(rand_feat_rand)

        else:
            #increase iter_outer counter if no sample subset could be found for random feature subset
            iter_outer += 1

        iter+=1
    if iter_outer >= max_iter:
        print 'exiting worker {0} because iter_outer >= max_iter'.format(worker_ID_local)
    else:
        print 'worker {0} - finished'.format(worker_ID_local)
    return samp, feat

def initialize(*args):
    # runs once in each worker process; makes the shared counter,
    # worker ID counter and lock available as globals there
    global val, worker_ID, lock
    val, worker_ID, lock = args

def star_findSamp(i_df_no_feat_no_samp):
    # Pool.map passes a single argument, so unpack the tuple and call findSamp
    return findSamp(*i_df_no_feat_no_samp)


if __name__ == '__main__':
    np.random.seed(43)
    datafile = '...'
    df = pd.read_csv(datafile, sep=" ", nrows = 89)
    df = df.fillna(-1)
    df = df.iloc[:, 6:]

    no_feat = 700
    no_samp = 10
    no_trees = 5000


    startTime = datetime.now()
    print 'starting multiprocessing'
    ncores = 4
    p = Pool(ncores, initializer=initialize, initargs=(val, worker_ID, lock))
    args = itertools.izip([no_trees]*ncores, itertools.repeat(df), itertools.repeat(no_feat), itertools.repeat(no_samp))

    result = p.map(star_findSamp, args)
    p.close()
    p.join()

    print '{0} sample subsets for tree training have been found'.format(val.value)

    samples = [x[0] for x in result if x is not None]
    samples = np.vstack(samples)
    features = [x[1] for x in result if x is not None]
    features = np.vstack(features)
    print datetime.now() - startTime