
Scikit-learn: Parallelize stochastic gradient descent

I have a fairly large training matrix (over 1 billion rows, two features per row). There are two classes (0 and 1).
This is too large for a single machine, but fortunately I have about 200 MPI hosts at my disposal. Each is a modest dual-core workstation.

Feature generation is already successfully distributed.

The answers in Multiprocessing scikit-learn suggest it is possible to distribute the work of an SGDClassifier:


You can distribute the data sets across cores, do partial_fit, get the weight vectors, average them, distribute them to the estimators, do partial fit again.
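
As a point of reference, here is a minimal single-process sketch of that scheme (the shard count, round count, and toy data are illustrative assumptions, not part of the quoted answer):

import numpy as np
from sklearn import linear_model

# Toy stand-in for the distributed matrix: 2 features, 2 classes.
X = np.random.rand(10000, 2)
y = (X[:, 0] + X[:, 1] > 1).astype(int)

# One SGDClassifier per "host", each seeing only its own shard.
shards = np.array_split(np.arange(len(X)), 4)
estimators = [linear_model.SGDClassifier() for _ in shards]

for round_number in range(2):
    for est, idx in zip(estimators, shards):
        est.partial_fit(X[idx], y[idx], classes=[0, 1])
    # Average the weight vectors and intercepts across estimators...
    avg_coef = np.mean([est.coef_ for est in estimators], axis=0)
    avg_intercept = np.mean([est.intercept_ for est in estimators], axis=0)
    # ...and push the averages back before the next round.
    for est in estimators:
        est.coef_ = avg_coef.copy()
        est.intercept_ = avg_intercept.copy()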


Once I have run partial_fit a second time on each estimator, where do I go from there to get a final aggregate estimator?

My best guess was to average the coefficients and intercepts again and build an estimator from those values. However, the resulting estimator gives different results than an estimator trained with fit() on the entire dataset.
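
For what it's worth, assembling a standalone classifier from averaged parameters can look like the following sketch (estimator_from_params is a hypothetical helper, and depending on the scikit-learn version predict() may expect further fitted attributes to be present):

import numpy as np
from sklearn import linear_model

def estimator_from_params(avg_coef, avg_intercept):
    # Build a classifier around externally averaged parameters.
    est = linear_model.SGDClassifier()
    est.coef_ = np.asarray(avg_coef)
    est.intercept_ = np.atleast_1d(avg_intercept)
    est.classes_ = np.array([0, 1])  # set by hand, since fit() never ran here
    return est

Note that an exact match with fit() on the entire data should not be expected in any case: SGD's solution depends on the order in which samples are visited, and averaging independently trained weight vectors only approximates a single sequential fit.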

Details

Each host generates a local matrix and a local vector: n rows of the training set and the corresponding n target values.

Each host uses its local matrix and local vector to make an SGDClassifier and do a partial fit. Each then sends its coef vector and intercept to root. Root averages these and sends them back to the hosts. The hosts do another partial_fit and send the coef vector and intercept to root.

Root constructs a new estimator with these values.

import numpy as np
from mpi4py import MPI
from sklearn import linear_model

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

local_matrix = get_local_matrix()   # this host's share of the training rows
local_vector = get_local_vector()   # the corresponding target values

# First local pass over this host's data.
estimator = linear_model.SGDClassifier()
estimator.partial_fit(local_matrix, local_vector, classes=[0, 1])

average_coefs = None
avg_intercept = None

# Gather every host's (coef_, intercept_) pair at root and average them.
if rank > 0:
    comm.send((estimator.coef_, estimator.intercept_), dest=0, tag=rank)
else:
    pairs = [comm.recv(source=r, tag=r) for r in range(1, size)]
    pairs.append((estimator.coef_, estimator.intercept_))
    average_coefs = np.average([a[0] for a in pairs], axis=0)
    avg_intercept = np.average([a[1][0] for a in pairs])

# Broadcast the averages back to every host and do a second local pass.
estimator.coef_ = comm.bcast(average_coefs, root=0)
estimator.intercept_ = np.array([comm.bcast(avg_intercept, root=0)])
estimator.partial_fit(local_matrix, local_vector, classes=[0, 1])

# Gather and average once more.
if rank > 0:
    comm.send((estimator.coef_, estimator.intercept_), dest=0, tag=rank)
else:
    pairs = [comm.recv(source=r, tag=r) for r in range(1, size)]
    pairs.append((estimator.coef_, estimator.intercept_))
    average_coefs = np.average([a[0] for a in pairs], axis=0)
    avg_intercept = np.average([a[1][0] for a in pairs])

# Only root holds the final averages, so build the aggregate estimator there.
if rank == 0:
    estimator.coef_ = average_coefs
    estimator.intercept_ = np.array([avg_intercept])
    print("The estimator at rank 0 should now be working")


Thank you!

Answer

Training a linear model on a dataset with 1e9 samples and only 2 features is very likely to underfit, or to waste CPU / IO time if the data is actually linearly separable: with two features a linear model has only three parameters to learn, so its capacity saturates long before a billion samples. Don't waste time thinking about parallelizing such a problem with a linear model:

  • either switch to a more complex class of models, e.g. train random forests on smaller partitions of the data that fit in memory and aggregate them (see the first sketch below);

  • or select random subsamples of your dataset of increasing size and train linear models on them. Measure the predictive accuracy on a held-out test set and stop when you see diminishing returns, probably after a few tens of thousands of samples of the minority class (see the second sketch below).
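
Hypothetical sketches of both options follow; the toy data and all names are illustrative. First, forests trained on separate in-memory partitions can be pooled by concatenating their fitted trees. This is a well-known trick rather than an official scikit-learn API, so treat it as an assumption to verify on your version:

import numpy as np
from sklearn.ensemble import RandomForestClassifier

def aggregate_forests(forests):
    # Pool the fitted trees of several compatible forests into the first one.
    # Forest predictions average over estimators_, so a longer list of trees
    # simply behaves like one larger forest.
    merged = forests[0]
    for f in forests[1:]:
        merged.estimators_ += f.estimators_
    merged.n_estimators = len(merged.estimators_)
    return merged

rng = np.random.RandomState(0)
X = rng.rand(30000, 2)
y = (X[:, 0] > X[:, 1]).astype(int)

# Stand-ins for per-host partitions that fit in memory.
parts = np.array_split(np.arange(len(X)), 3)
forests = [RandomForestClassifier(n_estimators=50, random_state=i).fit(X[idx], y[idx])
           for i, idx in enumerate(parts)]
big_forest = aggregate_forests(forests)

Second, a sketch of the subsampling approach: train on random subsamples of growing size and stop once held-out accuracy plateaus.

import numpy as np
from sklearn import linear_model
from sklearn.model_selection import train_test_split

rng = np.random.RandomState(0)
X = rng.rand(200000, 2)
y = (X[:, 0] + X[:, 1] > 1).astype(int)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25)

for n in [1000, 10000, 100000]:  # growing random subsamples
    idx = rng.choice(len(X_train), size=n, replace=False)
    clf = linear_model.SGDClassifier().fit(X_train[idx], y_train[idx])
    print(n, clf.score(X_test, y_test))  # stop when this stops improving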