[Git][NTPsec/ntpsec][master] Changed packet representation to classes.

Ian Bruene gitlab at mg.gitlab.com
Fri Jul 28 21:08:53 UTC 2017


Ian Bruene pushed to branch master at NTPsec / ntpsec


Commits:
4369f7e4 by Ian Bruene at 2017-07-28T15:59:50-05:00
Changed packet representation to classes.

- - - - -


2 changed files:

- pylib/agentx.py
- tests/pylib/test_agentx.py


Changes:

=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -6,351 +6,650 @@ from __future__ import print_function
 
 import struct
 
-# Endianess is selectable in the AgentX packet headers. In this program
-# *ALL* values will be transmitted in big / network endianess.
-
 internetPrefix = (1, 3, 6, 1)  # Used by the prefix option of OID headers
 prefixCount = len(internetPrefix)
 
-# =======================================================================
+# ==========================================================================
 #
-# Function types fall into the following categories:
+# Callables fall into the following categories:
 #   Data type encoder/decoders
-#   Packet body encoder/decoders
+#   Packet classes/encoders
+#   Packet body decoders
 #   Glue/Utility/Misc functions
 #
-# To encode a packet, call the relevant encode_*pdu function.
+# To encode a packet, create an instance of that packet type's class, then
+# call the encode() method.
 #
 # To decode a packet, call the decode_packet function, which will select
 # the correct decoder for that packet type.
 #
-# =======================================================================
+# ==========================================================================
 
 
-# =======================================================================
+# ==========================================================================
 #
-# Packet body encoders / decoders
+# Packet encoders / decoders
 #
-#   Encoders take information for both the header and body and
-#   return a complete packet.
+#   Encoders are methods on the packet classes which output a string
+#   version of the packet, ready for transmission.
 #
 #   Decoders take just the body sliced away from everything else,
-#   and a context flag. They return tuples containing the data.
-#   They do not return extra data as they are never supposed to
-#   receive it.
-#   Decoders are not meant to be called directly by external code.
+#   and a dict containing the already parsed information from the header.
+#   They return the relevant class instance for the packet in question;
+#   they do not return extra data as they are never supposed to receive it.
 #
-# =======================================================================
-
+#   Decoders are not meant to be called directly by external code,
+#   only by decode_packet().
+#
+# ==========================================================================
+
+
+class AgentXPDU:
+    def __init__(self, pduType, bigEndian, sID, tactID, pktID, context=None):
+        self.pduType = pduType
+        self.bigEndian = bigEndian
+        self.sessionID = sID
+        self.transactionID = tactID
+        self.packetID = pktID
+        self.context = context
+
+    def __eq__(self, other):
+        if not isinstance(other, self.__class__):
+            return False
+        if self.pduType != other.pduType:
+            return False
+        if self.bigEndian != other.bigEndian:
+            return False
+        if self.sessionID != other.sessionID:
+            return False
+        if self.transactionID != other.transactionID:
+            return False
+        if self.packetID != other.packetID:
+            return False
+        if self.context != other.context:
+            return False
+        return True
 
-def encode_openpdu(sID, tactID, pktID, timeout, oid, description):
-    payload = struct.pack("Bxxx", timeout)
-    payload += encode_oid(oid, False)
-    payload += encode_octetstr(description)
-    header = encode_pduheader(PDU_OPEN, False, False, False, False,
-                              sID, tactID, pktID, len(payload))
-    return header + payload
+    def __ne__(self, other):
+        return not self.__eq__(other)
 
 
-def decode_openpdu(data, flags):
+def decode_OpenPDU(data, header):
+    flags = header["flags"]
     temp, data = slicedata(data, 4)
     timeout = struct.unpack("Bxxx", temp)[0]
-    oid, data = decode_oid(data, flags)
-    octets = decode_octetstr(data, flags)[0]
-    result = {"timeout": timeout, "oid": oid, "description": octets}
+    oid, data = decode_oid(data, header)
+    description = decode_octetstr(data, header)[0]
+    result = OpenPDU(flags["bigEndian"], header["session_id"],
+                     header["transaction_id"], header["packet_id"],
+                     timeout, oid, description)
     return result
 
 
-def encode_closepdu(sID, tactID, pktID, reason):
-    if reason not in definedReasons:
-        raise ValueError("Close reason %s not in defined types" % reason)
-    payload = struct.pack("Bxxx", reason)
-    header = encode_pduheader(PDU_CLOSE, False, False, False, False,
-                              sID, tactID, pktID, len(payload))
-    return header + payload
+class OpenPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID,
+                 timeout, oid, description):
+        AgentXPDU.__init__(self, PDU_OPEN, bigEndian, sID, tactID, pktID)
+        self.timeout = timeout
+        self.oid = oid
+        self.description = description
+
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.timeout != other.timeout:
+            return False
+        if self.oid != other.oid:
+            return False
+        if self.description != other.description:
+            return False
+        return True
 
+    def encode(self):
+        payload = struct.pack("Bxxx", self.timeout)
+        payload += encode_oid(self.bigEndian, self.oid, False)
+        payload += encode_octetstr(self.bigEndian, self.description)
+        header = encode_pduheader(self.pduType,
+                                  False, False, False, False, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
+
+
+def decode_ClosePDU(data, header):
+    flags = header["flags"]
+    reason = ord(data[0])  # Bxxx
+    result = ClosePDU(flags["bigEndian"], header["session_id"],
+                      header["transaction_id"], header["packet_id"],
+                      reason)
+    return result
 
-def decode_closepdu(data, flags):
-    reason = {"reason": ord(data[0])}  # Bxxx
-    return reason
 
+class ClosePDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID, reason):
+        AgentXPDU.__init__(self, PDU_CLOSE, bigEndian, sID, tactID, pktID)
+        if reason not in definedReasons:
+            raise ValueError("Close reason %s not in defined types" % reason)
+        self.reason = reason
 
