Slides and replay of my “Using R with Hadoop” webinar now available #rstats #hadoop

I owe a big “thank you” to all of you who attended my webinar yesterday “Using R with Hadoop”. Revolution Analytics partnered with us at Think Big Analytics to produce the webinar, and I owe them thanks as well.

For those of you who missed it, the slides and replay are now available from Revolution Analytics.

 


How to specify a MapR distro when launching Elastic MapReduce clusters with the Ruby CLI

Amazon’s Elastic MapReduce Ruby client allows you to specify which of the supported Hadoop distributions to use, currently either Amazon’s Apache 1.0.3-based distribution or MapR’s M3 and M5 editions.

I found the CLI’s option documented at <http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-mapr.html>:

To launch an Amazon EMR job flow with MapR using the CLI

Set the --with-supported-products parameter to either mapr-m3 or mapr-m5 to run your job flow on the corresponding version of the MapR Hadoop distribution.

The following example launches a job flow running with the M3 Edition of MapR.

elastic-mapreduce --create --alive \
--instance-type m1.xlarge --num-instances 5 \
--with-supported-products mapr-m3

For additional information about launching job flows using the CLI, see the instructions for each job flow type in Create a Job Flow.

 

Slides from today’s Big Data Step-by-Step Tutorials: Infrastructure series and Intro to R+Hadoop with RHadoop’s rmr

Here are my presentations from today’s Boston Predictive Analytics Big Data Workshop.

All code and config files are available at github: https://github.com/jeffreybreen/tutorial-201203-big-data

My portion of the workshop was divided into four parts: three focusing on different infrastructure scenarios and a fourth taking a deep dive into the rmr R package:

Big Data Step-by-Step: Infrastructure 1/3: Local VM

    Starting small. Just because Big Data tools like Hadoop were designed to run at “web-scale,” across many nodes, doesn’t mean you need to build a cluster, or even dedicate a single machine, to get started. In this deck we download and install a virtual machine from Cloudera which comes complete with a functioning, single-node Hadoop installation. As long as you restrict the size of your data set appropriately, this is a great way to become accustomed to Hadoop and its tools. We walk through running a Hadoop Streaming job to make sure everything works. We later use this same VM to spawn a Hadoop cluster in the cloud (see part 3).


Big Data Step-by-Step: Infrastructure 2/3: Running R and RStudio on EC2

Not everyone has Big Data. Some of us have an occasional need to analyze a data set larger than comfortably fits in our existing analysis environment, whether due to disk, CPU, or memory constraints. For these times, launching a single, large machine in the cloud may fit the bill. This part of the presentation walks through how to launch just such a machine using Amazon’s EC2 cloud computing platform. Since I tend to run R and RStudio on Linux, that’s the focus of this tutorial, but the general outline may be helpful to others as well.


Big Data Step-by-Step: Infrastructure 3/3: Taking it to the cloud… easily… with Whirr

Scale up using the cloud. The Apache Whirr cloud management tool makes it easy to launch a Hadoop cluster on EC2. We use the Cloudera VM from presentation #1 as a launching point for the cluster and, thanks to a Whirr-generated proxy script, submit jobs and fetch results from our local VM just as before. For extra credit, we see how Whirr can save us money by bidding for excess capacity via EC2’s spot instances.


Big Data Step-by-Step: Using R & Hadoop (with RHadoop’s rmr package)

Crunching Big Data with R. Hadoop began as a Java-only ecosystem, but Hadoop Streaming allows the creation of mappers, reducers, and combiners in any language which can handle stdin and stdout. That doesn’t mean you want to have to write code to manage I/O at that level, though. After a quick (and undoubtedly incomplete) survey of Hadoop-related R packages, we walk through some of the abstractions and features of RHadoop’s rmr package which make it easier for R developers to get started. We walk through a sample mapper and reducer, demonstrating and documenting the native R objects which carry the data from step to step.


Thank you to the session’s sponsors, all the speakers, and to an interesting and engaged audience. Special thanks to John Versotek for arranging such an informative and enjoyable day, and for the opportunity to take part.

Abusing Amazon’s Elastic MapReduce Hadoop service… easily, from R

I built my first Hadoop cluster this week and ran my first two test MapReduce jobs. It took about 15 minutes, 2 lines of R, and cost 55 cents. And you can too with JD Long’s (very, very experimental) ‘segue’ package.

But first, you may be wondering why I use the word “abusing” in this post’s title. Well, the Apache Hadoop project, like Google’s MapReduce processing system which inspired it, is all about Big Data. Its raison d’être is the distributed processing of large data sets. Huge data sets, actually. Huge like all the web logs from Yahoo! and Facebook huge. Its HDFS file system is designed for streaming reads of large, unchanging data files; its default block size is 64 MB, in case that resonates with your inner geek. HDFS expects its files to be so big that it even makes replication decisions based on its knowledge of your network topology.

I use the term “abuse” because, well, we’re just not going to use any of that Big Data stuff. Instead, we’re going to take advantage of Hadoop’s core machinery to parcel out some embarrassingly parallel, computationally-intensive work, collect the results, and send them back to us. And to keep everything in the cloud and capex-free, we’ll do it all on a cluster of Amazon EC2 instances marshalled and managed by Amazon’s Elastic MapReduce service.

Could the same thing be done with MPI, PVM, SNOW, or any number of other parallel processing frameworks? Certainly. But with only a couple of lines of R? Probably not.

Start the cluster

> library(segue)
Loading required package: rJava
Loading required package: caTools
Loading required package: bitops
Segue did not find your AWS credentials. Please run the setCredentials() function.

> setCredentials('YOUR_ACCESS_KEY_ID', 'YOUR_SECRET_ACCESS_KEY')

> myCluster <- createCluster(numInstances=5)
STARTING - 2011-01-04 15:07:53
STARTING - 2011-01-04 15:08:24
STARTING - 2011-01-04 15:08:54
STARTING - 2011-01-04 15:09:25
STARTING - 2011-01-04 15:09:56
STARTING - 2011-01-04 15:10:27
STARTING - 2011-01-04 15:10:58
BOOTSTRAPPING - 2011-01-04 15:11:28
BOOTSTRAPPING - 2011-01-04 15:11:59
BOOTSTRAPPING - 2011-01-04 15:12:30
BOOTSTRAPPING - 2011-01-04 15:13:01
BOOTSTRAPPING - 2011-01-04 15:13:32
BOOTSTRAPPING - 2011-01-04 15:14:03
BOOTSTRAPPING - 2011-01-04 15:14:34
BOOTSTRAPPING - 2011-01-04 15:15:04
WAITING - 2011-01-04 15:15:35
Your Amazon EMR Hadoop Cluster is ready for action.
Remember to terminate your cluster with stopCluster().
Amazon is billing you!

The createCluster() function provisions the specified number of nodes from EC2, establishes a security zone so they can communicate, boots them, and, in its bootstrap phase, upgrades the version of R on each node and loads some helper functions. You can also distribute your own code and (small) data files to each node during the bootstrap phase. In any case, after a few minutes, the cluster is WAITING and the taxi meter is running… so now what?

Try it out

Let’s make sure everything is working as expected by running the example from JD’s December announcement of his project on the R-sig-hpc mailing list:

> # first, let's generate a 10-element list of 999 random numbers + 1 NA:

myList <- NULL
set.seed(1)
for (i in 1:10){
   a <- c(rnorm(999), NA) 
   myList[[i]] <- a
   }

> # since this is a toy test case, we can run it locally to compare:
> outputLocal  <- lapply(myList, mean, na.rm=T)

> # now run it on the cluster
> outputEmr   <- emrlapply(myCluster, myList, mean,  na.rm=T)
RUNNING - 2011-01-04 15:16:57
RUNNING - 2011-01-04 15:17:27
RUNNING - 2011-01-04 15:17:58
WAITING - 2011-01-04 15:18:29

> all.equal(outputEmr, outputLocal)
[1] TRUE

The key is the emrlapply() function. It works just like lapply(), but automagically spreads its work across the specified cluster. It just doesn’t get any cooler—or simpler—than that.

Estimate pi stochastically

I first stumbled across JD’s R+MapReduce work in this video of his presentation to the Chicago area Hadoop User Group. As a demonstration, he estimates the value of pi stochastically, by throwing dots at random at a circle inscribed within a unit square. On average, the proportion of dots falling inside the circle should equal the ratio of its area to that of the square. And if you remember anything from what passed as math education in your younger years, you may recall that pi is somehow involved. Fortunately for us, JD has posted his code on github so we can put down our #2 pencils and cut-and-paste instead:

> estimatePi <- function(seed){
   set.seed(seed)
   numDraws <- 1e6

   r <- .5 #radius... in case the unit circle is too boring
   x <- runif(numDraws, min=-r, max=r)
   y <- runif(numDraws, min=-r, max=r)
   inCircle <- ifelse( (x^2 + y^2)^.5 < r , 1, 0)

   return(sum(inCircle) / length(inCircle) * 4)
 }

> seedList <- as.list(1:1e3)

> myEstimates <- emrlapply( myCluster, seedList, estimatePi )
RUNNING - 2011-01-04 15:22:28
RUNNING - 2011-01-04 15:22:59
RUNNING - 2011-01-04 15:23:30
RUNNING - 2011-01-04 15:24:01
RUNNING - 2011-01-04 15:24:32
RUNNING - 2011-01-04 15:25:02
RUNNING - 2011-01-04 15:25:34
RUNNING - 2011-01-04 15:26:04
RUNNING - 2011-01-04 15:26:39
RUNNING - 2011-01-04 15:27:10
RUNNING - 2011-01-04 15:27:41
RUNNING - 2011-01-04 15:28:11
RUNNING - 2011-01-04 15:28:42
RUNNING - 2011-01-04 15:29:13
RUNNING - 2011-01-04 15:29:44
RUNNING - 2011-01-04 15:30:14
RUNNING - 2011-01-04 15:30:45
RUNNING - 2011-01-04 15:31:16
RUNNING - 2011-01-04 15:31:47
WAITING - 2011-01-04 15:32:18

> stopCluster(myCluster)
> head(myEstimates)
[[1]]
[1] 3.142512

[[2]]
[1] 3.140052

[[3]]
[1] 3.138796

[[4]]
[1] 3.145028

[[5]]
[1] 3.14204

[[6]]
[1] 3.142136

> # Reduce() is R's Reduce() -- look it up! -- and not related to the cluster:
> myPi <- Reduce(sum, myEstimates) / length(myEstimates)

> format(myPi, digits=10)
[1] "3.141586544"

> format(pi, digits=10)
[1] "3.141592654"

So, a thousand simulations of a million throws each takes about 10 minutes on a 5-node cluster and gets us five decimal places. Not bad.
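
Is that about the accuracy we should expect? Here is a back-of-the-envelope check. It is not part of JD’s code, and the variable names are my own. It assumes each throw is an independent draw that lands inside the circle with probability pi/4 (circle area pi * r^2 over square area (2r)^2, which is also why estimatePi() multiplies by 4), and treats the 1,000 runs as one pooled sample of a billion throws.

```r
# Probability that a single throw lands inside the inscribed circle.
p <- pi / 4

# Total throws: 1,000 seeds x 1e6 draws each (figures from the run above).
totalDraws <- 1e3 * 1e6

# Standard error of a binomial proportion, scaled by 4 to match the estimate.
stdError <- 4 * sqrt(p * (1 - p) / totalDraws)

# Error actually observed in the run above.
observedError <- abs(3.141586544 - pi)

print(signif(stdError, 3))
print(signif(observedError, 3))
```

Under those assumptions the standard error comes out at roughly 5e-5, and the error observed above is comfortably smaller than that, so the result is in line with what the sample size can deliver. Since the error shrinks only with the square root of the number of throws, each additional decimal place would cost about 100 times more simulation.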

How does this example relate to MapReduce?

First of all, I am not a MapReduce expert, but here’s what I understand based on JD’s talk and my skimming of Hadoop: The Definitive Guide (highly recommended and each purchase goes towards my beer^H^H^H^Helastic computing budget):

  1. Instead of a terabyte or so of log files, we feed Hadoop a list of the numbers 1-1000. It dutifully doles each one to a “mapper” process running our estimatePi() function.
  2. Each invocation of our function uses this input as the seed for its random number generator. (It sure would be embarrassing to have all 1,000 simulations generate exactly the same results!)
  3. The output of the mappers is collected by Hadoop and normally sent on for reducing, but segue’s reduce step just concatenates all of the results so they can be sent back to our local instance as an R list.
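
The three steps above can be mimicked on a single machine with plain R, which may help make the shape of the computation concrete. This is only a local stand-in under my own assumptions: no Hadoop or segue is involved, estimatePiSmall() is a hypothetical scaled-down cousin of estimatePi(), and the numbers are kept small so it finishes in moments.

```r
# Scaled-down version of the simulation; numDraws is deliberately small.
estimatePiSmall <- function(seed, numDraws = 1e4) {
  set.seed(seed)                      # step 2: the input value seeds the RNG
  x <- runif(numDraws, min = -0.5, max = 0.5)
  y <- runif(numDraws, min = -0.5, max = 0.5)
  mean(x^2 + y^2 < 0.25) * 4          # share of points inside the circle, times 4
}

# Step 1: the "input data" is just a list of numbers.
seeds <- as.list(1:20)

# "Map": apply the function to each element independently.
mapped <- lapply(seeds, estimatePiSmall)

# "Reduce" in the sense of step 3: append each result to one growing list.
collected <- Reduce(function(acc, item) c(acc, list(item)), mapped, list())

print(length(collected))
print(mean(unlist(collected)))
```

Because the reduce step only concatenates, collected ends up identical to mapped. That is the point: all of the real work happens in the mappers, and any actual aggregation is left to us afterwards.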

All communication between Hadoop and the R code on the cluster is performed using Hadoop Streaming, which allows map and reduce functions to be written in nearly any language which knows the difference between stdin and stdout.
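
To give a feel for what “knowing the difference between stdin and stdout” means in practice, here is a minimal sketch of a streaming-style mapper in R. It is not segue’s actual code. The function name streamingMapper() and the squaring “work” are placeholders of my own, and it assumes the usual Streaming convention of one record per input line and tab-separated key/value pairs on output.

```r
# Read records one line at a time from a connection and emit "key<TAB>value".
streamingMapper <- function(input) {
  while (length(line <- readLines(input, n = 1)) > 0) {
    key <- as.integer(line)
    value <- key^2                    # placeholder for the real computation
    cat(key, "\t", value, "\n", sep = "")
  }
}

# Demonstration with an in-memory connection standing in for stdin.
demoInput <- textConnection(c("1", "2", "3"))
demoOutput <- capture.output(streamingMapper(demoInput))
close(demoInput)
print(demoOutput)
```

In a real streaming script the connection would be opened on standard input instead, for example with file("stdin", open = "r"), and the script would be handed to Hadoop as the mapper.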

Conclusion and alternatives

If you do your modeling in R and are looking for an easy way to spread around some CPU-intensive work, segue may be right up your alley. But if you’re looking to use Hadoop the right way—The Big Data Way—segue’s not for you. Instead, check out Saptarshi Guha’s RHIPE, the R and Hadoop Integrated Processing Environment.

If you’re just looking to run R on an EC2 node, you can start with this old post by Robert Grossman.

If you’re in Facebook’s data infrastructure engineering team, or are otherwise hooked on Hive, I bet you could use the RJDBC package and the HiveDriver JDBC driver, but I understand that most people just pass CSV files back and forth. The more things change….

But if you think all of this is unnatural and makes you want to take a shower, perhaps I can direct you to CRAN’s High-Performance and Parallel Computing with R task view for more traditional parallel processing options.
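
As one example of the more traditional route, here is a rough sketch of the same kind of embarrassingly parallel job using the parallel package that ships with R (2.14.0 and later). It runs on local worker processes rather than a cloud cluster. The function estimatePiLocal(), the worker count, and the draw counts are my own choices for illustration.

```r
library(parallel)

# Scaled-down simulation, self-contained so it can be shipped to the workers.
estimatePiLocal <- function(seed, numDraws = 1e5) {
  set.seed(seed)
  x <- runif(numDraws, min = -0.5, max = 0.5)
  y <- runif(numDraws, min = -0.5, max = 0.5)
  mean(x^2 + y^2 < 0.25) * 4
}

# Two local worker processes; adjust to the number of cores available.
cl <- makeCluster(2)
localEstimates <- parLapply(cl, as.list(1:8), estimatePiLocal)
stopCluster(cl)

localPi <- mean(unlist(localEstimates))
print(localPi)
```

The calling pattern is much the same as with emrlapply(): create a cluster, hand it a list and a function, and stop the cluster when done. The difference is that the workers live on your own machine and nobody is billing you for them.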