Nicola - 1 month ago
R Question

R, dplyr and snow: how to parallelize functions which use dplyr

Let's suppose that I want to apply, in parallel, myfunction to each row of myDataFrame. Suppose that otherDataFrame is a dataframe with two columns, COLUMN1_odf and COLUMN2_odf, which are used inside myfunction. I would like to write code using parApply like this:

clus <- makeCluster(4)
clusterExport(clus, list("myfunction", "%>%"))

myfunction <- function(fst, snd) {
  # otherFunction and aGlobalDataFrame are defined in the global env
  otherFunction(aGlobalDataFrame)

  # some code to create otherDataFrame **INTERNALLY** to this function
  otherDataFrame %>% filter(COLUMN1_odf == fst & COLUMN2_odf == snd)
  return(otherDataFrame)
}
do.call(bind_rows, parApply(clus, myDataFrame, 1, function(r) { myfunction(r[1], r[2]) }))


The problem here is that R doesn't recognize COLUMN1_odf and COLUMN2_odf, even if I add them to clusterExport. How can I solve this problem? Is there a way to "export" all the objects that snow needs, so that I don't have to enumerate each of them?

EDIT 1: I've added a comment (in the code above) to specify that otherDataFrame is created internally to myfunction.

EDIT 2: I've added some pseudo-code to generalize myfunction: it now uses a global dataframe (aGlobalDataFrame) and another function (otherFunction).

Answer

After some experiments, I solved my problem (following Benjamin's suggestion and taking into account the edits I added to the question) with:

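library(snow)   # makeCluster, clusterEvalQ, clusterExport, parApply
library(dplyr)  # bind_rows on the master process

myfunction <- function(fst, snd) {
  # otherFunction and aGlobalDataFrame are defined in the global env
  otherFunction(aGlobalDataFrame)

  # some code to create otherDataFrame **INTERNALLY** to this function
  # the filtered rows are the return value
  otherDataFrame %>% dplyr::filter(COLUMN1_odf == fst & COLUMN2_odf == snd)
}

clus <- makeCluster(4)
# load the packages on every worker so that %>% and filter are available there
clusterEvalQ(clus, {library(dplyr); library(magrittr)})
# export objects by name, once they exist in the global env
clusterExport(clus, c("myfunction", "otherFunction", "aGlobalDataFrame"))

result <- do.call(bind_rows, parApply(clus, myDataFrame, 1,
                  function(r) { myfunction(r[1], r[2]) }))
stopCluster(clus)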

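In this way I've exported aGlobalDataFrame, myfunction and otherFunction to the workers: in short, all the functions and data used by the function being parallelized (myfunction itself). The column names COLUMN1_odf and COLUMN2_odf do not need to be exported, because filter looks them up inside otherDataFrame once dplyr is loaded on each worker.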
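The question also asked whether everything can be exported without listing each name. Below is a minimal sketch, not the exact code from this thread. It assumes the parallel package, which ships with R and offers the same makeCluster/clusterExport/parApply functions as snow. The toy data, the placeholder body of otherFunction, and the choice to build otherDataFrame from its result are invented here purely for illustration.

```r
library(parallel)  # assumption: used in place of snow, same function names
library(dplyr)

# Hypothetical stand-ins for the objects named in the question
aGlobalDataFrame <- data.frame(COLUMN1_odf = c(1, 1, 2, 2),
                               COLUMN2_odf = c(1, 2, 1, 2),
                               value       = c("a", "b", "c", "d"),
                               stringsAsFactors = FALSE)
otherFunction <- function(df) df  # placeholder: returns its input unchanged
myDataFrame   <- data.frame(fst = c(1, 2), snd = c(2, 1))

myfunction <- function(fst, snd) {
  # assumption: otherDataFrame is derived from the global data frame
  otherDataFrame <- otherFunction(aGlobalDataFrame)
  # the column names are resolved inside otherDataFrame by filter(),
  # so they are never exported as objects
  otherDataFrame %>% dplyr::filter(COLUMN1_odf == fst & COLUMN2_odf == snd)
}

clus <- makeCluster(2)
clusterEvalQ(clus, library(dplyr))  # load dplyr (and %>%) on each worker

# Export every global object by name, skipping the cluster handle itself
to_export <- setdiff(ls(envir = globalenv()), "clus")
clusterExport(clus, to_export, envir = globalenv())

result <- do.call(bind_rows,
                  parApply(clus, myDataFrame, 1,
                           function(r) myfunction(r[1], r[2])))
stopCluster(clus)
print(result)
```

Exporting the whole global environment is convenient, but it copies every object to every worker, so with large data it is probably cheaper to name only what the function needs.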