-def encode_registerpdu_core(isregister, sID, tactID, pktID,
-                            timeout, priority, subtree,
-                            rangeSubid=0, upperBound=None, context=None):
-    if isregister:
-        pkttype = PDU_REGISTER
-        regbit = True
-    else:
-        pkttype = PDU_UNREGISTER
-        timeout = 0
-        regbit = False
-    contextP, payload = encode_context(context)
-    payload += struct.pack(">BBBx", timeout, priority, rangeSubid)
-    payload += encode_oid(subtree, False)
-    if rangeSubid != 0:
-        if upperBound is None:
-            raise ValueError("upperBound must be set if rangeSubid is set")
-        payload += struct.pack(">I", upperBound)
-    header = encode_pduheader(pkttype, regbit, False, False, contextP,
-                              sID, tactID, pktID, len(payload))
-    packet = header + payload
-    return packet
-
-
-def decode_registerpdu_core(data, flags):
-    endianToken = getendian(flags)
-    context, data = decode_context(data, flags)
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.reason != other.reason:
+            return False
+        return True
+
+    def encode(self):
+        payload = struct.pack("Bxxx", self.reason)
+        header = encode_pduheader(self.pduType,
+                                  False, False, False, False, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
+
+
+def decode_xRegisterPDU(data, header):
+    flags = header["flags"]
+    endianToken = getendian(flags["bigEndian"])
+    context, data = decode_context(data, header)
     temp, data = slicedata(data, 4)
     timeout, priority, rangeSubid = struct.unpack(endianToken + "BBBx", temp)
-    oid, data = decode_oid(data, flags)
+    oid, data = decode_oid(data, header)
     if rangeSubid != 0:
         temp, data = slicedata(data, 4)
         upperBound = struct.unpack(endianToken + "I", temp)[0]
     else:
         upperBound = None
-    result = {"oid": oid, "timeout": timeout, "priority": priority,
-              "context": context, "range_subid": rangeSubid,
-              "upper_bound": upperBound}
+    if header["type"] == PDU_REGISTER:
+        result = RegisterPDU(flags["bigEndian"], header["session_id"],
+                             header["transaction_id"], header["packet_id"],
+                             timeout, priority, oid,
+                             rangeSubid, upperBound, context)
+    else:
+        result = UnregisterPDU(flags["bigEndian"], header["session_id"],
+                               header["transaction_id"], header["packet_id"],
+                               priority, oid, rangeSubid, upperBound, context)
     return result
 
 
-def encode_registerpdu(sID, tactID, pktID, timeout, priority, subtree,
-                       rangeSubid=0, upperBound=None, context=None):
-    return encode_registerpdu_core(True, sID, tactID, pktID, timeout,
-                                   priority, subtree, rangeSubid,
-                                   upperBound, context)
-
-
-def encode_unregisterpdu(sID, tactID, pktID, priority, subtree,
-                         rangeSubid=0, upperBound=None, context=None):
-    return encode_registerpdu_core(False, sID, tactID, pktID, 0,
-                                   priority, subtree, rangeSubid,
-                                   upperBound, context)
-
-
-decode_registerpdu = decode_registerpdu_core
-
-
-def decode_unregisterpdu(data, flags):
-    result = decode_registerpdu_core(data, flags)
-    del result["timeout"]
-    return result
-
+class RegisterPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID,
+                 timeout, priority, subtree,
+                 rangeSubid=0, upperBound=None, context=None):
+        AgentXPDU.__init__(self, PDU_REGISTER,
+                           bigEndian, sID, tactID, pktID, context)
+        self.timeout = timeout
+        self.priority = priority
+        self.subtree = subtree
+        self.rangeSubid = rangeSubid
+        self.upperBound = upperBound
+        self.instReg = True  # so we don't have to write two encode()s
+
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.timeout != other.timeout:
+            return False
+        if self.priority != other.priority:
+            return False
+        if self.subtree != other.subtree:
+            return False
+        if self.rangeSubid != other.rangeSubid:
+            return False
+        if self.upperBound != other.upperBound:
+            return False
+        return True
 
-def encode_getpdu_core(isnext, sID, tactID, pktID, oidranges, context=None):
-    if isnext is True:
-        pkttype = PDU_GET_NEXT
-        nullTerm = False
-    else:
-        pkttype = PDU_GET
-        nullTerm = True
-    contextP, payload = encode_context(context)
-    payload += encode_searchrange_list(oidranges, nullTerm)
-    header = encode_pduheader(pkttype, False, False, False, contextP,
-                              sID, tactID, pktID, len(payload))
-    return header + payload
-
-
-def decode_getpdu_core(isnext, data, flags):
-    context, data = decode_context(data, flags)
-    if isnext is True:
-        oidranges = decode_searchrange_list(data, flags)
+    def encode(self):
+        endianToken = getendian(self.bigEndian)
+        contextP, payload = encode_context(self.bigEndian, self.context)
+        if self.pduType == PDU_REGISTER:
+            payload += struct.pack(endianToken + "BBBx", self.timeout,
+                                   self.priority, self.rangeSubid)
+        else:
+            payload += struct.pack(endianToken + "xBBx",
+                                   self.priority, self.rangeSubid)
+        payload += encode_oid(self.bigEndian, self.subtree, False)
+        if self.rangeSubid != 0:
+            if self.upperBound is None:
+                raise ValueError("upperBound must be set if rangeSubid is set")
+            payload += struct.pack(endianToken + "I", self.upperBound)
+        header = encode_pduheader(self.pduType, self.instReg, False, False,
+                                  contextP, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        packet = header + payload
+        return packet
+
+
+class UnregisterPDU(RegisterPDU):  # These could inherit in either direction
+    def __init__(self, bigEndian, sID, tactID, pktID, priority, subtree,
+                 rangeSubid=0, upperBound=None, context=None):
+        RegisterPDU.__init__(self, bigEndian, sID, tactID, pktID,
+                             None, priority, subtree,
+                             rangeSubid, upperBound, context)
+        self.pduType = PDU_UNREGISTER
+        self.instReg = False
+
+
+def decode_xGetPDU(data, header):
+    flags = header["flags"]
+    context, data = decode_context(data, header)
+    if header["type"] == PDU_GET_NEXT:
+        oidranges = decode_searchrange_list(data, header)
+        result = GetNextPDU(flags["bigEndian"], header["session_id"],
+                            header["transaction_id"], header["packet_id"],
+                            oidranges, context)
     else:
-        oidranges, data = decode_searchrange_list_nullterm(data, flags)
-    result = {"context": context, "oidranges": oidranges}
+        oidranges, data = decode_searchrange_list_nullterm(data, header)
+        result = GetPDU(flags["bigEndian"], header["session_id"],
+                        header["transaction_id"], header["packet_id"],
+                        oidranges, context)
     return result
 
 
-def encode_getpdu(sID, tactID, pktID, oidranges, context=None):
-    return encode_getpdu_core(False, sID, tactID, pktID, oidranges, context)
-
+class GetPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID, oidranges, context=None):
+        AgentXPDU.__init__(self, PDU_GET,
+                           bigEndian, sID, tactID, pktID, context)
+        self.oidranges = oidranges
+        self.nullTerm = True
 
