RG20 RG20 - 1 month ago 8
Python Question

MPI python-Open-MPI

I have 20,000*515 numpy matrix,representing biological datas. I need to find the correlation coefficient of the biological data,meaning as a result I would have a 20,000*20,000 matrix with correlation value. Then I populate a numpy array with 1's and 0's if each correlation coefficient is greater than a threshold value.

I used numpy.corrcoef to find the correlation coefficient and it works well in my machine.

Then I wanted to put in the cluster(having 10 computers and node varying from 2 to 8). when I tried putting it in the cluster, each node generating (40)random numbers and getting those 40 random columns from the biological data resulting in 20,000*40 matrix, I ran into memory issue saying.

mpirun noticed that process rank # with PID # on node name exited on signal 9 (Killed).

Then I tried rewriting the program like getting each rows finding the correlation coefficient and if the value is greater than the threshold then store 1 in the matrix or else 0 rather than creating a correlation matrix.
But it takes 1.30 hrs to run this program.I need to run it 100 times.

Can anyone please suggest a better way of doing this, like how to solve the memory issue by allocating jobs once each core has finished it's job. I am new to MPI. Below is my code.

If you need any more information please let me know.
Thank you

import numpy
from mpi4py import MPI
import time


Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();

RandomNumbers={};





rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)


Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);




RandomNumbers[Rank]=rndm_indx;
CORR_CR=numpy.zeros((Data.shape[0],Data.shape[0]));


start=time.time();
for i in range(0,Data.shape[0]):
Data[i]=Data[i]-np.mean(Data[i]);
alpha1=1./np.linalg.norm(Data[i]);
for j in range(i,Data.shape[0]):
if(i==j):
CORR_CR[i][j]=1;
else:
Data[j]=Data[j]-np.mean(Data[j]);
alpha2=1./np.linalg.norm(Data[j]);
corr=np.inner(Data[i],Data[j])*(alpha1*alpha2);
corr=int(np.absolute(corrcoef)>=0.9)
CORR_CR[i][j]=CORR_CR[j][i]=corr
end=time.time();

CORR_CR=CORR_CR-numpy.eye(CORR_CR.shape[0]);


elapsed=(end-start)
print('Total Time',elapsed)

Answer

The execution time of the program you posted is about 96s on my computer. Let's optimize a couple of things before exploring parallel computations.

  • Let's store the norms of the vectors to avoid computing it each time the norm is needed. Getting alpha1=1./numpy.linalg.norm(Data[i]); out of the second loop is a good starting point. Since the vectors do not change during computations, their norms can be computed in advance:

    alpha=numpy.zeros(Data.shape[0])
    for i in range(0,Data.shape[0]):
      Data[i]=Data[i]-numpy.mean(Data[i])
      alpha[i]=1./numpy.linalg.norm(Data[i])
    
    for i in range(0,Data.shape[0]):
      for j in range(i,Data.shape[0]):
        if(i==j):
           CORR_CR[i][j]=1;
        else:
           corr=numpy.inner(Data[i],Data[j])*(alpha[i]*alpha[j]);
           corr=int(numpy.absolute(corr)>=0.9)
           CORR_CR[i][j]=CORR_CR[j][i]=corr
    

The computation time is already down to 17s.

  • Assuming that the vectors are not highly correlated, most of the correlation coefficients will be rounded to zero. Hence, the matrix is likely to be sparce (full of zeros). Let's use the scipy.sparse.coo_matrix sparce matrix format, which is very easy to populate: the non-null items and their coordinates i,j are to be stored in lists.

    data=[]
    ii=[]
    jj=[]
    ...
      if(corr!=0):
               data.append(corr)
               ii.append(i)
               jj.append(j)
               data.append(corr)
               ii.append(j)
               jj.append(i)
    ...
    CORR_CR=scipy.sparse.coo_matrix((data,(ii,jj)), shape=(Data.shape[0],Data.shape[0]))
    

The computation time is down to 13s (negligeable improvement ?) and the memory footprint is greatly reduced. It would be a major improvement if larger datasets were to be considered.

  • for loops in python are pretty unefficient. See for loop in python is 10x slower than matlab for instance. But there are plenty of ways around, such as using vectorized operations or optimized iterators such as those provided by numpy.nditer. One of the reasons for for loops being unefficient is that python is a interpreted language: not compilation occurs in the process. Hence, to overcome this problem, the most tricky part of the code can be compiled by using a tool like Cython.

    1. The critical part of the code are written in Cython, in a dedicated file correlator.pyx.

    2. This file is turned into a correlator.c file by Cython

    3. This file is compiled by your favorite c compiler gcc to build a shared library correlator.so

    4. The optimized function can be used in your program after import correlator.

The content of correlator.pyx, starting from Numpy vs Cython speed , looks like:

import numpy

cimport numpy
cimport scipy.linalg.cython_blas
ctypedef numpy.float64_t DTYPE_t
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def process(numpy.ndarray[DTYPE_t, ndim=2] array,numpy.ndarray[DTYPE_t, ndim=1] alpha,int imin,int imax):

    cdef unsigned int rows = array.shape[0]
    cdef  int cols = array.shape[1]
    cdef unsigned int row, row2
    cdef  int one=1
    ii=[]
    jj=[]
    data=[]

    for row in range(imin,imax):
        for row2 in range(row,rows):
            if row==row2:
               data.append(0)
               ii.append(row)
               jj.append(row2)
            else:
                corr=scipy.linalg.cython_blas.ddot(&cols,&array[row,0],&one,&array[row2,0],&one)*alpha[row]*alpha[row2]
                corr=int(numpy.absolute(corr)>=0.9)
                if(corr!=0):
                    data.append(corr)
                    ii.append(row)
                    jj.append(row2)

                    data.append(corr)
                    ii.append(row2)
                    jj.append(row)

    return ii,jj,data

