[Git][NTPsec/ntpsec][master] Use ntp.poly module instead of a local copy in jigs.py and test_packet.py

Matt Selsky gitlab at mg.gitlab.com
Wed Dec 12 06:48:11 UTC 2018


Matt Selsky pushed to branch master at NTPsec / ntpsec


Commits:
1c645a3c by Matt Selsky at 2018-12-12T06:03:01Z
Use ntp.poly module instead of a local copy in jigs.py and test_packet.py

- - - - -


2 changed files:

- tests/pylib/jigs.py
- tests/pylib/test_packet.py


Changes:

=====================================
tests/pylib/jigs.py
=====================================
@@ -7,71 +7,7 @@ import socket
 import select
 import os.path
 import time
-
-
-master_encoding = 'latin-1'
-
-if str is bytes:  # Python 2
-    polystr = str
-    polybytes = bytes
-    polyord = ord
-    polychr = str
-    input = raw_input
-
-    def string_escape(s):
-        return s.decode('string_escape')
-
-else:  # Python 3
-    import io
-
-    def polystr(o):
-        "Polymorphic string factory function"
-        if isinstance(o, str):
-            return o
-        if not isinstance(o, bytes):
-            return str(o)
-        return str(o, encoding=master_encoding)
-
-    def polybytes(s):
-        "Polymorphic string encoding function"
-        if isinstance(s, bytes):
-            return s
-        if not isinstance(s, str):
-            return bytes(s)
-        return bytes(s, encoding=master_encoding)
-
-    def polyord(c):
-        "Polymorphic ord() function"
-        if isinstance(c, str):
-            return ord(c)
-        else:
-            return c
-
-    def polychr(c):
-        "Polymorphic chr() function"
-        if isinstance(c, int):
-            return chr(c)
-        else:
-            return c
-
-    def string_escape(s):
-        "Polymorphic string_escape/unicode_escape"
-        # This hack is necessary because Unicode strings in Python 3 don't
-        # have a decode method, so there's no simple way to ask it for the
-        # equivalent of decode('string_escape') in Python 2. This function
-        # assumes that it will be called with a Python 3 'str' instance
-        return s.encode(master_encoding).decode('unicode_escape')
-
-    def make_std_wrapper(stream):
-        "Standard input/output wrapper factory function"
-        # This ensures that the encoding of standard output and standard
-        # error on Python 3 matches the master encoding we use to turn
-        # bytes to Unicode in polystr above
-        # line_buffering=True ensures that interactive
-        # command sessions work as expected
-        return io.TextIOWrapper(stream.buffer,
-                                encoding=master_encoding, newline="\n",
-                                line_buffering=True)
+import ntp.poly
 
 
 class FileJig:
@@ -157,7 +93,7 @@ class HasherJig:
             self.digest_size += 1
 
     def digest(self):
-        return polybytes("blah" * 4)  # 16 byte hash
+        return ntp.poly.polybytes("blah" * 4)  # 16 byte hash
 
 
 class SocketModuleJig:


=====================================
tests/pylib/test_packet.py
=====================================
@@ -13,76 +13,13 @@ import select
 import sys
 import getpass
 import jigs
+import ntp.poly
 
 odict = ntp.util.OrderedDict
 
 ntpp = ntp.packet
 ctlerr = ntp.packet.ControlException
 
-master_encoding = 'latin-1'
-
-if str is bytes:  # Python 2
-    polystr = str
-    polybytes = bytes
-    polyord = ord
-    polychr = str
-    input = raw_input
-
-    def string_escape(s):
-        return s.decode('string_escape')
-
-else:  # Python 3
-    import io
-
-    def polystr(o):
-        "Polymorphic string factory function"
-        if isinstance(o, str):
-            return o
-        if not isinstance(o, bytes):
-            return str(o)
-        return str(o, encoding=master_encoding)
-
-    def polybytes(s):
-        "Polymorphic string encoding function"
-        if isinstance(s, bytes):
-            return s
-        if not isinstance(s, str):
-            return bytes(s)
-        return bytes(s, encoding=master_encoding)
-
-    def polyord(c):
-        "Polymorphic ord() function"
-        if isinstance(c, str):
-            return ord(c)
-        else:
-            return c
-
-    def polychr(c):
-        "Polymorphic chr() function"
-        if isinstance(c, int):
-            return chr(c)
-        else:
-            return c
-
-    def string_escape(s):
-        "Polymorphic string_escape/unicode_escape"
-        # This hack is necessary because Unicode strings in Python 3 don't
-        # have a decode method, so there's no simple way to ask it for the
-        # equivalent of decode('string_escape') in Python 2. This function
-        # assumes that it will be called with a Python 3 'str' instance
-        return s.encode(master_encoding).decode('unicode_escape')
-
-    def make_std_wrapper(stream):
-        "Standard input/output wrapper factory function"
-        # This ensures that the encoding of standard output and standard
-        # error on Python 3 matches the master encoding we use to turn
-        # bytes to Unicode in polystr above
-        # line_buffering=True ensures that interactive
-        # command sessions work as expected
-        return io.TextIOWrapper(stream.buffer,
-                                encoding=master_encoding, newline="\n",
-                                line_buffering=True)
-
 
 class SessionJig:
     def __init__(self):
@@ -205,7 +142,7 @@ class TestSyncPacket(unittest.TestCase):
         self.assertEqual(cls.origin_timestamp, 0)
         self.assertEqual(cls.receive_timestamp, 0)
         self.assertEqual(cls.transmit_timestamp, 0)
-        self.assertEqual(cls.extension, polybytes(''))
+        self.assertEqual(cls.extension, ntp.poly.polybytes(''))
         self.assertEqual(cls.extfields, [])
         self.assertEqual(cls.mac, '')
         self.assertEqual(cls.hostname, None)
@@ -276,11 +213,11 @@ class TestSyncPacket(unittest.TestCase):
         self.assertEqual(cls.receive_timestamp, 0x0201020304050607)
         self.assertEqual(cls.transmit_timestamp, 0x0301020304050607)
         self.assertEqual(cls.extfields,
-                         [(1, polybytes("blah")),
-                          (2, polybytes("jabberjabber")),
-                          (3, polybytes("In the end, our choices make us."))])
-        self.assertEqual(cls.extension, polybytes(ext + mac))
-        self.assertEqual(cls.mac, polybytes(mac))
+                         [(1, ntp.poly.polybytes("blah")),
+                          (2, ntp.poly.polybytes("jabberjabber")),
+                          (3, ntp.poly.polybytes("In the end, our choices make us."))])
+        self.assertEqual(cls.extension, ntp.poly.polybytes(ext + mac))
+        self.assertEqual(cls.mac, ntp.poly.polybytes(mac))
         # Test with extension, DES
         data2 = data + ext + "\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC"
         try:
@@ -311,14 +248,14 @@ class TestSyncPacket(unittest.TestCase):
               "\x0A\x0B\x0C\x0D\x0E\x0F\x10\x11\x12\x13"
         data2 = data + ext + mac
         cls = self.target(data2)
-        self.assertEqual(cls.mac, polybytes(mac))
-        self.assertEqual(cls.extension, polybytes(ext + mac))
+        self.assertEqual(cls.mac, ntp.poly.polybytes(mac))
+        self.assertEqual(cls.extension, ntp.poly.polybytes(ext + mac))
         # Test with extension, MD5 or SHA1, 24
         mac += "\x14\x15\x16\x17"
         data2 = data + ext + mac
-        cls = self.target(polybytes(data2))
-        self.assertEqual(cls.mac, polybytes(mac))
-        self.assertEqual(cls.extension, polybytes(ext + mac))
+        cls = self.target(ntp.poly.polybytes(data2))
+        self.assertEqual(cls.mac, ntp.poly.polybytes(mac))
+        self.assertEqual(cls.extension, ntp.poly.polybytes(ext + mac))
 
     # ====================================================================
     # The tests referring to the Timeless Void are testing outside of NTP's
@@ -427,7 +364,7 @@ class TestSyncPacket(unittest.TestCase):
               "\x00\x00\x00\x03\x00\x00\x00\x20" \
               "In the end, our choices make us." \
               "\x11\x22\x33\x44"
-        pkt = polybytes(data + ext)
+        pkt = ntp.poly.polybytes(data + ext)
         cls = self.target(pkt)
         self.assertEqual(cls.flatten(), pkt)
 
@@ -563,18 +500,18 @@ class TestMisc(unittest.TestCase):
             # Test sortaddr, ipv6
             cls.addr = "[11:22:33::44:55]:42"
             self.assertEqual(cls.sortaddr(),
-                             polybytes("\x00\x11\x00\x22\x00\x33\x00\x00"
-                                       "\x00\x00\x00\x00\x00\x44\x00\x55"))
+                             ntp.poly.polybytes("\x00\x11\x00\x22\x00\x33\x00\x00"
+                                                "\x00\x00\x00\x00\x00\x44\x00\x55"))
             # Test sortaddr, ipv6, local
             cls.addr = "[11:22:33::44:55%8]:42"
             self.assertEqual(cls.sortaddr(),
-                             polybytes("\x00\x11\x00\x22\x00\x33\x00\x00"
-                                       "\x00\x00\x00\x00\x00\x44\x00\x55"))
+                             ntp.poly.polybytes("\x00\x11\x00\x22\x00\x33\x00\x00"
+                                                "\x00\x00\x00\x00\x00\x44\x00\x55"))
 
         # Test sortaddr, ipv4
         cls.addr = "11.22.33.44:23"
         self.assertEqual(cls.sortaddr(),
-                         polybytes((("\0" * 16) + "\x0b\x16\x21\x2c")))
+                         ntp.poly.polybytes((("\0" * 16) + "\x0b\x16\x21\x2c")))
         # Test __repr__
         # Python dicts enumeration order changes with different versions
         if sys.version_info[0] < 3:
@@ -626,7 +563,7 @@ class TestControlPacket(unittest.TestCase):
         self.assertEqual(cls.status, 0)
         self.assertEqual(cls.associd, 0)
         self.assertEqual(cls.offset, 0)
-        self.assertEqual(cls.extension, polybytes(""))
+        self.assertEqual(cls.extension, ntp.poly.polybytes(""))
         self.assertEqual(cls.count, 0)
 
     def test_is_response(self):
@@ -712,7 +649,7 @@ class TestControlPacket(unittest.TestCase):
         self.assertEqual(cls.offset, 32)
         self.assertEqual(cls.count, 16)
         # Test flatten
-        self.assertEqual(cls.flatten(), polybytes(totaldata))
+        self.assertEqual(cls.flatten(), ntp.poly.polybytes(totaldata))
         # Test send
         send_data = []
 
@@ -720,7 +657,7 @@ class TestControlPacket(unittest.TestCase):
             send_data.append(pkt)
         cls.session.sendpkt = send_jig
         cls.send()
-        self.assertEqual(send_data, [polybytes(totaldata)])
+        self.assertEqual(send_data, [ntp.poly.polybytes(totaldata)])
 
 
 class TestControlSession(unittest.TestCase):
@@ -1028,12 +965,12 @@ class TestControlSession(unittest.TestCase):
             timetemp = ntp.util.time
             ntp.util.time = faketimemod
             # Test
-            res = cls.sendpkt(polybytes("blahfoo"))
+            res = cls.sendpkt(ntp.poly.polybytes("blahfoo"))
             self.assertEqual(res, 0)
             self.assertEqual(logjig.data,
                              ["1970-01-01T00:00:00Z Sending 8 octets.  "
                               "seq=0\n"])
-            self.assertEqual(sockjig.data, [polybytes("blahfoo\x00")])
+            self.assertEqual(sockjig.data, [ntp.poly.polybytes("blahfoo\x00")])
             # Test error
             logjig.__init__()
             sockjig.fail_send = 1
@@ -1067,15 +1004,15 @@ class TestControlSession(unittest.TestCase):
                               "***Internal error! Data too large "
                               "(" + str(len(data)) + ")\n"])
             # Test no auth
-            result = cls.sendrequest(1, 2, polybytes("foo"))
+            result = cls.sendrequest(1, 2, ntp.poly.polybytes("foo"))
             self.assertEqual(result.sequence, 1)
-            self.assertEqual(result.extension, polybytes("foo\x00"))
+            self.assertEqual(result.extension, ntp.poly.polybytes("foo\x00"))
             # Test with auth
             cls.keyid = 1
             cls.passwd = "qwerty"
-            result = cls.sendrequest(1, 2, polybytes("foo"), True)
+            result = cls.sendrequest(1, 2, ntp.poly.polybytes("foo"), True)
             self.assertEqual(result.sequence, 2)
-            self.assertEqual(result.extension, polybytes("foo\x00mac"))
+            self.assertEqual(result.extension, ntp.poly.polybytes("foo\x00mac"))
             # Test with auth keyid / password failure
             cls.keyid = None
             try:
@@ -1103,7 +1040,7 @@ class TestControlSession(unittest.TestCase):
             sockjig.return_data = [
                 "\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"]
             cls.getresponse(1, 2, True)
-            self.assertEqual(cls.response, polybytes(""))
+            self.assertEqual(cls.response, ntp.poly.polybytes(""))
             # Test with data
             sockjig.return_data = [
                 "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
@@ -1115,7 +1052,7 @@ class TestControlSession(unittest.TestCase):
             cls.sequence = 1
             cls.getresponse(1, 3, True)
             self.assertEqual(cls.response,
-                             polybytes("foo=4223,blah=248,x=23,quux=1"))
+                             ntp.poly.polybytes("foo=4223,blah=248,x=23,quux=1"))
             # Test with data, duplicate packet
             sockjig.return_data = [
                 "\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
@@ -1129,7 +1066,7 @@ class TestControlSession(unittest.TestCase):
             cls.sequence = 1
             cls.getresponse(1, 3, True)
             self.assertEqual(cls.response,
-                             polybytes("foo=4223,blah=248,x=23,quux=1"))
+                             ntp.poly.polybytes("foo=4223,blah=248,x=23,quux=1"))
             # Test MAXFRAGS bail
             maxtemp = ntpp.MAXFRAGS
             ntpp.MAXFRAGS = 1
