[Git][NTPsec/ntpsec][master] 2 commits: Expanded packet.py tests to test error handling.

Ian Bruene gitlab at mg.gitlab.com
Wed Aug 23 01:14:39 UTC 2017


Ian Bruene pushed to branch master at NTPsec / ntpsec


Commits:
ebc10102 by Ian Bruene at 2017-08-22T20:14:08-05:00
Expanded packet.py tests to test error handling.

- - - - -
6e5765c2 by Ian Bruene at 2017-08-22T20:14:08-05:00
Added aliases for frequently used objects

- - - - -


2 changed files:

- pylib/packet.py
- tests/pylib/test_packet.py


Changes:

=====================================
pylib/packet.py
=====================================
--- a/pylib/packet.py
+++ b/pylib/packet.py
@@ -1464,10 +1464,6 @@ This combats source address spoofing
                             raise ControlException(SERR_STALL)
                         warndbg("--->   Restarting from the beginning, "
                                 "retry #%u\n" % restarted_count, 1)
-                    elif e.errorcode == ntp.control.CERR_UNKNOWNVAR:
-                        e.message = ("CERR_UNKNOWNVAR from ntpd but "
-                                     "no priors given.")
-                        raise e
                     elif e.errorcode == ntp.control.CERR_BADVALUE:
                         if cap_frags:
                             cap_frags = False


=====================================
tests/pylib/test_packet.py
=====================================
--- a/tests/pylib/test_packet.py
+++ b/tests/pylib/test_packet.py
@@ -15,6 +15,8 @@ import getpass
 
 odict = ntp.util.OrderedDict
 
+ntpp = ntp.packet
+ctlerr = ntp.packet.ControlException
 
 class FileJig:
     def __init__(self):
@@ -51,8 +53,12 @@ class SocketJig:
         self.closed = False
         self.connected = None
         self.fail_connect = False
+        self.fail_send = 0
 
     def sendall(self, data):
+        if self.fail_send > 0:
+            self.fail_send -= 1
+            raise socket.error()
         self.data.append(data)
 
     def close(self):
@@ -89,7 +95,7 @@ class SessionJig:
 
 
 class ControlPacketJig:
-    HEADER_LEN = ntp.packet.ControlPacket.HEADER_LEN
+    HEADER_LEN = ntpp.ControlPacket.HEADER_LEN
 
     def __init__(self, session, opcode, associd, data):
         self.session = session
@@ -195,10 +201,21 @@ class SelectModuleJig:
 
     def __init__(self):
         self.select_calls = []
+        self.select_fail = 0
+        self.do_return = []
 
     def select(self, ins, outs, excepts, timeout=0):
         self.select_calls.append((ins, outs, excepts, timeout))
-        return (ins, [], [])
+        if self.select_fail > 0:
+            self.select_fail -= 1
+            raise select.error
+        if len(self.do_return) == 0:  # simplify code that doesn't need it
+            self.do_return.append(True)
+        doreturn = self.do_return.pop(0)
+        if doreturn is True:
+            return (ins, [], [])
+        else:
+            return ([], [], [])
 
 
 class AuthenticatorJig:
@@ -206,6 +223,7 @@ class AuthenticatorJig:
 
     def __init__(self):
         self.control_call_count = 0
+        self.control_fail = 0
         self.fail_getitem = False
         self.compute_mac_calls = []
 
@@ -216,6 +234,9 @@ class AuthenticatorJig:
 
     def control(self):
         self.control_call_count += 1
+        if self.control_fail > 0:
+            self.control_fail -= 1
+            raise ValueError
         return (23, "keytype", "miranda")
 
     @staticmethod
@@ -230,7 +251,7 @@ class AuthenticatorJig:
 
 
 class TestPacket(unittest.TestCase):
-    target = ntp.packet.Packet
+    target = ntpp.Packet
 
     def test_VN_MODE(self):
         f = self.target.VN_MODE
@@ -273,7 +294,7 @@ class TestPacket(unittest.TestCase):
 
 
 class TestSyncPacket(unittest.TestCase):
-    target = ntp.packet.SyncPacket
+    target = ntpp.SyncPacket
 
     def test___init__(self):
         # Test without data (that will be tested via analyze())
@@ -289,7 +310,7 @@ class TestSyncPacket(unittest.TestCase):
         self.assertEqual(cls.origin_timestamp, 0)
         self.assertEqual(cls.receive_timestamp, 0)
         self.assertEqual(cls.transmit_timestamp, 0)
-        self.assertEqual(cls.data, ntp.packet.polybytes(""))
+        self.assertEqual(cls.data, ntpp.polybytes(""))
         self.assertEqual(cls.extension, '')
         self.assertEqual(cls.extfields, [])
         self.assertEqual(cls.mac, '')
@@ -300,6 +321,27 @@ class TestSyncPacket(unittest.TestCase):
         self.assertEqual(cls.rescaled, False)
 
     def test_analyze(self):