-def encode_getnextpdu(sID, tactID, pktID, oidranges, context=None):
-    return encode_getpdu_core(True, sID, tactID, pktID, oidranges, context)
-
-
-def decode_getpdu(data, flags):
-    return decode_getpdu_core(False, data, flags)
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.oidranges != other.oidranges:
+            return False
+        return True
 
+    def encode(self):
+        contextP, payload = encode_context(self.bigEndian, self.context)
+        payload += encode_searchrange_list(self.bigEndian,
+                                           self.oidranges, self.nullTerm)
+        header = encode_pduheader(self.pduType, False, False, False,
+                                  contextP, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
+
+
+class GetNextPDU(GetPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID, oidranges, context=None):
+        GetPDU.__init__(self, bigEndian, sID, tactID, pktID,
+                        oidranges, context)
+        self.pduType = PDU_GET_NEXT
+        self.nullTerm = False
+
+
+def decode_GetBulkPDU(data, header):
+    flags = header["flags"]
+    endianToken = getendian(flags["bigEndian"])
+    context, data = decode_context(data, header)
+    temp, data = slicedata(data, 4)
+    nonReps, maxReps = struct.unpack(endianToken + "HH", temp)
+    oidranges = decode_searchrange_list(data, header)
+    result = GetBulkPDU(flags["bigEndian"], header["session_id"],
+                        header["transaction_id"], header["packet_id"],
+                        nonReps, maxReps, oidranges, context)
+    return result
 
-def decode_getnextpdu(data, flags):
-    return decode_getpdu_core(True, data, flags)
 
+class GetBulkPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID,
+                 nonReps, maxReps, oidranges, context=None):
+        AgentXPDU.__init__(self, PDU_GET_BULK,
+                           bigEndian, sID, tactID, pktID, context)
+        self.nonReps = nonReps
+        self.maxReps = maxReps
+        self.oidranges = oidranges
+
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.nonReps != other.nonReps:
+            return False
+        if self.maxReps != other.maxReps:
+            return False
+        if self.oidranges != other.oidranges:
+            return False
+        return True
 
-def encode_getbulkpdu(sID, tactID, pktID, nonReps, maxReps,
-                      oidranges, context=None):
-    contextP, payload = encode_context(context)
-    payload += struct.pack(">HH", nonReps, maxReps)
-    payload += encode_searchrange_list(oidranges)
-    header = encode_pduheader(PDU_GET_BULK, False, False, False, contextP,
-                              sID, tactID, pktID, len(payload))
-    return header + payload
+    def encode(self):
+        endianToken = getendian(self.bigEndian)
+        contextP, payload = encode_context(self.bigEndian, self.context)
+        payload += struct.pack(endianToken + "HH", self.nonReps, self.maxReps)
+        payload += encode_searchrange_list(self.bigEndian,
+                                           self.oidranges, False)
+        header = encode_pduheader(self.pduType, False, False, False,
+                                  contextP, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
+
+
+def decode_TestSetPDU(data, header):
+    flags = header["flags"]
+    context, data = decode_context(data, header)
+    varbinds = decode_varbindlist(data, header)
+    result = TestSetPDU(flags["bigEndian"], header["session_id"],
+                        header["transaction_id"], header["packet_id"],
+                        varbinds, context)
+    return result
 
 
-def decode_getbulkpdu(data, flags):
-    endianToken = getendian(flags)
-    context, data = decode_context(data, flags)
-    temp, data = slicedata(data, 4)
-    nonReps, maxReps = struct.unpack(endianToken + "HH", temp)
-    oidranges = decode_searchrange_list(data, flags)
-    result = {"context": context, "non_reps": nonReps, "max_reps": maxReps,
-              "oidranges": oidranges}
-    return result
+class TestSetPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID, varbinds, context=None):
+        AgentXPDU.__init__(self, PDU_TEST_SET,
+                           bigEndian, sID, tactID, pktID, context)
+        self.varbinds = varbinds
 
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.varbinds != other.varbinds:
+            return False
+        return True
 