@@ -1404,7 +1341,7 @@ class TestControlSession(unittest.TestCase):
                                     42, "", False)])
         # Test normal
         queries = []
-        cls.response = polybytes("\xDE\xAD\xF0\x0D")
+        cls.response = ntp.poly.polybytes("\xDE\xAD\xF0\x0D")
         idlist = cls.readstat()
         self.assertEqual(len(idlist), 1)
         self.assertEqual(isinstance(idlist[0], ntpp.Peer), True)
@@ -1529,26 +1466,26 @@ class TestControlSession(unittest.TestCase):
         # Init
         cls = self.target()
         cls.doquery = doquery_jig
-        cls.response = polybytes("Config Succeeded    \n \x00 blah blah")
+        cls.response = ntp.poly.polybytes("Config Succeeded    \n \x00 blah blah")
         # Test success
-        result = cls.config(polybytes("Boo!"))
+        result = cls.config(ntp.poly.polybytes("Boo!"))
         self.assertEqual(result, True)
         self.assertEqual(queries,
                          [(ntp.control.CTL_OP_CONFIGURE, 0,
-                           polybytes("Boo!"), True)])
+                           ntp.poly.polybytes("Boo!"), True)])
         # Test failure
         queries = []
-        cls.response = polybytes("whatever man...")
-        result = cls.config(polybytes("Boo!"))
+        cls.response = ntp.poly.polybytes("whatever man...")
+        result = cls.config(ntp.poly.polybytes("Boo!"))
         self.assertEqual(result, False)
         self.assertEqual(queries,
                          [(ntp.control.CTL_OP_CONFIGURE, 0,
-                           polybytes("Boo!"), True)])
+                           ntp.poly.polybytes("Boo!"), True)])
         # Test no response
         queries = []
         cls.response = ""
         try:
-            cls.config(polybytes("blah"))
+            cls.config(ntp.poly.polybytes("blah"))
             errored = False
         except ctlerr as e:
             errored = e.message
@@ -1564,7 +1501,7 @@ class TestControlSession(unittest.TestCase):
         cls = self.target()
         cls.doquery = doquery_jig
         # Test success
-        cls.response = polybytes("nonce=blah blah  ")
+        cls.response = ntp.poly.polybytes("nonce=blah blah  ")
         result = cls.fetch_nonce()
         self.assertEqual(result, "nonce=blah blah")
         self.assertEqual(queries,
@@ -1572,7 +1509,7 @@ class TestControlSession(unittest.TestCase):
         # Test failure
         queries = []
         cls.logfp = filefp
-        cls.response = polybytes("blah blah")
+        cls.response = ntp.poly.polybytes("blah blah")
         try:
             result = cls.fetch_nonce()
             errored = False
@@ -1992,10 +1929,10 @@ class TestAuthenticator(unittest.TestCase):
             fakehashlibmod = jigs.HashlibModuleJig()
             ntpp.hashlib = fakehashlibmod
             # Test no digest
-            self.assertEqual(f("", 0, None, polybytes("")), None)
+            self.assertEqual(f("", 0, None, ntp.poly.polybytes("")), None)
             # Test with digest
             self.assertEqual(f("foo", 0x42, "bar", "quux"),
-                             polybytes("\x00\x00\x00\x42blahblahblahblah"))
+                             ntp.poly.polybytes("\x00\x00\x00\x42blahblahblahblah"))
         finally:
             ntpp.hashlib = temphash
 
@@ -2018,9 +1955,9 @@ class TestAuthenticator(unittest.TestCase):
             fakehashlibmod = jigs.HashlibModuleJig()
             ntpp.hashlib = fakehashlibmod
             # Test good
-            self.assertEqual(cls.verify_mac(polybytes(good_pkt)), True)
+            self.assertEqual(cls.verify_mac(ntp.poly.polybytes(good_pkt)), True)
             # Test bad
-            self.assertEqual(cls.verify_mac(polybytes(bad_pkt)), False)
+            self.assertEqual(cls.verify_mac(ntp.poly.polybytes(bad_pkt)), False)
         finally:
             ntpp.hashlib = temphash
 



View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/1c645a3c1094a9d28687b78bf83b0e100a5b231f

-- 
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/1c645a3c1094a9d28687b78bf83b0e100a5b231f
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20181212/5bccd1d7/attachment-0001.html>


More information about the vc mailing list