[Git][NTPsec/ntpsec][master] 3 commits: Moved a few DUMMY / TODO labels around for better grepping.

Ian Bruene gitlab at mg.gitlab.com
Wed Jan 10 19:24:14 UTC 2018


Ian Bruene pushed to branch master at NTPsec / ntpsec


Commits:
d3066ace by Ian Bruene at 2018-01-09T10:47:46-06:00
Moved a few DUMMY / TODO labels around for better grepping.

- - - - -
77c6dfd0 by Ian Bruene at 2018-01-09T10:48:16-06:00
Increased agentx.py coverage to 97%. (does not include py3 shims)

- - - - -
52c4a02c by Ian Bruene at 2018-01-10T13:22:46-06:00
Increased coverage/explicit ignoring of packet.py tests to 100%.

- - - - -


5 changed files:

- ntpclients/ntpsnmpd.py
- pylib/agentx.py
- pylib/packet.py
- tests/pylib/test_agentx.py
- tests/pylib/test_packet.py


Changes:

=====================================
ntpclients/ntpsnmpd.py
=====================================
--- a/ntpclients/ntpsnmpd.py
+++ b/ntpclients/ntpsnmpd.py
@@ -543,6 +543,7 @@ class DataSource:  # This will be broken up in future to be less NTP-specific
 
     def cbr_entNotifMessage(self, oid):  # DUMMY
         # utf8str
+        # Should probably either return null, or remove this handler.
         return ax.Varbind(ax.VALUE_OCTET_STR, oid, "jabber")
 
     #########################
@@ -911,8 +912,8 @@ class DataSource:  # This will be broken up in future to be less NTP-specific
                 control.sendNotify(vl)
                 self.sentNotifications += 1
 
-    def doNotifyConfigChange(self, control):
-        pass  # DUMMY
+    def doNotifyConfigChange(self, control):  # DUMMY
+        pass
 
     def doNotifyLeapSecondAnnounced(self, control):
         oldLeap = self.oldValues["leap"]
@@ -992,8 +993,7 @@ class DataSource:  # This will be broken up in future to be less NTP-specific
             return (adds, rms)
         return
 
-    def misc_getMode(self):
-        # DUMMY: not fully implemented
+    def misc_getMode(self):  # DUMMY: not fully implemented
         try:
             # Don't care about the data, this is a ploy to the the rstatus
             self.session.readvar(0, ["stratum"])


=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -188,8 +188,11 @@ class AgentXPDU:
             return False
         if self.packetID != other.packetID:
             return False
-        if self.context != other.context:
+        if self._hascontext != other._hascontext:
             return False
+        if self._hascontext is True:
+            if self.context != other.context:
+                return False
         return True
 
     def __ne__(self, other):
@@ -313,7 +316,7 @@ class RegisterPDU(AgentXPDU):
     def __eq__(self, other):
         if AgentXPDU.__eq__(self, other) is not True:
             return False
-        if hasattr(self, "timeout"):
+        if hasattr(self, "timeout"):  # Relevant to UnregisterPDU subclass
             if self.timeout != other.timeout:
                 return False
         if self.priority != other.priority:
@@ -1122,9 +1125,7 @@ class SearchRange:
         return not self.__eq__(other)
 
     def __repr__(self):
-        r = "SearchRange("
-        r += repr(self.start) + ", "
-        r += repr(self.end) + ")"
+        r = "SearchRange(%s, %s)" % (repr(self.start), repr(self.end))
         return r
 
     def sanity(self):


=====================================
pylib/packet.py
=====================================
--- a/pylib/packet.py
+++ b/pylib/packet.py
@@ -220,7 +220,8 @@ import ntp.util
 
 master_encoding = 'latin-1'
 
-if str is bytes:  # Python 2
+if str is bytes:  # pragma: no cover
+    # Python 2
     polystr = str
     polybytes = bytes
     polyord = ord
@@ -233,7 +234,8 @@ if str is bytes:  # Python 2
     def make_wrapper(fp):
         return fp
 
-else:  # Python 3
+else:  # pragma: nocover
+    # Python 3
     import io
 
     def polystr(o):
