InfluxDB TSM file: C++11/STL parsing library

InfluxDB is a scalable database optimized for storage of time series, real-time application metrics, operations monitoring events, etc, written in Go.

Data is stored in .tsm files, which are kept pretty simple conceptually. Each .tsm file contains a header and a footer; the footer stores the offset to an index. The index is used to find a data block for a requested time boundary.

Application

InfluxDB

File extension

tsm

KS implementation details

License: MIT

References

This page hosts a formal specification of the InfluxDB TSM file format using Kaitai Struct. This specification can be automatically translated into a variety of programming languages to get a parsing library.

Usage

Runtime library

All parsing code for C++11/STL generated by Kaitai Struct depends on the C++/STL runtime library. You have to install it before you can parse data.

For C++, the easiest way is to clone the runtime library sources and build them along with your project.

Code

Using Kaitai Struct in C++/STL usually consists of 3 steps.

  1. We need to create an STL input stream (std::istream). One can open a local file for that, or use an existing std::string or char* buffer.
    #include <fstream>
    
    std::ifstream is("path/to/local/file.tsm", std::ifstream::binary);
    
    #include <sstream>
    
    std::istringstream is(str);
    
    #include <sstream>
    
    const char buf[] = { ... };
    std::string str(buf, sizeof buf);
    std::istringstream is(str);
    
  2. We need to wrap our input stream into Kaitai stream:
    #include "kaitai/kaitaistream.h"
    
    kaitai::kstream ks(&is);
    
  3. And finally, we can invoke the parsing:
    tsm_t data(&ks);
    

After that, one can get various attributes from the structure by invoking getter methods like:

data.header() // => get header

C++11/STL source code to parse InfluxDB TSM file

tsm.h

#pragma once

// This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild

#include "kaitai/kaitaistruct.h"
#include <stdint.h>
#include <memory>
#include <vector>

#if KAITAI_STRUCT_VERSION < 9000L
#error "Incompatible Kaitai Struct C++/STL API: version 0.9 or later is required"
#endif

/**
 * InfluxDB is a scalable database optimized for storage of time
 * series, real-time application metrics, operations monitoring events,
 * etc, written in Go.
 * 
 * Data is stored in .tsm files, which are kept pretty simple
 * conceptually. Each .tsm file contains a header and a footer; the
 * footer stores the offset to an index. The index is used to find a
 * data block for a requested time boundary.
 */

class tsm_t : public kaitai::kstruct {
    // Root object for one .tsm stream. The header is parsed in the
    // constructor; the index is parsed on the first call to index().

public:
    class header_t;
    class index_t;

    // Stores p__parent, sets the root pointer to this object (p__root is
    // not consulted) and immediately parses the header from p__io.
    tsm_t(kaitai::kstream* p__io, kaitai::kstruct* p__parent = nullptr, tsm_t* p__root = nullptr);

private:
    // Parses the sequential part of the object: the header.
    void _read();
    // Invoked by the destructor.
    void _clean_up();

public:
    ~tsm_t();

    // File header: 4 magic bytes followed by a one-byte version.
    class header_t : public kaitai::kstruct {

    public:

        // Reads both fields from p__io right away. Throws
        // kaitai::validation_not_equal_error<std::string> when the magic
        // bytes are not 16 D1 16 D1.
        header_t(kaitai::kstream* p__io, tsm_t* p__parent = nullptr, tsm_t* p__root = nullptr);

    private:
        void _read();
        void _clean_up();

    public:
        ~header_t();

    private:
        std::string m_magic;   // the 4 raw magic bytes as read
        uint8_t m_version;     // unsigned byte that follows the magic
        tsm_t* m__root;
        tsm_t* m__parent;

    public:
        // Note: returns a copy of the stored bytes.
        std::string magic() const { return m_magic; }
        uint8_t version() const { return m_version; }
        tsm_t* _root() const { return m__root; }
        tsm_t* _parent() const { return m__parent; }
    };

    // Record read by tsm_t::index() from the last 8 bytes of the stream:
    // a big-endian 64-bit offset. entries() parses index headers starting
    // at that offset.
    class index_t : public kaitai::kstruct {

    public:
        class index_header_t;

        // Reads the 8-byte offset from p__io right away.
        index_t(kaitai::kstream* p__io, tsm_t* p__parent = nullptr, tsm_t* p__root = nullptr);

    private:
        void _read();
        void _clean_up();

    public:
        ~index_t();

        // One index record: key length (u2be), key (key_len bytes decoded
        // as UTF-8), type (u1), entry count (u2be), then entry_count
        // index_entry_t records.
        class index_header_t : public kaitai::kstruct {

        public:
            class index_entry_t;