+        # Test data less than packet
+        data = "blah blah"
+        try:
+            cls = self.target(data)
+            errored = False
+        except ntpp.SyncException as e:
+            errored = e.message
+        self.assertEqual(errored, "impossible packet length")
+        # Test data not word aligned
+        data = "\x5C\x10\x01\xFF" \
+               "\x00\x00\x01\x01\x00\x00\x01\x02\x00\x00\x01\x03" \
+               "\x00\x01\x02\x03\x04\x05\x06\x07" \
+               "\x01\x01\x02\x03\x04\x05\x06\x07" \
+               "\x02\x01\x02\x03\x04\x05\x06\x07" \
+               "\x03\x01\x02\x03\x04\x05\x06\x07\x0B\xAD"
+        try:
+            cls = self.target(data)
+            errored = False
+        except ntpp.SyncException as e:
+            errored = e.message
+        self.assertEqual(errored, "impossible packet length")
         # Test without extension
         data = "\x5C\x10\x01\xFF" \
                "\x00\x00\x01\x01\x00\x00\x01\x02\x00\x00\x01\x03" \
@@ -347,7 +389,7 @@ class TestSyncPacket(unittest.TestCase):
         try:
             cls = self.target(data2)
             errored = False
-        except ntp.packet.SyncException as e:
+        except ntpp.SyncException as e:
             errored = e.message
         self.assertEqual(errored, "Unsupported DES authentication")
         # Test with extension, runt 8
@@ -355,7 +397,7 @@ class TestSyncPacket(unittest.TestCase):
         try:
             cls = self.target(data2)
             errored = False
-        except ntp.packet.SyncException as e:
+        except ntpp.SyncException as e:
             errored = e.message
         self.assertEqual(errored, "Packet is a runt")
         # Test with extension, runt 16
@@ -364,7 +406,7 @@ class TestSyncPacket(unittest.TestCase):
         try:
             cls = self.target(data2)
             errored = False
-        except ntp.packet.SyncException as e:
+        except ntpp.SyncException as e:
             errored = e.message
         self.assertEqual(errored, "Packet is a runt")
         # Test with extension, MD5 or SHA1, 20
@@ -449,6 +491,7 @@ class TestSyncPacket(unittest.TestCase):
         self.assertEqual(cls.t4(), 4)
 
     def test_delta_epsilon_synchd_adjust(self):
+        # Combined because they share setup, and are short tests
         cls = self.target()
         cls.origin_timestamp = 1
         cls.receive_timestamp = 2
@@ -497,18 +540,39 @@ class TestSyncPacket(unittest.TestCase):
 
     def test_is_crypto_nak(self):
         cls = self.target()
+        # Test True
         cls.mac = "blah"
         self.assertEqual(cls.is_crypto_nak(), True)
+        # Test too low
+        cls.mac = "bla"
+        self.assertEqual(cls.is_crypto_nak(), False)
+        # Test too high
+        cls.mac = "blah!"
+        self.assertEqual(cls.is_crypto_nak(), False)
 
     def test_MD5(self):
         cls = self.target()
+        # Test True
         cls.mac = "blah" * 5  # 20 bytes
         self.assertEqual(cls.has_MD5(), True)
+        # Test too low
+        cls.mac = "bla" * 5  # 15 bytes
+        self.assertEqual(cls.has_MD5(), False)
+        # Test too high
+        cls.mac = "blah!" * 5  # 25 bytes
+        self.assertEqual(cls.has_MD5(), False)
 
     def test_SHA1(self):
         cls = self.target()
+        # Test True
         cls.mac = "blah" * 6  # 24 bytes
         self.assertEqual(cls.has_SHA1(), True)
+        # Test too low
+        cls.mac = "bla" * 6  # 18 bytes
+        self.assertEqual(cls.has_SHA1(), False)
+        # Test too high
+        cls.mac = "blah!" * 6  # 30 bytes
+        self.assertEqual(cls.has_SHA1(), False)
 
     def test___repr__(self):
         cls = self.target()
@@ -522,7 +586,7 @@ class TestMisc(unittest.TestCase):
     def test_Peer(self):
         session = SessionJig()
         # Test init
-        cls = ntp.packet.Peer(session, 2, 3)
+        cls = ntpp.Peer(session, 2, 3)
         self.assertEqual(cls.session, session)
         self.assertEqual(cls.associd, 2)
         self.assertEqual(cls.status, 3)
@@ -536,7 +600,7 @@ class TestMisc(unittest.TestCase):
         self.assertEqual(repr(cls), "<Peer: associd=2 status=3>")
 
     def test_dump_hex_printable(self):
-        f = ntp.packet.dump_hex_printable
+        f = ntpp.dump_hex_printable
         fp = FileJig()
         data = "\x00\x01\x02\x03\x04\x05\x06\x07" \
                "\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F"
@@ -562,7 +626,7 @@ class TestMisc(unittest.TestCase):
 
     def test_MRUEntry(self):
         # Test init
-        cls = ntp.packet.MRUEntry()
+        cls = ntpp.MRUEntry()
         self.assertEqual(cls.addr, None)
         self.assertEqual(cls.last, None)
         self.assertEqual(cls.first, None)
@@ -596,7 +660,7 @@ class TestMisc(unittest.TestCase):
 
     def test_MRUList(self):
         # Test init
-        cls = ntp.packet.MRUList()
+        cls = ntpp.MRUList()
         self.assertEqual(cls.entries, [])
         self.assertEqual(cls.now, None)
         # Test is_complete, no