@@ -359,8 +361,7 @@ class Packet:
         return self.li_vn_mode & 0x7
 
 
-class SyncException(BaseException):
-
+class SyncException(BaseException):  # pragma: no cover
     def __init__(self, message, errorcode=0):
         self.message = message
         self.errorcode = errorcode
@@ -854,7 +855,7 @@ class ControlSession:
                         if self.logfp is not None:
                             self.logfp.write("ntpq: ndp lookup failed, %s\n"
                                              % e2.strerror)
-            except AttributeError:
+            except AttributeError:  # pragma: no cover
                 if self.logfp is not None:
                     self.logfp.write(
                         "ntpq: API error, missing socket attributes\n")
@@ -911,7 +912,7 @@ class ControlSession:
                     key_id = 0
                 if key_id == 0 or key_id > MAX_KEYID:
                     raise ControlException(SERR_BADKEY)
-            except (SyntaxError, ValueError):
+            except (SyntaxError, ValueError):  # pragma: no cover
                 raise ControlException(SERR_BADKEY)
             self.keyid = key_id
 
@@ -938,7 +939,7 @@ class ControlSession:
             if self.logfp is not None:
                 self.logfp.write("Write to %s failed\n" % self.hostname)
             return -1
-        if (self.debug >= 5) and (self.logfp is not None):
+        if (self.debug >= 5) and (self.logfp is not None):  # pragma: no cover
             # special, not replacing with dolog()
             self.logfp.write("Request packet:\n")
             dump_hex_printable(xdata, self.logfp)
@@ -1047,7 +1048,8 @@ class ControlSession:
                     if timeo:
                         raise ControlException(SERR_TIMEOUT)
                 if timeo:
-                    if (self.debug >= 1) and (self.logfp is not None):
+                    if (self.debug >= 1) and \
+                       (self.logfp is not None):  # pragma: no cover
                         # special, not replacing with dolog()
                         self.logfp.write(
                             "ERR_INCOMPLETE: Received fragments:\n")
@@ -1060,7 +1062,7 @@ class ControlSession:
             warndbg("At %s, socket read begins\n" % time.asctime(), 4)
             try:
                 rawdata = polybytes(self.sock.recv(4096))
-            except socket.error:
+            except socket.error:  # pragma: no cover
                 # usually, errno 111: connection refused
                 raise ControlException(SERR_SOCKET)
 
@@ -1073,7 +1075,7 @@ class ControlSession:
 
             # Validate that packet header is sane, and the correct type
             valid = self.__validate_packet(rpkt, rawdata, opcode, associd)
-            if valid is False:
+            if valid is False:  # pragma: no cover
                 continue
 
             # Someday, perhaps, check authentication here
@@ -1085,7 +1087,9 @@ class ControlSession:
                 warn("Received count of 0 in non-final fragment\n")
                 continue
 
-            if seenlastfrag and rpkt.more():
+            if seenlastfrag and rpkt.more():  # pragma: no cover
+                # I'n not sure this can be triggered without hitting another
+                # error first.
                 warn("Received second last fragment\n")
                 continue
 
@@ -1144,13 +1148,14 @@ class ControlSession:
                     warndbg("Fragment collection ends. %d bytes "
                             " in %d fragments\n"
                             % (len(self.response), len(fragments)), 1)
-                    if self.debug >= 5:  # special, not replacing with dolog()
+                    # special loggers, not replacing with dolog()
+                    if self.debug >= 5:  # pragma: no cover
                         warn("Response packet:\n")
                         dump_hex_printable(self.response, self.logfp)
-                    elif self.debug >= 3:
+                    elif self.debug >= 3:  # pragma: no cover
                         # FIXME: Garbage when retrieving assoc list (binary)
                         warn("Response packet:\n%s\n" % repr(self.response))
-                    elif self.debug >= 2:
+                    elif self.debug >= 2:  # pragma: no cover
                         # FIXME: Garbage when retrieving assoc list (binary)
                         eol = self.response.find("\n")
                         firstline = self.response[:eol]