-def encode_testsetpdu(sID, tactID, pktID, varbinds, context=None):
-    contextP, payload = encode_context(context)
-    payload += encode_varbindlist(varbinds)
-    header = encode_pduheader(PDU_TEST_SET, False, False, False, contextP,
-                              sID, tactID, pktID, len(payload))
-    return header + payload
+    def encode(self):
+        contextP, payload = encode_context(self.bigEndian, self.context)
+        payload += encode_varbindlist(self.bigEndian, self.varbinds)
+        header = encode_pduheader(self.pduType, False, False, False,
+                                  contextP, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
 
 
-def decode_testsetpdu(data, flags):
-    context, data = decode_context(data, flags)
-    varbinds = decode_varbindlist(data, flags)
-    result = {"context": context, "varbinds": varbinds}
+def decode_CommitSetPDU(data, header):
+    flags = header["flags"]
+    result = CommitSetPDU(flags["bigEndian"], header["session_id"],
+                          header["transaction_id"], header["packet_id"])
     return result
 
 
-# CommitSet, UndoSet, and CleanupSet are bare headers. Don't need decoders
+class CommitSetPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID):
+        AgentXPDU.__init__(self, PDU_COMMIT_SET,
+                           bigEndian, sID, tactID, pktID)
 
-def encode_commitsetpdu(sID, tactID, pktID):
-    return encode_pduheader(PDU_COMMIT_SET, False, False, False, False,
-                            sID, tactID, pktID, 0)
+    def encode(self):
+        header = encode_pduheader(self.pduType,
+                                  False, False, False, False, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, 0)
+        return header
 
 
-def encode_undosetpdu(sID, tactID, pktID):
-    return encode_pduheader(PDU_UNDO_SET, False, False, False, False,
-                            sID, tactID, pktID, 0)
+def decode_UndoSetPDU(data, header):
+    flags = header["flags"]
+    result = UndoSetPDU(flags["bigEndian"], header["session_id"],
+                        header["transaction_id"], header["packet_id"])
+    return result
 
 
-def encode_cleanupsetpdu(sID, tactID, pktID):
-    return encode_pduheader(PDU_CLEANUP_SET, False, False, False, False,
-                            sID, tactID, pktID, 0)
+class UndoSetPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID):
+        AgentXPDU.__init__(self, PDU_UNDO_SET,
+                           bigEndian, sID, tactID, pktID)
 
+    def encode(self):
+        header = encode_pduheader(self.pduType,
+                                  False, False, False, False, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, 0)
+        return header
 
-def encode_pingpdu(sID, tactID, pktID, context=None):
-    contextP, payload = encode_context(context)
-    header = encode_pduheader(PDU_PING, False, False, False, contextP,
-                              sID, tactID, pktID, len(payload))
-    return header + payload
 
+def decode_CleanupSetPDU(data, header):
+    flags = header["flags"]
+    result = CleanupSetPDU(flags["bigEndian"], header["session_id"],
+                           header["transaction_id"], header["packet_id"])
+    return result
 
-def decode_pingpdu(data, flags):
-    context, data = decode_context(data, flags)
-    return {"context": context}
 
+class CleanupSetPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID):
+        AgentXPDU.__init__(self, PDU_CLEANUP_SET,
+                           bigEndian, sID, tactID, pktID)
 
-def encode_notifypdu(sID, tactID, pktID, varbinds, context=None):
-    contextP, payload = encode_context(context)
-    payload += encode_varbindlist(varbinds)
-    header = encode_pduheader(PDU_NOTIFY, False, False, False, contextP,
-                              sID, tactID, pktID, len(payload))
-    return header + payload
+    def encode(self):
+        header = encode_pduheader(self.pduType,
+                                  False, False, False, False, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, 0)
+        return header
 
 
-def decode_notifypdu(data, flags):
-    context, data = decode_context(data, flags)
-    varbinds = decode_varbindlist(data, flags)
-    result = {"context": context, "varbinds": varbinds}
+def decode_PingPDU(data, header):
+    flags = header["flags"]
+    context, data = decode_context(data, header)
+    result = PingPDU(flags["bigEndian"], header["session_id"],
+                     header["transaction_id"], header["packet_id"],
+                     context)
     return result
 
 
-def encode_indexallocpdu_core(isdealloc,
-                              sID, tactID, pktID, newIndex, anyIndex,
-                              varbinds, context=None):
-    pkttype = PDU_INDEX_DEALLOC if isdealloc is True else PDU_INDEX_ALLOC
-    contextP, payload = encode_context(context)
-    payload += encode_varbindlist(varbinds)
-    header = encode_pduheader(pkttype, False, newIndex, anyIndex,
-                              contextP, sID, tactID, pktID, len(payload))
-    return header + payload
+class PingPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID, context=None):
+        AgentXPDU.__init__(self, PDU_PING,
+                           bigEndian, sID, tactID, pktID, context)
 
+    def encode(self):
+        contextP, payload = encode_context(self.bigEndian, self.context)
+        header = encode_pduheader(self.pduType, False, False, False,
+                                  contextP, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
 
-def encode_indexallocpdu(sID, tactID, pktID, newIndex, anyIndex,
-                         varbinds, context=None):
-    return encode_indexallocpdu_core(False,
-                                     sID, tactID, pktID, newIndex, anyIndex,
-                                     varbinds, context)
+
+def decode_NotifyPDU(data, header):
+    flags = header["flags"]
+    context, data = decode_context(data, header)
+    varbinds = decode_varbindlist(data, header)
+    result = NotifyPDU(flags["bigEndian"], header["session_id"],
+                       header["transaction_id"], header["packet_id"],
+                       varbinds, context)
+    return result
 
 
-def encode_indexdeallocpdu(sID, tactID, pktID, newIndex, anyIndex,
-                           varbinds, context=None):
-    return encode_indexallocpdu_core(True,
-                                     sID, tactID, pktID, newIndex, anyIndex,
-                                     varbinds, context)
+class NotifyPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID, varbinds, context=None):
+        AgentXPDU.__init__(self, PDU_NOTIFY,
+                           bigEndian, sID, tactID, pktID, context)
+        self.varbinds = varbinds
 
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.varbinds != other.varbinds:
+            return False
+        return True
 
