Monday, January 26, 2015

Nanocube feeder in Java, part 2

Continuing from the previous post, where we prepared the DMP encoder, we now want to see how to use it to encode data and stream it into Nanocube. Because the Nanocube process reads its data from standard input, we first have to start the process in order to obtain that input stream.

We will use the ProcessBuilder class to set up everything needed to start Nanocube. Configuration-wise, we only need to specify the directory where the Nanocube binaries are located (in particular the nanocube-leaf command).
 private final static Logger appLogger = Logger.getLogger("Feeder");  
 private final static Logger nanocubeOutLogger = Logger.getLogger("Nanocube OUT");  
 String nanocubeBinPath = "/home/vmarcinko/nanocube/bin";  
 ProcessBuilder pb = new ProcessBuilder(nanocubeBinPath + "/nanocube-leaf", "-q", "29512", "-f", "10000");  
 Map<String, String> env = pb.environment();  
 env.put("NANOCUBE_BIN", nanocubeBinPath);  
 pb.redirectErrorStream(true);
 appLogger.info("Starting Nanocube process...");
 Process nanocubeProcess = pb.start();  
 ExecutorService executorService = Executors.newSingleThreadExecutor();  
 startNanocubeOutputLoggingTask(executorService, nanocubeProcess);  
 OutputStream inPipeOutputStream = nanocubeProcess.getOutputStream();  
As seen above, once the Nanocube process is started, we obtain the writable end of its standard input pipe in the form of an OutputStream, and we will use that to stream the data into Nanocube.

One interesting piece above is the way we handle the Nanocube process output. Since the error stream is merged into standard output, we start a single task in another thread (hence java.util.concurrent.Executor) that reads that output and prints it via the logging framework used in our Java application (here plain java.util.logging loggers). Here is the relevant startNanocubeOutputLoggingTask method:
 private static void startNanocubeOutputLoggingTask(Executor executor, final Process nanocubeProcess) {
   final InputStream outPipeInputStream = nanocubeProcess.getInputStream();
   Runnable runnable = new Runnable() {
     public void run() {
       // read the merged stdout/stderr of Nanocube line by line until the stream is closed
       try (BufferedReader reader = new BufferedReader(new InputStreamReader(outPipeInputStream))) {
         String line;
         while ((line = reader.readLine()) != null) {
           nanocubeOutLogger.info(line);
         }
       } catch (IOException e) {
         appLogger.log(Level.SEVERE, "Error reading from Nanocube output: " + e.getMessage(), e);
       }
     }
   };
   // start logging task
   executor.execute(runnable);
 }
Since we keep the ExecutorService instance around, we can use it to stop that logging task at application shutdown.
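To make that shutdown step a bit more concrete, here is a minimal sketch of stopping such an executor. The class and method names (ExecutorShutdownSketch, stopGracefully) are made up for this illustration and are not part of the feeder code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownSketch {

    // Stops accepting new tasks and waits for the running ones to finish.
    // Returns true if everything finished within the timeout.
    static boolean stopGracefully(ExecutorService executor, long timeoutSeconds) throws InterruptedException {
        executor.shutdown();
        if (executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
            return true;
        }
        // Last resort: ask the remaining tasks to stop via thread interruption
        executor.shutdownNow();
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> System.out.println("logging task running"));
        System.out.println("stopped cleanly: " + stopGracefully(executor, 5));
    }
}
```

Note that a task blocked in readLine() usually does not react to interruption. It ends on its own once the child process exits and its output stream reaches end of file, so the order of shutdown steps matters.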

And finally, with the code above and an instance of NanocubeDmpEncoder ready (see the previous post), we perform the feeding:
 appLogger.info("Streaming DMP content into Nanocube process...");
 NanocubeDmpEncoder dmpEncoder = ...;

 byte[] headerBytes = dmpEncoder.encodeHeaders();
 inPipeOutputStream.write(headerBytes);

 byte[] recordBytes = dmpEncoder.encodeRecord(....);
 inPipeOutputStream.write(recordBytes);

 inPipeOutputStream.close();
 appLogger.info("Streaming of data finished");
 nanocubeProcess.waitFor();
 executorService.shutdown();
 appLogger.info("Nanocube process stopped");
At the end, we wait for the Nanocube process to stop before cleaning up our output logging task.
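The whole sequence (start the process, drain its output on another thread, write to its standard input, close the pipe, wait for exit) can be tried out without Nanocube at all. The following is only a sketch under assumptions: it uses the Unix "cat" command as a stand-in child process that echoes whatever it receives, and the class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PipeFeedSketch {

    // Feeds the given chunks to the child's standard input and returns the lines it printed.
    static List<String> feed(List<String> command, List<byte[]> chunks) throws Exception {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // merge stderr into stdout, as in the feeder
        Process process = pb.start();

        // Drain the child's output on another thread so it never blocks on a full pipe
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<List<String>> output = executor.submit(() -> {
            List<String> lines = new ArrayList<>();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            }
            return lines;
        });

        // Closing the stream signals end of input to the child process
        try (OutputStream stdin = process.getOutputStream()) {
            for (byte[] chunk : chunks) {
                stdin.write(chunk);
            }
            stdin.flush();
        }

        process.waitFor();                 // wait for the child to exit...
        List<String> lines = output.get(); // ...then collect what the reader task gathered
        executor.shutdown();
        return lines;
    }

    public static void main(String[] args) throws Exception {
        List<byte[]> chunks = Arrays.asList(
                "header\n".getBytes(StandardCharsets.UTF_8),
                "record\n".getBytes(StandardCharsets.UTF_8));
        // "cat" is only a stand-in that echoes its input; it is not the Nanocube binary
        System.out.println(feed(Arrays.asList("cat"), chunks));
    }
}
```

With the real feeder, the command would be the nanocube-leaf invocation shown earlier and the chunks would be the encoded header and record bytes.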

Wednesday, January 21, 2015

Nanocube feeder in Java, part 1

For some time I've been sporadically following the Nanocube project, a novel way to index spatio-temporal data entirely in memory, enabling very fast analytical querying and visualization of fairly large datasets. To give a rough impression of Nanocube memory consumption: for up to 10 billion records it can require a few tens of GB of RAM, and querying is still fast enough to allow real-time interactive exploration of such a huge pool of data.

Note - this post is based on nanocube version 3.1.

The currently provided way to feed data into Nanocube is via a Python command-line tool. In a nutshell, this tool consumes CSV datasets and converts them into the so-called "dmp" format, which is then piped into the Nanocube server. This can be cumbersome for some use cases, and being a Java developer I wanted a Java-friendly way to feed the data, so I set out to develop my own feeder in Java.

DMP Encoder

The biggest part of my feeder is the DMP encoder, which converts input data, given as plain Java objects, into the DMP format required by the Nanocube server. Since documentation about the DMP format is somewhat scarce, the easiest way seemed to be to analyze the Python code of that tool and translate it to Java.

The resulting encoder is not the most generic one; it focuses on the most common use case, where we have the following dimensions: spatial, temporal, count, and an arbitrary number of categorical dimensions. Of course, you need Nanocube binaries compiled for the given combination of dimensions and their details (such as byte lengths of certain dimensions, zoom level, etc.). A few of them are found in the nanocube-dir/bin directory, one example being nc_q25_c1_c1_u2_u4.
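As a small illustration of how such a binary name relates to the schema, here is a sketch that composes the name from the dimension details. The reading of the name parts (q = number of quadtree zoom levels, c = byte length of a categorical dimension, first u = byte length of the time value, second u = byte length of the count) is my assumption based on the example name above, and the class and method names are made up.

```java
public class NanocubeBinaryNameSketch {

    // Builds a name such as nc_q25_c1_c1_u2_u4 from the schema details.
    // The meaning of each name part is an assumption, not taken from Nanocube documentation.
    static String binaryName(int zoomLevels, int[] categoryByteLengths, int timeBytes, int countBytes) {
        StringBuilder name = new StringBuilder("nc_q").append(zoomLevels);
        for (int byteLength : categoryByteLengths) {
            name.append("_c").append(byteLength);
        }
        name.append("_u").append(timeBytes);
        name.append("_u").append(countBytes);
        return name.toString();
    }

    public static void main(String[] args) {
        // 25 zoom levels, two 1-byte categories, 2-byte time, 4-byte count
        System.out.println(binaryName(25, new int[] {1, 1}, 2, 4));
    }
}
```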

Whole source code for DMP encoder is available here.

The way to use it is as follows.

You first instantiate the encoder by specifying schema for the dataset. Constructor arguments are:
  • name
  • number of location zoom levels (eg. 25 is typically sufficient)
  • time offset
  • length of time bin in seconds
  • length of time value in bytes (2, 4 or 8)
  • map that describes categorical dimensions and their options
 NanocubeDmpEncoder dmpEncoder = new NanocubeDmpEncoder("telco-data", 25, timeOffset, 3600, 2, categoryEnumerations);  
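To get a feeling for how the time offset, time bin length and time value length play together, here is a rough sketch of turning a timestamp into a bin index and then into bytes. It is not the encoder's actual code: the names are hypothetical, and the little-endian byte order in particular is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class TimeBinSketch {

    // Index of the time bin that contains the given instant, counted from the offset.
    static long timeBin(long epochSeconds, long offsetEpochSeconds, long binSeconds) {
        return Math.floorDiv(epochSeconds - offsetEpochSeconds, binSeconds);
    }

    // Writes the bin index as a little-endian value of 2, 4 or 8 bytes.
    // Little-endian byte order is an assumption, not taken from DMP documentation.
    static byte[] encodeBin(long bin, int byteLength) {
        if (byteLength != 2 && byteLength != 4 && byteLength != 8) {
            throw new IllegalArgumentException("byteLength must be 2, 4 or 8");
        }
        if (bin < 0 || (byteLength < 8 && bin >= (1L << (8 * byteLength)))) {
            throw new IllegalArgumentException("bin " + bin + " does not fit in " + byteLength + " bytes");
        }
        byte[] all = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(bin).array();
        return Arrays.copyOf(all, byteLength); // the low-order bytes come first
    }

    public static void main(String[] args) {
        long offset = 1_420_070_400L; // 2015-01-01T00:00:00Z, a hypothetical time offset
        long bin = timeBin(offset + 2 * 3600 + 15, offset, 3600);
        System.out.println("bin = " + bin + ", bytes = " + Arrays.toString(encodeBin(bin, 2)));
    }
}
```

The range check shows why the byte length matters: with hourly bins and a 2-byte time value there is room for 65536 bins, which is roughly 7.5 years of data counted from the time offset.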
The map of category data is actually a 2-level map. The top-level map contains entries whose keys are category names and whose values are again maps, containing the enum option specification for the given category. Each enum options map ties a domain-specific option value to a (label, byte) pair, which specifies how that option will be presented (label) and how it will be encoded in Nanocube (byte value).

For example, if we want categories "NetType" and "Status", representing the network type and status of a mobile call, then in Java we would usually use enum classes, such as:
 public enum NetType {
   GSM // other network types go here
 }
 public enum Status {
   SUCCEEDED // other statuses go here
 }
So we can build the category definition map like this:
 Map<String, Map<Object, CategoryEnumInfo>> categoryEnumerations = new HashMap<>();
 categoryEnumerations.put("NetType", createEnumInfosFromEnumClass(NetType.class));
 categoryEnumerations.put("Status", createEnumInfosFromEnumClass(Status.class));

 private <E extends Enum<E>> Map<Object, CategoryEnumInfo> createEnumInfosFromEnumClass(Class<E> enumClass) {
   Map<Object, CategoryEnumInfo> enumInfos = new HashMap<>();
   for (E constant : enumClass.getEnumConstants()) {
     // label is the constant's name, byte value is its ordinal
     enumInfos.put(constant, new CategoryEnumInfo(constant.name(), (byte) constant.ordinal()));
   }
   return enumInfos;
 }
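If you want to play with this 2-level map without pulling in the encoder, the following self-contained sketch builds the same structure. EnumInfo is a hypothetical stand-in for CategoryEnumInfo, and the enum constants other than GSM and SUCCEEDED are invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CategoryMapSketch {

    enum NetType { GSM, UMTS, LTE }   // UMTS and LTE are hypothetical options
    enum Status { SUCCEEDED, FAILED } // FAILED is a hypothetical option

    // Hypothetical stand-in for the encoder's CategoryEnumInfo: a (label, byte) pair
    static final class EnumInfo {
        final String label;
        final byte value;

        EnumInfo(String label, byte value) {
            this.label = label;
            this.value = value;
        }
    }

    // Second-level map: option value -> (label, byte), using the enum name and ordinal
    static <E extends Enum<E>> Map<Object, EnumInfo> enumInfos(Class<E> enumClass) {
        E[] constants = enumClass.getEnumConstants();
        if (constants.length > 256) {
            throw new IllegalArgumentException("more than 256 options cannot be told apart by one byte");
        }
        Map<Object, EnumInfo> infos = new LinkedHashMap<>();
        for (E constant : constants) {
            infos.put(constant, new EnumInfo(constant.name(), (byte) constant.ordinal()));
        }
        return infos;
    }

    // Top-level map: category name -> option map
    static Map<String, Map<Object, EnumInfo>> buildCategories() {
        Map<String, Map<Object, EnumInfo>> categories = new LinkedHashMap<>();
        categories.put("NetType", enumInfos(NetType.class));
        categories.put("Status", enumInfos(Status.class));
        return categories;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Map<Object, EnumInfo>> category : buildCategories().entrySet()) {
            for (EnumInfo info : category.getValue().values()) {
                System.out.println(category.getKey() + ": " + info.label + " -> " + info.value);
            }
        }
    }
}
```

Using the ordinal as the byte value is convenient, but it also means that reordering enum constants changes the encoding, so data fed earlier would no longer match.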
Now that we have a NanocubeDmpEncoder instance constructed, we can encode the headers and records into the required DMP format.
 // headers  
 byte[] headerBytes = dmpEncoder.encodeHeaders();  
 // record  
 Map<String, Object> categoryValues = new HashMap<>();  
 categoryValues.put("NetType", NetType.GSM);  
 categoryValues.put("Status", Status.SUCCEEDED);  
 byte[] recordBytes = dmpEncoder.encodeRecord(44.8789661034259, 17.72673345412568, new Date(), 1, categoryValues);  

In the second part of this post we will see how to start the Nanocube server from a Java process and how to stream the encoded data into it.

Friday, October 24, 2014

Packages and separation of concerns

Recently I had a short discussion with a coworker about how to package Java code, and it brought back some of my earlier thoughts on the subject, mostly because some of my opinions about it have changed significantly over time.

Within a single code base, packages are the highest-level way to modularize our code, so they play a significant role when one wants to grasp what the code is all about. The thing is that each class (or interface, or any other code artifact represented as a file) can often be looked at from multiple points of view, but since we have to place that class in one and only one package, we have to decide which point of view is most significant to us.

For example, in the Java community, DAOs (data access objects) are a common "type" of class that encapsulates data access logic. Thus, we commonly have something like UserDao, which contains the logic for CRUD operations on user entities. Actually, following the good rule of separating interface from implementation, we most often have a UserDao interface and something like a HibernateUserDaoImpl implementation (if Hibernate is used as our ORM framework) or maybe JdbcUserDaoImpl (if plain JDBC is used).

However, this UserDao class can be viewed from 2 points of view:
  • it is DAO (technical point of view)
  • it is user-related (business domain point of view)
Maybe our app even has multiple deployments, one per customer, so we may have something like AcmeJdbcUserDaoImpl. Besides the 2 aspects mentioned above, that class then has an additional one: it is ACME-deployment-related functionality.

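To keep things simple, let's just look at the two-aspect version of this DAO. Basically, we need to decide how we will package our code: by technical aspect (layering), such as:
[image: package structure organized by technical layer]
or by business aspect:
[image: package structure organized by business domain]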

Actually, the latter approach isn't packaged by business domains in the fullest sense, but I'll explain that in a moment.

Earlier in my career I used the former approach, but over time I mostly adopted the latter one. Maybe it's just a matter of preference, but my reasoning is as follows.

As already mentioned, in both examples the code is structured around some concern, technical or domain-related, so in both cases some sense of order is kept. But following the reasoning that Robert C. Martin gave when explaining the single responsibility principle (a close cousin of the principles of coupling, cohesion and separation of concerns), we should group pieces of code that are likely to change for the same reason.

Now, in our UserDao example, if you frequently have to change DAO implementations across the whole app, for example because you want to use a different persistence framework, then you would certainly find it useful to have all DAOs under one package. But I find such cases quite rare. On the other hand, if some business feature changes, say one related to the user "module", and for that reason you have to change the User entity, the UserDao that persists it, and the UserManager that exposes service operations related to user functionality, then you should certainly look to group those code artifacts together. And in my experience, these kinds of changes are constantly present.

I can see that other people have already blogged about this kind of "vertical" modularization, so here is one example. It also goes hand in hand with SOA/microservices architectures, where you identify pieces of your application from a functional perspective; once grown, these can be separated into their own services and deployed separately.

Now, I said before that I cheated a bit regarding the second approach presented above. One can notice that I still have the web layer separated, although it contains user-related web functionality that sits outside the "users" package. I guess I still consider the web layer such a different concern from the business layer that I like to keep it separate. A lot of the time we have changes to the web layer that don't touch the business side, hence this separation. But it definitely got me thinking that it might be a nice experiment to try putting even UserWebController into the "users" package, because it is often updated when some change is required to user-related functionality.

Another thing: this whole story reminded me of a time in the past when Tapestry was my web framework of choice. A common thing in web development back then, as it is now, was to keep HTML template files separate from web controller classes, usually under the web app root directory. But I occasionally heard that some Tapestry users liked to keep HTML templates on the classpath, in other words, placed within packages together with other web-related code. Thus, one would have something like:
[image: package containing page controller classes together with their HTML templates and .properties files]
I resisted this idea for some time because it felt so wrong to keep HTML files together with the code, but when I eventually gave it a try, I was enlightened. It was so much easier not having to jump between different project directories and search for files whenever I had to change some web feature, since I almost always have to change the HTML template together with the related controller code.

Of course, if I had been working on some other type of project, with a dedicated web designer working solely on HTML templates, the situation would be different and this way of packaging would probably be totally unsuitable. But since I was both the HTML designer and the coder, it really made sense.

You can also notice in the picture above that localization messages (.properties files) are also placed within the package, separately for each HTML page. Although I never practiced this much, and used a single localization file for all web messages as usual, some people did practice it because of the benefit of having messages as close as possible to the related web artifacts (page controller class, page HTML template) for easier maintenance. Of course, the benefit of the usual approach is that it is certainly easier to translate all web messages to a new language if they are all placed in a single file. But even nowadays, whenever I work on some larger project, I stumble upon single localization files that contain a lot of old garbage messages. That garbage is left behind exactly because it is quite easy to forget to clean up localization messages once some web stuff has been changed or removed. With the approach given above, you rarely forget to do that, because the localization files are so close to the file that you just changed or removed.

Saturday, August 16, 2014

Embedded Kafka and Zookeeper for unit testing

Recently I wanted to set up an embedded Kafka cluster for my unit tests, and surprisingly it wasn't that trivial, because most of the examples I found were made for older versions of Kafka/Zookeeper or didn't work for other reasons, so it took me some time to find a working version.

The project I took it from is Camus, a Kafka->Hadoop ETL project, and I just made some slight changes related to the newer Zookeeper, as well as some configuration-wise changes.

My embedded Kafka & Zookeeper servers are available at this gist. All the code is tested against Kafka and Zookeeper 3.4.6.

Here is a simple example of how to set up Zookeeper at a fixed port (2181) and 2 Kafka servers at random available ports:

     EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper(2181);  
     List<Integer> kafkaPorts = new ArrayList<Integer>();  
     // -1 for any available port  
     kafkaPorts.add(-1);  
     kafkaPorts.add(-1);  
     EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(embeddedZookeeper.getConnection(), new Properties(), kafkaPorts);  
     System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());  
     System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());  
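In case you wonder how a "-1" port can turn into a real one, here is a minimal sketch of one common way to pick a free port in Java, by binding a ServerSocket to port 0. I am only assuming that the embedded cluster does something along these lines; the class and method names are made up.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePortSketch {

    // Returns the requested port, or some currently free port when -1 is given.
    static int resolvePort(int requestedPort) throws IOException {
        if (requestedPort != -1) {
            return requestedPort;
        }
        // Port 0 asks the operating system for any free port
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("fixed port: " + resolvePort(2181));
        System.out.println("random port: " + resolvePort(-1));
    }
}
```

Keep in mind that the port is released again before it is handed to the server, so in theory another process could grab it in between. For unit tests this is usually good enough.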

Thursday, June 12, 2014

Running Cascading Hadoop jobs via CLI, Oozie, or an IDE (part 2)

We now continue from the previous post with the sample project setup that allows us to run Cascading jobs within a few usual environments: CLI, Oozie and an IDE.

For our sample job, we'll take the all too boring "word count" example. It would be best if we could code it in a way that satisfies a few requirements:
  • it should be triggered in the same way from any of 3 target environments
  • it should have externalized job parameters (such as "word count" input and output HDFS paths)
  • it should have externalized Hadoop configuration to be able to experiment with few of those
  • it should be able to take standard hadoop/yarn CLI command parameters
  • it should be able to take single input path parameter provided in form of multiple comma-separated paths for cases when Oozie coordinator takes multiple dataset instances during workflow submission
  • it should set custom mapreduce job name to improve job visibility on some GUIs (such as Hue)
Taking all this into consideration, we decided to use the recommended way to develop job "driver" applications for hadoop/yarn CLI commands, which is the Hadoop Tool interface. That way the application is able to parse all the standard parameters that these CLI commands provide, such as a different configuration file for job submission.

Because a Tool application is a plain Java application anyway, it can also be called from within an IDE. And finally, Oozie can include it in its workflow as a "java action node".

 public class WordCount extends Configured implements Tool {
   public static void main(String[] args) throws Exception {
     // ToolRunner parses the standard Hadoop arguments (such as -conf) before calling run()
     int exitCode = ToolRunner.run(new WordCount(), args);
     System.exit(exitCode);
   }

   public int run(String[] args) throws Exception {
     String inputPath = args[0];
     String outputPath = args[1];
     Configuration conf = getConf();
     doWorkflow(inputPath, outputPath, conf);
     return 0;
   }
 }

Job parameters, the input and output paths in this case, are provided as standard Java program arguments. But as previously mentioned, when using Oozie, an input argument is frequently a single String value containing comma-separated HDFS paths, because the Oozie coordinator can be instructed to take multiple instances of some dataset and process them in a batch, and it generates such a comma-separated String value as the input argument for the triggered Oozie workflow. So it's useful to construct the source Tap from such a String value. Here it goes:

   private Tap constructMultiHfsSourceTap(Scheme scheme, String inputPath) {
     List<Tap> tapList = new ArrayList<Tap>();
     // one Hfs tap per comma-separated path
     String[] splits = inputPath.split(",");
     for (String split : splits) {
       tapList.add(new Hfs(scheme, split.trim()));
     }
     Tap[] taps = tapList.toArray(new Tap[tapList.size()]);
     return new MultiSourceTap(taps);
   }

We couldn't use Cascading's GlobHfs here since individual paths are not part of some hierarchical structure.
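The path-splitting part can be tried on its own, without Cascading on the classpath. Here is a small sketch of it; the class and method names are hypothetical, and unlike the method above it also skips empty entries, which is my own addition.

```java
import java.util.ArrayList;
import java.util.List;

public class InputPathSplitSketch {

    // Splits a comma-separated list of paths, trimming whitespace and skipping empty entries.
    static List<String> splitPaths(String inputPath) {
        List<String> paths = new ArrayList<>();
        for (String part : inputPath.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                paths.add(trimmed);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        // Hypothetical dataset instances, in the form an Oozie coordinator might pass them
        System.out.println(splitPaths("/data/logs/2014/06/01, /data/logs/2014/06/02"));
    }
}
```

Each resulting path would then be wrapped into its own Hfs tap, as in constructMultiHfsSourceTap.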

Once this job hits the road, it would be great to easily see a few main pieces of information about it on some kind of job UI, so we'll set its mapreduce job name:

     Flow flow = flowConnector.connect(flowDef);
     // set mapreduce job name
     String mapReduceJobName = "Cascading Word Count: '" + inputPath + "' -> '" + outputPath + "'";
     FlowStepStrategy flowStepStrategy = constructMapReduceJobNameStrategy(mapReduceJobName);
     flow.setFlowStepStrategy(flowStepStrategy);
     flow.complete();

   private FlowStepStrategy constructMapReduceJobNameStrategy(final String mapReduceJobName) {
     return new FlowStepStrategy() {
       public void apply(Flow flow, List predecessorSteps, FlowStep flowStep) {
         Object config = flowStep.getConfig();
         if (config instanceof JobConf) {
           ((JobConf) config).setJobName(mapReduceJobName);
         }
       }
     };
   }

The complete sample "word count" project is available on GitHub.

Job submission

IDE


OK, now that we have all the code in place, we can finally run it locally within our IDE simply by calling the job driver application like any other Java application. The IDE will take care of putting all necessary classes/jars on the classpath.

 java -classpath <complete classpath here set by IDE> vmarcinko.cascading.wordcount.WordCount -conf conf/local-site.xml /home/vmarcinko/cascadingtest/sherlock_holmes.txt /home/vmarcinko/cascadingtest/wordcount  

As can be seen, we provided the Hadoop configuration file (local-site.xml) using the "-conf" program argument, a standard argument parsed by the ToolRunner utility class. In other words, we can use the same standard arguments that are available when submitting a job via the CLI, as shown next.

The sample sherlock_holmes.txt file used in this example is available in the <project dir>/data directory. If everything went well (and it should!), the word counts are found in a part-xxxx file under the output directory.

Command Line Interface (CLI)

First, we must package our job application in a suitable form using the Gradle Shadow plugin, as described in part 1 of this post. The end result of the "gradle shadowJar" task is:
<cascading-wordcount project dir>/build/libs/cascading-wordcount-1.0-all.jar

Next, we upload that JAR file to the Hadoop cluster, place the sample .txt file at an HDFS path of our choice, and finally submit the job using the shell command:

 yarn jar cascading-wordcount-1.0-all.jar vmarcinko.cascading.wordcount.WordCount /user/cloudera/cascadingtest/sherlock_holmes.txt /user/cloudera/cascadingtest/wordcount  

The "yarn" shell command is available in newer versions of Hadoop. Older versions used the "hadoop" command.

Oozie


To invoke the same vmarcinko.cascading.wordcount.WordCount application from Oozie, we need to use "java action node" within our Oozie workflow to launch it.

Anyway, we use the same shadow JAR (cascading-wordcount-1.0-all.jar) and place it under the <oozie workflow HDFS dir>/lib directory. For program arguments, it is best to parametrize this java action node with ${inputPath} and ${outputPath}, so we can provide concrete HDFS paths when submitting the workflow.

When the job is launched via Oozie (either by manually submitting the workflow, or in a scheduled manner via an Oozie coordinator), we can see our running job nicely in some UI (such as the Hue Job Browser in this example):
[image: Hue Job Browser listing the running jobs]
The name of the job corresponds to the mapreduce job name that we set prior to execution.

(As usual when a Java application is called via Oozie's "java action node", for each such launch Oozie initially starts a Map task that acts as a launcher for the specified Java application, so we end up with the 2 jobs shown above.)

I hope this post proves useful to all newbies trying to find some common way to set up Cascading job applications that can be triggered from various environments.

Tuesday, June 10, 2014

Running Cascading Hadoop jobs via CLI, Oozie, or an IDE (part 1)

If you stumbled upon this post, I guess you already know what Cascading is: another library that offers a higher-level language for composing Hadoop jobs. If you're just starting with it, you may very well be searching for the best way to set up your project so that these jobs can be run in various environments: locally within the IDE of your choice, or on a real Hadoop cluster by using CLI commands for job submission. Also, in real-world scenarios, Hadoop workflows easily become complex enough to warrant an additional engine for the definition and coordination of all these jobs, and Oozie is currently the most popular choice for that in the Hadoop ecosystem.

Ok, enough talkin', let's get to business...

To define our project setup and job packaging tasks, we'll use the Gradle build tool, a more modern substitute for Maven. The version of Cascading that we use here is 2.5.

But first, let's examine our 3 target environments....

IDE



Within our IDE, we want to run Cascading jobs in Hadoop local mode. This requires that all dependency libs (Cascading, Hadoop, and possibly some others) are on the classpath when executing Cascading jobs there. So our build script should generate IDE-specific project files that include all the mentioned dependencies for job runs.

Command Line Interface (CLI)

Most examples out there show how to trigger Hadoop mapreduce jobs via the command line interface, using the "hadoop jar myjobapp.jar ..." command (or "yarn jar myjobapp.jar ..." in newer versions of Hadoop).

Since we're using Cascading here, we need to somehow package the Cascading jars together with our custom code for these commands to work. There are a couple of ways to do this, but bundling everything into one jar ("fat jar") seems to be the most popular one.

To construct the "fat jar" we create a regular jar from all our custom classes and also add the 3rd party libs to it. There are 2 ways to add these libs:
  • by placing them under an internal "lib" folder of the fat jar
  • by extracting all classes from them and adding them in unpackaged form to the same fat jar
The latter approach is a bit more complicated and destroys the structure of the dependency libraries, but as we will see right away, it offers one advantage.

One more thing: we don't need to package the Hadoop libraries, since they are already present when jobs are run on Hadoop.

Oozie


Unlike the hadoop/yarn CLI commands, Oozie doesn't recognize fat jars containing an internal lib folder with dependency libs, so our Cascading app's jar packaged that way would not run when triggered by Oozie.

On the other hand, the second approach to building fat jars is good to go, and fortunately there is a Gradle plugin to help us with that: Gradle Shadow.

As with the CLI approach, we don't want to package the Hadoop libraries, for the reason already mentioned.

Build script

As said, we'll use Gradle as our build tool.

First, let's define our project dependencies in build script (build.gradle).

As already mentioned, we need to mark the Hadoop libraries separately, because we have to exclude them when packaging our job applications for the CLI/Oozie environments. We'll do that by defining a custom Gradle configuration using:
 configurations {  

And finally define dependencies for that configuration (we'll add slf4j/logback dependencies also for proper logging when running within an IDE):

For everything to work correctly, we have to add that configuration to the main sourceSet and also register it to be included in the generated IDEA/Eclipse projects. Take a look at the GitHub project to see how this is done inside the build script.

Cascading libraries are added to standard "compile" configuration:
 compile (  

Using the Gradle plugins for IntelliJ IDEA and Eclipse support, we can generate IDE-specific project files. The project file generation tasks are called by:
 gradle idea  
 gradle eclipse  

To package our job application as a "fat jar" with all dependency classes extracted from their original jars, we have to include the Gradle Shadow plugin (version 0.9.0-M1 currently) in the project via:
 buildscript {
   repositories {
     jcenter()
   }
   dependencies {
     classpath 'com.github.jengelman.gradle.plugins:shadow:0.9.0-M1'
   }
 }
 apply plugin: 'shadow'

The "fat jar" is constructed by calling:
 gradle shadowJar  
and the end result can be seen at path:
<cascading-wordcount project dir>/build/libs/cascading-wordcount-1.0-all.jar

The complete build script is available here, as part of my cascading-wordcount GitHub project.

In the next part of this post, we'll look at how to execute a simple Cascading job within all 3 environments.

Thursday, April 10, 2014

My upcoming JavaCro 2014 talk: Log as basis for distributed systems

After talking about Neo4j and graph databases last year, this year at JavaCro 2014 I will be giving a talk about log-based distributed systems, with a quick overview of a few such systems and architectures: Kafka, Datomic, CQRS/event-sourcing...

The talk will be held on May 13th, 2014. So if you're nearby and would like to see the talk or chat about anything IT-related, I would be glad to meet.

Here's the talk abstract:
Log, or historical storage of system events, has always occupied a central place in the architectures of traditional databases and analytical systems, but nowadays it increasingly serves as the backbone of modern distributed systems. Some of the architectures and tools that use this type of data storage will be presented: Kafka message broker, Datomic database, CQRS/event-sourcing architecture...