Update on Snowdoop, a MapReduce Alternative

In blog posts a few months ago, I proposed an alternative to MapReduce, e.g. to Hadoop, which I called “Snowdoop.” I pointed out that systems like Hadoop and Spark are very difficult to install and configure, are either too primitive (Hadoop)  or too abstract (Spark) to program, and above all, are SLOW. Spark is of course a great improvement on Hadoop, but still suffers from these problems to various extents.

The idea of Snowdoop is to

  • retain the idea of Hadoop/Spark to work on top of distributed file systems (“move the computation to the data rather than vice versa”)
  • work purely in R, using familiar constructs
  • avoid using Java or any other external language for infrastructure
  • sort data only if the application requires it

I originally proposed Snowdoop just as a concept, saying that I would slowly develop it into an actual package. I later put the beginnings of a Snowdoop package in a broader package, partools, which also contains other parallel computation utilities, such as a debugging aid for the cluster portion of R’s parallel package (which I like to call Snow, as it came from the old snow package).

I remain convinced that Snowdoop is a much more appropriate tool for many R users who are currently using Hadoop or Spark. The latter two, especially Spark, may be a much superior approach for those with very large clusters, thus with a need for built-in fault tolerance (Snowdoop provides none on its own), but even the Hadoop Wiki indicates that many MapReduce users actually work on clusters of very modest size.

So, in the last few weeks, I’ve added quite a bit to Snowdoop, and have run more timing tests. The latter are still very preliminary, but they continue to be very promising. In this blog post, I’ll give an extended example of usage of the latest version, which you can obtain from GitHub. (I do have partools on CRAN as well, but have not updated that yet.)

The data set in this example will be the household power usage data set from UCI. Most people would not consider this “Big Data,” with only about 2 million rows and 9 columns, but it’s certainly no toy data set, and it will make serve well for illustration purposes.

But first, an overview of partools:

  • distributed approach, either persistent (distributed files) or quasi-persistent (distributed objects at the cluster nodes, in memory but re-accessed repeatedly)
  • most Snowdoop-specific function names have the form file*
  • most in-memory functions have names distrib*
  • misc. functions, e.g. debugging aid and “Software Alchemy”

Note that partools, therefore, is more than just Snowdoop.  One need not use distributed files, and simply use the distrib* functions as handy ways to simplify one’s parallel code.

So, here is a session with the household power data.  I’m running on a 16-core machine, using 8 of the cores. For convenience, I changed the file name to hpc.txt. We first create the cluster, and initialize partools, e.g. assign an ID number to each cluster node:

 
> cls <- makeCluster(8)
> setclsinfo(cls) # partools call

Next we split the file into chunks, using the partools function filesplit() (done only once, not shown here). This creates files hpc.txt.1hpc.txt.2 and so on (in this case, all on the same disk). Now have each cluster node read in its chunk:

> system.time(clusterEvalQ(cls,hp <-    read.table(filechunkname("hpc.txt",1),
header=TRUE,sep=";",stringsAsFactors=FALSE)))
 user system elapsed
 9.468 0.270 13.815

(Make note of that time.) The partools function filechunkname() finds the proper file chunk name for the calling cluster node, based on the latter’s ID. We now have a distributed data frame, named hp at each cluster node.

The package includes a function distribagg(), a distributed analog of R’s aggregate() function.  Here is an example of use, first converting the character variables to numeric:

> clusterEvalQ(cls,for (i in 3:9) hp[,i] <- as.numeric(hp[,i]))
> system.time(hpoutdis <-distribagg(cls,"x=hp[,3:6],
   by=list(hp[,7],hp[,8])","max",2))
 user system elapsed
 0.194 0.017 9.918

As you can see, the second and third arguments to distribagg() are those of aggregate(), in string form.

Now let’s compare to the serial version:

 

> system.time(hp <- read.table("hpc.txt",header=TRUE,sep=";",
stringsAsFactors=FALSE))
 user system elapsed
 22.343 0.175 22.529
> for (i in 3:9) hp[,i] <- as.numeric(hp[,i])
> system.time(hpout <- aggregate(x=hp[,3:6],by=list(hp[,7],hp[,8]),FUN=max))
 user system elapsed
 76.348 0.184 76.552


So, the computation using distribagg() was almost 6 times faster than serial, a good speed for 8 cluster nodes. Even the input from disk was more than twice as fast, in spite of the files being on the same disk, going through the same operating system.

 

My New Book and Other Matters

I haven’t posted for a while, so here are some news items:

  • My new book, Parallel Computation for Data Science, will be out in June or July. I believe it will be useful to anyone doing computationally intensive work.
  • After a few months being busy with the book and other things, I have returned to my Snowdoop project and my associated package, partools. I recently gave a talk on Snowdoop to the Berkeley R Group, and you will be hearing more here on this blog.
  • I’ve begun a new book on regression/predictive analytics, finishing two chapters so far.
  • I’ve been doing some research on the missing-value problem, and am developing a package for it too.

Lots of fun stuff to do these days. :-)

GPU Tutorial, with R Interfacing

You’ve heard that graphics processing units — GPUs — can bring big increases in computational speed.  While GPUs cannot speed up work in every application, the fact is that in many cases it can indeed provide very rapid computation.  In this tutorial, we’ll see how this is done, both in passive ways (you write only R), and in more direct ways, where you write C/C++ code and interface it to R.

What Are GPUs?

A GPU is basically just a graphics card.  Any PC or laptop has one (or at least a GPU chipset), but the more expensive ones, favored by the gamers, are quite fast.

Because the graphical operations performed by a GPU involve vectors and such, at some point physicists started using GPUs for their own non-graphic computations.  To do so, they had to “trick” the GPUs by writing what appeared to be graphics code.  Such programming was, to say the least, unwieldy, so NVIDIA took the risky, “build it and they will come” step of designing its GPUs to be generally programmable.  The goal was to have a structure that is fast for game programming but which is programmable in “normal” C/C++ code.

NVIDIA’s bet paid off,, and today the most commonly-used GPUs for general-purpose work are made by that firm.  They are programmed in a language called CUDA, which is an extension of C/C++.  Check this list to see if you have a CUDA-compatible graphics card.  We will focus on NVIDIA here, and the term GPU will mean those devices.

GPUs are shared-memory, threaded devices.  (If you haven’t seen these terms before, please read my recent OpenMP tutorial blog post first, at least the beginning portions.)  Note by the way that the GPU card has its own memory, separate from that accessed by your CPU.

A GPU might be called a “multi-multicore” machine.  It consists of a number of streaming multiprocessors (SMs), each one of which is a multicore engine.  Thus the cores share memory, and many threads run simultaneously.  A typical GPU will contain many more cores than the 2, 4 or even 8 one sees commonly on PCs and laptops.

There are some differences from standard multicore, though, notably that threads run in groups called blocks. Within a block, the threads run in lockstep, all running the same machine language instruction (or none at all) at the same time.  This uniformity makes for very fast hardware if one’s application can exploit it.

Not all applications will fare well with GPUs.  Consider what happens with an if type of statement in C/C++.  The threads that satisfy the “if” condition will all execute the same machine instructions, but the remaining threads will be idle, a situation termed thread divergence. We thus lose parallelism, and speed is reduced.  So, applications that have a regular pattern, such as matrix operations, can really take advantage of GPUs, while other apps that need a lot of if/else decision making may not be speedy.

Another major way in which GPUs differ from ordinary multicore systems is that synchronization between threads is less flexible than in multicore.  There is nothing in GPUs quite the same as what the OpenMP barrier construct gives us.  Threads within a block are synchronized by definition, but not between blocks.  If we need a “wait here until all threads get here” type of operation, one must resort to heavy-handed measures, such as returning control to the CPU of the host machine, very slow.   So, the applications that are most suitable for GPU use are those not needing much in terms of barriers and the like.

In any shared-memory system, a major performance issue is memory access time.  With GPUs, the problem is writ large, since there is memory on two levels, host and device, each of which has long latency times.  On the other hand, this is ameliorated for device memory, since there is an efficient embedded-in-hardware operating system that engages in latency hiding. :  When a thread block encounters an instruction needing access to device memory, the OS suspends that block and tries to find another to run that doesn’t access memory, giving the first block time to do the memory access while useful work is done by the second block.  Accordingly, it is good practice to set up a large number of threads, each with a small amount of work to do (very different from the multicore case).The memory access itself is most efficient if the threads in a block are requesting consecutive memory locations; good CUDA coders strive to make this happen.

Using GPUs While Sticking to R

A number of R packages have been written to allow you to access some GPU operations while staying in the convenience and comfort of your own R living room, such as gputools, gmatrix and Rth.  (The first two are available on CRAN, while the third is currently available from this GitHub site.)  They perform only specialized operations, mainly matrix manipulation and sorting.

Rth, written by Drew Schmidt and me, is an R interface to Thrust, a collection of templates to generate CUDA code, or optionally OpenMP code.  In other words, the same code is usable on two different kinds of parallel platforms, GPU and multicore.

As a quick example, I computed distances between rows of a matrix, first using R’s built-in dist() function, and then using gpuDist() from the gputools package.  For a 1000×1000 matrix, the GPU version took 0.258 seconds, compared to 3.671 for plain R.  For 2500×2500, the results were even more dramatic, 3.220 seconds versus 103.219!

Note, though, that I could not directly try 5000×5000, as I exceeded the memory size of my GPU; to handle the larger matrix, I’d need to do the work in stages, thus losing some of the speed advantage.  GPUs are not panaceas.

Writing CUDA Code

Here we’ll have an example of how to write CUDA yourself.  While giving a fully-detailed account would take more space than is appropriate for a blog, this example should give you a good overview of what is involved.  For further details, see the partial draft of my forthcoming book, Parallel Computation for Data Science.

We will again use the graph application in my OpenMP tutorial blog post, in this case measuring commonalities among inbound links rather than outlinks.  Consider a large collection of Web sites.  A 1 in row i, column j of the matrix means that site i links to site j.  In comparing, for instance, sites 3 and 8, we compare columns 3 and 8, counting the number of rows with 1s in both columns.  We do this for all possible pairs of sites, and compute the average  inlink commonality.

The code, downloadable for your convenience at this site, is as follows:

 
// MutIn.cu:  finds mean number of mutual 
// inlinks, among all pairs of Web sites 
// in our set; in checking (i,j) pairs, 
// thread k will handle all i such that 
// i mod totth = k, where totth is the
//  number of threads

#include<cuda.h>
#include<stdlib.h>
#include<Rinternals.h>

// treat it as C code
extern "C" {
    SEXP gpumutin(SEXP a, SEXP nb);
}

// GPU block size is hard coded as 192
#define BLOCKSIZE 192

// kernel:  processes all pairs assigned to 
// a given thread
__global__ void procpairs(int *m, 
   int *tot, int n)
{
   // total number of threads = 
   // number of blocks * block size
   int totth = gridDim.x * BLOCKSIZE,
       // my thread number
       me = blockIdx.x * blockDim.x + 
          threadIdx.x;
   int i,j,k,sum = 0;
   for (i = me; i < n; i += totth) {  
      for (j = i+1; j < n; j++) {  
         for (k = 0; k < n; k++)
            sum += m[n*k+i] * m[n*k+j];
      }
   }
   // atomically add my sum to the 
   // overall total tot
   atomicAdd(tot,sum);
}

// find the mean number of mutual 
// inlinks, over all pairs of columns 
// of the graph adjacency matrix adj

// SEXP is an internal R entity 
// for storing an object; the 
// INTEGER() function allows 
// C/C++ code to access the data 
// portions of the object

SEXP gpumutin(SEXP adj, SEXP nb) {
    // how many GPU blocks?
    int nblk = INTEGER(nb)[0];
    // find n, number of rows 
    // and cols in adj
    SEXP adjdim = 
       getAttrib(adj,R_DimSymbol);
    int n = INTEGER(adjdim)[0];
    // the CPU is the "host," 
    // the GPU the "device"
    // get C-level view of adj on host
    int *hm = INTEGER(adj);
    // set up a connection (ppinter 
    // to a device address to the 
    // space we will make for the 
    // device matrix
    int *dm; // device matrix
    // need space for totals in host, device
    int htot, // host grand total
        *dtot; // device grand total
    int msize = n * n * sizeof(int);
    // allocate space for device matrix
    cudaMalloc((void **)&dm,msize);
    // copy host matrix to device matrix
    cudaMemcpy(dm,hm,msize,cudaMemcpyHostToDevice);
    htot = 0;
    // set up device total and initialize it
    cudaMalloc((void **)&dtot,sizeof(int));
    cudaMemcpy(dtot,&htot,sizeof(int),
       cudaMemcpyHostToDevice);
    // OK, ready to launch kernel, 
    // so configure grid
    dim3 dimGrid(nblk,1);
    dim3 dimBlock(BLOCKSIZE,1,1);
    // launch the kernel
    procpairs<<<dimGrid,
       dimBlock>>>(dm,dtot,n);
    // wait for kernel to finish
    cudaThreadSynchronize();
    // copy total from device to host
    cudaMemcpy(&htot,dtot,sizeof(int),
       cudaMemcpyDeviceToHost);
    // clean up at device
    cudaFree(dm);
    cudaFree(dtot);
    // done; return the mean as an R object
    return 
       ScalarReal(((double) htot) / (n*(n-1)/2));
}

This looks complicated, but there really are only a few key points:

    • Much of the code, as seen in the comments, consists of allocating memory for our data on the GPU, and moving data back and forth between the host and device memories, such as
    
    // allocate space for device matrix
    cudaMalloc((void **)&dm,msize);
    // copy host matrix to device matrix
    cudaMemcpy(dm,hm,msize,
       cudaMemcpyHostToDevice);
    • Each SM is broken into blocks, of size specified by the programmer.  All the threads within a block execute in lockstep.  The programmer also specifies the grid size, which is the number of blocks.
    • The programmer writes a kernel, designated by __global__.  Each thread runs the kernel.  The thread will determine its ID number from its block number, and thread number within block:

   // total number of threads =
   // number of blocks * block size
   int totth = gridDim.x * BLOCKSIZE,
       // my thread number
       me = blockIdx.x * blockDim.x +
          threadIdx.x;
    • The kernel call, or launch, is in our case here the call to procpairs().  We not only supply arguments to the function, but also this is where we state the block and grid sizes.  Note by the way that we have left the number of blocks up to the user, in the variable nblk:

    // OK, ready to launch kernel,
    // so configure grid
    dim3 dimGrid(nblk,1);
    dim3 dimBlock(BLOCKSIZE,1,1);
    // launch the kernel
    procpairs<<<dimGrid,
       dimBlock>>>(dm,dtot,n);
  • As usual with shared-memory programming, one must avoid race conditions, where several threads may step on each other while writing to a shared variable.    Suppose for instance that current x is 12, and threads 3 and 8 both want to add 1 to it. The result should be that x is 14, but it may be 13 if both do this at about the same time.  (They both read x as 12, both add 1, and both write 13.) GPUs don’t handle this as well as multicore, but here we use CUDA’s atomicAdd() to safely update our overall total.  Warning:  It too is slow.

Compiling and Running

Not surprisingly in view of the facts that we are running on two different hardware platforms at once, in a nonstandard extension of standard C/C++, compiling and running CUDA code requires some attention to detail, e.g. in specifying locations of libraries.

For example, on many machines, including mine, CUDA resides in the directory /usr/local/cuda.  I made sure to set the environment variables for my execution and library-load search paths:

export CUDA_HOME=/usr/local/cuda/
export path=( $CUDA_HOME/bin $path )
export LD_LIBRARY_PATH=$CUDA_HOME/lib64

CUDA has its own compiler, nvcc, which I invoked on my source file MutIn.cu as follows:

nvcc -g -Xcompiler -fPIC -c MutIn.cu \\
   -arch=sm_11 -I/usr/include/R

I instructed nvcc to pass on to gcc a request for position-independent code, specified that I wanted CUDA machine code for hardware version 1.1 or greater, and included the system files from R. Also, I specified the -c option, meaning that I wanted only to generate machine code, MutIn.o, to be linked in later, not a full standalone executable.

To then link in the R libraries and the CUDA runtime library, I ran


R CMD SHLIB MutIn.o -lcudart \\
   -L/usr/local/cuda/lib64

This produced a shared-object file MutIn.so, loadable from R! Here is a sample run:


> dyn.load("MutIn.so")
> n <- 2500
> m <- matrix(as.integer(sample(0:1,n^2,
   replace=T)),ncol=n)
> .Call("gpumutin",m,as.integer(4))

I got run times of about 2.1 seconds for the  CUDA code, versus about 31.6 for straight R code.  For the latter, I reused AdjSnow.R from my OpenMP tutorial with one worker.  This was a bit unfair for various reasons, but on the other hand, the CUDA code here is not optimized either.  But no matter how you slice it, the GPU is definitely a lot faster for this application.

CUDA Libraries

In many cases, such as our example above, one can attain very nice speedup with CUDA on one’s first try, with no attempt at optimization.  But if you want real speed, CUDA is one of the most tweakable platforms you’ll ever encounter, due to the fact that you know an awful lot about what is going on inside.

You know, for instance, the direct implication of having if/else code and of ordered array access.  One can do this to a large extent with multicore too, but in that settings it is more on the level of guesswork.  We can guess cache behavior in multicore (far more complex than in the uniprocessor case), but the GPUs have what amounts to a programmer-managed cache — YOU are in control. And the issue of block size and number of blocks, which can be key to good speed in CUDA, has no counterpart in multicore.

An excellent example is the painstaking optimization of a very simple calculation, a pixel histogram.  Paying careful attention to memory access patterns, the engineer was able to make very impressive speed gains.

But there are two problems with this.  First, there is the machine efficiency vs. human efficiency (your coding time) tradeoff.  Second, the NVIDIA GPU series is constantly evolving; code that works on today’s chips will still work on the future ones, but it won’t be optimal. (In the code example and hardware descriptions above, we are assuming rather earlier chips in the model series.)

One partial solution is to use CUDA libraries whenever possible. These exist for matrix ops (CUBLAS), Fast Fourier Transform (CUFFT), sorting/prefix scan (CUDPP) and so on.  This code is already highly tweaked, and tends to keep up with the evolution of the chipset.

Conclusions

For certain applications, GPUs can deliver excellent speedup.  In some cases, this can be obtained with rather little effort on the programmer’s part, by making use of libraries, including some offering direct R interfaces.  Analysts with major computational needs will find that GPU will make an excellent addition to their toolkits.

OpenMP Tutorial, with R Interface

Almost any PC today is multicore.  Dual-core is standard, quad-core is easily attainable for the home, and larger systems, say 16-core, are easily within reach of even smaller research projects. In addition, large multicore systems can be “rented” on Amazon EC2 and so on.

The most popular way to program on multicore machines is to use OpenMP, a C/C++ (and FORTRAN) callable system that runs on Linux, Mac and Windows.  (For Macs, you need the OpenMP-enabled version of Mac’s clang compiler.)

This blog post will present a short tutorial on OpenMP, including calling OpenMP code from R, using Rcpp.  Use of the latter will be kept to basics, so if you are also new to Rcpp, you’ll learn how to use that too.  This tutorial is adapted from my book, Parallel Computation for Data Science: with Examples in R, C/C++ and CUDA, to be published in June 2015.

I’ll assume that you know R well, and have some familiarity with C/C++.

Threaded programming:

Most programs running on multicore systems are threaded.  This means that several invocations, called threads, of the given program are running simultaneously, typically one thread per core. A key point is that the threads share memory, making it easy for them to cooperate. The work of the program is divided among the threads, and due to the simultaneity, we potentially can achieve a big speedup.

Writing threaded code directly is messy, so higher-level systems have been developed to hide the messy details from the programmer, thus making his/her life far easier.  As noted, OpenMP is the most popular of these systems, though Intel’s Threading Building Blocks system is also widely used.

Our example:

Here we convert a graph adjacency matrix to a 2-column matrix that lists the edges.  (You don’t need know what graphs are, which will be explained shortly.)  Here is an example of running our code, a function transgraph():


> m  
     [,1] [,2] [,3] [,4]
[1,]    0    1    1    0
[2,]    1    0    1    1
[3,]    1    1    1    0
[4,]    0    1    1    0
> .Call("transgraph",m)
      [,1] [,2]
 [1,]    1    2
 [2,]    1    3
 [3,]    2    1
 [4,]    2    3
 [5,]    2    4
 [6,]    3    1
 [7,]    3    2
 [8,]    3    3
 [9,]    4    2
[10,]    4    3

Here our matrix m represented a 4-vertex, i.e. 4-node graph.  If you are not familiar with graphs, think of set of 4 Web sites.  Row 1 of m says that site 1 has links to sites 2 and 3, site 2 has links to sites 1, 3 and 4 and so on.  The output matrix says the same thing; e.g. the (1,2) in the first row says there is a link from site 1 to site 2, and so on.

We might have thousands of Web sites, or even more, in our analysis.  The above matrix conversion thus could be quite computationally time-consuming, hence a desire to do it in parallel on a multicore system, which we will do here.

Plan of attack:

The idea is simple, conceptually.  Say we have four cores.  Then we partition the rows of the matrix m into four groups.  Say m has 1000 rows.  Then we set up four threads, assigning thread 1 to work with rows 1-250, thread 2 dealing with rows 251-500, etc.  Referring to the eventual output matrix (the 2-column matrix seen above) as mout, each thread will compute its portion of that matrix, based on that thread’s assigned rows.

There is one tricky part, which is to stitch the outputs of the four threads together into the single matrix mout. The problem is that we don’t know ahead of time where in mout each thread’s output should be placed. In order to do that, our code will wait until all the threads are done with their assigned work, then ask how many output rows each one found.

To see how this helps, consider again the little four-site example above, and suppose we have just two threads. Thread 1 would handle rows 1-2 of m, finding four rows of mout, beginning with (1,2) above. These will eventually be the first four rows of mout.  The significance of that is that the output of thread 2 must start at row 5 of mout.  In this manner, we’ll know where in mout to place each thread’s output.

The code:


#include <Rcpp.h>
#include <omp.h>

// finds the chunk of rows this thread will 
// process
void findmyrange(int n,int nth,int me,
      int *myrange)
{  int chunksize = n / nth;
   myrange[0] = me * chunksize;
   if (me < nth-1) 
      myrange[1] = (me+1) * chunksize - 1;
   else myrange[1] = n - 1;
}

// SEXP is the internal data type for R 
// objects, as seen from C/C++;
// here our input is an R matrix adjm, and 
// the return value is another R matrix
RcppExport SEXP transgraph(SEXP adjm)
{  
   // i-th element of num1s will be the 
   // number of 1s in row i of adjm
   int *num1s,  
       *cumul1s,  // cumulative sums in num1s
       n;
   // make a C/C++ compatible view of the 
   // R matrix;
   // note: referencing is done with 
   // ( , ) not [ , ], and indices start at 0
   Rcpp::NumericMatrix xadjm(adjm);
   n = xadjm.nrow();
   int n2 = n*n;
   // create output matrix
   Rcpp::NumericMatrix outm(n2,2);

   // start the threads; they will run the 
   // code below simultaneously, though not
   // necessarily executing the same line 
   // at the same time
   #pragma omp parallel
   {  int i,j,m;
      // each thread has an ID (starting at 0),
      // so determine the ID for this thread
      int me = omp_get_thread_num(),
          // find total number of threads
          nth = omp_get_num_threads();
      int myrows[2];
      int tot1s;
      int outrow,num1si;
      // have just one thread execute the
      // following block, while the 
      // others wait
      #pragma omp single
      { 
         num1s = (int *) 
            malloc(n*sizeof(int)); 
         cumul1s = (int *) 
            malloc((n+1)*sizeof(int)); 
      }
      findmyrange(n,nth,me,myrows);
      for (i = myrows[0]; i <= myrows[1]; 
            i++) {
         // number of 1s found in this row
         tot1s = 0;  
         for (j = 0; j < n; j++) 
            if (xadjm(i,j) == 1) {
               xadjm(i,(tot1s++)) = j;
            }
         num1s[i] = tot1s;
      }
      // wait for all threads to be done
      #pragma omp barrier  
      // again, one thread does the
      // following, others wait
      #pragma omp single
      {  
         // cumul1s[i] will be tot 1s before 
         // row i of xadjm
         cumul1s[0] = 0;  
         // now calculate where the output of 
         // each row in xadjm should start 
         // in outm
         for (m = 1; m <= n; m++) {
            cumul1s[m] = 
               cumul1s[m-1] + num1s[m-1];
         }
      }
      // now this thread will put the rows it 
      // found earlier into outm
      for (i = myrows[0]; i <= myrows[1]; 
            i++) {
         // current row within outm
         outrow = cumul1s[i];  
         num1si = num1s[i];
         for (j = 0; j < num1si; j++) {
            outm(outrow+j,0) = i + 1;
            outm(outrow+j,1) = xadjm(i,j) + 1;
         }
      }
   }
   // have some all-0 rows at end of outm; 
   // delete them
   Rcpp::NumericMatrix outmshort = 
      outm(Rcpp::Range(0,cumul1s[n]-1),
      Rcpp::Range(0,1));
   return outmshort;  // R object returned!
}

(For your convenience, I’ve place the code here.)

Compiling and running:

Here’s how to compile on Linux or a Mac. In my case, I had various files in /home/nm, which you’ll need to adjust for your machine. From the shell command line, do

export R_LIBS_USER=/home/nm/R
export PKG_LIBS="-lgomp"
export PKG_CXXFLAGS="-fopenmp -I/home/nm/R/Rcpp/include"
R CMD SHLIB AdjRcpp.cpp

This produces a file AdjRcpp.so, which contains the R-loadable function, transgraph().  We saw earlier how to call the code from R.  However, there is one important step not seen above: Setting the number of threads.

There are several ways to do this, but the most common is to set an environment variable in the shell before you start R.  For example, to specify running 4 threads, type

export OMP_NUM_THREADS=2

Brief performance comparison:

I ran this on a quad core machine, on a 10000-row graph m. The pure-R version of the code (not shown here) required 27.516 seconds to run, while the C/C++ version took only 3.193 seconds!

Note that part of this speedup was due to running four threads in parallel, but we also greatly benefited by running in C/C++ instead of R.

Analysis of the code:

I’ve tried to write the comments to tell most of the details of the story, but a key issue in reading the code is what I call its “anthropomorphic” view:  We write the code pretending we are a thread!

For example, consider the lines

   
   #pragma omp parallel
   {  int i,j,m;
      // each thread has an ID (starting at 0),
      // so determine the ID for this thread
      int me = omp_get_thread_num(),
          // find total number of threads
          nth = omp_get_num_threads();
      int myrows[2];
      ...  // lots of lines here
      for (i = myrows[0]; i <= myrows[1]; 
            i++) {
         outrow = cumul1s[i];  
         num1si = num1s[i];
         for (j = 0; j < num1si; j++) {
            outm(outrow+j,0) = i + 1;
            outm(outrow+j,1) = xadjm(i,j) + 1;
         }
      }
   }
 

As explained in the comments:  The pragma tells the compiler to generate machine code that starts the threads.  Each of the threads — say we have four — will simultaneously execute all the lines following the pragma, through the line with the closing brace.  However, each thread has a different ID number, which we can sense via the call to omp_get_thread_num(), which we record in the variable me.  We write the code pretending we are thread number me, deciding which rows of adjm we must handle, given our ID number.

Debugging:

I’m a fanatic on debugging tools, and whenever I see a new language, programming environment etc., the first question I ask is, How to debug code on this thing?

Any modern C/C++ debugging tool allows for threaded code, so that for example the user can move from one thread to another while single-stepping through the code.  But how does one do this with C/C++ code that is called from R?  The answer is, oddly enough, that we debug R itself!

But though the R Core Team would greatly appreciate your help in debugging R :-) we don’t actually do that.  What we do is start R with the -d option, which places us in the debugger.  We then set a breakpoint in our buggy code, e.g. for gdb

