
Why is parallel implementation slower than serial? (Python multiprocessing module)

I am trying to extend the scikit-learn class

KNeighborsClassifier
by introducing an alternative method of computing distances between neighbours (see here if interested).

The parallelization scheme is the following:
given that we want to calculate the distances between all elements of set A and all elements of set B, for each element of A (taken sequentially, one after the other) the distances to all elements of B are calculated in parallel.
The time-consuming operation is calculating the distance between any two elements, so this is the fundamental operation that each process should carry out.

The problem is that parallel execution is much slower than serial execution (using Python's
multiprocessing
module), with both synchronous and asynchronous calls, regardless of the machine and the number of cores used.

I suspect that this is related to shared variables being communicated to the worker processes in the background. The question is: which variables are being communicated, and how can this be avoided?
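
For reference, the cost of communicating such variables can be measured directly: the arguments of every pool.apply_async call (and of every joblib delayed call) are pickled and sent to a worker process once per submitted task, and since _wmd_row is a bound method, the instance (and with it W_embed) is serialized as well. A minimal sketch, using a hypothetical dense array as a stand-in for the real data:

import pickle
import time

import numpy as np

# Hypothetical stand-in for a large argument such as X_train or W_embed.
big_argument = np.random.rand(5000, 300)

start = time.time()
payload = pickle.dumps(big_argument, protocol=pickle.HIGHEST_PROTOCOL)
elapsed = time.time() - start

# In a per-row scheme this serialization cost is paid once for every row of X_test.
print("pickled size: {:.1f} MB, time: {:.4f} s".format(len(payload) / 1e6, elapsed))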

Code:

import multiprocessing as mp

import numpy as np
from joblib import Parallel, delayed  # bundled with scikit-learn as sklearn.externals.joblib in older releases
from pyemd import emd  # assumed EMD implementation, as in the post linked above
from sklearn.metrics import euclidean_distances
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import normalize
from sklearn.utils import check_array


class WordMoversKNN(KNeighborsClassifier):
"""K nearest neighbors classifier using the Word Mover's Distance.
Parameters
----------

W_embed : array, shape: (vocab_size, embed_size)
        Precomputed word embeddings of the vocabulary items.
Row indices should correspond to the columns in the bag-of-words input.
n_neighbors : int
Number of neighbors to use by default for :meth:`k_neighbors` queries.
n_jobs : int
The number of parallel jobs to run for Word Mover's Distance computation.
If ``-1``, then the number of jobs is set to the number of CPU cores.
verbose : int, optional
        Controls the verbosity; the higher, the more messages. Defaults to 5.

"""

def __init__(self, W_embed, n_neighbors=1, n_jobs=1, verbose=5):
self.W_embed = W_embed
self.verbose = verbose
if n_jobs == -1:
n_jobs = mp.cpu_count()

super(WordMoversKNN, self).__init__(n_neighbors=n_neighbors, n_jobs=n_jobs, metric='precomputed', algorithm='brute')

def _wmd(self, i, row, X_train):
"""Compute the WMD between training sample i and given test row.

Assumes that `row` and train samples are sparse BOW vectors summing to 1.
"""
union_idx = np.union1d(X_train[i].indices, row.indices)
W_minimal = self.W_embed[union_idx]
W_dist = euclidean_distances(W_minimal)
bow_i = X_train[i, union_idx].A.ravel()
bow_j = row[:, union_idx].A.ravel()
return emd(bow_i, bow_j, W_dist)

def _wmd_row(self, row, X_train):
"""Wrapper to compute the WMD of a row with all training samples.

Assumes that `row` and train samples are sparse BOW vectors summing to 1.
Useful for parallelization.
"""
n_samples_train = X_train.shape[0]
return [self._wmd(i, row, X_train) for i in range(n_samples_train)]

def _pairwise_wmd(self, X_test, X_train=None, ordered=True):
"""Computes the word mover's distance between all train and test points.

Parallelized over rows of X_test.

Assumes that train and test samples are sparse BOW vectors summing to 1.

Parameters
----------
X_test: scipy.sparse matrix, shape: (n_test_samples, vocab_size)
Test samples.

X_train: scipy.sparse matrix, shape: (n_train_samples, vocab_size)
Training samples. If `None`, uses the samples the estimator was fit with.
        ordered: bool, optional (default=True)
            If True, the rows of the returned dist keep the order of the rows in X_test.
            Otherwise, the rows of dist may follow a potentially random order that does not
            match the order of indices in X_test; however, computation is faster in this case
            (asynchronous parallel execution).

Returns
-------
dist : array, shape: (n_test_samples, n_train_samples)
Distances between all test samples and all train samples.

"""
n_samples_test = X_test.shape[0]

if X_train is None: X_train = self._fit_X

if (self.n_jobs == 1) or (n_samples_test < 2*self.n_jobs): # to avoid parallelism overhead for small test samples
dist = [ self._wmd_row( test_sample , X_train ) for test_sample in X_test ]
else:
if self.verbose:
print("WordMoversKNN set to use {} parallel processes".format(self.n_jobs))
if ordered:
dist = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)( delayed(self._wmd_row) (test_sample, X_train) for test_sample in X_test)
else: # Asynchronous call is faster but returns results in random order
                pool = mp.Pool(processes=self.n_jobs)
                results = [pool.apply_async(self._wmd_row, args=(test_sample, X_train))
                           for test_sample in X_test]
                dist = [p.get() for p in results]
                # Release the worker processes once all results have been collected.
                pool.close()
                pool.join()
return np.array(dist)


def calculate(self, X):
"""Predict the class labels for the provided data
Parameters
----------
X : scipy.sparse matrix, shape (n_test_samples, vocab_size)
Test samples.
Returns
-------
        y : array, shape: (n_test_samples,)
Class labels for each data sample.
"""
X = check_array(X, accept_sparse='csr', copy=True)
X = normalize(X, norm='l1', copy=False)
dist = self._pairwise_wmd(X)
# A matrix of distances given to predict in combination with metric = 'precomputed'
# means that no more distance calculations take place. Neighbors are found simply by sorting
return super(WordMoversKNN, self).predict(dist)

Answer

The main problem was that a new process was spawned for each row of the matrix X_test, and every one of these processes required the full X_train as well as other variables (e.g. self.W_embed) to be passed to it. Pickling and dispatching these variables is very time-consuming because of their size.

I got a tremendous speed-up when I split X_test into n_jobs chunks of size X_test.shape[0]//n_jobs, so that only n_jobs processes are spawned overall and the variables are communicated n_jobs times instead of X_test.shape[0] times.

However, because of the size of the variables that have to be communicated, I believe that for this type of problem data parallelism is a much more appropriate approach than task parallelism, so I intend to use mpi4py, letting each process create its own self.W_embed, X_train and X_test matrices and communicate only the results of the computation.
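
A minimal sketch of the chunked scheme described above (the subclass and method names WordMoversKNNChunked, _wmd_block and _pairwise_wmd_chunked are illustrative only, not part of the original code); joblib's Parallel and delayed are used exactly as in the ordered branch of _pairwise_wmd, but each worker now receives X_train and the pickled instance once per block instead of once per test row:

class WordMoversKNNChunked(WordMoversKNN):

    def _wmd_block(self, X_test_block, X_train):
        """Compute the WMD of a block of test rows against all training samples (runs inside one worker)."""
        return [self._wmd_row(row, X_train) for row in X_test_block]

    def _pairwise_wmd_chunked(self, X_test, X_train=None):
        """Split X_test into n_jobs blocks, dispatching one task per block instead of one per row."""
        if X_train is None:
            X_train = self._fit_X
        # One block of row indices per worker.
        row_blocks = np.array_split(np.arange(X_test.shape[0]), self.n_jobs)
        dist_blocks = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)(
            delayed(self._wmd_block)(X_test[block], X_train)
            for block in row_blocks if len(block) > 0)
        # joblib returns the blocks in submission order, so stacking preserves the row order of X_test.
        return np.vstack(dist_blocks)

The same idea carries over to the asynchronous multiprocessing branch: submit one apply_async call per block rather than one per row.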
