Initial commit

parents
Showing with 753 additions and 0 deletions
## Example files for the title:
# Parallel R, by Q. McCallum
[![Parallel R, by Q. McCallum](http://akamaicovers.oreilly.com/images/9781449309916/cat.gif)](https://www.safaribooksonline.com/library/view/title/9781449317850/)
The following applies to example files from material published by O’Reilly Media, Inc. Content from other publishers may include different rules of usage. Please refer to any additional usage rights explained in the actual example files or refer to the publisher’s website.
O'Reilly books are here to help you get your job done. In general, you may use the code in O'Reilly books in your programs and documentation. You do not need to contact us for permission unless you're reproducing a significant portion of the code. For example, writing a program that uses several chunks of code from our books does not require permission. Answering a question by citing our books and quoting example code does not require permission. On the other hand, selling or distributing a CD-ROM of examples from O'Reilly books does require permission. Incorporating a significant amount of example code from our books into your product's documentation does require permission.
We appreciate, but do not require, attribution. An attribution usually includes the title, author, publisher, and ISBN.
If you think your use of code examples falls outside fair use or the permission given here, feel free to contact us at <permissions@oreilly.com>.
Please note that the examples are not production code and have not been carefully tested. They are provided "as-is" and come with no warranty of any kind.
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
This directory contains R scripts that demonstrate the "multicore"
package, and are based on examples in chapter three of the book
"Parallel R".
The example scripts can be executed using a command such as:
% R --slave -f mclapply.R
or using the "source" function within an R session:
> source('mclapply.R')
Example scripts:
mclapply.R Using mclapply to implement parallel kmeans
mccores.R The mc.cores argument
mcsetseed.R The mc.setseed argument
mcpreschedule.R The mc.preschedule argument
mcpvec.R Example use of the pvec function
mcparallelcollect.R Example use of the parallel and collect functions
mcrng.R Using a parallel RNG with multicore
mclowlevel.R Using the low-level API to implement a stripped-
down version of the mclapply function.
library(multicore)
# Run 100 trivial tasks on 2 workers; the number of distinct PIDs
# printed shows how many processes actually performed the work.
results <- mclapply(1:100, function(i) Sys.getpid(), mc.cores=2)
print(unique(unlist(results)))
# The "cores" option sets the default worker count used whenever
# mc.cores is not supplied explicitly.
options(cores=3)
results <- mclapply(1:100, function(i) Sys.getpid())
print(unique(unlist(results)))
library(multicore)
library(MASS)
# Parallel kmeans: run four independent searches of 25 random starts
# each (100 starts total) concurrently on the Boston data frame.
results <- mclapply(rep(25, 4),
function(nstart) kmeans(Boston, 4, nstart=nstart))
# Keep the clustering with the smallest total within-cluster
# sum of squares.
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]
print(result)
library(multicore)
# Simplified version of mclapply that adds an "mc.init" argument.
# If mc.init is a function, it is called once in each forked worker
# as mc.init(core, cores) before any tasks execute, allowing
# per-worker setup such as assigning an ID or seeding the RNG.
mclapply.init <- function(X, FUN, ..., mc.cores=4, mc.init=NULL) {
# Never fork more workers than there are tasks, and always at least one.
cores <- max(min(mc.cores, length(X)), 1)
# ix[[i]] holds the (prescheduled, round-robin) indices of X
# assigned to worker i.
ix <- lapply(1:cores, function(i) seq(i, length(X), by=cores))
forkloop <- function(core) {
proc <- fork()
if (inherits(proc, "masterProcess")) {
# Child process: compute this worker's results and send them
# back to the master as a serialized list.
sendMaster(tryCatch({
# Drop the RNG seed inherited from the parent so workers don't
# all produce identical random streams (mc.init may reseed).
suppressWarnings(rm(".Random.seed", pos=.GlobalEnv))
if (is.function(mc.init))
mc.init(core, cores)
lapply(X[ix[[core]]], FUN, ...)
},
error=function(e) {
# On error, stand in the condition object for every task that
# was assigned to this worker.
lapply(ix[[core]], function(i) e)
}))
exit(0)
}
# Parent process: return the child's PID for later collection.
proc$pid
}
pids <- sapply(1:cores, forkloop)
results <- vector("list", length(X))
# Poll the children, collecting serialized results as they become
# ready; selectChildren returns NULL once all children have exited.
while (! is.null(ready <- selectChildren(pids, 1))) {
if (is.integer(ready)) {
for (pid in ready) {
data <- readChild(pid)
if (is.raw(data)) {
# Scatter this worker's results back to their task positions.
core <- which(pid == pids)
results[ix[[core]]] <- unserialize(data)
}
}
}
}
names(results) <- names(X)
results
}
# Define an "mc.init" function that defines a worker ID variable
# Worker initializer: publish the worker's numeric ID as a global
# variable so task functions can tell which fork they run in.
# `cores` (the total worker count) is accepted only to satisfy the
# mc.init(core, cores) calling convention; it is unused here.
set.worker.id <- function(id, cores) {
  .GlobalEnv[[".MC.WORKER.ID"]] <- id
}
# Try out mclapply.init, initializing the workers with "set.worker.id"
# Each result pairs the input value with the ID of the worker that
# computed it.
mclapply.init(11:13, function(i) c(i, .MC.WORKER.ID),
mc.cores=2, mc.init=set.worker.id)
# Define an "mc.init" function that sets the RNG seed of the
# workers in the same way as the real mclapply function.
# Worker initializer: seed the worker's RNG from its own process ID,
# mimicking what mclapply does when mc.set.seed=TRUE. Distinct PIDs
# give the sibling workers distinct streams, but runs are not
# reproducible. Both arguments exist only for the mc.init signature.
set.worker.seed <- function(id, cores) {
  pid <- Sys.getpid()
  set.seed(pid)
}
# Try out mclapply.init, initializing the workers with "set.worker.seed"
# With per-worker seeding the three rnorm draws should all differ.
mclapply.init(1:3, function(i) rnorm(1), mc.init=set.worker.seed)
# Demonstrates starting background computations with "parallel" and
# retrieving their results with "collect", both blocking and not.
library(multicore)
# Define our set of three contrived (but illustrative) functions
fun1 <- function() {Sys.sleep(10); 1}
fun2 <- function() {Sys.sleep(5); 2}
fun3 <- function() {Sys.sleep(1); 3}
# Start the three functions executing in the background
cat('1) Starting three tasks:\n')
f1 <- parallel(fun1())
f2 <- parallel(fun2())
f3 <- parallel(fun3())
# Here is where you might do something useful before you
# commit to waiting for all of the results
# Wait for the results of all three computations
cat('Calling collect with wait=TRUE:\n')
print(collect(list(f1, f2, f3)))
# Start the same three functions executing again
cat('2) Starting three tasks:\n')
f1 <- parallel(fun1())
f2 <- parallel(fun2())
f3 <- parallel(fun3())
# Check for results, but don't wait, so none are ready
cat('Calling collect with wait=FALSE:\n')
print(collect(list(f1, f2, f3), wait=FALSE))
# Pause a bit, simulating doing something useful
cat('Sleeping for 15 seconds...\n')
Sys.sleep(15)
# Now all the results are finished, so we get them in a list
cat('Calling collect with wait=FALSE:\n')
print(collect(list(f1, f2, f3), wait=FALSE))
# When we call collect this time we get a list of three NULLs,
# indicating that each of the worker processes has exited
cat('Calling collect with wait=FALSE:\n')
print(collect(list(f1, f2, f3), wait=FALSE))
# Finally we get a single NULL
cat('Calling collect with wait=FALSE:\n')
print(collect(list(f1, f2, f3), wait=FALSE))
# Start the three functions one last time
cat('3) Starting three tasks:\n')
f1 <- parallel(fun1())
f2 <- parallel(fun2())
f3 <- parallel(fun3())
# Because wait is FALSE and we have a timeout, we find out
# about the events (result or process death) one at a time
# until we get the final NULL
cat('Calling collect with wait=FALSE, timeout=1000000:\n')
print(collect(list(f1, f2, f3), wait=FALSE, timeout=1000000))
cat('Calling collect with wait=FALSE, timeout=1000000:\n')
print(collect(list(f1, f2, f3), wait=FALSE, timeout=1000000))
cat('Calling collect with wait=FALSE, timeout=1000000:\n')
print(collect(list(f1, f2, f3), wait=FALSE, timeout=1000000))
cat('Calling collect with wait=FALSE, timeout=1000000:\n')
print(collect(list(f1, f2, f3), wait=FALSE, timeout=1000000))
cat('Calling collect with wait=FALSE, timeout=1000000:\n')
print(collect(list(f1, f2, f3), wait=FALSE, timeout=1000000))
cat('Calling collect with wait=FALSE, timeout=1000000:\n')
print(collect(list(f1, f2, f3), wait=FALSE, timeout=1000000))
cat('Calling collect with wait=FALSE, timeout=1000000:\n')
print(collect(list(f1, f2, f3), wait=FALSE, timeout=1000000))
# Compares prescheduled (static) task assignment with dynamic
# scheduling when task durations vary widely.
library(multicore)
# Make the task time vary a lot to increase the need for load balancing
set.seed(93564990)
sleeptime <- abs(rnorm(10, 10, 10))
cat('mclapply with prescheduling:\n')
tm <- system.time(mclapply(sleeptime, Sys.sleep, mc.cores=4))
print(tm)
cat('mclapply without prescheduling:\n')
# mc.preschedule=FALSE forks a fresh process per task as cores free
# up, which balances uneven workloads better.
tm <- system.time(mclapply(sleeptime, Sys.sleep, mc.cores=4, mc.preschedule=FALSE))
print(tm)
library(multicore)
x <- 1:10
# pvec splits the vector into chunks, one per worker, and applies
# the (vectorized) function to each chunk: here the cube roots of 1:10.
results <- pvec(x, '^', 1/3)
print(results)
# Reproducible parallel random numbers via the SPRNG library, first
# with multicore's parallel/collect, then with a snow SOCK cluster.
library(multicore)
library(snow)
library(rsprng)
nw <- 3
seed <- 7777442
kind <- 0
para <- 0
# Each worker initializes its own SPRNG stream (0, 1, 2), so the
# three draws are independent yet reproducible for a given seed.
f1 <- parallel({initSprngNode(0, nw, seed, kind, para); rnorm(1)})
f2 <- parallel({initSprngNode(1, nw, seed, kind, para); rnorm(1)})
f3 <- parallel({initSprngNode(2, nw, seed, kind, para); rnorm(1)})
results <- collect(list(f1, f2, f3))
print(unlist(results, use.names=FALSE))
# The snow equivalent: clusterSetupSPRNG performs the per-worker
# stream initialization for us.
cl <- makeCluster(3, type="SOCK")
seed <- 7777442
clusterSetupSPRNG(cl, seed=seed)
results <- clusterEvalQ(cl, rnorm(1))
print(unlist(results, use.names=FALSE))
stopCluster(cl)
# This example demonstrates the perils of using mclapply
# with mc.set.seed=FALSE.
library(multicore)
cat('mc.set.seed=FALSE when .Random.seed is not set:\n')
# No seed exists yet, so each forked worker lazily initializes its
# own seed and the three result vectors differ.
results <- mclapply(1:3, function(i) rnorm(3), mc.cores=3, mc.set.seed=FALSE)
print(results)
# This sets .Random.seed to a nonrepeatable value in the global environment
ignore <- rnorm(1)
cat('mc.set.seed=FALSE when .Random.seed is set:\n')
# Every fork now inherits the same .Random.seed, so all three
# result vectors come out identical -- the peril in question.
results <- mclapply(1:3, function(i) rnorm(3), mc.cores=3, mc.set.seed=FALSE)
print(results)
# This sets .Random.seed to a repeatable value
set.seed(7777442)
cat('mc.set.seed=TRUE when .Random.seed is set:\n')
# mc.set.seed=TRUE reseeds each worker, so the vectors differ again.
results <- mclapply(1:3, function(i) rnorm(3), mc.cores=3, mc.set.seed=TRUE)
print(results)
This directory contains R scripts that demonstrate the "parallel"
package, and are based on examples in chapter three of the book
"Parallel R". Note that you must be using R 2.14.0 or later to execute
these examples. If you're using an older version of R you won't be able
to load the "parallel" package.
The example scripts can be executed using a command such as:
% R --slave -f parstarted.R
or using the "source" function within an R session:
> source('parstarted.R')
Example scripts:
parstarted.R Using mclapply and clusterApply to implement
parallel kmeans
parmakecluster.R Creating PSOCK and FORK cluster objects
parrng.R Using the new L'Ecuyer-CMRG RNG with parallel
library(parallel)
# Use a FORK cluster where forking is available (mcfork exists on
# Unix-alikes) and fall back to PSOCK, which also works on Windows.
type <- if (exists("mcfork", mode="function")) "FORK" else "PSOCK"
# Honor the mc.cores option if set; otherwise use all detected cores.
cores <- getOption("mc.cores", detectCores())
cl <- makeCluster(cores, type=type)
results <- parLapply(cl, 1:10, sqrt)
print(results)
stopCluster(cl)
# Reproducible parallel RNG in the "parallel" package using the
# L'Ecuyer-CMRG generator, shown for both the multicore-derived and
# snow-derived APIs.
library(parallel)
RNGkind("L'Ecuyer-CMRG")
mclapply(1:2, function(i) rnorm(1))
# Resetting the stream after set.seed makes the parallel draws
# repeatable: the next two printed vectors should be identical.
set.seed(7777442)
mc.reset.stream()
results <- mclapply(1:2, function(i) rnorm(1))
print(unlist(results))
set.seed(7777442)
mc.reset.stream()
results <- mclapply(1:2, function(i) rnorm(1))
print(unlist(results))
# Same idea with a cluster object: clusterSetRNGStream gives each
# worker its own reproducible substream.
cl <- makeCluster(4, type="FORK")
clusterSetRNGStream(cl, 7777442)
unlist(clusterEvalQ(cl, rnorm(1)))
clusterSetRNGStream(cl, 7777442)
results <- clusterEvalQ(cl, rnorm(1))
print(unlist(results))
stopCluster(cl)
library(parallel)
library(MASS) # needed to load the Boston data frame
# Parallel kmeans using the "multicore-derived" API with parallel RNG
RNGkind("L'Ecuyer-CMRG")
mc.cores <- detectCores()
results <- mclapply(rep(25, 4),
function(nstart) kmeans(Boston, 4, nstart=nstart),
mc.cores=mc.cores)
# Keep the clustering with the smallest total within-cluster
# sum of squares.
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]
print(result)
# Parallel kmeans using the "snow-derived" API with parallel RNG
cl <- makeCluster(detectCores())
clusterSetRNGStream(cl)
# Cluster workers start with fresh sessions, so load MASS on each.
clusterEvalQ(cl, library(MASS))
results <- clusterApply(cl, rep(25, 4),
function(nstart) kmeans(Boston, 4, nstart=nstart))
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]
print(result)
stopCluster(cl)
This directory contains R scripts that demonstrate the "snow" package,
and are based on the examples in chapter two of the book "Parallel R".
The example scripts can be executed using a command such as:
% R --slave -f kmeans.R
or using the "source" function within an R session:
> source('kmeans.R')
Using the "source" function allows the timing plots to be displayed
interactively in clusterapplylb.R, loadbalancing.R and parlapply.R.
Note that config.R will have to be edited, since it contains host and
user names that very likely don't exist on your system.
Example scripts:
kmeans.R Parallel kmeans example using clusterApply
workerinit.R Worker initialization
clusterapplylb.R clusterApplyLB function example
parlapply.R parLapply function example
clustersplit.R Using clusterSplit to implement parVapply
loadbalancing.R Using clusterApplyLB to implement parLapplyLB
env.R Demonstrating function serialization
rng.R Using clusterSetupRNG for parallel RNG
config.R How to configure different workers differently
mpi.R Simple example of creating and using an MPI cluster
batchmpi.sh Simple PBS/TORQUE script for executing mpi.R
#!/bin/sh
# Simple PBS/TORQUE batch script that runs the mpi.R snow example.
#PBS -N SNOWMPI
#PBS -j oe
# PBS starts jobs in $HOME; move to the directory the job was
# submitted from, where mpi.R lives.
cd $PBS_O_WORKDIR
# Launch a single R master (-n 1); mpi.R itself spawns the workers
# into the remaining MPI slots. Output goes to a per-job log file.
orterun -n 1 /usr/bin/R --slave -f mpi.R > mpi-$PBS_JOBID.out 2>&1
# Compares clusterApplyLB (dynamic load balancing) with clusterApply
# (static round-robin scheduling) on tasks of widely varying length,
# plotting the per-worker timelines from snow.time.
library(snow)
cl <- makeSOCKcluster(4)
par(ask=TRUE)
set.seed(7777442)
sleeptime <- abs(rnorm(10, 10, 10))
tm <- snow.time(clusterApplyLB(cl, sleeptime, Sys.sleep))
plot(tm)
cat(sprintf("Elapsed time for clusterApplyLB: %f\n", tm$elapsed))
# Same seed, hence the same workload, scheduled statically.
set.seed(7777442)
sleeptime <- abs(rnorm(10, 10, 10))
tm <- snow.time(clusterApply(cl, sleeptime, Sys.sleep))
plot(tm)
cat(sprintf("Elapsed time for clusterApply: %f\n", tm$elapsed))
stopCluster(cl)
library(snow)
cl <- makeSOCKcluster(4)
# clusterSplit partitions a sequence into one chunk per worker.
results <- clusterSplit(cl, 1:30)
print(results)
# "Vectorized" parallel apply: send each worker one chunk of x,
# apply fun to the whole chunk at once, then concatenate the
# per-chunk results in worker order.
parVapply <- function(cl, x, fun, ...) {
do.call("c", clusterApply(cl, clusterSplit(cl, x), fun, ...))
}
# Cube roots of 1..10, computed chunk-wise.
results <- parVapply(cl, 1:10, '^', 1/3)
print(results)
stopCluster(cl)
# WARNING: You will have to change the host and user names
# in order to successfully run this example.
# Demonstrates per-worker configuration: each element of the worker
# list may override options such as host, user and outfile.
library(snow)
tryCatch({
workerList <- list(list(host='n1'),
list(host='n2', user='steve'))
cl <- makeSOCKcluster(workerList)
results <- clusterEvalQ(cl, Sys.info()[['user']])
print(results)
stopCluster(cl)
# Per-worker log files; a top-level option (user here) applies to
# any worker that doesn't specify its own value.
workerList <- list(list(host='n1', outfile='n1.log', user='weston'),
list(host='n2', outfile='n2-1.log'),
list(host='n2', outfile='n2-2.log'))
cl <- makeSOCKcluster(workerList, user='steve')
results <- clusterEvalQ(cl, Sys.glob('*.log'))
print(results)
stopCluster(cl)
},
error=function(e) {
print(conditionMessage(e))
cat("Did you change the host and user names in this script for your system?",
file=stderr())
})
# Demonstrates how function environments are (and are not) shipped
# to the workers by parLapply. pmult is deliberately redefined
# several times to show successive refinements.
library(snow)
cl <- makeCluster(4, type="SOCK")
a <- 1:4
x <- rnorm(4)
# x lives at top level (the global environment is not serialized
# with mult), so it must be exported to the workers explicitly.
clusterExport(cl, "x")
mult <- function(s) s * x
parLapply(cl, a, mult)
# Here mult is defined inside pmult, so its enclosing environment
# -- which contains a and x -- travels with it automatically.
pmult <- function(cl) {
a <- 1:4
x <- rnorm(4)
mult <- function(s) s * x
parLapply(cl, a, mult)
}
pmult(cl)
# Parameterized version; evaluating x forces the promise so its
# value is present in the environment that gets serialized.
pmult <- function(cl, a, x) {
x # force x
mult <- function(s) s * x
parLapply(cl, a, mult)
}
scalars <- 1:4
dat <- rnorm(4)
pmult(cl, scalars, dat)
# Leaner version: point mult's environment at .GlobalEnv (which is
# not serialized) and pass x as an explicit extra argument instead.
pmult <- function(cl, a, x) {
mult <- function(s, x) s * x
environment(mult) <- .GlobalEnv
parLapply(cl, a, mult, x)
}
scalars <- 1:4
dat <- rnorm(4)
pmult(cl, scalars, dat)
stopCluster(cl)
# Load MASS package to get "Boston" data frame
library(MASS)
#######################################################
# Example use of the kmeans function
result <- kmeans(Boston, 4, nstart=100)
print(result)
#######################################################
# Sequential splitting using lapply function
# Four independent searches of 25 starts each (100 starts total).
results <- lapply(rep(25, 4), function(nstart) kmeans(Boston, 4, nstart=nstart))
# Keep the result with the lowest total within-cluster sum of squares.
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]
print(result)
#######################################################
# Parallel version, converted from lapply version
library(snow)
cl <- makeSOCKcluster(3)
# Load MASS on each worker; returning NULL avoids shipping the
# search path back to the master.
ignore <- clusterEvalQ(cl, {library(MASS); NULL})
results <- clusterApply(cl, rep(25, 4), function(nstart)
kmeans(Boston, 4, nstart=nstart))
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]
print(result)
stopCluster(cl)
library(snow)
cl <- makeCluster(4, type="SOCK")
par(ask=TRUE)
# Wrapper for clusterApplyLB that sends the function and
# the "fixed" arguments to the workers only once, rather
# than in every task. In some cases this can significantly
# reduce communication, thus improving performance.
parLapplyLB <- function(cl, x, fun, ...) {
# Cache fun and the fixed arguments in each worker's global env.
clusterCall(cl, LB.init, fun, ...)
# Each load-balanced task now carries only its own element of x.
r <- clusterApplyLB(cl, x, LB.worker)
# Clean the cached objects out of the workers afterwards.
clusterEvalQ(cl, rm('.LB.fun', '.LB.args', pos=globalenv()))
r
}
# Worker initialization function called by parLapplyLB
# Worker initialization function called by parLapplyLB.
# Stashes the task function and its fixed arguments in the worker's
# global environment so later tasks only need to ship the varying x.
# Returns NULL so nothing bulky travels back to the master.
LB.init <- function(fun, ...) {
  fixed <- list(...)
  genv <- globalenv()
  genv[['.LB.fun']] <- fun
  genv[['.LB.args']] <- fixed
  NULL
}
# Worker task function called by parLapplyLB
# Worker task function called by parLapplyLB.
# Applies the function previously cached by LB.init to x together
# with the cached fixed arguments (.LB.fun / .LB.args globals).
LB.worker <- function(x) {
  full.args <- c(list(x), .LB.args)
  do.call('.LB.fun', full.args)
}
# Contrived workload: each task sleeps one second and carries a
# large (unused) matrix argument to magnify communication costs.
bigsleep <- function(sleeptime, mat) Sys.sleep(sleeptime)
bigmatrix <- matrix(0, 2000, 2000)
sleeptime <- rep(1, 100)
# Time parLapplyLB with our contrived example
tm <- snow.time(parLapplyLB(cl, sleeptime, bigsleep, bigmatrix))
plot(tm)
cat(sprintf("Elapsed time for parLapplyLB: %f\n", tm$elapsed))
# Time clusterApplyLB with the same contrived example
# (here the big matrix is re-sent with every one of the 100 tasks)
tm <- snow.time(clusterApplyLB(cl, sleeptime, bigsleep, bigmatrix))
plot(tm)
cat(sprintf("Elapsed time for clusterApplyLB: %f\n", tm$elapsed))
# Cleanup
stopCluster(cl)
#!/usr/bin/Rscript
# Simple example of creating and using an MPI cluster with snow.
library(snow)
library(Rmpi)
# This script spawns workers when creating the cluster, so make sure
# that mpirun didn't start multiple copies of this script and that
# there are some process slots available for spawned workers.
if (mpi.comm.size(0) > 1 || mpi.universe.size() < 2) {
# Only rank 0 prints the message, to avoid duplicates.
if (mpi.comm.rank(0) == 0)
cat('must use -n 1 and specify at least two slots\n', file=stderr())
mpi.quit()
}
# Create a cluster and make sure it works
# (one slot stays reserved for this master process).
cl <- makeMPIcluster(mpi.universe.size() - 1)
r <- clusterEvalQ(cl, R.version.string)
print(unlist(r))
# Stop the cluster, finalize MPI and quit
stopCluster(cl)
mpi.quit()
# Compares clusterApply with parLapply when every task shares a
# large fixed argument: parLapply issues one chunked task per
# worker, so the big matrix is transmitted far fewer times.
library(snow)
cl <- makeCluster(4, type="SOCK")
par(ask=TRUE)
# Each task sleeps one second; the matrix is dead weight that
# exists only to inflate communication costs.
bigsleep <- function(sleeptime, mat) Sys.sleep(sleeptime)
bigmatrix <- matrix(0, 2000, 2000)
sleeptime <- rep(1, 100)
tm <- snow.time(clusterApply(cl, sleeptime, bigsleep, bigmatrix))
plot(tm)
cat(sprintf("Elapsed time for clusterApply: %f\n", tm$elapsed))
tm <- snow.time(parLapply(cl, sleeptime, bigsleep, bigmatrix))
plot(tm)
cat(sprintf("Elapsed time for parLapply: %f\n", tm$elapsed))
stopCluster(cl)
library(snow)
cl <- makeCluster(4, type="SOCK")
# Give each worker its own RNGstream substream, seeded from the
# six-integer seed vector required by the rlecuyer generator.
results <- clusterSetupRNG(cl, type='RNGstream', seed=c(1,22,333,444,55,6))
print(unlist(results))
stopCluster(cl)
# Demonstrates initializing snow workers: loading packages and
# assigning per-worker state.
library(snow)
cl <- makeSOCKcluster(3)
# Define a function that loads a specified vector of packages
# Attach every package named in `packages` (a character vector) to
# the calling session; intended to run on each worker via
# clusterCall. Returns NULL so no search-path data is shipped back
# to the master.
worker.init <- function(packages) {
  lapply(packages, library, character.only=TRUE)
  NULL
}
# Load the MASS and boot packages on each of the workers
results <- clusterCall(cl, worker.init, c('MASS', 'boot'))
# Verify that MASS and boot are loaded on all the workers
results <- clusterEvalQ(cl, search())
print(results)
# Assign a unique worker ID to each worker using clusterApply
# (clusterApply sends exactly one task per worker here; the <<- in
# the task function creates a global variable on that worker)
results <- clusterApply(cl, seq(along=cl), function(id) WORKER.ID <<- id)
# Verify that WORKER.ID is set on each worker
results <- clusterEvalQ(cl, paste("I am worker", WORKER.ID))
print(results)
# Cleanup
stopCluster(cl)
logo.png