-def decode_indexallocpdu(data, flags):
-    context, data = decode_context(data, flags)
-    varbinds = decode_varbindlist(data, flags)
-    result = {"context": context, "varbinds": varbinds}
+    def encode(self):
+        contextP, payload = encode_context(self.bigEndian, self.context)
+        payload += encode_varbindlist(self.bigEndian, self.varbinds)
+        header = encode_pduheader(self.pduType, False, False, False,
+                                  contextP, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
+
+
+def decode_xIndexAllocPDU(data, header):
+    flags = header["flags"]
+    context, data = decode_context(data, header)
+    varbinds = decode_varbindlist(data, header)
+    isalloc = (header["type"] == PDU_INDEX_ALLOC)
+    pdu = IndexAllocPDU if isalloc else IndexDeallocPDU
+    result = pdu(flags["bigEndian"], header["session_id"],
+                 header["transaction_id"], header["packet_id"],
+                 flags["newIndex"], flags["anyIndex"],
+                 varbinds, context)
     return result
 
 
-decode_indexdeallocpdu = decode_indexallocpdu
+class IndexAllocPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID,
+                 newIndex, anyIndex, varbinds, context=None):
+        AgentXPDU.__init__(self, PDU_INDEX_ALLOC,
+                           bigEndian, sID, tactID, pktID, context)
+        self.newIndex = newIndex
+        self.anyIndex = anyIndex
+        self.varbinds = varbinds
+
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.newIndex != other.newIndex:
+            return False
+        if self.anyIndex != other.anyIndex:
+            return False
+        if self.varbinds != other.varbinds:
+            return False
+        return True
 
+    def encode(self):
+        contextP, payload = encode_context(self.bigEndian, self.context)
+        payload += encode_varbindlist(self.bigEndian, self.varbinds)
+        header = encode_pduheader(self.pduType,
+                                  False, self.newIndex, self.anyIndex,
+                                  contextP, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
+
+
+class IndexDeallocPDU(IndexAllocPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID,
+                 newIndex, anyIndex, varbinds, context=None):
+        IndexAllocPDU.__init__(self, bigEndian, sID, tactID, pktID,
+                               newIndex, anyIndex, varbinds, context)
+        self.pduType = PDU_INDEX_DEALLOC
+
+
+def decode_AddAgentCapsPDU(data, header):
+    flags = header["flags"]
+    context, data = decode_context(data, header)
+    oid, data = decode_oid(data, header)
+    descr = decode_octetstr(data, header)[0]
+    result = AddAgentCapsPDU(flags["bigEndian"], header["session_id"],
+                             header["transaction_id"], header["packet_id"],
+                             oid, descr, context)
+    return result
 
-def encode_addagentcapspdu(sID, tactID, pktID, oid, descr, context=None):
-    contextP, payload = encode_context(context)
-    payload += encode_oid(oid, False)
-    payload += encode_octetstr(descr)
-    header = encode_pduheader(PDU_ADD_AGENT_CAPS,
-                              False, False, False, contextP,
-                              sID, tactID, pktID, len(payload))
-    return header + payload
 
+class AddAgentCapsPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID,
+                 oid, description, context=None):
+        AgentXPDU.__init__(self, PDU_ADD_AGENT_CAPS,
+                           bigEndian, sID, tactID, pktID, context)
+        self.oid = oid
+        self.description = description
+
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.oid != other.oid:
+            return False
+        if self.description != other.description:
+            return False
+        return True
 
-def decode_addagentcapspdu(data, flags):
-    context, data = decode_context(data, flags)
-    oid, data = decode_oid(data, flags)
-    descr = decode_octetstr(data, flags)[0]
-    result = {"context": context, "oid": oid, "description": descr}
+    def encode(self):
+        contextP, payload = encode_context(self.bigEndian, self.context)
+        payload += encode_oid(self.bigEndian, self.oid, False)
+        payload += encode_octetstr(self.bigEndian, self.description)
+        header = encode_pduheader(self.pduType, False, False, False,
+                                  contextP, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
+
+
+def decode_RMAgentCapsPDU(data, header):
+    flags = header["flags"]
+    context, data = decode_context(data, header)
+    oid, data = decode_oid(data, header)
+    result = RMAgentCapsPDU(flags["bigEndian"], header["session_id"],
+                            header["transaction_id"], header["packet_id"],
+                            oid, context)
     return result
 
 
-def encode_rmagentcapspdu(sID, tactID, pktID, oid, context=None):
-    contextP, payload = encode_context(context)
-    payload += encode_oid(oid, False)
-    header = encode_pduheader(PDU_RM_AGENT_CAPS,
-                              False, False, False, contextP,
-                              sID, tactID, pktID, len(payload))
-    return header + payload
-
-
-def decode_rmagentcapspdu(data, flags):
-    context, data = decode_context(data, flags)
-    oid, data = decode_oid(data, flags)
-    result = {"context": context, "oid": oid}
-    return result
+class RMAgentCapsPDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID, oid, context=None):
+        AgentXPDU.__init__(self, PDU_RM_AGENT_CAPS,
+                           bigEndian, sID, tactID, pktID, context)
+        self.oid = oid
 
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.oid != other.oid:
+            return False
+        return True
 