@@ -612,8 +676,8 @@ class TestMisc(unittest.TestCase):
 
 
 class TestControlPacket(unittest.TestCase):
-    target = ntp.packet.ControlPacket
-    session = ntp.packet.ControlSession
+    target = ntpp.ControlPacket
+    session = ntpp.ControlSession
 
     def test___init__(self):
         ses = self.session()
@@ -721,15 +785,15 @@ class TestControlPacket(unittest.TestCase):
 
 
 class TestControlSession(unittest.TestCase):
-    target = ntp.packet.ControlSession
+    target = ntpp.ControlSession
 
     def test___init__(self):
         # Test
         cls = self.target()
         self.assertEqual(cls.debug, 0)
         self.assertEqual(cls.ai_family, socket.AF_UNSPEC)
-        self.assertEqual(cls.primary_timeout, ntp.packet.DEFTIMEOUT)
-        self.assertEqual(cls.secondary_timeout, ntp.packet.DEFSTIMEOUT)
+        self.assertEqual(cls.primary_timeout, ntpp.DEFTIMEOUT)
+        self.assertEqual(cls.secondary_timeout, ntpp.DEFSTIMEOUT)
         self.assertEqual(cls.pktversion, ntp.magic.NTP_OLDVERSION + 1)
         self.assertEqual(cls.always_auth, False)
         self.assertEqual(cls.keytype, "MD5")
@@ -770,7 +834,7 @@ class TestControlSession(unittest.TestCase):
         logjig = FileJig()
         try:
             fakesockmod = SocketModuleJig()
-            ntp.packet.socket = fakesockmod
+            ntpp.socket = fakesockmod
             # Init
             cls = self.target()
             cls.debug = 3
@@ -817,8 +881,28 @@ class TestControlSession(unittest.TestCase):
                                socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0),
                               ("blah.com", "ntp", cls.ai_family,
                                socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0)])
+            # Test all types failed
+            logjig.__init__()  # reset
+            fakesockmod.__init__()
+            fakesockmod.gai_error_count = 3
+            result = cls._ControlSession__lookuphost("blah.com", "family")
+            self.assertEqual(result, None)
+            self.assertEqual(logjig.data,
+                             ["ntpq: numeric-mode lookup of blah.com "
+                              "failed, None\n",
+                              "ntpq: standard-mode lookup of blah.com "
+                              "failed, None\n",
+                              "ntpq: ndp lookup failed, None\n"])
+            self.assertEqual(fakesockmod.gai_calls,
+                             [("blah.com", "ntp", cls.ai_family,
+                               socket.SOCK_DGRAM, socket.IPPROTO_UDP,
+                               socket.AI_NUMERICHOST),
+                              ("blah.com", "ntp", cls.ai_family,
+                               socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0),
+                              ("blah.com", "ntp", cls.ai_family,
+                               socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0)])
         finally:
-            ntp.packet.socket = socket
+            ntpp.socket = socket
 
     def test_openhost(self):
         lookups = []
@@ -838,7 +922,7 @@ class TestControlSession(unittest.TestCase):
         logjig = FileJig()
         try:
             fakesockmod = SocketModuleJig()
-            ntp.packet.socket = fakesockmod
+            ntpp.socket = fakesockmod
             # Init
             cls = self.target()
             cls.debug = 3
@@ -888,7 +972,7 @@ class TestControlSession(unittest.TestCase):
             try:
                 result = cls.openhost("foo.org")
                 errored = False
-            except ntp.packet.ControlException as e:
+            except ctlerr as e:
                 errored = e
             self.assertEqual(errored.message,
                              "Error opening foo.org: error! [23]")
@@ -899,12 +983,12 @@ class TestControlSession(unittest.TestCase):
             try:
                 result = cls.openhost("foo.org")
                 errored = False
-            except ntp.packet.ControlException as e:
+            except ctlerr as e:
                 errored = e
             self.assertEqual(errored.message,
                              "Error connecting to foo.org: socket! [16]")
         finally:
-            ntp.packet.socket = socket
+            ntpp.socket = socket
 
     def test_password(self):
         iojig = FileJig()
@@ -912,20 +996,31 @@ class TestControlSession(unittest.TestCase):
         # Init
         cls = self.target()
         try:
-            tempauth = ntp.packet.Authenticator()
-            ntp.packet.Authenticator = AuthenticatorJig
-            ntp.packet.getpass = fakegetpmod
+            tempauth = ntpp.Authenticator()
+            ntpp.Authenticator = AuthenticatorJig
+            ntpp.getpass = fakegetpmod
             tempstdin = sys.stdin
             sys.stdin = iojig
             tempstdout = sys.stdout
             sys.stdout = iojig
-            # Test, with nothing
+            # Test with nothing
             iojig.readline_return = ["1\n"] * 10
             cls.password()
             self.assertEqual(isinstance(cls.auth, AuthenticatorJig), True)
             self.assertEqual(cls.keyid, 1)
             self.assertEqual(cls.passwd, "pass")
