[Git][NTPsec/ntpsec][packettesting] 2 commits: Implement unrestrict with address and no flags to remove a rule.
Ian Bruene
gitlab at mg.gitlab.com
Thu Aug 17 01:12:38 UTC 2017
Ian Bruene pushed to branch packettesting at NTPsec / ntpsec
Commits:
756d982e by Eric S. Raymond at 2017-08-16T21:09:56-04:00
Implement unrestrict with address and no flags to remove a rule.
- - - - -
64fdd108 by Ian Bruene at 2017-08-17T01:11:53+00:00
Added tests for packet.py/ControlSession()
Also a couple tweaks for ControlSession to make it more amenable to tests.
- - - - -
3 changed files:
- ntpd/ntp_config.c
- pylib/packet.py
- + tests/pylib/test_packet.py
Changes:
=====================================
ntpd/ntp_config.c
=====================================
--- a/ntpd/ntp_config.c
+++ b/ntpd/ntp_config.c
@@ -1795,8 +1795,16 @@ config_access(
}
do {
- int op = (my_node->mode == T_Restrict)
- ? RESTRICT_FLAGS : RESTRICT_UNFLAG;
+ int op;
+ if (my_node->mode == T_Restrict)
+ op = RESTRICT_FLAGS;
+ else if (my_node->mode == T_Unrestrict
+ && flags == 0 && mflags == 0)
+ op = RESTRICT_REMOVE;
+ else if (my_node->mode == T_Unrestrict)
+ op = RESTRICT_UNFLAG;
+ else
+ continue; /* should never happen */
hack_restrict(op, &addr,
&mask, mflags, flags, 0);
if (pai != NULL &&
=====================================
pylib/packet.py
=====================================
--- a/pylib/packet.py
+++ b/pylib/packet.py
@@ -878,7 +878,7 @@ class ControlSession:
# C implementation didn't use multiple responses, so we don't either
(family, socktype, protocol, canonname, sockaddr) = res[0]
if canonname is None:
- self.hostname = sockaddr.inet_ntop(sockaddr[0], family)
+ self.hostname = socket.inet_ntop(sockaddr[0], family)
self.isnum = True
else:
self.hostname = canonname or hname
@@ -1088,7 +1088,7 @@ class ControlSession:
# Check opcode and sequence number for a match.
# Could be old data getting to us.
# =======
- # These had the continues inside a if debug block. Probably
+ # These had the continues inside an if debug block. Probably
# shouldn't have been there, but if there is a problem move
# them back.
if rpkt.sequence != self.sequence:
@@ -1341,7 +1341,7 @@ class ControlSession:
Ask for, and get, a nonce that can be replayed.
This combats source address spoofing
"""
- for i in range(3):
+ for i in range(4):
# retry 4 times
self.doquery(opcode=ntp.control.CTL_OP_REQ_NONCE)
self.nonce_xmit = time.time()
@@ -1351,7 +1351,7 @@ This combats source address spoofing
# uh, oh, no nonce seen
# this print probably never can be seen...
- print("## Nonce expected: %s" % self.response)
+ self.logfp.write("## Nonce expected: %s" % self.response)
raise ControlException(SERR_BADNONCE)
def mrulist(self, variables=None, rawhook=None, direct=None):
@@ -1529,6 +1529,7 @@ This combats source address spoofing
if idx != curidx:
# This makes duplicates
curidx = idx
+
if mru:
# Can't have partial slots on list
# or printing crashes after ^C
=====================================
tests/pylib/test_packet.py
=====================================
--- /dev/null
+++ b/tests/pylib/test_packet.py
@@ -0,0 +1,842 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from __future__ import print_function, division
+
+import unittest
+import ntp.packet
+import ntp.control
+import ntp.util
+import socket
+import select
+import sys
+import getpass
+
+odict = ntp.util.OrderedDict
+
+
+class FileJig:
+ def __init__(self):
+ self.data = []
+ self.flushed = False
+ self.readline_return = ""
+
+ def write(self, data):
+ self.data.append(data)
+ self.flushed = False
+
+ def flush(self):
+ self.flushed = True
+
+ def readline(self):
+ return self.readline_return
+
+
+class SocketJig:
+ def __init__(self):
+ self.data = []
+ self.return_data = []
+ self.closed = False
+ self.connected = None
+ self.fail_connect = False
+
+ def sendall(self, data):
+ self.data.append(data)
+
+ def close(self):
+ self.closed = True
+
+ def connect(self, addr):
+ if self.fail_connect is True:
+ err = socket.error()
+ err.strerror = "socket!"
+ err.errno = 16
+ raise err
+ self.connected = addr
+
+ def recv(self, bytecount):
+ if len(self.return_data) > 0:
+ x = len(self.return_data)
+ current = self.return_data.pop(0)
+ if len(current) > bytecount:
+ ret = current[:bytecount]
+ current = current[bytecount:]
+ self.return_data.insert(0, current) # push unwanted data
+ return ret
+ else:
+ return current
+ return None
+
+
+class ControlPacketJig:
+ HEADER_LEN = ntp.packet.ControlPacket.HEADER_LEN
+ def __init__(self, session, opcode, associd, data):
+ self.session = session
+ self.opcode = opcode
+ self.associd = associd
+ self.extension = data
+ self.sequence = None
+ self.send_call_count = 0
+
+ def send(self):
+ self.send_call_count += 1
+ return self
+
+ def flatten(self):
+ return self.extension
+
+
+class SocketModuleJig:
+ error = socket.error
+ gaierror = socket._socket.gaierror
+ SOCK_DGRAM = socket.SOCK_DGRAM
+ IPPROTO_UDP = socket.IPPROTO_UDP
+ AF_UNSPEC = socket.AF_UNSPEC
+ AI_NUMERICHOST = socket.AI_NUMERICHOST
+ AI_CANONNAME = socket.AI_CANONNAME
+ EAI_NONAME = socket.EAI_NONAME
+ EAI_NODATA = socket.EAI_NODATA
+
+ def __init__(self):
+ self.gai_calls = []
+ self.gai_error_count = 0
+ self.socket_calls = []
+ self.socket_fail = False
+ self.socket_fail_connect = False
+ self.socketsReturned = []
+ self.inet_ntop_calls = []
+
+ def getaddrinfo(self, host, port, family=None, socktype=None,
+ proto=None, flags=None):
+ self.gai_calls.append((host, port, family, socktype, proto, flags))
+ if self.gai_error_count > 0:
+ self.gai_error_count -= 1
+ err = self.gaierror("blah")
+ err.errno = socket.EAI_NONAME
+ raise err
+ return 42
+
+ def socket(self, family, socktype, protocol):
+ self.socket_calls.append((family, socktype, protocol))
+ if self.socket_fail is True:
+ err = self.error()
+ err.strerror = "error!"
+ err.errno = 23
+ raise err
+ sock = SocketJig()
+ if self.socket_fail_connect is True:
+ sock.fail_connect = True
+ self.socketsReturned.append(sock)
+ return sock
+
+ def inet_ntop(self, addr, family):
+ self.inet_ntop_calls.append((addr, family))
+ return "canon.com"
+
+
+class GetpassModuleJig:
+ def __init__(self):
+ self.getpass_calls = []
+
+ def getpass(self, prompt, stream=None):
+ self.getpass_calls.append((prompt, stream))
+ return "xyzzy"
+
+
+class SelectModuleJig:
+ error = select.error
+
+ def __init__(self):
+ self.select_calls = []
+
+ def select(self, ins, outs, excepts, timeout=0):
+ self.select_calls.append((ins, outs, excepts, timeout))
+ return (ins, [], [])
+
+
+class AuthenticatorJig:
+ compute_mac_calls = []
+ def __init__(self):
+ self.control_call_count = 0
+ self.fail_getitem = False
+ self.compute_mac_calls = []
+
+ def __getitem__(self, key):
+ if self.fail_getitem is True:
+ raise IndexError
+ return ("passtype", "pass")
+
+ def control(self):
+ self.control_call_count += 1
+ return (23, "keytype", "miranda")
+
+ @staticmethod
+ def compute_mac(flatpkt, keyid, keytype, passwd):
+ AuthenticatorJig.compute_mac_calls.append((flatpkt, keyid,
+ keytype, passwd))
+ return "mac"
+
+
+class TestControlSession(unittest.TestCase):
+ target = ntp.packet.ControlSession
+ def test___init__(self):
+ # Test
+ cls = self.target()
+ self.assertEqual(cls.debug, 0)
+ self.assertEqual(cls.ai_family, socket.AF_UNSPEC)
+ self.assertEqual(cls.primary_timeout, ntp.packet.DEFTIMEOUT)
+ self.assertEqual(cls.secondary_timeout, ntp.packet.DEFSTIMEOUT)
+ self.assertEqual(cls.pktversion, ntp.magic.NTP_OLDVERSION + 1)
+ self.assertEqual(cls.always_auth, False)
+ self.assertEqual(cls.keytype, "MD5")
+ self.assertEqual(cls.keyid, None)
+ self.assertEqual(cls.passwd, None)
+ self.assertEqual(cls.auth, None)
+ self.assertEqual(cls.hostname, None)
+ self.assertEqual(cls.isnum, False)
+ self.assertEqual(cls.sock, None)
+ self.assertEqual(cls.port, 0)
+ self.assertEqual(cls.sequence, 0)
+ self.assertEqual(cls.response, "")
+ self.assertEqual(cls.rstatus, 0)
+ self.assertEqual(cls.ntpd_row_limit, self.target.MRU_ROW_LIMIT)
+ self.assertEqual(cls.logfp, sys.stdout)
+ self.assertEqual(cls.nonce_xmit, 0)
+
+
+ def test_close(self):
+ # Init
+ sockjig = SocketJig()
+ cls = self.target()
+ cls.sock = sockjig
+ # Test
+ cls.close()
+ self.assertEqual(sockjig.closed, True)
+ self.assertEqual(cls.sock, None)
+
+ def test_havehost(self):
+ # Init
+ cls = self.target()
+ # Test empty
+ self.assertEqual(cls.havehost(), False)
+ # Test full
+ cls.sock = True
+ self.assertEqual(cls.havehost(), True)
+
+ def test___lookuphost(self):
+ logjig = FileJig()
+ try:
+ fakesockmod = SocketModuleJig()
+ ntp.packet.socket = fakesockmod
+ # Init
+ cls = self.target()
+ cls.debug = 3
+ cls.logfp = logjig
+ # Test first type
+ result = cls._ControlSession__lookuphost("blah.com", "family")
+ self.assertEqual(result, 42)
+ self.assertEqual(fakesockmod.gai_calls,
+ [("blah.com", "ntp", cls.ai_family,
+ socket.SOCK_DGRAM, socket.IPPROTO_UDP,
+ socket.AI_NUMERICHOST)])
+ self.assertEqual(logjig.data, [])
+ # Test second type
+ logjig.__init__() # reset
+ fakesockmod.__init__()
+ fakesockmod.gai_error_count = 1
+ result = cls._ControlSession__lookuphost("blah.com", "family")
+ self.assertEqual(result, 42)
+ self.assertEqual(logjig.data,
+ ["ntpq: numeric-mode lookup of blah.com "
+ "failed, None\n"])
+ self.assertEqual(fakesockmod.gai_calls,
+ [("blah.com", "ntp", cls.ai_family,
+ socket.SOCK_DGRAM, socket.IPPROTO_UDP,
+ socket.AI_NUMERICHOST),
+ ("blah.com", "ntp", cls.ai_family,
+ socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0)])
+ # Test third type
+ logjig.__init__() # reset
+ fakesockmod.__init__()
+ fakesockmod.gai_error_count = 2
+ result = cls._ControlSession__lookuphost("blah.com", "family")
+ self.assertEqual(result, 42)
+ self.assertEqual(logjig.data,
+ ["ntpq: numeric-mode lookup of blah.com "
+ "failed, None\n",
+ "ntpq: standard-mode lookup of blah.com "
+ "failed, None\n"])
+ self.assertEqual(fakesockmod.gai_calls,
+ [("blah.com", "ntp", cls.ai_family,
+ socket.SOCK_DGRAM, socket.IPPROTO_UDP,
+ socket.AI_NUMERICHOST),
+ ("blah.com", "ntp", cls.ai_family,
+ socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0),
+ ("blah.com", "ntp", cls.ai_family,
+ socket.SOCK_DGRAM, socket.IPPROTO_UDP, 0)])
+ finally:
+ ntp.packet.socket = socket
+
+ def test_openhost(self):
+ lookups = []
+ returnNothing = True
+ noCanon = False
+ def lookup_jig(hostname, family):
+ lookups.append((hostname, family))
+ if returnNothing is True:
+ return None
+ elif noCanon is True:
+ return [("family", "socktype", "protocol", None,
+ ("1.2.3.4", 80)),]
+ else:
+ return [("family", "socktype", "protocol", "canon",
+ ("1.2.3.4", 80)),]
+ logjig = FileJig()
+ try:
+ fakesockmod = SocketModuleJig()
+ ntp.packet.socket = fakesockmod
+ # Init
+ cls = self.target()
+ cls.debug = 3
+ cls.logfp = logjig
+ cls._ControlSession__lookuphost = lookup_jig
+ # Test, lookup failure
+ result = cls.openhost("foo.org")
+ self.assertEqual(result, False)
+ self.assertEqual(lookups, [("foo.org", socket.AF_UNSPEC)])
+ # Test, with canon, and success
+ returnNothing = False
+ lookups = []
+ result = cls.openhost("foo.org")
+ self.assertEqual(result, True)
+ self.assertEqual(logjig.data, ["Opening host canon\n"])
+ self.assertEqual(lookups, [("foo.org", socket.AF_UNSPEC)])
+ self.assertEqual(fakesockmod.socket_calls,
+ [("family", "socktype", "protocol")])
+ sock = fakesockmod.socketsReturned[0]
+ self.assertEqual(sock.connected, ("1.2.3.4", 80))
+ self.assertEqual(cls.hostname, "canon")
+ self.assertEqual(cls.isnum, False)
+ self.assertEqual(cls.port, 80)
+ self.assertEqual(cls.sock, sock)
+ # Test, without canon, and success
+ noCanon = True
+ lookups = []
+ logjig.__init__()
+ fakesockmod.__init__()
+ cls.sock = None
+ result = cls.openhost("foo.org")
+ self.assertEqual(result, True)
+ self.assertEqual(logjig.data, ["Opening host canon.com\n"])
+ self.assertEqual(lookups, [("foo.org", socket.AF_UNSPEC)])
+ self.assertEqual(fakesockmod.socket_calls,
+ [("family", "socktype", "protocol")])
+ sock = fakesockmod.socketsReturned[0]
+ self.assertEqual(sock.connected, ("1.2.3.4", 80))
+ self.assertEqual(cls.hostname, "canon.com")
+ self.assertEqual(cls.isnum, True)
+ self.assertEqual(cls.port, 80)
+ self.assertEqual(cls.sock, sock)
+ # Test, with canon, and socket creation failure
+ noCanon = False
+ cls.sock = None
+ fakesockmod.socket_fail = True
+ try:
+ result = cls.openhost("foo.org")
+ errored = False
+ except ntp.packet.ControlException as e:
+ errored = e
+ self.assertEqual(errored.message,
+ "Error opening foo.org: error! [23]")
+ # Test, with canon, and socket connection failure
+ fakesockmod.socket_fail = False
+ fakesockmod.socket_fail_connect = True
+ cls.sock = None
+ try:
+ result = cls.openhost("foo.org")
+ errored = False
+ except ntp.packet.ControlException as e:
+ errored = e
+ self.assertEqual(errored.message,
+ "Error connecting to foo.org: socket! [16]")
+ finally:
+ ntp.packet.socket = socket
+
+ def test_password(self):
+ iojig = FileJig()
+ fakegetpmod = GetpassModuleJig()
+ # Init
+ cls = self.target()
+ try:
+ tempauth = ntp.packet.Authenticator()
+ ntp.packet.Authenticator = AuthenticatorJig
+ ntp.packet.getpass = fakegetpmod
+ tempstdin = sys.stdin
+ sys.stdin = iojig
+ tempstdout = sys.stdout
+ sys.stdout = iojig
+ # Test, with nothing
+ iojig.readline_return = "1\n"
+ cls.password()
+ self.assertEqual(isinstance(cls.auth, AuthenticatorJig), True)
+ self.assertEqual(cls.keyid, 1)
+ self.assertEqual(cls.passwd, "pass")
+ # Test, with auth and localhost
+ cls.keyid = None
+ cls.passwd = None
+ cls.hostname = "localhost"
+ cls.password()
+ self.assertEqual(cls.keyid, 23)
+ self.assertEqual(cls.keytype, "keytype")
+ self.assertEqual(cls.passwd, "miranda")
+ # Test, with all but password
+ cls.passwd = None
+ cls.auth.fail_getitem = True
+ cls.password()
+ self.assertEqual(fakegetpmod.getpass_calls,
+ [("keytype Password: ", None)])
+ self.assertEqual(cls.passwd, "xyzzy")
+ finally:
+ ntp.packet.Authenticator = tempauth
+ ntp.packet.getpass = getpass
+ sys.stdin = tempstdin
+ sys.stdout = tempstdout
+
+ def test_sendpkt(self):
+ logjig = FileJig()
+ sockjig = SocketJig()
+
+ # Init
+ cls = self.target()
+ cls.logfp = logjig
+ cls.sock = sockjig
+ cls.debug = 3
+ # Test
+ res = cls.sendpkt("blahfoo")
+ self.assertEqual(res, 0)
+ self.assertEqual(len(logjig.data), 1)
+ self.assertEqual(logjig.data[0], "Sending 8 octets. seq=0\n")
+ self.assertEqual(len(sockjig.data), 1)
+ self.assertEqual(sockjig.data[0], "blahfoo\x00")
+
+ def test_sendrequest(self):
+ logjig = FileJig()
+ try:
+ tempcpkt = ntp.packet.ControlPacket
+ ntp.packet.ControlPacket = ControlPacketJig
+ tempauth = ntp.packet.Authenticator
+ ntp.packet.Authenticator = AuthenticatorJig
+ cls = self.target()
+ cls.logfp = logjig
+ cls.debug = 3
+ # Test oversize data
+ datalen = ntp.control.CTL_MAX_DATA_LEN + 1
+ data = "a" * datalen
+ result = cls.sendrequest(1, 2, data)
+ self.assertEqual(result, -1)
+ self.assertEqual(logjig.data,
+ ["\n",
+ "sendrequest: opcode=1, associd=2, "
+ "qdata=" + data + "\n",
+ "***Internal error! Data too large "
+ "(" + str(len(data)) + ")\n"])
+ # Test no auth
+ result = cls.sendrequest(1, 2, "foo")
+ self.assertEqual(result.sequence, 1)
+ self.assertEqual(result.extension, "foo\x00")
+ # Test with auth
+ cls.keyid = 1
+ cls.passwd = "qwerty"
+ result = cls.sendrequest(1, 2, "foo", True)
+ self.assertEqual(result.sequence, 2)
+ self.assertEqual(result.extension, "foo\x00mac")
+ # Test with auth keyid / password failure
+ cls.keyid = None
+ try:
+ cls.sendrequest(1, 2, "foo", True)
+ errored = False
+ except ntp.packet.ControlException:
+ errored = True
+ self.assertEqual(errored, True)
+ finally:
+ ntp.packet.ControlPacket = tempcpkt
+ ntp.packet.Authenticator = tempauth
+
+ def test_getresponse(self):
+ logjig = FileJig()
+ sockjig = SocketJig()
+ fakeselectmod = SelectModuleJig()
+ # Init
+ cls = self.target()
+ cls.debug = 3
+ cls.logfp = logjig
+ cls.sock = sockjig
+ try:
+ ntp.packet.select = fakeselectmod
+ # Test empty
+ sockjig.return_data = [
+ "\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"]
+ cls.getresponse(1, 2, True)
+ self.assertEqual(cls.response, "")
+ # Test with data
+ sockjig.return_data = [
+ "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
+ "foo=4223,\x00\x00\x00",
+ "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x09\x00\x0E"
+ "blah=248,x=23,\x00\x00",
+ "\x0E\x81\x00\x01\x00\x02\x00\x03\x00\x17\x00\x06"
+ "quux=1\x00\x00"]
+ cls.sequence = 1
+ cls.getresponse(1, 3, True)
+ self.assertEqual(cls.response, "foo=4223,blah=248,x=23,quux=1")
+ finally:
+ ntp.packet.select = select
+
+ def test_doquery(self):
+ sends = []
+ def sendrequest_jig(opcode, associd, qdata, auth):
+ sends.append((opcode, associd, qdata, auth))
+ gets = []
+ doerror = [False]
+ def getresponse_jig(opcode, associd, retry):
+ gets.append((opcode, associd, retry))
+ if doerror[0]:
+ doerror[0] = False
+ raise ntp.packet.ControlException(ntp.packet.SERR_TIMEOUT)
+ return "flax!"
+ # Init
+ cls = self.target()
+ cls.sendrequest = sendrequest_jig
+ cls.getresponse = getresponse_jig
+ cls.sock = True # to fool havehost()
+ # Test no retry
+ result = cls.doquery(1, 2, "blah")
+ self.assertEqual(result, "flax!")
+ self.assertEqual(len(sends), 1)
+ self.assertEqual(sends[0], (1, 2, "blah", False))
+ self.assertEqual(len(gets), 1)
+ self.assertEqual(gets[0], (1, 2, False))
+ # Reset
+ sends = []
+ gets = []
+ doerror[0] = True
+ # Test retry
+ result = cls.doquery(1, 2, "quux")
+ self.assertEqual(result, "flax!")
+ self.assertEqual(len(sends), 2)
+ self.assertEqual(sends, [(1, 2, "quux", False),
+ (1, 2, "quux", False)])
+ self.assertEqual(len(gets), 2)
+ self.assertEqual(gets, [(1, 2, False), (1, 2, True)])
+
+ def test_readstat(self):
+ # Init
+ queries = []
+ def doquery_jig(opcode, associd=0, qdata="", auth=False):
+ queries.append((opcode, associd, qdata, auth))
+ cls = self.target()
+ cls.doquery = doquery_jig
+ # Test empty
+ cls.response = ""
+ idlist = cls.readstat(42)
+ self.assertEqual(idlist, [])
+ self.assertEqual(queries, [(ntp.control.CTL_OP_READSTAT,
+ 42, "", False)])
+ # Test normal
+ queries = []
+ cls.response = "\xDE\xAD\xF0\x0D"
+ idlist = cls.readstat()
+ self.assertEqual(len(idlist), 1)
+ self.assertEqual(isinstance(idlist[0], ntp.packet.Peer), True)
+ self.assertEqual(idlist[0].associd, 0xDEAD)
+ self.assertEqual(idlist[0].status, 0xF00D)
+ self.assertEqual(queries, [(ntp.control.CTL_OP_READSTAT,
+ 0, "", False)])
+ # Test incorrect response
+ cls.response = "foo"
+ try:
+ cls.readstat()
+ errored = False
+ except ntp.packet.ControlException as e:
+ errored = True
+ self.assertEqual(errored, True)
+
+ def test___parse_varlist(self):
+ # Init
+ cls = self.target()
+ cls.response = 'srcadr=0.0.0.0, srcport=0, srchost="0.ubuntu.pool.ntp.org",\r\ndstadr=0.0.0.0, dstport=0, leap=3, stratum=16, precision=-22,\r\nrootdelay=0.000, rootdisp=0.000, refid=POOL,\r\nreftime=0x00000000.00000000, rec=0x00000000.00000000, reach=0x0,\r\nunreach=0, hmode=3, pmode=0, hpoll=6, ppoll=10, headway=0, flash=0x1600,\r\nkeyid=0, offset=0.000, delay=0.000, dispersion=16000.000, jitter=0.000,\r\nfiltdelay= 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00,\r\nfiltoffset= 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00,\r\nfiltdisp= 16000.00 16000.00 16000.00 16000.00 16000.00 16000.00 16000.00 16000.00\r\n'
+ # Test with basic packet
+ self.assertEqual(cls._ControlSession__parse_varlist(),
+ odict((("srcadr", "0.0.0.0"), ("srcport", 0),
+ ("srchost", "0.ubuntu.pool.ntp.org"),
+ ("dstadr", "0.0.0.0"), ("dstport", 0),
+ ("leap", 3), ("stratum", 16),
+ ("precision", -22), ("rootdelay", 0.0),
+ ("rootdisp", 0.0), ("refid", "POOL"),
+ ("reftime", "0x00000000.00000000"),
+ ("rec", "0x00000000.00000000"),
+ ("reach", 0), ("unreach", 0), ("hmode", 3),
+ ("pmode", 0), ("hpoll", 6), ("ppoll", 10),
+ ("headway", 0), ("flash", 5632), ("keyid", 0),
+ ("offset", 0.0), ("delay-s", "0.000"),
+ ("delay", 0.0), ("dispersion", 16000.0),
+ ("jitter", 0.0),
+ ("filtdelay",
+ "0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00"),
+ ("filtoffset",
+ "0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00"),
+ ("filtdisp",
+ "16000.00 16000.00 16000.00 16000.00 "
+ "16000.00 16000.00 16000.00 16000.00"))))
+ # Test with basic packet, raw mode
+ self.assertEqual(cls._ControlSession__parse_varlist(raw=True),
+ odict((("srcadr", ("0.0.0.0", "0.0.0.0")),
+ ("srcport", (0, "0")),
+ ("srchost", ("0.ubuntu.pool.ntp.org",
+ "0.ubuntu.pool.ntp.org")),
+ ("dstadr", ("0.0.0.0", "0.0.0.0")),
+ ("dstport", (0, "0")),
+ ("leap", (3, "3")), ("stratum", (16, "16")),
+ ("precision", (-22, "-22")),
+ ("rootdelay", (0.0, "0.000")),
+ ("rootdisp", (0.0, "0.000")),
+ ("refid", ("POOL", "POOL")),
+ ("reftime", ("0x00000000.00000000",
+ "0x00000000.00000000")),
+ ("rec", ("0x00000000.00000000",
+ "0x00000000.00000000")),
+ ("reach", (0, "0x0")), ("unreach", (0, "0")),
+ ("hmode", (3, "3")), ("pmode", (0, "0")),
+ ("hpoll", (6, "6")), ("ppoll", (10, "10")),
+ ("headway", (0, "0")),
+ ("flash", (5632, "0x1600")),
+ ("keyid", (0, "0")),
+ ("offset", (0.0, "0.000")),
+ ("delay", (0.0, "0.000")),
+ ("dispersion", (16000.0, "16000.000")),
+ ("jitter", (0.0, "0.000")),
+ ("filtdelay",
+ ("0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00",
+ "0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00")),
+ ("filtoffset",
+ ("0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00",
+ "0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00")),
+ ("filtdisp",
+ ("16000.00 16000.00 16000.00 16000.00 "
+ "16000.00 16000.00 16000.00 16000.00",
+ "16000.00 16000.00 16000.00 16000.00 "
+ "16000.00 16000.00 16000.00 16000.00")))))
+
+ def test_readvar(self):
+ queries = []
+ def doquery_jig(opcode, associd=0, qdata="", auth=False):
+ queries.append((opcode, associd, qdata, auth))
+ # Init
+ cls = self.target()
+ cls.doquery = doquery_jig
+ cls.response = "foo=bar, murphy=42"
+ # Test basic
+ result = cls.readvar()
+ self.assertEqual(result, odict((("foo", "bar"), ("murphy", 42))))
+ self.assertEqual(queries,
+ [(ntp.control.CTL_OP_READVAR, 0, "", False)])
+ # Test raw
+ queries = []
+ result = cls.readvar(raw=True)
+ self.assertEqual(result, odict((("foo", ("bar", "bar")),
+ ("murphy", (42, "42")))))
+ self.assertEqual(queries,
+ [(ntp.control.CTL_OP_READVAR, 0, "", False)])
+ # Test with varlist
+ queries = []
+ result = cls.readvar(varlist=("foo", "bar", "quux"))
+ self.assertEqual(result, odict((("foo", "bar"), ("murphy", 42))))
+ self.assertEqual(queries,
+ [(ntp.control.CTL_OP_READVAR,
+ 0, "foo,bar,quux", False)])
+
+ def test_config(self):
+ queries = []
+ def doquery_jig(opcode, associd=0, qdata="", auth=False):
+ queries.append((opcode, associd, qdata, auth))
+ # Init
+ cls = self.target()
+ cls.doquery = doquery_jig
+ cls.response = "Config Succeeded \n \x00 blah blah"
+ # Test success
+ result = cls.config("Boo!")
+ self.assertEqual(result, True)
+ self.assertEqual(queries,
+ [(ntp.control.CTL_OP_CONFIGURE, 0, "Boo!", True)])
+ # Test failure
+ queries = []
+ cls.response = "whatever man..."
+ result = cls.config("Boo!")
+ self.assertEqual(result, False)
+ self.assertEqual(queries,
+ [(ntp.control.CTL_OP_CONFIGURE, 0, "Boo!", True)])
+
+ def test_fetch_nonce(self):
+ queries = []
+ def doquery_jig(opcode, associd=0, qdata="", auth=False):
+ queries.append((opcode, associd, qdata, auth))
+ # Init
+ filefp = FileJig()
+ cls = self.target()
+ cls.doquery = doquery_jig
+ # Test success
+ cls.response = "nonce=blah blah "
+ result = cls.fetch_nonce()
+ self.assertEqual(result, "nonce=blah blah")
+ self.assertEqual(queries,
+ [(ntp.control.CTL_OP_REQ_NONCE, 0, "", False)])
+ # Test failure
+ queries = []
+ cls.logfp = filefp
+ cls.response = "blah blah"
+ try:
+ result = cls.fetch_nonce()
+ errored = False
+ except ntp.packet.ControlException:
+ errored = True
+ self.assertEqual(errored, True)
+ self.assertEqual(filefp.data, ["## Nonce expected: blah blah"])
+ self.assertEqual(queries,
+ [(ntp.control.CTL_OP_REQ_NONCE, 0, "", False),
+ (ntp.control.CTL_OP_REQ_NONCE, 0, "", False),
+ (ntp.control.CTL_OP_REQ_NONCE, 0, "", False),
+ (ntp.control.CTL_OP_REQ_NONCE, 0, "", False)])
+
+ def test_mrulist(self):
+ def setresponse(data): # needed for doquery_jig
+ cls.response = data
+ nonce_fetch_count = [0]
+ def fetch_nonce_jig():
+ nonce_fetch_count[0] += 1
+ return "nonce=foo"
+ queries = []
+ qrm = ["addr.1=1.2.3.4:23,last.1=40,first.1=23,ct.1=1,mv.1=2,rs.1=3",
+ "addr.2=1.2.3.4:23,last.2=41,first.2=23,ct.2=1,mv.2=2,rs.2=3",
+ "addr.1=10.20.30.40:23,last.1=42,first.1=23,ct.1=1,mv.1=2,rs.1=3",
+ "now=0x00000000.00000000"]
+ query_results = qrm[:] # qrm == query results master
+ def doquery_jig(opcode, associd=0, qdata="", auth=False):
+ queries.append((opcode, associd, qdata, auth))
+ if len(query_results) > 0:
+ setresponse(query_results.pop(0))
+ logjig = FileJig()
+ # Init
+ cls = self.target()
+ cls.fetch_nonce = fetch_nonce_jig
+ cls.doquery = doquery_jig
+ cls.logfp = logjig
+ # Test empty varlist
+ result = cls.mrulist()
+ self.assertEqual(nonce_fetch_count, [4])
+ self.assertEqual(queries,
+ [(10, 0, "nonce=foo, frags=32", False),
+ (10, 0,
+ "nonce=foo, frags=32, addr.0=1.2.3.4:23, last.0=40",
+ False),
+ (10, 0,
+ "nonce=foo, frags=32, addr.0=1.2.3.4:23, last.0=41, "
+ "addr.1=1.2.3.4:23, last.1=40", False),
+ (10, 0,
+ "nonce=foo, frags=32, addr.0=10.20.30.40:23, "
+ "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
+ "addr.2=1.2.3.4:23, last.2=40", False)])
+ self.assertEqual(isinstance(result, ntp.packet.MRUList), True)
+ self.assertEqual(len(result.entries), 2)
+ mru = result.entries[0]
+ self.assertEqual(mru.addr, "1.2.3.4:23")
+ self.assertEqual(mru.last, 41)
+ self.assertEqual(mru.first, 23)
+ self.assertEqual(mru.ct, 1)
+ self.assertEqual(mru.mv, 2)
+ self.assertEqual(mru.rs, 3)
+ mru = result.entries[1]
+ self.assertEqual(mru.addr, "10.20.30.40:23")
+ self.assertEqual(mru.last, 42)
+ self.assertEqual(mru.first, 23)
+ self.assertEqual(mru.ct, 1)
+ self.assertEqual(mru.mv, 2)
+ self.assertEqual(mru.rs, 3)
+ # Test with sort and frags (also test frag increment)
+ nonce_fetch_count = [0]
+ query_results = qrm[:]
+ queries = []
+ result = cls.mrulist(variables={"sort":"addr", "frags":24})
+ self.assertEqual(nonce_fetch_count, [4])
+ self.assertEqual(queries,
+ [(10, 0, "nonce=foo, frags=24", False),
+ (10, 0,
+ "nonce=foo, frags=25, addr.0=1.2.3.4:23, last.0=40",
+ False),
+ (10, 0,
+ "nonce=foo, frags=26, addr.0=1.2.3.4:23, last.0=41, "
+ "addr.1=1.2.3.4:23, last.1=40", False),
+ (10, 0,
+ "nonce=foo, frags=27, addr.0=10.20.30.40:23, "
+ "last.0=42, addr.1=1.2.3.4:23, last.1=41, "
+ "addr.2=1.2.3.4:23, last.2=40", False)])
+ self.assertEqual(isinstance(result, ntp.packet.MRUList), True)
+ self.assertEqual(len(result.entries), 2)
+ mru = result.entries[0]
+ self.assertEqual(mru.addr, "10.20.30.40:23")
+ self.assertEqual(mru.last, 42)
+ self.assertEqual(mru.first, 23)
+ self.assertEqual(mru.ct, 1)
+ self.assertEqual(mru.mv, 2)
+ self.assertEqual(mru.rs, 3)
+ mru = result.entries[1]
+ self.assertEqual(mru.addr, "1.2.3.4:23")
+ self.assertEqual(mru.last, 41)
+ self.assertEqual(mru.first, 23)
+ self.assertEqual(mru.ct, 1)
+ self.assertEqual(mru.mv, 2)
+ self.assertEqual(mru.rs, 3)
+
+ def test___ordlist(self):
+ queries = []
+ def doquery_jig(opcode, associd=0, qdata="", auth=False):
+ queries.append((opcode, associd, qdata, auth))
+ # Init
+ cls = self.target()
+ cls.doquery = doquery_jig
+ # Test
+ cls.response = "foo.0=42, bar.2=songbird"
+ result = cls._ControlSession__ordlist("blah")
+ self.assertEqual(result, [odict((("foo", 42),)), odict(),
+ odict((("bar", "songbird"),))])
+ self.assertEqual(queries,
+ [(ntp.control.CTL_OP_READ_ORDLIST_A,
+ 0, "blah", True)])
+
+ def test_reslist(self):
+ ords = []
+ def ordlist_jig(listtype):
+ ords.append(listtype)
+ return 23
+ # Init
+ cls = self.target()
+ cls._ControlSession__ordlist = ordlist_jig
+ # Test
+ result = cls.reslist()
+ self.assertEqual(result, 23)
+ self.assertEqual(ords, ["addr_restrictions"])
+
+ def test_ifstats(self):
+ ords = []
+ def ordlist_jig(listtype):
+ ords.append(listtype)
+ return 23
+ # Init
+ cls = self.target()
+ cls._ControlSession__ordlist = ordlist_jig
+ # Test
+ result = cls.ifstats()
+ self.assertEqual(result, 23)
+ self.assertEqual(ords, ["ifstats"])
+
+if __name__ == "__main__":
+ unittest.main()
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/e6c477d01f4795d4fac476e22202d4b70455a6f8...64fdd108bfe7896c54ad7c1d31703876f3596979
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/e6c477d01f4795d4fac476e22202d4b70455a6f8...64fdd108bfe7896c54ad7c1d31703876f3596979
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170817/9a2d94a3/attachment.html>
More information about the vc
mailing list