[Git][NTPsec/ntpsec][master] Added tests for SyncPacket, tweaked SyncPacket
Ian Bruene
gitlab at mg.gitlab.com
Sun Aug 20 22:47:14 UTC 2017
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
42643ae6 by Ian Bruene at 2017-08-20T17:46:33-05:00
Added tests for SyncPacket, tweaked SyncPacket
- - - - -
2 changed files:
- pylib/packet.py
- tests/pylib/test_packet.py
Changes:
=====================================
pylib/packet.py
=====================================
--- a/pylib/packet.py
+++ b/pylib/packet.py
@@ -391,8 +391,8 @@ class SyncPacket(Packet):
self.stratum = 0
self.poll = 0
self.precision = 0
- self.rootdelay = 0
- self.rootdispersion = 0
+ self.root_delay = 0
+ self.root_dispersion = 0
self.refid = 0
self.reference_timestamp = 0
self.origin_timestamp = 0
@@ -507,10 +507,6 @@ class SyncPacket(Packet):
"Adjustment implied by this packet - 'theta' in NTP-speak."
return ((self.t2()-self.t1())+(self.t3()-self.t4()))/2
- def leap(self):
- return ("no-leap", "add-leap", "del-leap",
- "unsync")[((self.li_vn_mode) >> 6) & 0x3]
-
def flatten(self):
"Flatten the packet into an octet sequence."
body = struct.pack(SyncPacket.format,
@@ -518,8 +514,8 @@ class SyncPacket(Packet):
self.stratum,
self.poll,
self.precision,
- self.rootdelay,
- self.rootdispersion,
+ self.root_delay,
+ self.root_dispersion,
self.refid,
self.reference_timestamp,
self.origin_timestamp,
=====================================
tests/pylib/test_packet.py
=====================================
--- a/tests/pylib/test_packet.py
+++ b/tests/pylib/test_packet.py
@@ -272,6 +272,252 @@ class TestPacket(unittest.TestCase):
self.assertEqual(cls.mode(), 4)
+class TestSyncPacket(unittest.TestCase):
+ target = ntp.packet.SyncPacket
+
+ def test___init__(self):
+ # Test without data (that will be tested via analyze())
+ cls = self.target()
+ self.assertEqual(cls.status, 0)
+ self.assertEqual(cls.stratum, 0)
+ self.assertEqual(cls.poll, 0)
+ self.assertEqual(cls.precision, 0)
+ self.assertEqual(cls.root_delay, 0)
+ self.assertEqual(cls.root_dispersion, 0)
+ self.assertEqual(cls.refid, 0)
+ self.assertEqual(cls.reference_timestamp, 0)
+ self.assertEqual(cls.origin_timestamp, 0)
+ self.assertEqual(cls.receive_timestamp, 0)
+ self.assertEqual(cls.transmit_timestamp, 0)
+ self.assertEqual(cls.data, ntp.packet.polybytes(""))
+ self.assertEqual(cls.extension, '')
+ self.assertEqual(cls.extfields, [])
+ self.assertEqual(cls.mac, '')
+ self.assertEqual(cls.hostname, None)
+ self.assertEqual(cls.resolved, None)
+ # In theory should test cls.received here, but it is assigned to time()
+ self.assertEqual(cls.trusted, True)
+ self.assertEqual(cls.rescaled, False)
+
+ def test_analyze(self):
+ # Test without extension
+ data = "\x5C\x10\x01\xFF" \
+ "\x00\x00\x01\x01\x00\x00\x01\x02\x00\x00\x01\x03" \
+ "\x00\x01\x02\x03\x04\x05\x06\x07" \
+ "\x01\x01\x02\x03\x04\x05\x06\x07" \
+ "\x02\x01\x02\x03\x04\x05\x06\x07" \
+ "\x03\x01\x02\x03\x04\x05\x06\x07"
+ cls = self.target(data) # This calls analyze
+ self.assertEqual(cls.li_vn_mode, 0x5C)
+ self.assertEqual(cls.stratum, 16)
+ self.assertEqual(cls.poll, 1)
+ self.assertEqual(cls.precision, -1)
+ self.assertEqual(cls.root_delay, 257)
+ self.assertEqual(cls.root_dispersion, 258)
+ self.assertEqual(cls.refid, 259)
+ self.assertEqual(cls.reference_timestamp, 0x0001020304050607)
+ self.assertEqual(cls.origin_timestamp, 0x0101020304050607)
+ self.assertEqual(cls.receive_timestamp, 0x0201020304050607)
+ self.assertEqual(cls.transmit_timestamp, 0x0301020304050607)
+ self.assertEqual(cls.extfields, [])
+ # Test with extension, Crypto-NAK
+ data += "\x00\x00\x00\x01\x00\x00\x00\x04blah" \
+ "\x00\x00\x00\x02\x00\x00\x00\x0Cjabberjabber" \
+ "\x00\x00\x00\x03\x00\x00\x00\x20" \
+ "In the end, our choices make us."
+ data2 = data + "\x11\x22\x33\x44"
+ cls = self.target(data2)
+ self.assertEqual(cls.li_vn_mode, 0x5C)
+ self.assertEqual(cls.stratum, 16)
+ self.assertEqual(cls.poll, 1)
+ self.assertEqual(cls.precision, -1)
+ self.assertEqual(cls.root_delay, 257)
+ self.assertEqual(cls.root_dispersion, 258)
+ self.assertEqual(cls.refid, 259)
+ self.assertEqual(cls.reference_timestamp, 0x0001020304050607)
+ self.assertEqual(cls.origin_timestamp, 0x0101020304050607)
+ self.assertEqual(cls.receive_timestamp, 0x0201020304050607)
+ self.assertEqual(cls.transmit_timestamp, 0x0301020304050607)
+ self.assertEqual(cls.extfields,
+ [(1, "blah"), (2, "jabberjabber"),
+ (3, "In the end, our choices make us.")])
+ self.assertEqual(cls.mac, "\x11\x22\x33\x44")
+ # Test with extension, DES
+ data2 = data + "\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC"
+ try:
+ cls = self.target(data2)
+ errored = False
+ except ntp.packet.SyncException as e:
+ errored = e.message
+ self.assertEqual(errored, "Unsupported DES authentication")
+ # Test with extension, runt 8
+ data2 = data + "\x11\x22\x33\x44\x55\x66\x77\x88"
+ try:
+ cls = self.target(data2)
+ errored = False
+ except ntp.packet.SyncException as e:
+ errored = e.message
+ self.assertEqual(errored, "Packet is a runt")
+ # Test with extension, runt 16
+ data2 = data + "\x00\x11\x22\x33\x44\x55\x66\x77" \
+ "\x88\x99\xAA\xBB\xCC\xDD\xEE\xFF"
+ try:
+ cls = self.target(data2)
+ errored = False
+ except ntp.packet.SyncException as e:
+ errored = e.message
+ self.assertEqual(errored, "Packet is a runt")
+ # Test with extension, MD5 or SHA1, 20
+ ext = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09" \
+ "\x0A\x0B\x0C\x0D\x0E\x0F\x10\x11\x12\x13"
+ data2 = data + ext
+ cls = self.target(data2)
+ self.assertEqual(cls.mac, ext)
+ # Test with extension, MD5 or SHA1, 24
+ ext += "\x14\x15\x16\x17"
+ data2 = data + ext
+ cls = self.target(data2)
+ self.assertEqual(cls.mac, ext)
+
+ def test_ntp_to_posix(self):
+ f = self.target.ntp_to_posix
+ # Test the Timeless Void Before Existence
+ self.assertEqual(f(-1), -2208988800) # NTP can't see the Timeless Void
+ # Test beginning of NTP epoch
+ self.assertEqual(f(0), -2208988800)
+ # Test just before the Age of Unix
+ self.assertEqual(f(2208988799 * 2**32), -1)
+ # Test the beginning of the Age of Unix
+ self.assertEqual(f(2208988800 * 2**32), 0)
+ # Test the End of Time
+ self.assertEqual(f(0xFFFFFFFF00000000), 2085978495)
+ # Test the Timeless Void Beyond Existence
+ self.assertEqual(f(0x10000000000000000), 2085978496) # It doesn't clip
+
+ def test_posix_to_ntp(self):
+ f = self.target.posix_to_ntp
+ # Test the Timeless Void Before Existence
+ self.assertEqual(f(-2208988801), -0x100000000) # It doesn't clip
+ # Test beginning of NTP epoch
+ self.assertEqual(f(-2208988800), 0)
+ # Test just before the Age of Unix
+ self.assertEqual(f(-1), 2208988799 * 2**32)
+ # Test the beginning of the Age of Unix
+ self.assertEqual(f(0), 2208988800 * 2**32)
+ # Test the End of Time
+ self.assertEqual(f(2085978495), 0xFFFFFFFF00000000)
+ # Test the Timeless Void Beyond Existence
+ self.assertEqual(f(2085978496), 0x10000000000000000) # It doesn't clip
+
+ def test_posixize(self):
+ cls = self.target()
+ # Test already scaled
+ cls.rescaled = True
+ cls.received = 0 # __init__ sets to current time
+ cls.posixize()
+ self.assertEqual(cls.rescaled, True)
+ self.assertEqual(cls.root_delay, 0)
+ self.assertEqual(cls.root_dispersion, 0)
+ self.assertEqual(cls.reference_timestamp, 0)
+ self.assertEqual(cls.origin_timestamp, 0)
+ self.assertEqual(cls.receive_timestamp, 0)
+ self.assertEqual(cls.transmit_timestamp, 0)
+ self.assertEqual(cls.received, 0)
+ # Test scaling
+ cls.rescaled = False
+ cls.root_delay = 131072
+ cls.root_dispersion = 131072
+ cls.posixize()
+ self.assertEqual(cls.rescaled, True)
+ self.assertEqual(cls.root_delay, 2)
+ self.assertEqual(cls.root_dispersion, 2)
+ self.assertEqual(cls.reference_timestamp, -2208988800 )
+ self.assertEqual(cls.origin_timestamp, -2208988800)
+ self.assertEqual(cls.receive_timestamp, -2208988800)
+ self.assertEqual(cls.transmit_timestamp, -2208988800)
+ self.assertEqual(cls.received, -2208988800)
+
+ def test_t1_4(self):
+ cls = self.target()
+ cls.origin_timestamp = 1
+ cls.receive_timestamp = 2
+ cls.transmit_timestamp = 3
+ cls.received = 4
+ self.assertEqual(cls.t1(), 1)
+ self.assertEqual(cls.t2(), 2)
+ self.assertEqual(cls.t3(), 3)
+ self.assertEqual(cls.t4(), 4)
+
+ def test_delta_epsilon_synchd_adjust(self):
+ cls = self.target()
+ cls.origin_timestamp = 1
+ cls.receive_timestamp = 2
+ cls.transmit_timestamp = 3
+ cls.received = 4
+ cls.precision = 4
+ # Test delta
+ self.assertEqual(cls.delta(), 2)
+ # Test epsilon
+ self.assertEqual(cls.epsilon(), 16.000045)
+ # Test synchd
+ self.assertEqual(cls.synchd(), 17.000045)
+ # Test adjust
+ self.assertEqual(cls.adjust(), 0)
+
+ def test_flatten(self):
+ data = "\x5C\x10\x01\xFF" \
+ "\x00\x00\x01\x01\x00\x00\x01\x02\x00\x00\x01\x03" \
+ "\x00\x01\x02\x03\x04\x05\x06\x07" \
+ "\x01\x01\x02\x03\x04\x05\x06\x07" \
+ "\x02\x01\x02\x03\x04\x05\x06\x07" \
+ "\x03\x01\x02\x03\x04\x05\x06\x07"
+ ext = "\x00\x00\x00\x01\x00\x00\x00\x04blah" \
+ "\x00\x00\x00\x02\x00\x00\x00\x0Cjabberjabber" \
+ "\x00\x00\x00\x03\x00\x00\x00\x20" \
+ "In the end, our choices make us." \
+ "\x11\x22\x33\x44"
+ cls = self.target(data)
+ cls.extension = ext
+ self.assertEqual(cls.flatten(), data + ext)
+
+ def test_refid_octets(self):
+ cls = self.target()
+ cls.refid = 0x12345678
+ self.assertEqual(cls.refid_octets(), (0x12, 0x34, 0x56, 0x78))
+
+ def test_refid_as_string(self):
+ cls = self.target()
+ cls.refid = 0x12345678
+ self.assertEqual(cls.refid_as_string(), "\x12\x34\x56\x78")
+
+ def test_refid_as_address(self):
+ cls = self.target()
+ cls.refid = 0x01020304
+ self.assertEqual(cls.refid_as_address(), "1.2.3.4")
+
+ def test_is_crypto_nak(self):
+ cls = self.target()
+ cls.mac = "blah"
+ self.assertEqual(cls.is_crypto_nak(), True)
+
+ def test_MD5(self):
+ cls = self.target()
+ cls.mac = "blah" * 5 # 20 bytes
+ self.assertEqual(cls.has_MD5(), True)
+
+ def test_SHA1(self):
+ cls = self.target()
+ cls.mac = "blah" * 6 # 24 bytes
+ self.assertEqual(cls.has_SHA1(), True)
+
+ def test___repr__(self):
+ cls = self.target()
+ self.assertEqual(cls.__repr__(),
+ "<NTP:unsync:4:30.000000:0.000000:0.0.0.0:"
+ "1900-01-01T00:00:00Z:1900-01-01T00:00:00Z:"
+ "1900-01-01T00:00:00Z:1900-01-01T00:00:00Z>")
+
+
class TestMisc(unittest.TestCase):
def test_Peer(self):
session = SessionJig()
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/42643ae6ad0f1eef2f2d370f818df5b7ec12f4a7
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/42643ae6ad0f1eef2f2d370f818df5b7ec12f4a7
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170820/0571dc3a/attachment.html>
More information about the vc mailing list