VMWare Virtual Disk: Python parsing library

File extension

vmdk

KS implementation details

License: CC0-1.0

References

This page hosts a formal specification of VMWare Virtual Disk using Kaitai Struct. This specification can be automatically translated into a variety of programming languages to get a parsing library.

Usage

Parse a local file and get structure in memory:

data = VmwareVmdk.from_file("path/to/local/file.vmdk")

Or parse the structure from a bytes object:

from kaitaistruct import KaitaiStream, BytesIO

raw = b"\x00\x01\x02..."
data = VmwareVmdk(KaitaiStream(BytesIO(raw)))

After that, one can read various values from the structure by accessing attributes such as:

data.size_max # => Maximum number of sectors in a given image file (capacity)

Python source code to parse VMWare Virtual Disk

vmware_vmdk.py

# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild

from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
from enum import Enum


# Refuse to import against a Kaitai Struct runtime older than 0.7, the
# minimum API version this module requires.
if parse_version('0.7') > parse_version(ks_version):
    raise Exception(
        "Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s"
        % ks_version
    )

class VmwareVmdk(KaitaiStruct):
    """Parser for the file header of a VMWare Virtual Disk (VMDK) image.

    The fixed header fields are read from the stream as soon as the object
    is constructed. The ``descriptor``, ``grain_primary`` and
    ``grain_secondary`` regions are read lazily on first access and cached
    on the instance.

    .. seealso::
       Source - https://github.com/libyal/libvmdk/blob/master/documentation/VMWare%20Virtual%20Disk%20Format%20(VMDK).asciidoc#41-file-header
    """

    class CompressionMethods(Enum):
        """Values accepted for the 2-byte ``compression_method`` field."""
        none = 0
        deflate = 1

    def __init__(self, _io, _parent=None, _root=None):
        """Bind the structure to a stream and parse the header immediately.

        :param _io: stream to read from.
        :param _parent: enclosing structure, if any.
        :param _root: top-level structure; this object is used when the
            argument is falsy.
        """
        self._io = _io
        self._parent = _parent
        self._root = _root if _root else self
        self._read()

    def _read(self):
        """Read the header fields sequentially from the current position."""
        # Magic: the four bytes must spell b"KDMV".
        self.magic = self._io.ensure_fixed_contents(b"\x4B\x44\x4D\x56")
        self.version = self._io.read_s4le()
        self.flags = self._root.HeaderFlags(self._io, self, self._root)
        # Maximum number of sectors in a given image file (capacity).
        self.size_max = self._io.read_s8le()
        self.size_grain = self._io.read_s8le()
        # The start_*/size_* values below are multiplied by ``len_sector``
        # when the lazy properties turn them into byte offsets and lengths.
        self.start_descriptor = self._io.read_s8le()
        self.size_descriptor = self._io.read_s8le()
        self.num_grain_table_entries = self._io.read_s4le()
        self.start_secondary_grain = self._io.read_s8le()
        self.start_primary_grain = self._io.read_s8le()
        self.size_metadata = self._io.read_s8le()
        self.is_dirty = self._io.read_u1()
        # Four bytes kept as raw, uninterpreted data.
        self.stuff = self._io.read_bytes(4)
        # Enum lookup raises ValueError for a value that is not a member.
        self.compression_method = self._root.CompressionMethods(self._io.read_u2le())

    class HeaderFlags(KaitaiStruct):
        """Four bytes of header flags, split into bit fields and reserved bytes.

        .. seealso::
           Source - https://github.com/libyal/libvmdk/blob/master/documentation/VMWare%20Virtual%20Disk%20Format%20(VMDK).asciidoc#411-flags
        """
        def __init__(self, _io, _parent=None, _root=None):
            """Bind to the stream and parse the flags immediately.

            :param _io: stream to read from.
            :param _parent: enclosing structure, if any.
            :param _root: top-level structure; this object is used when the
                argument is falsy.
            """
            self._io = _io
            self._parent = _parent
            self._root = _root if _root else self
            self._read()

        def _read(self):
            """Read two bit-packed bytes, each followed by a reserved byte."""
            # First byte: 5 reserved bits followed by three 1-bit flags.
            self.reserved1 = self._io.read_bits_int(5)
            self.zeroed_grain_table_entry = self._io.read_bits_int(1) != 0
            self.use_secondary_grain_dir = self._io.read_bits_int(1) != 0
            self.valid_new_line_detection_test = self._io.read_bits_int(1) != 0
            # Leave bit mode before the next whole-byte read.
            self._io.align_to_byte()
            self.reserved2 = self._io.read_u1()
            # Third byte: 6 reserved bits followed by two 1-bit flags.
            self.reserved3 = self._io.read_bits_int(6)
            self.has_metadata = self._io.read_bits_int(1) != 0
            self.has_compressed_grain = self._io.read_bits_int(1) != 0
            self._io.align_to_byte()
            self.reserved4 = self._io.read_u1()

    def _read_sectors_at(self, start_sector, num_sectors):
        """Read ``num_sectors`` sectors beginning at ``start_sector``.

        Both arguments are expressed in units of ``len_sector`` bytes.

        The stream position is saved beforehand and restored afterwards,
        including when the seek or the read raises, so a failed lazy read
        does not leave the stream displaced for subsequent readers.

        :returns: the bytes read from the stream.
        """
        sector_size = self._root.len_sector
        _pos = self._io.pos()
        try:
            self._io.seek(start_sector * sector_size)
            return self._io.read_bytes(num_sectors * sector_size)
        finally:
            self._io.seek(_pos)

    @property
    def len_sector(self):
        """Sector size in bytes (constant 512), cached on first access."""
        if not hasattr(self, '_m_len_sector'):
            self._m_len_sector = 512
        return self._m_len_sector

    @property
    def descriptor(self):
        """Raw bytes of the descriptor region, read lazily and cached.

        Covers ``size_descriptor`` sectors starting at sector
        ``start_descriptor``.
        """
        if not hasattr(self, '_m_descriptor'):
            self._m_descriptor = self._read_sectors_at(
                self.start_descriptor, self.size_descriptor)
        return self._m_descriptor

    @property
    def grain_primary(self):
        """Raw bytes at the primary grain offset, read lazily and cached.

        Covers ``size_grain`` sectors starting at sector
        ``start_primary_grain``.
        """
        if not hasattr(self, '_m_grain_primary'):
            self._m_grain_primary = self._read_sectors_at(
                self.start_primary_grain, self.size_grain)
        return self._m_grain_primary

    @property
    def grain_secondary(self):
        """Raw bytes at the secondary grain offset, read lazily and cached.

        Covers ``size_grain`` sectors starting at sector
        ``start_secondary_grain``.
        """
        if not hasattr(self, '_m_grain_secondary'):
            self._m_grain_secondary = self._read_sectors_at(
                self.start_secondary_grain, self.size_grain)
        return self._m_grain_secondary