-            # Test, with auth and localhost
+            # Test with auth and localhost, fail
+            cls.keyid = None
+            cls.passwd = None
+            cls.hostname = "localhost"
+            cls.auth.control_fail = 1
+            try:
+                cls.password()
+                errored = False
+            except ctlerr as e:
+                errored = e.message
+            self.assertEqual(errored, ntpp.SERR_NOTRUST)
+            # Test with auth and localhost
             cls.keyid = None
             cls.passwd = None
             cls.hostname = "localhost"
@@ -933,7 +1028,7 @@ class TestControlSession(unittest.TestCase):
             self.assertEqual(cls.keyid, 23)
             self.assertEqual(cls.keytype, "keytype")
             self.assertEqual(cls.passwd, "miranda")
-            # Test, with all but password
+            # Test with all but password
             cls.passwd = None
             cls.auth.fail_getitem = True
             cls.password()
@@ -941,8 +1036,8 @@ class TestControlSession(unittest.TestCase):
                              [("keytype Password: ", None)])
             self.assertEqual(cls.passwd, "xyzzy")
         finally:
-            ntp.packet.Authenticator = tempauth
-            ntp.packet.getpass = getpass
+            ntpp.Authenticator = tempauth
+            ntpp.getpass = getpass
             sys.stdin = tempstdin
             sys.stdout = tempstdout
 
@@ -958,18 +1053,23 @@ class TestControlSession(unittest.TestCase):
         # Test
         res = cls.sendpkt("blahfoo")
         self.assertEqual(res, 0)
-        self.assertEqual(len(logjig.data), 1)
-        self.assertEqual(logjig.data[0], "Sending 8 octets.  seq=0\n")
-        self.assertEqual(len(sockjig.data), 1)
-        self.assertEqual(sockjig.data[0], "blahfoo\x00")
+        self.assertEqual(logjig.data, ["Sending 8 octets.  seq=0\n"])
+        self.assertEqual(sockjig.data, ["blahfoo\x00"])
+        # Test error
+        logjig.__init__()
+        sockjig.fail_send = 1
+        res = cls.sendpkt("blah")
+        self.assertEqual(res, -1)
+        self.assertEqual(logjig.data, ["Sending 4 octets.  seq=0\n",
+                                       "Write to None failed\n"])
 
     def test_sendrequest(self):
         logjig = FileJig()
         try:
-            tempcpkt = ntp.packet.ControlPacket
-            ntp.packet.ControlPacket = ControlPacketJig
-            tempauth = ntp.packet.Authenticator
-            ntp.packet.Authenticator = AuthenticatorJig
+            tempcpkt = ntpp.ControlPacket
+            ntpp.ControlPacket = ControlPacketJig
+            tempauth = ntpp.Authenticator
+            ntpp.Authenticator = AuthenticatorJig
             cls = self.target()
             cls.logfp = logjig
             cls.debug = 3
@@ -999,12 +1099,12 @@ class TestControlSession(unittest.TestCase):
             try:
                 cls.sendrequest(1, 2, "foo", True)
                 errored = False
-            except ntp.packet.ControlException:
+            except ctlerr:
                 errored = True
             self.assertEqual(errored, True)
         finally:
-            ntp.packet.ControlPacket = tempcpkt
-            ntp.packet.Authenticator = tempauth
+            ntpp.ControlPacket = tempcpkt
+            ntpp.Authenticator = tempauth
 
     def test_getresponse(self):
         logjig = FileJig()
@@ -1016,7 +1116,7 @@ class TestControlSession(unittest.TestCase):
         cls.logfp = logjig
         cls.sock = sockjig
         try:
-            ntp.packet.select = fakeselectmod
+            ntpp.select = fakeselectmod
             # Test empty
             sockjig.return_data = [
                 "\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"]
@@ -1033,8 +1133,66 @@ class TestControlSession(unittest.TestCase):
             cls.sequence = 1
             cls.getresponse(1, 3, True)
             self.assertEqual(cls.response, "foo=4223,blah=248,x=23,quux=1")
+            # Test MAXFRAGS bail
+            maxtemp = ntpp.MAXFRAGS
+            ntpp.MAXFRAGS = 1
+            sockjig.return_data = [
+                "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
+                "foo=4223,\x00\x00\x00",
+                "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x09\x00\x0E"
+                "blah=248,x=23,\x00\x00",
+                "\x0E\x81\x00\x01\x00\x02\x00\x03\x00\x17\x00\x06"
+                "quux=1\x00\x00"]
+            cls.sequence = 1
+            try:
+                cls.getresponse(1, 3, True)
+                errored = False
+            except ctlerr as e:
+                errored = e.message
+            self.assertEqual(errored, ntpp.SERR_TOOMUCH)
+            ntpp.MAXFRAGS = maxtemp
+            # Test select fail
+            fakeselectmod.select_fail = 1
+            try:
+                cls.getresponse(1, 2, True)
+                errored = False
+            except ctlerr as e:
+                errored = e.message
+            self.assertEqual(errored, ntpp.SERR_SELECT)
+            # Test no data and timeout
+            fakeselectmod.do_return = [False]
+            try:
+                cls.getresponse(1, 2, True)
+                errored = False
+            except ctlerr as e:
+                errored = e.message
+            self.assertEqual(errored, ntpp.SERR_TIMEOUT)
+            # Test partial data and no timeout
+            sockjig.return_data = [
+                "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
+                "foo=4223,\x00\x00\x00",
+                "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x09\x00\x0E"
+                "blah=248,x=23,\x00\x00",
+                "\x0E\x81\x00\x01\x00\x02\x00\x03\x00\x17\x00\x06"
+                "quux=1\x00\x00"]
+            fakeselectmod.do_return = [True, False]
+            try:
+                cls.getresponse(1, 2, False)
+                errored = False
+            except ctlerr as e:
+                errored = e.message
+            self.assertEqual(errored, ntpp.SERR_INCOMPLETE)
+            # Test header parse fail
+            sockjig.return_data = [
+                "\x0E\x81\x00\x00\x00\x03"]
+            try:
+                cls.getresponse(1, 2, True)
+                errored = False
+            except ctlerr as e:
+                errored = e.message
+            self.assertEqual(errored, ntpp.SERR_UNSPEC)
         finally:
