Monday, January 26, 2015

Nanocube feeder in Java, part 2

Continuing on the previous post where we prepared DMP encoder, we now want to see how can we use it to encode and stream the data into Nanocube. And because Nanocube process uses standard input for data provision, we have to first start the process in order to obtain that input stream.

We will use ProcessBuilder class to set up everything needed to start the Nanocube. Configuration-wise, we only need to set up the directory where Nanocube binaries are present (in particular nanocube-leaf command).
 private final static Logger appLogger = Logger.getLogger("Feeder");  
 private final static Logger nanocubeOutLogger = Logger.getLogger("Nanocube OUT");  
 ...  
 ...  
 String nanocubeBinPath = "/home/vmarcinko/nanocube/bin";  
 ProcessBuilder pb = new ProcessBuilder(nanocubeBinPath + "/nanocube-leaf", "-q", "29512", "-f", "10000");  
   
 Map<String, String> env = pb.environment();  
 env.put("NANOCUBE_BIN", nanocubeBinPath);  
   
 pb.redirectOutput(ProcessBuilder.Redirect.PIPE);  
 pb.redirectInput(ProcessBuilder.Redirect.PIPE);  
 pb.redirectErrorStream(true);  
   
 appLogger.info("Starting Nanocube process...");  
 Process nanocubeProcess = pb.start();  
   
 ExecutorService executorService = Executors.newSingleThreadExecutor();  
 startNanocubeOutputLoggingTask(executorService, nanocubeProcess);  
   
 OutputStream inPipeOutputStream = nanocubeProcess.getOutputStream();  
 ...  
 ...  
As seen above, when we start Nanocube process, we fetch output end of the process input "pipe" in form of OutputStream, and we will use that to stream the data into Nanocube.

One interesting piece above is the way we handle Nanocube process output - regardless if it was error or standard output, we start new task in another thread (thus java.util.concurrent.Executor) to read that output and print it via some logging framework used in our java application (here plain java.util.logging loggers). Here is relevant startNanocubeOutputLoggingTask method:
 private static void startNanocubeOutputLoggingTask(Executor executor, final Process nanocubeProcess) {  
   InputStream outPipeInputStream = nanocubeProcess.getInputStream();  
   Runnable runnable = new Runnable() {  
     @Override  
     public void run() {  
       try {  
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(outPipeInputStream))) {  
           String line;  
           while ((line = reader.readLine()) != null) {  
             nanocubeOutLogger.info(line);  
           }  
         }  
       } catch (IOException e) {  
         appLogger.log(Level.SEVERE, "Error reading from Nanocube output: " + e.getMessage(), e);  
       }  
     }  
   };  
   // start logging task  
   executor.execute(runnable);  
 }  
   
By having ExecutorService instance around, we can use it to stop that logging task at the application shutdown.

And finally, with the code above, and instance of NanocubeDmpEncoder ready (see previous post), we perform the feeding:
 appLogger.info("Streaming DMP content into Nanocube process...");  
   
 NanocubeDmpEncoder dmpEncoder = ...;  

 byte[] headerBytes = dmpEncoder.encodeHeaders();
 inPipeOutputStream.write(headerBytes);

 byte[] recordBytes = dmpEncoder.encodeRecord(....);
 inPipeOutputStream.write(recordBytes);

 inPipeOutputStream.flush();  
 inPipeOutputStream.close();  
   
 appLogger.info("Streaming of data finished");  
   
 nanocubeProcess.waitFor();  
 executorService.shutdown();  
 appLogger.info("Nanocube process stopped");  
   
At the end, we wait for Nanocube process to stop before doing the cleanup of our output logging task.


No comments:

Post a Comment