@@ -1591,7 +1596,7 @@ This combats source address spoofing
                         req_buf += incr
                 if direct is not None:
                     span.entries = []
-        except KeyboardInterrupt:
+        except KeyboardInterrupt:  # pragma: no cover
             pass        # We can test for interruption with is_complete()
 
         # C ntpq's code for stitching together spans was absurdly


=====================================
tests/pylib/test_agentx.py
=====================================
--- a/tests/pylib/test_agentx.py
+++ b/tests/pylib/test_agentx.py
@@ -71,14 +71,15 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
     # PDU tests
     #
     def test_AgentXPDU(self):
+        cls = ntp.agentx.AgentXPDU
         # Test these so we don't need a bunch of redundant tests
         # Test basic, without context
-        test = ntp.agentx.AgentXPDU(0, True, 1, 2, 3, context=extraData)
+        test = cls(0, True, 1, 2, 3, context=extraData)
         self.assertEqual(repr(test),
                          "AgentXPDU(bigEndian=True, packetID=3, "
                          "pduType=0, sessionID=1, transactionID=2)")
         # Test basic, with context
-        test = ntp.agentx.AgentXPDU(0, True, 1, 2, 3, hascontext=True)
+        test = cls(0, True, 1, 2, 3, hascontext=True)
         self.assertEqual(repr(test),
                          "AgentXPDU(bigEndian=True, context=None, packetID=3, "
                          "pduType=0, sessionID=1, transactionID=2)")
@@ -91,6 +92,39 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
                          "context='jabber jabber jabber', "
                          "foo=42, packetID=3, "
                          "pduType=0, sessionID=1, transactionID=2)")
+        # Test __eq__
+        a = cls(0, True, 1, 2, 3)
+        b = cls(0, True, 1, 2, 3)
+        # Test all equal
+        self.assertEqual(a == b, True)
+        # Test same class, false
+        b = "blah blah"
+        self.assertEqual(a == b, False)
+        # Test different pdu
+        b = cls(1, True, 1, 2, 3)
+        self.assertEqual(a == b, False)
+        # Test different endianess
+        b = cls(0, False, 1, 2, 3)
+        self.assertEqual(a == b, False)
+        # Test different session ID
+        b = cls(0, True, 42, 2, 3)
+        self.assertEqual(a == b, False)
+        # Test different transaction ID
+        b = cls(0, True, 1, 23, 3)
+        self.assertEqual(a == b, False)
+        # Test different packet ID
+        b = cls(0, True, 1, 2, 13)
+        self.assertEqual(a == b, False)
+        # Test different _hascontext
+        b = cls(0, True, 1, 2, 3, hascontext=True)
+        self.assertEqual(a == b, False)
+        # Test with context, equal
+        a = cls(0, True, 1, 2, 3, True, "foo")
+        b = cls(0, True, 1, 2, 3, True, "foo")
+        self.assertEqual(a == b, True)
+        # Test with context, not equal
+        b = cls(0, True, 1, 2, 3, True, "bar")
+        self.assertEqual(a == b, False)
 
     def test_OpenPDU(self):
         dec = ntp.agentx.decode_OpenPDU
@@ -310,6 +344,13 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
                           "rangeSubid": 0,
                           "upperBound": None,
                           "context": None})
+        # Test __eq__ gap, equal
+        a = cls(True, 1, 2, 3, 4, 5, ())
+        b = cls(True, 1, 2, 3, 4, 5, ())
+        self.assertEqual(a == b, True)
+        # Test __eq__ gap, unequal
+        b = cls(False, 1, 2, 3, 4, 5, ())
+        self.assertEqual(a == b, False)
 
     def test_UnregisterPDU(self):
         dec = ntp.agentx.decode_xRegisterPDU
@@ -1718,6 +1759,15 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
         # Test >=
         self.assertEqual(target((1, 2, 3)) >= target((1, 2)), True)
         self.assertEqual(target((1, 2, 3)) >= target((1, 2, 3)), True)
+        # Test insane subids type
+        try:
+            errored = target("foo")
+        except TypeError:
+            errored = True
+        self.assertEqual(errored, True)
+        # Test isNull
+        self.assertEqual(target((1, 2)).isNull(), False)
+        self.assertEqual(target(()).isNull(), True)
 
     def test_searchrange(self):
         target = ntp.agentx.SearchRange