-            ntp.packet.select = select
+            ntpp.select = select
 
     def test___validate_packet(self):
         logjig = FileJig()
@@ -1044,7 +1202,7 @@ class TestControlSession(unittest.TestCase):
         cls.logfp = logjig
         # Test good packet, empty data
         raw = "\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
                          True)
@@ -1053,7 +1211,7 @@ class TestControlSession(unittest.TestCase):
         logjig.data = []
         raw = "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09" \
               "foo=4223,\x00\x00\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         cls.sequence = 1
         self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 3),
@@ -1064,7 +1222,7 @@ class TestControlSession(unittest.TestCase):
         cls.sequence = 0
         logjig.data = []
         raw = "\x46\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
                          False)
@@ -1072,7 +1230,7 @@ class TestControlSession(unittest.TestCase):
         # Test bad packet, bad mode
         logjig.data = []
         raw = "\x0D\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
                          False)
@@ -1080,7 +1238,7 @@ class TestControlSession(unittest.TestCase):
         # Test bad packet, bad response bit
         logjig.data = []
         raw = "\x0E\x01\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
                          False)
@@ -1088,7 +1246,7 @@ class TestControlSession(unittest.TestCase):
         # Test bad packet, bad sequence
         logjig.data = []
         raw = "\x0E\x81\x00\x01\x00\x03\x00\x02\x00\x00\x00\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
                          False)
@@ -1097,7 +1255,7 @@ class TestControlSession(unittest.TestCase):
         # Test bad packet, bad opcode
         logjig.data = []
         raw = "\x0E\x80\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
                          False)
@@ -1108,24 +1266,24 @@ class TestControlSession(unittest.TestCase):
         raw = "\x0E\xC1\x00\x00" + \
               chr(ntp.control.CERR_BADVALUE) + \
               "\x03\x00\x02\x00\x00\x00\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         try:
             cls._ControlSession__validate_packet(pkt, raw, 1, 2)
             self.assertEqual(False, True)  # it should have errored here
-        except ntp.packet.ControlException as e:
+        except ctlerr as e:
             self.assertEqual(e.errorcode, ntp.control.CERR_BADVALUE)
         self.assertEqual(logjig.data, [])
         # Test error packet, with more bit
         logjig.data = []
         errcs = chr(ntp.control.CERR_BADVALUE)
         raw = "\x0E\xE1\x00\x00" + errcs + "\x03\x00\x02\x00\x00\x00\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         try:
             cls._ControlSession__validate_packet(pkt, raw, 1, 2)
             self.assertEqual(False, True)  # it should have errored here
-        except ntp.packet.ControlException as e:
+        except ctlerr as e:
             self.assertEqual(e.errorcode, ntp.control.CERR_BADVALUE)
         errstr = "Error " + str(ntp.control.CERR_BADVALUE) + \
                  " received on non-final fragment\n"
@@ -1133,7 +1291,7 @@ class TestControlSession(unittest.TestCase):
         # Test ok-ish packet, bad associd
         logjig.data = []
         raw = "\x0E\x81\x00\x00\x00\x03\x00\xFF\x00\x00\x00\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
                          True)
@@ -1142,7 +1300,7 @@ class TestControlSession(unittest.TestCase):
         # Test bad data padding
         logjig.data = []
         raw = "\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x01@"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
                          False)
@@ -1151,13 +1309,13 @@ class TestControlSession(unittest.TestCase):
         # Test too little data
         logjig.data = []
         raw = "\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x10foo\x00"
-        pkt = ntp.packet.ControlPacket(cls)
+        pkt = ntpp.ControlPacket(cls)
         pkt.analyze(raw)
         try:
             cls._ControlSession__validate_packet(pkt, raw, 1, 2)
             self.assertEqual(True, False)  # should have errored here
-        except ntp.packet.ControlException as e:
-            self.assertEqual(e.message, ntp.packet.SERR_INCOMPLETE)
+        except ctlerr as e:
+            self.assertEqual(e.message, ntpp.SERR_INCOMPLETE)
         self.assertEqual(logjig.data,
                          ["Response fragment claims 16 octets payload, "
                           "above 4 received\n"])
