RG20 - 1 year ago 63

Python Question

I have 20,000*515 numpy matrix,representing biological datas. I need to find the correlation coefficient of the biological data,meaning as a result I would have a 20,000*20,000 matrix with correlation value. Then I populate a numpy array with 1's and 0's if each correlation coefficient is greater than a threshold value.

I used numpy.corrcoef to find the correlation coefficient and it works well in my machine.

Then I wanted to put in the cluster(having 10 computers and node varying from 2 to 8). when I tried putting it in the cluster, each node generating (40)random numbers and getting those 40 random columns from the biological data resulting in 20,000*40 matrix, I ran into memory issue saying.

mpirun noticed that process rank # with PID # on node name exited on signal 9 (Killed).

Then I tried rewriting the program like getting each rows finding the correlation coefficient and if the value is greater than the threshold then store 1 in the matrix or else 0 rather than creating a correlation matrix.

But it takes 1.30 hrs to run this program.I need to run it 100 times.

Can anyone please suggest a better way of doing this, like how to solve the memory issue by allocating jobs once each core has finished it's job. I am new to MPI. Below is my code.

If you need any more information please let me know.

Thank you

`import numpy`

from mpi4py import MPI

import time

Size=MPI.COMM_WORLD.Get_size();

Rank=MPI.COMM_WORLD.Get_rank();

Name=MPI.Get_processor_name();

RandomNumbers={};

rndm_indx=numpy.random.choice(range(515),40,replace=False)

rndm_indx=numpy.sort(rndm_indx)

Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);

RandomNumbers[Rank]=rndm_indx;

CORR_CR=numpy.zeros((Data.shape[0],Data.shape[0]));

start=time.time();

for i in range(0,Data.shape[0]):

Data[i]=Data[i]-np.mean(Data[i]);

alpha1=1./np.linalg.norm(Data[i]);

for j in range(i,Data.shape[0]):

if(i==j):

CORR_CR[i][j]=1;

else:

Data[j]=Data[j]-np.mean(Data[j]);

alpha2=1./np.linalg.norm(Data[j]);

corr=np.inner(Data[i],Data[j])*(alpha1*alpha2);

corr=int(np.absolute(corrcoef)>=0.9)

CORR_CR[i][j]=CORR_CR[j][i]=corr

end=time.time();

CORR_CR=CORR_CR-numpy.eye(CORR_CR.shape[0]);

elapsed=(end-start)

print('Total Time',elapsed)

Answer Source

The execution time of the program you posted is about 96s on my computer. **Let's optimize a couple of things before exploring parallel computations.**

Let's

**store the norms of the vectors to avoid computing it each time the norm is needed**. Getting`alpha1=1./numpy.linalg.norm(Data[i]);`

out of the second loop is a good starting point. Since the vectors do not change during computations, their norms can be computed in advance:`alpha=numpy.zeros(Data.shape[0]) for i in range(0,Data.shape[0]): Data[i]=Data[i]-numpy.mean(Data[i]) alpha[i]=1./numpy.linalg.norm(Data[i]) for i in range(0,Data.shape[0]): for j in range(i,Data.shape[0]): if(i==j): CORR_CR[i][j]=1; else: corr=numpy.inner(Data[i],Data[j])*(alpha[i]*alpha[j]); corr=int(numpy.absolute(corr)>=0.9) CORR_CR[i][j]=CORR_CR[j][i]=corr`

The computation time is already down to 17s.

Assuming that the vectors are not highly correlated, most of the correlation coefficients will be rounded to zero.

**Hence, the matrix is likely to be sparce (full of zeros). Let's use the**, which is very easy to populate: the non-null items and their coordinates i,j are to be stored in lists.`scipy.sparse.coo_matrix`

sparce matrix format`data=[] ii=[] jj=[] ... if(corr!=0): data.append(corr) ii.append(i) jj.append(j) data.append(corr) ii.append(j) jj.append(i) ... CORR_CR=scipy.sparse.coo_matrix((data,(ii,jj)), shape=(Data.shape[0],Data.shape[0]))`

