[Git][NTPsec/ntpsec][master] Changed error handling in decode_packet to return remaining data

Ian Bruene gitlab at mg.gitlab.com
Sun Oct 29 19:30:21 UTC 2017


Ian Bruene pushed to branch master at NTPsec / ntpsec


Commits:
c326c208 by Ian Bruene at 2017-10-29T14:29:09-05:00
Changed error handling in decode_packet to return remaining data

- - - - -


2 changed files:

- pylib/agentx.py
- tests/pylib/test_agentx.py


Changes:

=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -91,6 +91,18 @@ prefixCount = len(internetPrefix)
 #
 # ==========================================================================
 
+# ==========================================================================
+#
+# Error classes
+#
+# ==========================================================================
+
+class ParseError(Exception):
+    def __init__(self, message, packetData="", remainingData=""):
+        Exception.__init__(self)
+        self.message = message
+        self.packetData = packetData
+        self.remainingData = remainingData
 
 # ==========================================================================
 #
@@ -1227,8 +1239,6 @@ def encode_pduheader(pduType, instanceRegistration, newIndex,
 def decode_pduheader(data):  # Endianness is controlled from the PDU header
     lineone, data = slicedata(data, 4)
     version, pduType, flags = struct.unpack(">BBBx", polybytes(lineone))
-    if pduType not in definedPDUTypes:
-        raise ValueError("PDU type %s not in defined types" % pduType)
     # Slice up the flags
     flagDict = decode_flagbyte(flags)
     # Chop the remaining parts of the header from the rest of the datastream
@@ -1262,21 +1272,30 @@ def decode_context(data, header):
 
 
 def decode_packet(data):
+    if len(data) < 20:
+        raise ParseError("Data too short for header")
     header, newData = slicedata(data, 20)
     header = decode_pduheader(header)
     if header["length"] > len(newData):
-        raise IndexError("Packet data too short")
+        raise ParseError("Packet data too short")
+    packetSlice, newData = slicedata(newData, header["length"])
     if header["version"] != 1:
-        raise ValueError("Unknown packet version", header["version"])
+        err = ParseError("Unknown packet version %i" % header["version"])
+        err.packetData = packetSlice
+        err.remainingData = newData
+        raise err
     pktType = header["type"]
     if pktType not in definedPDUTypes.keys():
-        raise ValueError("PDU type %s not in defined types" % pktType)
+        raise ParseError("PDU type %s not in defined types" % pktType)
     decoder = definedPDUTypes[pktType]
-    if decoder is None:
-        parsedPkt = None
-    else:
-        packetSlice, newData = slicedata(newData, header["length"])
+    try:
         parsedPkt = decoder(packetSlice, header)
+    except Exception as e:
+        err = ParseError("Body parsing error")
+        err.originalError = e
+        err.packetData = packetSlice
+        err.remainingData = newData
+        raise err
     return parsedPkt, newData
 
 


=====================================
tests/pylib/test_agentx.py
=====================================
--- a/tests/pylib/test_agentx.py
+++ b/tests/pylib/test_agentx.py
@@ -2459,16 +2459,46 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
                            b"\x00\x00\x00\x04\x00\x05\x00\x06"),
                          (x.ResponsePDU(True, 1, 2, 3, 4, 5, 6),
                           b""))
-        # Test insufficient data
+        # Test errors
+        # Test insufficient data for header
+        try:
+            self.assertEqual(f(b""), None)
+            errored = False
+        except x.ParseError as e:
+            errored = e
+        self.assertEqual(errored.message, "Data too short for header")
+        # Test insufficient data for packet
         try:
             f(b"\x01\x11\x10\x00"
               b"\x00\x00\x00\x01\x00\x00\x00\x02"
               b"\x00\x00\x00\x03\x00\x00\x00\x10"
               b"\x03\x00\x00\x00\x00\x00\x00\x04")
-            fail = False
-        except IndexError:
-            fail = True
-        self.assertEqual(fail, True)
+            errored = False
+        except x.ParseError as e:
+            errored = e
+        self.assertEqual(errored.message, "Packet data too short")
+        # Test wrong version
+        try:
+            f(b"\x02\x11\x10\x00"
+              b"\x00\x00\x00\x00\x00\x00\x00\x00"
+              b"\x00\x00\x00\x00\x00\x00\x00\x08"
+              b"blahblahjabber")
+            errored = False
+        except x.ParseError as e:
+            errored = e
+        self.assertEqual(errored.message, "Unknown packet version 2")
+        self.assertEqual(errored.packetData, "blahblah")
+        self.assertEqual(errored.remainingData, "jabber")
+        # Test unrecognized packet type
+        try:
+            f(b"\x01\xFF\x10\x00"
+              b"\x00\x00\x00\x00\x00\x00\x00\x00"
+              b"\x00\x00\x00\x00\x00\x00\x00\x08"
+              b"blahblah")
+            errored = False
+        except x.ParseError as e:
+            errored = e
+        self.assertEqual(errored.message, "PDU type 255 not in defined types")
 
     def test_mibTree2List(self):
         x = ntp.agentx



View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/c326c20807fc19dfebbe5144dfa0d178dba96faa

---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/c326c20807fc19dfebbe5144dfa0d178dba96faa
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20171029/e2af8758/attachment.html>


More information about the vc mailing list