We will use the ProcessBuilder class to set up everything needed to start Nanocube. Configuration-wise, we only need to specify the directory that contains the Nanocube binaries (in particular, the nanocube-leaf command).
private final static Logger appLogger = Logger.getLogger("Feeder");
private final static Logger nanocubeOutLogger = Logger.getLogger("Nanocube OUT");
...
...
String nanocubeBinPath = "/home/vmarcinko/nanocube/bin";
ProcessBuilder pb = new ProcessBuilder(nanocubeBinPath + "/nanocube-leaf", "-q", "29512", "-f", "10000");
Map<String, String> env = pb.environment();
env.put("NANOCUBE_BIN", nanocubeBinPath);
pb.redirectOutput(ProcessBuilder.Redirect.PIPE);
pb.redirectInput(ProcessBuilder.Redirect.PIPE);
pb.redirectErrorStream(true);
appLogger.info("Starting Nanocube process...");
Process nanocubeProcess = pb.start();
ExecutorService executorService = Executors.newSingleThreadExecutor();
startNanocubeOutputLoggingTask(executorService, nanocubeProcess);
OutputStream inPipeOutputStream = nanocubeProcess.getOutputStream();
...
...
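The launch configuration above can also be factored into a small helper, which makes it possible to inspect the command line and environment before anything is started. The following is only a sketch: the class name NanocubeLaunchConfig and the method configure are hypothetical, and the arguments for nanocube-leaf are passed through unchanged rather than interpreted.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Nanocube): builds the ProcessBuilder without starting it.
final class NanocubeLaunchConfig {
    private NanocubeLaunchConfig() {}

    static ProcessBuilder configure(Path binDir, String... leafArgs) {
        List<String> command = new ArrayList<>();
        // The executable is resolved relative to the configured bin directory.
        command.add(binDir.resolve("nanocube-leaf").toString());
        command.addAll(Arrays.asList(leafArgs));

        ProcessBuilder pb = new ProcessBuilder(command);
        pb.environment().put("NANOCUBE_BIN", binDir.toString());
        // Merge stderr into stdout so a single reader sees all process output.
        pb.redirectErrorStream(true);
        return pb;
    }

    public static void main(String[] args) {
        ProcessBuilder pb = configure(Paths.get("/home/vmarcinko/nanocube/bin"), "-q", "29512", "-f", "10000");
        // Nothing is launched here; we only print what would be executed.
        System.out.println(pb.command());
    }
}
```

Standard input and output are left at their default, which is already Redirect.PIPE, so the sketch does not set them explicitly.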
As seen above, when we start the Nanocube process, we obtain the output end of the process input "pipe" in the form of an OutputStream, and we will use it to stream the data into Nanocube.
One interesting piece above is the way we handle the Nanocube process output. Because the error stream is redirected into standard output, a single task running in another thread (hence the java.util.concurrent.Executor) reads all of it and prints it via the logging framework used in our Java application (here, plain java.util.logging loggers). Here is the relevant startNanocubeOutputLoggingTask method:
private static void startNanocubeOutputLoggingTask(Executor executor, final Process nanocubeProcess) {
    // stdout of the process; stderr is merged into it via redirectErrorStream(true)
    final InputStream outPipeInputStream = nanocubeProcess.getInputStream();
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            // try-with-resources closes the reader once the process output ends
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(outPipeInputStream))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    nanocubeOutLogger.info(line);
                }
            } catch (IOException e) {
                appLogger.log(Level.SEVERE, "Error reading from Nanocube output: " + e.getMessage(), e);
            }
        }
    };
    // start logging task
    executor.execute(runnable);
}
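The read loop in this method does not depend on Process at all, so it can be pulled out and exercised against any InputStream. Below is a minimal sketch of that idea, assuming Java 8 or later; the class StreamDrainer and its drain method are hypothetical names, and the UTF-8 charset is an assumption (the original code uses the platform default).

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: forwards every line of a stream to a sink until end of stream.
final class StreamDrainer {
    private StreamDrainer() {}

    /** Returns the number of lines read; closes the stream when done. */
    static int drain(InputStream in, Consumer<String> sink) {
        int count = 0;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            // readLine() returns null at end of stream, i.e. once the producer has closed its side
            while ((line = reader.readLine()) != null) {
                sink.accept(line);
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        // An in-memory stream stands in for the process output here.
        InputStream fake = new ByteArrayInputStream("first line\nsecond line\n".getBytes(StandardCharsets.UTF_8));
        List<String> lines = new ArrayList<>();
        int n = drain(fake, lines::add);
        System.out.println(n + " lines: " + lines);
    }
}
```

With such a helper, the logging task could be submitted as executor.execute(() -> StreamDrainer.drain(nanocubeProcess.getInputStream(), line -> nanocubeOutLogger.info(line))).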
By keeping the ExecutorService instance around, we can use it to shut down that logging task at application shutdown.
And finally, with the code above and an instance of NanocubeDmpEncoder ready (see the previous post), we perform the feeding:
appLogger.info("Streaming DMP content into Nanocube process...");
NanocubeDmpEncoder dmpEncoder = ...;
byte[] headerBytes = dmpEncoder.encodeHeaders();
inPipeOutputStream.write(headerBytes);
byte[] recordBytes = dmpEncoder.encodeRecord(....);
inPipeOutputStream.write(recordBytes);
inPipeOutputStream.flush();
inPipeOutputStream.close();
appLogger.info("Streaming of data finished");
nanocubeProcess.waitFor();
executorService.shutdown();
appLogger.info("Nanocube process stopped");
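At the end, we wait for the Nanocube process to stop before cleaning up our output logging task. Note that executorService.shutdown() does not interrupt a task that is already running; the logging loop ends on its own once the process output reaches end of stream.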
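The feeding and cleanup steps can be made a little more defensive. The sketch below, assuming Java 7 or later, closes the pipe through try-with-resources so that it is closed even if a write fails, and waits a bounded time for the logging executor to finish. FeedShutdownSketch, feed and stopExecutor are hypothetical names, and the byte arrays stand in for whatever NanocubeDmpEncoder produces.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the feed-then-cleanup sequence.
final class FeedShutdownSketch {
    private FeedShutdownSketch() {}

    /** Writes the header, then each record, and always closes the pipe afterwards. */
    static void feed(OutputStream pipe, byte[] header, List<byte[]> records) {
        // Closing the buffered stream flushes it and closes the underlying pipe.
        try (OutputStream out = new BufferedOutputStream(pipe)) {
            out.write(header);
            for (byte[] record : records) {
                out.write(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if all submitted tasks finished within the timeout. */
    static boolean stopExecutor(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // no new tasks; running tasks continue
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            executor.shutdownNow(); // best effort: a blocked stream read may not react to interruption
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // In-memory stand-ins for the process pipe and the logging executor.
        ByteArrayOutputStream pipe = new ByteArrayOutputStream();
        feed(pipe, new byte[] {1, 2}, Arrays.asList(new byte[] {3}, new byte[] {4, 5}));
        System.out.println("bytes written: " + pipe.size());

        ExecutorService executor = Executors.newSingleThreadExecutor();
        System.out.println("clean stop: " + stopExecutor(executor, 5, TimeUnit.SECONDS));
    }
}
```

In the flow described above, feed would receive nanocubeProcess.getOutputStream(), and stopExecutor would be called after nanocubeProcess.waitFor() returns.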