ShannonDB Technology

SlicingDice runs on top of a proprietary database called ShannonDB. This page introduces the design principles behind ShannonDB that may be important for understanding many of SlicingDice’s characteristics.

What is ShannonDB

ShannonDB is a Java-based, distributed, non-ACID, eventually consistent analytics database.

ShannonDB was strongly inspired by the data insertion techniques and search engine concepts of Apache Lucene (and Elasticsearch), such as the inverted index, and it also combines well-known concepts from other successful high-performance databases, like column-store data modeling.

Why we built ShannonDB

We built ShannonDB to solve our own data challenges, which involved dealing with billions of data points coming from different sources that needed to be analysed as fast as possible.

ShannonDB was built to store data as cheaply as possible while still answering queries in sub-second time. To achieve this we focused on data compression, so ShannonDB favors economy in both storage and query time over the strong guarantees usually granted by OLTP databases.

This trade-off is possible because ShannonDB was built for applications where a fast but possibly out-of-date response is preferable to a fully up-to-date but potentially slower answer. In addition, ShannonDB was designed to be scalable and to run on cheap commodity hardware.

These principles are behind many of the design choices made during the development of ShannonDB: from architecture to consistency protocol, we wanted to make ShannonDB light, cheap and fast.

ShannonDB Data Model

Databases are usually classified according to the abstraction they provide to the system using them. Many traditional databases follow the so-called relational model, which is backed by sound mathematical theory. Although broadly adopted, this model has faced growing competition since the 2000s, as a multitude of databases were created with new models that allow cheaper management of big data.

The so-called non-relational databases span a list of categories that include key-value stores, document stores, graph databases, wide-column stores and others. With so many competing models in the database ecosystem, it should be clear that a definitive solution has not yet been found and that there is still room for innovation. This is the case with ShannonDB: although similarities can be traced, none of the known models exactly describes what ShannonDB implements.

ShannonDB was not developed with any model in mind and, as a result, it has its own representation of data. However, you may follow the model explained in this section to organize your data when using ShannonDB as your data warehouse solution.

ShannonDB stores data of entities, so every data point must be associated with an entity. An entity is an instance of whatever the application wants to store data about in ShannonDB and it is identified by a numeric, incremental value. You may think of it as the primary key of a record, although this equivalence is not exact.

ShannonDB natively supports two types of data: time-series and non-time-series data.

A time-series data entry is a triplet (Column Name, Column Value, Timestamp) and a non-time-series data entry is a tuple (Column Name, Column Value), where:

  • Column Name is a string that must have been registered beforehand as a column.
  • Column Value is the value being inserted; its format depends on the data type of the column.
  • Timestamp is a formatted date string such as 2017-03-14T15:05:43Z.

Note that these tuples and triplets are added independently: if you ever think you are adding many rows of data to ShannonDB at once, this is simply syntactic sugar. Internally, the data is split into distinct tuples or triplets. A complete data record in ShannonDB is therefore the set of tuples and triplets associated with one entity.

With all that in mind, a good picture of ShannonDB's data model is a table whose sorting key is the Entity Identifier (Entity ID) and whose content is a list of tuples and triplets. This is called a materialized view.

In addition to the materialized view, ShannonDB keeps data in a format called the inverted index view. Hence ShannonDB always provides two views of these tuples and triplets: the materialized view lets one query the value of an entity on a column, while the inverted index view lets one query all entities that have a given value on a column.
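The two views can be sketched with plain Python dictionaries. This is a minimal in-memory illustration under stated assumptions, not ShannonDB's on-disk format: the helper name `add_entry` and the sample data are hypothetical, only non-time-series tuples are shown, and overwriting a value does not remove the entity from the old value's posting list.

```python
from collections import defaultdict

# Materialized view: Entity ID -> {column name: value}
materialized = defaultdict(dict)
# Inverted index view: column name -> value -> sorted list of Entity IDs (posting list)
inverted = defaultdict(lambda: defaultdict(list))


def add_entry(entity_id, column, value):
    """Record one (column, value) tuple for an entity in both views."""
    materialized[entity_id][column] = value
    postings = inverted[column][value]
    if entity_id not in postings:
        postings.append(entity_id)
        postings.sort()


add_entry(1, "name", "Rick")
add_entry(1, "job", "Developer")
add_entry(2, "name", "Morty")
add_entry(3, "job", "Developer")

# Materialized view: what is entity 1's value on "job"?
print(materialized[1]["job"])        # Developer
# Inverted index view: which entities have job == "Developer"?
print(inverted["job"]["Developer"])  # [1, 3]
```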

Figure: Example of a materialized view

Figure: Example of an inverted index view

How ShannonDB partitions its data

The problem with any big table is that it eventually gets too big for a single machine to manage. By “too big” we mean that it becomes too slow to query.

A common solution is to partition a big table into smaller ones and give each partition to an independent machine. The usual way of splitting the table is by cutting it horizontally: for instance, call rows 0 to 999,999 the first partition of the table and assign a machine to handle it. This technique is called sharding and is broadly used by ShannonDB.

Data is distributed among ShannonDB nodes by following a mapping rule: each Entity ID (a numerical value) maps to a partition, called a shard, and each shard maps to the node responsible for storing it.

A simple sharding scheme consists of keeping a range of sequential Entity IDs in the same shard, such as IDs 0 to 999,999 in the first shard, IDs 1,000,000 to 1,999,999 in the second shard and so on. A shard is then identified by its position in this sequence (the first shard has ID 1, the second has ID 2 and so on), and shards are distributed evenly among nodes.
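The range-based mapping from Entity ID to shard, and from shard to node, can be sketched as follows. The shard size and 1-based shard numbering come from the text; assigning shards to nodes round-robin is only an assumption about what "distributed evenly" means, and the node names are hypothetical.

```python
SHARD_SIZE = 1_000_000  # sequential Entity IDs per shard, as in the example above


def shard_for(entity_id: int) -> int:
    """IDs 0..999,999 -> shard 1, 1,000,000..1,999,999 -> shard 2, and so on."""
    return entity_id // SHARD_SIZE + 1


def node_for(shard_id: int, nodes: list[str]) -> str:
    """Assumption: shards are spread evenly by assigning them to nodes round-robin."""
    return nodes[(shard_id - 1) % len(nodes)]


nodes = ["node-a", "node-b", "node-c"]  # hypothetical node names
for entity_id in (10, 999_999, 1_000_000, 3_500_000):
    shard = shard_for(entity_id)
    print(entity_id, "-> shard", shard, "->", node_for(shard, nodes))
```

Because the mapping is a pure function of the Entity ID, any client or node can compute where an entity's data lives without a lookup.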

For redundancy, more than one node may be responsible for a shard, in which case every responsible node must independently store the same data. The advantage of this design is that shards can be added as the system grows and moved between nodes if a node is to be shut down. In short, sharding is what makes the system horizontally scalable.

In summary, all data inserted into ShannonDB must belong to some Entity ID, and since Entity IDs are divided among the existing shards, all data belonging to one Entity ID ends up stored in the same shard instead of being distributed across all shards.

ShannonDB & Apache Kafka

Apache Kafka is a publish-subscribe system that decouples data production from data consumption. Messages sent to Kafka are organized into topics, to which multiple clients write and from which multiple server nodes read. Each topic is divided into partitions, and within each partition total ordering of messages is guaranteed.
Thanks to this ordering guarantee, Kafka can be used as a commit log, which is its second role in ShannonDB. Furthermore, Kafka is a distributed system that runs on multiple servers and stores data redundantly for reliability.

Apache Kafka has two main roles in SlicingDice's pipeline:

  1. It serves as a buffer of messages to be processed;
  2. It works as a tool for failure recovery.

Buffering messages is important in case the system is under heavy load and cannot process all messages immediately. In this scenario messages are held on Kafka until the cluster is available to process them.

As for failure recovery, consumed Kafka messages are not immediately removed from its logs: they can be reprocessed in case a node fails or needs to reboot.

One important note regarding the use of Kafka: when an application sends a message using the ShannonDB client, the client returns as soon as the message is added to the Kafka client's in-memory buffer; the message is then sent asynchronously to a Kafka topic, batched together with other messages.
Therefore, a query executed immediately after inserting data may not see the effect of that entry, since it has probably not yet been processed by a node. The entry is guaranteed to be processed eventually, but in the meantime it will not be returned by queries.
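The read-after-write gap can be illustrated with a toy simulation. The classes `BufferedClient` and `Node` are hypothetical stand-ins, not the ShannonDB or Kafka client APIs; a plain Python list plays the role of the Kafka topic, and the node tracks a read offset so that consumed messages stay in the log, as described above.

```python
class BufferedClient:
    """Hypothetical client: send() returns as soon as the message is buffered locally."""

    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.buffer = []  # in-memory buffer on the client side
        self.topic = []   # stands in for a Kafka topic (an append-only log)

    def send(self, message):
        self.buffer.append(message)  # returns immediately; nothing has been sent yet
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        self.topic.extend(self.buffer)  # ship the whole batch at once
        self.buffer.clear()


class Node:
    """Hypothetical node that reads the topic from its last offset."""

    def __init__(self):
        self.store = {}
        self.offset = 0  # index of the next unread message

    def consume(self, topic):
        # Messages stay in the log (as in Kafka); the node only advances its offset.
        while self.offset < len(topic):
            msg = topic[self.offset]
            self.store[(msg["entity_id"], msg["column"])] = msg["value"]
            self.offset += 1

    def query(self, entity_id, column):
        return self.store.get((entity_id, column))


client, node = BufferedClient(), Node()
client.send({"entity_id": 10, "column": "name", "value": "Rick"})
node.consume(client.topic)
print(node.query(10, "name"))  # None: the entry is still in the client buffer
client.flush()
node.consume(client.topic)
print(node.query(10, "name"))  # Rick
```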

For every ShannonDB shard there is a Kafka topic from which nodes read messages. This, however, does not get rid of all communication between client and server during data addition. In order to build a complete data entry message, the client must first have contacted servers to fetch available Entity IDs.

Entity and Record IDs

Given that in ShannonDB every data entry is associated with an Entity ID, every data entry message must contain one Entity ID. This Entity ID must come from somewhere, and in order to get a new Entity ID, the client must first connect directly to a cluster node.

The node the client connects to is responsible for managing a set of shards and keeping track of the Entity IDs already allocated in these shards. It answers the request with a pool of IDs that the client can use later, whenever it needs new Entity IDs.

The client decides which node to connect to, choosing nodes in round-robin fashion to balance load and distribute data fairly among nodes. This is, as in many other scenarios, a case in which the client performs important management logic.
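A client-side Entity ID pool refilled from nodes in round-robin order might look like the sketch below. It assumes, for simplicity, that each node owns exactly one shard and hands out unused IDs sequentially from that shard's range; `IdNode`, `EntityIdPool` and the batch size are hypothetical.

```python
import itertools

SHARD_SIZE = 1_000_000


class IdNode:
    """Hypothetical node owning one shard and allocating unused Entity IDs from it."""

    def __init__(self, shard_id):
        self.next_free = (shard_id - 1) * SHARD_SIZE

    def allocate(self, count):
        ids = list(range(self.next_free, self.next_free + count))
        self.next_free += count
        return ids


class EntityIdPool:
    """Client-side pool, refilled from nodes chosen in round-robin order."""

    def __init__(self, nodes, batch=3):
        self.nodes = itertools.cycle(nodes)
        self.batch = batch
        self.available = []

    def next_id(self):
        if not self.available:
            node = next(self.nodes)  # round-robin choice of the node to contact
            self.available = node.allocate(self.batch)
        return self.available.pop(0)


pool = EntityIdPool([IdNode(1), IdNode(2)], batch=3)
print([pool.next_id() for _ in range(7)])
```

Alternating refills between nodes is what spreads new entities, and therefore new data, across shards.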

Besides the Entity ID to which the data in the message belongs, one more piece of data must be added to a message because of ShannonDB’s consistency protocol: a Record Identifier (Record ID) generated by the client. The Record ID is later used to verify which messages were effectively processed and added to ShannonDB and which were not.

For the sake of understanding the need for the Record ID, recall that a node might fail. When the node reboots, it reads Kafka topics to reload any message it could not completely process before crashing. In this process the node might end up rereading a message that had in fact been processed before the failure.

ShannonDB operations are not idempotent and reapplying a command might produce wrong results on future queries. Record IDs are thus used to verify if a given message was already processed. A ShannonDB node keeps track of processed Record IDs and skips a message in case the Record ID has already been processed. As for the Record ID itself, it is a string containing a unique identifier from the client that generated the record and a sequential number generated by the client itself.
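The deduplication idea can be sketched as follows. The `"<client id>-<sequence>"` format is an assumption (the text only says the Record ID combines a unique client identifier with a client-side sequential number), a Python set stands in for the node's record of processed Record IDs, and incrementing a counter is a hypothetical example of a non-idempotent operation.

```python
import itertools
import uuid


class RecordIdGenerator:
    """Record ID = client identifier + client-side sequence number (format assumed)."""

    def __init__(self, client_id=None):
        self.client_id = client_id or uuid.uuid4().hex
        self.counter = itertools.count(1)

    def new_id(self):
        return f"{self.client_id}-{next(self.counter)}"


class DedupNode:
    """Applies a non-idempotent increment at most once per Record ID."""

    def __init__(self):
        self.processed = set()
        self.counts = {}

    def apply(self, msg):
        if msg["record_id"] in self.processed:
            return False  # already applied before the crash: skip it
        key = (msg["column"], msg["value"])
        self.counts[key] = self.counts.get(key, 0) + 1
        self.processed.add(msg["record_id"])
        return True


gen = RecordIdGenerator("client-7")
log = [{"record_id": gen.new_id(), "column": "job", "value": "Developer"} for _ in range(2)]
node = DedupNode()
for msg in log + log:  # the whole log is replayed after a restart
    node.apply(msg)
print(node.counts)  # {('job', 'Developer'): 2}
```

Without the Record ID check, the replay would count each entry twice.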

Data Insertion on ShannonDB

Below you'll learn the path that ShannonDB takes to effectively add a data entry.

Say the application wants to add data describing the following user: {"name": "Rick", "age": 30, "job": "Developer"}

The first thing to note is that this data is split into three independent columns: name, age and job.

If these columns don’t exist in ShannonDB yet, they must be declared. In this case, a column declaration command is sent to any node in the cluster, which is then responsible for setting up the new column on Zookeeper.

This introduces another element in our pipeline: Zookeeper is a distributed system used for reliably managing shared configurations. In ShannonDB, Zookeeper is used for keeping metadata on column definitions.

Once Zookeeper has updated its data, it sends notifications to every node in the cluster and eventually they all register the new column.

Assuming columns have been declared, the client must now associate the data being added with an Entity ID. If the client does not yet have its own pool of available Entity IDs, it must fetch one from any ShannonDB node.

The complete message built by the client contains the original data, an Entity ID and an attached Record ID.

The message will look like this: {"customer_id": 109, "database_id": 1277, "entity_id": 10, "column": "ColumnName", "value": "ColumnValue"}

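Using the example from the beginning of this section, the record carries one column/value pair per column. Written as separate entries in the format above, it looks like this:

{"customer_id": 109, "database_id": 1277, "entity_id": 10, "column": "Name", "value": "Rick"}
{"customer_id": 109, "database_id": 1277, "entity_id": 10, "column": "Age", "value": "30"}
{"customer_id": 109, "database_id": 1277, "entity_id": 10, "column": "Job", "value": "Developer"}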

The message is then sent to a Kafka topic and is eventually processed by the ShannonDB node that hosts the shard responsible for Entity ID 10. In this example that is shard 1, which covers Entity IDs 0 to 999,999.
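Splitting a record into per-column messages and routing them by shard can be sketched as below, in line with the earlier note that records are split into distinct tuples internally. The field names follow the message template above; the function names and the `shard-<id>` topic naming are hypothetical, and the Record ID is left out because the example messages do not show its field name.

```python
import json

SHARD_SIZE = 1_000_000


def build_messages(record, customer_id, database_id, entity_id):
    """Split one record into one message per column (field names from the template)."""
    return [
        {"customer_id": customer_id, "database_id": database_id,
         "entity_id": entity_id, "column": column, "value": str(value)}
        for column, value in record.items()
    ]


def topic_for(entity_id):
    """Hypothetical naming: one Kafka topic per shard."""
    return f"shard-{entity_id // SHARD_SIZE + 1}"


user = {"Name": "Rick", "Age": 30, "Job": "Developer"}
for msg in build_messages(user, customer_id=109, database_id=1277, entity_id=10):
    print(topic_for(msg["entity_id"]), json.dumps(msg))
```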

When processing the message, the node checks the Record ID to determine whether it is a new record or one that can be skipped. If the record is to be processed, the node doesn’t write to its files immediately. Instead, it buffers new entries in memory.

This allows ShannonDB to sort write commands in a way that optimizes the overall process. When the buffer reaches a given size, or when messages have been buffered for long enough, the node writes to all the necessary files. Think of the node as writing the entry data to both the materialized and the inverted index views, but keep in mind that several files have to be modified.
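A write buffer that flushes on size or age, sorting entries before writing, might look like this. The thresholds, the `(column, Entity ID)` sort key and the class name are assumptions for illustration; `flushed` merely records each sorted batch in place of real file writes, and the injectable `clock` makes the age rule easy to test.

```python
import time


class WriteBuffer:
    """Buffers entries and flushes them sorted, when full or when the oldest is too old."""

    def __init__(self, max_entries=1000, max_age_s=5.0, clock=time.monotonic):
        self.max_entries = max_entries
        self.max_age_s = max_age_s
        self.clock = clock
        self.entries = []
        self.first_at = None
        self.flushed = []  # stands in for writes to the view files

    def add(self, entity_id, column, value):
        if not self.entries:
            self.first_at = self.clock()
        self.entries.append((entity_id, column, value))
        self.maybe_flush()

    def maybe_flush(self):
        too_big = len(self.entries) >= self.max_entries
        too_old = bool(self.entries) and self.clock() - self.first_at >= self.max_age_s
        if too_big or too_old:
            self.flush()

    def flush(self):
        # Sorting by (column, entity) groups writes that touch the same files together.
        self.flushed.append(sorted(self.entries, key=lambda e: (e[1], e[0])))
        self.entries = []
        self.first_at = None


buf = WriteBuffer(max_entries=3)
for entry in [(30, "job", "Dev"), (10, "name", "Rick"), (20, "job", "QA")]:
    buf.add(*entry)
print(buf.flushed)
```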

Modified files include the following data structures: index and posting list associated with an inverted index, a bitmap of known IDs in the system, a bitmap of Record IDs from the client source and search trees associated with translation of string hashes and frequency of occurrence of values.

Queries on ShannonDB

Below you'll learn the path that ShannonDB takes to query data.

As every node in our cluster holds all columns for a set of entities, almost every query must be sent to all nodes. The orchestration of this process is done by the client. This is not much trouble for the client, though: as nodes are independent from one another, orchestration is roughly a matter of sending the same query command to every node in parallel and then aggregating the answers. The client also performs three other important tasks during query processing:

  • Sometimes, the client expects the answer to contain the column value stored in ShannonDB. However, since ShannonDB uses a dictionary encoding data compression technique, the data must first be translated from the encoded format to the original one. It is thus the job of the client to query nodes for translation.
  • On queries that might produce a large result message, this result is paginated, i.e., broken into smaller pages made available to the application as they are ready. It is the client’s job to keep track of produced pages and continuously query nodes for the next page until the complete result has been aggregated on the client’s side.
  • If both the master and the replica nodes for a shard are down and fail to return their part of the answer, the client might produce an approximation of that part. This is possible for some types of queries because entities are approximately equally distributed among nodes.
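The scatter-gather pattern with a fallback approximation can be sketched for a simple count query. Nodes are modeled as hypothetical Python callables, and scaling the answered total by the fraction of nodes that responded is one straightforward way to use the "entities are roughly evenly distributed" property; the actual estimation method is not described in the text.

```python
from concurrent.futures import ThreadPoolExecutor


def count_query(nodes, query):
    """Send the same count query to every node in parallel and sum the answers.

    If a node (and its replica) cannot answer, extrapolate from the nodes that did,
    assuming entities are roughly evenly distributed among nodes.
    """
    def ask(node):
        try:
            return node(query)
        except ConnectionError:
            return None  # node and its replica are unavailable

    with ThreadPoolExecutor(max_workers=len(nodes)) as pool:
        answers = list(pool.map(ask, nodes))

    ok = [a for a in answers if a is not None]
    if not ok:
        raise RuntimeError("no node answered")
    exact = len(ok) == len(answers)
    total = sum(ok) if exact else round(sum(ok) * len(answers) / len(ok))
    return total, exact
```

The second element of the result tells the application whether the count is exact or an estimate.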

On the server side, the query process consists of parsing the query command into a sequence of operations and then executing them, which involves steps such as:

  1. Reading posting lists and indexes data directly from SSD or in-memory segments.
  2. Uncompressing raw data if needed.
  3. Computing set operations on posting lists.
  4. Building the JSON answer expected by the client.
  5. Updating caches and storing information for the automatic query planner.
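Step 3, computing set operations on posting lists, can be illustrated with the textbook linear merge over sorted Entity ID lists. The text does not say which algorithm ShannonDB uses, so this is a generic sketch; the posting lists below are hypothetical.

```python
def intersect(a, b):
    """AND: Entity IDs present in both sorted posting lists (linear merge)."""
    i = j = 0
    out = []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1
        else:
            j += 1
    return out


def union(a, b):
    """OR: Entity IDs present in either sorted posting list, without duplicates."""
    i = j = 0
    out = []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            out.append(a[i])
            i += 1
        else:
            out.append(b[j])
            j += 1
    out.extend(a[i:])
    out.extend(b[j:])
    return out


# Hypothetical posting lists: entities with job == "Developer" and with age == 30.
developers = [3, 10, 42, 77]
age_30 = [10, 11, 42, 90]
print(intersect(developers, age_30))  # [10, 42]
print(union(developers, age_30))      # [3, 10, 11, 42, 77, 90]
```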

Data Compression

It is well known that access to primary memory is around 1 million times faster than access to storage devices when considering hard drives, and around 10 thousand times faster when considering modern SSDs. Therefore, efforts to reduce the amount of data transferred to and from disks are usually worth the price, and this is precisely the point of compressing data.

At the cost of extra CPU cycles to compress and decompress data, huge reductions in the size of transferred data can be achieved with a proper compression scheme. But no single scheme fits every case: since every type of data has its own characteristics, different compression schemes may be required to achieve optimal compression rates.

Currently implemented compression schemes include delta encoding, run length encoding, Simple 9 encoding and bitmap encoding. We also combine these schemes when the gain in compression is worth the extra effort. Furthermore, whenever relevant, we reduce the number of bits needed to represent a numerical value in a file to the minimum, as long as we know an upper bound for that value. For example, by limiting file sizes to 17 GB we can use 34 bits instead of 64 when representing file addresses.
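Two of the listed schemes, delta encoding followed by run length encoding, can be sketched on a sorted posting list, together with the arithmetic behind the 34-bit file addresses. This is a generic illustration of the techniques, not ShannonDB's byte-level format; Simple 9 and bitmap encoding are not shown.

```python
def delta_encode(ids):
    """Store the first value, then the gaps between consecutive sorted values."""
    if not ids:
        return []
    return [ids[0]] + [b - a for a, b in zip(ids, ids[1:])]


def delta_decode(deltas):
    out, total = [], 0
    for d in deltas:
        total += d
        out.append(total)
    return out


def rle_encode(values):
    """Collapse runs of equal values into [value, run_length] pairs."""
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return runs


posting = [1000, 1001, 1002, 1003, 1010, 1011, 1012]
deltas = delta_encode(posting)  # [1000, 1, 1, 1, 7, 1, 1]
runs = rle_encode(deltas)       # [[1000, 1], [1, 3], [7, 1], [1, 2]]

# Bits needed to address every byte of a file capped at 17 GB (17 * 10**9 bytes):
print((17 * 10**9 - 1).bit_length())  # 34
```

Small, repetitive gaps are what make the second pass effective: seven IDs collapse into four short runs.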

A point where ShannonDB seems to differ from other databases is that we make no distinction between hot and cold data when it comes to compression. Everything gets compressed in the same way.

Data compression is one of the areas where our developers have invested a huge amount of time and effort. Because of that, we commonly see data shrink to between 1/10 and 1/30 of its original size when it is inserted into ShannonDB.

We also avoid repeating strings in our files. The full string of a value is present in only one file in our system and is replaced by a hash value everywhere else, which is why some query results require translation. This kind of technique is commonly known as dictionary encoding.
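Dictionary encoding with hashes, and the translation step that the client later requests, can be sketched like this. BLAKE2b truncated to 8 bytes is our choice for illustration (the text does not name the hash function), the single dictionary stands in for the one file holding full strings, and hash collisions are ignored for brevity.

```python
import hashlib

dictionary = {}  # hash -> full string, stored in a single place


def encode(value: str) -> int:
    """Replace a string by a 64-bit hash and remember the translation once."""
    h = int.from_bytes(hashlib.blake2b(value.encode(), digest_size=8).digest(), "big")
    dictionary.setdefault(h, value)
    return h


def translate(h: int) -> str:
    """The lookup a client would ask nodes for when it needs the original value."""
    return dictionary[h]


column = [encode(v) for v in ["Developer", "Designer", "Developer", "Developer"]]
print(len(dictionary))  # 2 distinct strings stored
print([translate(h) for h in column])
```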

Batch operations

Whenever possible, ShannonDB buffers data, sorts it and applies modifications in bulk.

When writing data to storage, the goal is to minimize seek-time costs, which is achieved by writing everything that belongs at a given position at once instead of moving back and forth on the storage device.
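One way to picture this is to sort pending writes by file offset and merge the contiguous ones, so each region is written in a single pass. This is a hypothetical sketch of the general idea, not ShannonDB's writer, and it assumes pending writes never overlap.

```python
def coalesce_writes(writes):
    """Sort pending (offset, data) writes and merge the ones that are contiguous."""
    merged = []
    for offset, data in sorted(writes, key=lambda w: w[0]):
        if merged and merged[-1][0] + len(merged[-1][1]) == offset:
            start, prev = merged[-1]
            merged[-1] = (start, prev + data)  # extend the previous contiguous write
        else:
            merged.append((offset, data))
    return merged


pending = [(8, b"cd"), (0, b"ab"), (2, b"xy"), (10, b"ef")]
print(coalesce_writes(pending))  # [(0, b'abxy'), (8, b'cdef')]
```

Four scattered writes become two sequential ones, which is exactly where seek time is saved.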

Another place where this principle is put into action is when adding data to ShannonDB’s indexes. For example, instead of adding 100 entries one at a time to a balanced binary tree, we would rather add them all at once to a non-balanced binary tree.
Of course, when buffering data and postponing write operations, the node might crash before flushing the buffer. This is why keeping messages on Kafka and being able to correctly reprocess them is so important to ShannonDB, and why Record IDs are essential.

Read more

If this quick overview of ShannonDB caught your interest, follow our blog to learn more details about our technology.