protocol_body: Python (read-write) parsing library

Protocol body represents particular payload on transport level (OSI layer 4).

Typically this payload in encapsulated into network level (OSI layer 3) packet, which includes "protocol number" field that would be used to decide what's inside the payload and how to parse it. Thanks to IANA's standardization effort, multiple network level use the same IDs for these payloads named "protocol numbers".

This is effectively a "router" type: it expects to get protocol number as a parameter, and then invokes relevant type parser based on that parameter.

KS implementation details

License: CC0-1.0
Minimal Kaitai Struct required: 0.8

This page hosts a formal specification of protocol_body using Kaitai Struct. This specification can be automatically translated into a variety of programming languages to get a parsing library.

Python (read-write) source code to parse protocol_body

protocol_body.py

# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
# type: ignore

import kaitaistruct
from kaitaistruct import ReadWriteKaitaiStruct, KaitaiStream, BytesIO
import ipv6_packet
import udp_datagram
import icmp_packet
import tcp_segment
import ipv4_packet
from enum import IntEnum


if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 11):
    raise Exception("Incompatible Kaitai Struct Python API: 0.11 or later is required, but you have %s" % (kaitaistruct.__version__))

class ProtocolBody(ReadWriteKaitaiStruct):
    """Protocol body represents particular payload on transport level (OSI
    layer 4).
    
    Typically this payload in encapsulated into network level (OSI layer
    3) packet, which includes "protocol number" field that would be used
    to decide what's inside the payload and how to parse it. Thanks to
    IANA's standardization effort, multiple network level use the same
    IDs for these payloads named "protocol numbers".
    
    This is effectively a "router" type: it expects to get protocol
    number as a parameter, and then invokes relevant type parser based
    on that parameter.
    
    .. seealso::
       Source - https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
    """

    class ProtocolEnum(IntEnum):
        hopopt = 0
        icmp = 1
        igmp = 2
        ggp = 3
        ipv4 = 4
        st = 5
        tcp = 6
        cbt = 7
        egp = 8
        igp = 9
        bbn_rcc_mon = 10
        nvp_ii = 11
        pup = 12
        argus = 13
        emcon = 14
        xnet = 15
        chaos = 16
        udp = 17
        mux = 18
        dcn_meas = 19
        hmp = 20
        prm = 21
        xns_idp = 22
        trunk_1 = 23
        trunk_2 = 24
        leaf_1 = 25
        leaf_2 = 26
        rdp = 27
        irtp = 28
        iso_tp4 = 29
        netblt = 30
        mfe_nsp = 31
        merit_inp = 32
        dccp = 33
        x_3pc = 34
        idpr = 35
        xtp = 36
        ddp = 37
        idpr_cmtp = 38
        tp_plus_plus = 39
        il = 40
        ipv6 = 41
        sdrp = 42
        ipv6_route = 43
        ipv6_frag = 44
        idrp = 45
        rsvp = 46
        gre = 47
        dsr = 48
        bna = 49
        esp = 50
        ah = 51
        i_nlsp = 52
        swipe = 53
        narp = 54
        mobile = 55
        tlsp = 56
        skip = 57
        ipv6_icmp = 58
        ipv6_nonxt = 59
        ipv6_opts = 60
        any_host_internal_protocol = 61
        cftp = 62
        any_local_network = 63
        sat_expak = 64
        kryptolan = 65
        rvd = 66
        ippc = 67
        any_distributed_file_system = 68
        sat_mon = 69
        visa = 70
        ipcv = 71
        cpnx = 72
        cphb = 73
        wsn = 74
        pvp = 75
        br_sat_mon = 76
        sun_nd = 77
        wb_mon = 78
        wb_expak = 79
        iso_ip = 80
        vmtp = 81
        secure_vmtp = 82
        vines = 83
        ttp_or_iptm = 84
        nsfnet_igp = 85
        dgp = 86
        tcf = 87
        eigrp = 88
        ospfigp = 89
        sprite_rpc = 90
        larp = 91
        mtp = 92
        ax_25 = 93
        ipip = 94
        micp = 95
        scc_sp = 96
        etherip = 97
        encap = 98
        any_private_encryption_scheme = 99
        gmtp = 100
        ifmp = 101
        pnni = 102
        pim = 103
        aris = 104
        scps = 105
        qnx = 106
        a_n = 107
        ipcomp = 108
        snp = 109
        compaq_peer = 110
        ipx_in_ip = 111
        vrrp = 112
        pgm = 113
        any_0_hop = 114
        l2tp = 115
        ddx = 116
        iatp = 117
        stp = 118
        srp = 119
        uti = 120
        smp = 121
        sm = 122
        ptp = 123
        isis_over_ipv4 = 124
        fire = 125
        crtp = 126
        crudp = 127
        sscopmce = 128
        iplt = 129
        sps = 130
        pipe = 131
        sctp = 132
        fc = 133
        rsvp_e2e_ignore = 134
        mobility_header = 135
        udplite = 136
        mpls_in_ip = 137
        manet = 138
        hip = 139
        shim6 = 140
        wesp = 141
        rohc = 142
        reserved_255 = 255
    def __init__(self, protocol_num, _io=None, _parent=None, _root=None):
        super(ProtocolBody, self).__init__(_io)
        self._parent = _parent
        self._root = _root or self
        self.protocol_num = protocol_num

    def _read(self):
        _on = self.protocol
        if _on == ProtocolBody.ProtocolEnum.hopopt:
            pass
            self.body = ProtocolBody.OptionHopByHop(self._io, self, self._root)
            self.body._read()
        elif _on == ProtocolBody.ProtocolEnum.icmp:
            pass
            self.body = icmp_packet.IcmpPacket(self._io)
            self.body._read()
        elif _on == ProtocolBody.ProtocolEnum.ipv4:
            pass
            self.body = ipv4_packet.Ipv4Packet(self._io)
            self.body._read()
        elif _on == ProtocolBody.ProtocolEnum.ipv6:
            pass
            self.body = ipv6_packet.Ipv6Packet(self._io)
            self.body._read()
        elif _on == ProtocolBody.ProtocolEnum.ipv6_nonxt:
            pass
            self.body = ProtocolBody.NoNextHeader(self._io, self, self._root)
            self.body._read()
        elif _on == ProtocolBody.ProtocolEnum.tcp:
            pass
            self.body = tcp_segment.TcpSegment(self._io)
            self.body._read()
        elif _on == ProtocolBody.ProtocolEnum.udp:
            pass
            self.body = udp_datagram.UdpDatagram(self._io)
            self.body._read()
        self._dirty = False


    def _fetch_instances(self):
        pass
        _on = self.protocol
        if _on == ProtocolBody.ProtocolEnum.hopopt:
            pass
            self.body._fetch_instances()
        elif _on == ProtocolBody.ProtocolEnum.icmp:
            pass
            self.body._fetch_instances()
        elif _on == ProtocolBody.ProtocolEnum.ipv4:
            pass
            self.body._fetch_instances()
        elif _on == ProtocolBody.ProtocolEnum.ipv6:
            pass
            self.body._fetch_instances()
        elif _on == ProtocolBody.ProtocolEnum.ipv6_nonxt:
            pass
            self.body._fetch_instances()
        elif _on == ProtocolBody.ProtocolEnum.tcp:
            pass
            self.body._fetch_instances()
        elif _on == ProtocolBody.ProtocolEnum.udp:
            pass
            self.body._fetch_instances()


    def _write__seq(self, io=None):
        super(ProtocolBody, self)._write__seq(io)
        _on = self.protocol
        if _on == ProtocolBody.ProtocolEnum.hopopt:
            pass
            self.body._write__seq(self._io)
        elif _on == ProtocolBody.ProtocolEnum.icmp:
            pass
            self.body._write__seq(self._io)
        elif _on == ProtocolBody.ProtocolEnum.ipv4:
            pass
            self.body._write__seq(self._io)
        elif _on == ProtocolBody.ProtocolEnum.ipv6:
            pass
            self.body._write__seq(self._io)
        elif _on == ProtocolBody.ProtocolEnum.ipv6_nonxt:
            pass
            self.body._write__seq(self._io)
        elif _on == ProtocolBody.ProtocolEnum.tcp:
            pass
            self.body._write__seq(self._io)
        elif _on == ProtocolBody.ProtocolEnum.udp:
            pass
            self.body._write__seq(self._io)


    def _check(self):
        _on = self.protocol
        if _on == ProtocolBody.ProtocolEnum.hopopt:
            pass
            if self.body._root != self._root:
                raise kaitaistruct.ConsistencyError(u"body", self._root, self.body._root)
            if self.body._parent != self:
                raise kaitaistruct.ConsistencyError(u"body", self, self.body._parent)
        elif _on == ProtocolBody.ProtocolEnum.icmp:
            pass
        elif _on == ProtocolBody.ProtocolEnum.ipv4:
            pass
        elif _on == ProtocolBody.ProtocolEnum.ipv6:
            pass
        elif _on == ProtocolBody.ProtocolEnum.ipv6_nonxt:
            pass
            if self.body._root != self._root:
                raise kaitaistruct.ConsistencyError(u"body", self._root, self.body._root)
            if self.body._parent != self:
                raise kaitaistruct.ConsistencyError(u"body", self, self.body._parent)
        elif _on == ProtocolBody.ProtocolEnum.tcp:
            pass
        elif _on == ProtocolBody.ProtocolEnum.udp:
            pass
        self._dirty = False

    class NoNextHeader(ReadWriteKaitaiStruct):
        """Dummy type for IPv6 "no next header" type, which signifies end of headers chain."""
        def __init__(self, _io=None, _parent=None, _root=None):
            super(ProtocolBody.NoNextHeader, self).__init__(_io)
            self._parent = _parent
            self._root = _root

        def _read(self):
            pass
            self._dirty = False


        def _fetch_instances(self):
            pass


        def _write__seq(self, io=None):
            super(ProtocolBody.NoNextHeader, self)._write__seq(io)


        def _check(self):
            self._dirty = False


    class OptionHopByHop(ReadWriteKaitaiStruct):
        def __init__(self, _io=None, _parent=None, _root=None):
            super(ProtocolBody.OptionHopByHop, self).__init__(_io)
            self._parent = _parent
            self._root = _root

        def _read(self):
            self.next_header_type = self._io.read_u1()
            self.hdr_ext_len = self._io.read_u1()
            self.body = self._io.read_bytes((self.hdr_ext_len - 1 if self.hdr_ext_len > 0 else 1))
            self.next_header = ProtocolBody(self.next_header_type, self._io, self, self._root)
            self.next_header._read()
            self._dirty = False


        def _fetch_instances(self):
            pass
            self.next_header._fetch_instances()


        def _write__seq(self, io=None):
            super(ProtocolBody.OptionHopByHop, self)._write__seq(io)
            self._io.write_u1(self.next_header_type)
            self._io.write_u1(self.hdr_ext_len)
            self._io.write_bytes(self.body)
            self.next_header._write__seq(self._io)


        def _check(self):
            if len(self.body) != (self.hdr_ext_len - 1 if self.hdr_ext_len > 0 else 1):
                raise kaitaistruct.ConsistencyError(u"body", (self.hdr_ext_len - 1 if self.hdr_ext_len > 0 else 1), len(self.body))
            if self.next_header._root != self._root:
                raise kaitaistruct.ConsistencyError(u"next_header", self._root, self.next_header._root)
            if self.next_header._parent != self:
                raise kaitaistruct.ConsistencyError(u"next_header", self, self.next_header._parent)
            if self.next_header.protocol_num != self.next_header_type:
                raise kaitaistruct.ConsistencyError(u"next_header", self.next_header_type, self.next_header.protocol_num)
            self._dirty = False


    @property
    def protocol(self):
        if hasattr(self, '_m_protocol'):
            return self._m_protocol

        self._m_protocol = KaitaiStream.resolve_enum(ProtocolBody.ProtocolEnum, self.protocol_num)
        return getattr(self, '_m_protocol', None)

    def _invalidate_protocol(self):
        del self._m_protocol