2.67 KB

This directory contains R scripts that demonstrate the "multicore"
package, and are based on examples in chapter three of the book
"Parallel R".
The example scripts can be executed using a command such as:
% R --slave -f mclapply.R
or using the "source" function within an R session:
> source('mclapply.R')
Example scripts:
mclapply.R Using mclapply to implement parallel kmeans
mccores.R The mc.cores argument
mcsetseed.R The mc.setseed argument
mcpreschedule.R The mc.preschedule argument
mcpvec.R Example use of the pvec function
mcparallelcollect.R Example use of the parallel and collect functions
mcrng.R Using a parallel RNG with multicore
mclowlevel.R Using the low-level API to implement a stripped-
down version of the mclapply function.
library(multicore)
# Run 100 trivial tasks on 2 workers; the distinct PIDs printed
# show how many processes performed the work.
results <- mclapply(1:100, function(i) Sys.getpid(), mc.cores=2)
print(unique(unlist(results)))
# The "cores" option sets the default worker count used when
# mc.cores is not supplied.
options(cores=3)
results <- mclapply(1:100, function(i) Sys.getpid())
print(unique(unlist(results)))
library(multicore)
library(MASS)
# Parallel kmeans: four independent searches of 25 random starts
# each, run concurrently on the Boston data frame.
results <- mclapply(rep(25, 4),
function(nstart) kmeans(Boston, 4, nstart=nstart))
# Keep the clustering with the smallest total within-cluster
# sum of squares.
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]
print(result)
library(multicore)
# Simplified version of mclapply that adds an "mc.init" argument.
# If mc.init is a function, it is called once in each forked worker
# as mc.init(core, cores) before any tasks execute.
mclapply.init <- function(X, FUN, ..., mc.cores=4, mc.init=NULL) {
# Never fork more workers than tasks, and always at least one.
cores <- max(min(mc.cores, length(X)), 1)
# ix[[i]]: the (round-robin) indices of X assigned to worker i.
ix <- lapply(1:cores, function(i) seq(i, length(X), by=cores))
forkloop <- function(core) {
proc <- fork()
if (inherits(proc, "masterProcess")) {
# Child: compute results and serialize them back to the master.
sendMaster(tryCatch({
# Drop the inherited RNG seed so workers don't all produce
# identical random streams (mc.init may reseed).
suppressWarnings(rm(".Random.seed", pos=.GlobalEnv))
if (is.function(mc.init))
mc.init(core, cores)
lapply(X[ix[[core]]], FUN, ...)
},
error=function(e) {
# On error, return the condition object for each assigned task.
lapply(ix[[core]], function(i) e)
}))
exit(0)
}
# Parent: return the child's PID for later collection.
proc$pid
}
pids <- sapply(1:cores, forkloop)
results <- vector("list", length(X))
# Collect results as children finish; selectChildren returns NULL
# once all children have exited.
while (! is.null(ready <- selectChildren(pids, 1))) {
if (is.integer(ready)) {
for (pid in ready) {
data <- readChild(pid)
if (is.raw(data)) {
# Scatter this worker's results back to their task positions.
core <- which(pid == pids)
results[ix[[core]]] <- unserialize(data)
}
}
}
}
names(results) <- names(X)
results
}
# Define an "mc.init" function that defines a worker ID variable
# in the worker's global environment; `cores` is unused but part of
# the mc.init(core, cores) calling convention.
set.worker.id <- function(id, cores) {
assign(".MC.WORKER.ID", id, pos=.GlobalEnv)
}
# Try out mclapply.init, initializing the workers with "set.worker.id"
# Each result pairs the input with the computing worker's ID.
mclapply.init(11:13, function(i) c(i, .MC.WORKER.ID),
mc.cores=2, mc.init=set.worker.id)
# Define an "mc.init" function that sets the RNG seed of the
# workers in the same way as the real mclapply function.
set.worker.seed <- function(id, cores) {
set.seed(Sys.getpid())
}
# Try out mclapply.init, initializing the workers with "set.worker.seed"
# With per-worker seeding the three rnorm draws should all differ.
mclapply.init(1:3, function(i) rnorm(1), mc.init=set.worker.seed)
# Start of the parallel/collect demonstration (continues beyond
# this excerpt): launch background computations and collect results.
library(multicore)
# Define our set of three contrived (but illustrative) functions
fun1 <- function() {Sys.sleep(10); 1}
fun2 <- function() {Sys.sleep(5); 2}
fun3 <- function() {Sys.sleep(1); 3}
# Start the three functions executing in the background
cat('1) Starting three tasks:\n')
f1 <- parallel(fun1())