[Git][NTPsec/ntpsec][master] Use ntp.poly module instead of a local copy in jigs.py and test_packet.py
Matt Selsky
gitlab at mg.gitlab.com
Wed Dec 12 06:48:11 UTC 2018
Matt Selsky pushed to branch master at NTPsec / ntpsec
Commits:
1c645a3c by Matt Selsky at 2018-12-12T06:03:01Z
Use ntp.poly module instead of a local copy in jigs.py and test_packet.py
- - - - -
2 changed files:
- tests/pylib/jigs.py
- tests/pylib/test_packet.py
Changes:
=====================================
tests/pylib/jigs.py
=====================================
@@ -7,71 +7,7 @@ import socket
import select
import os.path
import time
-
-
-master_encoding = 'latin-1'
-
-if str is bytes: # Python 2
- polystr = str
- polybytes = bytes
- polyord = ord
- polychr = str
- input = raw_input
-
- def string_escape(s):
- return s.decode('string_escape')
-
-else: # Python 3
- import io
-
- def polystr(o):
- "Polymorphic string factory function"
- if isinstance(o, str):
- return o
- if not isinstance(o, bytes):
- return str(o)
- return str(o, encoding=master_encoding)
-
- def polybytes(s):
- "Polymorphic string encoding function"
- if isinstance(s, bytes):
- return s
- if not isinstance(s, str):
- return bytes(s)
- return bytes(s, encoding=master_encoding)
-
- def polyord(c):
- "Polymorphic ord() function"
- if isinstance(c, str):
- return ord(c)
- else:
- return c
-
- def polychr(c):
- "Polymorphic chr() function"
- if isinstance(c, int):
- return chr(c)
- else:
- return c
-
- def string_escape(s):
- "Polymorphic string_escape/unicode_escape"
- # This hack is necessary because Unicode strings in Python 3 don't
- # have a decode method, so there's no simple way to ask it for the
- # equivalent of decode('string_escape') in Python 2. This function
- # assumes that it will be called with a Python 3 'str' instance
- return s.encode(master_encoding).decode('unicode_escape')
-
- def make_std_wrapper(stream):
- "Standard input/output wrapper factory function"
- # This ensures that the encoding of standard output and standard
- # error on Python 3 matches the master encoding we use to turn
- # bytes to Unicode in polystr above
- # line_buffering=True ensures that interactive
- # command sessions work as expected
- return io.TextIOWrapper(stream.buffer,
- encoding=master_encoding, newline="\n",
- line_buffering=True)
+import ntp.poly
class FileJig:
@@ -157,7 +93,7 @@ class HasherJig:
self.digest_size += 1
def digest(self):
- return polybytes("blah" * 4) # 16 byte hash
+ return ntp.poly.polybytes("blah" * 4) # 16 byte hash
class SocketModuleJig:
=====================================
tests/pylib/test_packet.py
=====================================
@@ -13,76 +13,13 @@ import select
import sys
import getpass
import jigs
+import ntp.poly
odict = ntp.util.OrderedDict
ntpp = ntp.packet
ctlerr = ntp.packet.ControlException
-master_encoding = 'latin-1'
-
-if str is bytes: # Python 2
- polystr = str
- polybytes = bytes
- polyord = ord
- polychr = str
- input = raw_input
-
- def string_escape(s):
- return s.decode('string_escape')
-
-else: # Python 3
- import io
-
- def polystr(o):
- "Polymorphic string factory function"
- if isinstance(o, str):
- return o
- if not isinstance(o, bytes):
- return str(o)
- return str(o, encoding=master_encoding)
-
- def polybytes(s):
- "Polymorphic string encoding function"
- if isinstance(s, bytes):
- return s
- if not isinstance(s, str):
- return bytes(s)
- return bytes(s, encoding=master_encoding)
-
- def polyord(c):
- "Polymorphic ord() function"
- if isinstance(c, str):
- return ord(c)
- else:
- return c
-
- def polychr(c):
- "Polymorphic chr() function"
- if isinstance(c, int):
- return chr(c)
- else:
- return c
-
- def string_escape(s):
- "Polymorphic string_escape/unicode_escape"
- # This hack is necessary because Unicode strings in Python 3 don't
- # have a decode method, so there's no simple way to ask it for the
- # equivalent of decode('string_escape') in Python 2. This function
- # assumes that it will be called with a Python 3 'str' instance
- return s.encode(master_encoding).decode('unicode_escape')
-
- def make_std_wrapper(stream):
- "Standard input/output wrapper factory function"
- # This ensures that the encoding of standard output and standard
- # error on Python 3 matches the master encoding we use to turn
- # bytes to Unicode in polystr above
- # line_buffering=True ensures that interactive
- # command sessions work as expected
- return io.TextIOWrapper(stream.buffer,
- encoding=master_encoding, newline="\n",
- line_buffering=True)
-
class SessionJig:
def __init__(self):
@@ -205,7 +142,7 @@ class TestSyncPacket(unittest.TestCase):
self.assertEqual(cls.origin_timestamp, 0)
self.assertEqual(cls.receive_timestamp, 0)
self.assertEqual(cls.transmit_timestamp, 0)
- self.assertEqual(cls.extension, polybytes(''))
+ self.assertEqual(cls.extension, ntp.poly.polybytes(''))
self.assertEqual(cls.extfields, [])
self.assertEqual(cls.mac, '')
self.assertEqual(cls.hostname, None)
@@ -276,11 +213,11 @@ class TestSyncPacket(unittest.TestCase):
self.assertEqual(cls.receive_timestamp, 0x0201020304050607)
self.assertEqual(cls.transmit_timestamp, 0x0301020304050607)
self.assertEqual(cls.extfields,
- [(1, polybytes("blah")),
- (2, polybytes("jabberjabber")),
- (3, polybytes("In the end, our choices make us."))])
- self.assertEqual(cls.extension, polybytes(ext + mac))
- self.assertEqual(cls.mac, polybytes(mac))
+ [(1, ntp.poly.polybytes("blah")),
+ (2, ntp.poly.polybytes("jabberjabber")),
+ (3, ntp.poly.polybytes("In the end, our choices make us."))])
+ self.assertEqual(cls.extension, ntp.poly.polybytes(ext + mac))
+ self.assertEqual(cls.mac, ntp.poly.polybytes(mac))
# Test with extension, DES
data2 = data + ext + "\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC"
try:
@@ -311,14 +248,14 @@ class TestSyncPacket(unittest.TestCase):
"\x0A\x0B\x0C\x0D\x0E\x0F\x10\x11\x12\x13"
data2 = data + ext + mac
cls = self.target(data2)
- self.assertEqual(cls.mac, polybytes(mac))
- self.assertEqual(cls.extension, polybytes(ext + mac))
+ self.assertEqual(cls.mac, ntp.poly.polybytes(mac))
+ self.assertEqual(cls.extension, ntp.poly.polybytes(ext + mac))
# Test with extension, MD5 or SHA1, 24
mac += "\x14\x15\x16\x17"
data2 = data + ext + mac
- cls = self.target(polybytes(data2))
- self.assertEqual(cls.mac, polybytes(mac))
- self.assertEqual(cls.extension, polybytes(ext + mac))
+ cls = self.target(ntp.poly.polybytes(data2))
+ self.assertEqual(cls.mac, ntp.poly.polybytes(mac))
+ self.assertEqual(cls.extension, ntp.poly.polybytes(ext + mac))
# ====================================================================
# The tests referring to the Timeless Void are testing outside of NTP's
@@ -427,7 +364,7 @@ class TestSyncPacket(unittest.TestCase):
"\x00\x00\x00\x03\x00\x00\x00\x20" \
"In the end, our choices make us." \
"\x11\x22\x33\x44"
- pkt = polybytes(data + ext)
+ pkt = ntp.poly.polybytes(data + ext)
cls = self.target(pkt)
self.assertEqual(cls.flatten(), pkt)
@@ -563,18 +500,18 @@ class TestMisc(unittest.TestCase):
# Test sortaddr, ipv6
cls.addr = "[11:22:33::44:55]:42"
self.assertEqual(cls.sortaddr(),
- polybytes("\x00\x11\x00\x22\x00\x33\x00\x00"
- "\x00\x00\x00\x00\x00\x44\x00\x55"))
+ ntp.poly.polybytes("\x00\x11\x00\x22\x00\x33\x00\x00"
+ "\x00\x00\x00\x00\x00\x44\x00\x55"))
# Test sortaddr, ipv6, local
cls.addr = "[11:22:33::44:55%8]:42"
self.assertEqual(cls.sortaddr(),
- polybytes("\x00\x11\x00\x22\x00\x33\x00\x00"
- "\x00\x00\x00\x00\x00\x44\x00\x55"))
+ ntp.poly.polybytes("\x00\x11\x00\x22\x00\x33\x00\x00"
+ "\x00\x00\x00\x00\x00\x44\x00\x55"))
# Test sortaddr, ipv4
cls.addr = "11.22.33.44:23"
self.assertEqual(cls.sortaddr(),
- polybytes((("\0" * 16) + "\x0b\x16\x21\x2c")))
+ ntp.poly.polybytes((("\0" * 16) + "\x0b\x16\x21\x2c")))
# Test __repr__
# Python dicts enumeration order changes with different versions
if sys.version_info[0] < 3:
@@ -626,7 +563,7 @@ class TestControlPacket(unittest.TestCase):
self.assertEqual(cls.status, 0)
self.assertEqual(cls.associd, 0)
self.assertEqual(cls.offset, 0)
- self.assertEqual(cls.extension, polybytes(""))
+ self.assertEqual(cls.extension, ntp.poly.polybytes(""))
self.assertEqual(cls.count, 0)
def test_is_response(self):
@@ -712,7 +649,7 @@ class TestControlPacket(unittest.TestCase):
self.assertEqual(cls.offset, 32)
self.assertEqual(cls.count, 16)
# Test flatten
- self.assertEqual(cls.flatten(), polybytes(totaldata))
+ self.assertEqual(cls.flatten(), ntp.poly.polybytes(totaldata))
# Test send
send_data = []
@@ -720,7 +657,7 @@ class TestControlPacket(unittest.TestCase):
send_data.append(pkt)
cls.session.sendpkt = send_jig
cls.send()
- self.assertEqual(send_data, [polybytes(totaldata)])
+ self.assertEqual(send_data, [ntp.poly.polybytes(totaldata)])
class TestControlSession(unittest.TestCase):
@@ -1028,12 +965,12 @@ class TestControlSession(unittest.TestCase):
timetemp = ntp.util.time
ntp.util.time = faketimemod
# Test
- res = cls.sendpkt(polybytes("blahfoo"))
+ res = cls.sendpkt(ntp.poly.polybytes("blahfoo"))
self.assertEqual(res, 0)
self.assertEqual(logjig.data,
["1970-01-01T00:00:00Z Sending 8 octets. "
"seq=0\n"])
- self.assertEqual(sockjig.data, [polybytes("blahfoo\x00")])
+ self.assertEqual(sockjig.data, [ntp.poly.polybytes("blahfoo\x00")])
# Test error
logjig.__init__()
sockjig.fail_send = 1
@@ -1067,15 +1004,15 @@ class TestControlSession(unittest.TestCase):
"***Internal error! Data too large "
"(" + str(len(data)) + ")\n"])
# Test no auth
- result = cls.sendrequest(1, 2, polybytes("foo"))
+ result = cls.sendrequest(1, 2, ntp.poly.polybytes("foo"))
self.assertEqual(result.sequence, 1)
- self.assertEqual(result.extension, polybytes("foo\x00"))
+ self.assertEqual(result.extension, ntp.poly.polybytes("foo\x00"))
# Test with auth
cls.keyid = 1
cls.passwd = "qwerty"
- result = cls.sendrequest(1, 2, polybytes("foo"), True)
+ result = cls.sendrequest(1, 2, ntp.poly.polybytes("foo"), True)
self.assertEqual(result.sequence, 2)
- self.assertEqual(result.extension, polybytes("foo\x00mac"))
+ self.assertEqual(result.extension, ntp.poly.polybytes("foo\x00mac"))
# Test with auth keyid / password failure
cls.keyid = None
try:
@@ -1103,7 +1040,7 @@ class TestControlSession(unittest.TestCase):
sockjig.return_data = [
"\x0E\x81\x00\x00\x00\x03\x00\x02\x00\x00\x00\x00"]
cls.getresponse(1, 2, True)
- self.assertEqual(cls.response, polybytes(""))
+ self.assertEqual(cls.response, ntp.poly.polybytes(""))
# Test with data
sockjig.return_data = [
"\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
@@ -1115,7 +1052,7 @@ class TestControlSession(unittest.TestCase):
cls.sequence = 1
cls.getresponse(1, 3, True)
self.assertEqual(cls.response,
- polybytes("foo=4223,blah=248,x=23,quux=1"))
+ ntp.poly.polybytes("foo=4223,blah=248,x=23,quux=1"))
# Test with data, duplicate packet
sockjig.return_data = [
"\x0E\xA1\x00\x01\x00\x02\x00\x03\x00\x00\x00\x09"
@@ -1129,7 +1066,7 @@ class TestControlSession(unittest.TestCase):
cls.sequence = 1
cls.getresponse(1, 3, True)
self.assertEqual(cls.response,
- polybytes("foo=4223,blah=248,x=23,quux=1"))
+ ntp.poly.polybytes("foo=4223,blah=248,x=23,quux=1"))
# Test MAXFRAGS bail
maxtemp = ntpp.MAXFRAGS
ntpp.MAXFRAGS = 1
@@ -1404,7 +1341,7 @@ class TestControlSession(unittest.TestCase):
42, "", False)])
# Test normal
queries = []
- cls.response = polybytes("\xDE\xAD\xF0\x0D")
+ cls.response = ntp.poly.polybytes("\xDE\xAD\xF0\x0D")
idlist = cls.readstat()
self.assertEqual(len(idlist), 1)
self.assertEqual(isinstance(idlist[0], ntpp.Peer), True)
@@ -1529,26 +1466,26 @@ class TestControlSession(unittest.TestCase):
# Init
cls = self.target()
cls.doquery = doquery_jig
- cls.response = polybytes("Config Succeeded \n \x00 blah blah")
+ cls.response = ntp.poly.polybytes("Config Succeeded \n \x00 blah blah")
# Test success
- result = cls.config(polybytes("Boo!"))
+ result = cls.config(ntp.poly.polybytes("Boo!"))
self.assertEqual(result, True)
self.assertEqual(queries,
[(ntp.control.CTL_OP_CONFIGURE, 0,
- polybytes("Boo!"), True)])
+ ntp.poly.polybytes("Boo!"), True)])
# Test failure
queries = []
- cls.response = polybytes("whatever man...")
- result = cls.config(polybytes("Boo!"))
+ cls.response = ntp.poly.polybytes("whatever man...")
+ result = cls.config(ntp.poly.polybytes("Boo!"))
self.assertEqual(result, False)
self.assertEqual(queries,
[(ntp.control.CTL_OP_CONFIGURE, 0,
- polybytes("Boo!"), True)])
+ ntp.poly.polybytes("Boo!"), True)])
# Test no response
queries = []
cls.response = ""
try:
- cls.config(polybytes("blah"))
+ cls.config(ntp.poly.polybytes("blah"))
errored = False
except ctlerr as e:
errored = e.message
@@ -1564,7 +1501,7 @@ class TestControlSession(unittest.TestCase):
cls = self.target()
cls.doquery = doquery_jig
# Test success
- cls.response = polybytes("nonce=blah blah ")
+ cls.response = ntp.poly.polybytes("nonce=blah blah ")
result = cls.fetch_nonce()
self.assertEqual(result, "nonce=blah blah")
self.assertEqual(queries,
@@ -1572,7 +1509,7 @@ class TestControlSession(unittest.TestCase):
# Test failure
queries = []
cls.logfp = filefp
- cls.response = polybytes("blah blah")
+ cls.response = ntp.poly.polybytes("blah blah")
try:
result = cls.fetch_nonce()
errored = False
@@ -1992,10 +1929,10 @@ class TestAuthenticator(unittest.TestCase):
fakehashlibmod = jigs.HashlibModuleJig()
ntpp.hashlib = fakehashlibmod
# Test no digest
- self.assertEqual(f("", 0, None, polybytes("")), None)
+ self.assertEqual(f("", 0, None, ntp.poly.polybytes("")), None)
# Test with digest
self.assertEqual(f("foo", 0x42, "bar", "quux"),
- polybytes("\x00\x00\x00\x42blahblahblahblah"))
+ ntp.poly.polybytes("\x00\x00\x00\x42blahblahblahblah"))
finally:
ntpp.hashlib = temphash
@@ -2018,9 +1955,9 @@ class TestAuthenticator(unittest.TestCase):
fakehashlibmod = jigs.HashlibModuleJig()
ntpp.hashlib = fakehashlibmod
# Test good
- self.assertEqual(cls.verify_mac(polybytes(good_pkt)), True)
+ self.assertEqual(cls.verify_mac(ntp.poly.polybytes(good_pkt)), True)
# Test bad
- self.assertEqual(cls.verify_mac(polybytes(bad_pkt)), False)
+ self.assertEqual(cls.verify_mac(ntp.poly.polybytes(bad_pkt)), False)
finally:
ntpp.hashlib = temphash
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/1c645a3c1094a9d28687b78bf83b0e100a5b231f
--
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/1c645a3c1094a9d28687b78bf83b0e100a5b231f
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20181212/5bccd1d7/attachment-0001.html>
More information about the vc mailing list