-def encode_responsepdu(sID, tactID, pktID,
-                       sysUpTime, resError, resIndex,
-                       varbinds=None):
-    payload = struct.pack(">IHH", sysUpTime, resError, resIndex)
-    if varbinds is not None:
-            payload += encode_varbindlist(varbinds)
-    header = encode_pduheader(PDU_RESPONSE, False, False, False, False,
-                              sID, tactID, pktID, len(payload))
-    return header + payload
+    def encode(self):
+        contextP, payload = encode_context(self.bigEndian, self.context)
+        payload += encode_oid(self.bigEndian, self.oid, False)
+        header = encode_pduheader(self.pduType, False, False, False,
+                                  contextP, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
 
 
-def decode_responsepdu(data, flags):
-    endianToken = getendian(flags)
+def decode_ResponsePDU(data, header):
+    flags = header["flags"]
+    endianToken = getendian(flags["bigEndian"])
     temp, data = slicedata(data, 8)
-    sysUpTime, resError, resIndex = struct.unpack(endianToken + "IHH", temp)
+    sysUptime, resError, resIndex = struct.unpack(endianToken + "IHH", temp)
     if len(data) > 0:
-        varbinds = decode_varbindlist(data, flags)
+        varbinds = decode_varbindlist(data, header)
     else:
         varbinds = None
-    result = {"sys_uptime": sysUpTime, "res_err": resError,
-              "res_index": resIndex, "varbinds": varbinds}
+    result = ResponsePDU(flags["bigEndian"], header["session_id"],
+                         header["transaction_id"], header["packet_id"],
+                         sysUptime, resError, resIndex, varbinds)
     return result
 
 
+class ResponsePDU(AgentXPDU):
+    def __init__(self, bigEndian, sID, tactID, pktID,
+                 sysUptime, resError, resIndex, varbinds=None):
+        AgentXPDU.__init__(self, PDU_RESPONSE, bigEndian, sID, tactID, pktID)
+        self.sysUptime = sysUptime
+        self.resError = resError
+        self.resIndex = resIndex
+        self.varbinds = varbinds
+
+    def __eq__(self, other):
+        if AgentXPDU.__eq__(self, other) is not True:
+            return False
+        if self.sysUptime != other.sysUptime:
+            return False
+        if self.resError != other.resError:
+            return False
+        if self.resIndex != other.resIndex:
+            return False
+        if self.varbinds != other.varbinds:
+            return False
+        return True
+
+    def encode(self):
+        endianToken = getendian(self.bigEndian)
+        payload = struct.pack(endianToken + "IHH", self.sysUptime,
+                              self.resError, self.resIndex)
+        if self.varbinds is not None:
+            payload += encode_varbindlist(self.bigEndian, self.varbinds)
+        header = encode_pduheader(self.pduType,
+                                  False, False, False, False, self.bigEndian,
+                                  self.sessionID, self.transactionID,
+                                  self.packetID, len(payload))
+        return header + payload
+
+
 # ========================================================================
 #
 # Data type encoders / decoders
@@ -361,7 +660,7 @@ def decode_responsepdu(data, flags):
 #
 # ========================================================================
 
-def encode_oid(subids, include):
+def encode_oid(bigEndian, subids, include):
     subids = tuple(subids)
     numsubs = len(subids)
     if not (0 <= numsubs <= 128):  # Packet can have a maximum of 128 sections
@@ -376,13 +675,15 @@ def encode_oid(subids, include):
         prefix = 0
     n_subid = len(subids)
     include = int(bool(include))  # force integer bool
-    body = struct.pack(">BBBx", n_subid, prefix, include)
+    endianToken = getendian(bigEndian)
+    body = struct.pack(endianToken + "BBBx", n_subid, prefix, include)
     for subid in subids:
-        body += struct.pack(">I", subid)
+        body += struct.pack(endianToken + "I", subid)
     return body
 
 
-def decode_oid(data, flags):
+def decode_oid(data, header):
+    flags = header["flags"]
     # Need to split off the header to get the subid count
     header, data = slicedata(data, 4)
     n_subid, prefix, include = struct.unpack("BBBx", header)
@@ -397,16 +698,17 @@ def decode_oid(data, flags):
     # Now have the count, need to split the subids from the rest of the packet
     byteCount = n_subid * 4
     data, rest = slicedata(data, byteCount)
-    endianToken = getendian(flags)
+    endianToken = getendian(flags["bigEndian"])
     formatString = endianToken + ("I" * n_subid)
     subids += struct.unpack(formatString, data)
     result = {"subids": subids, "include": include}
     return (result, rest)
 
 
-def encode_octetstr(octets):
+def encode_octetstr(bigEndian, octets):
     numoctets = len(octets)
-    header = struct.pack(">I", numoctets)
+    endianToken = getendian(bigEndian)
+    header = struct.pack(endianToken + "I", numoctets)
     pad = (numoctets % 4)
     if pad > 0:  # Pad out the data to word boundary
         pad = 4 - pad
@@ -420,9 +722,10 @@ def encode_octetstr(octets):
     return data
 
 
-def decode_octetstr(data, flags):
+def decode_octetstr(data, header):
+    flags = header["flags"]
     header, data = slicedata(data, 4)
-    endianToken = getendian(flags)
+    endianToken = getendian(flags["bigEndian"])
     numoctets = struct.unpack(endianToken + "I", header)[0]
     if len(data) < numoctets:
         raise ValueError("Octet string shorter than length")
@@ -432,112 +735,118 @@ def decode_octetstr(data, flags):
     return data[:numoctets], data[numoctets + pad:]
 
 
-def encode_varbind(valType, oid, *payload):
+def encode_varbind(bigEndian, valType, oid, *payload):
     if valType not in definedValueTypes.keys():
         raise ValueError("Value type %s not in defined types" % valType)
-    header = struct.pack(">Hxx", valType)
-    name = encode_oid(oid, False)
+    endianToken = getendian(bigEndian)
+    header = struct.pack(endianToken + "Hxx", valType)
+    name = encode_oid(bigEndian, oid, False)
     handlers = definedValueTypes[valType]
     if handlers is None:
         data = header + name
     else:
-        data = header + name + handlers[0](*payload)
+        data = header + name + handlers[0](bigEndian, *payload)
     return data
 
 
-def decode_varbind(data, flags):
-    header, data = slicedata(data, 4)
-    endianToken = getendian(flags)
-    valType = struct.unpack(endianToken + "Hxx", header)[0]
-    name, data = decode_oid(data, flags)
+def decode_varbind(data, header):
+    flags = header["flags"]
+    bindheader, data = slicedata(data, 4)
+    endianToken = getendian(flags["bigEndian"])
+    valType = struct.unpack(endianToken + "Hxx", bindheader)[0]
+    name, data = decode_oid(data, header)
     if valType not in definedValueTypes.keys():
         raise ValueError("Value type %s not in defined types" % valType)
     handlers = definedValueTypes[valType]
-    payload, data = handlers[1](data, flags)
+    payload, data = handlers[1](data, header)
     result = {"type": valType, "name": name, "data": payload}
     return result, data
 
 
-def encode_integer32(num):
-    return struct.pack(">I", num)
+def encode_integer32(bigEndian, num):
+    endianToken = getendian(bigEndian)
+    return struct.pack(endianToken + "I", num)
 
 
-def decode_integer32(data, flags):
-    endianToken = getendian(flags)
+def decode_integer32(data, header):
+    flags = header["flags"]
+    endianToken = getendian(flags["bigEndian"])
     num, data = slicedata(data, 4)
     num = struct.unpack(endianToken + "I", num)[0]
     return (num, data)
 
 
-def encode_nullvalue():
+def encode_nullvalue(bigEndian):
     return ""
 
 
-def decode_nullvalue(data, flags):
+def decode_nullvalue(data, header):
     return (None, data)
 
 
-def encode_integer64(num):
-    return struct.pack(">Q", num)
+def encode_integer64(bigEndian, num):
+    endianToken = getendian(bigEndian)
+    return struct.pack(endianToken + "Q", num)
 
 
-def decode_integer64(data, flags):
-    endianToken = getendian(flags)
+def decode_integer64(data, header):
+    flags = header["flags"]
+    endianToken = getendian(flags["bigEndian"])
     num, data = slicedata(data, 8)
     num = struct.unpack(endianToken + "Q", num)[0]
     return (num, data)
 
 
-def encode_ipaddr(octets):
+def encode_ipaddr(bigEndian, octets):
     if len(octets) != 4:
         raise ValueError("IP Address incorrect length")
-    return encode_octetstr(octets)
+    return encode_octetstr(bigEndian, octets)
 
 
-def decode_ipaddr(data, flags):
-    addr, data = decode_octetstr(data, flags)
+def decode_ipaddr(data, header):
+    addr, data = decode_octetstr(data, header)
     addr = struct.unpack("BBBB", addr)
     return addr, data
 
 
-def encode_searchrange(startOID, endOID, inclusiveP):
-    startOIDstr = encode_oid(startOID, inclusiveP)
-    endOIDstr = encode_oid(endOID, False)
+def encode_searchrange(bigEndian, startOID, endOID, inclusiveP):
+    startOIDstr = encode_oid(bigEndian, startOID, inclusiveP)
+    endOIDstr = encode_oid(bigEndian, endOID, False)
     return startOIDstr + endOIDstr
 
 
-def decode_searchrange(data, flags):
-    startOID, data = decode_oid(data, flags)
-    endOID, data = decode_oid(data, flags)
+def decode_searchrange(data, header):
+    startOID, data = decode_oid(data, header)
+    endOID, data = decode_oid(data, header)
     result = {"start": startOID, "end": endOID}
     return result, data
 
 
-def encode_searchrange_list(oidranges, nullTerminate=False):
+def encode_searchrange_list(bigEndian, oidranges, nullTerminate=False):
     encoded = []
     for oran in oidranges:
-        encoded.append(encode_searchrange(*oran))
+        encoded.append(encode_searchrange(bigEndian, *oran))
     if nullTerminate:
-        encoded.append(encode_oid(tuple(), False))
+        encoded.append(encode_oid(bigEndian, tuple(), False))
     encoded = "".join(encoded)
     return encoded
 
 
-def decode_searchrange_list(data, flags):  # Cannot handle extra data
+def decode_searchrange_list(data, header):  # Cannot handle extra data
     oidranges = []
     while len(data) > 0:
-        oids, data = decode_searchrange(data, flags)
+        oids, data = decode_searchrange(data, header)
         oidranges.append(oids)
     return tuple(oidranges)
 
 
-def decode_searchrange_list_nullterm(data, flags):
+def decode_searchrange_list_nullterm(data, header):
     oidranges = []
     while len(data) > 0:
-        one, data = decode_oid(data, flags)
+        one, data = decode_oid(data, header)
         if isnullOID(one):
             break
-        two, data = decode_oid(data, flags)
+        two, data = decode_oid(data, header)
         oidranges.append({"start": one, "end": two})
     return tuple(oidranges), data
 
@@ -546,8 +855,9 @@ def decode_searchrange_list_nullterm(data, flags):
 # Utilities, glue, and misc
 # =========================================
 
-def getendian(flags):
-    return ">" if flags["bigEndian"] is True else "<"
+
+def getendian(bigEndian):
+    return ">" if bigEndian is True else "<"
 
 
 def slicedata(data, slicepoint):
@@ -560,18 +870,18 @@ def isnullOID(oid):
     return False
 
 
-def encode_varbindlist(varbinds):
+def encode_varbindlist(bigEndian, varbinds):
     payload = ""
     for varbind in varbinds:
-        payload += encode_varbind(*varbind)
+        payload += encode_varbind(bigEndian, *varbind)
     return payload
 
 
-def decode_varbindlist(data, flags):
+def decode_varbindlist(data, header):
     if len(data) > 0:
         varbinds = []
         while len(data) > 0:
-            vb, data = decode_varbind(data, flags)
+            vb, data = decode_varbind(data, header)
             varbinds.append(vb)
         varbinds = tuple(varbinds)
     else:
@@ -580,8 +890,8 @@ def decode_varbindlist(data, flags):
 
 
 def encode_pduheader(pduType, instanceRegistration, newIndex,
-                     anyIndex, nonDefaultContext, sessionID,
-                     transactionID, packetID, payloadLength):
+                     anyIndex, nonDefaultContext, bigEndian,
+                     sessionID, transactionID, packetID, payloadLength):
     # version type flags reserved
     # sessionID
     # transactionID