The computation time is down to 13s (negligeable improvement ?) and the memory footprint is greatly reduced. It would be a major improvement if larger datasets were to be considered.

`for`

loops in python are pretty unefficient. See for loop in python is 10x slower than matlab for instance. But there are plenty of ways around, such as using vectorized operations or optimized iterators such as those provided by`numpy.nditer`

. One of the reasons for`for`

loops being unefficient is that python is a interpreted language: not compilation occurs in the process. Hence, to overcome this problem,**the most tricky part of the code can be compiled by using a tool like**`Cython`

.The critical part of the code are written in Cython, in a dedicated file

`correlator.pyx`

.This file is turned into a

`correlator.c`

file by CythonThis file is compiled by your favorite c compiler

`gcc`

to build a shared library`correlator.so`

The optimized function can be used in your program after

`import correlator`

.

The content of `correlator.pyx`

, starting from Numpy vs Cython speed , looks like:

```
import numpy
cimport numpy
cimport scipy.linalg.cython_blas
ctypedef numpy.float64_t DTYPE_t
cimport cython
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def process(numpy.ndarray[DTYPE_t, ndim=2] array,numpy.ndarray[DTYPE_t, ndim=1] alpha,int imin,int imax):
cdef unsigned int rows = array.shape[0]
cdef int cols = array.shape[1]
cdef unsigned int row, row2
cdef int one=1
ii=[]
jj=[]
data=[]
for row in range(imin,imax):
for row2 in range(row,rows):
if row==row2:
data.append(0)
ii.append(row)
jj.append(row2)
else:
corr=scipy.linalg.cython_blas.ddot(&cols,&array[row,0],&one,&array[row2,0],&one)*alpha[row]*alpha[row2]
corr=int(numpy.absolute(corr)>=0.9)
if(corr!=0):
data.append(corr)
ii.append(row)
jj.append(row2)
data.append(corr)
ii.append(row2)
jj.append(row)
return ii,jj,data
```

where `scipy.linalg.cython_blas.ddot()`

will perform the inner product.

To cythonize and compile the .pyx file, the following makefile can be used (I hope you are using Linux...)

```
all: correlator correlatorb
correlator: correlator.pyx
cython -a correlator.pyx
correlatorb: correlator.c
gcc -shared -pthread -fPIC -fwrapv -O2 -Wall -fno-strict-aliasing -I/usr/include/python2.7 -o correlator.so correlator.c
```

The new correlation function is called in the main python file by:

```
import correlator
ii,jj,data=correlator.process(Data,alpha,0,Data.shape[0])
```

By using a compiled loop, the time is down to 5.4s! It's already ten times faster. Moreover, this computations are performed on a single process!

**Let's introduce parallel computations.**
Please notice that two arguments are added to the function `process`

: `imin`

and `imax`

. These arguments signals the range of rows of `CORR_CR`

that are computed. It is performed so as to anticipate the use of parallel computation. Indeed, a straightforward way to parallelize the program is to split the outer `for`

loop (index `i`

) to the different processes.

Each processes will perform the outer for loop for a particular range of the index `i`

which is computed so as to balance the workload between the processes.

The program has to complete the following operations:

- Process 0 ("root process") reads the vectors
`Data`

in the file. - The
`Data`

is broadcast to all processes by using the MPI function`bcast()`

. - The range of indexes
`i`

to be considered by each process is computed. - The correlation coefficient are computed by each process for the corresponding indexes. The non-null terms of the matrix are stored in lists
`data`

,`ii`

,`jj`

on each process. - These lists are gathered on the root process by calling the MPI function
`gather()`

. It produces three lists of`Size`

lists which are concatenated to get 3 lists required to create the sparce adjacency matrix.

Here goes the code:

```
import numpy
from mpi4py import MPI
import time
import scipy.sparse
import warnings
warnings.simplefilter('ignore',scipy.sparse.SparseEfficiencyWarning)
Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();
#a dummy set of data is created here.
#Samples such that (i-j)%10==0 are highly correlated.
RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)
Data=numpy.ascontiguousarray(numpy.zeros((2000,515),dtype=numpy.float64))
if Rank==0:
#Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);
Data=numpy.ascontiguousarray(numpy.random.rand(2000,515))
lin=numpy.linspace(0.,1.,515)
for i in range(Data.shape[0]):
Data[i]+=numpy.sin((1+i%10)*10*lin)*100
start=time.time();
#braodcasting the matrix
Data=MPI.COMM_WORLD.bcast(Data, root=0)
RandomNumbers[Rank]=rndm_indx;
print Data.shape[0]
#an array to store the inverse of the norm of each sample
alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
Data[i]=Data[i]-numpy.mean(Data[i])
if numpy.linalg.norm(Data[i])==0:
print "process "+str(Rank)+" is facing a big problem"
else:
alpha[i]=1./numpy.linalg.norm(Data[i])
#balancing the computational load between the processes.
#Each process must perform about Data.shape[0]*Data.shape[0]/(2*Size) correlation.
#each process cares for a set of rows.
#Of course, the last rank must care about more rows than the first one.
ilimits=numpy.zeros(Size+1,numpy.int32)
if Rank==0:
nbtaskperprocess=Data.shape[0]*Data.shape[0]/(2*Size)
icurr=0
for i in range(Size):
nbjob=0
while(nbjob<nbtaskperprocess and icurr<=Data.shape[0]):
nbjob+=(Data.shape[0]-icurr)
icurr+=1
ilimits[i+1]=icurr
ilimits[Size]=Data.shape[0]
ilimits=MPI.COMM_WORLD.bcast(ilimits, root=0)
#the "local" job has been cythonized in file main2.pyx
import correlator
ii,jj,data=correlator.process(Data,alpha,ilimits[Rank],ilimits[Rank+1])
#gathering the matrix inputs from every processes on the root process.
data = MPI.COMM_WORLD.gather(data, root=0)
ii = MPI.COMM_WORLD.gather(ii, root=0)
jj = MPI.COMM_WORLD.gather(jj, root=0)
if Rank==0:
#concatenate the lists
data2=sum(data,[])
ii2=sum(ii,[])
jj2=sum(jj,[])
#create the adjency matrix
CORR_CR=scipy.sparse.coo_matrix((data2,(ii2,jj2)), shape=(Data.shape[0],Data.shape[0]))
print CORR_CR
end=time.time();
elapsed=(end-start)
print('Total Time',elapsed)
```

By running `mpirun -np 4 main.py`

, the computation time is 3.4s. It's not 4 time faster... This is likely due to the fact that the bottleneck of the computation is the computations of scalar products, which requires a large memory bandwidth. And my personnal computer is really limited regarding the memory bandwidth...

Last comment: there are plenty of possibilities for improvements.
- The vectors in `Data`

are copied to every processes... This affects the memory required by the program. Dispatching the computation in a different way and trading memory against communications could overcome this problem...
- Each process still computes the norms of all the vectors...It could be improved by having each process computing the norms of some vector and using the MPI function `allreduce()`

on `alpha`

...

**What to do with this adjacency matrix ?**

I think you already know the answer to this question, but you can provide this adjacency matrix to `sparse.csgraph`

routines such as `connected_components()`

or `laplacian()`

. Indeed, you are not very far from spectral clustering!

For instance, if the clusters are obvious, using `connected_components()`

is sufficient:

```
if Rank==0:
# coo to csr format
S=CORR_CR.tocsr()
print S
n_components, labels=scipy.sparse.csgraph.connected_components(S, directed=False, connection='weak', return_labels=True)
print "number of different famillies "+str(n_components)
numpy.set_printoptions(threshold=numpy.nan)
print labels
```