@@ -1806,6 +1856,31 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
                              b"\x07\x00\x00\x00\x08\x00\x00\x00",
                              lilEndianFlags),
                          (target((1, 2, 3, 4), (5, 6, 7, 8), True), b""))
+        # Test __eq__
+        # Test equal
+        a = target((1, 2, 3), (1, 2, 3))
+        b = target((1, 2, 3), (1, 2, 3))
+        self.assertEqual(a == b, True)
+        # Test start unequal
+        b = target((1, 2, 3), (1, 2, 3), True)
+        self.assertEqual(a == b, False)
+        # Test end unequal
+        b = target((1, 2, 3), (1, 2, 3, 4))
+        self.assertEqual(a == b, False)
+        # Test __ne__
+        # Test equal
+        a = target((1, 2, 3), (1, 2, 3))
+        b = target((1, 2, 3), (1, 2, 3))
+        self.assertEqual(a != b, False)
+        # Test start unequal
+        b = target((1, 2, 3), (1, 2, 3), True)
+        self.assertEqual(a != b, True)
+        # Test end unequal
+        b = target((1, 2, 3), (1, 2, 3, 4))
+        self.assertEqual(a != b, True)
+        # Test __repr__
+        self.assertEqual(repr(target((1, 2), (1, 3))),
+                         "SearchRange(OID((1, 2), False), OID((1, 3), False))")
 
     def test_encode_searchrange_list(self):
         enc = ntp.agentx.encode_searchrange_list
@@ -1855,9 +1930,10 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
                          (srch((1, 2), (1, 2), True),
                           srch((2, 3), (3, 4), False)))
 
-    def test_encode_octetstr(self):
+    def test_xcode_octetstr(self):
         enc = ntp.agentx.encode_octetstr
         dec = ntp.agentx.decode_octetstr
+        san = ntp.agentx.sanity_octetstr
 
         # Encode empty
         self.assertEqual(enc(True, ()), b"\x00\x00\x00\x00")
@@ -1885,6 +1961,43 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
                              b"blarg\x00\x00\x00" + extraData,
                              standardFlags),
                          ("blarg", extraData))
+        # Test sanity
+        # Test str
+        try:
+            errored = san("foo")  # str is always sane
+        except Exception as e:
+            errored = e
+        self.assertEqual(errored, None)
+        # Test sane list
+        try:
+            errored = san([1, 2, 3])
+        except Exception as e:
+            errored = e
+        self.assertEqual(errored, None)
+        # Test sane tuple
+        try:
+            errored = san((1, 2, 3))
+        except Exception as e:
+            errored = e
+        self.assertEqual(errored, None)
+        # Test insane list
+        try:
+            errored = san([23, 300, 42])
+        except ValueError:
+            errored = True
+        self.assertEqual(errored, True)
+        # Test insane tuple
+        try:
+            errored = san((23, 300, 42))
+        except ValueError:
+            errored = True
+        self.assertEqual(errored, True)
+        # Test insane type
+        try:
+            errored = san(42.23)
+        except TypeError:
+            errored = True
+        self.assertEqual(errored, True)
 
     def test_Varbind(self):
         target = ntp.agentx.Varbind


=====================================
tests/pylib/test_packet.py
=====================================
--- a/tests/pylib/test_packet.py
+++ b/tests/pylib/test_packet.py
@@ -488,11 +488,19 @@ class TestSyncPacket(unittest.TestCase):
         cls.origin_timestamp = self.target.posix_to_ntp(1)
         cls.receive_timestamp = self.target.posix_to_ntp(2)
         cls.transmit_timestamp = self.target.posix_to_ntp(3)
+        # Test without extensions
         self.assertEqual(cls.__repr__(),
                          "<NTP:unsync:4:3:0.000000:0.000000:0.0.0.0:"
                          "1970-01-01T00:00:00Z:1970-01-01T00:00:01Z:"
                          "1970-01-01T00:00:02Z:1970-01-01T00:00:03Z>")