@@ -594,12 +904,12 @@ def encode_pduheader(pduType, instanceRegistration, newIndex,
     flags |= newIndex << 1
     flags |= anyIndex << 2
     flags |= nonDefaultContext << 3
-    flags |= True << 4  # byte order, we always send network / big endian
-    data = struct.pack(">BBBxIIII", 1, pduType, flags,
-                       sessionID,
-                       transactionID,
-                       packetID,
-                       payloadLength)
+    flags |= bigEndian << 4
+    # Yes, this is a kluge, it is less problematic than the next best kluge
+    endianToken = getendian(bigEndian)
+    data = struct.pack(endianToken + "BBBxIIII", 1, pduType, flags,
+                       sessionID, transactionID,
+                       packetID, payloadLength)
     return data
 
 
@@ -617,7 +927,7 @@ def decode_pduheader(data):  # Endianess is controlled from the PDU header
     flagDict["bigEndian"] = bool(flags & 0x10)
     # Chop the remaining parts of the header from the rest of the datastream
     # then parse them
-    fmt = getendian(flagDict) + "IIII"
+    fmt = getendian(flagDict["bigEndian"]) + "IIII"
     linen, data = slicedata(data, 16)  # 4 x 4-byte variables
     sID, tactionID, pktID, dataLen = struct.unpack(fmt, linen)
     result = {"version": version, "type": pduType, "flags": flagDict,
@@ -626,19 +936,20 @@ def decode_pduheader(data):  # Endianess is controlled from the PDU header
     return result
 
 
-def encode_context(context):
+def encode_context(bigEndian, context):
     if context is not None:
         contextP = True
-        payload = encode_octetstr(context)
+        payload = encode_octetstr(bigEndian, context)
     else:
         contextP = False
         payload = ""
     return (contextP, payload)
 
 
-def decode_context(data, flags):
+def decode_context(data, header):
+    flags = header["flags"]
     if flags["contextP"] is True:
-        context, data = decode_octetstr(data, flags)
+        context, data = decode_octetstr(data, header)
     else:
         context = None
     return (context, data)
@@ -659,9 +970,8 @@ def decode_packet(data):
         parsedPkt = None
     else:
         packetSlice, newData = slicedata(newData, header["length"])
-        parsedPkt = decoder(packetSlice, header["flags"])
-    result = {"header": header, "body": parsedPkt}
-    return result, newData
+        parsedPkt = decoder(packetSlice, header)
+    return parsedPkt, newData
 
 
 # Value types
@@ -713,24 +1023,24 @@ PDU_ADD_AGENT_CAPS = 16
 PDU_RM_AGENT_CAPS = 17
 PDU_RESPONSE = 18
 definedPDUTypes = {  # Used by the decode_packet function
-    PDU_OPEN: decode_openpdu,
-    PDU_CLOSE: decode_closepdu,
-    PDU_REGISTER: decode_registerpdu,
-    PDU_UNREGISTER: decode_unregisterpdu,
-    PDU_GET: decode_getpdu,
-    PDU_GET_NEXT: decode_getnextpdu,
-    PDU_GET_BULK: decode_getbulkpdu,
-    PDU_TEST_SET: decode_testsetpdu,
-    PDU_COMMIT_SET: None,
-    PDU_UNDO_SET: None,
-    PDU_CLEANUP_SET: None,
-    PDU_NOTIFY: decode_notifypdu,
-    PDU_PING: decode_pingpdu,
-    PDU_INDEX_ALLOC: decode_indexallocpdu,
-    PDU_INDEX_DEALLOC: decode_indexdeallocpdu,
-    PDU_ADD_AGENT_CAPS: decode_addagentcapspdu,
-    PDU_RM_AGENT_CAPS: decode_rmagentcapspdu,
-    PDU_RESPONSE: decode_responsepdu}
+    PDU_OPEN: decode_OpenPDU,
+    PDU_CLOSE: decode_ClosePDU,
+    PDU_REGISTER: decode_xRegisterPDU,
+    PDU_UNREGISTER: decode_xRegisterPDU,
+    PDU_GET: decode_xGetPDU,
+    PDU_GET_NEXT: decode_xGetPDU,
+    PDU_GET_BULK: decode_GetBulkPDU,
+    PDU_TEST_SET: decode_TestSetPDU,
+    PDU_COMMIT_SET: decode_CommitSetPDU,
+    PDU_UNDO_SET: decode_UndoSetPDU,
+    PDU_CLEANUP_SET: decode_CleanupSetPDU,
+    PDU_NOTIFY: decode_NotifyPDU,
+    PDU_PING: decode_PingPDU,
+    PDU_INDEX_ALLOC: decode_xIndexAllocPDU,
+    PDU_INDEX_DEALLOC: decode_xIndexAllocPDU,
+    PDU_ADD_AGENT_CAPS: decode_AddAgentCapsPDU,
+    PDU_RM_AGENT_CAPS: decode_RMAgentCapsPDU,
+    PDU_RESPONSE: decode_ResponsePDU}
 
 # Closing reasons
 RSN_OTHER = 1


=====================================
tests/pylib/test_agentx.py
=====================================
The diff for this file was not included because it is too large.


View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/4369f7e495249b5b36a0ded5f75521f07133e350

---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/4369f7e495249b5b36a0ded5f75521f07133e350
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170728/53825d78/attachment.html>


More information about the vc mailing list