Saturday, August 16, 2014

Embedded Kafka and Zookeeper for unit testing

Recently I wanted to setup embedded Kafka cluster for my unit tests, and suprisingly it wasn't that trivial because most of examples I found around were made for some older versions of Kafka/Zookeeper or they didn't work for some other reasons, so it took me some time to find some proper version.

The project I took it from is Camus which is Kafka->Hadoop ETL project, and I just made some slight changes related to newer Zookeeper, as well as some changes configuration-wise.

My embedded Kafka & Zookeeper servers are available at this gist. All the code is tested against Kafka and Zookeeper 3.4.6.

Here is simple example on how to setup Zookeeper at fixed port (2181), and 2 Kafka servers at random available ports:

     EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper(2181);  
     List<Integer> kafkaPorts = new ArrayList<Integer>();  
     // -1 for any available port  
     EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(embeddedZookeeper.getConnection(), new Properties(), kafkaPorts);  
     System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());  
     System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());  

Thursday, June 12, 2014

Running Cascading Hadoop jobs via CLI, Oozie, or an IDE (part 2)

We are continuing now on the previous post our sample project setup that will allow us to run Cascading jobs within few usual environments - CLI, Oozie and IDE.

For our sample job, we'll take all too boring "word count" example. It would be best if we could code it in such a way to satisfy few requirements:
  • it should be triggered in the same way from any of 3 target environments
  • it should have externalized job parameters (such as "word count" input and output HDFS paths)
  • it should have externalized Hadoop configuration to be able to experiment with few of those
  • it should be able to take standard hadoop/yarn CLI command parameters
  • it should be able to take single input path parameter provided in form of multiple comma-separated paths for cases when Oozie coordinator takes multiple dataset instances during workflow submission
  • it should set custom mapreduce job name to improve job visibility on some GUIs (such as Hue)
Taking all these intro consideration, we decided to use recommended way to develop job "driver" applications for hadoop/yarn CLI commands - and that is using Hadoop Tool interface. That way it would be able to parse all standard parameters that these CLI commands provide, such as specifying different configuration file for job submission. 

