MessagePack (msgpack) serialization format: Python parsing library

MessagePack (msgpack) is a system to serialize arbitrary structured data into a compact binary stream.

KS implementation details

License: CC0-1.0


This page hosts a formal specification of MessagePack (msgpack) serialization format using Kaitai Struct. This specification can be automatically translated into a variety of programming languages to get a parsing library.


Runtime library

All parsing code for Python generated by Kaitai Struct depends on the Python runtime library. You have to install it before you can parse data.

The Python runtime library can be installed from PyPI:

python3 -m pip install kaitaistruct


Parse a local file and get structure in memory:

data = Msgpack.from_file("path/to/local/file.bin")

Or parse structure from a bytes:

from kaitaistruct import KaitaiStream, BytesIO

raw = b"\x00\x01\x02..."
data = Msgpack(KaitaiStream(BytesIO(raw)))

After that, one can get various attributes from the structure by invoking getter methods like:

data.b1 # => First byte is msgpack message is either a piece of data by
itself or determines types of further, more complex data

Python source code to parse MessagePack (msgpack) serialization format

# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild

import kaitaistruct
from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO

if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9):
    raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__))

class Msgpack(KaitaiStruct):
    """MessagePack (msgpack) is a system to serialize arbitrary structured
    data into a compact binary stream.
    .. seealso::
       Source -
    def __init__(self, _io, _parent=None, _root=None):
        self._io = _io
        self._parent = _parent
        self._root = _root if _root else self

    def _read(self):
        self.b1 = self._io.read_u1()
        _on = self.b1
        if _on == 211:
            self.int_extra = self._io.read_s8be()
        elif _on == 209:
            self.int_extra = self._io.read_s2be()
        elif _on == 210:
            self.int_extra = self._io.read_s4be()
        elif _on == 208:
            self.int_extra = self._io.read_s1()
        elif _on == 205:
            self.int_extra = self._io.read_u2be()
        elif _on == 207:
            self.int_extra = self._io.read_u8be()
        elif _on == 204:
            self.int_extra = self._io.read_u1()
        elif _on == 206:
            self.int_extra = self._io.read_u4be()
        if self.is_float_32:
            self.float_32_value = self._io.read_f4be()

        if self.is_float_64:
            self.float_64_value = self._io.read_f8be()

        if self.is_str_8:
            self.str_len_8 = self._io.read_u1()

        if self.is_str_16:
            self.str_len_16 = self._io.read_u2be()

        if self.is_str_32:
            self.str_len_32 = self._io.read_u4be()

        if self.is_str:
            self.str_value = (self._io.read_bytes(self.str_len)).decode(u"UTF-8")

        if self.is_array_16:
            self.num_array_elements_16 = self._io.read_u2be()

        if self.is_array_32:
            self.num_array_elements_32 = self._io.read_u4be()

        if self.is_array:
            self.array_elements = []
            for i in range(self.num_array_elements):

        if self.is_map_16:
            self.num_map_elements_16 = self._io.read_u2be()

        if self.is_map_32:
            self.num_map_elements_32 = self._io.read_u4be()

        if self.is_map:
            self.map_elements = []
            for i in range(self.num_map_elements):
                self.map_elements.append(Msgpack.MapTuple(self._io, self, self._root))

    class MapTuple(KaitaiStruct):
        def __init__(self, _io, _parent=None, _root=None):
            self._io = _io
            self._parent = _parent
            self._root = _root if _root else self

        def _read(self):
            self.key = Msgpack(self._io)
            self.value = Msgpack(self._io)

    def is_array_32(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_array_32'):
            return self._m_is_array_32

        self._m_is_array_32 = self.b1 == 221
        return getattr(self, '_m_is_array_32', None)

    def int_value(self):
        if hasattr(self, '_m_int_value'):
            return self._m_int_value

        if self.is_int:
            self._m_int_value = (self.pos_int7_value if self.is_pos_int7 else (self.neg_int5_value if self.is_neg_int5 else 4919))

        return getattr(self, '_m_int_value', None)

    def str_len(self):
        if hasattr(self, '_m_str_len'):
            return self._m_str_len

        if self.is_str:
            self._m_str_len = ((self.b1 & 31) if self.is_fix_str else (self.str_len_8 if self.is_str_8 else (self.str_len_16 if self.is_str_16 else self.str_len_32)))

        return getattr(self, '_m_str_len', None)

    def is_fix_array(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_fix_array'):
            return self._m_is_fix_array

        self._m_is_fix_array = (self.b1 & 240) == 144
        return getattr(self, '_m_is_fix_array', None)

    def is_map(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_map'):
            return self._m_is_map

        self._m_is_map =  ((self.is_fix_map) or (self.is_map_16) or (self.is_map_32)) 
        return getattr(self, '_m_is_map', None)

    def is_array(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_array'):
            return self._m_is_array

        self._m_is_array =  ((self.is_fix_array) or (self.is_array_16) or (self.is_array_32)) 
        return getattr(self, '_m_is_array', None)

    def is_float(self):
        if hasattr(self, '_m_is_float'):
            return self._m_is_float

        self._m_is_float =  ((self.is_float_32) or (self.is_float_64)) 
        return getattr(self, '_m_is_float', None)

    def is_str_8(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_str_8'):
            return self._m_is_str_8

        self._m_is_str_8 = self.b1 == 217
        return getattr(self, '_m_is_str_8', None)

    def is_fix_map(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_fix_map'):
            return self._m_is_fix_map

        self._m_is_fix_map = (self.b1 & 240) == 128
        return getattr(self, '_m_is_fix_map', None)

    def is_int(self):
        if hasattr(self, '_m_is_int'):
            return self._m_is_int

        self._m_is_int =  ((self.is_pos_int7) or (self.is_neg_int5)) 
        return getattr(self, '_m_is_int', None)

    def is_bool(self):
        if hasattr(self, '_m_is_bool'):
            return self._m_is_bool

        self._m_is_bool =  ((self.b1 == 194) or (self.b1 == 195)) 
        return getattr(self, '_m_is_bool', None)

    def is_str_16(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_str_16'):
            return self._m_is_str_16

        self._m_is_str_16 = self.b1 == 218
        return getattr(self, '_m_is_str_16', None)

    def is_float_64(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_float_64'):
            return self._m_is_float_64

        self._m_is_float_64 = self.b1 == 203
        return getattr(self, '_m_is_float_64', None)

    def is_map_16(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_map_16'):
            return self._m_is_map_16

        self._m_is_map_16 = self.b1 == 222
        return getattr(self, '_m_is_map_16', None)

    def is_neg_int5(self):
        if hasattr(self, '_m_is_neg_int5'):
            return self._m_is_neg_int5

        self._m_is_neg_int5 = (self.b1 & 224) == 224
        return getattr(self, '_m_is_neg_int5', None)

    def pos_int7_value(self):
        if hasattr(self, '_m_pos_int7_value'):
            return self._m_pos_int7_value

        if self.is_pos_int7:
            self._m_pos_int7_value = self.b1

        return getattr(self, '_m_pos_int7_value', None)

    def is_nil(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_nil'):
            return self._m_is_nil

        self._m_is_nil = self.b1 == 192
        return getattr(self, '_m_is_nil', None)

    def float_value(self):
        if hasattr(self, '_m_float_value'):
            return self._m_float_value

        if self.is_float:
            self._m_float_value = (self.float_32_value if self.is_float_32 else self.float_64_value)

        return getattr(self, '_m_float_value', None)

    def num_array_elements(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_num_array_elements'):
            return self._m_num_array_elements

        if self.is_array:
            self._m_num_array_elements = ((self.b1 & 15) if self.is_fix_array else (self.num_array_elements_16 if self.is_array_16 else self.num_array_elements_32))

        return getattr(self, '_m_num_array_elements', None)

    def neg_int5_value(self):
        if hasattr(self, '_m_neg_int5_value'):
            return self._m_neg_int5_value

        if self.is_neg_int5:
            self._m_neg_int5_value = -((self.b1 & 31))

        return getattr(self, '_m_neg_int5_value', None)

    def bool_value(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_bool_value'):
            return self._m_bool_value

        if self.is_bool:
            self._m_bool_value = self.b1 == 195

        return getattr(self, '_m_bool_value', None)

    def is_pos_int7(self):
        if hasattr(self, '_m_is_pos_int7'):
            return self._m_is_pos_int7

        self._m_is_pos_int7 = (self.b1 & 128) == 0
        return getattr(self, '_m_is_pos_int7', None)

    def is_array_16(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_array_16'):
            return self._m_is_array_16

        self._m_is_array_16 = self.b1 == 220
        return getattr(self, '_m_is_array_16', None)

    def is_str(self):
        if hasattr(self, '_m_is_str'):
            return self._m_is_str

        self._m_is_str =  ((self.is_fix_str) or (self.is_str_8) or (self.is_str_16) or (self.is_str_32)) 
        return getattr(self, '_m_is_str', None)

    def is_fix_str(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_fix_str'):
            return self._m_is_fix_str

        self._m_is_fix_str = (self.b1 & 224) == 160
        return getattr(self, '_m_is_fix_str', None)

    def is_str_32(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_str_32'):
            return self._m_is_str_32

        self._m_is_str_32 = self.b1 == 219
        return getattr(self, '_m_is_str_32', None)

    def num_map_elements(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_num_map_elements'):
            return self._m_num_map_elements

        if self.is_map:
            self._m_num_map_elements = ((self.b1 & 15) if self.is_fix_map else (self.num_map_elements_16 if self.is_map_16 else self.num_map_elements_32))

        return getattr(self, '_m_num_map_elements', None)

    def is_float_32(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_float_32'):
            return self._m_is_float_32

        self._m_is_float_32 = self.b1 == 202
        return getattr(self, '_m_is_float_32', None)

    def is_map_32(self):
        .. seealso::
           Source -
        if hasattr(self, '_m_is_map_32'):
            return self._m_is_map_32

        self._m_is_map_32 = self.b1 == 223
        return getattr(self, '_m_is_map_32', None)