Monday, June 17, 2013

Submitting Hadoop jobs programmatically from anywhere

If you are a newcomer to Hadoop land, have you noticed that most of the examples out there for submitting Hadoop MapReduce jobs use the shell command "hadoop jar ..."?

Although this way comes in handy, maybe even quite often, I would personally prefer the default, most visible way to be the programmatic one, because there are various environments one wants to submit jobs from, and if you know how to code it, then you're always good to go.

Take these situations for example:
  • the code-run-debug cycle is always best done in your IDE, but unfortunately various IDEs have their own way of doing things, and they often don't package our code into appropriate JARs
  • you would like to submit jobs from your non-Hadoop client machine to a remote Hadoop cluster - in other words, you don't even have the "hadoop" shell command available on your client
To be honest, it's not that programmatic examples are rare if you google for them; it's more that they are too simplistic for real-world scenarios. For example, they often show just a "word count" job that doesn't use any 3rd party libraries, which, if you really think about it, is not that useful, since any serious Hadoop developer uses some additional library or tool that offers a higher-level abstraction for job construction.

The problem

The thing is that submitting jobs to map/reduce involves somehow providing the classes that constitute the job logic to the remote Hadoop cluster. These classes also have to be available on the client side when submitting the job, and the way we package them for the client application can often be totally unsuitable for submission to the Hadoop cluster. For example, if the client application is a standard web application, these classes can be deployed in non-packaged form to its /WEB-INF/classes/ directory.

There are a couple of ways to provide the required classes to an MR job:
  1. Specify the path to a local JAR file - JobConf.setJar(String path)
  2. Specify an example class and let the library figure out which local JAR file contains it - JobConf.setJarByClass(Class exampleClass)
  3. Specify local JAR files in the "tmpjars" property of the job configuration; for each individual job, the Hadoop job client will then automatically copy them to the remote HDFS and add them to the distributed cache (this is the programmatic equivalent of what the -libjars option of the hadoop shell command does)
  4. Explicitly copy the required JAR files to HDFS at client application boot time, and then, prior to any job submission, add them to the distributed cache using DistributedCache.addFileToClassPath(hdfsJarPath, jobConfiguration); this is similar to the 3rd option, but we do the steps manually
All seems well, but let's dive into these options...

The 1st and 2nd options submit only a single JAR file, which is not enough if you use some higher-level library for constructing Hadoop MR jobs (such as Cascading), because then you usually need many JAR files in addition to your custom job classes. That's the reason a lot of people package all of these together into a single "fat jar" and provide it to Hadoop. Your custom classes go directly into this JAR file, whereas 3rd party libs go into the lib/ directory inside it.

<some fat jar>
    my.company.myapp.MyClass1
    my.company.myapp.MyClass2
    ....
    lib/
        lib1.jar
        lib2.jar
        ....

The 2nd option can be problematic since the Hadoop client will try to resolve the local JAR by inspecting the client application's classpath and, as I said before, the client application can require a different deployment and packaging that cannot use the mentioned fat jar. For example:
  • IDEs often run applications with custom classes in plain non-archived form, so we don't even have these as JARs
  • A web application builds its classpath from the JARs residing in its /WEB-INF/lib directory, and if we just place the "fat jar" there, the web app will not find the 3rd party JARs contained in the fat jar's internal lib/ directory
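
To see why this breaks down in an IDE, here is a minimal JDK-only sketch of the kind of lookup involved: ask the class loader where the class file came from, and accept the answer only if it is a jar: URL. The class and method names are made up for illustration, and this is only an approximation of the idea, not Hadoop's actual code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.net.URLDecoder;

/** Sketch: trace a class back to the JAR it was loaded from (JDK only, hypothetical names). */
class ContainingJarSketch {

    /** Extracts the JAR path from a "jar:file:/path/app.jar!/pkg/Cls.class" URL, or returns null. */
    static String jarPathFromResourceUrl(String url) {
        if (url == null || !url.startsWith("jar:")) {
            return null; // the class lives in a plain directory (or elsewhere), not in a JAR
        }
        String path = url.substring("jar:".length());
        int bang = path.indexOf('!');
        if (bang >= 0) {
            path = path.substring(0, bang); // drop the entry name inside the JAR
        }
        if (path.startsWith("file:")) {
            path = path.substring("file:".length());
        }
        try {
            // URLDecoder turns '+' into a space, so protect literal plus signs first.
            return URLDecoder.decode(path.replace("+", "%2B"), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    /** Returns the JAR that contains the given class, or null if it was not loaded from a JAR. */
    static String findContainingJar(Class<?> clazz) {
        String resource = clazz.getName().replace('.', '/') + ".class";
        ClassLoader loader = clazz.getClassLoader();
        URL url = (loader != null)
                ? loader.getResource(resource)
                : ClassLoader.getSystemResource(resource);
        return url == null ? null : jarPathFromResourceUrl(url.toString());
    }

    public static void main(String[] args) {
        System.out.println("Loaded from JAR: " + findContainingJar(ContainingJarSketch.class));
    }
}
```

When started from an IDE, a lookup like findContainingJar(MyClass1.class) would typically come back empty, because the class sits in a build output directory; that is the situation in which setJarByClass has nothing to submit.
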
Using a "fat jar" also has some slight overhead in that we always send the whole JAR to Hadoop for each job we have, and you usually have a lot of jobs in any slightly more complex workflow.

The 3rd and 4th options use the distributed cache mechanism, which takes care of making 3rd party libraries available to submitted jobs during execution. This is also the way recommended by Cloudera. If you provide all classes (even your custom ones) as such JARs, you can even omit specifying your own JAR via JobConf.setJar(...) or JobConf.setJarByClass(...), although the Hadoop client will still log a warning because of it.
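
For the 3rd option, the "tmpjars" value has to be assembled from the local JARs somehow. Here is a rough JDK-only sketch, assuming the property takes a comma-separated list of JAR URIs (which is what -libjars appears to produce); the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch: build a "tmpjars"-style value from a directory of JARs (hypothetical helper). */
class TmpJarsSketch {

    /** Returns all *.jar files of the directory as a sorted, comma-separated list of file: URIs. */
    static String buildTmpJarsValue(Path localJarsDir) {
        List<String> uris = new ArrayList<>();
        // The glob keeps only JAR files and ignores anything else in the directory.
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(localJarsDir, "*.jar")) {
            for (Path jar : jars) {
                uris.add(jar.toAbsolutePath().toUri().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(uris); // directory listing order is unspecified, so make it stable
        return String.join(",", uris);
    }
}
```

The resulting string could then be set on the job configuration with configuration.set("tmpjars", value) before the job is submitted.
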

I somehow prefer the 4th option, because it gives me the most control, so we'll primarily focus on that option.

The solution

All of this leads us to the idea that we should maybe have the job logic packaged in 2 separate ways: one for the client application and one for Hadoop jobs. For example, if the client is a web application, you still place your classes in WEB-INF/classes or WEB-INF/lib as you usually do, but you also keep a separate directory containing these classes packaged in a way suitable for deployment to the Hadoop cluster. It seems redundant, but at least it doesn't cause any collision between these 2 worlds.

Preparing map reduce dependencies

Since nowadays the standard way of using 3rd party libs in Java applications is via Maven repositories, one would usually like to isolate the job dependencies and prepare them for deployment to Hadoop. I use the Gradle build tool, so I'll show how you can achieve that with it.

First, we define a separate dependency configuration for the Map/Reduce dependencies. The default "compile" configuration should extend it.

 configurations {  
   mapreduce {  
     description = 'Map reduce jobs dependencies'  
   }  
   compile {  
     extendsFrom mapreduce  
   }  
 }  

Now you can define dependencies inside this new configuration. For example, if you're using the Cascading library, you can specify it like this:

 dependencies {
   // libraries that must be shipped to the cluster together with the jobs
   mapreduce (
       "cascading:cascading-core:${cascadingVersion}",
       "cascading:cascading-local:${cascadingVersion}"
   )
   mapreduce ("cascading:cascading-hadoop:${cascadingVersion}") {
     // hadoop-core is already present on the cluster
     exclude group: "org.apache.hadoop", module: "hadoop-core"
   }
   // libraries needed only on the client application's classpath
   compile (
       "org.slf4j:slf4j-api:${slf4jVersion}",
       "org.slf4j:jcl-over-slf4j:${slf4jVersion}",
       "org.slf4j:log4j-over-slf4j:${slf4jVersion}",
       "ch.qos.logback:logback-classic:1.0.+",
       "org.apache.hadoop:hadoop-core:${hadoopVersion}",
       "commons-io:commons-io:2.1"
   )
 }
   
As you can see, I excluded "hadoop-core" from the "mapreduce" configuration since it is provided by the Hadoop cluster, but I had to include it again in "compile" since I need it on the client application's classpath. This is roughly equivalent to the "Provided" scope that some IDEs offer as a way to flag such libraries.

Now, if you don't like the "fat jar" and prefer separate JARs, we need to copy the JARs of these "mapreduce" dependencies to a designated directory:

 task prepareMapReduceLibsInDir(type: Sync, dependsOn: jar) {  
   from jar.outputs.files  
   from configurations.mapreduce.files  
   into 'mapreducelib'  
 }  

Now we have these dependencies available in a separate directory ("mapreducelib"). Our custom classes (jar.outputs.files) are also packaged there, as a JAR file named after the archivesBaseName build script property.

All modern IDEs have a way to include a build task in their development cycle, so you should run this Gradle task whenever you change your custom map/reduce classes or the set of 3rd party libraries.

Job submission helpers

The general procedure for job submission, as described previously in the 4th option, is:
  • first, copy all required local libraries to a specific HDFS directory
  • prior to any job submission, add these HDFS libraries to the distributed cache, which ultimately results in a Configuration instance prepared for jobs
  • use that Configuration in your job client
All utility methods for job submission are contained in the JobHelper class, available on Gist.

Let's go through each of the mentioned steps.

Copy all local libraries to HDFS

The local directory containing the JARs is the one that we populated via the Gradle prepareMapReduceLibsInDir task:

String localJarsDir = "./mapreducelib";
String hdfsJarsDir = "/temp/hadoop/myjobs/mylibs";
JobHelper.copyLocalJarsToHdfs(localJarsDir, hdfsJarsDir, new Configuration());

As said, this should be done only once, at the client application's boot time.

Add HDFS libraries to distributed cache

Now we need to add the copied JAR files to the distributed cache, so that they are available to the Hadoop nodes that run the job tasks.

Configuration configuration = new Configuration();
JobHelper.hackHadoopStagingOnWin();
JobHelper.addHdfsJarsToDistributedCache(hdfsJarsDir, configuration);
Here I also called an additional method, hackHadoopStagingOnWin, which is required for job submission to work correctly on Windows. This piece of code mostly came from the Spring Data for Hadoop library. If your client application is running on *nix, you can safely omit it.

Use prepared Configuration

Now we can finally use the prepared Configuration instance from the previous step in our JobConf/JobClient:
JobConf jobConf = new JobConf(configuration);
....
JobClient.runJob(jobConf);

If you use a higher-level library that doesn't take a Configuration instance directly, but exposes some other way to pass Hadoop configuration properties to it, just extract these properties from the Configuration and pass them that way. For example, the Cascading library accepts a properties Map, so you can use the following method to construct it:
Map configurationProperties = JobHelper.convertConfigurationToMap(configuration);
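
The gist's convertConfigurationToMap is not reproduced here, but a conversion along the following lines is plausible. Hadoop's Configuration is iterable over its key/value entries, so this sketch accepts any Iterable of string entries and needs only the JDK; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: flatten configuration entries into a plain properties map (hypothetical helper). */
class ConfigurationToMapSketch {

    /** Copies every key/value entry into a new Map<Object, Object>. */
    static Map<Object, Object> toMap(Iterable<Map.Entry<String, String>> configuration) {
        Map<Object, Object> properties = new HashMap<>();
        for (Map.Entry<String, String> entry : configuration) {
            // Later entries with the same key overwrite earlier ones.
            properties.put(entry.getKey(), entry.getValue());
        }
        return properties;
    }
}
```

Since Configuration implements Iterable<Map.Entry<String, String>>, an instance could be passed to toMap directly, and the result handed to a library that expects a properties map.
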

And that's pretty much it. Although there can be many more little things that will give you headaches when submitting the jobs, I hope this post expanded your arsenal of possible ways to tackle them.