(gdb) break transgraph

 

We then resume execution, placing us back in R’s interactive mode, then call our buggy function from there as usual, which places in that function — and in the debugger!  We can then single-step etc. from there.

Other OpenMP constructs:

The above example was designed to illustrate several of the most common pragmas in OpenMP. But there are a lot more tricks you can do with it, and in addition there are settings that you can make to improve performance.  Some of these are explained in my book, with more examples and so on, and in addition there are myriad OpenMP tutorials (though not R-oriented) on the Web.

Happy OpenMP-ing!

Debugging Parallel Code with dbs()

I mentioned yesterday that my partools package is now on CRAN.  A number of people have expressed interest in the Snowdoop section, but in this post I want to call attention to the dbs() debugging tool in the package, useful for debugging code written for the portion of R’s parallel library that came from the old snow package.

I like to continue to call that portion “Snow,” using a capital and non-bold font to distinguish from the old CRAN package snow.  Also, I refer to the entity that makes calls to, e.g., clusterApply() as the manager, and refer to the entities that respond as the workers.

You can use dbs() on any Unix-family platform, such as Macs, Linux or Cygwin. (This is the only part of the partools package with this restriction.)

Here is the problem that dbs() solves:  When you run a Snow cluster-creation function, say,

> makeCluster(2)

