Thursday, June 12, 2014

Running Cascading Hadoop jobs via CLI, Oozie, or an IDE (part 2)

This post continues from part 1, where we set up a sample project that lets us run Cascading jobs in a few common environments: CLI, Oozie and an IDE.

For our sample job, we'll take the all too boring "word count" example. It would be best if we could code it in a way that satisfies a few requirements:
  • it should be triggered in the same way from any of the 3 target environments
  • it should have externalized job parameters (such as the "word count" input and output HDFS paths)
  • it should have externalized Hadoop configuration, so we can experiment with a few different ones
  • it should be able to take the standard hadoop/yarn CLI command parameters
  • it should be able to take a single input path parameter given as multiple comma-separated paths, for cases when an Oozie coordinator passes multiple dataset instances during workflow submission
  • it should set a custom mapreduce job name to improve job visibility in some GUIs (such as Hue)
Taking all of this into consideration, we decided to go with the recommended way of developing job "driver" applications for the hadoop/yarn CLI commands: implementing Hadoop's Tool interface. That way, when launched through ToolRunner, the application can parse all the standard parameters these CLI commands provide, such as the one specifying a different configuration file for job submission.

Because a Tool application is a plain Java application anyway, it can also be called from within an IDE. And finally, Oozie can include it in a workflow as a "java action node".

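 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;

 public class WordCount extends Configured implements Tool {
   public static void main(String[] args) throws Exception {
     // ToolRunner parses the standard Hadoop options (such as -conf)
     // and passes the remaining arguments to run()
     int exitCode = ToolRunner.run(new WordCount(), args);
     System.exit(exitCode);
   }
   @Override
   public int run(String[] args) throws Exception {
     // fail early with a usage message instead of an ArrayIndexOutOfBoundsException
     if (args.length < 2) {
       System.err.println("Usage: WordCount [generic options] <inputPath> <outputPath>");
       return 1;
     }
     String inputPath = args[0];
     String outputPath = args[1];
     // configuration already populated from the standard options
     Configuration conf = getConf();
     doWorkflow(inputPath, outputPath, conf);
     return 0;
   }
 ...
 }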

Job parameters, the input and output paths in this case, are provided as standard Java program arguments. But as previously mentioned, when using Oozie the input argument is frequently a single String value containing comma-separated HDFS paths. An Oozie coordinator can be instructed to take multiple instances of a dataset and process them in one batch, and it generates such a comma-separated String value as the input argument for the triggered workflow. So it's useful to be able to construct the source Tap from such a String value. Here it goes:

   private Tap constructMultiHfsSourceTap(Scheme scheme, String inputPath) {  
     List<Tap> tapList = new ArrayList<Tap>();  
     String[] splits = inputPath.split(",");  
     for (String split : splits) {  
       tapList.add(new Hfs(scheme, split.trim()));  
     }  
     Tap[] taps = tapList.toArray(new Tap[tapList.size()]);  
     return new MultiSourceTap(taps);  
   }  

We couldn't use Cascading's GlobHfs here, since the individual paths are not necessarily part of one hierarchical structure that a single glob pattern could match.
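The splitting step of constructMultiHfsSourceTap can be tried out without Hadoop or Cascading on the classpath. Below is a minimal sketch, assuming a hypothetical helper class named InputPaths (it is not part of the sample project). Unlike the method above, it also drops empty entries, which could otherwise turn into Hfs taps with an empty path if the generated value contained a stray comma.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the sample project.
final class InputPaths {
    private InputPaths() {
    }

    /** Splits a comma-separated path list, trimming whitespace and dropping empty entries. */
    static List<String> split(String inputPath) {
        List<String> paths = new ArrayList<String>();
        for (String part : inputPath.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                paths.add(trimmed);
            }
        }
        return paths;
    }
}
```

Each returned path could then be wrapped in an Hfs tap exactly as in the loop shown earlier.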

Once this job hits the road, it would be great to see a few main pieces of information about it at a glance in a job UI, so we'll set its mapreduce job name:

 ....  
     Flow flow = flowConnector.connect(flowDef);  
     // set mapreduce job name  
     String mapReduceJobName = "Cascading Word Count: '" + inputPath + "' -> '" + outputPath + "'";  
     FlowStepStrategy flowStepStrategy = constructMapReduceJobNameStrategy(mapReduceJobName);  
     flow.setFlowStepStrategy(flowStepStrategy);  
     flow.complete();  
   }  
   private FlowStepStrategy constructMapReduceJobNameStrategy(final String mapReduceJobName) {  
     return new FlowStepStrategy() {  
       @Override  
       public void apply(Flow flow, List predecessorSteps, FlowStep flowStep) {  
         Object config = flowStep.getConfig();  
         if (config instanceof JobConf) {  
           ((JobConf) config).setJobName(mapReduceJobName);  
         }  
       }  
     };  
   }  
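With many comma-separated input paths, the job name built above can become quite long. The following is a minimal sketch of how the name could be capped, assuming a hypothetical helper class JobNames and a maxLength limit of our own choosing (neither is part of the sample project, and no particular limit is imposed by Hadoop here).

```java
// Hypothetical helper for illustration only; not part of the sample project.
final class JobNames {
    private JobNames() {
    }

    /** Builds the word count job name and abbreviates it to at most maxLength characters. */
    static String wordCountJobName(String inputPath, String outputPath, int maxLength) {
        if (maxLength < 3) {
            throw new IllegalArgumentException("maxLength must be at least 3");
        }
        String name = "Cascading Word Count: '" + inputPath + "' -> '" + outputPath + "'";
        if (name.length() <= maxLength) {
            return name;
        }
        // keep the beginning of the name and mark the cut with an ellipsis
        return name.substring(0, maxLength - 3) + "...";
    }
}
```

The result can be passed to constructMapReduceJobNameStrategy in place of the concatenated String.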

The complete sample "word count" project is available on GitHub.

Job submission

IDE

Ok, now that we have all the code in place, we can finally run it locally within our IDE, simply by launching the job driver application like any other Java application. The IDE will take care of putting all the necessary classes/jars on the classpath.

 java -classpath <complete classpath here set by IDE> vmarcinko.cascading.wordcount.WordCount -conf conf/local-site.xml /home/vmarcinko/cascadingtest/sherlock_holmes.txt /home/vmarcinko/cascadingtest/wordcount  

As can be seen, we provided the Hadoop configuration file (local-site.xml) using the "-conf" program argument, which is a standard argument parsed by the ToolRunner utility class. In other words, we can use the same standard arguments that are available when submitting a job via CLI, as shown next.

The sample sherlock_holmes.txt file used in this example is available in the <project dir>/data directory. If everything went well (and it should!), the word counts can be found in a part-xxxx file under the output directory.

Command Line Interface (CLI)

First, we must package our job application in a suitable form using the Gradle Shadow plugin, as described in part 1 of this post. The end result of the "gradle shadowJar" task would be:
<cascading-wordcount project dir>/build/libs/cascading-wordcount-1.0-all.jar

Next, we upload that JAR file to the Hadoop cluster, place the sample .txt file in an HDFS path of our choice, and finally submit the job using the shell command:

 yarn jar cascading-wordcount-1.0-all.jar vmarcinko.cascading.wordcount.WordCount /user/cloudera/cascadingtest/sherlock_holmes.txt /user/cloudera/cascadingtest/wordcount  

The "yarn" shell command is available in newer versions of Hadoop. Older versions used the "hadoop" command.

Oozie

To invoke the same vmarcinko.cascading.wordcount.WordCount application from Oozie, we need to use a "java action node" within our Oozie workflow to launch it.

We use the same shadow JAR (cascading-wordcount-1.0-all.jar) and place it under the <oozie workflow HDFS dir>/lib directory. For the program arguments, it is best to parametrize this java action node with ${inputPath} and ${outputPath}, so we can provide concrete HDFS paths when submitting the workflow.

When the job is launched via Oozie (either by manually submitting the workflow, or in a scheduled manner via an Oozie coordinator), we can see our running job nicely in a UI such as the Hue Job Browser. The name of the job corresponds to the mapreduce job name that we set prior to execution.

(As usual when a Java application is called via Oozie's "java action node", for each such launch Oozie initially starts a Map task that acts as a launcher for the specified Java application, so we end up with 2 jobs shown in the job list.)

I hope this post proves useful to all newbies trying to find some common way to set up Cascading job applications that can be triggered from various environments.

Tuesday, June 10, 2014

Running Cascading Hadoop jobs via CLI, Oozie, or an IDE (part 1)

If you stumbled upon this post, I guess you already know what Cascading is - another library out there that offers a higher-level API for composing Hadoop jobs. If you're just starting with it, you may very well be searching for the best way to set up your project so these jobs can be run in various environments: locally within the IDE of your choice, or on a real Hadoop cluster using CLI commands for job submission. Also, in real-world scenarios, Hadoop workflows easily become complex enough to warrant an additional engine for the definition and coordination of all these jobs, and Oozie is currently the most popular choice for that in the Hadoop ecosystem.

Ok, enough talkin', let's get to business...

To define our project setup and job packaging tasks, we'll use the Gradle build tool, which is a more modern alternative to Maven. The version of Cascading that we use here is 2.5.

But first, let's examine our 3 target environments....

Environments

IDE

Within our IDE, we want to run Cascading jobs in Hadoop local mode. This requires that all dependency libs (Cascading, Hadoop, and possibly some others) are on the classpath when executing Cascading jobs there. So, our build script should generate IDE-specific project files that include all the mentioned dependencies for job runs.

Command Line Interface (CLI)

Most of the examples found out there show how to trigger Hadoop mapreduce jobs via the command line interface, using the "hadoop jar myjobapp.jar ..." command (or "yarn jar myjobapp.jar ..." in newer versions of Hadoop).

Since we're using Cascading here, we need to somehow package the Cascading jars together with our custom code for these commands to work. There are a couple of ways to do this, but bundling them all into one jar ("fat jar") seems to be the most popular one.

To construct the "fat jar", we create a regular jar from all our custom classes and also add the 3rd party libs to it. There are 2 ways to add these libs:
  • by placing them under an internal "lib" folder of the fat jar
  • by extracting all classes from them and adding those in unpackaged form to the same fat jar
The latter approach is a bit more complicated and destroys the structure of the dependency libraries, but as we will see right away, it offers one advantage.

One more thing - we don't need to package Hadoop libraries since they are already present when jobs are run on Hadoop.
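To check which of the two layouts a given jar uses, its entry names can be inspected. The following is a minimal sketch using only the standard java.util.jar API; the FatJarLayout class and its method names are hypothetical and not part of the sample project.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper for illustration only; not part of the sample project.
final class FatJarLayout {
    private FatJarLayout() {
    }

    /** Reads the names of all entries of the jar at the given path. */
    static List<String> entryNames(String jarPath) throws IOException {
        List<String> names = new ArrayList<String>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                names.add(entries.nextElement().getName());
            }
        }
        return names;
    }

    /** Returns the entries that are dependency jars nested under the internal "lib" folder. */
    static List<String> nestedLibJars(List<String> entryNames) {
        List<String> nested = new ArrayList<String>();
        for (String name : entryNames) {
            if (name.startsWith("lib/") && name.endsWith(".jar")) {
                nested.add(name);
            }
        }
        return nested;
    }
}
```

An empty result from nestedLibJars(entryNames(path)) suggests that the dependency classes, if any were bundled, were added in unpackaged form.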

Oozie

Unlike the hadoop/yarn CLI commands, Oozie doesn't recognize fat jars containing an internal lib folder with dependency libs, so our Cascading app's jar packaged in that way would not be able to run when triggered by Oozie.

On the other hand, the second approach to building fat jars is good to go, and fortunately there is a Gradle plugin to help us with that: Gradle Shadow.

Same as with the CLI approach, we don't want to package the Hadoop libraries, for the already mentioned reason.

Build script

As said, we'll use Gradle as our build tool.

First, let's define our project dependencies in build script (build.gradle).

As already mentioned, we need to mark the Hadoop libraries separately, because we have to exclude them when packaging our job applications for the CLI/Oozie environments. We'll do that by defining a custom Gradle configuration:
 configurations {  
   hadoopProvided  
 }  

And finally define dependencies for that configuration (we'll add slf4j/logback dependencies also for proper logging when running within an IDE):
 hadoopProvided(  
       "org.apache.hadoop:hadoop-client:${hadoopVersion}",  
       "commons-httpclient:commons-httpclient:3.1",  
       "org.slf4j:slf4j-api:${slf4jVersion}",  
       "org.slf4j:jcl-over-slf4j:${slf4jVersion}",  
       "org.slf4j:log4j-over-slf4j:${slf4jVersion}",  
       "ch.qos.logback:logback-classic:1.0.+"  
   )  

For everything to work correctly, we have to add that configuration to the main sourceSet, and also register it to be included in the generated IDEA/Eclipse projects. Take a look at the GitHub project to see how this is done inside the build script.

The Cascading libraries are added to the standard "compile" configuration:
 compile (  
     "cascading:cascading-core:${cascadingVersion}",  
     "cascading:cascading-local:${cascadingVersion}",  
     "cascading:cascading-hadoop2-mr1:${cascadingVersion}"  
   )  

Using the Gradle plugins for IntelliJ IDEA and Eclipse support, we can generate IDE-specific project files. The project file generation tasks are called by:
 gradle idea  
or
 gradle eclipse  

To package our job application as a "fat jar" with all dependency classes extracted from their original jars, we have to include the Gradle Shadow plugin (currently at version 0.9.0-M1) in the project via:
 buildscript {  
   repositories {  
     jcenter()  
   }  
   dependencies {  
     classpath 'com.github.jengelman.gradle.plugins:shadow:0.9.0-M1'  
   }  
 }  
 apply plugin: 'shadow'  

The "fat jar" is built by calling:
 gradle shadowJar  
and the end result can be found at the path:
<project-dir>/build/libs/<appname>-<appversion>-all.jar

The complete build script is available here, as part of my cascading-wordcount GitHub project.

In the next part of this post, we'll look at how to execute a simple Cascading job within all 3 environments.