            // Parses the whole record, including all index entries,
            // from p__io right away.
            index_header_t(kaitai::kstream* p__io, tsm_t::index_t* p__parent = nullptr, tsm_t* p__root = nullptr);

        private:
            void _read();
            void _clean_up();

        public:
            ~index_header_t();

            // One index entry: min_time, max_time and block_offset
            // (each u8be) followed by block_size (u4be). The block itself
            // is only parsed when block() is called.
            class index_entry_t : public kaitai::kstruct {

            public:
                class block_entry_t;

                // Reads the four fixed-size fields from p__io right away.
                index_entry_t(kaitai::kstream* p__io, tsm_t::index_t::index_header_t* p__parent = nullptr, tsm_t* p__root = nullptr);

            private:
                void _read();
                void _clean_up();

            public:
                ~index_entry_t();

                // Block contents: crc32 (u4be) followed by
                // (parent block_size - 4) raw bytes of data.
                class block_entry_t : public kaitai::kstruct {

                public:

                    // Reads crc32 and data from p__io right away; the data
                    // length is derived from p__parent->block_size().
                    block_entry_t(kaitai::kstream* p__io, tsm_t::index_t::index_header_t::index_entry_t* p__parent = nullptr, tsm_t* p__root = nullptr);

                private:
                    void _read();
                    void _clean_up();

                public:
                    ~block_entry_t();

                private:
                    uint32_t m_crc32;
                    std::string m_data;
                    tsm_t* m__root;
                    tsm_t::index_t::index_header_t::index_entry_t* m__parent;

                public:
                    uint32_t crc32() const { return m_crc32; }
                    // Note: returns a copy of the block bytes.
                    std::string data() const { return m_data; }
                    tsm_t* _root() const { return m__root; }
                    tsm_t::index_t::index_header_t::index_entry_t* _parent() const { return m__parent; }
                };

            private:
                bool f_block;                             // true once block() has parsed m_block
                std::unique_ptr<block_entry_t> m_block;   // cached result of block()

            public:
                // On first call, seeks the root stream to block_offset(),
                // parses a block_entry_t there, restores the previous
                // stream position and caches the result. Later calls
                // return the cached pointer. The object stays owned by
                // this entry.
                block_entry_t* block();

            private:
                uint64_t m_min_time;
                uint64_t m_max_time;
                uint64_t m_block_offset;
                uint32_t m_block_size;
                tsm_t* m__root;
                tsm_t::index_t::index_header_t* m__parent;

            public:
                uint64_t min_time() const { return m_min_time; }
                uint64_t max_time() const { return m_max_time; }
                uint64_t block_offset() const { return m_block_offset; }
                uint32_t block_size() const { return m_block_size; }
                tsm_t* _root() const { return m__root; }
                tsm_t::index_t::index_header_t* _parent() const { return m__parent; }
            };

        private:
            uint16_t m_key_len;
            std::string m_key;
            uint8_t m_type;
            uint16_t m_entry_count;
            std::unique_ptr<std::vector<std::unique_ptr<index_entry_t>>> m_index_entries;
            tsm_t* m__root;
            tsm_t::index_t* m__parent;

        public:
            uint16_t key_len() const { return m_key_len; }
            std::string key() const { return m_key; }
            uint8_t type() const { return m_type; }
            uint16_t entry_count() const { return m_entry_count; }
            // Non-owning pointer to the parsed entries (entry_count() items).
            std::vector<std::unique_ptr<index_entry_t>>* index_entries() const { return m_index_entries.get(); }
            tsm_t* _root() const { return m__root; }
            tsm_t::index_t* _parent() const { return m__parent; }
        };

    private:
        bool f_entries;   // true once entries() has parsed m_entries
        std::unique_ptr<std::vector<std::unique_ptr<index_header_t>>> m_entries;   // cached result of entries()

    public:
        // On first call, seeks to offset(), parses index_header_t records
        // until the stream position equals (stream size - 8), restores the
        // previous stream position and caches the result. At least one
        // record is always parsed. Later calls return the cached pointer.
        std::vector<std::unique_ptr<index_header_t>>* entries();

    private:
        uint64_t m_offset;
        tsm_t* m__root;
        tsm_t* m__parent;

    public:
        uint64_t offset() const { return m_offset; }
        tsm_t* _root() const { return m__root; }
        tsm_t* _parent() const { return m__parent; }
    };

private:
    bool f_index;                       // true once index() has parsed m_index
    std::unique_ptr<index_t> m_index;   // cached result of index()

public:
    // On first call, seeks to (stream size - 8), parses an index_t there,
    // restores the previous stream position and caches the result. Later
    // calls return the cached pointer. The object stays owned by this tsm_t.
    index_t* index();

private:
    std::unique_ptr<header_t> m_header;
    tsm_t* m__root;
    kaitai::kstruct* m__parent;

public:
    // Non-owning pointer to the header parsed by the constructor.
    header_t* header() const { return m_header.get(); }
    tsm_t* _root() const { return m__root; }
    kaitai::kstruct* _parent() const { return m__parent; }
};

