[Git][NTPsec/ntpsec][master] 11 commits: Changed python 2.x polystr to be str instead of unicode.

Ian Bruene gitlab at mg.gitlab.com
Tue Oct 30 21:48:28 UTC 2018


Ian Bruene pushed to branch master at NTPsec / ntpsec


Commits:
2dea4490 by Ian Bruene at 2018-10-29T21:32:24Z
Changed python 2.x polystr to be str instead of unicode.

- - - - -
40a0a2b9 by Ian Bruene at 2018-10-29T21:38:18Z
Removed duplicate poly string shims from agentx_packet.py.

- - - - -
1e2307b1 by Ian Bruene at 2018-10-29T21:47:08Z
Changed python 2.x polystr to be str rather than unicode.

- - - - -
d7ecd22d by Ian Bruene at 2018-10-29T22:04:35Z
Added polyinput shim.

- - - - -
923fb98b by Ian Bruene at 2018-10-29T22:23:17Z
ntpq and ntpmon now call check_unicode separately from any poly handling.

- - - - -
6ac641c5 by Ian Bruene at 2018-10-29T22:27:02Z
Removed check_unicode() call from poly switch.

Calling check_unicode here messed up numerous tests. It should be called
from the programs themselves after imports are complete.

- - - - -
9f7ab658 by Ian Bruene at 2018-10-29T22:29:31Z
Converted packet.py to use poly module.

- - - - -
155b6d71 by Ian Bruene at 2018-10-30T17:32:35Z
Added polyunicode shim.

- - - - -
c4fb2a04 by Ian Bruene at 2018-10-30T17:35:16Z
Converted ntpq to use poly module.

- - - - -
205ecdb4 by Ian Bruene at 2018-10-30T19:15:05Z
Removed forgotten debugging print()s.

- - - - -
6bafe67a by Ian Bruene at 2018-10-30T19:23:01Z
Changed tests to work on Gentoo.

- - - - -


7 changed files:

- ntpclients/ntpmon.py
- ntpclients/ntpq.py
- pylib/agentx_packet.py
- pylib/packet.py
- pylib/poly.py
- pylib/util.py
- tests/pylib/test_util.py


Changes:

=====================================
ntpclients/ntpmon.py
=====================================
@@ -44,7 +44,9 @@ except ImportError as e:
     sys.stderr.write("%s\n" % e)
     sys.exit(1)
 
-
+# This used to force UTF-8 encoding, but that breaks the readline system.
+# Unfortunately sometimes sys.stdout.encoding lies about the encoding,
+# so expect random false positives.
 # LANG=C or LANG=POSIX refuse unicode when combined with curses
 disableunicode = ntp.util.check_unicode()
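
A minimal sketch of how a curses client such as ntpmon might consume that flag, assuming only that check_unicode() returns a true value when the active locale cannot safely emit Unicode; the unit_suffix() helper is hypothetical, not part of the codebase:

    import ntp.util

    disableunicode = ntp.util.check_unicode()

    def unit_suffix():
        # Hypothetical helper: fall back to plain ASCII when the locale
        # (e.g. LANG=C or LANG=POSIX under curses) rejects Unicode output.
        return u"us" if disableunicode else u"\u00b5s"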
 


=====================================
ntpclients/ntpq.py
=====================================
@@ -27,97 +27,19 @@ try:
     import ntp.ntpc
     import ntp.packet
     import ntp.util
+    import ntp.poly
 except ImportError as e:
     sys.stderr.write(
         "ntpq: can't find Python NTP library -- check PYTHONPATH.\n")
     sys.stderr.write("%s\n" % e)
     sys.exit(1)
 
-version = ntp.util.stdversion()
-master_encoding = 'latin-1'
+# This used to force UTF-8 encoding, but that breaks the readline system.
+# Unfortunately sometimes sys.stdout.encoding lies about the encoding,
+# so expect random false positives.
+ntp.util.check_unicode()
 
-# General notes on Python 2/3 compatibility:
-#
-# This code uses the following strategy to allow it to run on both Python 2
-# and Python 3:
-#
-# - Use latin-1 encoding to transform binary data to/from Unicode when
-#   necessary for operations where Python 3 expects Unicode; the
-#   polystr and polybytes functions are used to do this so that
-#   when running on Python 2, the byte string data is used unchanged.
-#
-# - Construct custom stdout and stderr streams when running
-#   on Python 3 that force UTF-8 encoding, and wrap them around the
-#   underlying binary buffers (in Python 2, the streams are binary
-#   and are used unchanged); this ensures that the same transformation
-#   is done on data from/to the standard streams, as is done on binary
-#   data from/to files and subprocesses; the make_std_wrapper function
-#   does this.
-#
-# anyone that changes this needs to test with all combinations of
-# python2, python3, LC_ALL=ascii, LC_ALL=latin-1, LC_ALL=en_US.utf8, and
-# piping output to a file.  While looking at the UTF-8 in the output.
-
-forced_utf8 = False
-
-if str is bytes:  # Python 2
-    polystr = unicode
-    polybytes = bytes
-
-    def string_escape(s):
-        return s.decode('string_escape')
-
-    # This used to force UTF-8 encoding, but that breaks the readline system.
-    # Unfortunately sometimes sys.stdout.encoding lies about the encoding,
-    # so expect random false positives.
-    ntp.util.check_unicode()
-
-else:  # Python 3
-    import io
-
-    def polystr(o):
-        "Polymorphic string factory function"
-        if isinstance(o, str):
-            return o
-        if not isinstance(o, bytes):
-            return str(o)
-        return str(o, encoding=master_encoding)
-
-    def polybytes(s):
-        "Polymorphic string encoding function"
-        if isinstance(s, bytes):
-            return s
-        if not isinstance(s, str):
-            return bytes(s)
-        return bytes(s, encoding=master_encoding)
-
-    def string_escape(s):
-        "Polymorphic string_escape/unicode_escape"
-        # This hack is necessary because Unicode strings in Python 3 don't
-        # have a decode method, so there's no simple way to ask it for the
-        # equivalent of decode('string_escape') in Python 2. This function
-        # assumes that it will be called with a Python 3 'str' instance
-        return s.encode(master_encoding).decode('unicode_escape')
-
-    def make_std_wrapper(stream):
-        "Standard input/output wrapper factory function"
-        # This ensures that the encoding of standard output and standard
-        # error on Python 3 matches the master encoding we use to turn
-        # bytes to Unicode in polystr above
-        # line_buffering=True ensures that interactive command sessions
-        # work as expected
-        return io.TextIOWrapper(stream.buffer, encoding="utf-8",
-                                newline="\n", line_buffering=True)
-
-    # This is the one situation where we *can* force unicode.
-    if "UTF-8" != sys.stdout.encoding:
-        forced_utf8 = True
-        sys.stdout = make_std_wrapper(sys.stdout)
-    if "UTF-8" != sys.stderr.encoding:
-        forced_utf8 = True
-        sys.stderr = make_std_wrapper(sys.stderr)
-
-# NTP-specific parts resume here
+version = ntp.util.stdversion()
 
 # Flags for forming descriptors.
 OPT = 0x80        # this argument is optional, or'd with type */
@@ -228,7 +150,7 @@ class Ntpq(cmd.Cmd):
 
     def say(self, msg):
         try:
-            sys.stdout.write(polystr(msg))
+            sys.stdout.write(ntp.poly.polyunicode(msg))
         except UnicodeEncodeError as e:
             print("Unicode failure:", e)
             print("msg:\n", repr(msg))
@@ -236,7 +158,7 @@ class Ntpq(cmd.Cmd):
         sys.stdout.flush()    # In case we're piping the output
 
     def warn(self, msg):
-        sys.stderr.write(polystr(msg))
+        sys.stderr.write(ntp.poly.polystr(msg))
 
     def help_help(self):
         self.say("""\
@@ -400,8 +322,9 @@ usage: help [ command ]
             # high-half characters.  We won't do that unless somebody
             # files a bug, Mode 6 never seems to generate those in
             # variable fetches.
-            text = polystr(session.response.replace(polybytes(",\r\n"),
-                           polybytes(",\n")))
+            text = ntp.poly.polystr(session.response.replace(
+                ntp.poly.polybytes(",\r\n"),
+                ntp.poly.polybytes(",\n")))
         else:
             if not quiet:
                 self.say("status=%04x %s,\n"
@@ -1697,7 +1620,7 @@ if __name__ == '__main__':
 
     session.logfp = interpreter.logfp = logfp
 
-    if forced_utf8 and interpreter.debug:
+    if ntp.poly.forced_utf8 and interpreter.debug:
         interpreter.warn("\nforced UTF-8 output\n")
 
     if keyfile is not None:  # Have a -k, setup the auth
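
The say()/warn() hunks above reduce to one pattern: convert at the output boundary with the shared shims instead of a per-program polystr copy. A stripped-down sketch of that pattern, assuming only the polystr/polyunicode behavior shown in the pylib/poly.py hunk below:

    import sys
    import ntp.poly

    def say(msg):
        # polyunicode yields a text object on both interpreters, so the
        # write can still raise UnicodeEncodeError on a hostile locale.
        try:
            sys.stdout.write(ntp.poly.polyunicode(msg))
        except UnicodeEncodeError as e:
            sys.stderr.write("Unicode failure: %s\n" % e)
        sys.stdout.flush()  # in case output is being piped

    def warn(msg):
        sys.stderr.write(ntp.poly.polystr(msg))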


=====================================
pylib/agentx_packet.py
=====================================
@@ -5,74 +5,9 @@
 from __future__ import print_function
 
 import struct
+import ntp.poly
 from ntp.util import slicedata
 
-master_encoding = 'latin-1'
-
-if str is bytes:  # pragma: no cover
-    # Python 2
-    polystr = str
-    polybytes = bytes
-    polyord = ord
-    polychr = str
-    input = raw_input
-
-    def string_escape(s):
-        return s.decode('string_escape')
-
-else:  # pragma: no cover
-    # Python 3
-    import io
-
-    def polystr(o):
-        "Polymorphic string factory function"
-        if isinstance(o, str):
-            return o
-        if not isinstance(o, bytes):
-            return str(o)
-        return str(o, encoding=master_encoding)
-
-    def polybytes(s):
-        "Polymorphic string encoding function"
-        if isinstance(s, bytes):
-            return s
-        if not isinstance(s, str):
-            return bytes(s)
-        return bytes(s, encoding=master_encoding)
-
-    def polyord(c):
-        "Polymorphic ord() function"
-        if isinstance(c, str):
-            return ord(c)
-        else:
-            return c
-
-    def polychr(c):
-        "Polymorphic chr() function"
-        if isinstance(c, int):
-            return chr(c)
-        else:
-            return c
-
-    def string_escape(s):
-        "Polymorphic string_escape/unicode_escape"
-        # This hack is necessary because Unicode strings in Python 3 don't
-        # have a decode method, so there's no simple way to ask it for the
-        # equivalent of decode('string_escape') in Python 2. This function
-        # assumes that it will be called with a Python 3 'str' instance
-        return s.encode(master_encoding).decode('unicode_escape')
-
-    def make_std_wrapper(stream):
-        "Standard input/output wrapper factory function"
-        # This ensures that the encoding of standard output and standard
-        # error on Python 3 matches the master encoding we use to turn
-        # bytes to Unicode in polystr above
-        # line_buffering=True ensures that interactive
-        # command sessions work as expected
-        return io.TextIOWrapper(stream.buffer,
-                                encoding=master_encoding, newline="\n",
-                                line_buffering=True)
-
 
 internetPrefix = (1, 3, 6, 1)  # Used by the prefix option of OID headers
 prefixCount = len(internetPrefix)
@@ -207,7 +142,7 @@ class AgentXPDU:
 def decode_OpenPDU(data, header):
     flags = header["flags"]
     temp, data = slicedata(data, 4)
-    timeout = struct.unpack("Bxxx", polybytes(temp))[0]
+    timeout = struct.unpack("Bxxx", ntp.poly.polybytes(temp))[0]
     oid, data = decode_OID(data, header)
     description = decode_octetstr(data, header)[0]
     result = OpenPDU(flags["bigEndian"], header["session_id"],
@@ -286,11 +221,12 @@ def decode_xRegisterPDU(data, header):
     context, data = decode_context(data, header)
     temp, data = slicedata(data, 4)
     timeout, priority, rangeSubid = struct.unpack(endianToken + "BBBx",
-                                                  polybytes(temp))
+                                                  ntp.poly.polybytes(temp))
     oid, data = decode_OID(data, header)
     if rangeSubid != 0:
         temp, data = slicedata(data, 4)
-        upperBound = struct.unpack(endianToken + "I", polybytes(temp))[0]
+        upperBound = struct.unpack(endianToken + "I",
+                                   ntp.poly.polybytes(temp))[0]
     else:
         upperBound = None
     if header["type"] == PDU_REGISTER:
@@ -417,7 +353,8 @@ def decode_GetBulkPDU(data, header):
     endianToken = getendian(flags["bigEndian"])
     context, data = decode_context(data, header)
     temp, data = slicedata(data, 4)
-    nonReps, maxReps = struct.unpack(endianToken + "HH", polybytes(temp))
+    nonReps, maxReps = struct.unpack(endianToken + "HH",
+                                     ntp.poly.polybytes(temp))
     oidranges = decode_searchrange_list(data, header)
     result = GetBulkPDU(flags["bigEndian"], header["session_id"],
                         header["transaction_id"], header["packet_id"],
@@ -735,7 +672,7 @@ def decode_ResponsePDU(data, header):
     endianToken = getendian(flags["bigEndian"])
     temp, data = slicedata(data, 8)
     sysUptime, resError, resIndex = struct.unpack(endianToken + "IHH",
-                                                  polybytes(temp))
+                                                  ntp.poly.polybytes(temp))
     if len(data) > 0:
         varbinds = decode_varbindlist(data, header)
     else:
@@ -803,7 +740,7 @@ def decode_OID(data, header):
     flags = header["flags"]
     # Need to split off the header to get the subid count
     header, data = slicedata(data, 4)
-    n_subid, prefix, include = struct.unpack("BBBx", polybytes(header))
+    n_subid, prefix, include = struct.unpack("BBBx", ntp.poly.polybytes(header))
     if prefix != 0:
         subids = internetPrefix + (prefix,)
     else:
@@ -817,7 +754,7 @@ def decode_OID(data, header):
     data, rest = slicedata(data, byteCount)
     endianToken = getendian(flags["bigEndian"])
     formatString = endianToken + ("I" * n_subid)
-    subids += struct.unpack(formatString, polybytes(data))
+    subids += struct.unpack(formatString, ntp.poly.polybytes(data))
     result = OID(subids, include)
     return (result, rest)
 
@@ -926,7 +863,7 @@ def encode_octetstr(bigEndian, octets):
         pad = 4 - pad
     pad = b"\x00" * pad
     if type(octets) is str:
-        octets = polybytes(octets)
+        octets = ntp.poly.polybytes(octets)
         data = header + octets + pad
     else:
         fmt = "B" * numoctets
@@ -939,13 +876,13 @@ def decode_octetstr(data, header):
     flags = header["flags"]
     header, data = slicedata(data, 4)
     endianToken = getendian(flags["bigEndian"])
-    numoctets = struct.unpack(endianToken + "I", polybytes(header))[0]
+    numoctets = struct.unpack(endianToken + "I", ntp.poly.polybytes(header))[0]
     if len(data) < numoctets:
         raise ValueError("Octet string shorter than length")
     pad = numoctets % 4
     if pad > 0:  # Pad out the data to word boundary
         pad = 4 - pad
-    return polystr(data[:numoctets]), data[numoctets + pad:]
+    return ntp.poly.polystr(data[:numoctets]), data[numoctets + pad:]
 
 
 def sanity_octetstr(data):
@@ -963,7 +900,8 @@ def decode_Varbind(data, header):
     flags = header["flags"]
     bindheader, data = slicedata(data, 4)
     endianToken = getendian(flags["bigEndian"])
-    valType = struct.unpack(endianToken + "Hxx", polybytes(bindheader))[0]
+    valType = struct.unpack(endianToken + "Hxx",
+                            ntp.poly.polybytes(bindheader))[0]
     name, data = decode_OID(data, header)
     if valType not in definedValueTypes.keys():
         raise ValueError("Value type %s not in defined types" % valType)
@@ -1078,7 +1016,7 @@ def decode_integer64(data, header):
     flags = header["flags"]
     endianToken = getendian(flags["bigEndian"])
     num, data = slicedata(data, 8)
-    num = struct.unpack(endianToken + "Q", polybytes(num))[0]
+    num = struct.unpack(endianToken + "Q", ntp.poly.polybytes(num))[0]
     return (num, data)
 
 
@@ -1094,7 +1032,7 @@ def encode_ipaddr(bigEndian, octets):
 
 def decode_ipaddr(data, header):
     addr, data = decode_octetstr(data, header)
-    addr = struct.unpack("BBBB", polybytes(addr))
+    addr = struct.unpack("BBBB", ntp.poly.polybytes(addr))
     return addr, data
 
 
@@ -1239,14 +1177,16 @@ def encode_pduheader(pduType, instanceRegistration, newIndex,
 
 def decode_pduheader(data):  # Endianness is controlled from the PDU header
     lineone, data = slicedata(data, 4)
-    version, pduType, flags = struct.unpack(">BBBx", polybytes(lineone))
+    version, pduType, flags = struct.unpack(">BBBx",
+                                            ntp.poly.polybytes(lineone))
     # Slice up the flags
     flagDict = decode_flagbyte(flags)
     # Chop the remaining parts of the header from the rest of the datastream
     # then parse them
     fmt = getendian(flagDict["bigEndian"]) + "IIII"
     linen, data = slicedata(data, 16)  # 4 x 4-byte variables
-    sID, tactionID, pktID, dataLen = struct.unpack(fmt, polybytes(linen))
+    sID, tactionID, pktID, dataLen = struct.unpack(fmt,
+                                                   ntp.poly.polybytes(linen))
     result = {"version": version, "type": pduType, "flags": flagDict,
               "session_id": sID, "transaction_id": tactionID,
               "packet_id": pktID, "length": dataLen}


=====================================
pylib/packet.py
=====================================
@@ -195,97 +195,8 @@ import ntp.control
 import ntp.magic
 import ntp.ntpc
 import ntp.util
+import ntp.poly
 
-# General notes on Python 2/3 compatibility:
-#
-# This code uses the following strategy to allow it to run on both Python 2
-# and Python 3:
-#
-# - Use binary I/O to read/write data from/to files and subprocesses;
-#   where the exact bytes are important (such as in checking for
-#   modified files), use the binary data directly
-#
-# - Use latin-1 encoding to transform binary data to/from Unicode when
-#   necessary for operations where Python 3 expects Unicode; the
-#   polystr and polybytes functions are used to do this so that
-#   when running on Python 2, the byte string data is used unchanged.
-#
-# - Construct custom stdin, stdout, and stderr streams when running
-#   on Python 3 that force latin-1 encoding, and wrap them around the
-#   underlying binary buffers (in Python 2, the streams are binary
-#   and are used unchanged); this ensures that the same transformation
-#   is done on data from/to the standard streams, as is done on binary
-#   data from/to files and subprocesses; the make_std_wrapper function
-#   does this
-
-master_encoding = 'latin-1'
-
-if str is bytes:  # pragma: no cover
-    # Python 2
-    polystr = str
-    polybytes = bytes
-    polyord = ord
-    polychr = str
-    input = raw_input
-
-    def string_escape(s):
-        return s.decode('string_escape')
-
-    def make_wrapper(fp):
-        return fp
-
-else:  # pragma: nocover
-    # Python 3
-    import io
-
-    def polystr(o):
-        "Polymorphic string factory function"
-        if isinstance(o, str):
-            return o
-        if not isinstance(o, bytes):
-            return str(o)
-        return str(o, encoding=master_encoding)
-
-    def polybytes(s):
-        "Polymorphic string encoding function"
-        if isinstance(s, bytes):
-            return s
-        if not isinstance(s, str):
-            return bytes(s)
-        return bytes(s, encoding=master_encoding)
-
-    def polyord(c):
-        "Polymorphic ord() function"
-        if isinstance(c, str):
-            return ord(c)
-        else:
-            return c
-
-    def polychr(c):
-        "Polymorphic chr() function"
-        if isinstance(c, int):
-            return chr(c)
-        else:
-            return c
-
-    def string_escape(s):
-        "Polymorphic string_escape/unicode_escape"
-        # This hack is necessary because Unicode strings in Python 3 don't
-        # have a decode method, so there's no simple way to ask it for the
-        # equivalent of decode('string_escape') in Python 2. This function
-        # assumes that it will be called with a Python 3 'str' instance
-        return s.encode(master_encoding).decode('unicode_escape')
-
-    def make_std_wrapper(stream):
-        "Standard input/output wrapper factory function"
-        # This ensures that the encoding of standard output and standard
-        # error on Python 3 matches the master encoding we use to turn
-        # bytes to Unicode in polystr above
-        # line_buffering=True ensures that interactive
-        # command sessions work as expected
-        return io.TextIOWrapper(stream.buffer,
-                                encoding=master_encoding, newline="\n",
-                                line_buffering=True)
 
 # Limit on packets in a single Mode 6 response.  Increasing this value to
 # 96 will marginally speed "mrulist" operation on lossless networks
@@ -348,7 +259,7 @@ class Packet:
 
     @extension.setter
     def extension(self, x):
-        self.__extension = polybytes(x)
+        self.__extension = ntp.poly.polybytes(x)
 
     def leap(self):
         return ("no-leap", "add-leap", "del-leap",
@@ -399,7 +310,7 @@ class SyncPacket(Packet):
         self.trusted = True
         self.rescaled = False
         if data:
-            self.analyze(polybytes(data))
+            self.analyze(ntp.poly.polybytes(data))
 
     def analyze(self, data):
         datalen = len(data)
@@ -526,11 +437,11 @@ class SyncPacket(Packet):
 
     def refid_as_string(self):
         "Sometimes it's a clock name or KOD type"
-        return polystr(struct.pack(*(("BBBB",) + self.refid_octets())))
+        return ntp.poly.polystr(struct.pack(*(("BBBB",) + self.refid_octets())))
 
     def refid_as_address(self):
         "Sometimes it's an IPV4 address."
-        return polystr("%d.%d.%d.%d" % self.refid_octets())
+        return ntp.poly.polystr("%d.%d.%d.%d" % self.refid_octets())
 
     def is_crypto_nak(self):
         return len(self.mac) == 4
@@ -605,7 +516,7 @@ class ControlPacket(Packet):
         return "%5d %5d\t%3d octets\n" % (self.offset, self.end(), self.count)
 
     def analyze(self, rawdata):
-        rawdata = polybytes(rawdata)
+        rawdata = ntp.poly.polybytes(rawdata)
         (self.li_vn_mode,
          self.r_e_m_op,
          self.sequence,
@@ -686,7 +597,8 @@ def dump_hex_printable(xdata, outfp=sys.stdout):
         # Output data in hex form
         linelen = len(linedata)
         line = "%02x " * linelen
-        linedata = [polyord(x) for x in linedata]  # Will need this later
+        # Will need linedata later
+        linedata = [ntp.poly.polyord(x) for x in linedata]
         line %= tuple(linedata)
         if linelen < rowsize:  # Pad out the line to keep columns neat
             line += "   " * (rowsize - linelen)
@@ -907,7 +819,7 @@ class ControlSession:
                     raise ControlException(SERR_NOTRUST)
             try:
                 if os.isatty(0):
-                    key_id = int(input("Keyid: "))
+                    key_id = int(ntp.poly.polyinput("Keyid: "))
                 else:
                     key_id = 0
                 if key_id == 0 or key_id > MAX_KEYID:
@@ -938,7 +850,7 @@ class ControlSession:
                        "Sending %d octets.  seq=%d"
                        % (len(xdata), self.sequence), self.debug, 3)
         try:
-            self.sock.sendall(polybytes(xdata))
+            self.sock.sendall(ntp.poly.polybytes(xdata))
         except socket.error:
             # On failure, we don't know how much data was actually received
             if self.logfp is not None:
@@ -976,7 +888,7 @@ class ControlSession:
         # If we have data, pad it out to a 32-bit boundary.
         # Do not include these in the payload count.
         if pkt.extension:
-            pkt.extension = polybytes(pkt.extension)
+            pkt.extension = ntp.poly.polybytes(pkt.extension)
             while ((ControlPacket.HEADER_LEN + len(pkt.extension)) & 3):
                 pkt.extension += b"\x00"
 
@@ -1000,7 +912,7 @@ class ControlSession:
         if mac is None:
             raise ControlException(SERR_NOKEY)
         else:
-            pkt.extension += polybytes(mac)
+            pkt.extension += ntp.poly.polybytes(mac)
         return pkt.send()
 
     def getresponse(self, opcode, associd, timeo):
@@ -1066,7 +978,7 @@ class ControlSession:
 
             warndbg("At %s, socket read begins" % time.asctime(), 4)
             try:
-                rawdata = polybytes(self.sock.recv(4096))
+                rawdata = ntp.poly.polybytes(self.sock.recv(4096))
             except socket.error:  # pragma: no cover
                 # usually, errno 111: connection refused
                 raise ControlException(SERR_SOCKET)
@@ -1148,8 +1060,9 @@ class ControlSession:
                                 % (f, len(fragments)), 1)
                         break
                 else:
-                    tempfraglist = [polystr(f.extension) for f in fragments]
-                    self.response = polybytes("".join(tempfraglist))
+                    tempfraglist = [ntp.poly.polystr(f.extension) \
+                                    for f in fragments]
+                    self.response = ntp.poly.polybytes("".join(tempfraglist))
                     warndbg("Fragment collection ends. %d bytes "
                             " in %d fragments"
                             % (len(self.response), len(fragments)), 1)
@@ -1275,9 +1188,9 @@ class ControlSession:
         kvpairs = []
         instring = False
         response = ""
-        self.response = polystr(self.response)
+        self.response = ntp.poly.polystr(self.response)
         for c in self.response:
-            cord = polyord(c)
+            cord = ntp.poly.polyord(c)
             if c == '"':
                 response += c
                 instring = not instring
@@ -1341,7 +1254,7 @@ class ControlSession:
         elif b"\x00" in self.response:
             self.response = self.response[:self.response.index(b"\x00")]
         self.response = self.response.rstrip()
-        return self.response == polybytes("Config Succeeded")
+        return self.response == ntp.poly.polybytes("Config Succeeded")
 
     def fetch_nonce(self):
         """
@@ -1352,8 +1265,8 @@ This combats source address spoofing
             # retry 4 times
             self.doquery(opcode=ntp.control.CTL_OP_REQ_NONCE)
             self.nonce_xmit = time.time()
-            if self.response.startswith(polybytes("nonce=")):
-                return polystr(self.response.strip())
+            if self.response.startswith(ntp.poly.polybytes("nonce=")):
+                return ntp.poly.polystr(self.response.strip())
             # maybe a delay between tries?
 
         # uh, oh, no nonce seen
@@ -1700,7 +1613,7 @@ class Authenticator:
     @staticmethod
     def compute_mac(payload, keyid, keytype, passwd):
         hasher = hashlib.new(keytype)
-        hasher.update(polybytes(passwd))
+        hasher.update(ntp.poly.polybytes(passwd))
         hasher.update(payload)
         if hasher.digest_size == 0:
             return None
@@ -1729,6 +1642,6 @@ class Authenticator:
         hasher = hashlib.new(keytype)
         hasher.update(passwd)
         hasher.update(payload)
-        return polybytes(hasher.digest()) == mac
+        return ntp.poly.polybytes(hasher.digest()) == mac
 
 # end
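
The Authenticator hunks apply the same rule at the crypto boundary: anything fed to hashlib or compared against wire data passes through ntp.poly.polybytes() first. A simplified sketch of that digest check (flattened from the class methods above; the standalone signature is illustrative only):

    import hashlib
    import ntp.poly

    def digest_matches(payload, mac, passwd, keytype="md5"):
        # Hash password + payload and compare the result with the MAC
        # received on the wire; polybytes() keeps this working whether
        # the inputs arrive as str or as bytes.
        hasher = hashlib.new(keytype)
        hasher.update(ntp.poly.polybytes(passwd))
        hasher.update(ntp.poly.polybytes(payload))
        return ntp.poly.polybytes(hasher.digest()) == ntp.poly.polybytes(mac)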


=====================================
pylib/poly.py
=====================================
@@ -37,24 +37,22 @@ master_encoding = 'latin-1'
 forced_utf8 = False
 
 if str is bytes:  # Python 2
-    polystr = unicode
+    polystr = str
+    polyunicode = unicode
     polybytes = bytes
     polyord = ord
     polychr = str
-    input = raw_input
+    polyinput = raw_input
 
     def string_escape(s):
         """String_escape/unicode_escape."""
         return s.decode('string_escape')
 
-    # This used to force UTF-8 encoding, but that breaks the readline system.
-    # Unfortunately sometimes sys.stdout.encoding lies about the encoding,
-    # so expect random false positives.
-    ntp.util.check_unicode()
-
 else:  # Python 3
     import io
 
+    polyinput = input
+
     def polystr(o):
         """Polymorphic string factory function."""
         if isinstance(o, str):
@@ -63,6 +61,8 @@ else:  # Python 3
             return str(o)
         return str(o, encoding=master_encoding)
 
+    polyunicode = polystr
+
     def polybytes(s):
         """Polymorphic string encoding function."""
         if isinstance(s, bytes):
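
Taken together, the renamed shims behave like this on the two interpreters; the snippet below stays within what the hunk itself defines:

    import sys
    import ntp.poly

    s = ntp.poly.polystr(b"abc")      # byte string 'abc' on 2, text 'abc' on 3
    u = ntp.poly.polyunicode(b"abc")  # u'abc' on 2, 'abc' on 3 (alias of polystr)

    # polyinput replaces the earlier rebinding of the input builtin.
    if sys.stdin.isatty():
        line = ntp.poly.polyinput("keyid> ")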


=====================================
pylib/util.py
=====================================
@@ -199,7 +199,6 @@ def parseConf(text):
             elif text[i] == "\\":  # Starting an escape sequence
                 i += 1
                 if text[i] in "'\"n\\":
-                    print(repr(text[i]))
                     current.append(eval("\'\\" + text[i] + "\'"))
             else:
                 current.append(text[i])


=====================================
tests/pylib/test_util.py
=====================================
@@ -914,7 +914,7 @@ class TestPylibUtilMethods(unittest.TestCase):
             return cdns_jig_returns.pop(0)
 
         # Test init
-        cls = c(False, debug=3)
+        cls = c(False, debug=3, logfp=sys.stderr)
         self.assertEqual(cls.debug, 3)
         self.assertEqual(cls.logfp, sys.stderr)
         self.assertEqual(cls.now, None)
@@ -1027,7 +1027,7 @@ class TestPeerSummary(unittest.TestCase):
     target = ntp.util.PeerSummary
 
     def test___init__(self):
-        cls = self.target("peers", 4, True, False)
+        cls = self.target("peers", 4, True, False, logfp=sys.stderr)
         self.assertEqual(cls.displaymode, "peers")
         self.assertEqual(cls.pktversion, 4)
         self.assertEqual(cls.showhostnames, True)



View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/9a93cda026083a2cb3fac1e81758bfce9e4c56bf...6bafe67ae210df08dabfc6a9a4acd2aa455b4672
