Writing your own time-series database

James Thompson

"Databases are hard"

Writing databases is one of those things that's widely known to be challenging. Even well-funded expert teams can produce duds, and subtle correctness errors can take years or even decades to work themselves out.

In reality, though, the major challenge lies in writing general-purpose databases with custom storage engines. Most commercial problems aren't like this at all: usually you have a pretty good idea of your data model and the range of workloads people will throw at it, and there's at least one perfectly usable open source storage engine you can download to handle them.

Once you've specified the tightest range of functionality you need it to support, it can be remarkably easy to implement a very specific database and query engine that exactly satisfies your needs, no more, no less. More functionality will require more code, but likely not too much if the core needs stay the same.

Why would you ever do this? Because it simplifies the system by multiple orders of magnitude, and with that simplicity comes much greater flexibility and processing performance. For example, when your database is just a small library it's trivial to add it to a Python or R package and have data scientists access it locally, rather than calling out to an external server process. They get a lot more speed this way too.

Time-series data is perfect for this kind of approach, so here we'll go through how to implement a special-purpose time-series database in under 150 lines of code. C++ is a decent language choice, but any language is fine if it has access to a viable storage engine (usually through an FFI).

Interestingly, we can generate 100% of this code through some fairly simple model-based code generation. Special-purpose database implementation can be really cheap.

Scenario definition

For this example we'll deal with 10,000 imaginary IoT warehouse monitoring devices, each broadcasting, every few seconds, the activity level of the machine it's strapped to and the name of the user who last logged into it. Larger datasets can run to hundreds of billions of broadcasts. There are 100 users in the warehouse, and each activity level is a real number in the range [0.0, 10.0).

In this case we're primarily interested in running state machines along the list of broadcasts from each device in turn, so we can answer questions like "which users push the devices too hard for too long?". There's no particular need to traverse the global feed in time order, except via the individual devices, so we won't optimise for that.

Data model

We're able to use a data model with a fixed set of fields here, rather than allowing the kind of unstructured "bag of pairs" model that document databases use. The efficiency and simplicity improvement in being able to do this is so large that it's often worth filtering and projecting unstructured data into a structured form to handle in this way. If this really isn't possible, you can still use the techniques I describe to implement a fast unstructured database; I'll write another article showing how.

It's pretty easy to translate our broadcast spec to the C-family type system. Timestamps are Unix style at per-second resolution, we've been asked to store activity levels at above-float precision, and we'll handle user IDs by storing each string once and translating to an integer string ID.

#include <cstdint>

using Timestamp = int32_t;   // Unix time, per-second resolution
using DeviceID  = int32_t;
using ActLevel  = double;    // above-float precision, as requested
using StringID  = int32_t;   // interned user name

struct Record {
    Timestamp timestamp;
    DeviceID  deviceID;
    ActLevel  actLevel;
    StringID  userStrID;
};

We’ll store each field in its own column in the database in struct-of-arrays style, to maximise streaming performance and allow for possible vectorisation. Each device will have its own set of columns, so we’ll lose a little time as we move from one device’s data to the next, but this vanishes as the database grows.

On the C++ side we'd probably move to strong typedefs as the application matured, as they can really help code stay resilient to modification errors, at the cost of some complexity. It's not worth it for the first version, however.
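
For illustration, a strong typedef needn't be more than a thin wrapper struct. A minimal sketch (not code from this project):

// Minimal strong typedef sketch: both IDs still wrap an int32_t, but a
// DeviceID can no longer be passed where a StringID is expected
struct DeviceID {
    explicit DeviceID (int32_t v) :value{v} {}
    int32_t value;
};

struct StringID {
    explicit StringID (int32_t v) :value{v} {}
    int32_t value;
};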

We also won't implement any indexes yet, as the users are most interested in throughput for whole-range traversal, but it's quite straightforward to add indexes later.

Data layout

We’ll use LMDB as our storage engine. This gives us zero-copy memory mapped file performance characteristics, control over the data layout, and cross-process ACID (MVCC) semantics to handle updates.

LMDB divides a single file up into several 'databases', each of which is a table of key/value pairs, where each key and value is a byte array. We use my higher level lmdbutils.hpp C++ library, which offers the STL-friendly memory-aligned typed collections we need. I’ll release this library when the API has settled down.

We'll store each data field as a table, each k/v pair mapping a DeviceID to an array of all the data that device has transmitted in that field (KVArr<K,V> means "table storing a mapping of pairs from K to arrays of V"). Unfortunately, for memory alignment reasons we have to use an 8-byte key type when the array element type is 8 bytes wide, so we'll silently widen the deviceID when accessing the activity levels table; there's no danger of overflow. Thankfully KVArr has a static assert to warn us if we forget.

Note that an LMDB environment represents an opened database file, and since it's perfectly possible to store multiple quite separate high-level databases in a single LMDB file, we let callers pass in an Env they're already using rather than a file path.

class Database {
    lmdb::KVArr<DeviceID, Timestamp> _timestamps;
    lmdb::KVArr<int64_t,  ActLevel>  _actLevels;
    lmdb::KVArr<DeviceID, StringID>  _userStrIDs;
    lmdb::StringStore<StringID>      _stringStore;

public:
    Database (lmdb::Env &env) 
        :_timestamps  {env, "tsdb_FIELD_timestamps"},
         _actLevels   {env, "tsdb_FIELD_actLevels"},
         _userStrIDs  {env, "tsdb_FIELD_usrStrIDs"},
         _stringStore {env, "tsdb_STRINGSTORE"} {}

    StringID intern (lmdb::Txn &txn, const std::string &str) {
        return _stringStore.intern (txn, str);
    }
    const char * strWithID (lmdb::Txn &txn, StringID sid) {
        return _stringStore.getStr (txn, sid) .begin();
    }

// ...
};
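
To make the zero-copy point concrete, here's roughly what a column read looks like in terms of the raw LMDB C API. This is an illustrative sketch only, not lmdbutils's actual internals:

#include <lmdb.h>
#include <cstdint>
#include <cstdio>

// Illustrative sketch: read one device's timestamp column straight out of
// LMDB's memory map. data.mv_data points into the mapped file itself, so
// nothing is copied -- the zero-copy property mentioned above.
void printTimestamps (MDB_txn *txn, MDB_dbi timestampsDbi, int32_t did) {
    MDB_val key  {sizeof did, &did};
    MDB_val data {};
    if (mdb_get (txn, timestampsDbi, &key, &data) != 0)
        return;                                  // no data for this device
    auto  *ts = static_cast<const int32_t*> (data.mv_data);
    size_t n  = data.mv_size / sizeof (int32_t);
    for (size_t i = 0; i < n; ++i)
        printf ("%d\n", ts[i]);
}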

Client code looks like this:

auto env = lmdb::Env {"my_database_file.db"};
auto db  = Database  {env};

We'll maintain a strict timestamp-increasing ordering for all data stored in the database, as this is essential for reducing algorithmic complexity in client functions.
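
As a taste of why this matters: with sorted timestamps, finding the start of a time window within one device's data is a binary search rather than a scan. A sketch, using the device() accessor defined in the next section, and assuming CSpan exposes standard begin()/end() iterators (needs <algorithm>):

auto txn   = lmdb::Txn::rdonly (env);
auto dd    = db.device (txn, 123);

// O(log n) seek to the first broadcast at or after 2018-01-01
auto first = std::lower_bound (dd.timestamps.begin(), dd.timestamps.end(),
                               Timestamp {1514764800});
size_t pos = first - dd.timestamps.begin();

// From pos onwards, dd.actLevels and dd.userStrIDs line up with the
// in-window timestamps, since all columns share the same ordering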

Core query interface

Since data for each broadcast is spread over three separate LMDB tables, we need some kind of iteration interface that automatically puts these back together for clients. The following code shows it in use, checking broadcasts from device 123 for a specific user ID and printing their timestamps in ascending order.

auto txn   = lmdb::Txn::rdonly (env);
auto strid = db.intern (txn, "target_user");

for (auto datum : db.device(txn, 123)) {
    if (datum.userStrID() == strid)
        printf ("%d\n", datum.timestamp());
}

We can implement the interface fairly straightforwardly as follows. When the caller asks for a device we fetch the three database entries holding its broadcast data, and return them inside a DeviceData struct (note that CSpan<T> is a reference to a typed, contiguous array in memory, or in the database). Users can use the provided iterator with range-for loops and standard algorithms to traverse these arrays in the normal way.

class Database {
// ...
public:
    //  ------------------------------------------------------------
    //  Handle to all data stored for one Device
    //    NB DeviceData object must not outlive the Txn it was created from

    class DeviceData {
    public:
        DeviceID               deviceID;
        lmdb::CSpan<Timestamp> timestamps;
        lmdb::CSpan<ActLevel>  actLevels;
        lmdb::CSpan<StringID>  userStrIDs;

        explicit DeviceData (Database &db, lmdb::Txn &txn, DeviceID did)
            :deviceID   {did},
             timestamps {db._timestamps.get (txn, did)},
             actLevels  {db._actLevels .get (txn, did)},
             userStrIDs {db._userStrIDs.get (txn, did)} {}

        size_t size () const { return timestamps.size(); }

        //  --------------------------------------------------
        //  Iterate across the individual records in one DeviceData

        class Iter {
            DeviceData &_src;
            size_t      _pos;
        public:
            explicit Iter (DeviceData &src, size_t pos=0) :_src{src}, _pos{pos} {}
            Iter (const Iter &rhs) :_src{rhs._src}, _pos{rhs._pos} {}

            // The iterator itself contains all the value accessors
            Iter operator* () { return *this; }

            Timestamp timestamp () const { return _src.timestamps.at (_pos); }
            ActLevel  actLevel  () const { return _src.actLevels .at (_pos); }
            StringID  userStrID () const { return _src.userStrIDs.at (_pos); }
            DeviceID  deviceID  () const { return _src.deviceID; }
            
            Iter& operator++ () { ++_pos; return *this; }
            
            bool operator== (const Iter &rhs) const {
                return    _src.timestamps.begin() == rhs._src.timestamps.begin()
                       && _pos == rhs._pos;
            }
            bool operator!= (const Iter &rhs) const { return !(*this == rhs); }
        };

        Iter begin () { return Iter {*this, 0}; }
        Iter end   () { return Iter {*this, timestamps.size()}; }
    };



    //  ------------------------------------------------------------
    //  Get the data for one device. NB result can't outlive txn.

    DeviceData device (lmdb::Txn &txn, DeviceID did) {
        return DeviceData {*this, txn, did};
    }

    //  ------------------------------------------------------------
    //  Get a vector of the IDs of all devices stored in the db

    std::vector<DeviceID> devices (lmdb::Txn &txn) {
        std::vector<DeviceID> res {};
        for (const auto &dat : _timestamps.iter(txn))
            res.push_back (dat.key);
        return res;
    }

// ...
};

More complex queries

By far the most productive way to write query code for this database is with a ranges library like range-v3, but that's beyond the scope of this article. Instead we'll use some of the new vocabulary types in C++17 to implement quite a clean, purely functional state machine, to find runs of individual users running machines at very high activity levels.

The interesting code is just a bundle of functions and value types sitting inside a namespace. State machine state is modelled as a std::variant, and clients call transition() with each new record to get the new state, as well as any high-activity (HA) period that's just closed, with an empty std::optional indicating that none exists. Similarly, you pass in an empty std::optional record at the end to signify the device has no more records left, closing and returning any open HA period.

This is completely unoptimised at present, so it'll be interesting to see how it performs later.
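
Two names in the listing below don't appear elsewhere in the article: Rec, the per-record view that the iteration interface yields, and the isHighAct predicate. A plausible sketch of both, given purely as assumptions (the 9.0 cutoff is illustrative):

// Assumed definitions, not from the article: Rec aliases the per-record
// accessor type, and isHighAct is the domain predicate. The 9.0 threshold
// is illustrative only.
using Rec = tsdb::Database::DeviceData::Iter;

inline bool isHighAct (const Rec &rec) {
    return rec.actLevel() >= 9.0;
}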

struct HAPeriod {
    tsdb::DeviceID  deviceID;
    tsdb::StringID  userStrID;
    tsdb::Timestamp periodStart;
    tsdb::Timestamp periodEnd;
};

namespace HAFinder {
    // Utils for std::visit
    template<class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
    template<class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

    // State machine state
    struct NotInHAPeriod {};
    struct    InHAPeriod { HAPeriod curData; };  // updated on each transition
    using State = std::variant <NotInHAPeriod, InHAPeriod>;

    // Result of making a state transition: new state and any emitted HAPeriod
    struct TransResult {
        State newState;
        std::optional<HAPeriod> period;
    };

    // Making a state transition.
    // Call with 'none' record when you've reached the end of the recs.
    TransResult transition (State state, std::optional<Rec> rec) {
        return std::visit (overloaded {
            [&](NotInHAPeriod s) {
                (void) s;
                if (rec && isHighAct(*rec)) {
                    // A new period is starting
                    return TransResult {
                               InHAPeriod {HAPeriod {rec->deviceID(),
                                                     rec->userStrID(),
                                                     rec->timestamp(),
                                                     rec->timestamp()}},
                               {}
                           };
                } else {
                    // No, the non-period continues
                    return TransResult {NotInHAPeriod{}, {}};
                }
            },

            [&](InHAPeriod s) {
                // Sanity check
                if (rec) {
                    assert (s.curData.deviceID == rec->deviceID ());
                    assert (s.curData.periodEnd <= rec->timestamp ());
                }

                if (   rec
                    && isHighAct (*rec)
                    && s.curData.userStrID == rec->userStrID())
                {
                    // The current HA period is continuing
                    return TransResult {
                               InHAPeriod {HAPeriod {rec->deviceID(),
                                                     rec->userStrID (),
                                                     s.curData.periodStart,
                                                     rec->timestamp ()}},
                               {}
                           };
                } else {
                    // The period's finished, so close and emit to the caller,
                    // then maybe start a new period
                    auto endTS = rec ? rec->timestamp() : s.curData.periodEnd;
                    return TransResult {
                               transition (NotInHAPeriod{}, rec).newState,
                               HAPeriod {s.curData.deviceID, s.curData.userStrID,
                                         s.curData.periodStart, endTS}
                           };
                }
            }

        }, state);
    }
}  // namespace HAFinder

The code calling this is just a simple pair of loops:

for (auto devID : db.devices(txn)) {
    auto finder = HAFinder::State {};

    // Process each rec in the device
    auto recs = db.device (txn, devID);
    for (const auto &r : recs) {
        auto [newFinder, possPeriod] = HAFinder::transition (finder, r);
        emitIfPresentAndLong (possPeriod);
        finder = newFinder;
    }

    // Close any remaining period
    auto [dummy, possPeriod] = HAFinder::transition (finder, {});
    emitIfPresentAndLong (possPeriod);
}
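
emitIfPresentAndLong isn't shown in the article; a minimal version might look like this (the 60-second minimum duration is an assumption):

// Hypothetical helper: print any closed period that lasted long enough.
// The 60-second cutoff is illustrative, not from the article.
void emitIfPresentAndLong (const std::optional<HAPeriod> &period) {
    if (!period)
        return;
    if (period->periodEnd - period->periodStart < 60)
        return;
    printf ("%d,%d,%d,%d\n", period->deviceID, period->userStrID,
            period->periodStart, period->periodEnd);
}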

Performance

For initial testing I populated a database with 250 million random records, with device IDs uniformly distributed in the range [0, 10000), user IDs uniformly distributed across 100 strings, and activity levels uniformly distributed in the range [0.0, 10.0). The resulting file takes up 3.8GB on disk.

Calculating the mean activity level for each device using the following naive code takes about 8 seconds on one core of my laptop, or 31 million records per second.

for (auto devID : db.devices(txn)) {
    int count = 0;
    double sum = 0;
    for (const auto &dat : db.device (txn, devID)) {
        ++count;
        sum += dat.actLevel();
    }
    printf ("%d,%d,%f\n", devID, count, sum / (double)count);
}

The earlier code finding single-user high-activity spans was a little slower, taking about 30 seconds, or 8 million records per second. Doubtless we could speed this up if we spent a bit of time on it, but that's pretty fast as it stands. I'll discuss writing indexes in another article; the most useful ones often take their inspiration from Bloom filters.

Ingesting new data

Adding new data to the database is a matter of bringing the current data for each device together with the new, and then enforcing the ordering invariants. We use a RecordBundle class to split the input data out by device.

It takes about 1 minute 45 seconds to write the 250 million pre-generated random records to the database, or about 2.4 million records per second.

//  ----------------------------------------------------------------------
//  Bundle of ingestable data

class RecordBundle {
    // Records always ordered timestamp ascending
    std::unordered_map<DeviceID, std::vector<Record>> _devData = {};

public:
    // Clients can iterate over internal data but not modify
    auto begin () const { return _devData.cbegin(); }
    auto end   () const { return _devData.cend(); }

    //  ------------------------------------------------------------
    //  Builder to create RecordBundle's from unordered Record sets
    
    class Builder {
        // Records are unordered while builder handles them
        std::unordered_map<DeviceID, std::vector<Record>> _devData;
    public:
        auto begin () const { return _devData.begin(); }
        auto end   () const { return _devData.end(); }
        void push (const Record &rec) {
            _devData [rec.deviceID] .push_back (rec);
        }
        RecordBundle build () const { return RecordBundle {*this}; }
    };

private:
    //  ------------------------------------------------------------
    //  Only the Builder can construct a RecordBundle
    
    explicit RecordBundle (const Builder &bu) {
        for (auto &pair : bu) {
            _devData [pair.first] = pair.second;
            auto &dat = _devData [pair.first];
            std::sort (dat.begin(), dat.end(),
                       [](const auto &r1, const auto &r2) {
                           return r1.timestamp < r2.timestamp;});
        }
    }
};

class Database {
// ... 
    void push (lmdb::Txn &txn, const std::vector<Record> &newRecs) {
        auto bu = RecordBundle::Builder {};
        for (auto r : newRecs) bu.push (r);
        auto newRecsBundle = bu.build ();

        // Iterate over the devices sequentially, writing on each
        for (const auto& [devID, newDevRecs] : newRecsBundle) {
            // To store both new and old data
            std::vector<Record> tempRecs = newDevRecs;
            
            // Gather any recs for the device already stored in the db
            if (deviceExists (txn, devID)) {
                for (const auto &r : device (txn, devID)) {
                    tempRecs.push_back (Record {r.timestamp(), r.deviceID(),
                                                r.actLevel(),  r.userStrID()});
                }
            }

            // Sort ts ascending (the invariant)
            std::sort (tempRecs.begin(), tempRecs.end(),
                       [](const auto &r1, const auto &r2) {
                           return r1.timestamp < r2.timestamp;});

            // And insert the results
            
            auto recRange = [&](){ return urange::make (tempRecs); };

            auto timestamps = recRange().map ([](auto &r){return r.timestamp;})
                                        .vector ();
            _timestamps.put (txn, devID, lmdb::CSpan<Timestamp>::of (timestamps));

            auto actLevels = recRange().map ([](auto &r){return r.actLevel;})
                                       .vector ();
            _actLevels.put (txn, devID, lmdb::CSpan<ActLevel>::of (actLevels));

            auto userStrIDs = recRange().map ([](auto &r){return r.userStrID;})
                                        .vector ();
            _userStrIDs.put (txn, devID, lmdb::CSpan<StringID>::of (userStrIDs));
        }
    }

// ...
};

I've used my personal 'micro ranges' library in push() above to demonstrate how useful ranges can be, even as a super-lightweight dependency like urange, though it's obviously not necessary here. Have a look at ranges if you haven't tried them yet: they can make a huge impact on a code base, and open up performance that was previously too awkward to write the code for.
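
urange itself isn't public, but a micro ranges helper of this shape needs very little code. Here's an illustrative sketch of a make()/map()/vector() pipeline with the same flavour; it is not urange's actual implementation:

#include <vector>

namespace urange_sketch {
    // map() is recorded lazily; vector() walks the source and materialises
    // the transformed elements into a std::vector
    template<typename C, typename F>
    struct Mapped {
        C &src;
        F  fn;
        auto vector () const {
            std::vector<decltype (fn (*src.begin()))> out;
            out.reserve (src.size());
            for (auto &x : src)
                out.push_back (fn (x));
            return out;
        }
    };

    template<typename C>
    struct Range {
        C &src;
        template<typename F>
        auto map (F fn) const { return Mapped<C, F> {src, fn}; }
    };

    template<typename C>
    Range<C> make (C &src) { return Range<C> {src}; }
}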

Get in touch at contact@inkblotsoftware.com to see how we can help with your data challenges