@@ -1174,14 +1332,22 @@ class TestControlSession(unittest.TestCase):
             gets.append((opcode, associd, retry))
             if doerror[0]:
                 doerror[0] = False
-                raise ntp.packet.ControlException(ntp.packet.SERR_TIMEOUT)
+                raise ctlerr(ntpp.SERR_TIMEOUT)
             return "flax!"
         # Init
         cls = self.target()
         cls.sendrequest = sendrequest_jig
         cls.getresponse = getresponse_jig
-        cls.sock = True  # to fool havehost()
+
+        # Test no socket
+        try:
+            cls.doquery(1, 2, "blah")
+            errored = False
+        except ctlerr as e:
+            errored = e.message
+        self.assertEqual(errored, ntpp.SERR_NOHOST)
         # Test no retry
+        cls.sock = True  # to fool havehost()
         result = cls.doquery(1, 2, "blah")
         self.assertEqual(result, "flax!")
         self.assertEqual(len(sends), 1)
@@ -1220,7 +1386,7 @@ class TestControlSession(unittest.TestCase):
         cls.response = "\xDE\xAD\xF0\x0D"
         idlist = cls.readstat()
         self.assertEqual(len(idlist), 1)
-        self.assertEqual(isinstance(idlist[0], ntp.packet.Peer), True)
+        self.assertEqual(isinstance(idlist[0], ntpp.Peer), True)
         self.assertEqual(idlist[0].associd, 0xDEAD)
         self.assertEqual(idlist[0].status, 0xF00D)
         self.assertEqual(queries, [(ntp.control.CTL_OP_READSTAT,
@@ -1230,7 +1396,7 @@ class TestControlSession(unittest.TestCase):
         try:
             cls.readstat()
             errored = False
-        except ntp.packet.ControlException:
+        except ctlerr:
             errored = True
         self.assertEqual(errored, True)
 
@@ -1350,6 +1516,15 @@ class TestControlSession(unittest.TestCase):
         self.assertEqual(result, False)
         self.assertEqual(queries,
                          [(ntp.control.CTL_OP_CONFIGURE, 0, "Boo!", True)])
+        # Test no response
+        queries = []
+        cls.response = ""
+        try:
+            cls.config("blah")
+            errored = False
+        except ctlerr as e:
+            errored = e.message
+        self.assertEqual(errored, ntpp.SERR_PERMISSION)
 
     def test_fetch_nonce(self):
         queries = []
@@ -1373,7 +1548,7 @@ class TestControlSession(unittest.TestCase):
         try:
             result = cls.fetch_nonce()
             errored = False
-        except ntp.packet.ControlException:
+        except ctlerr:
             errored = True
         self.assertEqual(errored, True)
         self.assertEqual(filefp.data, ["## Nonce expected: blah blah"])
@@ -1399,9 +1574,15 @@ class TestControlSession(unittest.TestCase):
                "mv.1=2,rs.1=3",
                "now=0x00000000.00000000"]
         query_results = qrm[:]  # qrm == query results master
+        query_fail = [0]
+        query_fail_code = []
 
         def doquery_jig(opcode, associd=0, qdata="", auth=False):
             queries.append((opcode, associd, qdata, auth))
+            if query_fail[0] > 0:
+                query_fail[0] -= 1
+                code = query_fail_code.pop(0)
+                raise ctlerr("foo", errorcode=code)
             if len(query_results) > 0:
                 setresponse(query_results.pop(0))
         logjig = FileJig()
@@ -1425,7 +1606,7 @@ class TestControlSession(unittest.TestCase):
                            "nonce=foo, frags=32, addr.0=10.20.30.40:23, "
                            "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
                            "addr.2=1.2.3.4:23, last.2=40", False)])
-        self.assertEqual(isinstance(result, ntp.packet.MRUList), True)
+        self.assertEqual(isinstance(result, ntpp.MRUList), True)
         self.assertEqual(len(result.entries), 2)
         mru = result.entries[0]
         self.assertEqual(mru.addr, "1.2.3.4:23")
@@ -1459,7 +1640,7 @@ class TestControlSession(unittest.TestCase):
                            "nonce=foo, frags=27, addr.0=10.20.30.40:23, "
                            "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
                            "addr.2=1.2.3.4:23, last.2=40", False)])
-        self.assertEqual(isinstance(result, ntp.packet.MRUList), True)
+        self.assertEqual(isinstance(result, ntpp.MRUList), True)
         self.assertEqual(len(result.entries), 2)
         mru = result.entries[0]
         self.assertEqual(mru.addr, "10.20.30.40:23")
@@ -1475,6 +1656,170 @@ class TestControlSession(unittest.TestCase):
         self.assertEqual(mru.ct, 1)
         self.assertEqual(mru.mv, 2)
         self.assertEqual(mru.rs, 3)