tsm.cpp

// This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild

#include "tsm.h"
#include "kaitai/exceptions.h"

// Builds the root object on top of p__io and parses the header at once.
// The root pointer always refers to this object; p__root is not consulted.
// The lazily parsed index starts out absent.
tsm_t::tsm_t(kaitai::kstream* p__io, kaitai::kstruct* p__parent, tsm_t* p__root)
    : kaitai::kstruct(p__io),
      f_index(false),
      m_index(nullptr),
      m_header(nullptr),
      m__root(this),
      m__parent(p__parent) {
    _read();
}

// Parses the sequential part of the file: a single header_t read from the
// current position of the underlying stream.
void tsm_t::_read() {
    m_header.reset(new header_t(m__io, this, m__root));
}

// Destructor: delegates to _clean_up().
tsm_t::~tsm_t() {
    this->_clean_up();
}

// Clean-up hook run by the destructor. Performs no work of its own.
void tsm_t::_clean_up() {
    if (!f_index) {
        return;  // index() was never evaluated
    }
    // m_index is held by a std::unique_ptr; no manual release is needed.
}

// Records parent/root and parses the header fields from p__io at once.
tsm_t::header_t::header_t(kaitai::kstream* p__io, tsm_t* p__parent, tsm_t* p__root)
    : kaitai::kstruct(p__io),
      m__root(p__root),
      m__parent(p__parent) {
    _read();
}

// Reads the 4 magic bytes and the one-byte version.
// Throws kaitai::validation_not_equal_error<std::string> when the magic
// bytes differ from 16 D1 16 D1; the version is not read in that case.
void tsm_t::header_t::_read() {
    const std::string expected_magic("\x16\xD1\x16\xD1", 4);

    m_magic = m__io->read_bytes(4);
    if (m_magic != expected_magic) {
        throw kaitai::validation_not_equal_error<std::string>(expected_magic, m_magic, m__io, std::string("/types/header/seq/0"));
    }

    m_version = m__io->read_u1();
}

// Destructor: delegates to _clean_up().
tsm_t::header_t::~header_t() {
    this->_clean_up();
}

// Clean-up hook run by the destructor; the header has nothing to release.
void tsm_t::header_t::_clean_up() {
    // intentionally empty
}

// Records parent/root, marks the lazily parsed entries as absent and reads
// the index offset from p__io at once.
tsm_t::index_t::index_t(kaitai::kstream* p__io, tsm_t* p__parent, tsm_t* p__root)
    : kaitai::kstruct(p__io),
      f_entries(false),
      m_entries(nullptr),
      m__root(p__root),
      m__parent(p__parent) {
    _read();
}

void tsm_t::index_t::_read() {
    m_offset = m__io->read_u8be();
}

// Destructor: delegates to _clean_up().
tsm_t::index_t::~index_t() {
    this->_clean_up();
}

// Clean-up hook run by the destructor. Performs no work of its own.
void tsm_t::index_t::_clean_up() {
    if (!f_entries) {
        return;  // entries() was never evaluated
    }
    // m_entries is held by a std::unique_ptr; no manual release is needed.
}

// Records parent/root and parses the whole index record from p__io at once.
tsm_t::index_t::index_header_t::index_header_t(kaitai::kstream* p__io, tsm_t::index_t* p__parent, tsm_t* p__root)
    : kaitai::kstruct(p__io),
      m_index_entries(nullptr),
      m__root(p__root),
      m__parent(p__parent) {
    _read();
}

// Parses one index record from the stream, in this order:
//   key_len     - u2be
//   key         - key_len bytes, decoded as UTF-8
//   type        - u1
//   entry_count - u2be
//   entries     - entry_count index_entry_t records
// Any exception raised by the stream or by a nested parser propagates.
void tsm_t::index_t::index_header_t::_read() {
    m_key_len = m__io->read_u2be();
    m_key = kaitai::kstream::bytes_to_str(m__io->read_bytes(key_len()), std::string("UTF-8"));
    m_type = m__io->read_u1();
    m_entry_count = m__io->read_u2be();
    m_index_entries = std::unique_ptr<std::vector<std::unique_ptr<index_entry_t>>>(new std::vector<std::unique_ptr<index_entry_t>>());
    // The element count is known up front (at most 65535, it is a u2),
    // so allocate once instead of growing the vector repeatedly.
    m_index_entries->reserve(entry_count());
    const int l_index_entries = entry_count();
    for (int i = 0; i < l_index_entries; i++) {
        // The temporary unique_ptr is already an rvalue; no std::move needed.
        m_index_entries->push_back(std::unique_ptr<index_entry_t>(new index_entry_t(m__io, this, m__root)));
    }
}

