Overview

Parallel Computing

Modern computing architectures, from high-performance computing clusters to desktops and laptops, are typically built around multi-core processors. This allows modern computing platforms to execute many tasks simultaneously through threading and other means. We can use parallel computing paradigms to take advantage of this architecture for computationally intensive methods, such as the re-sampling methods we have studied previously.

The following is quoted from the Wikipedia page on parallel computing.

Parallel computing is a type of computation where many calculations or the execution of processes are carried out simultaneously. Large problems can often be divided into smaller ones, which can then be solved at the same time. There are several different forms of parallel computing: bit-level, instruction-level, data, and task parallelism.

We will be concerned with task parallelism, specifically what is called coarse-grained parallelism. This form of parallelism is appropriate for a series of related tasks that can be executed independently and then summarized.

There are a number of opportunities to make use of coarse-grained parallelism in statistics and data science, including:

  • Monte Carlo simulations,
  • bootstrap computations,
  • permutation tests,
  • cross-validation.

Scope

In these notes we will discuss two approaches to parallel computing in R.

  • Multi-core computing on a single machine using mclapply() from the parallel package.

  • Parallel computing on one or more machines using the %dopar% operator from the foreach package, with the doParallel package providing the parallel back-end.

In addition we will discuss:

  • Options for splitting computations across multiple processes and how to choose among these by considering both the structure of the tasks to be performed and the parallel architecture available.

  • How to obtain valid and reproducible pseudo-random numbers when using parallel computations.

Reading

Multi-core computing using the parallel package

Most modern computers, including laptops and desktops, come with multiple processors or cores. The distinction between processors and cores is not important for our purposes. The basic idea of multi-core computing is to allow a single program (e.g. R) to run multiple threads simultaneously in order to reduce the ‘wall time’ required to complete the computing task.
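
To see how many cores your own machine has, the parallel package provides detectCores(); its logical argument distinguishes logical from physical cores.

library(parallel)
detectCores()                  # number of logical cores
detectCores(logical = FALSE)   # number of physical cores (not available on all platforms)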

In R, we can make use of multiple cores using the parallel package, distributed with the base distribution since version 2.14.0. By default, the parallel package achieves parallelism by forking, which is available on Unix-like operating systems such as Linux and macOS, but not on Windows. On Windows, which does not support process forking, it is necessary to use a socket cluster instead.

(Figure: a parent R process with three forked child processes.)

When an R process is forked, the child R processes get read-only access to the parent’s memory, including objects in the global environment. For comparison, in a socket cluster any data needed by a given “child” process must first be copied into memory owned by that process.
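
As a small sketch of the practical difference, a forked child can read an object created by the parent directly, while a socket cluster worker needs the object exported to it first, e.g. with clusterExport(). The object x below is just a stand-in.

library(parallel)
x = rnorm(1e6)

# forked children (Unix-like systems) read x from the parent's memory
mclapply(1:2, function(i) mean(x), mc.cores = 2)

# socket cluster workers (also available on Windows) need x copied to them first
cl = makeCluster(2)
clusterExport(cl, "x")
parLapply(cl, 1:2, function(i) mean(x))
stopCluster(cl)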

Using mclapply()

In R, the *apply functions iterate over a data structure, applying a function to each element: each row or column of a matrix, each item in a vector, each component of a list, and so on.

The lapply() function iterates over a list, and mclapply() splits these iterations across multiple processes. Parallelizing sequential code to use mclapply() generally involves:

  • Loading the parallel package with library(parallel),
  • Writing a function to handle each case,
  • Calling mclapply() with appropriate options.

Here are a couple of constructions that follow these steps.

library(parallel)
myFunc = function(case) {
 # ... do something with case ...
 case_result = case
 return(case_result)
}

# when the inputs to myFunc can be derived from the case index
results = mclapply(1:nCases, myFunc)

# when myList has data/parameters that are specific to each case
results = mclapply(myList, myFunc)
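
The number of processes forked by mclapply() is controlled by its mc.cores argument, which defaults to getOption("mc.cores", 2L); it is often worth setting it explicitly, for example:

# fork four worker processes (adjust to the cores available on your machine)
results = mclapply(1:nCases, myFunc, mc.cores = 4)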

And here is a minimal example.

# Fit 4 linear models in parallel: --------------------------------------------
formulas = list(
   mpg ~ wt + cyl + carb,
   mpg ~ wt + cyl + carb + wt:carb,
   mpg ~ wt*cyl + carb,
   mpg ~ wt*cyl*carb
)

# Serial version: -------------------------------------------------------------
system.time({
  fits_serial = lapply(formulas, lm, data = mtcars)  
})
##    user  system elapsed 
##   0.025   0.002   0.045
# Parallel version: -----------------------------------------------------------
system.time({
  fits_parallel = parallel::mclapply(formulas, lm, data = mtcars)  
})
##    user  system elapsed 
##   0.051   0.057   0.097

Do you need to parallelize?

When thinking of parallelizing some portion of a program it is important to remember that parallelism is not magic. There is some computational overhead involved in splitting the task, initializing child processes, communicating data, and collating results. For this reason, there is usually little to be gained in parallelizing already fast computations (less than a second or so), as the overhead may outweigh any time saved over the sequential version.

At the same time, the best-case improvement from parallelism is generally the sequential time divided by the number of available cores, and in most cases the actual improvement will be smaller. As such, very long running programs may require reconsidering the algorithm, implementation, or purpose in order to be computationally feasible even with parallelism.

Parallel computing with doParallel and foreach

The doParallel package is one of several ‘parallel back-ends’ for the foreach package. The phrase ‘parallel back-end’ is a catch-all for the software that establishes communication among multiple cores, perhaps on different physical “nodes” connected by network connections. You don’t need to know a great deal about the details of parallel back-ends to get started with parallel computing.

Below is a minimal example of how to set up a parallel back-end using doParallel.

## (install and) load library ##
# install.packages("doParallel")
library(doParallel)

# how many cores to use in the cluster? #
ncores = 2   

# set up a cluster called 'cl'
cl = makeCluster(ncores)

# register the cluster
registerDoParallel(cl)

## do some parallel computations with foreach
results = foreach( case = 1:10 ) %dopar% {
 # ... do something with case ...
 result_case = case
 result_case
}

## Always shut the cluster down when done
stopCluster(cl)
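
As an optional check before shutting the cluster down, the foreach package provides helpers to confirm that a back-end is registered:

# with the cluster from above still registered:
getDoParRegistered()  # TRUE once a back-end has been registered
getDoParWorkers()     # 2, matching ncores above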

In the example above, we also see the basic structure for performing parallel computations with foreach. Compare the following two code snippets.

Snippet 1:

results = c()
for (case in 1:10) {
 result_case = case
 results[case] = result_case
}

Snippet 2:

results = foreach(case = 1:10) %dopar% {
 result_case = case
 result_case
}

The foreach() function evaluates an expression for each value of the iterator ‘case’. The key difference is that in snippet 1 the expression modifies results[case] in the global environment, while in snippet 2 each expression is evaluated in its own environment and its value is collected into the list returned by foreach().

In the examples above, the %dopar% operator is used to execute code for each case in parallel. Using %do% instead would lead to sequential computation by the primary process.
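
One further difference worth noting: foreach() returns a list by default. If you would rather collect the results into a vector, the documented .combine argument takes a combining function such as c (this sketch assumes a back-end is registered as above).

# combine the per-case results into a numeric vector rather than a list
results = foreach(case = 1:10, .combine = c) %dopar% {
  case^2
}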

Example

In this example we will parallelize the cross-validation portion of the earlier ridge regression example. In the process, we will also discuss the concept of pre-scheduling, i.e. dividing the parallel computations into chunks assigned to each child process up front. An alternative to pre-scheduling is to assign tasks to child processes one at a time, creating a new process for each task as cores become available.
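
Schematically, the cross-validation loop parallelizes over folds. The sketch below assumes a hypothetical helper cv_fold(data, folds, k) that fits the ridge regression on the folds other than k and returns the validation error for fold k; the helper and the data object my_data are placeholders, not part of the earlier example.

library(parallel)

# my_data and cv_fold() are placeholders for the ridge regression example
n_folds = 10
folds = sample(rep(1:n_folds, length.out = nrow(my_data)))

cv_errors = mclapply(1:n_folds, function(k) cv_fold(my_data, folds, k), mc.cores = 2)
mean(unlist(cv_errors))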

Here are some examples to illustrate these two approaches to scheduling.

mclapply()

The mclapply() approach uses pre-scheduling by default. You can control this option using the mc.preschedule parameter.

library(tidyverse); library(parallel)

# with pre-scheduling, the tasks are divided up front between two forked processes
mclapply(1:10, function(i) Sys.getpid(), mc.cores = 2) %>%
  unlist() %>%
  matrix(nrow = 2)
##       [,1]  [,2]  [,3]  [,4]  [,5]
## [1,] 14784 14784 14784 14784 14784
## [2,] 14785 14785 14785 14785 14785
# w/o pre-scheduling, a new process is forked for each task
mclapply(1:10,
         function(i) Sys.getpid(), 
         mc.cores = 2, 
         mc.preschedule = FALSE
) %>%
  unlist() %>%
  matrix(nrow = 2)
##       [,1]  [,2]  [,3]  [,4]  [,5]
## [1,] 14786 14788 14790 14792 14794
## [2,] 14787 14789 14791 14793 14795

foreach()

The foreach() approach sets up dedicated child processes but assigns tasks to these processes one at a time. The following example illustrates this.

library(doParallel)
## Loading required package: foreach
## 
## Attaching package: 'foreach'
## The following objects are masked from 'package:purrr':
## 
##     accumulate, when
## Loading required package: iterators
cl = makeCluster(2, setup_timeout = 0.5)
registerDoParallel(cl)

foreach(i = 1:10) %dopar% {
  Sys.sleep(runif(1, max = 0.1))
  Sys.getpid()
} %>%
  unlist() %>%
  matrix(nrow = 2)
##       [,1]  [,2]  [,3]  [,4]  [,5]
## [1,] 14798 14797 14798 14797 14798
## [2,] 14797 14797 14797 14798 14797
stopCluster(cl)

When to pre-schedule?

When the compute times for tasks are roughly constant, pre-scheduling is more efficient because it reduces the overhead of creating a forked process or communicating with dedicated “child” processes. However, when times are unpredictable or come from a distribution with a long right tail it can be more efficient not to use pre-scheduling in order to avoid situations where one or more child processes dominate the wall time while others sit idle.

In other cases, it can be helpful to use a hybrid approach in which the set of tasks is divided into more chunks than the number of available cores, with each child process assigned one chunk of tasks at a time. This can be done to mitigate the worst case or “bad luck” scenarios while still using pre-scheduling to limit overhead.
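
One way to implement this hybrid approach is sketched below; the number of chunks and the toy task (squaring an index) are arbitrary choices for illustration.

library(parallel)

tasks = 1:100                      # placeholder task indices
n_chunks = 8                       # more chunks than cores; 8 is an arbitrary choice
chunks = split(tasks, cut(seq_along(tasks), n_chunks, labels = FALSE))

# each forked process works through one whole chunk at a time;
# mc.preschedule = FALSE forks a new process as each chunk is handed out
results = mclapply(
  chunks,
  function(chunk) lapply(chunk, function(task) task^2),  # task^2 stands in for real work
  mc.cores = 2,
  mc.preschedule = FALSE
)
results = unlist(results, recursive = FALSE)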

Random numbers and parallel computing

Many statistical and machine learning applications rely on pseudo-random numbers for things like sampling from distributions and stochastic optimization. When child processes are spawned to compute in parallel, care needs to be taken to ensure random number streams behave as expected. This is particularly true for bootstrap inference, Monte Carlo simulations, and other applications where it is important that iterations are independent. Care is also needed to ensure results can be reliably reproduced using set.seed().

Review the “Random Numbers” section of the R help for mcparallel() and the R help page for RNGstreams for more information. There we learn that, in the multi-core approach typified by calls to mclapply(), reproducible random numbers in parallel computations can be obtained by instructing R to use the “L’Ecuyer-CMRG” generator, i.e. calling RNGkind("L'Ecuyer-CMRG") before setting a seed, and by using pre-scheduling.
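
A minimal illustration of this recipe with mclapply(); the seed 77 and mc.cores = 2 are arbitrary choices, and forking requires a Unix-like system.

library(parallel)

RNGkind("L'Ecuyer-CMRG")
set.seed(77)
run1 = mclapply(1:4, function(i) rnorm(2), mc.cores = 2)

set.seed(77)
run2 = mclapply(1:4, function(i) rnorm(2), mc.cores = 2)

identical(run1, run2)  # TRUE when the seed, RNG kind, and scheduling options match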

Similarly, to create reproducible results when using random number generation within foreach() expressions, use the %dorng% operator from the doRNG package.
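
For example, with a back-end registered, repeating a seeded %dorng% loop reproduces the same results. This is a minimal sketch; the seed and worker count are arbitrary.

library(doParallel)
library(doRNG)

cl = makeCluster(2)
registerDoParallel(cl)

set.seed(42)   # the seed 42 is an arbitrary choice
res1 = foreach(i = 1:4) %dorng% { rnorm(2) }

set.seed(42)
res2 = foreach(i = 1:4) %dorng% { rnorm(2) }

identical(res1, res2)  # TRUE: the same seed gives the same random number streams
stopCluster(cl)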

For more details see the doRNG vignette.

Summary

The parallel, doParallel, and foreach packages greatly facilitate parallel computing in R. When used appropriately, performing computations in parallel can reduce the wait time needed to obtain results and make the overall research process more efficient. When deciding whether and how to parallelize it is important to keep in mind the amount and architecture of available computing resources.

Additional resources