[Git][NTPsec/ntpsec][master] Added tests for SyncPacket, tweaked SyncPacket

Ian Bruene gitlab at mg.gitlab.com
Sun Aug 20 22:47:14 UTC 2017


Ian Bruene pushed to branch master at NTPsec / ntpsec


Commits:
42643ae6 by Ian Bruene at 2017-08-20T17:46:33-05:00
Added tests for SyncPacket, tweaked SyncPacket

- - - - -


2 changed files:

- pylib/packet.py
- tests/pylib/test_packet.py


Changes:

=====================================
pylib/packet.py
=====================================
--- a/pylib/packet.py
+++ b/pylib/packet.py
@@ -391,8 +391,8 @@ class SyncPacket(Packet):
         self.stratum = 0
         self.poll = 0
         self.precision = 0
-        self.rootdelay = 0
-        self.rootdispersion = 0
+        self.root_delay = 0
+        self.root_dispersion = 0
         self.refid = 0
         self.reference_timestamp = 0
         self.origin_timestamp = 0
@@ -507,10 +507,6 @@ class SyncPacket(Packet):
         "Adjustment implied by this packet - 'theta' in NTP-speak."
         return ((self.t2()-self.t1())+(self.t3()-self.t4()))/2
 
-    def leap(self):
-        return ("no-leap", "add-leap", "del-leap",
-                "unsync")[((self.li_vn_mode) >> 6) & 0x3]
-
     def flatten(self):
         "Flatten the packet into an octet sequence."
         body = struct.pack(SyncPacket.format,
@@ -518,8 +514,8 @@ class SyncPacket(Packet):
                            self.stratum,
                            self.poll,
                            self.precision,
-                           self.rootdelay,
-                           self.rootdispersion,
+                           self.root_delay,
+                           self.root_dispersion,
                            self.refid,
                            self.reference_timestamp,
                            self.origin_timestamp,


=====================================
tests/pylib/test_packet.py
=====================================
--- a/tests/pylib/test_packet.py
+++ b/tests/pylib/test_packet.py
@@ -272,6 +272,252 @@ class TestPacket(unittest.TestCase):
         self.assertEqual(cls.mode(), 4)
 
 
+class TestSyncPacket(unittest.TestCase):
+    target = ntp.packet.SyncPacket
+
+    def test___init__(self):
+        # Test without data (that will be tested via analyze())
+        cls = self.target()
+        self.assertEqual(cls.status, 0)
+        self.assertEqual(cls.stratum, 0)
+        self.assertEqual(cls.poll, 0)
+        self.assertEqual(cls.precision, 0)
+        self.assertEqual(cls.root_delay, 0)
+        self.assertEqual(cls.root_dispersion, 0)
+        self.assertEqual(cls.refid, 0)
+        self.assertEqual(cls.reference_timestamp, 0)
+        self.assertEqual(cls.origin_timestamp, 0)
+        self.assertEqual(cls.receive_timestamp, 0)
+        self.assertEqual(cls.transmit_timestamp, 0)
+        self.assertEqual(cls.data, ntp.packet.polybytes(""))
+        self.assertEqual(cls.extension, '')
+        self.assertEqual(cls.extfields, [])
+        self.assertEqual(cls.mac, '')
+        self.assertEqual(cls.hostname, None)
+        self.assertEqual(cls.resolved, None)
+        # In theory should test cls.received here, but it is assigned to time()
+        self.assertEqual(cls.trusted, True)
+        self.assertEqual(cls.rescaled, False)
+
+    def test_analyze(self):
+        # Test without extension
+        data = "\x5C\x10\x01\xFF" \
+               "\x00\x00\x01\x01\x00\x00\x01\x02\x00\x00\x01\x03" \
+               "\x00\x01\x02\x03\x04\x05\x06\x07" \
+               "\x01\x01\x02\x03\x04\x05\x06\x07" \
+               "\x02\x01\x02\x03\x04\x05\x06\x07" \
+               "\x03\x01\x02\x03\x04\x05\x06\x07"
+        cls = self.target(data)  # This calls analyze
+        self.assertEqual(cls.li_vn_mode, 0x5C)
+        self.assertEqual(cls.stratum, 16)
+        self.assertEqual(cls.poll, 1)
+        self.assertEqual(cls.precision, -1)
+        self.assertEqual(cls.root_delay, 257)
+        self.assertEqual(cls.root_dispersion, 258)
+        self.assertEqual(cls.refid, 259)
+        self.assertEqual(cls.reference_timestamp, 0x0001020304050607)
+        self.assertEqual(cls.origin_timestamp, 0x0101020304050607)
+        self.assertEqual(cls.receive_timestamp, 0x0201020304050607)
+        self.assertEqual(cls.transmit_timestamp, 0x0301020304050607)
+        self.assertEqual(cls.extfields, [])
+        # Test with extension, Crypto-NAK
+        data += "\x00\x00\x00\x01\x00\x00\x00\x04blah" \
+                "\x00\x00\x00\x02\x00\x00\x00\x0Cjabberjabber" \
+                "\x00\x00\x00\x03\x00\x00\x00\x20" \
+                "In the end, our choices make us."
+        data2 = data + "\x11\x22\x33\x44"
+        cls = self.target(data2)
+        self.assertEqual(cls.li_vn_mode, 0x5C)
+        self.assertEqual(cls.stratum, 16)
+        self.assertEqual(cls.poll, 1)
+        self.assertEqual(cls.precision, -1)
+        self.assertEqual(cls.root_delay, 257)
+        self.assertEqual(cls.root_dispersion, 258)
+        self.assertEqual(cls.refid, 259)
+        self.assertEqual(cls.reference_timestamp, 0x0001020304050607)
+        self.assertEqual(cls.origin_timestamp, 0x0101020304050607)
+        self.assertEqual(cls.receive_timestamp, 0x0201020304050607)
+        self.assertEqual(cls.transmit_timestamp, 0x0301020304050607)
+        self.assertEqual(cls.extfields,
+                         [(1, "blah"), (2, "jabberjabber"),
+                          (3, "In the end, our choices make us.")])
+        self.assertEqual(cls.mac, "\x11\x22\x33\x44")
+        # Test with extension, DES
+        data2 = data + "\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC"
+        try:
+            cls = self.target(data2)
+            errored = False
+        except ntp.packet.SyncException as e:
+            errored = e.message
+        self.assertEqual(errored, "Unsupported DES authentication")
+        # Test with extension, runt 8
+        data2 = data + "\x11\x22\x33\x44\x55\x66\x77\x88"
+        try:
+            cls = self.target(data2)
+            errored = False
+        except ntp.packet.SyncException as e:
+            errored = e.message
+        self.assertEqual(errored, "Packet is a runt")
+        # Test with extension, runt 16
+        data2 = data + "\x00\x11\x22\x33\x44\x55\x66\x77" \
+                "\x88\x99\xAA\xBB\xCC\xDD\xEE\xFF"
+        try:
+            cls = self.target(data2)
+            errored = False
+        except ntp.packet.SyncException as e:
+            errored = e.message
+        self.assertEqual(errored, "Packet is a runt")
+        # Test with extension, MD5 or SHA1, 20
+        ext = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09" \
+              "\x0A\x0B\x0C\x0D\x0E\x0F\x10\x11\x12\x13"
+        data2 = data + ext
+        cls = self.target(data2)
+        self.assertEqual(cls.mac, ext)
+        # Test with extension, MD5 or SHA1, 24
+        ext += "\x14\x15\x16\x17"
+        data2 = data + ext
+        cls = self.target(data2)
+        self.assertEqual(cls.mac, ext)
+
+    def test_ntp_to_posix(self):
+        f = self.target.ntp_to_posix
+        # Test the Timeless Void Before Existence
+        self.assertEqual(f(-1), -2208988800)  # NTP can't see the Timeless Void
+        # Test beginning of NTP epoch
+        self.assertEqual(f(0), -2208988800)
+        # Test just before the Age of Unix
+        self.assertEqual(f(2208988799 * 2**32), -1)
+        # Test the beginning of the Age of Unix
+        self.assertEqual(f(2208988800 * 2**32), 0)
+        # Test the End of Time
+        self.assertEqual(f(0xFFFFFFFF00000000), 2085978495)
+        # Test the Timeless Void Beyond Existence
+        self.assertEqual(f(0x10000000000000000), 2085978496)  # It doesn't clip
+
+    def test_posix_to_ntp(self):
+        f = self.target.posix_to_ntp
+        # Test the Timeless Void Before Existence
+        self.assertEqual(f(-2208988801), -0x100000000)  # It doesn't clip
+        # Test beginning of NTP epoch
+        self.assertEqual(f(-2208988800), 0)
+        # Test just before the Age of Unix
+        self.assertEqual(f(-1), 2208988799 * 2**32)
+        # Test the beginning of the Age of Unix
+        self.assertEqual(f(0), 2208988800 * 2**32)
+        # Test the End of Time
+        self.assertEqual(f(2085978495), 0xFFFFFFFF00000000)
+        # Test the Timeless Void Beyond Existence
+        self.assertEqual(f(2085978496), 0x10000000000000000)  # It doesn't clip
+
+    def test_posixize(self):
+        cls = self.target()
+        # Test already scaled
+        cls.rescaled = True
+        cls.received = 0  # __init__ sets to current time
+        cls.posixize()
+        self.assertEqual(cls.rescaled, True)
+        self.assertEqual(cls.root_delay, 0)
+        self.assertEqual(cls.root_dispersion, 0)
+        self.assertEqual(cls.reference_timestamp, 0)
+        self.assertEqual(cls.origin_timestamp, 0)
+        self.assertEqual(cls.receive_timestamp, 0)
+        self.assertEqual(cls.transmit_timestamp, 0)
+        self.assertEqual(cls.received, 0)
+        # Test scaling
+        cls.rescaled = False
+        cls.root_delay = 131072
+        cls.root_dispersion = 131072
+        cls.posixize()
+        self.assertEqual(cls.rescaled, True)
+        self.assertEqual(cls.root_delay, 2)
+        self.assertEqual(cls.root_dispersion, 2)
+        self.assertEqual(cls.reference_timestamp, -2208988800 )
+        self.assertEqual(cls.origin_timestamp, -2208988800)
+        self.assertEqual(cls.receive_timestamp, -2208988800)
+        self.assertEqual(cls.transmit_timestamp, -2208988800)
+        self.assertEqual(cls.received, -2208988800)
+
+    def test_t1_4(self):
+        cls = self.target()
+        cls.origin_timestamp = 1
+        cls.receive_timestamp = 2
+        cls.transmit_timestamp = 3
+        cls.received = 4
+        self.assertEqual(cls.t1(), 1)
+        self.assertEqual(cls.t2(), 2)
+        self.assertEqual(cls.t3(), 3)
+        self.assertEqual(cls.t4(), 4)
+
+    def test_delta_epsilon_synchd_adjust(self):
+        cls = self.target()
+        cls.origin_timestamp = 1
+        cls.receive_timestamp = 2
+        cls.transmit_timestamp = 3
+        cls.received = 4
+        cls.precision = 4
+        # Test delta
+        self.assertEqual(cls.delta(), 2)
+        # Test epsilon
+        self.assertEqual(cls.epsilon(), 16.000045)
+        # Test synchd
+        self.assertEqual(cls.synchd(), 17.000045)
+        # Test adjust
+        self.assertEqual(cls.adjust(), 0)
+
+    def test_flatten(self):
+        data = "\x5C\x10\x01\xFF" \
+               "\x00\x00\x01\x01\x00\x00\x01\x02\x00\x00\x01\x03" \
+               "\x00\x01\x02\x03\x04\x05\x06\x07" \
+               "\x01\x01\x02\x03\x04\x05\x06\x07" \
+               "\x02\x01\x02\x03\x04\x05\x06\x07" \
+               "\x03\x01\x02\x03\x04\x05\x06\x07"
+        ext = "\x00\x00\x00\x01\x00\x00\x00\x04blah" \
+              "\x00\x00\x00\x02\x00\x00\x00\x0Cjabberjabber" \
+              "\x00\x00\x00\x03\x00\x00\x00\x20" \
+              "In the end, our choices make us." \
+              "\x11\x22\x33\x44"
+        cls = self.target(data)
+        cls.extension = ext
+        self.assertEqual(cls.flatten(), data + ext)
+
+    def test_refid_octets(self):
+        cls = self.target()
+        cls.refid = 0x12345678
+        self.assertEqual(cls.refid_octets(), (0x12, 0x34, 0x56, 0x78))
+
+    def test_refid_as_string(self):
+        cls = self.target()
+        cls.refid = 0x12345678
+        self.assertEqual(cls.refid_as_string(), "\x12\x34\x56\x78")
+
+    def test_refid_as_address(self):
+        cls = self.target()
+        cls.refid = 0x01020304
+        self.assertEqual(cls.refid_as_address(), "1.2.3.4")
+
+    def test_is_crypto_nak(self):
+        cls = self.target()
+        cls.mac = "blah"
+        self.assertEqual(cls.is_crypto_nak(), True)
+
+    def test_MD5(self):
+        cls = self.target()
+        cls.mac = "blah" * 5  # 20 bytes
+        self.assertEqual(cls.has_MD5(), True)
+
+    def test_SHA1(self):
+        cls = self.target()
+        cls.mac = "blah" * 6  # 24 bytes
+        self.assertEqual(cls.has_SHA1(), True)
+
+    def test___repr__(self):
+        cls = self.target()
+        self.assertEqual(cls.__repr__(),
+                         "<NTP:unsync:4:30.000000:0.000000:0.0.0.0:"
+                         "1900-01-01T00:00:00Z:1900-01-01T00:00:00Z:"
+                         "1900-01-01T00:00:00Z:1900-01-01T00:00:00Z>")
+
+
 class TestMisc(unittest.TestCase):
     def test_Peer(self):
         session = SessionJig()



View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/42643ae6ad0f1eef2f2d370f818df5b7ec12f4a7

---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/42643ae6ad0f1eef2f2d370f818df5b7ec12f4a7
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170820/0571dc3a/attachment.html>


More information about the vc mailing list