where scipy.linalg.cython_blas.ddot() will perform the inner product.

To cythonize and compile the .pyx file, the following makefile can be used (I hope you are using Linux...)

all: correlator correlatorb


correlator: correlator.pyx
    cython -a correlator.pyx

correlatorb: correlator.c
    gcc -shared -pthread -fPIC -fwrapv -O2 -Wall -fno-strict-aliasing -I/usr/include/python2.7 -o correlator.so correlator.c

The new correlation function is called in the main python file by:

import correlator
ii,jj,data=correlator.process(Data,alpha,0,Data.shape[0])

By using a compiled loop, the time is down to 5.4s! It's already ten times faster. Moreover, this computations are performed on a single process!

Let's introduce parallel computations. Please notice that two arguments are added to the function process : imin and imax. These arguments signals the range of rows of CORR_CR that are computed. It is performed so as to anticipate the use of parallel computation. Indeed, a straightforward way to parallelize the program is to split the outer for loop (index i) to the different processes.

Each processes will perform the outer for loop for a particular range of the index i which is computed so as to balance the workload between the processes.

The program has to complete the following operations:

  1. Process 0 ("root process") reads the vectors Data in the file.
  2. The Data is broadcast to all processes by using the MPI function bcast().
  3. The range of indexes i to be considered by each process is computed.
  4. The correlation coefficient are computed by each process for the corresponding indexes. The non-null terms of the matrix are stored in lists data,ii,jj on each process.
  5. These lists are gathered on the root process by calling the MPI function gather(). It produces three lists of Size lists which are concatenated to get 3 lists required to create the sparce adjacency matrix.

Here goes the code:

import numpy
from mpi4py import MPI
import time
import scipy.sparse

import warnings
warnings.simplefilter('ignore',scipy.sparse.SparseEfficiencyWarning)

Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();

#a dummy set of data is created here. 
#Samples such that (i-j)%10==0 are highly correlated.
RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)

Data=numpy.ascontiguousarray(numpy.zeros((2000,515),dtype=numpy.float64))
if Rank==0:
    #Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);
    Data=numpy.ascontiguousarray(numpy.random.rand(2000,515))
    lin=numpy.linspace(0.,1.,515)
    for i in range(Data.shape[0]):
         Data[i]+=numpy.sin((1+i%10)*10*lin)*100

start=time.time();

#braodcasting the matrix
Data=MPI.COMM_WORLD.bcast(Data, root=0)

RandomNumbers[Rank]=rndm_indx;
print Data.shape[0]

#an array to store the inverse of the norm of each sample
alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
        Data[i]=Data[i]-numpy.mean(Data[i])
        if numpy.linalg.norm(Data[i])==0:
            print "process "+str(Rank)+" is facing a big problem"
        else:
            alpha[i]=1./numpy.linalg.norm(Data[i])


#balancing the computational load between the processes. 
#Each process must perform about Data.shape[0]*Data.shape[0]/(2*Size) correlation.
#each process cares for a set of rows. 
#Of course, the last rank must care about more rows than the first one.

ilimits=numpy.zeros(Size+1,numpy.int32)
if Rank==0:
    nbtaskperprocess=Data.shape[0]*Data.shape[0]/(2*Size)
    icurr=0
    for i in range(Size):
        nbjob=0
        while(nbjob<nbtaskperprocess and icurr<=Data.shape[0]):
            nbjob+=(Data.shape[0]-icurr)
            icurr+=1
        ilimits[i+1]=icurr
    ilimits[Size]=Data.shape[0]
ilimits=MPI.COMM_WORLD.bcast(ilimits, root=0)          

#the "local" job has been cythonized in file main2.pyx
import correlator
ii,jj,data=correlator.process(Data,alpha,ilimits[Rank],ilimits[Rank+1])

#gathering the matrix inputs from every processes on the root process.
data = MPI.COMM_WORLD.gather(data, root=0)
ii = MPI.COMM_WORLD.gather(ii, root=0)
jj = MPI.COMM_WORLD.gather(jj, root=0)

if Rank==0:
   #concatenate the lists
   data2=sum(data,[])
   ii2=sum(ii,[])
   jj2=sum(jj,[])
   #create the adjency matrix
   CORR_CR=scipy.sparse.coo_matrix((data2,(ii2,jj2)), shape=(Data.shape[0],Data.shape[0]))

   print CORR_CR

end=time.time();           

elapsed=(end-start)
print('Total Time',elapsed)

By running mpirun -np 4 main.py, the computation time is 3.4s. It's not 4 time faster... This is likely due to the fact that the bottleneck of the computation is the computations of scalar products, which requires a large memory bandwidth. And my personnal computer is really limited regarding the memory bandwidth...

Last comment: there are plenty of possibilities for improvements. - The vectors in Data are copied to every processes... This affects the memory required by the program. Dispatching the computation in a different way and trading memory against communications could overcome this problem... - Each process still computes the norms of all the vectors...It could be improved by having each process computing the norms of some vector and using the MPI function allreduce() on alpha...

What to do with this adjacency matrix ?

I think you already know the answer to this question, but you can provide this adjacency matrix to sparse.csgraph routines such as connected_components() or laplacian(). Indeed, you are not very far from spectral clustering!

For instance, if the clusters are obvious, using connected_components() is sufficient:

if Rank==0:
    # coo to csr format
    S=CORR_CR.tocsr()
    print S
    n_components, labels=scipy.sparse.csgraph.connected_components(S, directed=False, connection='weak', return_labels=True)

    print "number of different famillies "+str(n_components)

    numpy.set_printoptions(threshold=numpy.nan)
    print labels