ICMP network packet: Python (read-write) parsing library

KS implementation details

License: CC0-1.0

References

This page hosts a formal specification of ICMP network packet using Kaitai Struct. This specification can be automatically translated into a variety of programming languages to get a parsing library.

Python (read-write) source code to parse ICMP network packet

icmp_packet.py

# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
# type: ignore

import kaitaistruct
from kaitaistruct import ReadWriteKaitaiStruct, KaitaiStream, BytesIO
from enum import IntEnum


if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 11):
    raise Exception("Incompatible Kaitai Struct Python API: 0.11 or later is required, but you have %s" % (kaitaistruct.__version__))

class IcmpPacket(ReadWriteKaitaiStruct):

    class IcmpTypeEnum(IntEnum):
        echo_reply = 0
        destination_unreachable = 3
        source_quench = 4
        redirect = 5
        echo = 8
        time_exceeded = 11
    def __init__(self, _io=None, _parent=None, _root=None):
        super(IcmpPacket, self).__init__(_io)
        self._parent = _parent
        self._root = _root or self

    def _read(self):
        self.icmp_type = KaitaiStream.resolve_enum(IcmpPacket.IcmpTypeEnum, self._io.read_u1())
        if self.icmp_type == IcmpPacket.IcmpTypeEnum.destination_unreachable:
            pass
            self.destination_unreachable = IcmpPacket.DestinationUnreachableMsg(self._io, self, self._root)
            self.destination_unreachable._read()

        if self.icmp_type == IcmpPacket.IcmpTypeEnum.time_exceeded:
            pass
            self.time_exceeded = IcmpPacket.TimeExceededMsg(self._io, self, self._root)
            self.time_exceeded._read()

        if  ((self.icmp_type == IcmpPacket.IcmpTypeEnum.echo) or (self.icmp_type == IcmpPacket.IcmpTypeEnum.echo_reply)) :
            pass
            self.echo = IcmpPacket.EchoMsg(self._io, self, self._root)
            self.echo._read()

        self._dirty = False


    def _fetch_instances(self):
        pass
        if self.icmp_type == IcmpPacket.IcmpTypeEnum.destination_unreachable:
            pass
            self.destination_unreachable._fetch_instances()

        if self.icmp_type == IcmpPacket.IcmpTypeEnum.time_exceeded:
            pass
            self.time_exceeded._fetch_instances()

        if  ((self.icmp_type == IcmpPacket.IcmpTypeEnum.echo) or (self.icmp_type == IcmpPacket.IcmpTypeEnum.echo_reply)) :
            pass
            self.echo._fetch_instances()



    def _write__seq(self, io=None):
        super(IcmpPacket, self)._write__seq(io)
        self._io.write_u1(int(self.icmp_type))
        if self.icmp_type == IcmpPacket.IcmpTypeEnum.destination_unreachable:
            pass
            self.destination_unreachable._write__seq(self._io)

        if self.icmp_type == IcmpPacket.IcmpTypeEnum.time_exceeded:
            pass
            self.time_exceeded._write__seq(self._io)

        if  ((self.icmp_type == IcmpPacket.IcmpTypeEnum.echo) or (self.icmp_type == IcmpPacket.IcmpTypeEnum.echo_reply)) :
            pass
            self.echo._write__seq(self._io)



    def _check(self):
        if self.icmp_type == IcmpPacket.IcmpTypeEnum.destination_unreachable:
            pass
            if self.destination_unreachable._root != self._root:
                raise kaitaistruct.ConsistencyError(u"destination_unreachable", self._root, self.destination_unreachable._root)
            if self.destination_unreachable._parent != self:
                raise kaitaistruct.ConsistencyError(u"destination_unreachable", self, self.destination_unreachable._parent)

        if self.icmp_type == IcmpPacket.IcmpTypeEnum.time_exceeded:
            pass
            if self.time_exceeded._root != self._root:
                raise kaitaistruct.ConsistencyError(u"time_exceeded", self._root, self.time_exceeded._root)
            if self.time_exceeded._parent != self:
                raise kaitaistruct.ConsistencyError(u"time_exceeded", self, self.time_exceeded._parent)

        if  ((self.icmp_type == IcmpPacket.IcmpTypeEnum.echo) or (self.icmp_type == IcmpPacket.IcmpTypeEnum.echo_reply)) :
            pass
            if self.echo._root != self._root:
                raise kaitaistruct.ConsistencyError(u"echo", self._root, self.echo._root)
            if self.echo._parent != self:
                raise kaitaistruct.ConsistencyError(u"echo", self, self.echo._parent)

        self._dirty = False

    class DestinationUnreachableMsg(ReadWriteKaitaiStruct):

        class DestinationUnreachableCode(IntEnum):
            net_unreachable = 0
            host_unreachable = 1
            protocol_unreachable = 2
            port_unreachable = 3
            fragmentation_needed_and_df_set = 4
            source_route_failed = 5
            dst_net_unkown = 6
            sdt_host_unkown = 7
            src_isolated = 8
            net_prohibited_by_admin = 9
            host_prohibited_by_admin = 10
            net_unreachable_for_tos = 11
            host_unreachable_for_tos = 12
            communication_prohibited_by_admin = 13
            host_precedence_violation = 14
            precedence_cuttoff_in_effect = 15
        def __init__(self, _io=None, _parent=None, _root=None):
            super(IcmpPacket.DestinationUnreachableMsg, self).__init__(_io)
            self._parent = _parent
            self._root = _root

        def _read(self):
            self.code = KaitaiStream.resolve_enum(IcmpPacket.DestinationUnreachableMsg.DestinationUnreachableCode, self._io.read_u1())
            self.checksum = self._io.read_u2be()
            self._dirty = False


        def _fetch_instances(self):
            pass


        def _write__seq(self, io=None):
            super(IcmpPacket.DestinationUnreachableMsg, self)._write__seq(io)
            self._io.write_u1(int(self.code))
            self._io.write_u2be(self.checksum)


        def _check(self):
            self._dirty = False


    class EchoMsg(ReadWriteKaitaiStruct):
        def __init__(self, _io=None, _parent=None, _root=None):
            super(IcmpPacket.EchoMsg, self).__init__(_io)
            self._parent = _parent
            self._root = _root

        def _read(self):
            self.code = self._io.read_bytes(1)
            if not self.code == b"\x00":
                raise kaitaistruct.ValidationNotEqualError(b"\x00", self.code, self._io, u"/types/echo_msg/seq/0")
            self.checksum = self._io.read_u2be()
            self.identifier = self._io.read_u2be()
            self.seq_num = self._io.read_u2be()
            self.data = self._io.read_bytes_full()
            self._dirty = False


        def _fetch_instances(self):
            pass


        def _write__seq(self, io=None):
            super(IcmpPacket.EchoMsg, self)._write__seq(io)
            self._io.write_bytes(self.code)
            self._io.write_u2be(self.checksum)
            self._io.write_u2be(self.identifier)
            self._io.write_u2be(self.seq_num)
            self._io.write_bytes(self.data)
            if not self._io.is_eof():
                raise kaitaistruct.ConsistencyError(u"data", 0, self._io.size() - self._io.pos())


        def _check(self):
            if len(self.code) != 1:
                raise kaitaistruct.ConsistencyError(u"code", 1, len(self.code))
            if not self.code == b"\x00":
                raise kaitaistruct.ValidationNotEqualError(b"\x00", self.code, None, u"/types/echo_msg/seq/0")
            self._dirty = False


    class TimeExceededMsg(ReadWriteKaitaiStruct):

        class TimeExceededCode(IntEnum):
            time_to_live_exceeded_in_transit = 0
            fragment_reassembly_time_exceeded = 1
        def __init__(self, _io=None, _parent=None, _root=None):
            super(IcmpPacket.TimeExceededMsg, self).__init__(_io)
            self._parent = _parent
            self._root = _root

        def _read(self):
            self.code = KaitaiStream.resolve_enum(IcmpPacket.TimeExceededMsg.TimeExceededCode, self._io.read_u1())
            self.checksum = self._io.read_u2be()
            self._dirty = False


        def _fetch_instances(self):
            pass


        def _write__seq(self, io=None):
            super(IcmpPacket.TimeExceededMsg, self)._write__seq(io)
            self._io.write_u1(int(self.code))
            self._io.write_u2be(self.checksum)


        def _check(self):
            self._dirty = False