this launches two new invocations of R.  The problem is that they are not associated with terminals, and thus you can’t directly use R’s debug(), browser() and so on, let alone more sophisticated debugging tools.  However, there is an indirect way:  You can specify manual=TRUE in your call to makeCluster(), and then start your worker R processes yourself, by hand, from within terminal windows.  You can insert a call to browser() inside the function you wish to debug, source() it, then run the function in each of these worker windows, single-stepping once the browser() call is hit.

This is fine in principle, but it’s a pain to actually do.  So, I wrote dbs() and included it in my partools package.  It automates the above process, very convenient.  I’ll present a quick, simple example here.

Say I have a file x.R consisting of

f <- function(x) {
x <- x + 1
x^2
}

Then I would call

> dbs(2,src="x.R",ftn="f",xterm="xterm")

Then dbs() would do all of the following for me, automatically, WITHOUT MY DOING ANY TYPING AT ALL:

  • Create 3 new terminal windows on my screen, two for my Snow workers and one for my Snow manager.
  • In the manager window, call makeCluster(2,manual=TRUE,port=ranport) (where ranport is a randomly chosen port number).
  • In the worker windows, invoke R with a connection to the manager, and have them listen for commands the manager will send them to run.
  • Have each of the worker processes execute source(“x.R”) and then debug(f).
  • Have the manager execute .libPaths() to acquire the same library search path that I’d been using at the time I ran dbs().  Then have each worker do so too.
  • Have the manager and workers load partools, in case it may be needed.

