Thing'in as a distributed, horizontally scalable system
In this document we will go through different aspects of the code. The most important one is definitely ZooKeeper, because it is what allows us to either parallelize some tasks or do the exact opposite: make sure that a task or functionality is executed by one and only one instance at a given time.
Before discussing ZooKeeper itself, I want to talk about the "tasks" themselves and why a ZooKeeper-based distributed locking system is needed for Thing'in.
Some tasks can be parallelized and some cannot; ontology synchronization is a good example of the latter. Each time a new ontology is added into OLS, a subscription system notifies Thing'in of the new ontology.
Adding an ontology, when we have a cluster of Thing'in server instances, is not a simple task. We must first validate the ontology, store it into the graph database and then update the ontology cache of every instance of the cluster. The problem is that if we simply order every instance to add the ontology into the graphDB and then update its ontology cache, the operation will crash on every instance but one.
Simply because the indexes will not allow the same ontology to be stored twice in the graphDB: we will have concurrent write problems, some transactions will crash and so on. So the question is, how should an instance react when it fails to write its ontology to the graphDB? Is it fine because we know the ontology will be added by someone else? Do we check in the database each time we fail? This is not a good solution by any means: it implies a lot of retry strategies and, in general, a lot of requests for nothing.
Furthermore, as pointed out above, we are talking to the whole cluster, meaning we are sending a message through our MOM (RabbitMQ) using a fan-out exchange.
When we send a message from the Thing'in API we expect one, and only one, answer. If we send a message to the whole cluster, how are we supposed to handle the answers? Wait for the first one and then reply to the client? Wait for every instance to answer and then reply to the client? The second case implies that, before sending the message, we have to know how many Thing'in server instances are running, and we must find a way to aggregate the answers; and what do we do if one fails or times out while the others succeed?
To ease this problem we included a master/slave election system in Thing'in and introduced a new paradigm: MASTER.
At any given time there can be only one master; from time to time there will be none, but only because the previous master has fallen and a new one has not yet been elected. Only the master may handle messages from the MASTER paradigm, and since a master may fall, the queue for the MASTER paradigm uses manual acknowledgment instead of automatic acknowledgment: the acknowledgment must only be done once the message has been fully processed. That way, even if the current master falls while handling the message, the next master will be able to consume the message again (since it has not been acknowledged) and perform the necessary actions.
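As a minimal sketch (the queue name and business logic are hypothetical), manual acknowledgment with the RabbitMQ Java client looks roughly like this:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class MasterQueueConsumer {
    public static void main(String[] args) throws Exception {
        Connection connection = new ConnectionFactory().newConnection();
        Channel channel = connection.createChannel();

        DeliverCallback onDelivery = (consumerTag, delivery) -> {
            try {
                // Fully process the message first...
                handleMasterMessage(delivery.getBody());
                // ...then acknowledge it, so that an unfinished message is redelivered
                // to the next master if this instance dies mid-processing.
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } catch (Exception e) {
                // On failure, requeue so another (future) master can retry.
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
            }
        };

        // autoAck = false: the broker keeps the message until we explicitly ack it.
        channel.basicConsume("thingin.master", false, onDelivery, consumerTag -> { });
    }

    static void handleMasterMessage(byte[] body) { /* hypothetical business logic */ }
}
```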
There are libraries like Apache Curator, a high-level API on top of ZooKeeper, which make it easy to set up master/slave elections; this is the original reason we included ZooKeeper in Thing'in.
The second reason is that ZooKeeper is a tool for manipulating locks distributed across the network. Some tasks, or sub-tasks (we will talk about them later), can only be executed by one and only one executor at a given time. Using distributed locks with ZooKeeper guarantees that, under the right conditions, we will never have a dual execution of the same task; thus, if a task crashes it will never be because someone else has already executed it, but because of a "true" error. Having only "true" errors makes it much easier to handle the failures of a task, since we do not have to worry about concurrent execution.
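For illustration, Curator's distributed lock recipe (InterProcessMutex) is used roughly as follows; the connection string and lock path are hypothetical:

```java
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedLockExample {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zookeeper:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // One lock node per protected resource, e.g. per task (path is hypothetical).
        InterProcessMutex lock = new InterProcessMutex(client, "/thingin/locks/task-42");
        if (lock.acquire(5, TimeUnit.SECONDS)) {
            try {
                // Only one instance in the whole cluster can be here at a time.
            } finally {
                lock.release();
            }
        }
        client.close();
    }
}
```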
So, how does all of that work in ZooKeeper?
We consider that some actions should only be performed by a dedicated agent in our cluster, and for that purpose we included a master/slave election mechanism. There are several ways to implement it; Apache Curator proposes some election recipes (for the curious, see https://curator.apache.org/curator-recipes/leader-election.html and https://curator.apache.org/curator-recipes/leader-latch.html). Unfortunately they do not correspond exactly to our needs, because we need automatic failover and we also need to know who the master of the cluster is at any given time (if possible).
To build an election mechanism with automatic failover, some implementations rely directly on ZooKeeper ephemeral nodes (see https://tech.willhaben.at/implementing-leader-election-and-automatic-fail-over-8347035bfd63); we work differently in our case.
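For reference only, here is what Curator's off-the-shelf LeaderLatch recipe mentioned above looks like; this is not our exact in-house mechanism, and the election path is hypothetical:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

public class ElectionExample {
    public static void startElection(CuratorFramework client, String memberId) throws Exception {
        LeaderLatch latch = new LeaderLatch(client, "/thingin/election", memberId);
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // This instance just became master: enable MASTER-only behaviour here.
            }

            @Override
            public void notLeader() {
                // Leadership lost: disable MASTER-only behaviour.
            }
        });
        latch.start();

        // At any time we can ask who currently holds leadership
        // (may be empty right after start, while the election settles).
        System.out.println("Current master: " + latch.getLeader().getId());
    }
}
```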
Once created, a task may never be deleted from the underlying database. Every task has both an active flag (either true or false) and a status (currently WAITING or RUNNING). Once a task has finished successfully, its finished flag is set to true. A task is never started directly at its creation; the TaskApiVerticle does not offer such an operation: starting tasks is the job of the TaskProcessHandler.
The TaskProcessHandler periodically checks every task which is not finished and tries to run it. To process a task, the TaskProcessHandler must first acquire the task's lock from ZooKeeper. If it fails to acquire the lock, the TaskProcessHandler simply moves on to the next available task; if there is no task left to execute, it goes to sleep and waits for the next task-checking loop. If it could reserve the task's lock, the TaskProcessHandler then begins a new task attempt.
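A minimal sketch of that loop, assuming a Curator client and hypothetical lock paths and runAttempt helper:

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class TaskProcessLoop {
    private final CuratorFramework zk;

    public TaskProcessLoop(CuratorFramework zk) {
        this.zk = zk;
    }

    /** One iteration of the periodic task-checking loop. */
    public void checkTasks(List<String> unfinishedTaskIds) throws Exception {
        for (String taskId : unfinishedTaskIds) {
            InterProcessMutex lock = new InterProcessMutex(zk, "/thingin/tasks/" + taskId);
            // Non-blocking attempt: if someone else holds the lock, skip to the next task.
            if (!lock.acquire(0, TimeUnit.SECONDS)) {
                continue;
            }
            try {
                runAttempt(taskId);     // hypothetical: executes one attempt of the task
            } finally {
                lock.release();         // released only after the attempt has been reported
            }
        }
        // Nothing left to do: the caller goes back to sleep until the next loop.
    }

    private void runAttempt(String taskId) { /* hypothetical business logic */ }
}
```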
Each time a task is started, it automatically generates an attempt. An attempt has, by default, a start date, an executor id and a status indicating whether or not the task managed to complete during this particular execution attempt. When the task does not complete, additional data is included in the attempt, such as the attempt's end date (if available), the exception stack trace and message (if available), the total processing time... This data helps us build an attempt report which will hopefully help a human understand what happened and what needs to be fixed for the task to work (or why it is perfectly normal that the task fails).
At the first execution error the task is supposed to stop; the only exception to this rule is commit errors. Since tasks do not block the system, and normal requests can still update, delete or create avatars while the task is running, some data will be impossible to commit due to concurrent updates of the same resource in the database. In that case, we allow the system to retry automatically, in the hope that at some point the resource will become free and we will be able to move on with the task.
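As a sketch of the data model described above (class and field names are hypothetical, not the actual persisted schema):

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the task/attempt model, not the persisted schema. */
class Task {
    enum Status { WAITING, RUNNING }

    String id;
    boolean active;                      // active flag
    boolean finished;                    // set to true once an attempt succeeds
    Status status = Status.WAITING;
    List<Attempt> attempts = new ArrayList<>();
}

class Attempt {
    String executorId;                   // cluster member id of the executing instance
    Instant startDate;
    Instant endDate;                     // only if available
    boolean completed;                   // did this attempt succeed?
    boolean cancelledBySystem;           // e.g. ZooKeeper connection lost mid-run
    String errorMessage;                 // only on failure, if available
    String stackTrace;                   // only on failure, if available
    long processTimeMillis;
}
```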
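A minimal sketch of such a retry policy; the conflict detection and commit calls are hypothetical placeholders (in practice the check would target the graphDB's concurrent-modification error):

```java
public class CommitRetry {
    private static final int MAX_RETRIES = 5;

    /** Retries only when the failure is a commit conflict; any other error stops the task. */
    public static void commitWithRetry(Runnable commit) {
        for (int attempt = 1; ; attempt++) {
            try {
                commit.run();
                return;
            } catch (RuntimeException e) {
                if (!isCommitConflict(e) || attempt >= MAX_RETRIES) {
                    throw e;            // a "true" error: stop the task and report the attempt
                }
                sleepBeforeRetry(attempt);
            }
        }
    }

    // Hypothetical conflict check, standing in for the database-specific exception type.
    private static boolean isCommitConflict(RuntimeException e) {
        return e.getMessage() != null && e.getMessage().contains("concurrent");
    }

    private static void sleepBeforeRetry(int attempt) {
        try {
            Thread.sleep(100L * attempt);   // simple linear backoff
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```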
Also, we do not make any assumption about the ZooKeeper aspect lock: the moment the connection with ZooKeeper is lost (and the lock with it), all tasks currently running on the instance are stopped by the system, and they all generate an attempt in the database with the "cancelled by system" flag. The aspect lock itself is released only after the attempt has been properly written to the database and every part of the reporting has been done. At the moment the aspect lock is released, if the task has finished, its state in the graphDB ensures that it cannot be executed again.
Finally, tasks may run in parallel; there is no specific limitation on this as long as the appropriate locks are respected. If in the future a task needs to lock multiple resources in ZooKeeper, instead of a single lock we will have to acquire a whole set of locks before executing the task; for that purpose we may have to think about a fairness mechanism that helps complex aspects with multiple locks to acquire be prioritized over simple ones.
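A sketch of how such a reaction to a lost ZooKeeper connection can be wired with Curator; the cancelAllRunningTasks callback is hypothetical:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;

public class ZkConnectionWatcher {
    public static void watch(CuratorFramework client, Runnable cancelAllRunningTasks) {
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework c, ConnectionState newState) {
                // Once the session is suspended or lost, our locks can no longer be trusted:
                // stop every running task and record a "cancelled by system" attempt.
                if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
                    cancelAllRunningTasks.run();
                }
            }
        });
    }
}
```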
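For reference, Curator already provides a recipe for acquiring a set of locks as a whole (InterProcessMultiLock); whether it would fit the fairness requirement is an open question, and the paths below are hypothetical:

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;

public class MultiLockExample {
    public static void runWithLocks(CuratorFramework client) throws Exception {
        // All-or-nothing: either every lock in the set is acquired, or none is held.
        InterProcessMultiLock locks = new InterProcessMultiLock(client,
                Arrays.asList("/thingin/locks/aspect-a", "/thingin/locks/aspect-b"));
        if (locks.acquire(5, TimeUnit.SECONDS)) {
            try {
                // Execute the multi-resource task here.
            } finally {
                locks.release();
            }
        }
    }
}
```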
At the moment, every time an instance becomes master (moves from the slave state to the master state), a new request is sent on MASTER - SYNCHRONIZE_MODEL in order to perform a synchronisation with OLS, to make sure that every ontology available from OLS is also available in Thing'In. The same happens every time OLS accepts a new ontology into its repository: it sends a notification to Thing'In, which also triggers an ontology synchronisation sequence.
We decided to turn ontology synchronisation into tasks because it can be an operation that takes quite some time, especially when the backend is completely empty and you suddenly have to import 100 or more ontologies; it could lead to timeouts inside the VertX framework.
To perform the synchronisation, we first poll the ontologies from OLS, then compare the ones already loaded in the OntologyCache with the ones available from OLS. From this comparison, if there are ontologies to add into Thing'In, we create a task which will be in charge of adding specifically those ontologies.
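A sketch of that comparison, assuming hypothetical accessors that return ontology identifiers:

```java
import java.util.HashSet;
import java.util.Set;

public class OntologySync {
    /** Returns the ontologies present in OLS but missing from the local OntologyCache. */
    public static Set<String> missingOntologies(Set<String> olsOntologyIds,
                                                Set<String> cachedOntologyIds) {
        Set<String> toAdd = new HashSet<>(olsOntologyIds);
        toAdd.removeAll(cachedOntologyIds);
        return toAdd;          // if non-empty, create a task in charge of adding exactly these
    }
}
```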
Loading an ontology currently consists in processing its class graph and inserting it into the graph database with the appropriate bindings; inheritance between classes, inside the ontology or towards outside ontologies, must be represented physically with edges. Loading ontologies into the OntologyCache is a separate job which is done after the task has completed. Once the task is completed it sends a message on MODEL - UPDATE_CACHE (messages on the MODEL paradigm are broadcast) to tell every instance of the cluster to reprocess its ontology cache.
To better understand what aspects are, feel free to read this article: Projection/aspects
Adding a new aspect or removing one which is already in use can be a very expensive operation; this will be especially true when Thing'In really grows, because in both cases we need to either perform avatar enrichment or avatar cleanup. As an example, if I were to add a new GeoJson Spatial Aspect and Thing'In had 300 million avatars eligible for this new aspect, I would have to update and enrich 300 million avatars. Conversely, if the aspect is already active and I want to remove it, I would need to clean those 300 million avatars to save disk space. Since this cannot be done in one single operation, we create a task which will be in charge of either enriching the data or cleaning it up.
This is not always needed: for some very simple aspects, like the timeseries one (be it creation or deletion of the aspect), we do not create tasks. So, instead of a long explanation, here is the sequence diagram of what happens when an aspect creation, without a task creation, completes successfully:
If you are not familiar with the VertX framework, I recommend you check its documentation (https://vertx.io/docs/vertx-core/java/#_in_the_beginning_there_was_vert_x) to understand the basics. Very briefly, VertX enables us to develop a reactive application using bus messages and asynchronous calls, exactly like one would do in Node.js, the difference being that you work in Java and benefit from the Java ecosystem. Every agent whose name ends with Verticle corresponds to a VertX verticle: it may receive and emit messages and must be seen as a dedicated worker.
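A minimal verticle sketch, just to illustrate the pattern (the event-bus address and payload are hypothetical):

```java
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;

public class ExampleVerticle extends AbstractVerticle {
    @Override
    public void start() {
        // Register a consumer on an event-bus address; the handler is called asynchronously.
        vertx.eventBus().consumer("thingin.example", message -> {
            // React to the message and reply to the sender.
            message.reply("processed: " + message.body());
        });
    }

    public static void main(String[] args) {
        Vertx.vertx().deployVerticle(new ExampleVerticle());
    }
}
```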
About the agents:
An important thing to understand: sending a message to a verticle may be done directly through its VertX channel, or through a RabbitMQ queue which will in turn redirect the message to the appropriate VertX channel. Just to make it clear, I will use these two terms to make the distinction: the RabbitMQ tag in the sequence diagrams means that the interaction is made by sending a message to a RabbitMQ queue, while the VertX tag means that the interaction is made by sending a message directly to a VertX channel. A message can be sent directly to a VertX channel only from within the Thing'In server instance, whereas a message sent to a RabbitMQ queue can go from a Thing'In API to a Thing'In server instance and vice versa.
Also, every RabbitMQ queue except the one for the MODEL paradigm is unicast: a message is delivered only once. For the MODEL case, the message is replicated and delivered to every current consumer of the MODEL paradigm, so every instance of the cluster receives the message.
Here is what happens for the creation of an aspect through the creation of a task:
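For illustration (exchange and queue names are hypothetical), the difference maps to how the exchanges and queues are declared with the RabbitMQ Java client:

```java
import com.rabbitmq.client.Channel;

public class ParadigmTopology {
    public static void declare(Channel channel, String instanceId) throws Exception {
        // Unicast paradigms: a single shared queue, each message consumed by exactly one instance.
        channel.queueDeclare("thingin.master", true, false, false, null);

        // MODEL paradigm: a fan-out exchange plus one private queue per instance,
        // so every instance of the cluster receives a copy of the message.
        channel.exchangeDeclare("thingin.model", "fanout", true);
        String privateQueue = "thingin.model." + instanceId;
        channel.queueDeclare(privateQueue, false, true, true, null);
        channel.queueBind(privateQueue, "thingin.model", "");
    }
}
```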
About the agents:
As shown in the diagram, a task is never started directly. It is the job of the TaskProcessHandler, which periodically scans all unfinished tasks from OrientDB and tries to execute them. If a task is not free, the instance considers that the task is being executed by someone else and simply moves on to the next task, until all the tasks have been checked. If the instance checking the task is the master and it fails to acquire the task's lock, it also performs some timeout checks in ZooKeeper.
When a task is executed by a Thing'In server instance, the instance sets its own cluster member id in the reserved task node in ZooKeeper. This cluster member id is then checked by the master against ZooKeeper to verify that the corresponding Thing'In server is not dead and that the task is not deadlocked.
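A sketch of that liveness check, assuming hypothetical ZooKeeper paths (a node under /thingin/tasks storing the executor's member id, and ephemeral member nodes under /thingin/members):

```java
import java.nio.charset.StandardCharsets;
import org.apache.curator.framework.CuratorFramework;

public class ExecutorLivenessCheck {
    /** Returns true if the executor recorded on the task node is still alive in the cluster. */
    public static boolean executorAlive(CuratorFramework zk, String taskId) throws Exception {
        // Read the cluster member id written by the executing instance (path is hypothetical).
        byte[] data = zk.getData().forPath("/thingin/tasks/" + taskId + "/executor");
        String memberId = new String(data, StandardCharsets.UTF_8);

        // The member node is ephemeral: if the owning JVM died, ZooKeeper removed it.
        return zk.checkExists().forPath("/thingin/members/" + memberId) != null;
    }
}
```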
We will go through some other sequence diagrams in order to see how the system reacts:
1. Requesting the task state and aspect state throughout the aspect creation
2. Creating the same aspect twice in parallel (in the case where we have two instances of the AspectVerticle)
About the ontology cache: if you look closely you can see that we use a ReadWriteLock (see its documentation here: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReadWriteLock.html). The interesting point is that the read part of the lock may be held by several users at once, whereas the write lock can only be held by a single thread at a time. As we can see in the sequence diagram, there is no mutual exclusion and both threads can check the ontology cache together; it is ZooKeeper that determines which thread will get to process the aspect creation task.
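A sketch of that pattern on a cache, using a hypothetical wrapper class (not the actual OntologyCache implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical cache wrapper: many concurrent readers, exclusive writer during rebuilds. */
public class GuardedOntologyCache {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Object> ontologies = new HashMap<>();

    public Object lookup(String ontologyId) {
        lock.readLock().lock();                 // several threads may read at the same time
        try {
            return ontologies.get(ontologyId);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void rebuild(Map<String, Object> fresh) {
        lock.writeLock().lock();                // only one thread may rebuild; readers wait
        try {
            ontologies.clear();
            ontologies.putAll(fresh);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```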
3. Creating an aspect while the ontology cache is being updated because of an Ontology Synchronisation
In this example, an ontology synchronisation runs while the aspect creation is requested. We can see that the ontology cache update can potentially delay the processing of the aspect creation. This may rarely happen during normal execution of the system, but it can happen quite often when running test cases. It needs to be taken into account, as it can make some tests fail.
4. Recovery after an instance running an aspect creation task crashes, right after the task has been created in Thing'In.
Every instance must acquire its own member node in ZooKeeper during its boot sequence. The member node name is composed of the hostname of the operating system running the JVM and the pid (process id) of the JVM, which is in theory unique. If it is not, there are already some strange problems... If the instance has the opportunity, it will unsubscribe itself from ZooKeeper before quitting and the cleanup will be faster, but there is no guarantee of that in case of a crash. This is why the node in ZooKeeper is of ephemeral type: after the JVM dies there will be a timeout on its session and the node will automatically be cleaned up by ZooKeeper.
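A sketch of that registration (the /thingin/members path and the id format separator are hypothetical):

```java
import java.net.InetAddress;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

public class MemberRegistration {
    /** Registers this JVM as a cluster member; the node disappears when the session ends. */
    public static String register(CuratorFramework zk) throws Exception {
        // hostname + pid: unique in theory, as described above.
        String memberId = InetAddress.getLocalHost().getHostName()
                + "-" + ProcessHandle.current().pid();

        // Ephemeral: ZooKeeper removes the node automatically once the JVM's session times out.
        zk.create()
          .creatingParentsIfNeeded()
          .withMode(CreateMode.EPHEMERAL)
          .forPath("/thingin/members/" + memberId);
        return memberId;
    }
}
```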
Thus, the master checking the executor of a currently locked task will find that the executor no longer exists, and will free the task for future execution.
5. Aspect deletion with task creation