[Git][NTPsec/ntpsec][master] 4 commits: Added test for parse_mru_variables.

Ian Bruene gitlab at mg.gitlab.com
Tue Dec 31 21:29:26 UTC 2019



Ian Bruene pushed to branch master at NTPsec / ntpsec


Commits:
ee3d077d by Ian Bruene at 2019-12-31T12:35:29-06:00
Added test for parse_mru_variables.

- - - - -
ec836ade by Ian Bruene at 2019-12-31T12:35:29-06:00
Added test for stitch_mru.

- - - - -
5a69bd61 by Ian Bruene at 2019-12-31T12:35:29-06:00
Added test for generate_mru_parms.

- - - - -
20d398b7 by Ian Bruene at 2019-12-31T14:03:08-06:00
Added test for generate_mru_lastseen.

- - - - -


2 changed files:

- pylib/packet.py
- tests/pylib/test_packet.py


Changes:

=====================================
pylib/packet.py
=====================================
@@ -1304,6 +1304,7 @@ This combats source address spoofing
         raise ControlException(SERR_BADNONCE)
 
     def __mru_analyze(self, variables, span, direct):
+        """Extracts data from the key/value list into a more useful form"""
         curidx = -1
         mru = None
         nonce = None
@@ -1556,8 +1557,7 @@ def parse_mru_variables(variables):
                 raise ControlException(SERR_BADSORT % sortkey)
     for k in list(variables.keys()):
         if k in ("mincount", "resall", "resany", "kod", "limited",
-                 "maxlstint", "laddr", "recent", "sort",
-                 "frags", "limit"):
+                 "maxlstint", "laddr", "recent", "sort", "frags", "limit"):
             continue
         else:
             raise ControlException(SERR_BADPARAM % k)
@@ -1591,7 +1591,7 @@ def stitch_mru(span, sorter, sortkey):
     for addr in addrdict:
         deletia += sorted(addrdict[addr], key=lambda x: x[1])[:-1]
     deletia = [x[0] for x in deletia]
-    deletia.sort(reverse=True)
+    deletia.sort(reverse=True)  # Delete from top down so indices don't change
     for i in deletia:
         span.entries.pop(i)
 


=====================================
tests/pylib/test_packet.py
=====================================
@@ -124,6 +124,110 @@ class TestPacket(unittest.TestCase):
         # Test mode
         self.assertEqual(cls.mode(), 4)
 
