[Git][NTPsec/ntpsec][master] 2 commits: Expanded packet.py tests to test error handling.
Ian Bruene
gitlab at mg.gitlab.com
Wed Aug 23 01:14:39 UTC 2017
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
ebc10102 by Ian Bruene at 2017-08-22T20:14:08-05:00
Expanded packet.py tests to test error handling.
- - - - -
6e5765c2 by Ian Bruene at 2017-08-22T20:14:08-05:00
Added aliases for frequently used objects
- - - - -
2 changed files:
- pylib/packet.py
- tests/pylib/test_packet.py
Changes:
=====================================
pylib/packet.py
=====================================
--- a/pylib/packet.py
+++ b/pylib/packet.py
@@ -1464,10 +1464,6 @@ This combats source address spoofing
raise ControlException(SERR_STALL)
warndbg("---> Restarting from the beginning, "
"retry #%u\n" % restarted_count, 1)
- elif e.errorcode == ntp.control.CERR_UNKNOWNVAR:
- e.message = ("CERR_UNKNOWNVAR from ntpd but "
- "no priors given.")
- raise e
elif e.errorcode == ntp.control.CERR_BADVALUE:
if cap_frags:
cap_frags = False
=====================================
tests/pylib/test_packet.py
=====================================
--- a/tests/pylib/test_packet.py
+++ b/tests/pylib/test_packet.py
@@ -15,6 +15,8 @@ import getpass
odict = ntp.util.OrderedDict
+ntpp = ntp.packet
+ctlerr = ntp.packet.ControlException
class FileJig:
def __init__(self):
@@ -51,8 +53,12 @@ class SocketJig:
self.closed = False
self.connected = None
self.fail_connect = False
+ self.fail_send = 0
def sendall(self, data):
+ if self.fail_send > 0:
+ self.fail_send -= 1
+ raise socket.error()
self.data.append(data)
def close(self):
@@ -89,7 +95,7 @@ class SessionJig:
class ControlPacketJig:
- HEADER_LEN = ntp.packet.ControlPacket.HEADER_LEN
+ HEADER_LEN = ntpp.ControlPacket.HEADER_LEN
def __init__(self, session, opcode, associd, data):
self.session = session
@@ -195,10 +201,21 @@ class SelectModuleJig:
def __init__(self):
self.select_calls = []
+ self.select_fail = 0
+ self.do_return = []
def select(self, ins, outs, excepts, timeout=0):
self.select_calls.append((ins, outs, excepts, timeout))
- return (ins, [], [])
+ if self.select_fail > 0:
+ self.select_fail -= 1
+ raise select.error
+ if len(self.do_return) == 0: # simplify code that doesn't need it
+ self.do_return.append(True)
+ doreturn = self.do_return.pop(0)
+ if doreturn is True:
+ return (ins, [], [])
+ else:
+ return ([], [], [])
class AuthenticatorJig:
@@ -206,6 +223,7 @@ class AuthenticatorJig:
def __init__(self):
self.control_call_count = 0
+ self.control_fail = 0
self.fail_getitem = False
self.compute_mac_calls = []
@@ -216,6 +234,9 @@ class AuthenticatorJig:
def control(self):
self.control_call_count += 1
+ if self.control_fail > 0:
+ self.control_fail -= 1
+ raise ValueError
return (23, "keytype", "miranda")
@staticmethod
@@ -230,7 +251,7 @@ class AuthenticatorJig:
class TestPacket(unittest.TestCase):
- target = ntp.packet.Packet
+ target = ntpp.Packet
def test_VN_MODE(self):
f = self.target.VN_MODE
@@ -273,7 +294,7 @@ class TestPacket(unittest.TestCase):
class TestSyncPacket(unittest.TestCase):
- target = ntp.packet.SyncPacket
+ target = ntpp.SyncPacket
def test___init__(self):
# Test without data (that will be tested via analyze())
@@ -289,7 +310,7 @@ class TestSyncPacket(unittest.TestCase):
self.assertEqual(cls.origin_timestamp, 0)
self.assertEqual(cls.receive_timestamp, 0)
self.assertEqual(cls.transmit_timestamp, 0)
- self.assertEqual(cls.data, ntp.packet.polybytes(""))
+ self.assertEqual(cls.data, ntpp.polybytes(""))
self.assertEqual(cls.extension, '')
self.assertEqual(cls.extfields, [])
self.assertEqual(cls.mac, '')
@@ -300,6 +321,27 @@ class TestSyncPacket(unittest.TestCase):
self.assertEqual(cls.rescaled, False)
def test_analyze(self):
+ # Test data less than packet
+ data = "blah blah"
+ try:
+ cls = self.target(data)
+ errored = False
+ except ntpp.SyncException as e:
+ errored = e.message
+ self.assertEqual(errored, "impossible packet length")
+ # Test data not word aligned
+ data = "\x5C\x10\x01\xFF" \
+ "\x00\x00\x01\x01\x00\x00\x01\x02\x00\x00\x01\x03" \
+ "\x00\x01\x02\x03\x04\x05\x06\x07" \
+ "\x01\x01\x02\x03\x04\x05\x06\x07" \
+ "\x02\x01\x02\x03\x04\x05\x06\x07" \
+ "\x03\x01\x02\x03\x04\x05\x06\x07\x0B\xAD"
+ try:
+ cls = self.target(data)
+ errored = False
+ except ntpp.SyncException as e:
+ errored = e.message
+ self.assertEqual(errored, "impossible packet length")
# Test without extension
data = "\x5C\x10\x01\xFF" \
"\x00\x00\x01\x01\x00\x00\x01\x02\x00\x00\x01\x03" \
@@ -347,7 +389,7 @@ class TestSyncPacket(unittest.TestCase):
try:
cls = self.target(data2)
errored = False
- except ntp.packet.SyncException as e:
+ except ntpp.SyncException as e:
errored = e.message
self.assertEqual(errored, "Unsupported DES authentication")
# Test with extension, runt 8
@@ -355,7 +397,7 @@ class TestSyncPacket(unittest.TestCase):
try:
cls = self.target(data2)
errored = False
- except ntp.packet.SyncException as e:
+ except ntpp.SyncException as e:
errored = e.message
self.assertEqual(errored, "Packet is a runt")
# Test with extension, runt 16
@@ -364,7 +406,7 @@ class TestSyncPacket(unittest.TestCase):
try:
cls = self.target(data2)
errored = False
- except ntp.packet.SyncException as e:
+ except ntpp.SyncException as e:
errored = e.message
self.assertEqual(errored, "Packet is a runt")
# Test with extension, MD5 or SHA1, 20
@@ -449,6 +491,7 @@ class TestSyncPacket(unittest.TestCase):
self.assertEqual(cls.t4(), 4)
def test_delta_epsilon_synchd_adjust(self):
+ # Combined because they share setup, and are short tests
cls = self.target()
cls.origin_timestamp = 1
cls.receive_timestamp = 2
@@ -497,18 +540,39 @@ class TestSyncPacket(unittest.TestCase):
def test_is_crypto_nak(self):
cls = self.target()
+ # Test True
cls.mac = "blah"
self.assertEqual(cls.is_crypto_nak(), True)
+ # Test too low
+ cls.mac = "bla"
+ self.assertEqual(cls.is_crypto_nak(), False)
+ # Test too high
+ cls.mac = "blah!"
+ self.assertEqual(cls.is_crypto_nak(), False)
def test_MD5(self):
cls = self.target()
+ # Test True
cls.mac = "blah" * 5 # 20 bytes
self.assertEqual(cls.has_MD5(), True)
+ # Test too low
+ cls.mac = "bla" * 5 # 15 bytes
+ self.assertEqual(cls.has_MD5(), False)
+ # Test too high
+ cls.mac = "blah!" * 5 # 25 bytes
+ self.assertEqual(cls.has_MD5(), False)
def test_SHA1(self):
cls = self.target()
+ # Test True
cls.mac = "blah" * 6 # 24 bytes
self.assertEqual(cls.has_SHA1(), True)
+ # Test too low
+ cls.mac = "bla" * 6 # 18 bytes
+ self.assertEqual(cls.has_SHA1(), False)
+ # Test too high
+ cls.mac = "blah!" * 6 # 30 bytes
+ self.assertEqual(cls.has_SHA1(), False)
def test___repr__(self):
cls = self.target()
@@ -522,7 +586,7 @@ class TestMisc(unittest.TestCase):
def test_Peer(self):
session = SessionJig()
# Test init
- cls = ntp.packet.Peer(session, 2, 3)
+ cls = ntpp.Peer(session, 2, 3)
self.assertEqual(cls.session, session)
self.assertEqual(cls.associd, 2)
self.assertEqual(cls.status, 3)
@@ -536,7 +600,7 @@ class TestMisc(unittest.TestCase):
self.assertEqual(repr(cls), "<Peer: associd=2 status=3>")
def test_dump_hex_printable(self):
- f = ntp.packet.dump_hex_printable
+ f = ntpp.dump_hex_printable
fp = FileJig()
data = "\x00\x01\x02\x03\x04\x05\x06\x07" \
"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F"
@@ -562,7 +626,7 @@ class TestMisc(unittest.TestCase):
def test_MRUEntry(self):
# Test init
- cls = ntp.packet.MRUEntry()
+ cls = ntpp.MRUEntry()
self.assertEqual(cls.addr, None)
self.assertEqual(cls.last, None)
self.assertEqual(cls.first, None)
@@ -596,7 +660,7 @@ class TestMisc(unittest.TestCase):
def test_MRUList(self):
# Test init
- cls = ntp.packet.MRUList()
+ cls = ntpp.MRUList()
self.assertEqual(cls.entries, [])
self.assertEqual(cls.now, None)
# Test is_complete, no
@@ -612,8 +676,8 @@ class TestMisc(unittest.TestCase):
class TestControlPacket(unittest.TestCase):
- target = ntp.packet.ControlPacket
- session = ntp.packet.ControlSession
+ target = ntpp.ControlPacket
+ session = ntpp.ControlSession
def test___init__(self):
ses = self.session()
@@ -721,15 +785,15 @@ class TestControlPacket(unittest.TestCase):
class TestControlSession(unittest.TestCase):
- target = ntp.packet.ControlSession
+ target = ntpp.ControlSession
def test___init__(self):
# Test
cls = self.target()
self.assertEqual(cls.debug, 0)
self.assertEqual(cls.ai_family, socket.AF_UNSPEC)
- self.assertEqual(cls.primary_timeout, ntp.packet.DEFTIMEOUT)
- self.assertEqual(cls.secondary_timeout, ntp.packet.DEFSTIMEOUT)
+ self.assertEqual(cls.primary_timeout, ntpp.DEFTIMEOUT)
+ self.assertEqual(cls.secondary_timeout, ntpp.DEFSTIMEOUT)
self.assertEqual(cls.pktversion, ntp.magic.NTP_OLDVERSION + 1)
self.assertEqual(cls.always_auth, False)
self.assertEqual(cls.keytype, "MD5")
@@ -770,7 +834,7 @@ class TestControlSession(unittest.TestCase):
logjig = FileJig()
try:
fakesockmod = SocketModuleJig()
- ntp.packet.socket = fakesockmod
+ ntpp.socket = fakesockmod
# Init
cls = self.target()
cls.debug = 3
@@ -817,8 +881,28 @@ class TestControlSession(unittest.TestCase):
socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0),
("blah.com", "ntp", cls.ai_family,
socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0)])
+ # Test all types failed
+ logjig.__init__() # reset
+ fakesockmod.__init__()
+ fakesockmod.gai_error_count = 3
+ result = cls._ControlSession__lookuphost("blah.com", "family")
+ self.assertEqual(result, None)
+ self.assertEqual(logjig.data,
+ ["ntpq: numeric-mode lookup of blah.com "
+ "failed, None\n",
+ "ntpq: standard-mode lookup of blah.com "
+ "failed, None\n",
+ "ntpq: ndp lookup failed, None\n"])
+ self.assertEqual(fakesockmod.gai_calls,
+ [("blah.com", "ntp", cls.ai_family,
+ socket.SOCK_DGRAM, socket.IPPROTO_UDP,
+ socket.AI_NUMERICHOST),
+ ("blah.com", "ntp", cls.ai_family,
+ socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0),
+ ("blah.com", "ntp", cls.ai_family,
+ socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0)])
finally:
- ntp.packet.socket = socket
+ ntpp.socket = socket
def test_openhost(self):
lookups = []
@@ -838,7 +922,7 @@ class TestControlSession(unittest.TestCase):
logjig = FileJig()
try:
fakesockmod = SocketModuleJig()
- ntp.packet.socket = fakesockmod
+ ntpp.socket = fakesockmod
# Init
cls = self.target()
cls.debug = 3
@@ -888,7 +972,7 @@ class TestControlSession(unittest.TestCase):
try:
result = cls.openhost("foo.org")
errored = False
- except ntp.packet.ControlException as e:
+ except ctlerr as e:
errored = e
self.assertEqual(errored.message,
"Error opening foo.org: error! [23]")
@@ -899,12 +983,12 @@ class TestControlSession(unittest.TestCase):
try:
result = cls.openhost("foo.org")
errored = False
- except ntp.packet.ControlException as e:
+ except ctlerr as e:
errored = e
self.assertEqual(errored.message,
"Error connecting to foo.org: socket! [16]")
finally:
- ntp.packet.socket = socket
+ ntpp.socket = socket
def test_password(self):
iojig = FileJig()
@@ -912,20 +996,31 @@ class TestControlSession(unittest.TestCase):
# Init
cls = self.target()
try:
- tempauth = ntp.packet.Authenticator()
- ntp.packet.Authenticator = AuthenticatorJig
- ntp.packet.getpass = fakegetpmod
+ tempauth = ntpp.Authenticator()
+ ntpp.Authenticator = AuthenticatorJig
+ ntpp.getpass = fakegetpmod
tempstdin = sys.stdin
sys.stdin = iojig
tempstdout = sys.stdout
sys.stdout = iojig
- # Test, with nothing
+ # Test with nothing
iojig.readline_return = ["1\n"] * 10
cls.password()
self.assertEqual(isinstance(cls.auth, AuthenticatorJig), True)
self.assertEqual(cls.keyid, 1)
self.assertEqual(cls.passwd, "pass")
- # Test, with auth and localhost
+ # Test with auth and localhost, fail
+ cls.keyid = None
+ cls.passwd = None
+ cls.hostname = "localhost"
+ cls.auth.control_fail = 1
+ try:
+ cls.password()
+ errored = False
+ except ctlerr as e:
+ errored = e.message
+ self.assertEqual(errored, ntpp.SERR_NOTRUST)
+ # Test with auth and localhost
cls.keyid = None
cls.passwd = None
cls.hostname = "localhost"
@@ -933,7 +1028,7 @@ class TestControlSession(unittest.TestCase):
self.assertEqual(cls.keyid, 23)
self.assertEqual(cls.keytype, "keytype")
self.assertEqual(cls.passwd, "miranda")
- # Test, with all but password
+ # Test with all but password
cls.passwd = None
cls.auth.fail_getitem = True
cls.password()
@@ -941,8 +1036,8 @@ class TestControlSession(unittest.TestCase):
[("keytype Password: ", None)])
self.assertEqual(cls.passwd, "xyzzy")
finally:
- ntp.packet.Authenticator = tempauth
- ntp.packet.getpass = getpass
+ ntpp.Authenticator = tempauth
+ ntpp.getpass = getpass
sys.stdin = tempstdin
sys.stdout = tempstdout
@@ -958,18 +1053,23 @@ class TestControlSession(unittest.TestCase):
# Test
res = cls.sendpkt("blahfoo")
self.assertEqual(res, 0)
- self.assertEqual(len(logjig.data), 1)
- self.assertEqual(logjig.data[0], "Sending 8 octets. seq=0\n")
- self.assertEqual(len(sockjig.data), 1)
- self.assertEqual(sockjig.data[0], "blahfoo\x00")
+ self.assertEqual(logjig.data, ["Sending 8 octets. seq=0\n"])
+ self.assertEqual(sockjig.data, ["blahfoo\x00"])
+ # Test error
+ logjig.__init__()
+ sockjig.fail_send = 1
+ res = cls.sendpkt("blah")
+ self.assertEqual(res, -1)
+ self.assertEqual(logjig.data, ["Sending 4 octets. seq=0\n",
+ "Write to None failed\n"])
def test_sendrequest(self):
logjig = FileJig()
try:
- tempcpkt = ntp.packet.ControlPacket
- ntp.packet.ControlPacket = ControlPacketJig
- tempauth = ntp.packet.Authenticator
- ntp.packet.Authenticator = AuthenticatorJig
+ tempcpkt = ntpp.ControlPacket
+ ntpp.ControlPacket = ControlPacketJig
+ tempauth = ntpp.Authenticator
+ ntpp.Authenticator = AuthenticatorJig
cls = self.target()
cls.logfp = logjig
cls.debug = 3
@@ -999,12 +1099,12 @@ class TestControlSession(unittest.TestCase):
try:
cls.sendrequest(1, 2, "foo", True)
errored = False
- except ntp.packet.ControlException:
+ except ctlerr:
errored = True
self.assertEqual(errored, True)
finally:
- ntp.packet.ControlPacket = tempcpkt
- ntp.packet.Authenticator = tempauth
+ ntpp.ControlPacket = tempcpkt
+ ntpp.Authenticator = tempauth
def test_getresponse(self):
logjig = FileJig()
@@ -1016,7 +1116,7 @@ class TestControlSession(unittest.TestCase):
cls.logfp = logjig
cls.sock = sockjig
try:
- ntp.packet.select = fakeselectmod
+ ntpp.select = fakeselectmod
# Test empty
sockjig.return_data = [
"\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"]
@@ -1033,8 +1133,66 @@ class TestControlSession(unittest.TestCase):
cls.sequence = 1
cls.getresponse(1, 3, True)
self.assertEqual(cls.response, "foo=4223,blah=248,x=23,quux=1")
+ # Test MAXFRAGS bail
+ maxtemp = ntpp.MAXFRAGS
+ ntpp.MAXFRAGS = 1
+ sockjig.return_data = [
+ "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
+ "foo=4223,\x00\x00\x00",
+ "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x09\x00\x0E"
+ "blah=248,x=23,\x00\x00",
+ "\x0E\x81\x00\x01\x00\x02\x00\x03\x00\x17\x00\x06"
+ "quux=1\x00\x00"]
+ cls.sequence = 1
+ try:
+ cls.getresponse(1, 3, True)
+ errored = False
+ except ctlerr as e:
+ errored = e.message
+ self.assertEqual(errored, ntpp.SERR_TOOMUCH)
+ ntpp.MAXFRAGS = maxtemp
+ # Test select fail
+ fakeselectmod.select_fail = 1
+ try:
+ cls.getresponse(1, 2, True)
+ errored = False
+ except ctlerr as e:
+ errored = e.message
+ self.assertEqual(errored, ntpp.SERR_SELECT)
+ # Test no data and timeout
+ fakeselectmod.do_return = [False]
+ try:
+ cls.getresponse(1, 2, True)
+ errored = False
+ except ctlerr as e:
+ errored = e.message
+ self.assertEqual(errored, ntpp.SERR_TIMEOUT)
+ # Test partial data and no timeout
+ sockjig.return_data = [
+ "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
+ "foo=4223,\x00\x00\x00",
+ "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x09\x00\x0E"
+ "blah=248,x=23,\x00\x00",
+ "\x0E\x81\x00\x01\x00\x02\x00\x03\x00\x17\x00\x06"
+ "quux=1\x00\x00"]
+ fakeselectmod.do_return = [True, False]
+ try:
+ cls.getresponse(1, 2, False)
+ errored = False
+ except ctlerr as e:
+ errored = e.message
+ self.assertEqual(errored, ntpp.SERR_INCOMPLETE)
+ # Test header parse fail
+ sockjig.return_data = [
+ "\x0E\x81\x00\x00\x00\x03"]
+ try:
+ cls.getresponse(1, 2, True)
+ errored = False
+ except ctlerr as e:
+ errored = e.message
+ self.assertEqual(errored, ntpp.SERR_UNSPEC)
finally:
- ntp.packet.select = select
+ ntpp.select = select
def test___validate_packet(self):
logjig = FileJig()
@@ -1044,7 +1202,7 @@ class TestControlSession(unittest.TestCase):
cls.logfp = logjig
# Test good packet, empty data
raw = "\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
True)
@@ -1053,7 +1211,7 @@ class TestControlSession(unittest.TestCase):
logjig.data = []
raw = "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09" \
"foo=4223,\x00\x00\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
cls.sequence = 1
self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 3),
@@ -1064,7 +1222,7 @@ class TestControlSession(unittest.TestCase):
cls.sequence = 0
logjig.data = []
raw = "\x46\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
False)
@@ -1072,7 +1230,7 @@ class TestControlSession(unittest.TestCase):
# Test bad packet, bad mode
logjig.data = []
raw = "\x0D\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
False)
@@ -1080,7 +1238,7 @@ class TestControlSession(unittest.TestCase):
# Test bad packet, bad response bit
logjig.data = []
raw = "\x0E\x01\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
False)
@@ -1088,7 +1246,7 @@ class TestControlSession(unittest.TestCase):
# Test bad packet, bad sequence
logjig.data = []
raw = "\x0E\x81\x00\x01\x00\x03\x00\x02\x00\x00\x00\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
False)
@@ -1097,7 +1255,7 @@ class TestControlSession(unittest.TestCase):
# Test bad packet, bad opcode
logjig.data = []
raw = "\x0E\x80\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
False)
@@ -1108,24 +1266,24 @@ class TestControlSession(unittest.TestCase):
raw = "\x0E\xC1\x00\x00" + \
chr(ntp.control.CERR_BADVALUE) + \
"\x03\x00\x02\x00\x00\x00\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
try:
cls._ControlSession__validate_packet(pkt, raw, 1, 2)
self.assertEqual(False, True) # it should have errored here
- except ntp.packet.ControlException as e:
+ except ctlerr as e:
self.assertEqual(e.errorcode, ntp.control.CERR_BADVALUE)
self.assertEqual(logjig.data, [])
# Test error packet, with more bit
logjig.data = []
errcs = chr(ntp.control.CERR_BADVALUE)
raw = "\x0E\xE1\x00\x00" + errcs + "\x03\x00\x02\x00\x00\x00\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
try:
cls._ControlSession__validate_packet(pkt, raw, 1, 2)
self.assertEqual(False, True) # it should have errored here
- except ntp.packet.ControlException as e:
+ except ctlerr as e:
self.assertEqual(e.errorcode, ntp.control.CERR_BADVALUE)
errstr = "Error " + str(ntp.control.CERR_BADVALUE) + \
" received on non-final fragment\n"
@@ -1133,7 +1291,7 @@ class TestControlSession(unittest.TestCase):
# Test ok-ish packet, bad associd
logjig.data = []
raw = "\x0E\x81\x00\x00\x00\x03\x00\xFF\x00\x00\x00\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
True)
@@ -1142,7 +1300,7 @@ class TestControlSession(unittest.TestCase):
# Test bad data padding
logjig.data = []
raw = "\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x01@"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
self.assertEqual(cls._ControlSession__validate_packet(pkt, raw, 1, 2),
False)
@@ -1151,13 +1309,13 @@ class TestControlSession(unittest.TestCase):
# Test too little data
logjig.data = []
raw = "\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x10foo\x00"
- pkt = ntp.packet.ControlPacket(cls)
+ pkt = ntpp.ControlPacket(cls)
pkt.analyze(raw)
try:
cls._ControlSession__validate_packet(pkt, raw, 1, 2)
self.assertEqual(True, False) # should have errored here
- except ntp.packet.ControlException as e:
- self.assertEqual(e.message, ntp.packet.SERR_INCOMPLETE)
+ except ctlerr as e:
+ self.assertEqual(e.message, ntpp.SERR_INCOMPLETE)
self.assertEqual(logjig.data,
["Response fragment claims 16 octets payload, "
"above 4 received\n"])
@@ -1174,14 +1332,22 @@ class TestControlSession(unittest.TestCase):
gets.append((opcode, associd, retry))
if doerror[0]:
doerror[0] = False
- raise ntp.packet.ControlException(ntp.packet.SERR_TIMEOUT)
+ raise ctlerr(ntpp.SERR_TIMEOUT)
return "flax!"
# Init
cls = self.target()
cls.sendrequest = sendrequest_jig
cls.getresponse = getresponse_jig
- cls.sock = True # to fool havehost()
+
+ # Test no socket
+ try:
+ cls.doquery(1, 2, "blah")
+ errored = False
+ except ctlerr as e:
+ errored = e.message
+ self.assertEqual(errored, ntpp.SERR_NOHOST)
# Test no retry
+ cls.sock = True # to fool havehost()
result = cls.doquery(1, 2, "blah")
self.assertEqual(result, "flax!")
self.assertEqual(len(sends), 1)
@@ -1220,7 +1386,7 @@ class TestControlSession(unittest.TestCase):
cls.response = "\xDE\xAD\xF0\x0D"
idlist = cls.readstat()
self.assertEqual(len(idlist), 1)
- self.assertEqual(isinstance(idlist[0], ntp.packet.Peer), True)
+ self.assertEqual(isinstance(idlist[0], ntpp.Peer), True)
self.assertEqual(idlist[0].associd, 0xDEAD)
self.assertEqual(idlist[0].status, 0xF00D)
self.assertEqual(queries, [(ntp.control.CTL_OP_READSTAT,
@@ -1230,7 +1396,7 @@ class TestControlSession(unittest.TestCase):
try:
cls.readstat()
errored = False
- except ntp.packet.ControlException:
+ except ctlerr:
errored = True
self.assertEqual(errored, True)
@@ -1350,6 +1516,15 @@ class TestControlSession(unittest.TestCase):
self.assertEqual(result, False)
self.assertEqual(queries,
[(ntp.control.CTL_OP_CONFIGURE, 0, "Boo!", True)])
+ # Test no response
+ queries = []
+ cls.response = ""
+ try:
+ cls.config("blah")
+ errored = False
+ except ctlerr as e:
+ errored = e.message
+ self.assertEqual(errored, ntpp.SERR_PERMISSION)
def test_fetch_nonce(self):
queries = []
@@ -1373,7 +1548,7 @@ class TestControlSession(unittest.TestCase):
try:
result = cls.fetch_nonce()
errored = False
- except ntp.packet.ControlException:
+ except ctlerr:
errored = True
self.assertEqual(errored, True)
self.assertEqual(filefp.data, ["## Nonce expected: blah blah"])
@@ -1399,9 +1574,15 @@ class TestControlSession(unittest.TestCase):
"mv.1=2,rs.1=3",
"now=0x00000000.00000000"]
query_results = qrm[:] # qrm == query results master
+ query_fail = [0]
+ query_fail_code = []
def doquery_jig(opcode, associd=0, qdata="", auth=False):
queries.append((opcode, associd, qdata, auth))
+ if query_fail[0] > 0:
+ query_fail[0] -= 1
+ code = query_fail_code.pop(0)
+ raise ctlerr("foo", errorcode=code)
if len(query_results) > 0:
setresponse(query_results.pop(0))
logjig = FileJig()
@@ -1425,7 +1606,7 @@ class TestControlSession(unittest.TestCase):
"nonce=foo, frags=32, addr.0=10.20.30.40:23, "
"last.0=42, addr.1=1.2.3.4:23, last.1=41, "
"addr.2=1.2.3.4:23, last.2=40", False)])
- self.assertEqual(isinstance(result, ntp.packet.MRUList), True)
+ self.assertEqual(isinstance(result, ntpp.MRUList), True)
self.assertEqual(len(result.entries), 2)
mru = result.entries[0]
self.assertEqual(mru.addr, "1.2.3.4:23")
@@ -1459,7 +1640,7 @@ class TestControlSession(unittest.TestCase):
"nonce=foo, frags=27, addr.0=10.20.30.40:23, "
"last.0=42, addr.1=1.2.3.4:23, last.1=41, "
"addr.2=1.2.3.4:23, last.2=40", False)])
- self.assertEqual(isinstance(result, ntp.packet.MRUList), True)
+ self.assertEqual(isinstance(result, ntpp.MRUList), True)
self.assertEqual(len(result.entries), 2)
mru = result.entries[0]
self.assertEqual(mru.addr, "10.20.30.40:23")
@@ -1475,6 +1656,170 @@ class TestControlSession(unittest.TestCase):
self.assertEqual(mru.ct, 1)
self.assertEqual(mru.mv, 2)
self.assertEqual(mru.rs, 3)
+ # Test sorter error
+ nonce_fetch_count = [0]
+ try:
+ cls.mrulist(variables={"sort": "foo"})
+ errored = False
+ except ctlerr as e:
+ errored = e.message
+ self.assertEqual(errored, ntpp.SERR_BADSORT % "foo")
+ # Test varbind error
+ nonce_fetch_count = [0]
+ try:
+ cls.mrulist(variables={"foo": 1})
+ errored = False
+ except ctlerr as e:
+ errored = e.message
+ self.assertEqual(errored, ntpp.SERR_BADPARAM % "foo")
+ # Test add to request errors
+ # Test None error
+ nonce_fetch_count = [0]
+ queries = []
+ query_fail = [1]
+ query_fail_code = [None]
+ try:
+ cls.mrulist()
+ errored = False
+ except ctlerr as e:
+ errored = e.errorcode
+ self.assertEqual(errored, None)
+ # Test random error
+ nonce_fetch_count = [0]
+ queries = []
+ query_fail = [1]
+ query_fail_code = ["therdaglib"]
+ try:
+ cls.mrulist()
+ errored = False
+ except ctlerr as e:
+ errored = e.errorcode
+ self.assertEqual(errored, "therdaglib")
+ # Test unknown var error
+ nonce_fetch_count = [0]
+ query_results = qrm[:]
+ queries = []
+ query_fail = [1]
+ query_fail_code = [ntp.control.CERR_UNKNOWNVAR] * 2
+ cls.response = ""
+ result = cls.mrulist()
+ self.assertEqual(nonce_fetch_count, [5])
+ self.assertEqual(queries,
+ [(10, 0, "nonce=foo, frags=32", False),
+ (10, 0, "nonce=foo, frags=32", False),
+ (10, 0,
+ "nonce=foo, frags=32, addr.0=1.2.3.4:23, last.0=40",
+ False),
+ (10, 0,
+ "nonce=foo, frags=32, addr.0=1.2.3.4:23, "
+ "last.0=41, addr.1=1.2.3.4:23, last.1=40", False),
+ (10, 0,
+ "nonce=foo, frags=32, addr.0=10.20.30.40:23, "
+ "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
+ "addr.2=1.2.3.4:23, last.2=40", False)])
+ self.assertEqual(isinstance(result, ntpp.MRUList), True)
+ self.assertEqual(len(result.entries), 2)
+ # Test bad value error
+ nonce_fetch_count = [0]
+ query_results = qrm[:]
+ queries = []
+ query_fail = [2]
+ query_fail_code = [ntp.control.CERR_BADVALUE] * 2
+ cls.response = ""
+ logjig.data = []
+ cls.debug = 1
+ result = cls.mrulist()
+ self.assertEqual(nonce_fetch_count, [6])
+ self.assertEqual(queries,
+ [(10, 0, "nonce=foo, frags=32", False),
+ (10, 0, "nonce=foo, limit=96", False),
+ (10, 0, "nonce=foo, limit=96", False),
+ (10, 0,
+ "nonce=foo, limit=96, addr.0=1.2.3.4:23, last.0=40",
+ False),
+ (10, 0,
+ "nonce=foo, limit=96, addr.0=1.2.3.4:23, "
+ "last.0=41, addr.1=1.2.3.4:23, last.1=40", False),
+ (10, 0,
+ "nonce=foo, limit=96, addr.0=10.20.30.40:23, "
+ "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
+ "addr.2=1.2.3.4:23, last.2=40", False)])
+ self.assertEqual(isinstance(result, ntpp.MRUList), True)
+ self.assertEqual(len(result.entries), 2)
+ self.assertEqual(logjig.data, ["Reverted to row limit from "
+ "fragments limit.\n",
+ "Row limit reduced to 96 following "
+ "CERR_BADVALUE.\n"])
+ # Test incomplete error
+ nonce_fetch_count = [0]
+ query_results = qrm[:]
+ queries = []
+ query_fail = [3]
+ query_fail_code = [ntpp.SERR_INCOMPLETE,
+ ntp.control.CERR_BADVALUE, # Trigger cap_frags
+ ntpp.SERR_INCOMPLETE]
+ cls.response = ""
+ logjig.data = []
+ cls.debug = 1
+ result = cls.mrulist()
+ self.assertEqual(nonce_fetch_count, [7])
+ self.assertEqual(queries,
+ [(10, 0, "nonce=foo, frags=32", False),
+ (10, 0, "nonce=foo, frags=16", False),
+ (10, 0, "nonce=foo, limit=96", False),
+ (10, 0, "nonce=foo, limit=48", False),
+ (10, 0,
+ "nonce=foo, limit=49, addr.0=1.2.3.4:23, last.0=40",
+ False),
+ (10, 0,
+ "nonce=foo, limit=51, addr.0=1.2.3.4:23, "
+ "last.0=41, addr.1=1.2.3.4:23, last.1=40", False),
+ (10, 0,
+ "nonce=foo, limit=52, addr.0=10.20.30.40:23, "
+ "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
+ "addr.2=1.2.3.4:23, last.2=40", False)])
+ self.assertEqual(len(result.entries), 2)
+ self.assertEqual(logjig.data,
+ ["Frag limit reduced to 16 following incomplete "
+ "response.\n",
+ "Reverted to row limit from fragments limit.\n",
+ "Row limit reduced to 48 following incomplete "
+ "response.\n"])
+ # Test timeout error
+ nonce_fetch_count = [0]
+ query_results = qrm[:]
+ queries = []
+ query_fail = [3]
+ query_fail_code = [ntpp.SERR_TIMEOUT,
+ ntp.control.CERR_BADVALUE, # Trigger cap_frags
+ ntpp.SERR_TIMEOUT]
+ cls.response = ""
+ logjig.data = []
+ cls.debug = 1
+ result = cls.mrulist()
+ self.assertEqual(nonce_fetch_count, [7])
+ self.assertEqual(queries,
+ [(10, 0, "nonce=foo, frags=32", False),
+ (10, 0, "nonce=foo, frags=16", False),
+ (10, 0, "nonce=foo, limit=96", False),
+ (10, 0, "nonce=foo, limit=48", False),
+ (10, 0,
+ "nonce=foo, limit=49, addr.0=1.2.3.4:23, last.0=40",
+ False),
+ (10, 0,
+ "nonce=foo, limit=51, addr.0=1.2.3.4:23, "
+ "last.0=41, addr.1=1.2.3.4:23, last.1=40", False),
+ (10, 0,
+ "nonce=foo, limit=52, addr.0=10.20.30.40:23, "
+ "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
+ "addr.2=1.2.3.4:23, last.2=40", False)])
+ self.assertEqual(len(result.entries), 2)
+ self.assertEqual(logjig.data,
+ ["Frag limit reduced to 16 following incomplete "
+ "response.\n",
+ "Reverted to row limit from fragments limit.\n",
+ "Row limit reduced to 48 following incomplete "
+ "response.\n"])
def test___ordlist(self):
queries = []
@@ -1523,7 +1868,7 @@ class TestControlSession(unittest.TestCase):
class TestAuthenticator(unittest.TestCase):
- target = ntp.packet.Authenticator
+ target = ntpp.Authenticator
open_calls = []
open_files = []
open_data = []
@@ -1537,7 +1882,7 @@ class TestAuthenticator(unittest.TestCase):
def test___init__(self):
try:
- ntp.packet.open = self.openjig
+ ntpp.open = self.openjig
# Test without file
cls = self.target()
self.assertEqual(cls.passwords, {})
@@ -1554,7 +1899,7 @@ class TestAuthenticator(unittest.TestCase):
self.assertEqual(self.open_calls, ["file"])
self.assertEqual(len(self.open_files), 1)
finally:
- ntp.packet.open = open
+ ntpp.open = open
self.open_calls = []
self.open_files = []
self.open_data = []
@@ -1581,7 +1926,7 @@ class TestAuthenticator(unittest.TestCase):
self.assertEqual(cls.control(1), (1, None, None))
try:
# Read keyid from /etc/ntp.conf
- ntp.packet.open = self.openjig
+ ntpp.open = self.openjig
self.open_data = ["blah blah", "control 2"]
self.assertEqual(cls.control(), (2, "b", "y"))
# Fail to read keyid from /etc/ntp.conf
@@ -1593,21 +1938,21 @@ class TestAuthenticator(unittest.TestCase):
errored = True
self.assertEqual(errored, True)
finally:
- ntp.packet.open = open
+ ntpp.open = open
def test_compute_mac(self):
f = self.target.compute_mac
try:
- temphash = ntp.packet.hashlib
+ temphash = ntpp.hashlib
fakehashlibmod = HashlibModuleJig()
- ntp.packet.hashlib = fakehashlibmod
+ ntpp.hashlib = fakehashlibmod
# Test no digest
self.assertEqual(f(None, None, None, None), None)
# Test with digest
self.assertEqual(f("foo", 0x42, "bar", "quux"),
"\x00\x00\x00\x42blahblahblahblah")
finally:
- ntp.packet.hashlib = temphash
+ ntpp.hashlib = temphash
def test_have_mac(self):
f = self.target.have_mac
@@ -1624,15 +1969,15 @@ class TestAuthenticator(unittest.TestCase):
good_pkt = "foobar\x00\x00\x00\x23blahblahblahblah"
bad_pkt = "foobar\xDE\xAD\xDE\xAFblahblahblah"
try:
- temphash = ntp.packet.hashlib
+ temphash = ntpp.hashlib
fakehashlibmod = HashlibModuleJig()
- ntp.packet.hashlib = fakehashlibmod
+ ntpp.hashlib = fakehashlibmod
# Test good
self.assertEqual(cls.verify_mac(good_pkt), True)
# Test bad
self.assertEqual(cls.verify_mac(bad_pkt), False)
finally:
- ntp.packet.hashlib = temphash
+ ntpp.hashlib = temphash
if __name__ == "__main__":
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/177743d243584de8fb8d229f4de1f8ced7639cb5...6e5765c2edd810cb4bb22ac21d586226a72c42f7
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/177743d243584de8fb8d229f4de1f8ced7639cb5...6e5765c2edd810cb4bb22ac21d586226a72c42f7
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170823/21cd52dc/attachment.html>
More information about the vc mailing list