-
+        # Test with extensions
+        cls.extfields = "foobar"
+        cls.mac = "machinations"
+        self.assertEqual(cls.__repr__(),
+                         "<NTP:unsync:4:3:0.000000:0.000000:0.0.0.0:"
+                         "1970-01-01T00:00:00Z:1970-01-01T00:00:01Z:"
+                         "1970-01-01T00:00:02Z:1970-01-01T00:00:03Z:"
+                         "'foobar':machinations>")
 
 class TestMisc(unittest.TestCase):
     def test_Peer(self):
@@ -770,7 +778,7 @@ class TestControlSession(unittest.TestCase):
             cls.logfp = logjig
             # Test first type
             fakesockmod.gai_returns = [42]
-            result = cls._ControlSession__lookuphost("blah.com", "family")
+            result = cls._ControlSession__lookuphost("[blah.com]", "family")
             self.assertEqual(result, 42)
             self.assertEqual(fakesockmod.gai_calls,
                              [("blah.com", "ntp", cls.ai_family,
@@ -1071,6 +1079,20 @@ class TestControlSession(unittest.TestCase):
             cls.getresponse(1, 3, True)
             self.assertEqual(cls.response,
                              polybytes("foo=4223,blah=248,x=23,quux=1"))
+            # Test with data, duplicate packet
+            sockjig.return_data = [
+                "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
+                "foo=4223,\x00\x00\x00",
+                "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x09\x00\x0E"
+                "blah=248,x=23,\x00\x00",
+                "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x09\x00\x0E"
+                "blah=248,x=23,\x00\x00",
+                "\x0E\x81\x00\x01\x00\x02\x00\x03\x00\x17\x00\x06"
+                "quux=1\x00\x00"]
+            cls.sequence = 1
+            cls.getresponse(1, 3, True)
+            self.assertEqual(cls.response,
+                             polybytes("foo=4223,blah=248,x=23,quux=1"))
             # Test MAXFRAGS bail
             maxtemp = ntpp.MAXFRAGS
             ntpp.MAXFRAGS = 1
@@ -1571,18 +1593,24 @@ class TestControlSession(unittest.TestCase):
         nonce_fetch_count = [0]
         query_results = qrm[:]
         queries = []
-        result = cls.mrulist(variables={"sort": "addr", "frags": 24})
+        result = cls.mrulist(variables={"sort": "addr",
+                                        "frags": 24,
+                                        "resall":5})
         self.assertEqual(nonce_fetch_count, [4])
+        print(repr(queries))
         self.assertEqual(queries,
-                         [(10, 0, "nonce=foo, frags=24", False),
+                         [(10, 0, "nonce=foo, frags=24, resall=0x5", False),
                           (10, 0,
-                           "nonce=foo, frags=25, addr.0=1.2.3.4:23, last.0=40",
+                           "nonce=foo, frags=25, resall=0x5, "
+                           "addr.0=1.2.3.4:23, last.0=40",
                            False),
                           (10, 0,
-                           "nonce=foo, frags=26, addr.0=1.2.3.4:23, "
-                           "last.0=41, addr.1=1.2.3.4:23, last.1=40", False),
+                           "nonce=foo, frags=26, resall=0x5, "
+                           "addr.0=1.2.3.4:23, last.0=41, addr.1=1.2.3.4:23, "
+                           "last.1=40", False),
                           (10, 0,
-                           "nonce=foo, frags=27, addr.0=10.20.30.40:23, "
+                           "nonce=foo, frags=27, resall=0x5, "
+                           "addr.0=10.20.30.40:23, "
                            "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
                            "addr.2=1.2.3.4:23, last.2=40", False)])
         self.assertEqual(isinstance(result, ntpp.MRUList), True)



View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/d761d7748bec8bb2af14d7a48f790360fc30b64f...52c4a02c90bdf085f0c6f84f12060b46c58f9749

---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/d761d7748bec8bb2af14d7a48f790360fc30b64f...52c4a02c90bdf085f0c6f84f12060b46c58f9749
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20180110/225647de/attachment.html>


More information about the vc mailing list