Protocol body represents a particular payload on the transport level (OSI layer 4).
Typically this payload is encapsulated into a network-level (OSI layer 3) packet, which includes a "protocol number" field that is used to decide what's inside the payload and how to parse it. Thanks to IANA's standardization effort, multiple network-level protocols use the same IDs for these payloads, named "protocol numbers".
This is effectively a "router" type: it expects to get the protocol number as a parameter, and then invokes the relevant type parser based on that parameter.
This page hosts a formal specification of protocol_body using Kaitai Struct. This specification can be automatically translated into a variety of programming languages to get a parsing library.
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
# type: ignore
import kaitaistruct
from kaitaistruct import ReadWriteKaitaiStruct, KaitaiStream, BytesIO
import ipv6_packet
import udp_datagram
import icmp_packet
import tcp_segment
import ipv4_packet
from enum import IntEnum
# Refuse to load against a runtime whose API is older than 0.11; a runtime
# that does not define API_VERSION at all is treated as (0, 9).
if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 11):
    raise Exception(
        "Incompatible Kaitai Struct Python API: 0.11 or later is required, but you have %s"
        % (kaitaistruct.__version__)
    )
class ProtocolBody(ReadWriteKaitaiStruct):
    """Payload carried on the transport level (OSI layer 4).

    Typically this payload is encapsulated into a network-level (OSI
    layer 3) packet whose "protocol number" field tells what is inside
    the payload and how to parse it. Thanks to IANA's standardization
    effort, multiple network-level protocols share the same IDs
    ("protocol numbers") for these payloads.

    This is effectively a "router" type: it receives the protocol number
    as a parameter and invokes the matching type parser.

    .. seealso::
       Source - https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
    """

    class ProtocolEnum(IntEnum):
        """Symbolic names for the protocol numbers this type can be given."""
        hopopt = 0
        icmp = 1
        igmp = 2
        ggp = 3
        ipv4 = 4
        st = 5
        tcp = 6
        cbt = 7
        egp = 8
        igp = 9
        bbn_rcc_mon = 10
        nvp_ii = 11
        pup = 12
        argus = 13
        emcon = 14
        xnet = 15
        chaos = 16
        udp = 17
        mux = 18
        dcn_meas = 19
        hmp = 20
        prm = 21
        xns_idp = 22
        trunk_1 = 23
        trunk_2 = 24
        leaf_1 = 25
        leaf_2 = 26
        rdp = 27
        irtp = 28
        iso_tp4 = 29
        netblt = 30
        mfe_nsp = 31
        merit_inp = 32
        dccp = 33
        x_3pc = 34
        idpr = 35
        xtp = 36
        ddp = 37
        idpr_cmtp = 38
        tp_plus_plus = 39
        il = 40
        ipv6 = 41
        sdrp = 42
        ipv6_route = 43
        ipv6_frag = 44
        idrp = 45
        rsvp = 46
        gre = 47
        dsr = 48
        bna = 49
        esp = 50
        ah = 51
        i_nlsp = 52
        swipe = 53
        narp = 54
        mobile = 55
        tlsp = 56
        skip = 57
        ipv6_icmp = 58
        ipv6_nonxt = 59
        ipv6_opts = 60
        any_host_internal_protocol = 61
        cftp = 62
        any_local_network = 63
        sat_expak = 64
        kryptolan = 65
        rvd = 66
        ippc = 67
        any_distributed_file_system = 68
        sat_mon = 69
        visa = 70
        ipcv = 71
        cpnx = 72
        cphb = 73
        wsn = 74
        pvp = 75
        br_sat_mon = 76
        sun_nd = 77
        wb_mon = 78
        wb_expak = 79
        iso_ip = 80
        vmtp = 81
        secure_vmtp = 82
        vines = 83
        ttp_or_iptm = 84
        nsfnet_igp = 85
        dgp = 86
        tcf = 87
        eigrp = 88
        ospfigp = 89
        sprite_rpc = 90
        larp = 91
        mtp = 92
        ax_25 = 93
        ipip = 94
        micp = 95
        scc_sp = 96
        etherip = 97
        encap = 98
        any_private_encryption_scheme = 99
        gmtp = 100
        ifmp = 101
        pnni = 102
        pim = 103
        aris = 104
        scps = 105
        qnx = 106
        a_n = 107
        ipcomp = 108
        snp = 109
        compaq_peer = 110
        ipx_in_ip = 111
        vrrp = 112
        pgm = 113
        any_0_hop = 114
        l2tp = 115
        ddx = 116
        iatp = 117
        stp = 118
        srp = 119
        uti = 120
        smp = 121
        sm = 122
        ptp = 123
        isis_over_ipv4 = 124
        fire = 125
        crtp = 126
        crudp = 127
        sscopmce = 128
        iplt = 129
        sps = 130
        pipe = 131
        sctp = 132
        fc = 133
        rsvp_e2e_ignore = 134
        mobility_header = 135
        udplite = 136
        mpls_in_ip = 137
        manet = 138
        hip = 139
        shim6 = 140
        wesp = 141
        rohc = 142
        reserved_255 = 255

    # Protocols for which ``body`` exists, in the order the switch tests them.
    # Only enum members live here; parser classes are resolved at call time.
    _BODY_CASES = (
        ProtocolEnum.hopopt,
        ProtocolEnum.icmp,
        ProtocolEnum.ipv4,
        ProtocolEnum.ipv6,
        ProtocolEnum.ipv6_nonxt,
        ProtocolEnum.tcp,
        ProtocolEnum.udp,
    )

    # Subset whose body type is declared inside this class; only these bodies
    # are built with a parent/root link and therefore verified in _check().
    _NESTED_BODY_CASES = (
        ProtocolEnum.hopopt,
        ProtocolEnum.ipv6_nonxt,
    )

    def __init__(self, protocol_num, _io=None, _parent=None, _root=None):
        """Create a body selector for the given protocol number.

        :param protocol_num: protocol number that decides which parser
            handles ``body`` (resolved to an enum by :attr:`protocol`).
        :param _io: stream handed to the base class constructor.
        :param _parent: owning structure, stored as ``_parent``.
        :param _root: top-level structure; falls back to this object
            when it is missing or falsy.
        """
        super(ProtocolBody, self).__init__(_io)
        self.protocol_num = protocol_num
        self._parent = _parent
        self._root = _root if _root else self

    def _read(self):
        """Parse ``body`` with the parser matching :attr:`protocol`.

        When no case matches, ``body`` is left unset. ``_dirty`` is
        cleared in either case.
        """
        kinds = ProtocolBody.ProtocolEnum
        proto = self.protocol
        parsed = None
        # Types from imported modules are built from the stream alone; the
        # two types nested in this class also get the parent and root.
        if proto == kinds.hopopt:
            parsed = ProtocolBody.OptionHopByHop(self._io, self, self._root)
        elif proto == kinds.icmp:
            parsed = icmp_packet.IcmpPacket(self._io)
        elif proto == kinds.ipv4:
            parsed = ipv4_packet.Ipv4Packet(self._io)
        elif proto == kinds.ipv6:
            parsed = ipv6_packet.Ipv6Packet(self._io)
        elif proto == kinds.ipv6_nonxt:
            parsed = ProtocolBody.NoNextHeader(self._io, self, self._root)
        elif proto == kinds.tcp:
            parsed = tcp_segment.TcpSegment(self._io)
        elif proto == kinds.udp:
            parsed = udp_datagram.UdpDatagram(self._io)
        if parsed is not None:
            # Publish the attribute before reading, so it stays set even if
            # the nested read raises.
            self.body = parsed
            parsed._read()
        self._dirty = False

    def _fetch_instances(self):
        """Forward the call to ``body`` when a switch case matched."""
        if self.protocol in ProtocolBody._BODY_CASES:
            self.body._fetch_instances()

    def _write__seq(self, io=None):
        """Write ``body`` to the stream when a switch case matched.

        :param io: optional stream, passed to the base implementation.
        """
        super(ProtocolBody, self)._write__seq(io)
        if self.protocol in ProtocolBody._BODY_CASES:
            # The stream comes from self._io, read after the base call.
            self.body._write__seq(self._io)

    def _check(self):
        """Verify that a nested-type ``body`` is linked to this object.

        :raises kaitaistruct.ConsistencyError: if the body's ``_root`` or
            ``_parent`` does not match. Bodies of imported types are not
            inspected.
        """
        if self.protocol in ProtocolBody._NESTED_BODY_CASES:
            payload = self.body
            if payload._root != self._root:
                raise kaitaistruct.ConsistencyError(u"body", self._root, payload._root)
            if payload._parent != self:
                raise kaitaistruct.ConsistencyError(u"body", self, payload._parent)
        self._dirty = False

    class NoNextHeader(ReadWriteKaitaiStruct):
        """Empty placeholder for the IPv6 "no next header" value.

        It signifies the end of the headers chain; this type declares no
        fields of its own.
        """

        def __init__(self, _io=None, _parent=None, _root=None):
            """Store the stream, parent and root; there is nothing else to set."""
            super(ProtocolBody.NoNextHeader, self).__init__(_io)
            self._root = _root
            self._parent = _parent

        def _read(self):
            """Read nothing; only clear ``_dirty``."""
            self._dirty = False

        def _fetch_instances(self):
            """Nothing to fetch."""
            pass

        def _write__seq(self, io=None):
            """Write nothing beyond what the base implementation does."""
            super(ProtocolBody.NoNextHeader, self)._write__seq(io)

        def _check(self):
            """Nothing to verify; only clear ``_dirty``."""
            self._dirty = False

    class OptionHopByHop(ReadWriteKaitaiStruct):
        """Body used for protocol number 0 (``hopopt``).

        Layout: one byte ``next_header_type``, one byte ``hdr_ext_len``,
        the raw ``body`` bytes, then a nested :class:`ProtocolBody`
        selected by ``next_header_type``.
        """

        def __init__(self, _io=None, _parent=None, _root=None):
            """Store the stream, parent and root."""
            super(ProtocolBody.OptionHopByHop, self).__init__(_io)
            self._root = _root
            self._parent = _parent

        def _read(self):
            """Parse both header bytes, the body bytes and the next header."""
            stream = self._io
            self.next_header_type = stream.read_u1()
            self.hdr_ext_len = stream.read_u1()
            # NOTE(review): RFC 8200 counts Hdr Ext Len in 8-octet units
            # excluding the first 8 octets; this size expression does not
            # obviously match that. Confirm against the source .ksy before
            # changing it.
            body_size = self.hdr_ext_len - 1 if self.hdr_ext_len > 0 else 1
            self.body = stream.read_bytes(body_size)
            self.next_header = ProtocolBody(self.next_header_type, stream, self, self._root)
            self.next_header._read()
            self._dirty = False

        def _fetch_instances(self):
            """Forward the call to the nested next header."""
            self.next_header._fetch_instances()

        def _write__seq(self, io=None):
            """Write the fields in the same order in which they are read.

            :param io: optional stream, passed to the base implementation.
            """
            super(ProtocolBody.OptionHopByHop, self)._write__seq(io)
            # Take the stream from self._io only after the base call.
            out = self._io
            out.write_u1(self.next_header_type)
            out.write_u1(self.hdr_ext_len)
            out.write_bytes(self.body)
            self.next_header._write__seq(out)

        def _check(self):
            """Verify the body length and the next header's linkage.

            :raises kaitaistruct.ConsistencyError: if ``body`` has the
                wrong length, or ``next_header`` has a different root,
                parent, or protocol number than expected.
            """
            actual_len = len(self.body)
            expected_len = self.hdr_ext_len - 1 if self.hdr_ext_len > 0 else 1
            if actual_len != expected_len:
                raise kaitaistruct.ConsistencyError(u"body", expected_len, actual_len)
            following = self.next_header
            if following._root != self._root:
                raise kaitaistruct.ConsistencyError(u"next_header", self._root, following._root)
            if following._parent != self:
                raise kaitaistruct.ConsistencyError(u"next_header", self, following._parent)
            if following.protocol_num != self.next_header_type:
                raise kaitaistruct.ConsistencyError(u"next_header", self.next_header_type, following.protocol_num)
            self._dirty = False

    @property
    def protocol(self):
        """``protocol_num`` resolved through :class:`ProtocolEnum`.

        The result is cached in ``_m_protocol`` on first access; call
        :meth:`_invalidate_protocol` to drop the cached value.
        """
        if not hasattr(self, '_m_protocol'):
            self._m_protocol = KaitaiStream.resolve_enum(ProtocolBody.ProtocolEnum, self.protocol_num)
        return self._m_protocol

    def _invalidate_protocol(self):
        """Drop the cached :attr:`protocol` value.

        :raises AttributeError: if no value is cached yet.
        """
        delattr(self, '_m_protocol')