Ataxias - 2 months ago 12

Python Question

I am trying to extend the scikit-learn class

`KNeighborsClassifier`

The parallelization scheme is the following:

given that we want to calculate distances between all elements of set A and set B, for each element in A (taken

The time-consuming operation is calculating the distance between any two elements, so each process should carry out this fundamental operation.

The problem is that parallel execution is much slower than serial execution (using Python's

`multiprocessing`

I suspect that this is related to using shared variables, which are communicated in the background. The question is, which variables are being communicated and how could this be avoided?

Code:

`class WordMoversKNN(KNeighborsClassifier):`

"""K nearest neighbors classifier using the Word Mover's Distance.

Parameters

----------

W_embed : array, shape: (vocab_size, embed_size)

Precomputed word embeddings between vocabulary items.

Row indices should correspond to the columns in the bag-of-words input.

n_neighbors : int

Number of neighbors to use by default for :meth:`k_neighbors` queries.

n_jobs : int

The number of parallel jobs to run for Word Mover's Distance computation.

If ``-1``, then the number of jobs is set to the number of CPU cores.

verbose : int, optional

Controls the verbosity; the higher, the more messages. Defaults to 0.

"""

def __init__(self, W_embed, n_neighbors=1, n_jobs=1, verbose=5):

self.W_embed = W_embed

self.verbose = verbose

if n_jobs == -1:

n_jobs = mp.cpu_count()

super(WordMoversKNN, self).__init__(n_neighbors=n_neighbors, n_jobs=n_jobs, metric='precomputed', algorithm='brute')

def _wmd(self, i, row, X_train):

"""Compute the WMD between training sample i and given test row.

Assumes that `row` and train samples are sparse BOW vectors summing to 1.

"""

union_idx = np.union1d(X_train[i].indices, row.indices)

W_minimal = self.W_embed[union_idx]

W_dist = euclidean_distances(W_minimal)

bow_i = X_train[i, union_idx].A.ravel()

bow_j = row[:, union_idx].A.ravel()

return emd(bow_i, bow_j, W_dist)

def _wmd_row(self, row, X_train):

"""Wrapper to compute the WMD of a row with all training samples.

Assumes that `row` and train samples are sparse BOW vectors summing to 1.

Useful for parallelization.

"""

n_samples_train = X_train.shape[0]

return [self._wmd(i, row, X_train) for i in range(n_samples_train)]

def _pairwise_wmd(self, X_test, X_train=None, ordered=True):

"""Computes the word mover's distance between all train and test points.

Parallelized over rows of X_test.

Assumes that train and test samples are sparse BOW vectors summing to 1.

Parameters

----------

X_test: scipy.sparse matrix, shape: (n_test_samples, vocab_size)

Test samples.

X_train: scipy.sparse matrix, shape: (n_train_samples, vocab_size)

Training samples. If `None`, uses the samples the estimator was fit with.

ordered: returns result keeping the order of the rows in dist (following X_test).

Otherwise, the rows of dist follow a potentially random order which does not follow the order

of indices in X_test. However, computation is faster in this case (asynchronous parallel execution)

Returns

-------

dist : array, shape: (n_test_samples, n_train_samples)

Distances between all test samples and all train samples.

"""

n_samples_test = X_test.shape[0]

if X_train is None: X_train = self._fit_X

if (self.n_jobs == 1) or (n_samples_test < 2*self.n_jobs): # to avoid parallelism overhead for small test samples

dist = [ self._wmd_row( test_sample , X_train ) for test_sample in X_test ]

else:

if self.verbose:

print("WordMoversKNN set to use {} parallel processes".format(self.n_jobs))

if ordered:

dist = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)( delayed(self._wmd_row) (test_sample, X_train) for test_sample in X_test)

else: # Asynchronous call is faster but returns results in random order

pool = mp.Pool(processes=self.n_jobs)

results = [pool.apply_async(self._wmd_row, args=(test_sample, X_train)) for test_sample in X_test]

dist = [p.get() for p in results]

return np.array(dist)

def calculate(self, X):

"""Predict the class labels for the provided data

Parameters

----------

X : scipy.sparse matrix, shape (n_test_samples, vocab_size)

Test samples.

Returns

-------

y : array of shape [n_samples]

Class labels for each data sample.

"""

X = check_array(X, accept_sparse='csr', copy=True)

X = normalize(X, norm='l1', copy=False)

dist = self._pairwise_wmd(X)

# A matrix of distances given to predict in combination with metric = 'precomputed'

# means that no more distance calculations take place. Neighbors are found simply by sorting

return super(WordMoversKNN, self).predict(dist)

Answer

The main problem was that *for each row* of matrix `X_test`

a new process was spawned, each time requiring passing the full `X_train`

as well as other variables (e.g. `self.X_embed`

) for every process. Pickling and dispatching these variables is very time consuming, because of their size.
I got a tremendous speed-up when I split the matrix `X_test`

in `n_jobs`

chunks of size `X_test.shape[0]//n_jobs`

, overall spawning only `n_jobs`

processes and having to pass the variables `n_jobs`

times instead of `X_test.shape[0]`

times.
However, because of the size of the variables that have to be communicated, I believe that for this type of problem, data parallelism is a much more appropriate approach than task parallelism, and I therefore intend to use `mpi4py`

, so that each process separately creates its own `self.W_embed`

, `X_train`

and `X_test`

matrices, communicating only the results of the computation.