Parallelising tasks
As we mentioned in the introduction to this course, an important tool for leveraging modern hardware in the face of stagnating clock speeds is parallel computation.
Parallel computation is the simultaneous execution of computations across multiple executing units. These may be cores within a CPU, multiple CPUs (possibly each with multiple cores), or multiple computer systems.
There are a number of distinct types of workload parallelisation that depend on the actual task being executed in parallel and its properties. Let’s look at a few core concepts before examining parallel workflows in R.
Embarrassingly parallel problems
In parallel computing, an embarrassingly parallel workload or problem is one where little or no effort is needed to separate the problem into a number of parallel tasks. This is often the case where there is little or no dependency or need for communication between those parallel tasks, or for results between them.
Data parallel problems
One common type of embarrassingly parallel problem is the data parallel problem. This is when the same operation is performed on different subsets of the same data.
Examples of data parallel problems:
Generating a synthetic person (if their attributes are independent of each other).
Counting the number of occurrences of a given token in individual documents of a corpus.
Analysing many satellite images using the same algorithm for a specific feature.
Map-reduce parallel programming
Many data parallel problems are solved through the map-reduce programming paradigm. The core principles behind ‘map-reduce’ computing approaches involve:
splitting the data as input to a large number of independent computations
collecting the results for further analysis
Parallel computing in R
Data parallel map-reduce problems
A good indication that you are dealing with a map-reduce problem that could benefit from data parallelisation is if, when profiling, you find that your code is spending a lot of time in functions like lapply(), Map(), purrr::map() and related functions.
All these functions follow a map-reduce paradigm: the input is split up into its elements, the same function is applied to each element of the input, and the results are aggregated together at the end and returned.
In this section, we’ll see how approaches to optimising such computations through parallelisation have evolved in R.
parallel package
The first widely used package for parallelisation was the parallel package, which has shipped with base R since version 2.14.0 (released in October 2011).
It’s particularly suited to map-reduce data parallel problems, as its main interface consists of parallel versions of lapply() and similar functions.
Let’s have a look at how we might use the parallel package to parallelise a simple computation involving lapply().
First let’s create a function for lapply() to execute, called example_fun. The function takes an integer i as input, sends the process it is running on to sleep for one second, and then returns a character string recording the value of i and the ID of the process performing the computation.
In our example workflow, we then use lapply() to iterate over our data, here a sequence of integers from 1 to 8. Let’s run our example and time its execution:
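example_fun <- function(i) {
  Sys.sleep(1)
  paste("Running i =", i, "in process:", Sys.getpid())
}

library(tictoc) # assuming tic()/toc() from the tictoc package produce the timings shown
data <- 1:8
tic()
lapply(data, example_fun)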
[[1]]
[1] "Running i = 1 in process: 4098"
[[2]]
[1] "Running i = 2 in process: 4098"
[[3]]
[1] "Running i = 3 in process: 4098"
[[4]]
[1] "Running i = 4 in process: 4098"
[[5]]
[1] "Running i = 5 in process: 4098"
[[6]]
[1] "Running i = 6 in process: 4098"
[[7]]
[1] "Running i = 7 in process: 4098"
[[8]]
[1] "Running i = 8 in process: 4098"
toc()
8.035 sec elapsed
We can see that lapply() iterates through our input vector sequentially, all computation is performed by the same process, and execution takes about 8 seconds to run.
mclapply()
Now, let’s try and parallelise our computation using the parallel package.
First let’s load it and decide how much compute power on our machine we want to allocate to the task.
library(parallel)
We can do that by first examining how many cores we have available using detectCores():
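detectCores()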
[1] 10
I’ve got 10 cores available, which is the same as the number of my physical cores. Some systems might show more if the system allows hyperthreading. To return the number of physical cores, you can use detectCores(logical = FALSE).
Given I have 10 available, I’ll assign 8 (detectCores() - 2) to a variable n_cores that I can use to specify the number of cores when registering parallel backends. If you have fewer cores available, you should assign at least one less than the number available to n_cores.
n_cores <- detectCores() - 2
Now, on to parallelising our workflow!
One of the simplest functions used early on to parallelise workflows through the parallel package is mclapply(). This can be used as a pretty much drop-in replacement for lapply(). The main difference is that we use the argument mc.cores to specify the number of cores we want to parallelise across.
Let’s create some new data with length equal to the number of cores we’re going to use, and run our computation using mclapply():
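data <- 1:n_cores
tic()
mclapply(data, example_fun, mc.cores = n_cores)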
[[1]]
[1] "Running i = 1 in process: 4159"
[[2]]
[1] "Running i = 2 in process: 4160"
[[3]]
[1] "Running i = 3 in process: 4161"
[[4]]
[1] "Running i = 4 in process: 4162"
[[5]]
[1] "Running i = 5 in process: 4163"
[[6]]
[1] "Running i = 6 in process: 4164"
[[7]]
[1] "Running i = 7 in process: 4165"
[[8]]
[1] "Running i = 8 in process: 4166"
toc()
1.018 sec elapsed
This worked on my macOS machine!
It completed in about 1 second, and the output shows that each value of i was computed in a different process. It will also have worked for anyone running the code on a Linux machine.
However! For any Windows users out there, this will not have worked!
That’s because mclapply() uses process forking. One of the benefits of forking is that global variables in the main R session are inherited by the child processes. However, this can cause instabilities, and the type of forking used is not supported on Windows machines (and can actually be problematic when running in RStudio too!).
parLapply()
If you’d written a package using mclapply() to improve its performance but now you wanted to support parallelisation on Windows, you’d have to re-write everything using parLapply() instead.
To use parLapply() we need to create a cluster object to specify the parallel backend, using the parallel::makeCluster() function. By default it creates a cluster of type "PSOCK", which uses sockets. A socket is simply a mechanism with which multiple processes or applications running on your computer (or on different computers, for that matter) can communicate with each other, and will work on any of our local machines. Each worker process runs separately without sharing objects or variables, which can only be passed to it explicitly from the parent process.
We then pass the cluster as the first argument to parLapply(), followed by the standard arguments we are used to from lapply().
cl <- makeCluster(n_cores)
cl
socket cluster with 8 nodes on host 'localhost'
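We can then run and time our computation:
tic()
parLapply(cl, data, example_fun)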
[[1]]
[1] "Running i = 1 in process: 4168"
[[2]]
[1] "Running i = 2 in process: 4169"
[[3]]
[1] "Running i = 3 in process: 4175"
[[4]]
[1] "Running i = 4 in process: 4173"
[[5]]
[1] "Running i = 5 in process: 4174"
[[6]]
[1] "Running i = 6 in process: 4171"
[[7]]
[1] "Running i = 7 in process: 4170"
[[8]]
[1] "Running i = 8 in process: 4172"
toc()
1.013 sec elapsed
This now works on all systems. It does however come with disadvantages, like increased communication overhead (when dealing with larger data) and the fact that global variables have to be identified and explicitly exported to each worker in the cluster before processing (not evident in this simple example, but something to be aware of).
The cluster we have created is also still technically running. To free up resources, it’s good practice to stop it when you are finished.
stopCluster(cl)
foreach package
An important step in the evolution of parallel computation in R was the development of the foreach package. The package formalised the principle that developers should be able to write parallel code irrespective of the backend it will eventually be run on, while the choice of backend should be left to the user and defined at runtime.
The form of a foreach expression looks like a for loop, but it can easily express the equivalent of an lapply() expression.
Let’s convert our previous example to code that works with foreach. The expression starts with a foreach() call in which we specify the data we want to iterate over. This is followed by the operator %do% to run the expression that follows sequentially, or %dopar% to run it in parallel.
Let’s run our example:
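library(foreach) # attach foreach if not already loaded
tic()
foreach(i = data) %dopar% example_fun(i)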
Warning: executing %dopar% sequentially: no parallel backend registered
[[1]]
[1] "Running i = 1 in process: 4098"
[[2]]
[1] "Running i = 2 in process: 4098"
[[3]]
[1] "Running i = 3 in process: 4098"
[[4]]
[1] "Running i = 4 in process: 4098"
[[5]]
[1] "Running i = 5 in process: 4098"
[[6]]
[1] "Running i = 6 in process: 4098"
[[7]]
[1] "Running i = 7 in process: 4098"
[[8]]
[1] "Running i = 8 in process: 4098"
toc()
8.04 sec elapsed
As you can see, example_fun(i) was actually run sequentially. That’s because, despite using %dopar%, we had not registered a parallel backend to run the expression (hence the warning), so it falls back to a sequential execution plan.
Now, let’s run our code in parallel. To do so we need to register an appropriate parallel backend using a separate package like doParallel.
To register a parallel backend we use the function registerDoParallel(). The function takes as its first argument cl a cluster object, like the one created in our previous example with the parallel function makeCluster().
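library(doParallel)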
Loading required package: iterators
cl <- makeCluster(n_cores)
registerDoParallel(cl)
tic()
foreach(i = data) %dopar% example_fun(i)
[[1]]
[1] "Running i = 1 in process: 4299"
[[2]]
[1] "Running i = 2 in process: 4304"
[[3]]
[1] "Running i = 3 in process: 4298"
[[4]]
[1] "Running i = 4 in process: 4305"
[[5]]
[1] "Running i = 5 in process: 4300"
[[6]]
[1] "Running i = 6 in process: 4301"
[[7]]
[1] "Running i = 7 in process: 4302"
[[8]]
[1] "Running i = 8 in process: 4303"
toc()
1.033 sec elapsed
Now computation is indeed performed in parallel and completes again in close to 1 second.
Combining results
A nice feature of foreach is that you can specify a function to combine the end results of execution through the argument .combine.
Here foreach will combine the results into a character vector using c():
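foreach(i = data, .combine = c) %dopar% example_fun(i)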
[1] "Running i = 1 in process: 4299" "Running i = 2 in process: 4304"
[3] "Running i = 3 in process: 4298" "Running i = 4 in process: 4305"
[5] "Running i = 5 in process: 4300" "Running i = 6 in process: 4301"
[7] "Running i = 7 in process: 4302" "Running i = 8 in process: 4303"
Whereas here foreach will combine the results into a character matrix using rbind():
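foreach(i = data, .combine = rbind) %dopar% example_fun(i)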
[,1]
result.1 "Running i = 1 in process: 4299"
result.2 "Running i = 2 in process: 4304"
result.3 "Running i = 3 in process: 4298"
result.4 "Running i = 4 in process: 4305"
result.5 "Running i = 5 in process: 4300"
result.6 "Running i = 6 in process: 4301"
result.7 "Running i = 7 in process: 4302"
result.8 "Running i = 8 in process: 4303"
Error handling
foreach also offers nice error handling.
Let’s edit our function to throw an error when the value of i is 2.
example_fun_error <- function(i) {
if (i == 2L) stop()
Sys.sleep(1)
paste("Running i =", i, "in process:", Sys.getpid())
}
By default, foreach execution will fail and throw an error if it encounters one.
Through the argument .errorhandling, however, we can choose to either pass the error through to the results:
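foreach(i = data, .errorhandling = "pass") %dopar% example_fun_error(i)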
[[1]]
[1] "Running i = 1 in process: 4299"
[[2]]
<simpleError in example_fun_error(i): >
[[3]]
[1] "Running i = 3 in process: 4298"
[[4]]
[1] "Running i = 4 in process: 4305"
[[5]]
[1] "Running i = 5 in process: 4300"
[[6]]
[1] "Running i = 6 in process: 4301"
[[7]]
[1] "Running i = 7 in process: 4302"
[[8]]
[1] "Running i = 8 in process: 4303"
Or just remove the result of the failed computation from the overall results:
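foreach(i = data, .errorhandling = "remove") %dopar% example_fun_error(i)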
[[1]]
[1] "Running i = 1 in process: 4299"
[[2]]
[1] "Running i = 3 in process: 4298"
[[3]]
[1] "Running i = 4 in process: 4305"
[[4]]
[1] "Running i = 5 in process: 4300"
[[5]]
[1] "Running i = 6 in process: 4301"
[[6]]
[1] "Running i = 7 in process: 4302"
[[7]]
[1] "Running i = 8 in process: 4303"
Environment management
As mentioned previously, because we are using a socket cluster, objects and packages loaded in the global environment of the parent process are not available in the child processes.
For example, the following code uses a function from the package tools to determine the extensions of two file names in a parallel foreach loop. Although the package is loaded in the global environment, it is not available to the child processes and execution results in an error.
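A sketch of such a loop (the file names here are hypothetical stand-ins):
library(tools)
files <- c("analysis.txt", "run.log") # made-up file names for illustration
foreach(file = files) %dopar% file_ext(file)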
Error in file_ext(file): task 1 failed - "could not find function "file_ext""
To make it available to the child processes, we need to explicitly pass the package name through the argument .packages. (If the child processes need additional variables from the global environment, they can be passed similarly through the argument .export.)
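foreach(file = files, .packages = "tools") %dopar% file_ext(file)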
[[1]]
[1] "txt"
[[2]]
[1] "log"
Now the function file_ext() is available to the child processes and execution completes successfully.
Note, though, that you can easily get around all this by simply including the namespace of the function in the function call:
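foreach(file = files) %dopar% tools::file_ext(file)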
[[1]]
[1] "txt"
[[2]]
[1] "log"
OK, that’s it for our foreach demo, although we’ll return to some details about registering parallel backends in the next section when we compare it to the future ecosystem of packages.
For now let’s stop our cluster and move on.
stopCluster(cl)
The futureverse
Welcome to the futureverse, the future of parallel execution in R!
The future package by Henrik Bengtsson and its associated package ecosystem provide an elegant, unified abstraction for running parallel computations in R over both “local” and “remote” backends.
With a single unified application-programming interface (API), the futureverse can:
replace simple use cases such as mclapply() and parLapply() by offering parallel versions of the apply family of functions through package future.apply.
unify and simplify registering parallel backends for foreach expressions through package doFuture.
parallelise purrr expressions by offering parallel versions of many of the purrr package functions in package furrr.
This simplified parallel backend specification means it can easily scale to multi-machine or multi-host parallel computing using a variety of parallel computing backends.
It also automatically identifies packages and variables in the parent environment and passes them to the child processes.
Execution plan specification in the futureverse
Let’s start by examining how we specify execution strategies in the futureverse, which is consistent regardless of the package you choose to write your parallel code in.
The function used to specify an execution strategy is plan().
The future package provides the following built-in backends:
sequential: Resolves futures sequentially in the current R process, e.g. plan(sequential). Also used to close down background workers when parallel execution is no longer required.
multisession: Resolves futures asynchronously (in parallel) in separate R sessions running in the background on the same machine, e.g. plan(multisession) and plan(multisession, workers = 2).
multicore: Resolves futures asynchronously (in parallel) in separate forked R processes running in the background on the same machine, e.g. plan(multicore) and plan(multicore, workers = 2). This backend is not supported on Windows.
cluster: Resolves futures asynchronously (in parallel) in separate R sessions running typically on one or more machines, e.g. plan(cluster), plan(cluster, workers = 2), and plan(cluster, workers = c("n1", "n1", "n2", "server.remote.org")).
Other packages provide additional evaluation strategies. For example, the future.batchtools package implements futures on top of the batchtools package, e.g. plan(future.batchtools::batchtools_slurm). These types of futures are resolved via job schedulers, which are typically available on high-performance compute (HPC) clusters, e.g. LSF, Slurm, TORQUE/PBS, Sun Grid Engine, and OpenLava.
I’m not going to go into this in detail, but the nice thing about future.batchtools is that it allows R scripts themselves running on a cluster to submit batch jobs to the scheduler, as well as to specify parallel backends within each job.
Let’s now move on to examine the various packages available for parallelising R code depending on the programming packages you already use and prefer.
future.apply package
First let’s look at future.apply, which provides parallel versions of the apply family of functions, therefore replacing approaches in the parallel package.
The future_lapply() function can be used as a parallel drop-in replacement for lapply().
If an execution plan is not specified, the function runs sequentially, as lapply() would.
library(future.apply)
tic()
future_lapply(X = data, FUN = example_fun)
[[1]]
[1] "Running i = 1 in process: 4098"
[[2]]
[1] "Running i = 2 in process: 4098"
[[3]]
[1] "Running i = 3 in process: 4098"
[[4]]
[1] "Running i = 4 in process: 4098"
[[5]]
[1] "Running i = 5 in process: 4098"
[[6]]
[1] "Running i = 6 in process: 4098"
[[7]]
[1] "Running i = 7 in process: 4098"
[[8]]
[1] "Running i = 8 in process: 4098"
toc()
8.055 sec elapsed
To run in parallel, we just specify a parallel execution strategy using the plan() function.
Let’s use multisession, which works on all operating systems by creating separate R sessions. The default behaviour is to use parallelly::availableCores() to determine the number of cores to run across. We can override that behaviour using the workers argument.
plan(multisession, workers = n_cores)
tic()
future_lapply(X = data, FUN = example_fun)
[[1]]
[1] "Running i = 1 in process: 4457"
[[2]]
[1] "Running i = 2 in process: 4453"
[[3]]
[1] "Running i = 3 in process: 4460"
[[4]]
[1] "Running i = 4 in process: 4456"
[[5]]
[1] "Running i = 5 in process: 4454"
[[6]]
[1] "Running i = 6 in process: 4455"
[[7]]
[1] "Running i = 7 in process: 4459"
[[8]]
[1] "Running i = 8 in process: 4458"
toc()
1.394 sec elapsed
furrr package
Package furrr combines purrr’s family of mapping functions with future’s parallel processing capabilities. The result is near drop-in replacements for purrr functions, such as map() and map2_dbl(), which can be replaced with their furrr equivalents future_map() and future_map2_dbl() to map in parallel.
Let’s go ahead and use future_map() in our example. Under a sequential execution strategy it executes just like purrr::map() would.
library(furrr)
plan(sequential)
tic()
future_map(data, ~example_fun(.x))
Attaching package: 'purrr'
The following objects are masked from 'package:foreach':
accumulate, when
[[1]]
[1] "Running i = 1 in process: 4098"
[[2]]
[1] "Running i = 2 in process: 4098"
[[3]]
[1] "Running i = 3 in process: 4098"
[[4]]
[1] "Running i = 4 in process: 4098"
[[5]]
[1] "Running i = 5 in process: 4098"
[[6]]
[1] "Running i = 6 in process: 4098"
[[7]]
[1] "Running i = 7 in process: 4098"
[[8]]
[1] "Running i = 8 in process: 4098"
toc()
8.062 sec elapsed
Under multisession it executes in parallel.
plan(multisession)
tic()
future_map(data, ~example_fun(.x))
[[1]]
[1] "Running i = 1 in process: 4759"
[[2]]
[1] "Running i = 2 in process: 4764"
[[3]]
[1] "Running i = 3 in process: 4762"
[[4]]
[1] "Running i = 4 in process: 4763"
[[5]]
[1] "Running i = 5 in process: 4766"
[[6]]
[1] "Running i = 6 in process: 4765"
[[7]]
[1] "Running i = 7 in process: 4758"
[[8]]
[1] "Running i = 8 in process: 4761"
toc()
2.656 sec elapsed
One thing to note is that the furrr package approaches have a little more overhead than the other approaches. This overhead should be relatively smaller for more computationally intensive executions.
foreach using doFuture backend
Finally, if you are a fan of foreach, you can continue to use it with the futureverse by using the doFuture package and its function registerDoFuture() to register parallel backends.
library("doFuture")
registerDoFuture()
plan(multisession)
tic()
foreach(i = data) %dopar% example_fun(i)
[[1]]
[1] "Running i = 1 in process: 5149"
[[2]]
[1] "Running i = 2 in process: 5152"
[[3]]
[1] "Running i = 3 in process: 5147"
[[4]]
[1] "Running i = 4 in process: 5146"
[[5]]
[1] "Running i = 5 in process: 5150"
[[6]]
[1] "Running i = 6 in process: 5145"
[[7]]
[1] "Running i = 7 in process: 5154"
[[8]]
[1] "Running i = 8 in process: 5151"
toc()
1.44 sec elapsed
In the past, to use foreach with more varied parallel backends you would need to use additional specialised packages. Due to the generic nature of futures, the doFuture package provides the same functionality as many of the existing doNnn packages combined, e.g. doMC, doParallel, doMPI, and doSNOW.
doFuture replaces existing doNnn packages
As mentioned, a nice feature of using the futureverse is that environment configuration of the child processes happens automatically, without having to explicitly pass the names of packages and objects.
Task parallel problems
All the examples we’ve discussed above refer to data parallel problems, which perform the same operation on subsets of the input data. These are the most common examples of embarrassingly parallel problems and often the easiest to parallelise.
However, they are not the only type of problem that can be parallelised. Another type of parallelism involves task parallelism.
Task parallelism refers to the concurrent execution of different tasks across multiple executing units. Again, these may be cores within a CPU, multiple CPUs (possibly each with multiple cores), or multiple computer systems. The inputs to the differing operations may be the same data, but can also be different data.
Let’s look at the differences between data and task parallelism:
Data parallelism | Task parallelism |
---|---|
Same operations are performed on different subsets of the same data. | Different operations are performed on the same or different data. |
Synchronous computation | Asynchronous computation |
Speedup is greater, as a single execution thread operates on all sets of data. | Speedup is less, as each processor executes a different thread or process on the same or different set of data. |
Amount of parallelisation is proportional to the input data size. | Amount of parallelisation is proportional to the number of independent tasks to be performed. |
Examples of task parallel problems:
Pre-processing different sources of data before being able to combine and analyse.
Applying different algorithms to a single satellite image to detect separate features.
Task parallelism and futures
A way to deploy task parallelism is through the concept of futures.
In programming, a future is an abstraction for a value that may be available at some point in the future. The state of a future can either be unresolved or resolved. As soon as it is resolved, the value is available instantaneously.
If the value is queried by a process that requires it to proceed while the future is still unresolved, the process blocks until the future is resolved.
Exactly how and when futures are resolved depends on what strategy is used to evaluate them. For instance, a future can be resolved using a sequential strategy, which means it is resolved in the current R session. Other strategies may be to resolve futures asynchronously, for instance, by evaluating expressions in parallel on the current machine or concurrently on a compute cluster.
The purpose of the future package, which forms the basis of the futureverse, is to provide a very simple and uniform way of evaluating R expressions asynchronously.
By assigning expressions to asynchronous futures, the current/main R process does not block, which means it is available for further processing while the futures are being resolved in separate processes running in the background. In other words, futures provide a simple yet powerful construct for parallel and / or distributed processing in R.
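As a minimal sketch of this idea: future() creates a future that starts resolving in the background, and value() blocks until its value is available (here resolved in a background session via multisession):
library(future)
plan(multisession)
f <- future({
  Sys.sleep(1) # stand-in for a slow computation
  Sys.getpid()
})
# the main session is free to do other work while f resolves
value(f) # blocks until the future is resolved, then returns the worker's PID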
Let’s expand our example to see how we can use futures to perform task parallelisation.
Let’s write two functions that each perform something slightly different:
example_fun1() goes to sleep for 1 second and then returns a data.frame containing the value of i, the pid (process ID) and the result of i + 10.
example_fun2() does something very similar but goes to sleep for 2 seconds, while result is the result of i + 20.
example_fun1 <- function(i) {
  Sys.sleep(1) ## Do nothing for 1 second
  data.frame(i = i, pid = Sys.getpid(), result = i + 10)
}
example_fun2 <- function(i) {
  Sys.sleep(2) ## Do nothing for 2 seconds
  data.frame(i = i, pid = Sys.getpid(), result = i + 20)
}
Let’s imagine these functions represent different pre-processing that needs to be done to the data before we can analyse it. In the example analytical workflow below, we start by creating some data, a sequence of integers of length n_cores/2.
The next part of the workflow performs the pre-processing of each element of our data using lapply() and rbind to combine the results into a data.frame. The script first performs the pre-processing using example_fun1 to create processed_data_1, and afterwards performs the pre-processing using example_fun2 to create processed_data_2. Each step happens sequentially.
Finally, the analysis of our processed data is represented by the sum of the values in the result columns of processed_data_1 & processed_data_2.
data <- 1:(n_cores/2)
data
[1] 1 2 3 4
tic()
# Pre-processing of data
processed_data_1 <- do.call(rbind, lapply(data, example_fun1))
processed_data_2 <- do.call(rbind, lapply(data, example_fun2))
# Analysis of data
sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
12.069 sec elapsed
processed_data_1
i pid result
1 1 4098 11
2 2 4098 12
3 3 4098 13
4 4 4098 14
processed_data_2
i pid result
1 1 4098 21
2 2 4098 22
3 3 4098 23
4 4 4098 24
We can see that all operations were carried out by the same process sequentially, taking a total of ~ length(data) * 1 + length(data) * 2 = 12 seconds.
Using future & %<-% to parallelise independent tasks
What we could do to speed up the execution of our code is parallelise the pre-processing step of our analysis. To do this, we can use the future package to create processed_data_1 and processed_data_2 as futures that can be evaluated in parallel. To do so we use the %<-% operator instead of the standard <- operator.
library(future)
plan(sequential)
tic()
processed_data_1 %<-% do.call(rbind, lapply(data, example_fun1))
processed_data_2 %<-% do.call(rbind, lapply(data, example_fun2))
sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
12.075 sec elapsed
processed_data_1
i pid result
1 1 4098 11
2 2 4098 12
3 3 4098 13
4 4 4098 14
processed_data_2
i pid result
1 1 4098 21
2 2 4098 22
3 3 4098 23
4 4 4098 24
If we run our futures version using a sequential execution plan, we see the same behaviour as we did without using futures.
However, let’s have a look at what happens when we specify a multisession execution plan:
plan(multisession)
tic()
processed_data_1 %<-% do.call(rbind, lapply(data, example_fun1))
processed_data_2 %<-% do.call(rbind, lapply(data, example_fun2))
sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
8.129 sec elapsed
processed_data_1
i pid result
1 1 5546 11
2 2 5546 12
3 3 5546 13
4 4 5546 14
processed_data_2
i pid result
1 1 5554 21
2 2 5554 22
3 3 5554 23
4 4 5554 24
We can see that processed_data_1 and processed_data_2 were created in different processes in parallel, and that the whole operation now took ~ length(data) * 2 = 8 seconds, i.e. the time it took for the slowest task to execute.
Combining data and task parallelism
Given that the lapply call is also amenable to data parallelism, we can go a step further and combine task and data parallelism in our execution plan. This will involve nested parallelisation, where the futures are initially parallelised and, within each, execution of lapply is also parallelised. To do this we need two things:
swap out our lapplys with future_lapplys.
create a nested execution plan and allocate the correct number of workers to each level. To do so we provide a list containing the evaluation strategy we want at each level of nestedness. To be able to set the appropriate number of workers for each one, we also wrap each evaluation strategy definition in the function tweak(), which allows us to override the default values.
Let’s give it a go!
plan(list(tweak(multisession, workers = 2),
tweak(multisession, workers = n_cores/2)))
tic()
processed_data_1 %<-% do.call(rbind, future_lapply(data, example_fun1))
processed_data_2 %<-% do.call(rbind, future_lapply(data, example_fun2))
sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
2.892 sec elapsed
processed_data_1
i pid result
1 1 6041 11
2 2 6043 12
3 3 6040 13
4 4 6042 14
processed_data_2
i pid result
1 1 6160 21
2 2 6161 22
3 3 6159 23
4 4 6158 24
As you can see, each result value in each processed data.frame was computed in parallel in a completely separate process! And now our whole computation executes in ~ 3 secs, the time it takes to run a single iteration of the slowest function plus some overhead to handle the complexity of the execution plan. All in all that’s a nice speed up from our original 12 seconds!
Let’s wrap up and close down any parallel workers:
plan(sequential)
Further Reading:
For a deeper dive into parallelising R code, I highly recommend the following: