dd if=/dev/zero of=image.img bs=512 count=$(( 4 * 1024 * 2 ))
sudo losetup /dev/loop1 image.img
sudo pvcreate /dev/loop1
sudo vgcreate vg_test /dev/loop1
sudo lvcreate --name lv_test1 vg_test
sudo losetup -d /dev/loop1
This page hosts a formal specification of Logical Volume Manager version 2 using Kaitai Struct. This specification can be automatically translated into a variety of programming languages to get a parsing library.
All parsing code for Python generated by Kaitai Struct depends on the Python runtime library. You have to install it before you can parse data.
The Python runtime library can be installed from PyPI:
python3 -m pip install kaitaistruct
Parse a local file and get structure in memory:
data = Lvm2.from_file("path/to/local/file.bin")
Or parse structure from a bytes:
from kaitaistruct import KaitaiStream, BytesIO
raw = b"\x00\x01\x02..."
data = Lvm2(KaitaiStream(BytesIO(raw)))
After that, one can get various attributes from the structure by invoking getter methods like:
data.pv # => Physical volume
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
import kaitaistruct
from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO
from enum import Enum
if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9):
raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__))
class Lvm2(KaitaiStruct):
"""### Building a test file
```
dd if=/dev/zero of=image.img bs=512 count=$(( 4 * 1024 * 2 ))
sudo losetup /dev/loop1 image.img
sudo pvcreate /dev/loop1
sudo vgcreate vg_test /dev/loop1
sudo lvcreate --name lv_test1 vg_test
sudo losetup -d /dev/loop1
```
.. seealso::
Source - https://github.com/libyal/libvslvm/blob/main/documentation/Logical%20Volume%20Manager%20(LVM)%20format.asciidoc
"""
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.pv = Lvm2.PhysicalVolume(self._io, self, self._root)
class PhysicalVolume(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.empty_sector = self._io.read_bytes(self._root.sector_size)
self.label = Lvm2.PhysicalVolume.Label(self._io, self, self._root)
class Label(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.label_header = Lvm2.PhysicalVolume.Label.LabelHeader(self._io, self, self._root)
self.volume_header = Lvm2.PhysicalVolume.Label.VolumeHeader(self._io, self, self._root)
class LabelHeader(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.signature = self._io.read_bytes(8)
if not self.signature == b"\x4C\x41\x42\x45\x4C\x4F\x4E\x45":
raise kaitaistruct.ValidationNotEqualError(b"\x4C\x41\x42\x45\x4C\x4F\x4E\x45", self.signature, self._io, u"/types/physical_volume/types/label/types/label_header/seq/0")
self.sector_number = self._io.read_u8le()
self.checksum = self._io.read_u4le()
self.label_header_ = Lvm2.PhysicalVolume.Label.LabelHeader.LabelHeader(self._io, self, self._root)
class LabelHeader(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.data_offset = self._io.read_u4le()
self.type_indicator = self._io.read_bytes(8)
if not self.type_indicator == b"\x4C\x56\x4D\x32\x20\x30\x30\x31":
raise kaitaistruct.ValidationNotEqualError(b"\x4C\x56\x4D\x32\x20\x30\x30\x31", self.type_indicator, self._io, u"/types/physical_volume/types/label/types/label_header/types/label_header_/seq/1")
class VolumeHeader(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.id = (self._io.read_bytes(32)).decode(u"ascii")
self.size = self._io.read_u8le()
self.data_area_descriptors = []
i = 0
while True:
_ = Lvm2.PhysicalVolume.Label.VolumeHeader.DataAreaDescriptor(self._io, self, self._root)
self.data_area_descriptors.append(_)
if ((_.size != 0) and (_.offset != 0)) :
break
i += 1
self.metadata_area_descriptors = []
i = 0
while True:
_ = Lvm2.PhysicalVolume.Label.VolumeHeader.MetadataAreaDescriptor(self._io, self, self._root)
self.metadata_area_descriptors.append(_)
if ((_.size != 0) and (_.offset != 0)) :
break
i += 1
class DataAreaDescriptor(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.offset = self._io.read_u8le()
self.size = self._io.read_u8le()
@property
def data(self):
if hasattr(self, '_m_data'):
return self._m_data
if self.size != 0:
_pos = self._io.pos()
self._io.seek(self.offset)
self._m_data = (self._io.read_bytes(self.size)).decode(u"ascii")
self._io.seek(_pos)
return getattr(self, '_m_data', None)
class MetadataAreaDescriptor(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.offset = self._io.read_u8le()
self.size = self._io.read_u8le()
@property
def data(self):
if hasattr(self, '_m_data'):
return self._m_data
if self.size != 0:
_pos = self._io.pos()
self._io.seek(self.offset)
self._raw__m_data = self._io.read_bytes(self.size)
_io__raw__m_data = KaitaiStream(BytesIO(self._raw__m_data))
self._m_data = Lvm2.PhysicalVolume.Label.VolumeHeader.MetadataArea(_io__raw__m_data, self, self._root)
self._io.seek(_pos)
return getattr(self, '_m_data', None)
class MetadataArea(KaitaiStruct):
"""According to `[REDHAT]` the metadata area is a circular buffer. New metadata is appended to the old metadata and then the pointer to the start of it is updated. The metadata area, therefore, can contain copies of older versions of the metadata."""
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.header = Lvm2.PhysicalVolume.Label.VolumeHeader.MetadataArea.MetadataAreaHeader(self._io, self, self._root)
class MetadataAreaHeader(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.checksum = Lvm2.PhysicalVolume.Label.VolumeHeader.MetadataArea.MetadataAreaHeader(self._io, self, self._root)
self.signature = self._io.read_bytes(16)
if not self.signature == b"\x20\x4C\x56\x4D\x32\x20\x78\x5B\x35\x41\x25\x72\x30\x4E\x2A\x3E":
raise kaitaistruct.ValidationNotEqualError(b"\x20\x4C\x56\x4D\x32\x20\x78\x5B\x35\x41\x25\x72\x30\x4E\x2A\x3E", self.signature, self._io, u"/types/physical_volume/types/label/types/volume_header/types/metadata_area/types/metadata_area_header/seq/1")
self.version = self._io.read_u4le()
self.metadata_area_offset = self._io.read_u8le()
self.metadata_area_size = self._io.read_u8le()
self.raw_location_descriptors = []
i = 0
while True:
_ = Lvm2.PhysicalVolume.Label.VolumeHeader.MetadataArea.MetadataAreaHeader.RawLocationDescriptor(self._io, self, self._root)
self.raw_location_descriptors.append(_)
if ((_.offset != 0) and (_.size != 0) and (_.checksum != 0)) :
break
i += 1
class RawLocationDescriptor(KaitaiStruct):
"""The data area size can be 0. It is assumed it represents the remaining available data."""
class RawLocationDescriptorFlags(Enum):
raw_location_ignored = 1
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.offset = self._io.read_u8le()
self.size = self._io.read_u8le()
self.checksum = self._io.read_u4le()
self.flags = KaitaiStream.resolve_enum(Lvm2.PhysicalVolume.Label.VolumeHeader.MetadataArea.MetadataAreaHeader.RawLocationDescriptor.RawLocationDescriptorFlags, self._io.read_u4le())
@property
def metadata(self):
if hasattr(self, '_m_metadata'):
return self._m_metadata
_pos = self._io.pos()
self._io.seek(self.metadata_area_offset)
self._m_metadata = self._io.read_bytes(self.metadata_area_size)
self._io.seek(_pos)
return getattr(self, '_m_metadata', None)
@property
def sector_size(self):
if hasattr(self, '_m_sector_size'):
return self._m_sector_size
self._m_sector_size = 512
return getattr(self, '_m_sector_size', None)