[Git][NTPsec/ntpsec][master] Changed packet representation to classes.
Ian Bruene
gitlab at mg.gitlab.com
Fri Jul 28 21:08:53 UTC 2017
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
4369f7e4 by Ian Bruene at 2017-07-28T15:59:50-05:00
Changed packet representation to classes.
- - - - -
2 changed files:
- pylib/agentx.py
- tests/pylib/test_agentx.py
Changes:
=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -6,351 +6,650 @@ from __future__ import print_function
import struct
-# Endianess is selectable in the AgentX packet headers. In this program
-# *ALL* values will be transmitted in big / network endianess.
-
internetPrefix = (1, 3, 6, 1) # Used by the prefix option of OID headers
prefixCount = len(internetPrefix)
-# =======================================================================
+# ==========================================================================
#
-# Function types fall into the following categories:
+# Callables fall into the following categories:
# Data type encoder/decoders
-# Packet body encoder/decoders
+# Packet classes/encoders
+# Packet body decoders
# Glue/Utility/Misc functions
#
-# To encode a packet, call the relevant encode_*pdu function.
+# To encode a packet, create an instance of that packet type's class, then
+# call the encode() method.
#
# To decode a packet, call the decode_packet function, which will select
# the correct decoder for that packet type.
#
-# =======================================================================
+# ==========================================================================
-# =======================================================================
+# ==========================================================================
#
-# Packet body encoders / decoders
+# Packet encoders / decoders
#
-# Encoders take information for both the header and body and
-# return a complete packet.
+# Encoders are class methods which output a string version of the
+# packet, ready for transmission.
#
# Decoders take just the body sliced away from everything else,
-# and a context flag. They return tuples containing the data.
-# They do not return extra data as they are never supposed to
-# receive it.
-# Decoders are not meant to be called directly by external code.
+# and a dict containing the already pared information from the header.
+# They return the relevant class instance for the packet in question,
+# they do not return extra data as they are never supposed to receive it.
#
-# =======================================================================
-
+# Decoders are not meant to be called directly by external code,
+# only by decode_packet().
+#
+# ==========================================================================
+
+
+class AgentXPDU:
+ def __init__(self, pduType, bigEndian, sID, tactID, pktID, context=None):
+ self.pduType = pduType
+ self.bigEndian = bigEndian
+ self.sessionID = sID
+ self.transactionID = tactID
+ self.packetID = pktID
+ self.context = context
+
+ def __eq__(self, other):
+ if not isinstance(other, self.__class__):
+ return False
+ if self.pduType != other.pduType:
+ return False
+ if self.bigEndian != other.bigEndian:
+ return False
+ if self.sessionID != other.sessionID:
+ return False
+ if self.transactionID != other.transactionID:
+ return False
+ if self.packetID != other.packetID:
+ return False
+ if self.context != other.context:
+ return False
+ return True
-def encode_openpdu(sID, tactID, pktID, timeout, oid, description):
- payload = struct.pack("Bxxx", timeout)
- payload += encode_oid(oid, False)
- payload += encode_octetstr(description)
- header = encode_pduheader(PDU_OPEN, False, False, False, False,
- sID, tactID, pktID, len(payload))
- return header + payload
+ def __ne__(self, other):
+ return not self.__eq__(other)
-def decode_openpdu(data, flags):
+def decode_OpenPDU(data, header):
+ flags = header["flags"]
temp, data = slicedata(data, 4)
timeout = struct.unpack("Bxxx", temp)[0]
- oid, data = decode_oid(data, flags)
- octets = decode_octetstr(data, flags)[0]
- result = {"timeout": timeout, "oid": oid, "description": octets}
+ oid, data = decode_oid(data, header)
+ description = decode_octetstr(data, header)[0]
+ result = OpenPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ timeout, oid, description)
return result
-def encode_closepdu(sID, tactID, pktID, reason):
- if reason not in definedReasons:
- raise ValueError("Close reason %s not in defined types" % reason)
- payload = struct.pack("Bxxx", reason)
- header = encode_pduheader(PDU_CLOSE, False, False, False, False,
- sID, tactID, pktID, len(payload))
- return header + payload
+class OpenPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID,
+ timeout, oid, description):
+ AgentXPDU.__init__(self, PDU_OPEN, bigEndian, sID, tactID, pktID)
+ self.timeout = timeout
+ self.oid = oid
+ self.description = description
+
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.timeout != other.timeout:
+ return False
+ if self.oid != other.oid:
+ return False
+ if self.description != other.description:
+ return False
+ return True
+ def encode(self):
+ payload = struct.pack("Bxxx", self.timeout)
+ payload += encode_oid(self.bigEndian, self.oid, False)
+ payload += encode_octetstr(self.bigEndian, self.description)
+ header = encode_pduheader(self.pduType,
+ False, False, False, False, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
+
+
+def decode_ClosePDU(data, header):
+ flags = header["flags"]
+ reason = ord(data[0]) # Bxxx
+ result = ClosePDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ reason)
+ return result
-def decode_closepdu(data, flags):
- reason = {"reason": ord(data[0])} # Bxxx
- return reason
+class ClosePDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID, reason):
+ AgentXPDU.__init__(self, PDU_CLOSE, bigEndian, sID, tactID, pktID)
+ if reason not in definedReasons:
+ raise ValueError("Close reason %s not in defined types" % reason)
+ self.reason = reason
-def encode_registerpdu_core(isregister, sID, tactID, pktID,
- timeout, priority, subtree,
- rangeSubid=0, upperBound=None, context=None):
- if isregister:
- pkttype = PDU_REGISTER
- regbit = True
- else:
- pkttype = PDU_UNREGISTER
- timeout = 0
- regbit = False
- contextP, payload = encode_context(context)
- payload += struct.pack(">BBBx", timeout, priority, rangeSubid)
- payload += encode_oid(subtree, False)
- if rangeSubid != 0:
- if upperBound is None:
- raise ValueError("upperBound must be set if rangeSubid is set")
- payload += struct.pack(">I", upperBound)
- header = encode_pduheader(pkttype, regbit, False, False, contextP,
- sID, tactID, pktID, len(payload))
- packet = header + payload
- return packet
-
-
-def decode_registerpdu_core(data, flags):
- endianToken = getendian(flags)
- context, data = decode_context(data, flags)
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.reason != other.reason:
+ return False
+ return True
+
+ def encode(self):
+ payload = struct.pack("Bxxx", self.reason)
+ header = encode_pduheader(self.pduType,
+ False, False, False, False, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
+
+
+def decode_xRegisterPDU(data, header):
+ flags = header["flags"]
+ endianToken = getendian(flags["bigEndian"])
+ context, data = decode_context(data, header)
temp, data = slicedata(data, 4)
timeout, priority, rangeSubid = struct.unpack(endianToken + "BBBx", temp)
- oid, data = decode_oid(data, flags)
+ oid, data = decode_oid(data, header)
if rangeSubid != 0:
temp, data = slicedata(data, 4)
upperBound = struct.unpack(endianToken + "I", temp)[0]
else:
upperBound = None
- result = {"oid": oid, "timeout": timeout, "priority": priority,
- "context": context, "range_subid": rangeSubid,
- "upper_bound": upperBound}
+ if header["type"] == PDU_REGISTER:
+ result = RegisterPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ timeout, priority, oid,
+ rangeSubid, upperBound, context)
+ else:
+ result = UnregisterPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ priority, oid, rangeSubid, upperBound, context)
return result
-def encode_registerpdu(sID, tactID, pktID, timeout, priority, subtree,
- rangeSubid=0, upperBound=None, context=None):
- return encode_registerpdu_core(True, sID, tactID, pktID, timeout,
- priority, subtree, rangeSubid,
- upperBound, context)
-
-
-def encode_unregisterpdu(sID, tactID, pktID, priority, subtree,
- rangeSubid=0, upperBound=None, context=None):
- return encode_registerpdu_core(False, sID, tactID, pktID, 0,
- priority, subtree, rangeSubid,
- upperBound, context)
-
-
-decode_registerpdu = decode_registerpdu_core
-
-
-def decode_unregisterpdu(data, flags):
- result = decode_registerpdu_core(data, flags)
- del result["timeout"]
- return result
-
+class RegisterPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID,
+ timeout, priority, subtree,
+ rangeSubid=0, upperBound=None, context=None):
+ AgentXPDU.__init__(self, PDU_REGISTER,
+ bigEndian, sID, tactID, pktID, context)
+ self.timeout = timeout
+ self.priority = priority
+ self.subtree = subtree
+ self.rangeSubid = rangeSubid
+ self.upperBound = upperBound
+ self.instReg = True # so we don't have to write two encode()s
+
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.timeout != other.timeout:
+ return False
+ if self.priority != other.priority:
+ return False
+ if self.subtree != other.subtree:
+ return False
+ if self.rangeSubid != other.rangeSubid:
+ return False
+ if self.upperBound != other.upperBound:
+ return False
+ return True
-def encode_getpdu_core(isnext, sID, tactID, pktID, oidranges, context=None):
- if isnext is True:
- pkttype = PDU_GET_NEXT
- nullTerm = False
- else:
- pkttype = PDU_GET
- nullTerm = True
- contextP, payload = encode_context(context)
- payload += encode_searchrange_list(oidranges, nullTerm)
- header = encode_pduheader(pkttype, False, False, False, contextP,
- sID, tactID, pktID, len(payload))
- return header + payload
-
-
-def decode_getpdu_core(isnext, data, flags):
- context, data = decode_context(data, flags)
- if isnext is True:
- oidranges = decode_searchrange_list(data, flags)
+ def encode(self):
+ endianToken = getendian(self.bigEndian)
+ contextP, payload = encode_context(self.bigEndian, self.context)
+ if self.pduType == PDU_REGISTER:
+ payload += struct.pack(endianToken + "BBBx", self.timeout,
+ self.priority, self.rangeSubid)
+ else:
+ payload += struct.pack(endianToken + "xBBx",
+ self.priority, self.rangeSubid)
+ payload += encode_oid(self.bigEndian, self.subtree, False)
+ if self.rangeSubid != 0:
+ if self.upperBound is None:
+ raise ValueError("upperBound must be set if rangeSubid is set")
+ payload += struct.pack(endianToken + "I", self.upperBound)
+ header = encode_pduheader(self.pduType, self.instReg, False, False,
+ contextP, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ packet = header + payload
+ return packet
+
+
+class UnregisterPDU(RegisterPDU): # These could inherit in either direction
+ def __init__(self, bigEndian, sID, tactID, pktID, priority, subtree,
+ rangeSubid=0, upperBound=None, context=None):
+ RegisterPDU.__init__(self, bigEndian, sID, tactID, pktID,
+ None, priority, subtree,
+ rangeSubid, upperBound, context)
+ self.pduType = PDU_UNREGISTER
+ self.instReg = False
+
+
+def decode_xGetPDU(data, header):
+ flags = header["flags"]
+ context, data = decode_context(data, header)
+ if header["type"] == PDU_GET_NEXT:
+ oidranges = decode_searchrange_list(data, header)
+ result = GetNextPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ oidranges, context)
else:
- oidranges, data = decode_searchrange_list_nullterm(data, flags)
- result = {"context": context, "oidranges": oidranges}
+ oidranges, data = decode_searchrange_list_nullterm(data, header)
+ result = GetPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ oidranges, context)
return result
-def encode_getpdu(sID, tactID, pktID, oidranges, context=None):
- return encode_getpdu_core(False, sID, tactID, pktID, oidranges, context)
-
+class GetPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID, oidranges, context=None):
+ AgentXPDU.__init__(self, PDU_GET,
+ bigEndian, sID, tactID, pktID, context)
+ self.oidranges = oidranges
+ self.nullTerm = True
-def encode_getnextpdu(sID, tactID, pktID, oidranges, context=None):
- return encode_getpdu_core(True, sID, tactID, pktID, oidranges, context)
-
-
-def decode_getpdu(data, flags):
- return decode_getpdu_core(False, data, flags)
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.oidranges != other.oidranges:
+ return False
+ return True
+ def encode(self):
+ contextP, payload = encode_context(self.bigEndian, self.context)
+ payload += encode_searchrange_list(self.bigEndian,
+ self.oidranges, self.nullTerm)
+ header = encode_pduheader(self.pduType, False, False, False,
+ contextP, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
+
+
+class GetNextPDU(GetPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID, oidranges, context=None):
+ GetPDU.__init__(self, bigEndian, sID, tactID, pktID,
+ oidranges, context)
+ self.pduType = PDU_GET_NEXT
+ self.nullTerm = False
+
+
+def decode_GetBulkPDU(data, header):
+ flags = header["flags"]
+ endianToken = getendian(flags["bigEndian"])
+ context, data = decode_context(data, header)
+ temp, data = slicedata(data, 4)
+ nonReps, maxReps = struct.unpack(endianToken + "HH", temp)
+ oidranges = decode_searchrange_list(data, header)
+ result = GetBulkPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ nonReps, maxReps, oidranges, context)
+ return result
-def decode_getnextpdu(data, flags):
- return decode_getpdu_core(True, data, flags)
+class GetBulkPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID,
+ nonReps, maxReps, oidranges, context=None):
+ AgentXPDU.__init__(self, PDU_GET_BULK,
+ bigEndian, sID, tactID, pktID, context)
+ self.nonReps = nonReps
+ self.maxReps = maxReps
+ self.oidranges = oidranges
+
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.nonReps != other.nonReps:
+ return False
+ if self.maxReps != other.maxReps:
+ return False
+ if self.oidranges != other.oidranges:
+ return False
+ return True
-def encode_getbulkpdu(sID, tactID, pktID, nonReps, maxReps,
- oidranges, context=None):
- contextP, payload = encode_context(context)
- payload += struct.pack(">HH", nonReps, maxReps)
- payload += encode_searchrange_list(oidranges)
- header = encode_pduheader(PDU_GET_BULK, False, False, False, contextP,
- sID, tactID, pktID, len(payload))
- return header + payload
+ def encode(self):
+ endianToken = getendian(self.bigEndian)
+ contextP, payload = encode_context(self.bigEndian, self.context)
+ payload += struct.pack(endianToken + "HH", self.nonReps, self.maxReps)
+ payload += encode_searchrange_list(self.bigEndian,
+ self.oidranges, False)
+ header = encode_pduheader(self.pduType, False, False, False,
+ contextP, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
+
+
+def decode_TestSetPDU(data, header):
+ flags = header["flags"]
+ context, data = decode_context(data, header)
+ varbinds = decode_varbindlist(data, header)
+ result = TestSetPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ varbinds, context)
+ return result
-def decode_getbulkpdu(data, flags):
- endianToken = getendian(flags)
- context, data = decode_context(data, flags)
- temp, data = slicedata(data, 4)
- nonReps, maxReps = struct.unpack(endianToken + "HH", temp)
- oidranges = decode_searchrange_list(data, flags)
- result = {"context": context, "non_reps": nonReps, "max_reps": maxReps,
- "oidranges": oidranges}
- return result
+class TestSetPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID, varbinds, context=None):
+ AgentXPDU.__init__(self, PDU_TEST_SET,
+ bigEndian, sID, tactID, pktID, context)
+ self.varbinds = varbinds
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.varbinds != other.varbinds:
+ return False
+ return True
-def encode_testsetpdu(sID, tactID, pktID, varbinds, context=None):
- contextP, payload = encode_context(context)
- payload += encode_varbindlist(varbinds)
- header = encode_pduheader(PDU_TEST_SET, False, False, False, contextP,
- sID, tactID, pktID, len(payload))
- return header + payload
+ def encode(self):
+ contextP, payload = encode_context(self.bigEndian, self.context)
+ payload += encode_varbindlist(self.bigEndian, self.varbinds)
+ header = encode_pduheader(self.pduType, False, False, False,
+ contextP, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
-def decode_testsetpdu(data, flags):
- context, data = decode_context(data, flags)
- varbinds = decode_varbindlist(data, flags)
- result = {"context": context, "varbinds": varbinds}
+def decode_CommitSetPDU(data, header):
+ flags = header["flags"]
+ result = CommitSetPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"])
return result
-# CommitSet, UndoSet, and CleanupSet are bare headers. Don't need decoders
+class CommitSetPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID):
+ AgentXPDU.__init__(self, PDU_COMMIT_SET,
+ bigEndian, sID, tactID, pktID)
-def encode_commitsetpdu(sID, tactID, pktID):
- return encode_pduheader(PDU_COMMIT_SET, False, False, False, False,
- sID, tactID, pktID, 0)
+ def encode(self):
+ header = encode_pduheader(self.pduType,
+ False, False, False, False, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, 0)
+ return header
-def encode_undosetpdu(sID, tactID, pktID):
- return encode_pduheader(PDU_UNDO_SET, False, False, False, False,
- sID, tactID, pktID, 0)
+def decode_UndoSetPDU(data, header):
+ flags = header["flags"]
+ result = UndoSetPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"])
+ return result
-def encode_cleanupsetpdu(sID, tactID, pktID):
- return encode_pduheader(PDU_CLEANUP_SET, False, False, False, False,
- sID, tactID, pktID, 0)
+class UndoSetPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID):
+ AgentXPDU.__init__(self, PDU_UNDO_SET,
+ bigEndian, sID, tactID, pktID)
+ def encode(self):
+ header = encode_pduheader(self.pduType,
+ False, False, False, False, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, 0)
+ return header
-def encode_pingpdu(sID, tactID, pktID, context=None):
- contextP, payload = encode_context(context)
- header = encode_pduheader(PDU_PING, False, False, False, contextP,
- sID, tactID, pktID, len(payload))
- return header + payload
+def decode_CleanupSetPDU(data, header):
+ flags = header["flags"]
+ result = CleanupSetPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"])
+ return result
-def decode_pingpdu(data, flags):
- context, data = decode_context(data, flags)
- return {"context": context}
+class CleanupSetPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID):
+ AgentXPDU.__init__(self, PDU_CLEANUP_SET,
+ bigEndian, sID, tactID, pktID)
-def encode_notifypdu(sID, tactID, pktID, varbinds, context=None):
- contextP, payload = encode_context(context)
- payload += encode_varbindlist(varbinds)
- header = encode_pduheader(PDU_NOTIFY, False, False, False, contextP,
- sID, tactID, pktID, len(payload))
- return header + payload
+ def encode(self):
+ header = encode_pduheader(self.pduType,
+ False, False, False, False, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, 0)
+ return header
-def decode_notifypdu(data, flags):
- context, data = decode_context(data, flags)
- varbinds = decode_varbindlist(data, flags)
- result = {"context": context, "varbinds": varbinds}
+def decode_PingPDU(data, header):
+ flags = header["flags"]
+ context, data = decode_context(data, header)
+ result = PingPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ context)
return result
-def encode_indexallocpdu_core(isdealloc,
- sID, tactID, pktID, newIndex, anyIndex,
- varbinds, context=None):
- pkttype = PDU_INDEX_DEALLOC if isdealloc is True else PDU_INDEX_ALLOC
- contextP, payload = encode_context(context)
- payload += encode_varbindlist(varbinds)
- header = encode_pduheader(pkttype, False, newIndex, anyIndex,
- contextP, sID, tactID, pktID, len(payload))
- return header + payload
+class PingPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID, context=None):
+ AgentXPDU.__init__(self, PDU_PING,
+ bigEndian, sID, tactID, pktID, context)
+ def encode(self):
+ contextP, payload = encode_context(self.bigEndian, self.context)
+ header = encode_pduheader(self.pduType, False, False, False,
+ contextP, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
-def encode_indexallocpdu(sID, tactID, pktID, newIndex, anyIndex,
- varbinds, context=None):
- return encode_indexallocpdu_core(False,
- sID, tactID, pktID, newIndex, anyIndex,
- varbinds, context)
+
+def decode_NotifyPDU(data, header):
+ flags = header["flags"]
+ context, data = decode_context(data, header)
+ varbinds = decode_varbindlist(data, header)
+ result = NotifyPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ varbinds, context)
+ return result
-def encode_indexdeallocpdu(sID, tactID, pktID, newIndex, anyIndex,
- varbinds, context=None):
- return encode_indexallocpdu_core(True,
- sID, tactID, pktID, newIndex, anyIndex,
- varbinds, context)
+class NotifyPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID, varbinds, context=None):
+ AgentXPDU.__init__(self, PDU_NOTIFY,
+ bigEndian, sID, tactID, pktID, context)
+ self.varbinds = varbinds
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.varbinds != other.varbinds:
+ return False
+ return True
-def decode_indexallocpdu(data, flags):
- context, data = decode_context(data, flags)
- varbinds = decode_varbindlist(data, flags)
- result = {"context": context, "varbinds": varbinds}
+ def encode(self):
+ contextP, payload = encode_context(self.bigEndian, self.context)
+ payload += encode_varbindlist(self.bigEndian, self.varbinds)
+ header = encode_pduheader(self.pduType, False, False, False,
+ contextP, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
+
+
+def decode_xIndexAllocPDU(data, header):
+ flags = header["flags"]
+ context, data = decode_context(data, header)
+ varbinds = decode_varbindlist(data, header)
+ isalloc = (header["type"] == PDU_INDEX_ALLOC)
+ pdu = IndexAllocPDU if isalloc else IndexDeallocPDU
+ result = pdu(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ flags["newIndex"], flags["anyIndex"],
+ varbinds, context)
return result
-decode_indexdeallocpdu = decode_indexallocpdu
+class IndexAllocPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID,
+ newIndex, anyIndex, varbinds, context=None):
+ AgentXPDU.__init__(self, PDU_INDEX_ALLOC,
+ bigEndian, sID, tactID, pktID, context)
+ self.newIndex = newIndex
+ self.anyIndex = anyIndex
+ self.varbinds = varbinds
+
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.newIndex != other.newIndex:
+ return False
+ if self.anyIndex != other.anyIndex:
+ return False
+ if self.varbinds != other.varbinds:
+ return False
+ return True
+ def encode(self):
+ contextP, payload = encode_context(self.bigEndian, self.context)
+ payload += encode_varbindlist(self.bigEndian, self.varbinds)
+ header = encode_pduheader(self.pduType,
+ False, self.newIndex, self.anyIndex,
+ contextP, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
+
+
+class IndexDeallocPDU(IndexAllocPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID,
+ newIndex, anyIndex, varbinds, context=None):
+ IndexAllocPDU.__init__(self, bigEndian, sID, tactID, pktID,
+ newIndex, anyIndex, varbinds, context)
+ self.pduType = PDU_INDEX_DEALLOC
+
+
+def decode_AddAgentCapsPDU(data, header):
+ flags = header["flags"]
+ context, data = decode_context(data, header)
+ oid, data = decode_oid(data, header)
+ descr = decode_octetstr(data, header)[0]
+ result = AddAgentCapsPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ oid, descr, context)
+ return result
-def encode_addagentcapspdu(sID, tactID, pktID, oid, descr, context=None):
- contextP, payload = encode_context(context)
- payload += encode_oid(oid, False)
- payload += encode_octetstr(descr)
- header = encode_pduheader(PDU_ADD_AGENT_CAPS,
- False, False, False, contextP,
- sID, tactID, pktID, len(payload))
- return header + payload
+class AddAgentCapsPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID,
+ oid, description, context=None):
+ AgentXPDU.__init__(self, PDU_ADD_AGENT_CAPS,
+ bigEndian, sID, tactID, pktID, context)
+ self.oid = oid
+ self.description = description
+
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.oid != other.oid:
+ return False
+ if self.description != other.description:
+ return False
+ return True
-def decode_addagentcapspdu(data, flags):
- context, data = decode_context(data, flags)
- oid, data = decode_oid(data, flags)
- descr = decode_octetstr(data, flags)[0]
- result = {"context": context, "oid": oid, "description": descr}
+ def encode(self):
+ contextP, payload = encode_context(self.bigEndian, self.context)
+ payload += encode_oid(self.bigEndian, self.oid, False)
+ payload += encode_octetstr(self.bigEndian, self.description)
+ header = encode_pduheader(self.pduType, False, False, False,
+ contextP, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
+
+
+def decode_RMAgentCapsPDU(data, header):
+ flags = header["flags"]
+ context, data = decode_context(data, header)
+ oid, data = decode_oid(data, header)
+ result = RMAgentCapsPDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ oid, context)
return result
-def encode_rmagentcapspdu(sID, tactID, pktID, oid, context=None):
- contextP, payload = encode_context(context)
- payload += encode_oid(oid, False)
- header = encode_pduheader(PDU_RM_AGENT_CAPS,
- False, False, False, contextP,
- sID, tactID, pktID, len(payload))
- return header + payload
-
-
-def decode_rmagentcapspdu(data, flags):
- context, data = decode_context(data, flags)
- oid, data = decode_oid(data, flags)
- result = {"context": context, "oid": oid}
- return result
+class RMAgentCapsPDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID, oid, context=None):
+ AgentXPDU.__init__(self, PDU_RM_AGENT_CAPS,
+ bigEndian, sID, tactID, pktID, context)
+ self.oid = oid
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.oid != other.oid:
+ return False
+ return True
-def encode_responsepdu(sID, tactID, pktID,
- sysUpTime, resError, resIndex,
- varbinds=None):
- payload = struct.pack(">IHH", sysUpTime, resError, resIndex)
- if varbinds is not None:
- payload += encode_varbindlist(varbinds)
- header = encode_pduheader(PDU_RESPONSE, False, False, False, False,
- sID, tactID, pktID, len(payload))
- return header + payload
+ def encode(self):
+ contextP, payload = encode_context(self.bigEndian, self.context)
+ payload += encode_oid(self.bigEndian, self.oid, False)
+ header = encode_pduheader(self.pduType, False, False, False,
+ contextP, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
-def decode_responsepdu(data, flags):
- endianToken = getendian(flags)
+def decode_ResponsePDU(data, header):
+ flags = header["flags"]
+ endianToken = getendian(flags["bigEndian"])
temp, data = slicedata(data, 8)
- sysUpTime, resError, resIndex = struct.unpack(endianToken + "IHH", temp)
+ sysUptime, resError, resIndex = struct.unpack(endianToken + "IHH", temp)
if len(data) > 0:
- varbinds = decode_varbindlist(data, flags)
+ varbinds = decode_varbindlist(data, header)
else:
varbinds = None
- result = {"sys_uptime": sysUpTime, "res_err": resError,
- "res_index": resIndex, "varbinds": varbinds}
+ result = ResponsePDU(flags["bigEndian"], header["session_id"],
+ header["transaction_id"], header["packet_id"],
+ sysUptime, resError, resIndex, varbinds)
return result
+class ResponsePDU(AgentXPDU):
+ def __init__(self, bigEndian, sID, tactID, pktID,
+ sysUptime, resError, resIndex, varbinds=None):
+ AgentXPDU.__init__(self, PDU_RESPONSE, bigEndian, sID, tactID, pktID)
+ self.sysUptime = sysUptime
+ self.resError = resError
+ self.resIndex = resIndex
+ self.varbinds = varbinds
+
+ def __eq__(self, other):
+ if AgentXPDU.__eq__(self, other) is not True:
+ return False
+ if self.sysUptime != other.sysUptime:
+ return False
+ if self.resError != other.resError:
+ return False
+ if self.resIndex != other.resIndex:
+ return False
+ if self.varbinds != other.varbinds:
+ return False
+ return True
+
+ def encode(self):
+ endianToken = getendian(self.bigEndian)
+ payload = struct.pack(endianToken + "IHH", self.sysUptime,
+ self.resError, self.resIndex)
+ if self.varbinds is not None:
+ payload += encode_varbindlist(self.bigEndian, self.varbinds)
+ header = encode_pduheader(self.pduType,
+ False, False, False, False, self.bigEndian,
+ self.sessionID, self.transactionID,
+ self.packetID, len(payload))
+ return header + payload
+
+
# ========================================================================
#
# Data type encoders / decoders
@@ -361,7 +660,7 @@ def decode_responsepdu(data, flags):
#
# ========================================================================
-def encode_oid(subids, include):
+def encode_oid(bigEndian, subids, include):
subids = tuple(subids)
numsubs = len(subids)
if not (0 <= numsubs <= 128): # Packet can have a maximum of 128 sections
@@ -376,13 +675,15 @@ def encode_oid(subids, include):
prefix = 0
n_subid = len(subids)
include = int(bool(include)) # force integer bool
- body = struct.pack(">BBBx", n_subid, prefix, include)
+ endianToken = getendian(bigEndian)
+ body = struct.pack(endianToken + "BBBx", n_subid, prefix, include)
for subid in subids:
- body += struct.pack(">I", subid)
+ body += struct.pack(endianToken + "I", subid)
return body
-def decode_oid(data, flags):
+def decode_oid(data, header):
+ flags = header["flags"]
# Need to split off the header to get the subid count
header, data = slicedata(data, 4)
n_subid, prefix, include = struct.unpack("BBBx", header)
@@ -397,16 +698,17 @@ def decode_oid(data, flags):
# Now have the count, need to split the subids from the rest of the packet
byteCount = n_subid * 4
data, rest = slicedata(data, byteCount)
- endianToken = getendian(flags)
+ endianToken = getendian(flags["bigEndian"])
formatString = endianToken + ("I" * n_subid)
subids += struct.unpack(formatString, data)
result = {"subids": subids, "include": include}
return (result, rest)
-def encode_octetstr(octets):
+def encode_octetstr(bigEndian, octets):
numoctets = len(octets)
- header = struct.pack(">I", numoctets)
+ endianToken = getendian(bigEndian)
+ header = struct.pack(endianToken + "I", numoctets)
pad = (numoctets % 4)
if pad > 0: # Pad out the data to word boundary
pad = 4 - pad
@@ -420,9 +722,10 @@ def encode_octetstr(octets):
return data
-def decode_octetstr(data, flags):
+def decode_octetstr(data, header):
+ flags = header["flags"]
header, data = slicedata(data, 4)
- endianToken = getendian(flags)
+ endianToken = getendian(flags["bigEndian"])
numoctets = struct.unpack(endianToken + "I", header)[0]
if len(data) < numoctets:
raise ValueError("Octet string shorter than length")
@@ -432,112 +735,118 @@ def decode_octetstr(data, flags):
return data[:numoctets], data[numoctets + pad:]
-def encode_varbind(valType, oid, *payload):
+def encode_varbind(bigEndian, valType, oid, *payload):
if valType not in definedValueTypes.keys():
raise ValueError("Value type %s not in defined types" % valType)
- header = struct.pack(">Hxx", valType)
- name = encode_oid(oid, False)
+ endianToken = getendian(bigEndian)
+ header = struct.pack(endianToken + "Hxx", valType)
+ name = encode_oid(bigEndian, oid, False)
handlers = definedValueTypes[valType]
if handlers is None:
data = header + name
else:
- data = header + name + handlers[0](*payload)
+ data = header + name + handlers[0](bigEndian, *payload)
return data
-def decode_varbind(data, flags):
- header, data = slicedata(data, 4)
- endianToken = getendian(flags)
- valType = struct.unpack(endianToken + "Hxx", header)[0]
- name, data = decode_oid(data, flags)
+def decode_varbind(data, header):
+ flags = header["flags"]
+ bindheader, data = slicedata(data, 4)
+ endianToken = getendian(flags["bigEndian"])
+ valType = struct.unpack(endianToken + "Hxx", bindheader)[0]
+ name, data = decode_oid(data, header)
if valType not in definedValueTypes.keys():
raise ValueError("Value type %s not in defined types" % valType)
handlers = definedValueTypes[valType]
- payload, data = handlers[1](data, flags)
+ payload, data = handlers[1](data, header)
result = {"type": valType, "name": name, "data": payload}
return result, data
-def encode_integer32(num):
- return struct.pack(">I", num)
+def encode_integer32(bigEndian, num):
+ endianToken = getendian(bigEndian)
+ return struct.pack(endianToken + "I", num)
-def decode_integer32(data, flags):
- endianToken = getendian(flags)
+def decode_integer32(data, header):
+ flags = header["flags"]
+ endianToken = getendian(flags["bigEndian"])
num, data = slicedata(data, 4)
num = struct.unpack(endianToken + "I", num)[0]
return (num, data)
-def encode_nullvalue():
+def encode_nullvalue(bigEndian):
return ""
-def decode_nullvalue(data, flags):
+def decode_nullvalue(data, header):
return (None, data)
-def encode_integer64(num):
- return struct.pack(">Q", num)
+def encode_integer64(bigEndian, num):
+ endianToken = getendian(bigEndian)
+ return struct.pack(endianToken + "Q", num)
-def decode_integer64(data, flags):
- endianToken = getendian(flags)
+def decode_integer64(data, header):
+ flags = header["flags"]
+ endianToken = getendian(flags["bigEndian"])
num, data = slicedata(data, 8)
num = struct.unpack(endianToken + "Q", num)[0]
return (num, data)
-def encode_ipaddr(octets):
+def encode_ipaddr(bigEndian, octets):
if len(octets) != 4:
raise ValueError("IP Address incorrect length")
- return encode_octetstr(octets)
+ return encode_octetstr(bigEndian, octets)
-def decode_ipaddr(data, flags):
- addr, data = decode_octetstr(data, flags)
+def decode_ipaddr(data, header):
+ addr, data = decode_octetstr(data, header)
addr = struct.unpack("BBBB", addr)
return addr, data
-def encode_searchrange(startOID, endOID, inclusiveP):
- startOIDstr = encode_oid(startOID, inclusiveP)
- endOIDstr = encode_oid(endOID, False)
+def encode_searchrange(bigEndian, startOID, endOID, inclusiveP):
+ startOIDstr = encode_oid(bigEndian, startOID, inclusiveP)
+ endOIDstr = encode_oid(bigEndian, endOID, False)
return startOIDstr + endOIDstr
-def decode_searchrange(data, flags):
- startOID, data = decode_oid(data, flags)
- endOID, data = decode_oid(data, flags)
+def decode_searchrange(data, header):
+ startOID, data = decode_oid(data, header)
+ endOID, data = decode_oid(data, header)
result = {"start": startOID, "end": endOID}
return result, data
-def encode_searchrange_list(oidranges, nullTerminate=False):
+def encode_searchrange_list(bigEndian, oidranges, nullTerminate=False):
encoded = []
for oran in oidranges:
- encoded.append(encode_searchrange(*oran))
+ encoded.append(encode_searchrange(bigEndian, *oran))
if nullTerminate:
- encoded.append(encode_oid(tuple(), False))
+ encoded.append(encode_oid(bigEndian, tuple(), False))
encoded = "".join(encoded)
return encoded
-def decode_searchrange_list(data, flags): # Cannot handle extra data
+def decode_searchrange_list(data, header): # Cannot handle extra data
oidranges = []
while len(data) > 0:
- oids, data = decode_searchrange(data, flags)
+ oids, data = decode_searchrange(data, header)
oidranges.append(oids)
return tuple(oidranges)
-def decode_searchrange_list_nullterm(data, flags):
+def decode_searchrange_list_nullterm(data, header):
oidranges = []
while len(data) > 0:
- one, data = decode_oid(data, flags)
+ one, data = decode_oid(data, header)
if isnullOID(one):
break
- two, data = decode_oid(data, flags)
+ two, data = decode_oid(data, header)
oidranges.append({"start": one, "end": two})
return tuple(oidranges), data
@@ -546,8 +855,9 @@ def decode_searchrange_list_nullterm(data, flags):
# Utilities, glue, and misc
# =========================================
-def getendian(flags):
- return ">" if flags["bigEndian"] is True else "<"
+
+def getendian(bigEndian):
+ return ">" if bigEndian is True else "<"
def slicedata(data, slicepoint):
@@ -560,18 +870,18 @@ def isnullOID(oid):
return False
-def encode_varbindlist(varbinds):
+def encode_varbindlist(bigEndian, varbinds):
payload = ""
for varbind in varbinds:
- payload += encode_varbind(*varbind)
+ payload += encode_varbind(bigEndian, *varbind)
return payload
-def decode_varbindlist(data, flags):
+def decode_varbindlist(data, header):
if len(data) > 0:
varbinds = []
while len(data) > 0:
- vb, data = decode_varbind(data, flags)
+ vb, data = decode_varbind(data, header)
varbinds.append(vb)
varbinds = tuple(varbinds)
else:
@@ -580,8 +890,8 @@ def decode_varbindlist(data, flags):
def encode_pduheader(pduType, instanceRegistration, newIndex,
- anyIndex, nonDefaultContext, sessionID,
- transactionID, packetID, payloadLength):
+ anyIndex, nonDefaultContext, bigEndian,
+ sessionID, transactionID, packetID, payloadLength):
# version type flags reserved
# sessionID
# transactionID
@@ -594,12 +904,12 @@ def encode_pduheader(pduType, instanceRegistration, newIndex,
flags |= newIndex << 1
flags |= anyIndex << 2
flags |= nonDefaultContext << 3
- flags |= True << 4 # byte order, we always send network / big endian
- data = struct.pack(">BBBxIIII", 1, pduType, flags,
- sessionID,
- transactionID,
- packetID,
- payloadLength)
+ flags |= bigEndian << 4
+ # Yes, this is a kluge, it is less problematic then the next best kluge
+ endianToken = getendian(bigEndian)
+ data = struct.pack(endianToken + "BBBxIIII", 1, pduType, flags,
+ sessionID, transactionID,
+ packetID, payloadLength)
return data
@@ -617,7 +927,7 @@ def decode_pduheader(data): # Endianess is controlled from the PDU header
flagDict["bigEndian"] = bool(flags & 0x10)
# Chop the remaining parts of the header from the rest of the datastream
# then parse them
- fmt = getendian(flagDict) + "IIII"
+ fmt = getendian(flagDict["bigEndian"]) + "IIII"
linen, data = slicedata(data, 16) # 4 x 4-byte variables
sID, tactionID, pktID, dataLen = struct.unpack(fmt, linen)
result = {"version": version, "type": pduType, "flags": flagDict,
@@ -626,19 +936,20 @@ def decode_pduheader(data): # Endianess is controlled from the PDU header
return result
-def encode_context(context):
+def encode_context(bigEndian, context):
if context is not None:
contextP = True
- payload = encode_octetstr(context)
+ payload = encode_octetstr(bigEndian, context)
else:
contextP = False
payload = ""
return (contextP, payload)
-def decode_context(data, flags):
+def decode_context(data, header):
+ flags = header["flags"]
if flags["contextP"] is True:
- context, data = decode_octetstr(data, flags)
+ context, data = decode_octetstr(data, header)
else:
context = None
return (context, data)
@@ -659,9 +970,8 @@ def decode_packet(data):
parsedPkt = None
else:
packetSlice, newData = slicedata(newData, header["length"])
- parsedPkt = decoder(packetSlice, header["flags"])
- result = {"header": header, "body": parsedPkt}
- return result, newData
+ parsedPkt = decoder(packetSlice, header)
+ return parsedPkt, newData
# Value types
@@ -713,24 +1023,24 @@ PDU_ADD_AGENT_CAPS = 16
PDU_RM_AGENT_CAPS = 17
PDU_RESPONSE = 18
definedPDUTypes = { # Used by the decode_packet function
- PDU_OPEN: decode_openpdu,
- PDU_CLOSE: decode_closepdu,
- PDU_REGISTER: decode_registerpdu,
- PDU_UNREGISTER: decode_unregisterpdu,
- PDU_GET: decode_getpdu,
- PDU_GET_NEXT: decode_getnextpdu,
- PDU_GET_BULK: decode_getbulkpdu,
- PDU_TEST_SET: decode_testsetpdu,
- PDU_COMMIT_SET: None,
- PDU_UNDO_SET: None,
- PDU_CLEANUP_SET: None,
- PDU_NOTIFY: decode_notifypdu,
- PDU_PING: decode_pingpdu,
- PDU_INDEX_ALLOC: decode_indexallocpdu,
- PDU_INDEX_DEALLOC: decode_indexdeallocpdu,
- PDU_ADD_AGENT_CAPS: decode_addagentcapspdu,
- PDU_RM_AGENT_CAPS: decode_rmagentcapspdu,
- PDU_RESPONSE: decode_responsepdu}
+ PDU_OPEN: decode_OpenPDU,
+ PDU_CLOSE: decode_ClosePDU,
+ PDU_REGISTER: decode_xRegisterPDU,
+ PDU_UNREGISTER: decode_xRegisterPDU,
+ PDU_GET: decode_xGetPDU,
+ PDU_GET_NEXT: decode_xGetPDU,
+ PDU_GET_BULK: decode_GetBulkPDU,
+ PDU_TEST_SET: decode_TestSetPDU,
+ PDU_COMMIT_SET: decode_CommitSetPDU,
+ PDU_UNDO_SET: decode_UndoSetPDU,
+ PDU_CLEANUP_SET: decode_CleanupSetPDU,
+ PDU_NOTIFY: decode_NotifyPDU,
+ PDU_PING: decode_PingPDU,
+ PDU_INDEX_ALLOC: decode_xIndexAllocPDU,
+ PDU_INDEX_DEALLOC: decode_xIndexAllocPDU,
+ PDU_ADD_AGENT_CAPS: decode_AddAgentCapsPDU,
+ PDU_RM_AGENT_CAPS: decode_RMAgentCapsPDU,
+ PDU_RESPONSE: decode_ResponsePDU}
# Closing reasons
RSN_OTHER = 1
=====================================
tests/pylib/test_agentx.py
=====================================
The diff for this file was not included because it is too large.
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/4369f7e495249b5b36a0ded5f75521f07133e350
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/4369f7e495249b5b36a0ded5f75521f07133e350
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170728/53825d78/attachment.html>
More information about the vc
mailing list