// Destructor: delegates to _clean_up().
tsm_t::index_t::index_header_t::~index_header_t() {
    this->_clean_up();
}

// Clean-up hook run by the destructor; nothing is released by hand here.
void tsm_t::index_t::index_header_t::_clean_up() {
    // intentionally empty
}

// Records parent/root, marks the lazily parsed block as absent and reads
// the fixed-size entry fields from p__io at once.
tsm_t::index_t::index_header_t::index_entry_t::index_entry_t(kaitai::kstream* p__io, tsm_t::index_t::index_header_t* p__parent, tsm_t* p__root)
    : kaitai::kstruct(p__io),
      f_block(false),
      m_block(nullptr),
      m__root(p__root),
      m__parent(p__parent) {
    _read();
}

void tsm_t::index_t::index_header_t::index_entry_t::_read() {
    m_min_time = m__io->read_u8be();
    m_max_time = m__io->read_u8be();
    m_block_offset = m__io->read_u8be();
    m_block_size = m__io->read_u4be();
}

// Destructor: delegates to _clean_up().
tsm_t::index_t::index_header_t::index_entry_t::~index_entry_t() {
    this->_clean_up();
}

// Clean-up hook run by the destructor. Performs no work of its own.
void tsm_t::index_t::index_header_t::index_entry_t::_clean_up() {
    if (!f_block) {
        return;  // block() was never evaluated
    }
    // m_block is held by a std::unique_ptr; no manual release is needed.
}

// Records parent/root and parses the block from p__io at once.
tsm_t::index_t::index_header_t::index_entry_t::block_entry_t::block_entry_t(kaitai::kstream* p__io, tsm_t::index_t::index_header_t::index_entry_t* p__parent, tsm_t* p__root)
    : kaitai::kstruct(p__io),
      m__root(p__root),
      m__parent(p__parent) {
    _read();
}

void tsm_t::index_t::index_header_t::index_entry_t::block_entry_t::_read() {
    m_crc32 = m__io->read_u4be();
    m_data = m__io->read_bytes((_parent()->block_size() - 4));
}

// Destructor: delegates to _clean_up().
tsm_t::index_t::index_header_t::index_entry_t::block_entry_t::~block_entry_t() {
    this->_clean_up();
}

// Clean-up hook run by the destructor; the block has nothing to release.
void tsm_t::index_t::index_header_t::index_entry_t::block_entry_t::_clean_up() {
    // intentionally empty
}

// Returns the block this entry points to, parsing it on first use.
// The root object's stream is moved to block_offset(), a block_entry_t is
// parsed there, and the stream is moved back to where it was. The result
// is cached; later calls return the same pointer. Ownership stays here.
tsm_t::index_t::index_header_t::index_entry_t::block_entry_t* tsm_t::index_t::index_header_t::index_entry_t::block() {
    if (!f_block) {
        kaitai::kstream* const root_io = _root()->_io();
        std::streampos saved_pos = root_io->pos();

        root_io->seek(block_offset());
        m_block.reset(new block_entry_t(root_io, this, m__root));
        root_io->seek(saved_pos);

        f_block = true;
    }
    return m_block.get();
}

// Returns the index records, parsing them on first use.
// The stream is moved to offset(), index_header_t records are parsed one
// after another until the stream position equals (stream size - 8), and
// the stream is moved back to where it was. At least one record is always
// parsed. The result is cached; later calls return the same pointer.
std::vector<std::unique_ptr<tsm_t::index_t::index_header_t>>* tsm_t::index_t::entries() {
    if (!f_entries) {
        std::streampos saved_pos = m__io->pos();
        m__io->seek(offset());

        m_entries.reset(new std::vector<std::unique_ptr<index_header_t>>());
        for (;;) {
            m_entries->push_back(std::unique_ptr<index_header_t>(new index_header_t(m__io, this, m__root)));
            // Stop once the position reaches (stream size - 8).
            if (m__io->pos() == (m__io->size() - 8)) {
                break;
            }
        }

        m__io->seek(saved_pos);
        f_entries = true;
    }
    return m_entries.get();
}

// Returns the index_t record, parsing it on first use.
// The stream is moved to (stream size - 8), an index_t is parsed there,
// and the stream is moved back to where it was. The result is cached;
// later calls return the same pointer. Ownership stays with this object.
tsm_t::index_t* tsm_t::index() {
    if (!f_index) {
        std::streampos saved_pos = m__io->pos();

        m__io->seek((m__io->size() - 8));
        m_index.reset(new index_t(m__io, this, m__root));
        m__io->seek(saved_pos);

        f_index = true;
    }
    return m_index.get();
}