Because Tool application is plain java application anyway, it can be also called from within an IDE. And finally, Oozie can also include it into its workflow as "java action node".

 public class WordCount extends Configured implements Tool {  
   public static void main(String[] args) throws Exception {  
     int exitCode = WordCount(), args);  
   public int run(String[] args) throws Exception {  
     String inputPath = args[0];  
     String outputPath = args[1];  
     Configuration conf = getConf();  
     doWorkflow(inputPath, outputPath, conf);  
     return 0;  

Job parameters - input and output path in this case, are provided as standard java program arguments. But as previously mentioned, when using Oozie, frequently an input argument is single String value containing comma-separated HDFS paths, because Oozie coordinator can be instructed to take multiple instances of some dataset and process them in batch, and it basically generates such comma-separated String value as input argument for triggered Oozie workflow. So its useful to construct source Tap from such String value. So here it goes:

   private Tap constructMultiHfsSourceTap(Scheme scheme, String inputPath) {  
     List<Tap> tapList = new ArrayList<Tap>();  
     String[] splits = inputPath.split(",");  
     for (String split : splits) {  
       tapList.add(new Hfs(scheme, split.trim()));  
     Tap[] taps = tapList.toArray(new Tap[tapList.size()]);  
     return new MultiSourceTap(taps);  

We couldn't use Cascading's GlobHfs here since individual paths are not part of some hierarchical structure.

Once this job hits the road, it would be great to easily see few main pieces of information about it on some kind of job UI, so we'll set its mapreduce job name:

     Flow flow = flowConnector.connect(flowDef);  
     // set mapreduce job name  
     String mapReduceJobName = "Cascading Word Count: '" + inputPath + "' -> '" + outputPath + "'";  
     FlowStepStrategy flowStepStrategy = constructMapReduceJobNameStrategy(mapReduceJobName);  
   private FlowStepStrategy constructMapReduceJobNameStrategy(final String mapReduceJobName) {  
     return new FlowStepStrategy() {  
       public void apply(Flow flow, List predecessorSteps, FlowStep flowStep) {  
         Object config = flowStep.getConfig();  
         if (config instanceof JobConf) {  
           ((JobConf) config).setJobName(mapReduceJobName);  

Complete sample "word count" project is avaiable on GitHub.

Job submission


Ok, now when we have all the code in place, we can finally run it locally within our IDE simply by calling the job driver application as any other java application. IDE will take care to put all necessary classes/jars on classpath.

 java -classpath <complete classpath here set by IDE> vmarcinko.cascading.wordcount.WordCount -conf conf/local-site.xml /home/vmarcinko/cascadingtest/sherlock_holmes.txt /home/vmarcinko/cascadingtest/wordcount  

As can be seen, we provided Hadoop configuration file (local-site.xml) by using "-conf" program argument which is standard argument parsed by ToolRunner utility class. In other words, we can use same standard arguments that can be used when submitting job via CLI as shown next.

Sample sherlock_holmes.txt file used in this example is available at <project dir>/data directory. If everything went good (and it should!), then word counts are found in part-xxxx file under output directory.

Command Line Interface (CLI)

First, we must package our job application in suitable form using Gradle Shadow plugin as described in part 1 of this post. The end result of "gradle shadowJar" task would be:
<cascading-wordcount project dir>/build/libs/cascading-wordcount-1.0-all.jar

Next we upload that JAR file into Hadoop cluster, place sample .txt file in HDFS path of our choice, and finally submit the job using shell command:

 yarn jar cascading-wordcount-1.0-all.jar vmarcinko.cascading.wordcount.WordCount /user/cloudera/cascadingtest/sherlock_holmes.txt /user/cloudera/cascadingtest/wordcount  

Shell command "yarn" is available in newer versions of Hadoop. Older version used "hadoop" command.


To invoke the same vmarcinko.cascading.wordcount.WordCount application from Oozie, we need to use "java action node" within our Oozie workflow to launch it.

Anyway, we use the same shadow JAR (cascading-wordcount-1.0-all.jar) and place it under <oozie workflow HDFS dir>/lib directory. Under program arguments, it would be best to parametrize this java action node with ${inputPath} and ${outputPath}, so we can provide concrete HDFS paths when submitting the workflow.

When the job is launched via Oozie (either manually submitting workflow, or in scheduled manner via Oozie coordinator), we can see our running job nicely in some UI (such as Hue Job Browser in this example): Name of job corresponds to mapreduce job name that we set prior to execution.

(as usual when java application is called via Oozie's "java action node", for each such launch, Oozie initially starts a Map task that acts as launcher for specified java application, thus we end up with 2 jobs shown above)

I hope this post proves useful to all newbies trying to find some common way to set up Cascading job applications that can be triggered from various environments.

Tuesday, June 10, 2014

Running Cascading Hadoop jobs via CLI, Oozie, or an IDE (part 1)

If you stumbled upon this post, I guess you already know what Cascading is - another library out there that offers higher level language to compose Hadoop jobs. If you're just starting with it, you may very well be searching for best way to setup your project so these jobs can be run in various environments - locally within IDE of your choice, or to try them out on real Hadoop cluster by using CLI commands for job submission. Also, in real case scenarios, Hadoop workflows easily become complex enough to warrant the usage of additional engine for definition and coordination of all these jobs, and Oozie is currently the most popular choice for that in Hadoop ecosystem.

Ok, enough talkin', let's get to business...

To define our project setup and job packaging tasks, we'll use Gradle buid tool which is more modern substitution for Maven. Version of Cascading that we use here is 2.5.

But first, let's examine our 3 target environments....



Within our IDE, we want to run Cascading jobs in Hadoop local mode. This requires that all dependency libs (Cascading, Hadoop, and possibly some other ones) are included in classpath when executing Cascading jobs there. So, our build script should take care to generate IDE-specific project files that include all mentioned dependencies for job runs.

Command Line Interface (CLI)

Most of examples found out there show how to trigger hadoop mapreduce jobs via command line interface - via "hadoop jar myjobapp.jar..." command (or "yarn jar myjobapp.jar ..." in newer versions of Hadoop).

Since we're using Cascading here, we need to package somehow Cascading jars together with our custom code for these commands to work. There are couple of ways to do this, but it seems that bundling them all into one jar ("fat jar") is the most popular one.

To construct the "fat jar" we just create regular jar from all our custom classes, and also add 3rd party libs to it. There are 2 way how we can add these libs:
  • by placing them under internal "lib" folder of the fat jar
  • by extracting all classes from them, and add them in unpackaged form to same fat jar
The later approach is a bit more complicated, it destroys the structure of dependency libraries, but as we will see right away, it offers one advantage.

One more thing - we don't need to package Hadoop libraries since they are already present when jobs are run on Hadoop.


Unlike hadoop/yarn CLI commands, Oozie doesn't recognize fat jars containing internal lib folder with dependency libs, so our Cascading app's jar packaged in that way would not be able to run when triggered by Oozie.

On the other hand, the second approach of building fat jars is good to go, and fortunately, there is Gradle plugin to help us with that - Gradle Shadow.

Same as CLI approach, we don't want to package Hadoop libraries for already mentioned reason.

Build script

As said, we'll use Gradle as our build tool.

First, let's define our project dependencies in build script (build.gradle).

As already mentioned, we need to somehow mark separately Hadoop libraries because we have to exclude them when packaging our job applications for CLI/Oozie enviroments. We'll do that by defining custom Gradle configuration using:
 configurations {  

And finally define dependencies for that configuration (we'll add slf4j/logback dependencies also for proper logging when running within an IDE):

For everything to work correctly, we have to add that configuration to main sourceSet, and also register that configuration to be included in generated IDEA/Eclipse projects. Take a look at GitHub project to see how it is done inside the build script.

Cascading libraries are added to standard "compile" configuration:
 compile (  

Using special Gradle plugins for Intellij IDEA and Eclipse support, we can generate IDE-specific project files. Project files generation tasks are called by:
 gradle idea  
 gradle eclipse  

To package our job application as "fat jar" with all dependency classes extracted from their original jars, we have to include Gradle Shadow plugin (version 0.9.0-M1 currently) into project via:
 buildscript {  
   repositories {  
   dependencies {  
     classpath 'com.github.jengelman.gradle.plugins:shadow:0.9.0-M1'  
 apply plugin: 'shadow'  

"Fat jar" is constructed by calling:
 gradle shadowJar  
and end result can be seen at path:

Complete build script is available here, as part of my cascading-wordcount GitHub project.

In the next part of this post, we'll look at how to execute simple Cascading job within all 3 environments.

Thursday, April 10, 2014

My upcoming JavaCro 2014 talk: Log as basis for distributed systems

After last year talking about Neo4j and graph databases, this year at JavaCro 2014, I will be having a talk about log-based distributed systems, with quick overview of few of such systems and architectures - Kafka, Datomic, CQRS/event-sourcing....

The talk will be held on May 13th, 2014. So if you're nearby, and would like to use this opportunity to see the talk or chat about whatever IT-related, I would be glad to do so.

Here's the talk abstract:
Log, or historical storage of system events, has always occupied central place in architectures of all traditional databases and analytical systems, but nowadays it serves more and more as backbone of modern distributed systems. Some of architectures and tools which use this type of data storage will be presented - Kafka message broker, Datomic database, CQRS/Event-sourcing architecture ...

Tuesday, October 22, 2013

Introducing Teuta, laughingly simple dependency injection container in Clojure

It was year 2002, when I tried my first dependency injection container in Java (these were mostly called Inversion-of-Control containers then). It was one of Apache Avalon subprojects, namely Fortress (beside ECM, Merlin and some others). Before it, I designed my applications in any custom way I saw fit, and sometimes there wasn't much design at all, so that moment really felt enlightening. I know it sounds silly now because these containers are so common now in all mainstream languages, but back then, it really took quality of my apps to whole new level, and I felt I could comprehend my code much more easily.

Now it's 2013, and destiny took me to Clojure language. I'm still fresh to it, but what I noticed is there isn't much info around about structuring the applications, as if namespaces and vars contained in them are sufficient for anything. If there wasn't few presentations from Stuart Sierra or Prismatic team, I would probably go on thinking it must be an issue with my OO legacy. Fortunately, after these talks, I could see there is a real need for some kind of componentization, and although there are some libraries out there such as Prismatic Graph or Jig, they are somewhat different from what Java programmers are used to, so I decided to write my own, especially because it's so dead-simple idea. The final result is small GitHub project called Teuta.

Library Dependencies

Add the necessary dependency to your Leiningen project.clj and require the library in your ns:
[vmarcinko/teuta "0.1.0"] ; project.clj

(ns my-app (:require [vmarcinko.teuta :as teuta])) ; ns

Container Specification

Anyway, to create a component container, we have to start by defining a specification, and it is simply a map of entries [component-id component-specification]. Component ID is usually a keyword, though String or some other value can be used. Component specification is vector of [component-factory-fn & args], so a component can be constructed later, during container construction time, by evaluating factory function with given arguments. So you see, this is just an ordinary function, and a component can be constructed in any arbitrary way, though maybe most usual way would be to use records and their map factory functions which are very descriptive. If a component depends upon some other component, then it should be configured to use it. Referring to other components is done via
(teuta/comp-ref some-comp-id)
If components form circular dependencies, exception will be reported during container construction time. Similarly, if we want to parametrize some piece of component configuration, then we simply do that via:
(teuta/param-ref some-param-id-path)
So, specification would look something like:
{:my-comp-1 [mycompany.myapp/map->MyComp1Record 
             {:my-prop-1  "Some string"
              :my-prop-2  334
              :my-prop-3  (teuta/param-ref :comp-1-settings :some-remote-URL)
              :comp2-prop (teuta/comp-ref :my-comp-2)}]
 :my-comp-2 [mycompany.myapp/map->MyComp2Record 
             {:my-prop-1 6161
              :my-prop-2 (atom nil)
              :my-prop-3 (teuta/param-ref :comp-2-settings :admin-email)}]}
Since whole specification is simply a regular map, it is useful to have some common map containing always present components, and have separate profile-specific maps with components for production, test, development... That way you simply merge those maps together to construct desired final specification.

Container Construction

Once we have our specification, we can simply create a container by calling
(def my-container (teuta/create-container my-specification my-parameters))
The container is just a sorted map of [component-id component] entries. When the container map is printed, in order to make it a bit more clear, referred components will be printed as << component some-comp-id >>.

Since whole application state is also contained in this container map, this means it plays nicely with Stuart Sierra "reloaded" workflow.

Component Lifecycle

If a component's functions depend upon some side-effecting logic being executed prior to using them, then a component can implement vmarcinko.teuta/Lifecycle protocol. The protocol combines start and stop functions which will get called during starting and stopping of a container.
(defprotocol Lifecycle
  (start [this] "Starts the component. Returns nil.")
  (stop [this] "Stops the component. Returns nil."))
Container is started by:
(teuta/start-container my-container)
Components are started in dependency order. If any component raises exception during startup, the container will automatically perform stopping of all already started components, and rethrow the exception afterwards. Likewise, stopping of container is done via:
(teuta/stop-container my-container)
If any component raises exception during this process, the exception will be logged and the process will continue with other components.


Here we define 2 components - divider and alarmer.

Divider takes 2 numbers and returns result of their division. Let's define working interface of the component as protocol, so we can allow many implementations.
(ns vmarcinko.teutaexample.divider)

(defprotocol Divider
  (divide [this n1 n2] 
  "Divides 2 numbers and returns vector [:ok result]. 
  In case of error, [:error \"Some error description\"] will be returned"))
Unlike this example, component interfaces will mostly contain multiple related functions. Request-handler components, such as web handlers, usually don't have a working interface since we don't "pull" them for some functionality, they just need to be started and stopped by container, thus implement Lifecycle protocol. Default implementation of our divider component will naturally return the result of dividing the numbers, but in case of division by zero, it will also send notification about the thing to alarmer component (by calling vmarcinko.teutaexample.alarmer/raise-alarm). Placing component implementation in separate namespace is just a nice way of separating component interface and implementation.
(ns vmarcinko.teutaexample.divider-impl
  (:require [vmarcinko.teutaexample.alarmer :as alarmer]
            [vmarcinko.teutaexample.divider :as divider]
            [vmarcinko.teuta :as teuta]))

(defrecord DefaultDividerImpl [alarmer division-by-zero-alarm-text]
  (divide [_ n1 n2]
    (if (= n2 0)
        (alarmer/raise-alarm alarmer division-by-zero-alarm-text)
        [:error "Division by zero error"])
      [:ok (/ n1 n2)])))
Alarmer is defined as follows:
(ns vmarcinko.teutaexample.alarmer)

(defprotocol Alarmer
  (raise-alarm [this description] "Raise alarm about some issue. Returns nil."))
Default implementation of alarmer "sends" alarm notifications to preconfigured email addresses. For this example, sending an email is just printing the message to stdout. It also prints alarm count, which is mutable state of this component, and is held in an atom passed to it during construction. Atom state is initialized and cleaned up during lifecycle phases - start and stop.
(ns vmarcinko.teutaexample.alarmer-impl
  (:require [vmarcinko.teutaexample.alarmer :as alarmer]
            [vmarcinko.teuta :as teuta]))

(defrecord DefaultAlarmerImpl [notification-emails alarm-count]
  (raise-alarm [_ description]
    (let [new-alarm-count (swap! alarm-count inc)]
      (println (str "Alarm Nr." new-alarm-count " raised: '" description "'; notifying emails: " notification-emails))))
  (start [_]
    (reset! alarm-count 0))
  (stop [_]
    (reset! alarm-count nil)))
So let's finally create container specification and wire these 2 components. We will also extract alarmer email addresses as application parameters.
(def my-parameters {:alarmer-settings {:emails ["" ""]}})

(def my-specification
  {:my-divider [vmarcinko.teutaexample.divider-impl/map->DefaultDividerImpl
                {:alarmer                       (teuta/comp-ref :my-alarmer)
                 :division-by-zero-alarm-text   "Arghhh, somebody tried to divide with zero!"}]

   :my-alarmer [vmarcinko.teutaexample.alarmer-impl/map->DefaultAlarmerImpl
                {:notification-emails   (teuta/param-ref :alarmer-settings :emails)
                 :alarm-count           (atom nil)}]})
Now we can construct the container, start it and try out dividing 2 numbers via divider component.
(def my-container (teuta/create-container my-specification my-parameters))

(teuta/start-container my-container)

(vmarcinko.teutaexample.divider/divide (:my-divider my-container) 3 44)
=> [:ok 3/44]

(vmarcinko.teutaexample.divider/divide (:my-divider my-container) 3 0)
=> Alarm Nr.1 raised: 'Arghhh, somebody tried to divide with zero!': notifying emails: ["" ""]
=> [:error "Division by zero error"]
In order to call vmarcinko.teutaexample.divider/divide function "from outside", we needed to pick divider component from the container first. But if request-handling piece of application is also a component in container, as could be the case with some web handler serving HTTP requests to our vmarcinko.teutaexample.divider/divide function, then container specification will handle wiring specified divider component. Let's create such a web handler component using popular Jetty web server:
(ns vmarcinko.teutaexample.web-handler
  (:require [ring.adapter.jetty :as jetty]
            [vmarcinko.teuta :as teuta]
            [ring.middleware.params :as ring-params]
            [vmarcinko.teutaexample.divider :as divider]))

(defn- create-handler [divider]
  (fn [request]
    (let [num1 (Integer/parseInt ((:params request) "arg1"))
          num2 (Integer/parseInt ((:params request) "arg2"))
          result (nth (divider/divide divider num1 num2) 1)]
      {:status 200
       :headers {"Content-Type" "text/html"}
       :body (str "<h1>Result of dividing " num1 " with " num2 " is: " result " </h1>")})))

(defn- ignore-favicon [handler]
  (fn [request]
    (when-not (= (:uri request) "/favicon.ico")
      (handler request))))

(defrecord DefaultWebHandler [port divider server]
  (start [this]
    (reset! server
      (let [handler (->> (create-handler divider)
        (jetty/run-jetty handler {:port port :join? false}))))
  (stop [this]
    (.stop @server)
    (reset! server nil)))
Jetty server is held in an atom, and is started on configured port during lifecycle start phase. As can be seen, divider component is the only dependency of this component, and request URL parameters "arg1" and "arg2" are passed as arguments to vmarcinko.teutaexample.divider/divide function. We added also favicon request ignoring handler to simplify testing it via browser. This component requires popular Ring library, so one needs to add that to project.clj as:
:dependencies [[ring/ring-core "1.2.0"]
               [ring/ring-jetty-adapter "1.2.0"]
Let's expand our specification to wire this new component.
(def my-parameters { ...previous parameters ...
                    :web-handler-settings {:port 3500}})

(def my-specification
  { ....previous components ....
   :my-web-handler [vmarcinko.teutaexample.web-handler/map->DefaultWebHandler
                    {:port (teuta/param-ref :web-handler-settings :port)
                     :divider (teuta/comp-ref :my-divider)
                     :server (atom nil)}]})
Now, after the container has been started, we can try out HTTP request: 
Division result should be returned as HTML response. Division with zero should print alarming message to REPL output.

Wednesday, October 2, 2013

Neo4j model for (SQL) dummies

In general, one characteristic of the mind is that it has hard time grasping new concepts if these are presented without comparison to some familiar ones. And I experienced that when trying to explain Neo4j data model to people who are stumbling on it for first time. Mostly they are confused by lack of schema, because when visualized, those scattered graph nodes, connected into some kind of spider web, bring confusion into minds so long accustomed to nicely ordered rectangular SQL world.

So what seemed to work better in this case is just to describe it using all too familiar RDBMS/SQL model and its elements: tables, columns, records, foreign keys ...In other words, let's try to describe Neo4j-graph model as it would be if built on top of SQL model.

Actually, this is quite easy to do. We just need 2 tables, and let's call them "NODES" and "RELATIONSHIPS". Both reflect 2 main elements in Neo4j model - graph nodes and relationships between them.

"NODES" table

This one would be where entities are stored, and it contains 2 columns - "ID" and "PROPERTIES".

334 {"name": "John Doe", "age": 31, "salary": 80000}
335 {"name": "ACME Inc.", "address": "Broadway 345, New York City, NY"}
336 {"manufacturer": "Toyota", "model": "Corolla", "year": 2005}
337{"name": "Annie Doe", "age" 30, "salary": 82000}

PROPERTIES column stores map-like data structure containing arbitrary properties with their values. Just for purpose of presentation, I picked JSON serialization here. So you see, due to this schema-less design, there are no constraints upon what properties are contained in the PROPERTIES column - which is actually the only practical/possible way since all entity types (department, company, employee, vehicle...) are stored in this single table.


This table would contain "ID", "NAME", "SOURCE_NODE_ID", "TARGET_NODE_ID" and "PROPERTIES" columns, and purpose is to store associations between nodes. We can say that records stored here represent schema-less version of SQL foreign-keys.

191 MARRIED_TO 334 337 {"wedding_date": "20070213"}
192 OWNS 337 336
193 WORKS_FOR 337 335 {"job-position": "IT manager"}

Relationship's NAME marks its "type", and we can add new association "types" into the system dynamically, just by storing new relationship records with previously non-existing names, whereas in SQL database, we need to pre-define available foreign keys upfront.

Since relationships usually have a direction (though they can be bi-directional also in Neo4j), thus we have "SOURCE_NODE_ID" and "TARGET_NODE_ID" foreign keys, pointing to respective NODES. Direction is mainly valuable for its semantic purpose.

Similar to NODES table, here we also have PROPERTIES column to store additional information about association - in SQL world we would need to introduce "link" table to store this kind of data.


Having no schema brings well known trade-off to the table. On one hand, the structure of such system is less obvious, and special care has to be taken not to corrupt the data, but on the other hand, given flexibility can be exploited for domains that are rich and rapidly changing. And of course, since there are no constraints imposed by database here, it means that application now is solely responsible for correctness of stored data.

Monday, September 16, 2013

Referencing non-indexed Neo4j entities in service layer

"Idiomatic" way to index entities in Neo4j is to do that only on few types of them, usually the ones that are most often used, or for some reason are the most practical to be accessed directly. Of course, top level entities (such as Company or User in some business domains) just have to be indexed since they cannot be fetched via some other entity.

So, let's say we have 2 types of entities - Company and Department, and they are in one-to-many relationship. Company would have to be indexed, but Department would not because it can be traversed to starting from the parent Company. This fetching via traversal is actually one of best selling points of Neo4j because the speed of that operation generally doesn't depend upon size of whole dataset, unlike SQL databases that have to perform JOIN-ing of different tables which involves tackling with their indexes and performance of that ultimately depends upon table size.

Anyway, all seems good, but it can have some impact on your service layer.

Until now, when you had your Department entities indexed, you had some service layer operation  with only one argument needed to reference the entity:

 public interface DepartmentManager {  
  void activateDepartment(UUID departmentUuid);  

And now we must introduce another argument to identify parent Company to be able to traverse the graph to Department in question.

 public interface DepartmentManager {  
  void activateDepartment(UUID companyUuid, UUID departmentUuid);  

Of course, one can argue that we could decide to index Department entities also to simplify accessing them, but then this same reasoning can lead us to index almost all types of entities that we want to operate on at service layer, and we surely want to avoid that for reasons described in the beginning of this post.