I can now go to the manager window, and run my Snow app as usual, say by typing (now I am typing again)

> clusterEvalQ(cls,f(5))

This gets the workers going, running f(), but since they had previously executed debug(), they now enter the browser.  My screen now looks like this:

dbs

My original window, the one from which I had invoked dbs(), is seen in the lower-right.  That invocation had created the three new windows, two for the workers at the top left and right, and one for the manager, at the middle bottom.  I had then given an ordinary Snow cluster call in the latter, so f() started running in the top two windows, and they entered the browser.  Those worker windows are now waiting for my ordinary R debug commands, such as n for single-stepping.

My argument xterm = “xterm” in my dbs() call needs comment.  It can specify any kind of terminal window that supports the -e option (which states what program to run in a newly-created window).  So, for instance, gnome-terminal in Ubuntu Linux would be fine.

If your system doesn’t have an xterm-family terminal window (I downloaded xterm to my Mac), you can still run dbs(), except that the function will require a little bit of typing by you in the procedure I listed above.  It’s still a huge work saver even in that case.

(Maybe some of you Mac aficionados out there will see how to eliminate that little bit of typing for xterm-less Macs.)

So, that’s it.  Happy debugging!

Snowdoop/partools Package Now on CRAN

I’ve now placed the partools package, including Snowdoop, on CRAN.  No major new functions since my last posting, but the existing functions have been made more versatile and convenient, and the documentation is now more detailed, with more examples and so on.  I do have more functions planned.

It is all platform independent, except for the dbs() debugging tool, which requires some Unix-family (Mac, Linux, Cygwin) utilities, e.g. screen.

Follow

Get every new post delivered to your Inbox.

Join 105 other followers