[Git][NTPsec/ntpsec][master] 3 commits: Moved a few DUMMY / TODO labels around for better grepping.
Ian Bruene
gitlab at mg.gitlab.com
Wed Jan 10 19:24:14 UTC 2018
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
d3066ace by Ian Bruene at 2018-01-09T10:47:46-06:00
Moved a few DUMMY / TODO labels around for better grepping.
- - - - -
77c6dfd0 by Ian Bruene at 2018-01-09T10:48:16-06:00
Increased agentx.py coverage to 97%. (does not include py3 shims)
- - - - -
52c4a02c by Ian Bruene at 2018-01-10T13:22:46-06:00
Increased coverage/explicit ignoring of packet.py tests to 100%.
- - - - -
5 changed files:
- ntpclients/ntpsnmpd.py
- pylib/agentx.py
- pylib/packet.py
- tests/pylib/test_agentx.py
- tests/pylib/test_packet.py
Changes:
=====================================
ntpclients/ntpsnmpd.py
=====================================
--- a/ntpclients/ntpsnmpd.py
+++ b/ntpclients/ntpsnmpd.py
@@ -543,6 +543,7 @@ class DataSource: # This will be broken up in future to be less NTP-specific
def cbr_entNotifMessage(self, oid): # DUMMY
# utf8str
+ # Should probably either return null, or remove this handler.
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "jabber")
#########################
@@ -911,8 +912,8 @@ class DataSource: # This will be broken up in future to be less NTP-specific
control.sendNotify(vl)
self.sentNotifications += 1
- def doNotifyConfigChange(self, control):
- pass # DUMMY
+ def doNotifyConfigChange(self, control): # DUMMY
+ pass
def doNotifyLeapSecondAnnounced(self, control):
oldLeap = self.oldValues["leap"]
@@ -992,8 +993,7 @@ class DataSource: # This will be broken up in future to be less NTP-specific
return (adds, rms)
return
- def misc_getMode(self):
- # DUMMY: not fully implemented
+ def misc_getMode(self): # DUMMY: not fully implemented
try:
# Don't care about the data, this is a ploy to the the rstatus
self.session.readvar(0, ["stratum"])
=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -188,8 +188,11 @@ class AgentXPDU:
return False
if self.packetID != other.packetID:
return False
- if self.context != other.context:
+ if self._hascontext != other._hascontext:
return False
+ if self._hascontext is True:
+ if self.context != other.context:
+ return False
return True
def __ne__(self, other):
@@ -313,7 +316,7 @@ class RegisterPDU(AgentXPDU):
def __eq__(self, other):
if AgentXPDU.__eq__(self, other) is not True:
return False
- if hasattr(self, "timeout"):
+ if hasattr(self, "timeout"): # Relevant to UnregisterPDU subclass
if self.timeout != other.timeout:
return False
if self.priority != other.priority:
@@ -1122,9 +1125,7 @@ class SearchRange:
return not self.__eq__(other)
def __repr__(self):
- r = "SearchRange("
- r += repr(self.start) + ", "
- r += repr(self.end) + ")"
+ r = "SearchRange(%s, %s)" % (repr(self.start), repr(self.end))
return r
def sanity(self):
=====================================
pylib/packet.py
=====================================
--- a/pylib/packet.py
+++ b/pylib/packet.py
@@ -220,7 +220,8 @@ import ntp.util
master_encoding = 'latin-1'
-if str is bytes: # Python 2
+if str is bytes: # pragma: no cover
+ # Python 2
polystr = str
polybytes = bytes
polyord = ord
@@ -233,7 +234,8 @@ if str is bytes: # Python 2
def make_wrapper(fp):
return fp
-else: # Python 3
+else: # pragma: nocover
+ # Python 3
import io
def polystr(o):
@@ -359,8 +361,7 @@ class Packet:
return self.li_vn_mode & 0x7
-class SyncException(BaseException):
-
+class SyncException(BaseException): # pragma: no cover
def __init__(self, message, errorcode=0):
self.message = message
self.errorcode = errorcode
@@ -854,7 +855,7 @@ class ControlSession:
if self.logfp is not None:
self.logfp.write("ntpq: ndp lookup failed, %s\n"
% e2.strerror)
- except AttributeError:
+ except AttributeError: # pragma: no cover
if self.logfp is not None:
self.logfp.write(
"ntpq: API error, missing socket attributes\n")
@@ -911,7 +912,7 @@ class ControlSession:
key_id = 0
if key_id == 0 or key_id > MAX_KEYID:
raise ControlException(SERR_BADKEY)
- except (SyntaxError, ValueError):
+ except (SyntaxError, ValueError): # pragma: no cover
raise ControlException(SERR_BADKEY)
self.keyid = key_id
@@ -938,7 +939,7 @@ class ControlSession:
if self.logfp is not None:
self.logfp.write("Write to %s failed\n" % self.hostname)
return -1
- if (self.debug >= 5) and (self.logfp is not None):
+ if (self.debug >= 5) and (self.logfp is not None): # pragma: no cover
# special, not replacing with dolog()
self.logfp.write("Request packet:\n")
dump_hex_printable(xdata, self.logfp)
@@ -1047,7 +1048,8 @@ class ControlSession:
if timeo:
raise ControlException(SERR_TIMEOUT)
if timeo:
- if (self.debug >= 1) and (self.logfp is not None):
+ if (self.debug >= 1) and \
+ (self.logfp is not None): # pragma: no cover
# special, not replacing with dolog()
self.logfp.write(
"ERR_INCOMPLETE: Received fragments:\n")
@@ -1060,7 +1062,7 @@ class ControlSession:
warndbg("At %s, socket read begins\n" % time.asctime(), 4)
try:
rawdata = polybytes(self.sock.recv(4096))
- except socket.error:
+ except socket.error: # pragma: no cover
# usually, errno 111: connection refused
raise ControlException(SERR_SOCKET)
@@ -1073,7 +1075,7 @@ class ControlSession:
# Validate that packet header is sane, and the correct type
valid = self.__validate_packet(rpkt, rawdata, opcode, associd)
- if valid is False:
+ if valid is False: # pragma: no cover
continue
# Someday, perhaps, check authentication here
@@ -1085,7 +1087,9 @@ class ControlSession:
warn("Received count of 0 in non-final fragment\n")
continue
- if seenlastfrag and rpkt.more():
+ if seenlastfrag and rpkt.more(): # pragma: no cover
+ # I'n not sure this can be triggered without hitting another
+ # error first.
warn("Received second last fragment\n")
continue
@@ -1144,13 +1148,14 @@ class ControlSession:
warndbg("Fragment collection ends. %d bytes "
" in %d fragments\n"
% (len(self.response), len(fragments)), 1)
- if self.debug >= 5: # special, not replacing with dolog()
+ # special loggers, not replacing with dolog()
+ if self.debug >= 5: # pragma: no cover
warn("Response packet:\n")
dump_hex_printable(self.response, self.logfp)
- elif self.debug >= 3:
+ elif self.debug >= 3: # pragma: no cover
# FIXME: Garbage when retrieving assoc list (binary)
warn("Response packet:\n%s\n" % repr(self.response))
- elif self.debug >= 2:
+ elif self.debug >= 2: # pragma: no cover
# FIXME: Garbage when retrieving assoc list (binary)
eol = self.response.find("\n")
firstline = self.response[:eol]
@@ -1591,7 +1596,7 @@ This combats source address spoofing
req_buf += incr
if direct is not None:
span.entries = []
- except KeyboardInterrupt:
+ except KeyboardInterrupt: # pragma: no cover
pass # We can test for interruption with is_complete()
# C ntpq's code for stitching together spans was absurdly
=====================================
tests/pylib/test_agentx.py
=====================================
--- a/tests/pylib/test_agentx.py
+++ b/tests/pylib/test_agentx.py
@@ -71,14 +71,15 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# PDU tests
#
def test_AgentXPDU(self):
+ cls = ntp.agentx.AgentXPDU
# Test these so we don't need a bunch of redundant tests
# Test basic, without context
- test = ntp.agentx.AgentXPDU(0, True, 1, 2, 3, context=extraData)
+ test = cls(0, True, 1, 2, 3, context=extraData)
self.assertEqual(repr(test),
"AgentXPDU(bigEndian=True, packetID=3, "
"pduType=0, sessionID=1, transactionID=2)")
# Test basic, with context
- test = ntp.agentx.AgentXPDU(0, True, 1, 2, 3, hascontext=True)
+ test = cls(0, True, 1, 2, 3, hascontext=True)
self.assertEqual(repr(test),
"AgentXPDU(bigEndian=True, context=None, packetID=3, "
"pduType=0, sessionID=1, transactionID=2)")
@@ -91,6 +92,39 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"context='jabber jabber jabber', "
"foo=42, packetID=3, "
"pduType=0, sessionID=1, transactionID=2)")
+ # Test __eq__
+ a = cls(0, True, 1, 2, 3)
+ b = cls(0, True, 1, 2, 3)
+ # Test all equal
+ self.assertEqual(a == b, True)
+ # Test same class, false
+ b = "blah blah"
+ self.assertEqual(a == b, False)
+ # Test different pdu
+ b = cls(1, True, 1, 2, 3)
+ self.assertEqual(a == b, False)
+ # Test different endianess
+ b = cls(0, False, 1, 2, 3)
+ self.assertEqual(a == b, False)
+ # Test different session ID
+ b = cls(0, True, 42, 2, 3)
+ self.assertEqual(a == b, False)
+ # Test different transaction ID
+ b = cls(0, True, 1, 23, 3)
+ self.assertEqual(a == b, False)
+ # Test different packet ID
+ b = cls(0, True, 1, 2, 13)
+ self.assertEqual(a == b, False)
+ # Test different _hascontext
+ b = cls(0, True, 1, 2, 3, hascontext=True)
+ self.assertEqual(a == b, False)
+ # Test with context, equal
+ a = cls(0, True, 1, 2, 3, True, "foo")
+ b = cls(0, True, 1, 2, 3, True, "foo")
+ self.assertEqual(a == b, True)
+ # Test with context, not equal
+ b = cls(0, True, 1, 2, 3, True, "bar")
+ self.assertEqual(a == b, False)
def test_OpenPDU(self):
dec = ntp.agentx.decode_OpenPDU
@@ -310,6 +344,13 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"rangeSubid": 0,
"upperBound": None,
"context": None})
+ # Test __eq__ gap, equal
+ a = cls(True, 1, 2, 3, 4, 5, ())
+ b = cls(True, 1, 2, 3, 4, 5, ())
+ self.assertEqual(a == b, True)
+ # Test __eq__ gap, unequal
+ b = cls(False, 1, 2, 3, 4, 5, ())
+ self.assertEqual(a == b, False)
def test_UnregisterPDU(self):
dec = ntp.agentx.decode_xRegisterPDU
@@ -1718,6 +1759,15 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# Test >=
self.assertEqual(target((1, 2, 3)) >= target((1, 2)), True)
self.assertEqual(target((1, 2, 3)) >= target((1, 2, 3)), True)
+ # Test insane subids type
+ try:
+ errored = target("foo")
+ except TypeError:
+ errored = True
+ self.assertEqual(errored, True)
+ # Test isNull
+ self.assertEqual(target((1, 2)).isNull(), False)
+ self.assertEqual(target(()).isNull(), True)
def test_searchrange(self):
target = ntp.agentx.SearchRange
@@ -1806,6 +1856,31 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
b"\x07\x00\x00\x00\x08\x00\x00\x00",
lilEndianFlags),
(target((1, 2, 3, 4), (5, 6, 7, 8), True), b""))
+ # Test __eq__
+ # Test equal
+ a = target((1, 2, 3), (1, 2, 3))
+ b = target((1, 2, 3), (1, 2, 3))
+ self.assertEqual(a == b, True)
+ # Test start unequal
+ b = target((1, 2, 3), (1, 2, 3), True)
+ self.assertEqual(a == b, False)
+ # Test end unequal
+ b = target((1, 2, 3), (1, 2, 3, 4))
+ self.assertEqual(a == b, False)
+ # Test __ne__
+ # Test equal
+ a = target((1, 2, 3), (1, 2, 3))
+ b = target((1, 2, 3), (1, 2, 3))
+ self.assertEqual(a != b, False)
+ # Test start unequal
+ b = target((1, 2, 3), (1, 2, 3), True)
+ self.assertEqual(a != b, True)
+ # Test end unequal
+ b = target((1, 2, 3), (1, 2, 3, 4))
+ self.assertEqual(a != b, True)
+ # Test __repr__
+ self.assertEqual(repr(target((1, 2), (1, 3))),
+ "SearchRange(OID((1, 2), False), OID((1, 3), False))")
def test_encode_searchrange_list(self):
enc = ntp.agentx.encode_searchrange_list
@@ -1855,9 +1930,10 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
(srch((1, 2), (1, 2), True),
srch((2, 3), (3, 4), False)))
- def test_encode_octetstr(self):
+ def test_xcode_octetstr(self):
enc = ntp.agentx.encode_octetstr
dec = ntp.agentx.decode_octetstr
+ san = ntp.agentx.sanity_octetstr
# Encode empty
self.assertEqual(enc(True, ()), b"\x00\x00\x00\x00")
@@ -1885,6 +1961,43 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
b"blarg\x00\x00\x00" + extraData,
standardFlags),
("blarg", extraData))
+ # Test sanity
+ # Test str
+ try:
+ errored = san("foo") # str is always sane
+ except Exception as e:
+ errored = e
+ self.assertEqual(errored, None)
+ # Test sane list
+ try:
+ errored = san([1, 2, 3])
+ except Exception as e:
+ errored = e
+ self.assertEqual(errored, None)
+ # Test sane tuple
+ try:
+ errored = san((1, 2, 3))
+ except Exception as e:
+ errored = e
+ self.assertEqual(errored, None)
+ # Test insane list
+ try:
+ errored = san([23, 300, 42])
+ except ValueError:
+ errored = True
+ self.assertEqual(errored, True)
+ # Test insane tuple
+ try:
+ errored = san((23, 300, 42))
+ except ValueError:
+ errored = True
+ self.assertEqual(errored, True)
+ # Test insane type
+ try:
+ errored = san(42.23)
+ except TypeError:
+ errored = True
+ self.assertEqual(errored, True)
def test_Varbind(self):
target = ntp.agentx.Varbind
=====================================
tests/pylib/test_packet.py
=====================================
--- a/tests/pylib/test_packet.py
+++ b/tests/pylib/test_packet.py
@@ -488,11 +488,19 @@ class TestSyncPacket(unittest.TestCase):
cls.origin_timestamp = self.target.posix_to_ntp(1)
cls.receive_timestamp = self.target.posix_to_ntp(2)
cls.transmit_timestamp = self.target.posix_to_ntp(3)
+ # Test without extensions
self.assertEqual(cls.__repr__(),
"<NTP:unsync:4:3:0.000000:0.000000:0.0.0.0:"
"1970-01-01T00:00:00Z:1970-01-01T00:00:01Z:"
"1970-01-01T00:00:02Z:1970-01-01T00:00:03Z>")
-
+ # Test with extensions
+ cls.extfields = "foobar"
+ cls.mac = "machinations"
+ self.assertEqual(cls.__repr__(),
+ "<NTP:unsync:4:3:0.000000:0.000000:0.0.0.0:"
+ "1970-01-01T00:00:00Z:1970-01-01T00:00:01Z:"
+ "1970-01-01T00:00:02Z:1970-01-01T00:00:03Z:"
+ "'foobar':machinations>")
class TestMisc(unittest.TestCase):
def test_Peer(self):
@@ -770,7 +778,7 @@ class TestControlSession(unittest.TestCase):
cls.logfp = logjig
# Test first type
fakesockmod.gai_returns = [42]
- result = cls._ControlSession__lookuphost("blah.com", "family")
+ result = cls._ControlSession__lookuphost("[blah.com]", "family")
self.assertEqual(result, 42)
self.assertEqual(fakesockmod.gai_calls,
[("blah.com", "ntp", cls.ai_family,
@@ -1071,6 +1079,20 @@ class TestControlSession(unittest.TestCase):
cls.getresponse(1, 3, True)
self.assertEqual(cls.response,
polybytes("foo=4223,blah=248,x=23,quux=1"))
+ # Test with data, duplicate packet
+ sockjig.return_data = [
+ "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
+ "foo=4223,\x00\x00\x00",
+ "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x09\x00\x0E"
+ "blah=248,x=23,\x00\x00",
+ "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x09\x00\x0E"
+ "blah=248,x=23,\x00\x00",
+ "\x0E\x81\x00\x01\x00\x02\x00\x03\x00\x17\x00\x06"
+ "quux=1\x00\x00"]
+ cls.sequence = 1
+ cls.getresponse(1, 3, True)
+ self.assertEqual(cls.response,
+ polybytes("foo=4223,blah=248,x=23,quux=1"))
# Test MAXFRAGS bail
maxtemp = ntpp.MAXFRAGS
ntpp.MAXFRAGS = 1
@@ -1571,18 +1593,24 @@ class TestControlSession(unittest.TestCase):
nonce_fetch_count = [0]
query_results = qrm[:]
queries = []
- result = cls.mrulist(variables={"sort": "addr", "frags": 24})
+ result = cls.mrulist(variables={"sort": "addr",
+ "frags": 24,
+ "resall":5})
self.assertEqual(nonce_fetch_count, [4])
+ print(repr(queries))
self.assertEqual(queries,
- [(10, 0, "nonce=foo, frags=24", False),
+ [(10, 0, "nonce=foo, frags=24, resall=0x5", False),
(10, 0,
- "nonce=foo, frags=25, addr.0=1.2.3.4:23, last.0=40",
+ "nonce=foo, frags=25, resall=0x5, "
+ "addr.0=1.2.3.4:23, last.0=40",
False),
(10, 0,
- "nonce=foo, frags=26, addr.0=1.2.3.4:23, "
- "last.0=41, addr.1=1.2.3.4:23, last.1=40", False),
+ "nonce=foo, frags=26, resall=0x5, "
+ "addr.0=1.2.3.4:23, last.0=41, addr.1=1.2.3.4:23, "
+ "last.1=40", False),
(10, 0,
- "nonce=foo, frags=27, addr.0=10.20.30.40:23, "
+ "nonce=foo, frags=27, resall=0x5, "
+ "addr.0=10.20.30.40:23, "
"last.0=42, addr.1=1.2.3.4:23, last.1=41, "
"addr.2=1.2.3.4:23, last.2=40", False)])
self.assertEqual(isinstance(result, ntpp.MRUList), True)
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/d761d7748bec8bb2af14d7a48f790360fc30b64f...52c4a02c90bdf085f0c6f84f12060b46c58f9749
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/d761d7748bec8bb2af14d7a48f790360fc30b64f...52c4a02c90bdf085f0c6f84f12060b46c58f9749
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20180110/225647de/attachment.html>
More information about the vc
mailing list