ICMP network packet: Python parsing library

KS implementation details

License: CC0-1.0

References

This page hosts a formal specification of ICMP network packet using Kaitai Struct. This specification can be automatically translated into a variety of programming languages to get a parsing library.

Usage

Parse a local file and get structure in memory:

data = IcmpPacket.from_file("path/to/local/file.ICMP network packet")

Or parse structure from a bytes:

from kaitaistruct import KaitaiStream, BytesIO

raw = b"\x00\x01\x02..."
data = IcmpPacket(KaitaiStream(BytesIO(raw)))

After that, one can get various attributes from the structure by invoking getter methods like:

data.icmp_type # => get icmp type

Python source code to parse ICMP network packet

icmp_packet.py

# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild

from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
from enum import Enum


if parse_version(ks_version) < parse_version('0.7'):
    raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))

class IcmpPacket(KaitaiStruct):

    class IcmpTypeEnum(Enum):
        echo_reply = 0
        destination_unreachable = 3
        source_quench = 4
        redirect = 5
        echo = 8
        time_exceeded = 11
    def __init__(self, _io, _parent=None, _root=None):
        self._io = _io
        self._parent = _parent
        self._root = _root if _root else self
        self._read()

    def _read(self):
        self.icmp_type = self._root.IcmpTypeEnum(self._io.read_u1())
        if self.icmp_type == self._root.IcmpTypeEnum.destination_unreachable:
            self.destination_unreachable = self._root.DestinationUnreachableMsg(self._io, self, self._root)

        if self.icmp_type == self._root.IcmpTypeEnum.time_exceeded:
            self.time_exceeded = self._root.TimeExceededMsg(self._io, self, self._root)

        if  ((self.icmp_type == self._root.IcmpTypeEnum.echo) or (self.icmp_type == self._root.IcmpTypeEnum.echo_reply)) :
            self.echo = self._root.EchoMsg(self._io, self, self._root)


    class DestinationUnreachableMsg(KaitaiStruct):

        class DestinationUnreachableCode(Enum):
            net_unreachable = 0
            host_unreachable = 1
            protocol_unreachable = 2
            port_unreachable = 3
            fragmentation_needed_and_df_set = 4
            source_route_failed = 5
            dst_net_unkown = 6
            sdt_host_unkown = 7
            src_isolated = 8
            net_prohibited_by_admin = 9
            host_prohibited_by_admin = 10
            net_unreachable_for_tos = 11
            host_unreachable_for_tos = 12
            communication_prohibited_by_admin = 13
            host_precedence_violation = 14
            precedence_cuttoff_in_effect = 15
        def __init__(self, _io, _parent=None, _root=None):
            self._io = _io
            self._parent = _parent
            self._root = _root if _root else self
            self._read()

        def _read(self):
            self.code = self._root.DestinationUnreachableMsg.DestinationUnreachableCode(self._io.read_u1())
            self.checksum = self._io.read_u2be()


    class TimeExceededMsg(KaitaiStruct):

        class TimeExceededCode(Enum):
            time_to_live_exceeded_in_transit = 0
            fragment_reassembly_time_exceeded = 1
        def __init__(self, _io, _parent=None, _root=None):
            self._io = _io
            self._parent = _parent
            self._root = _root if _root else self
            self._read()

        def _read(self):
            self.code = self._root.TimeExceededMsg.TimeExceededCode(self._io.read_u1())
            self.checksum = self._io.read_u2be()


    class EchoMsg(KaitaiStruct):
        def __init__(self, _io, _parent=None, _root=None):
            self._io = _io
            self._parent = _parent
            self._root = _root if _root else self
            self._read()

        def _read(self):
            self.code = self._io.ensure_fixed_contents(b"\x00")
            self.checksum = self._io.read_u2be()
            self.identifier = self._io.read_u2be()
            self.seq_num = self._io.read_u2be()
            self.data = self._io.read_bytes_full()