+        # Test sorter error
+        nonce_fetch_count = [0]
+        try:
+            cls.mrulist(variables={"sort": "foo"})
+            errored = False
+        except ctlerr as e:
+            errored = e.message
+        self.assertEqual(errored, ntpp.SERR_BADSORT % "foo")
+        # Test varbind error
+        nonce_fetch_count = [0]
+        try:
+            cls.mrulist(variables={"foo": 1})
+            errored = False
+        except ctlerr as e:
+            errored = e.message
+        self.assertEqual(errored, ntpp.SERR_BADPARAM % "foo")
+        # Test add to request errors
+        # Test None error
+        nonce_fetch_count = [0]
+        queries = []
+        query_fail = [1]
+        query_fail_code = [None]
+        try:
+            cls.mrulist()
+            errored = False
+        except ctlerr as e:
+            errored = e.errorcode
+        self.assertEqual(errored, None)
+        # Test random error
+        nonce_fetch_count = [0]
+        queries = []
+        query_fail = [1]
+        query_fail_code = ["therdaglib"]
+        try:
+            cls.mrulist()
+            errored = False
+        except ctlerr as e:
+            errored = e.errorcode
+        self.assertEqual(errored, "therdaglib")
+        # Test unknown var error
+        nonce_fetch_count = [0]
+        query_results = qrm[:]
+        queries = []
+        query_fail = [1]
+        query_fail_code = [ntp.control.CERR_UNKNOWNVAR] * 2
+        cls.response = ""
+        result = cls.mrulist()
+        self.assertEqual(nonce_fetch_count, [5])
+        self.assertEqual(queries,
+                         [(10, 0, "nonce=foo, frags=32", False),
+                          (10, 0, "nonce=foo, frags=32", False),
+                          (10, 0,
+                           "nonce=foo, frags=32, addr.0=1.2.3.4:23, last.0=40",
+                           False),
+                          (10, 0,
+                           "nonce=foo, frags=32, addr.0=1.2.3.4:23, "
+                           "last.0=41, addr.1=1.2.3.4:23, last.1=40", False),
+                          (10, 0,
+                           "nonce=foo, frags=32, addr.0=10.20.30.40:23, "
+                           "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
+                           "addr.2=1.2.3.4:23, last.2=40", False)])
+        self.assertEqual(isinstance(result, ntpp.MRUList), True)
+        self.assertEqual(len(result.entries), 2)
+        # Test bad value error
+        nonce_fetch_count = [0]
+        query_results = qrm[:]
+        queries = []
+        query_fail = [2]
+        query_fail_code = [ntp.control.CERR_BADVALUE] * 2
+        cls.response = ""
+        logjig.data = []
+        cls.debug = 1
+        result = cls.mrulist()
+        self.assertEqual(nonce_fetch_count, [6])
+        self.assertEqual(queries,
+                         [(10, 0, "nonce=foo, frags=32", False),
+                          (10, 0, "nonce=foo, limit=96", False),
+                          (10, 0, "nonce=foo, limit=96", False),
+                          (10, 0,
+                           "nonce=foo, limit=96, addr.0=1.2.3.4:23, last.0=40",
+                           False),
+                          (10, 0,
+                           "nonce=foo, limit=96, addr.0=1.2.3.4:23, "
+                           "last.0=41, addr.1=1.2.3.4:23, last.1=40", False),
+                          (10, 0,
+                           "nonce=foo, limit=96, addr.0=10.20.30.40:23, "
+                           "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
+                           "addr.2=1.2.3.4:23, last.2=40", False)])
+        self.assertEqual(isinstance(result, ntpp.MRUList), True)
+        self.assertEqual(len(result.entries), 2)
+        self.assertEqual(logjig.data, ["Reverted to row limit from "
+                                       "fragments limit.\n",
+                                       "Row limit reduced to 96 following "
+                                       "CERR_BADVALUE.\n"])
+        # Test incomplete error
+        nonce_fetch_count = [0]
+        query_results = qrm[:]
+        queries = []
+        query_fail = [3]
+        query_fail_code = [ntpp.SERR_INCOMPLETE,
+                           ntp.control.CERR_BADVALUE,  # Trigger cap_frags
+                           ntpp.SERR_INCOMPLETE]
+        cls.response = ""
+        logjig.data = []
+        cls.debug = 1
+        result = cls.mrulist()
+        self.assertEqual(nonce_fetch_count, [7])
+        self.assertEqual(queries,
+                         [(10, 0, "nonce=foo, frags=32", False),
+                          (10, 0, "nonce=foo, frags=16", False),
+                          (10, 0, "nonce=foo, limit=96", False),
+                          (10, 0, "nonce=foo, limit=48", False),
+                          (10, 0,
+                           "nonce=foo, limit=49, addr.0=1.2.3.4:23, last.0=40",
+                           False),
+                          (10, 0,
+                           "nonce=foo, limit=51, addr.0=1.2.3.4:23, "
+                           "last.0=41, addr.1=1.2.3.4:23, last.1=40", False),
+                          (10, 0,
+                           "nonce=foo, limit=52, addr.0=10.20.30.40:23, "
+                           "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
+                           "addr.2=1.2.3.4:23, last.2=40", False)])
+        self.assertEqual(len(result.entries), 2)
+        self.assertEqual(logjig.data,
+                         ["Frag limit reduced to 16 following incomplete "
+                          "response.\n",
+                          "Reverted to row limit from fragments limit.\n",
+                          "Row limit reduced to 48 following  incomplete "
+                          "response.\n"])
+        # Test timeout error
+        nonce_fetch_count = [0]
+        query_results = qrm[:]
+        queries = []
+        query_fail = [3]
+        query_fail_code = [ntpp.SERR_TIMEOUT,
+                           ntp.control.CERR_BADVALUE,  # Trigger cap_frags
+                           ntpp.SERR_TIMEOUT]
+        cls.response = ""
+        logjig.data = []
+        cls.debug = 1
+        result = cls.mrulist()
+        self.assertEqual(nonce_fetch_count, [7])
+        self.assertEqual(queries,
+                         [(10, 0, "nonce=foo, frags=32", False),
+                          (10, 0, "nonce=foo, frags=16", False),
+                          (10, 0, "nonce=foo, limit=96", False),
+                          (10, 0, "nonce=foo, limit=48", False),
+                          (10, 0,
+                           "nonce=foo, limit=49, addr.0=1.2.3.4:23, last.0=40",
+                           False),
+                          (10, 0,
+                           "nonce=foo, limit=51, addr.0=1.2.3.4:23, "
+                           "last.0=41, addr.1=1.2.3.4:23, last.1=40", False),
+                          (10, 0,
+                           "nonce=foo, limit=52, addr.0=10.20.30.40:23, "
+                           "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
+                           "addr.2=1.2.3.4:23, last.2=40", False)])
+        self.assertEqual(len(result.entries), 2)
+        self.assertEqual(logjig.data,
+                         ["Frag limit reduced to 16 following incomplete "
+                          "response.\n",
+                          "Reverted to row limit from fragments limit.\n",
+                          "Row limit reduced to 48 following  incomplete "
+                          "response.\n"])
 
     def test___ordlist(self):
         queries = []