+    def test_parse_mru_variables(self):
+        f = ntpp.parse_mru_variables
+        # Test working sort
+        data = {"sort": "count", "mincount": 50, "resall": 1, "resany": 5,
+                "kod": True, "limited": True, "maxlstint": 100,
+                "laddr": "foo.test", "recent": "foo", "frags": 20, "limit": 80}
+        sorter, sortkey, frags = f(data)
+        self.assertEqual(sorter != None, True)  # can't directly test lambda
+        self.assertEqual(sortkey, "count")
+        self.assertEqual(frags, 20)
+        self.assertEqual(data,
+                         {"mincount": 50, "resall": 1, "resany": 1061,
+                          "maxlstint": 100, "laddr": "foo.test",
+                          "recent": "foo", "limit": 80})
+        # Test no sort
+        data = {"mincount": 50, "resall": 1, "resany": 5, "kod": True,
+                "limited": True, "maxlstint": 100, "laddr": "foo.test",
+                "recent": "foo", "frags": 20, "limit": 80}
+        sorter, sortkey, frags = f(data)
+        self.assertEqual(sorter, None)  # can't directly test lambda
+        self.assertEqual(sortkey, None)
+        self.assertEqual(frags, 20)
+        self.assertEqual(data,
+                         {"mincount": 50, "resall": 1, "resany": 1061,
+                          "maxlstint": 100, "laddr": "foo.test",
+                          "recent": "foo", "limit": 80})
+        # Test bad sort
+        data = {"sort": "FAIL", "mincount": 50, "resall": 1, "resany": 5,
+                "kod": True, "limited": True, "maxlstint": 100,
+                "laddr": "foo.test", "recent": "foo", "frags": 20, "limit": 80}
+        try:
+            sorter, sortkey, frags = f(data)
+            errored = False
+        except ntpp.ControlException as e:
+            errored = e.message
+        self.assertEqual(errored, "***Sort order FAIL is not implemented")
+        # Test bad variables
+        data = {"FAIL": "FAIL", "mincount": 50, "resall": 1, "resany": 5,
+                "kod": True, "limited": True, "maxlstint": 100,
+                "laddr": "foo.test", "recent": "foo", "frags": 20, "limit": 80}
+        try:
+            sorter, sortkey, frags = f(data)
+            errored = False
+        except ntpp.ControlException as e:
+            errored = e.message
+        self.assertEqual(errored, "***Unknown parameter 'FAIL'")
+
+    def test_stitch_mru(self):
+        f = ntpp.stitch_mru
+        # Prepare data
+        span = ntpp.MRUList()
+        entry1 = ntpp.MRUEntry()
+        entry1.addr = "1.2.3.4:23"
+        entry1.last = "2019-12-31T00:15:00Z"
+        entry2 = ntpp.MRUEntry()
+        entry2.addr = "10.20.30.40:42"
+        entry2.last = "2019-12-31T03:00:00Z"
+        entry3 = ntpp.MRUEntry()
+        entry3.addr = "1.2.3.4:23"
+        entry3.last = "2019-12-31T03:30:00Z"
+        entry4 = ntpp.MRUEntry()
+        entry4.addr = "1.2.3.4:23"
+        entry4.last = "2019-12-31T04:45:00Z"
+        span.entries = [entry1, entry2, entry3, entry4]
+        sorter = (lambda e: e.sortaddr())  # Extracted from parse_mru_variables
+        sortkey = "addr"
+        f(span, sorter, sortkey)
+        self.assertEqual(span.entries, [entry2, entry4])
+
+    def test_generate_mru_parms(self):
+        f = ntpp.generate_mru_parms
+        # Data
+        vars = {"foo": 1, "bar": 2, "recent": 4}
+        # Run test
+        parms, firstParms = f(vars)
+        correct = parms == ", foo=1, bar=2" or parms == ", bar=2, foo=1"
+        self.assertEqual(correct, True)
+        one = firstParms == ", recent=4, foo=1, bar=2"
+        two = firstParms == ", recent=4, bar=2, foo=1"
+        self.assertEqual(one or two, True)
+
+    def test_generate_mru_lastseen(self):
+        f = ntpp.generate_mru_lastseen
+        # Data
+        span = ntpp.MRUList()
+        entry1 = ntpp.MRUEntry()
+        entry1.addr = "1.2.3.4:23"
+        entry1.last = "2019-12-31T00:15:00Z"
+        entry2 = ntpp.MRUEntry()
+        entry2.addr = "10.20.30.40:42"
+        entry2.last = "2019-12-31T03:00:00Z"
+        entry3 = ntpp.MRUEntry()
+        entry3.addr = "2.0.0.1:23"
+        entry3.last = "2019-12-31T03:30:00Z"
+        entry4 = ntpp.MRUEntry()
+        entry4.addr = "3.4.5.6:23"
+        entry4.last = "2019-12-31T04:45:00Z"
+        span.entries = [entry1, entry2, entry3, entry4]
+        # Run test
+        buf = f(span, 370)
+        expected = ", addr.0=3.4.5.6:23, last.0=2019-12-31T04:45:00Z, " \
+                              "addr.1=2.0.0.1:23, last.1=2019-12-31T03:30:00Z"
+        self.assertEqual(buf, expected)
+
 
 class TestSyncPacket(unittest.TestCase):
     target = ntpp.SyncPacket



View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/b02e1e55399827558ad30193fd6acc239f648d86...20d398b7dc91e943568a976ab39a66b3bbbf4f94

-- 
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/b02e1e55399827558ad30193fd6acc239f648d86...20d398b7dc91e943568a976ab39a66b3bbbf4f94
You're receiving this email because of your account on gitlab.com.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20191231/bc2e02fb/attachment-0001.htm>


More information about the vc mailing list