Partools 1.1.4

Partools 1.1.4 is now on GitHub.

The main change this time is enhancement of the debugging facilities (which work not only for partools but also for the cluster-based portion of R’s parallel package in general). As some of you know, I place huge importance on debugging, so much so that I wrote a book on it (The Art of Debugging with GDB, DDD, and Eclipse, N. Matloff and P. Salzman, NSP, 2008).

But debugging parallel code is hard, especially in the parallel package. The problem is that your R code running on the cluster nodes does so without having a terminal window associated with it. I’ve had various tools for dealing with that in partools from the beginning, but in the latest version their effectiveness is greatly enhanced by adding mechanisms involving R’s dump.frames(). This was Hadley’s idea, for which I am quite grateful. I’ve had a lot of fun using the enhanced debugging tools myself.

This also inspired me to finally add a debugging vignette to the package, which I had long planned to do but hadn’t gotten around to.

I also thank Gábor Csárdi for cleaning up the DESCRIPTION file.

I have more enhancements to partools in the pipeline. One of them involves k-NN nonparametric regression, using Software Alchemy but in a different way than you might think. Actually, I’ve already done this before, in my freqparcoord package with Yingkang Xie, but I’ll do a little tweaking before adding it here. This too is something I’ve been planning to do for a while but hadn’t gotten around to. What inspired me to give it a higher priority was a paper that I recently ran across by some researchers at Stanford and UCB, which establishes nice theoretical properties for a Software Alchemy-type approach for another kind  of nonparametric regression estimation, kernel ridge regression.

partools: a Sensible R Package for Large Data Sets

As I mentioned recently, the new, greatly extended version of my partools package is now on CRAN. (The current version on CRAN is 1.1.3, whereas at the time of my previous announcement it was only 1.1.1. Note that Unix is NOT required.)

It is my contention that for most R users who work with large data, partools — or methods like it — is a better, simpler, far more convenient approach than Hadoop and Spark. If you are an R user and, like most Hadoop/Spark users, don’t have a mega cluster (thousands of nodes), partools is a sensible alternative.

I’ll introduce partools usage in this post. I encourage comments (pro or con, here or in private). In particular, for those of you attending the JSM next week, I’d be happy to discuss the package in person, and hear your comments, feature requests and so on.

Why do I refer to partools as “sensible”? Consider:

  • Hadoop and Spark are quite difficult to install and configure, especially for non-computer systems experts. By contrast, partools just uses ordinary R; there is nothing to set up.
  • Spark, currently much favored  by many over Hadoop, involves new, complex and abstract programming paradigms, even under the R interface, SparkR. By contrast, again, partools just uses ordinary R.
  • Hadoop and Spark, especially the latter, have excellent fault tolerance features. If you have a cluster consisting of thousands of nodes, the possibility of disk failure must be considered. But otherwise, the fault tolerance of Hadoop and Spark is just slowing down your computation, often radically so. (You could also do your own fault tolerance, ranging from simple backup to sophisticated systems such as Xtreemfs.)

What Hadoop and Spark get right is to base computation on distributed files. Instead of storing data in a monolithic file x, it is stored in chunks, say x.01, x.02,…, which can greatly reduce network overhead in the computation. The partools package also adopts this philosophy.
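To make the chunking idea concrete, here is a minimal sketch in plain R of what a distributed file amounts to. It is only an illustration of the concept, not the partools API itself; the package's filesplit() does the real work, as described in the vignette.

library(parallel)

# illustrative only: split a monolithic file into roughly equal chunks,
# x.01, x.02, ..., one per cluster node, so that each node can later read
# just its own piece
splitfile <- function(fname, nchunks) {
   alllines <- readLines(fname)
   idx <- splitIndices(length(alllines), nchunks)  # from the parallel package
   for (i in seq_len(nchunks))
      writeLines(alllines[idx[[i]]], sprintf("%s.%02d", fname, i))
}

# e.g. splitfile("x", 4) creates x.01, x.02, x.03, x.04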

Overview of  partools:

  • There is no “magic.” The package merely consists of short, simple utilities that make use of R’s parallel package.
  • The key philosophy is Keep It Distributed (KID). Under KID, one does many distributed operations, with a collective operation being done occasionally, when needed.

Sample partools (PT) session (see package vignette for details, including code, output):

  • 16-core machine.
  • Flight delay data, 2008. Distributed file created previously from monolithic one  via PT’s filesplit().
  • Called PT’s fileread(), causing each cluster node to read its chunk of the big file.
  •  Called PT’s distribagg() to find max values of DepDelay, ArrDelay, Airtime. 15.952 seconds, vs. 249.634 for R’s serial aggregate().
  • Interested in Sunday evening flights. Each node performs that filtering op, assigning the result to the data frame sundayeve. Note that this is a distributed data frame, in keeping with KID.
  • Continue with KID, but if later we want to un-distribute that data frame, we could call PT’s distribgetrows().
  • Performed a linear regression analysis, predicting ArrDelay from DepDelay and Distance, using Software Alchemy, via PT’s calm() function. Took 18.396 seconds, vs. 76.225 for ordinary lm(). (See my new book, Parallel Computation for Data Science, for details on Software Alchemy.)
  • Applied a distributed na.omit() to each chunk, using parallel’s clusterEvalQ(). Took 2.352 seconds, compared to the 9.907 it would have needed if not distributed.
  • Performed PCA. Took 8.949 seconds for PT’s caprcomp(), vs. 58.444 for the non-distributed case.
  • Calculated interquartile range for each of 12 variables, taking 2.587 seconds, compared to 29.584 for the non-distributed case.
  • Performed a more elaborate distributed na.omit(), taking 9.293 seconds, compared to 55.032 in the serial case.
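To make the steps above concrete, here is a rough sketch of such a session, patterned on the calls shown later in this post. The file name, column names and cutoff values are placeholders, and the exact argument lists (especially for distribagg()) are illustrative assumptions; see the vignette for the definitive forms.

library(parallel)
library(partools)

cls <- makeCluster(16)      # one worker per core on the 16-core machine
setclsinfo(cls)             # assign node IDs, initialize partools

# each node reads its own chunk of the distributed flight-delay file;
# "y2008.csv" is a placeholder name, and the 2 is the assumed number of
# digits in the chunk suffix produced by filesplit()
clusterEvalQ(cls, flt <- read.csv(filechunkname("y2008.csv", 2),
   header = TRUE, stringsAsFactors = FALSE))

# distributed aggregate(): per-group maxima of the delay variables; the
# string arguments mirror aggregate()'s x= and by=, as in the hpc.txt
# example later in this post, and the trailing 2 is assumed to be the
# number of grouping variables
distribagg(cls,
   "x = flt[, c('DepDelay','ArrDelay','AirTime')],
    by = list(flt$DayOfWeek, flt$Month)", "max", 2)

# KID: filter to Sunday evening flights at each node, keeping the result
# distributed; 7 = Sunday, and the 1800 departure-time cutoff is arbitrary
clusterEvalQ(cls, sundayeve <- flt[flt$DayOfWeek == 7 & flt$DepTime >= 1800, ])

# only if we later need everything in one place would we un-distribute,
# e.g. via distribgetrows() (see the vignette)
stopCluster(cls)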

Again, see the vignette for details on the above, on how to deal with files that don’t fit into memory etc.

CACM Highlights R

The Association for Computing Machinery (ACM) is the main professional organization for computer science, largely academic but still with a broad membership. The ACM publishes a number of journals, most of them research journals, but its flagship publication is a magazine, the Communications of the ACM.

The current issue of the CACM includes an article, “Bringing Big Data to the Big Tent,” that is mainly about R and Spark. After discussing the wide usage that R has developed, it raises the question of whether R, specifically CRAN, is too disorganized. CMU CS professor Jim Herbsleb is quoted as saying:

There’s a lot of duplication of effort out there, a lot of missed opportunities, where one scientist has developed a tool for him or herself, and with a few tweaks, or if it conformed to a particular standard or used a particular data format, it could be useful to a much wider community.

I understand his point, but I strongly disagree. I really like the free-form way that CRAN (and Bioconductor etc.) works, and appreciate the fact that when I need some utility, not only is CRAN likely to have it, but it’s likely to have several versions, by different authors, giving me a lot of choice. Besides, the various libraries available in the CS world include a lot of duplication too, yet no one seems to mind.

I do believe that there should be more structure to CRAN. The Task Views are nice, but are often nowhere near comprehensive, and some tend to be out of date. I’ve also proposed that there be a Yelp-style review system for CRAN packages.

Speaking of CRAN and Spark, the new version of my partools package (which I informally call Snowdoop) went on CRAN a few days ago. I continue to believe that Hadoop and Spark are not appropriate for the majority of R users who work with large data, and I offer partools as one alternative. The package is much more extensive than the last version. I’ll be blogging about it in the near future (and from time to time afterward, with more news and examples) but in the meantime, I recommend reading the vignette for an introduction to usage of the package. See also my recent talk. Note: Although the DESCRIPTION file says that partools requires a Unix-family OS, it should work fine with Windows.

Heteroscedasticity in Regression — It Matters!

R’s main linear and nonlinear regression functions, lm() and nls(), report standard errors for parameter estimates under the assumption of homoscedasticity, a fancy word for a situation that rarely occurs in practice. The assumption is that the (conditional) variance of the response variable is the same at any set of values of the predictor variables.

Take for instance my favorite introductory example, predicting human weight from height. (It is unlikely we’d need to make such a prediction, but it serves as a quick and easy illustration.) The homoscedasticity assumption says that tall people have no more variation in weight than short people, certainly not true.

So, confidence intervals and p-values calculated by lm() and nls() are generally invalid. In this blog post, I’ll show how to remedy this problem — including some new code that I’ll provide here — and give an example showing just how far wrong one can be without a remedy.

In the linear case, the remedy is simple, but probably not widely known. Years ago, researchers such as Eicker and White derived large-sample theory for the heteroscedastic variance case, and it is coded in CRAN’s car and sandwich packages, in the functions hccm() and vcovHC(), respectively. I’ll use the latter here, as its name is similar to that of R’s vcov() function.

Like R’s vcov() function, vcovHC() takes as input the result of a call to lm() or nls(), and outputs the estimated covariance matrix of your estimated parameter vector; the standard errors of the estimated parameters are then the square roots of the diagonal elements of that matrix. Let’s first check that relation with ordinary vcov(), using the example from the online help for lm():

> ctl <- c(4.17,5.58,5.18,6.11,4.50,4.61,5.17,4.53,5.33,5.14)
> trt <- c(4.81,4.17,4.41,3.59,5.87,3.83,6.03,4.89,4.32,4.69)
> group <- gl(2, 10, 20, labels = c("Ctl","Trt"))
> weight <- c(ctl, trt)
> lm.D9 <- lm(weight ~ group)
> summary(lm.D9)
...
Coefficients:
 Estimate Std. Error t value Pr(>|t|)
(Intercept) 5.0320 0.2202 22.850 9.55e-15
groupTrt -0.3710 0.3114 -1.191 0.249
> sqrt(diag(vcov(lm.D9)))
(Intercept) groupTrt 
 0.2202177 0.3114349 

Sure enough, the standard errors match.  (Were you doubting it? :-) )
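To get the heteroscedasticity-robust version instead, one simply swaps in vcovHC() from the sandwich package; a minimal illustration on the same fit:

library(sandwich)
sqrt(diag(vcovHC(lm.D9)))   # Eicker-White standard errors for lm.D9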

So, the linear case, i.e. lm(), is taken care of.  But unfortunately, there doesn’t seem to be anything for the nonlinear case, nls(). The following code solves that problem:

library(minpack.lm)
library(sandwich)

# Eicker-White covariance matrix for a nonlinear fit from nlsLM()
nlsvcovhc <- function(nlslmout) {
   b <- coef(nlslmout)     # estimated parameter vector
   m <- nlslmout$m         # nls model object
   resid <- m$resid()      # residuals at the estimate
   hmat <- m$gradient()    # gradient (Jacobian) of the model at the estimate
   # local linear approximation: the gradient plays the role of the "X" in a
   # fake linear model, with response residuals + gradient %*% b
   fakex <- hmat
   fakey <- resid + hmat %*% b
   lmout <- lm(fakey ~ fakex - 1)
   vcovHC(lmout)           # sandwich (Eicker-White) estimator on the fake fit
}

(The names such as fakex, which I should remove, are explained in the last paragraph below.) You can download this code, with fuller comments, here. The function is applied to the output of nlsLM(), a modified version of nls() from the minpack.lm package that has better convergence behavior.

Here’s an example, using the enzyme data set vmkmki from the CRAN package nlstools. (I removed the last 12 observations, which for reasons I won’t go into here seem to be anomalous.) The nonlinear model here is the one given in the online help for vmkmki.

> regftn <- function(t,u,b1,b2,b3) b1 * t / (t + b2 * (1 + u/b3))
> bstart <- list(b1=1,b2=1, b3=1)
> z <- nls(v ~ regftn(S,I,b1,b2,b3),data=vmkmki,start=list(b1=1,b2=1, b3=1))
> z
Nonlinear regression model
 model: v ~ regftn(S, I, b1, b2, b3)
 data: vmkmki
 b1 b2 b3 
18.06 15.21 22.28 
...
> vcov(z)
 b1 b2 b3
b1 0.4786776 1.374961 0.8930431
b2 1.3749612 7.568837 11.1332821
b3 0.8930431 11.133282 29.1363366

Compare that to using Eicker-White:

# get new z from nlsLM(), not shown
> nlsvcovhc(z)
 fakex1 fakex2 fakex3
fakex1 0.4708209 1.706591 2.410712
fakex2 1.7065910 10.394496 20.314688
fakex3 2.4107117 20.314688 53.086958

This is rather startling! Except for the estimated variance of the first parameter estimate, the estimated variances and covariances from Eicker-White are much larger than what nls() found under the assumption of homoscedasticity. In other words, nls() was way off the mark. In fact, this simulation code shows that the standard errors reported by nls() can lead, for instance, to confidence intervals having only 60% coverage probability rather than 90%, due to heteroscedasticity, while the above code fixes the problem.

How does that code work? (Optional reading.) Most nonlinear least-squares procedures use a local linear approximation, such as in the Gauss-Newton algorithm. This results computationally in a fake lm() setting. As such, we are already set up to use the delta method. We can then apply vcovHC().
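For completeness, here is a hypothetical sketch of the step elided in the transcript above (“get new z from nlsLM(), not shown”); it assumes regftn(), bstart and the (trimmed) vmkmki data frame defined earlier are in the workspace.

library(minpack.lm)
# refit the same model with nlsLM(), then apply nlsvcovhc()
z2 <- nlsLM(v ~ regftn(S, I, b1, b2, b3), data = vmkmki, start = bstart)
ewcov <- nlsvcovhc(z2)   # Eicker-White covariance matrix
sqrt(diag(ewcov))        # robust standard errors for b1, b2, b3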

Macros in R

In programming, sometimes it’s useful to write a macro rather than a function. (Don’t worry if you’ve never heard the term before.) In this post, I’ll give an example of the use of macros in R, using the gtools package on CRAN.

I wanted to write some utility code to help me reuse my earlier R commands during an interactive R session. Most (though not all) of what I wanted is already provided in the excellent R user interface systems such as ESS, RStudio and vim-r, but for various reasons I generally use the command line directly, especially for short sessions. (I also have developed my own vim editing mappings for R.) Specifically, I wanted to develop utilities to perform the following tasks:

  • Display all my recent commands, possibly restricting to those matching or excluding certain character strings.
  • Choose one of the recent commands for re-execution.
  • Re-execute a recent command by number or matching string.

One nice thing about R is that one can easily form a command programmatically in a character string, say using paste(), and then execute the string as a command, using parse() and eval(). Thus it would be easy to code up the above-listed tasks into functions that I can call when needed, except for one problem: within the body of a function, one has a different environment than at the caller’s level.
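As a tiny illustration of the string-to-command idea (a made-up example, not part of my utilities):

# build a command as a string, then run it as if it had been typed at the prompt
cmd <- paste("summary(", "mtcars$mpg", ")")
eval(parse(text = cmd))

The catch, as just noted, is that once such code sits inside a function, it runs in the function’s own environment rather than the caller’s.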

Take for instance (a slightly modified version of) the first example in the online help for defmacro() in gtools, in which the goal is to write a function that recodes as NA all entries in a given data frame column having a certain value. The following will NOT work:

setNA <- function(df,var,val) 
   df$var[df$var == val] <- NA

The problem is that within setNA(), df will be a data frame that is only a copy of the one in your call, so the recoding to NAs will be made only to the copy. (A second issue is that, inside a function, df$var refers to a column literally named var, not to the column whose name you passed in.) The defmacro() function in gtools does what you really want:

setNA <- defmacro(df,var,val,
   expr={df$var[df$var == val] <- NA})

With this, df really will be the desired data frame. I urge you to try a little test with both of the above code snippets.
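For instance, a quick test might look like this (hypothetical toy data):

d <- data.frame(x = c(1, 99, 3), y = c(99, 5, 6))
setNA(d, x, 99)   # using the defmacro() version
d$x               # now 1 NA 3; the plain-function version cannot change d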

The code I wrote for my command-history utilities is rather short, but too long to place in a blog post; instead, you can access it here. There is a short sample session at the end of the file, which again I urge you to execute by hand to fully understand it. Take note of the tasks I coded as macros instead of functions, and think about why I needed to do so.

How is all this magic accomplished? The defmacro() function still makes use of eval(), substitute() etc., but it does that work for you, so that you can write your macro while thinking of it as a function. Your code is a function — defmacro() builds the function and returns it, which, by the way, means it is debuggable — but again, the point is that you don’t have to deal with all the calls to eval() etc.
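Roughly speaking (a hand-rolled sketch of the idea, not the actual gtools internals), the generated function captures its arguments unevaluated, splices them into your expression, and evaluates the result in the caller’s frame:

# a do-it-yourself approximation of what the setNA macro does
setNAbyhand <- function(df, var, val) {
   e <- substitute(df$var[df$var == val] <- NA)  # splice in the actual arguments
   eval(e, parent.frame())                       # run it in the caller's frame
}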

If you are a C/C++ programmer, note that this differs from macros in that language, which are straight substitutions made by the compiler preprocessor.

 

Discovered Two Great Web Sites Today

Today is my lucky day.  I learned of two very interesting Web pages, both of them quite informative and the first of them rather provocative (yay!). I have some comments on both, in some cases consisting of mild disagreement, which I may post later, but in any event, I highly recommend both.  Here they are:

Tip: Read Drew’s page as soon as possible, before he removes the F-bombs. :-)

Update on Snowdoop, a MapReduce Alternative

In blog posts a few months ago, I proposed an alternative to MapReduce, e.g. to Hadoop, which I called “Snowdoop.” I pointed out that systems like Hadoop and Spark are very difficult to install and configure, are either too primitive (Hadoop)  or too abstract (Spark) to program, and above all, are SLOW. Spark is of course a great improvement on Hadoop, but still suffers from these problems to various extents.

The idea of Snowdoop is to

  • retain the Hadoop/Spark idea of working on top of distributed file systems (“move the computation to the data rather than vice versa”)
  • work purely in R, using familiar constructs
  • avoid using Java or any other external language for infrastructure
  • sort data only if the application requires it

I originally proposed Snowdoop just as a concept, saying that I would slowly develop it into an actual package. I later put the beginnings of a Snowdoop package in a broader package, partools, which also contains other parallel computation utilities, such as a debugging aid for the cluster portion of R’s parallel package (which I like to call Snow, as it came from the old snow package).

I remain convinced that Snowdoop is a much more appropriate tool for many R users who are currently using Hadoop or Spark. The latter two, especially Spark, may be a much superior approach for those with very large clusters, thus with a need for built-in fault tolerance (Snowdoop provides none on its own), but even the Hadoop Wiki indicates that many MapReduce users actually work on clusters of very modest size.

So, in the last few weeks, I’ve added quite a bit to Snowdoop, and have run more timing tests. The latter are still very preliminary, but they continue to be very promising. In this blog post, I’ll give an extended example of usage of the latest version, which you can obtain from GitHub. (I do have partools on CRAN as well, but have not updated that yet.)

The data set in this example will be the household power usage data set from UCI. Most people would not consider this “Big Data,” with only about 2 million rows and 9 columns, but it’s certainly no toy data set, and it will serve well for illustration purposes.

But first, an overview of partools:

  • distributed approach, either persistent (distributed files) or quasi-persistent (distributed objects at the cluster nodes, in memory but re-accessed repeatedly)
  • most Snowdoop-specific function names have the form file*
  • most in-memory functions have names distrib*
  • misc. functions, e.g. debugging aid and “Software Alchemy”

Note that partools, therefore, is more than just Snowdoop. One need not use distributed files at all, and can simply use the distrib* functions as handy ways to simplify one’s parallel code.

So, here is a session with the household power data.  I’m running on a 16-core machine, using 8 of the cores. For convenience, I changed the file name to hpc.txt. We first create the cluster, and initialize partools, e.g. assign an ID number to each cluster node:

 
> library(parallel)
> library(partools)
> cls <- makeCluster(8)
> setclsinfo(cls) # partools call

Next we split the file into chunks, using the partools function filesplit() (done only once, not shown here). This creates files hpc.txt.1, hpc.txt.2 and so on (in this case, all on the same disk). Now have each cluster node read in its chunk:

> system.time(clusterEvalQ(cls,hp <- read.table(filechunkname("hpc.txt",1),
   header=TRUE,sep=";",stringsAsFactors=FALSE)))
 user system elapsed
 9.468 0.270 13.815

(Make note of that time.) The partools function filechunkname() finds the proper file chunk name for the calling cluster node, based on the latter’s ID. We now have a distributed data frame, named hp at each cluster node.
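Conceptually, filechunkname() just glues the calling node’s ID, stored by setclsinfo(), onto the base file name; a rough stand-in (not the actual implementation) might be:

# hypothetical sketch: chunk name for node nodeid, with an ndigs-digit suffix
chunknamesketch <- function(basenm, ndigs, nodeid)
   paste0(basenm, ".", formatC(nodeid, width = ndigs, flag = "0"))

chunknamesketch("hpc.txt", 1, 3)   # "hpc.txt.3"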

The package includes a function distribagg(), a distributed analog of R’s aggregate() function.  Here is an example of use, first converting the character variables to numeric:

> clusterEvalQ(cls,for (i in 3:9) hp[,i] <- as.numeric(hp[,i]))
> system.time(hpoutdis <- distribagg(cls,"x=hp[,3:6],
   by=list(hp[,7],hp[,8])","max",2))
 user system elapsed
 0.194 0.017 9.918

As you can see, the second and third arguments to distribagg() are those of aggregate(), in string form.

Now let’s compare to the serial version:

 

> system.time(hp <- read.table("hpc.txt",header=TRUE,sep=";",
stringsAsFactors=FALSE))
 user system elapsed
 22.343 0.175 22.529
> for (i in 3:9) hp[,i] <- as.numeric(hp[,i])
> system.time(hpout <- aggregate(x=hp[,3:6],by=list(hp[,7],hp[,8]),FUN=max))
 user system elapsed
 76.348 0.184 76.552


So, the computation using distribagg() was nearly 8 times faster than serial (9.9 vs. 76.6 seconds elapsed), a good speedup for 8 cluster nodes. Even the input from disk was considerably faster (13.8 vs. 22.5 seconds elapsed), in spite of the file chunks being on the same disk and going through the same operating system.

 
