InfluxDB TSM file: Python parsing library

InfluxDB is a scalable database, written in Go, optimized for storage of time series, real-time application metrics, operations monitoring events, etc.

Data is stored in .tsm files, which are kept pretty simple conceptually. Each .tsm file contains a header and a footer; the footer stores the offset to an index. The index is used to find a data block for a requested time boundary.

Application

InfluxDB

File extension

tsm

KS implementation details

License: MIT

This page hosts a formal specification of InfluxDB TSM file using Kaitai Struct. This specification can be automatically translated into a variety of programming languages to get a parsing library.

Usage

Parse a local file and get structure in memory:

data = Tsm.from_file("path/to/local/file.tsm")

Or parse the structure from a bytes object:

from kaitaistruct import KaitaiStream, BytesIO

raw = b"\x00\x01\x02..."
data = Tsm(KaitaiStream(BytesIO(raw)))

After that, one can get various attributes from the structure by invoking getter methods like:

data.header # => get header

Python source code to parse InfluxDB TSM file

tsm.py

# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild

from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO


# Fail at import time when the installed Kaitai Struct runtime is older than
# the 0.7 API this module requires.
if not (parse_version(ks_version) >= parse_version('0.7')):
    raise Exception(
        "Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % ks_version)

class Tsm(KaitaiStruct):
    """Parser for InfluxDB TSM (.tsm) files.

    InfluxDB is a scalable database, written in Go, optimized for storage
    of time series, real-time application metrics, operations monitoring
    events, etc.

    Data is stored in .tsm files, which are kept pretty simple
    conceptually. Each .tsm file contains a header and a footer; the
    footer stores the offset to an index. The index is used to find a data
    block for a requested time boundary.

    Only ``header`` is parsed eagerly. ``index``, ``Index.entries`` and
    ``IndexEntry.block`` are parsed on first access and cached afterwards.
    Each of them temporarily seeks the stream and always restores the
    previous position, even when parsing fails.
    """

    def __init__(self, _io, _parent=None, _root=None):
        self._io = _io
        self._parent = _parent
        self._root = _root if _root else self
        self._read()

    def _read(self):
        """Parse the file header from the current stream position."""
        self.header = self._root.Header(self._io, self, self._root)

    class Header(KaitaiStruct):
        """File header: a 4-byte magic followed by a 1-byte version."""

        def __init__(self, _io, _parent=None, _root=None):
            self._io = _io
            self._parent = _parent
            self._root = _root if _root else self
            self._read()

        def _read(self):
            """Check the magic bytes and read the version byte."""
            self.magic = self._io.ensure_fixed_contents(b"\x16\xD1\x16\xD1")
            self.version = self._io.read_u1()

    class Index(KaitaiStruct):
        """Footer of the file: an 8-byte big-endian offset to the index.

        The index records themselves are exposed through ``entries``.
        """

        def __init__(self, _io, _parent=None, _root=None):
            self._io = _io
            self._parent = _parent
            self._root = _root if _root else self
            self._read()

        def _read(self):
            """Read the absolute offset at which the index starts."""
            self.offset = self._io.read_u8be()

        class IndexHeader(KaitaiStruct):
            """One index record: a key, a type byte and its index entries."""

            def __init__(self, _io, _parent=None, _root=None):
                self._io = _io
                self._parent = _parent
                self._root = _root if _root else self
                self._read()

            def _read(self):
                """Read the key, the type byte and ``entry_count`` entries."""
                self.key_len = self._io.read_u2be()
                self.key = (self._io.read_bytes(self.key_len)).decode(u"UTF-8")
                # NOTE(review): presumably the type of the values stored in
                # the referenced blocks -- confirm against the TSM format.
                self.type = self._io.read_u1()
                self.entry_count = self._io.read_u2be()
                self.index_entries = [
                    self._root.Index.IndexHeader.IndexEntry(self._io, self, self._root)
                    for _ in range(self.entry_count)
                ]

            class IndexEntry(KaitaiStruct):
                """Time range plus location and size of one data block."""

                def __init__(self, _io, _parent=None, _root=None):
                    self._io = _io
                    self._parent = _parent
                    self._root = _root if _root else self
                    self._read()

                def _read(self):
                    """Read the time bounds, block offset and block size."""
                    self.min_time = self._io.read_u8be()
                    self.max_time = self._io.read_u8be()
                    self.block_offset = self._io.read_u8be()
                    self.block_size = self._io.read_u4be()

                class BlockEntry(KaitaiStruct):
                    """A data block: a 4-byte CRC32 followed by the payload."""

                    def __init__(self, _io, _parent=None, _root=None):
                        self._io = _io
                        self._parent = _parent
                        self._root = _root if _root else self
                        self._read()

                    def _read(self):
                        """Read the checksum and the remaining block bytes."""
                        self.crc32 = self._io.read_u4be()
                        # block_size covers the 4 checksum bytes as well.
                        self.data = self._io.read_bytes((self._parent.block_size - 4))

                @property
                def block(self):
                    """Return the data block this entry points to.

                    The block is read from the root stream at
                    ``block_offset`` on first access and cached.

                    Raises:
                        ValueError: if ``block_size`` is smaller than the
                            4-byte checksum every block starts with.
                    """
                    if hasattr(self, '_m_block'):
                        return self._m_block

                    # A block holds a 4-byte CRC32 plus the payload; a
                    # smaller size would give a negative payload length.
                    if self.block_size < 4:
                        raise ValueError(
                            "invalid TSM block size %d: must be at least 4" % self.block_size)

                    io = self._root._io
                    _pos = io.pos()
                    try:
                        io.seek(self.block_offset)
                        self._m_block = self._root.Index.IndexHeader.IndexEntry.BlockEntry(io, self, self._root)
                    finally:
                        # Restore the position even if parsing failed, so
                        # later reads on the shared stream are unaffected.
                        io.seek(_pos)
                    return self._m_block

        @property
        def entries(self):
            """Return the list of ``IndexHeader`` records of the index.

            Records are read starting at ``offset`` until the stream
            position reaches the 8-byte footer. At least one record is
            always read. The list is cached only after it has been parsed
            completely, so a failed parse is never mistaken for a result.
            """
            if hasattr(self, '_m_entries'):
                return self._m_entries

            io = self._io
            _pos = io.pos()
            # The index ends where the 8-byte footer begins.
            index_end = io.size() - 8
            records = []
            try:
                io.seek(self.offset)
                while True:
                    records.append(self._root.Index.IndexHeader(io, self, self._root))
                    # Exact match on purpose: a record that overruns the
                    # footer is malformed and keeps reading until the
                    # stream raises instead of being silently accepted.
                    if io.pos() == index_end:
                        break
            finally:
                io.seek(_pos)
            self._m_entries = records
            return self._m_entries

    @property
    def index(self):
        """Return the ``Index`` parsed from the last 8 bytes of the file.

        Parsed on first access and cached.
        """
        if hasattr(self, '_m_index'):
            return self._m_index

        _pos = self._io.pos()
        try:
            self._io.seek((self._io.size() - 8))
            self._m_index = self._root.Index(self._io, self, self._root)
        finally:
            # Restore the position even if parsing failed.
            self._io.seek(_pos)
        return self._m_index