@@ -1523,7 +1868,7 @@ class TestControlSession(unittest.TestCase):
 
 
 class TestAuthenticator(unittest.TestCase):
-    target = ntp.packet.Authenticator
+    target = ntpp.Authenticator
     open_calls = []
     open_files = []
     open_data = []
@@ -1537,7 +1882,7 @@ class TestAuthenticator(unittest.TestCase):
 
     def test___init__(self):
         try:
-            ntp.packet.open = self.openjig
+            ntpp.open = self.openjig
             # Test without file
             cls = self.target()
             self.assertEqual(cls.passwords, {})
@@ -1554,7 +1899,7 @@ class TestAuthenticator(unittest.TestCase):
             self.assertEqual(self.open_calls, ["file"])
             self.assertEqual(len(self.open_files), 1)
         finally:
-            ntp.packet.open = open
+            ntpp.open = open
             self.open_calls = []
             self.open_files = []
             self.open_data = []
@@ -1581,7 +1926,7 @@ class TestAuthenticator(unittest.TestCase):
         self.assertEqual(cls.control(1), (1, None, None))
         try:
             # Read keyid from /etc/ntp.conf
-            ntp.packet.open = self.openjig
+            ntpp.open = self.openjig
             self.open_data = ["blah blah", "control 2"]
             self.assertEqual(cls.control(), (2, "b", "y"))
             # Fail to read keyid from /etc/ntp.conf
@@ -1593,21 +1938,21 @@ class TestAuthenticator(unittest.TestCase):
                 errored = True
             self.assertEqual(errored, True)
         finally:
-            ntp.packet.open = open
+            ntpp.open = open
 
     def test_compute_mac(self):
         f = self.target.compute_mac
         try:
-            temphash = ntp.packet.hashlib
+            temphash = ntpp.hashlib
             fakehashlibmod = HashlibModuleJig()
-            ntp.packet.hashlib = fakehashlibmod
+            ntpp.hashlib = fakehashlibmod
             # Test no digest
             self.assertEqual(f(None, None, None, None), None)
             # Test with digest
             self.assertEqual(f("foo", 0x42, "bar", "quux"),
                              "\x00\x00\x00\x42blahblahblahblah")
         finally:
-            ntp.packet.hashlib = temphash
+            ntpp.hashlib = temphash
 
     def test_have_mac(self):
         f = self.target.have_mac
@@ -1624,15 +1969,15 @@ class TestAuthenticator(unittest.TestCase):
         good_pkt = "foobar\x00\x00\x00\x23blahblahblahblah"
         bad_pkt = "foobar\xDE\xAD\xDE\xAFblahblahblah"
         try:
-            temphash = ntp.packet.hashlib
+            temphash = ntpp.hashlib
             fakehashlibmod = HashlibModuleJig()
-            ntp.packet.hashlib = fakehashlibmod
+            ntpp.hashlib = fakehashlibmod
             # Test good
             self.assertEqual(cls.verify_mac(good_pkt), True)
             # Test bad
             self.assertEqual(cls.verify_mac(bad_pkt), False)
         finally:
-            ntp.packet.hashlib = temphash
+            ntpp.hashlib = temphash
 
 
 if __name__ == "__main__":



View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/177743d243584de8fb8d229f4de1f8ced7639cb5...6e5765c2edd810cb4bb22ac21d586226a72c42f7

---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/177743d243584de8fb8d229f4de1f8ced7639cb5...6e5765c2edd810cb4bb22ac21d586226a72c42f7
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170823/21cd52dc/attachment.html>


More information about the vc mailing list