[Git][NTPsec/ntpsec][master] 11 commits: Changed python 2.x polystr to be str instead of unicode.
Ian Bruene
gitlab at mg.gitlab.com
Tue Oct 30 21:48:28 UTC 2018
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
2dea4490 by Ian Bruene at 2018-10-29T21:32:24Z
Changed python 2.x polystr to be str instead of unicode.
- - - - -
40a0a2b9 by Ian Bruene at 2018-10-29T21:38:18Z
Removed duplicate poly string shims from agentx_packet.py.
- - - - -
1e2307b1 by Ian Bruene at 2018-10-29T21:47:08Z
Changed python 2.x polystr to be str rather than unicode.
- - - - -
d7ecd22d by Ian Bruene at 2018-10-29T22:04:35Z
Added polyinput shim.
- - - - -
923fb98b by Ian Bruene at 2018-10-29T22:23:17Z
ntpq and ntpmon now call check_unicode separately from any poly handling.
- - - - -
6ac641c5 by Ian Bruene at 2018-10-29T22:27:02Z
Removed check_unicode() call from poly switch.
Calling check_unicode here messed up numerous tests. It should be called
from the programs themselves after imports are complete.
- - - - -
9f7ab658 by Ian Bruene at 2018-10-29T22:29:31Z
Converted packet.py to use poly module.
- - - - -
155b6d71 by Ian Bruene at 2018-10-30T17:32:35Z
Added polyunicode shim.
- - - - -
c4fb2a04 by Ian Bruene at 2018-10-30T17:35:16Z
Converted ntpq to use poly module.
- - - - -
205ecdb4 by Ian Bruene at 2018-10-30T19:15:05Z
Removed forgotten debugging print()s.
- - - - -
6bafe67a by Ian Bruene at 2018-10-30T19:23:01Z
Changed tests to work on gentoo.
- - - - -
7 changed files:
- ntpclients/ntpmon.py
- ntpclients/ntpq.py
- pylib/agentx_packet.py
- pylib/packet.py
- pylib/poly.py
- pylib/util.py
- tests/pylib/test_util.py
Changes:
=====================================
ntpclients/ntpmon.py
=====================================
@@ -44,7 +44,9 @@ except ImportError as e:
sys.stderr.write("%s\n" % e)
sys.exit(1)
-
+# This used to force UTF-8 encoding, but that breaks the readline system.
+# Unfortunately sometimes sys.stdout.encoding lies about the encoding,
+# so expect random false positives.
# LANG=C or LANG=POSIX refuse unicode when combined with curses
disableunicode = ntp.util.check_unicode()
=====================================
ntpclients/ntpq.py
=====================================
@@ -27,97 +27,19 @@ try:
import ntp.ntpc
import ntp.packet
import ntp.util
+ import ntp.poly
except ImportError as e:
sys.stderr.write(
"ntpq: can't find Python NTP library -- check PYTHONPATH.\n")
sys.stderr.write("%s\n" % e)
sys.exit(1)
-version = ntp.util.stdversion()
-master_encoding = 'latin-1'
+# This used to force UTF-8 encoding, but that breaks the readline system.
+# Unfortunately sometimes sys.stdout.encoding lies about the encoding,
+# so expect random false positives.
+ntp.util.check_unicode()
-# General notes on Python 2/3 compatibility:
-#
-# This code uses the following strategy to allow it to run on both Python 2
-# and Python 3:
-#
-# - Use latin-1 encoding to transform binary data to/from Unicode when
-# necessary for operations where Python 3 expects Unicode; the
-# polystr and polybytes functions are used to do this so that
-# when running on Python 2, the byte string data is used unchanged.
-#
-# - Construct custom stdout and stderr streams when running
-# on Python 3 that force UTF-8 encoding, and wrap them around the
-# underlying binary buffers (in Python 2, the streams are binary
-# and are used unchanged); this ensures that the same transformation
-# is done on data from/to the standard streams, as is done on binary
-# data from/to files and subprocesses; the make_std_wrapper function
-# does this.
-#
-# anyone that changes this needs to test with all combinations of
-# python2, python3, LC_ALL=ascii, LC_ALL=latin-1, LC_ALL=en_US.utf8, and
-# piping output to a file. While looking at the UTF-8 in the output.
-
-forced_utf8 = False
-
-if str is bytes: # Python 2
- polystr = unicode
- polybytes = bytes
-
- def string_escape(s):
- return s.decode('string_escape')
-
- # This used to force UTF-8 encoding, but that breaks the readline system.
- # Unfortunately sometimes sys.stdout.encoding lies about the encoding,
- # so expect random false positives.
- ntp.util.check_unicode()
-
-else: # Python 3
- import io
-
- def polystr(o):
- "Polymorphic string factory function"
- if isinstance(o, str):
- return o
- if not isinstance(o, bytes):
- return str(o)
- return str(o, encoding=master_encoding)
-
- def polybytes(s):
- "Polymorphic string encoding function"
- if isinstance(s, bytes):
- return s
- if not isinstance(s, str):
- return bytes(s)
- return bytes(s, encoding=master_encoding)
-
- def string_escape(s):
- "Polymorphic string_escape/unicode_escape"
- # This hack is necessary because Unicode strings in Python 3 don't
- # have a decode method, so there's no simple way to ask it for the
- # equivalent of decode('string_escape') in Python 2. This function
- # assumes that it will be called with a Python 3 'str' instance
- return s.encode(master_encoding).decode('unicode_escape')
-
- def make_std_wrapper(stream):
- "Standard input/output wrapper factory function"
- # This ensures that the encoding of standard output and standard
- # error on Python 3 matches the master encoding we use to turn
- # bytes to Unicode in polystr above
- # line_buffering=True ensures that interactive command sessions
- # work as expected
- return io.TextIOWrapper(stream.buffer, encoding="utf-8",
- newline="\n", line_buffering=True)
-
- # This is the one situation where we *can* force unicode.
- if "UTF-8" != sys.stdout.encoding:
- forced_utf8 = True
- sys.stdout = make_std_wrapper(sys.stdout)
- if "UTF-8" != sys.stderr.encoding:
- forced_utf8 = True
- sys.stderr = make_std_wrapper(sys.stderr)
-
-# NTP-specific parts resume here
+version = ntp.util.stdversion()
# Flags for forming descriptors.
OPT = 0x80 # this argument is optional, or'd with type */
@@ -228,7 +150,7 @@ class Ntpq(cmd.Cmd):
def say(self, msg):
try:
- sys.stdout.write(polystr(msg))
+ sys.stdout.write(ntp.poly.polyunicode(msg))
except UnicodeEncodeError as e:
print("Unicode failure:", e)
print("msg:\n", repr(msg))
@@ -236,7 +158,7 @@ class Ntpq(cmd.Cmd):
sys.stdout.flush() # In case we're piping the output
def warn(self, msg):
- sys.stderr.write(polystr(msg))
+ sys.stderr.write(ntp.poly.polystr(msg))
def help_help(self):
self.say("""\
@@ -400,8 +322,9 @@ usage: help [ command ]
# high-half characters. We won't do that unless somebody
# files a bug, Mode 6 never seems to generate those in
# variable fetches.
- text = polystr(session.response.replace(polybytes(",\r\n"),
- polybytes(",\n")))
+ text = ntp.poly.polystr(session.response.replace(
+ ntp.poly.polybytes(",\r\n"),
+ ntp.poly.polybytes(",\n")))
else:
if not quiet:
self.say("status=%04x %s,\n"
@@ -1697,7 +1620,7 @@ if __name__ == '__main__':
session.logfp = interpreter.logfp = logfp
- if forced_utf8 and interpreter.debug:
+ if ntp.poly.forced_utf8 and interpreter.debug:
interpreter.warn("\nforced UTF-8 output\n")
if keyfile is not None: # Have a -k, setup the auth
=====================================
pylib/agentx_packet.py
=====================================
@@ -5,74 +5,9 @@
from __future__ import print_function
import struct
+import ntp.poly
from ntp.util import slicedata
-master_encoding = 'latin-1'
-
-if str is bytes: # pragma: no cover
- # Python 2
- polystr = str
- polybytes = bytes
- polyord = ord
- polychr = str
- input = raw_input
-
- def string_escape(s):
- return s.decode('string_escape')
-
-else: # pragma: no cover
- # Python 3
- import io
-
- def polystr(o):
- "Polymorphic string factory function"
- if isinstance(o, str):
- return o
- if not isinstance(o, bytes):
- return str(o)
- return str(o, encoding=master_encoding)
-
- def polybytes(s):
- "Polymorphic string encoding function"
- if isinstance(s, bytes):
- return s
- if not isinstance(s, str):
- return bytes(s)
- return bytes(s, encoding=master_encoding)
-
- def polyord(c):
- "Polymorphic ord() function"
- if isinstance(c, str):
- return ord(c)
- else:
- return c
-
- def polychr(c):
- "Polymorphic chr() function"
- if isinstance(c, int):
- return chr(c)
- else:
- return c
-
- def string_escape(s):
- "Polymorphic string_escape/unicode_escape"
- # This hack is necessary because Unicode strings in Python 3 don't
- # have a decode method, so there's no simple way to ask it for the
- # equivalent of decode('string_escape') in Python 2. This function
- # assumes that it will be called with a Python 3 'str' instance
- return s.encode(master_encoding).decode('unicode_escape')
-
- def make_std_wrapper(stream):
- "Standard input/output wrapper factory function"
- # This ensures that the encoding of standard output and standard
- # error on Python 3 matches the master encoding we use to turn
- # bytes to Unicode in polystr above
- # line_buffering=True ensures that interactive
- # command sessions work as expected
- return io.TextIOWrapper(stream.buffer,
- encoding=master_encoding, newline="\n",
- line_buffering=True)
-
internetPrefix = (1, 3, 6, 1) # Used by the prefix option of OID headers
prefixCount = len(internetPrefix)
@@ -207,7 +142,7 @@ class AgentXPDU:
def decode_OpenPDU(data, header):
flags = header["flags"]
temp, data = slicedata(data, 4)
- timeout = struct.unpack("Bxxx", polybytes(temp))[0]
+ timeout = struct.unpack("Bxxx", ntp.poly.polybytes(temp))[0]
oid, data = decode_OID(data, header)
description = decode_octetstr(data, header)[0]
result = OpenPDU(flags["bigEndian"], header["session_id"],
@@ -286,11 +221,12 @@ def decode_xRegisterPDU(data, header):
context, data = decode_context(data, header)
temp, data = slicedata(data, 4)
timeout, priority, rangeSubid = struct.unpack(endianToken + "BBBx",
- polybytes(temp))
+ ntp.poly.polybytes(temp))
oid, data = decode_OID(data, header)
if rangeSubid != 0:
temp, data = slicedata(data, 4)
- upperBound = struct.unpack(endianToken + "I", polybytes(temp))[0]
+ upperBound = struct.unpack(endianToken + "I",
+ ntp.poly.polybytes(temp))[0]
else:
upperBound = None
if header["type"] == PDU_REGISTER:
@@ -417,7 +353,8 @@ def decode_GetBulkPDU(data, header):
endianToken = getendian(flags["bigEndian"])
context, data = decode_context(data, header)
temp, data = slicedata(data, 4)
- nonReps, maxReps = struct.unpack(endianToken + "HH", polybytes(temp))
+ nonReps, maxReps = struct.unpack(endianToken + "HH",
+ ntp.poly.polybytes(temp))
oidranges = decode_searchrange_list(data, header)
result = GetBulkPDU(flags["bigEndian"], header["session_id"],
header["transaction_id"], header["packet_id"],
@@ -735,7 +672,7 @@ def decode_ResponsePDU(data, header):
endianToken = getendian(flags["bigEndian"])
temp, data = slicedata(data, 8)
sysUptime, resError, resIndex = struct.unpack(endianToken + "IHH",
- polybytes(temp))
+ ntp.poly.polybytes(temp))
if len(data) > 0:
varbinds = decode_varbindlist(data, header)
else:
@@ -803,7 +740,7 @@ def decode_OID(data, header):
flags = header["flags"]
# Need to split off the header to get the subid count
header, data = slicedata(data, 4)
- n_subid, prefix, include = struct.unpack("BBBx", polybytes(header))
+ n_subid, prefix, include = struct.unpack("BBBx", ntp.poly.polybytes(header))
if prefix != 0:
subids = internetPrefix + (prefix,)
else:
@@ -817,7 +754,7 @@ def decode_OID(data, header):
data, rest = slicedata(data, byteCount)
endianToken = getendian(flags["bigEndian"])
formatString = endianToken + ("I" * n_subid)
- subids += struct.unpack(formatString, polybytes(data))
+ subids += struct.unpack(formatString, ntp.poly.polybytes(data))
result = OID(subids, include)
return (result, rest)
@@ -926,7 +863,7 @@ def encode_octetstr(bigEndian, octets):
pad = 4 - pad
pad = b"\x00" * pad
if type(octets) is str:
- octets = polybytes(octets)
+ octets = ntp.poly.polybytes(octets)
data = header + octets + pad
else:
fmt = "B" * numoctets
@@ -939,13 +876,13 @@ def decode_octetstr(data, header):
flags = header["flags"]
header, data = slicedata(data, 4)
endianToken = getendian(flags["bigEndian"])
- numoctets = struct.unpack(endianToken + "I", polybytes(header))[0]
+ numoctets = struct.unpack(endianToken + "I", ntp.poly.polybytes(header))[0]
if len(data) < numoctets:
raise ValueError("Octet string shorter than length")
pad = numoctets % 4
if pad > 0: # Pad out the data to word boundary
pad = 4 - pad
- return polystr(data[:numoctets]), data[numoctets + pad:]
+ return ntp.poly.polystr(data[:numoctets]), data[numoctets + pad:]
def sanity_octetstr(data):
@@ -963,7 +900,8 @@ def decode_Varbind(data, header):
flags = header["flags"]
bindheader, data = slicedata(data, 4)
endianToken = getendian(flags["bigEndian"])
- valType = struct.unpack(endianToken + "Hxx", polybytes(bindheader))[0]
+ valType = struct.unpack(endianToken + "Hxx",
+ ntp.poly.polybytes(bindheader))[0]
name, data = decode_OID(data, header)
if valType not in definedValueTypes.keys():
raise ValueError("Value type %s not in defined types" % valType)
@@ -1078,7 +1016,7 @@ def decode_integer64(data, header):
flags = header["flags"]
endianToken = getendian(flags["bigEndian"])
num, data = slicedata(data, 8)
- num = struct.unpack(endianToken + "Q", polybytes(num))[0]
+ num = struct.unpack(endianToken + "Q", ntp.poly.polybytes(num))[0]
return (num, data)
@@ -1094,7 +1032,7 @@ def encode_ipaddr(bigEndian, octets):
def decode_ipaddr(data, header):
addr, data = decode_octetstr(data, header)
- addr = struct.unpack("BBBB", polybytes(addr))
+ addr = struct.unpack("BBBB", ntp.poly.polybytes(addr))
return addr, data
@@ -1239,14 +1177,16 @@ def encode_pduheader(pduType, instanceRegistration, newIndex,
def decode_pduheader(data): # Endianness is controlled from the PDU header
lineone, data = slicedata(data, 4)
- version, pduType, flags = struct.unpack(">BBBx", polybytes(lineone))
+ version, pduType, flags = struct.unpack(">BBBx",
+ ntp.poly.polybytes(lineone))
# Slice up the flags
flagDict = decode_flagbyte(flags)
# Chop the remaining parts of the header from the rest of the datastream
# then parse them
fmt = getendian(flagDict["bigEndian"]) + "IIII"
linen, data = slicedata(data, 16) # 4 x 4-byte variables
- sID, tactionID, pktID, dataLen = struct.unpack(fmt, polybytes(linen))
+ sID, tactionID, pktID, dataLen = struct.unpack(fmt,
+ ntp.poly.polybytes(linen))
result = {"version": version, "type": pduType, "flags": flagDict,
"session_id": sID, "transaction_id": tactionID,
"packet_id": pktID, "length": dataLen}
=====================================
pylib/packet.py
=====================================
@@ -195,97 +195,8 @@ import ntp.control
import ntp.magic
import ntp.ntpc
import ntp.util
+import ntp.poly
-# General notes on Python 2/3 compatibility:
-#
-# This code uses the following strategy to allow it to run on both Python 2
-# and Python 3:
-#
-# - Use binary I/O to read/write data from/to files and subprocesses;
-# where the exact bytes are important (such as in checking for
-# modified files), use the binary data directly
-#
-# - Use latin-1 encoding to transform binary data to/from Unicode when
-# necessary for operations where Python 3 expects Unicode; the
-# polystr and polybytes functions are used to do this so that
-# when running on Python 2, the byte string data is used unchanged.
-#
-# - Construct custom stdin, stdout, and stderr streams when running
-# on Python 3 that force latin-1 encoding, and wrap them around the
-# underlying binary buffers (in Python 2, the streams are binary
-# and are used unchanged); this ensures that the same transformation
-# is done on data from/to the standard streams, as is done on binary
-# data from/to files and subprocesses; the make_std_wrapper function
-# does this
-
-master_encoding = 'latin-1'
-
-if str is bytes: # pragma: no cover
- # Python 2
- polystr = str
- polybytes = bytes
- polyord = ord
- polychr = str
- input = raw_input
-
- def string_escape(s):
- return s.decode('string_escape')
-
- def make_wrapper(fp):
- return fp
-
-else: # pragma: nocover
- # Python 3
- import io
-
- def polystr(o):
- "Polymorphic string factory function"
- if isinstance(o, str):
- return o
- if not isinstance(o, bytes):
- return str(o)
- return str(o, encoding=master_encoding)
-
- def polybytes(s):
- "Polymorphic string encoding function"
- if isinstance(s, bytes):
- return s
- if not isinstance(s, str):
- return bytes(s)
- return bytes(s, encoding=master_encoding)
-
- def polyord(c):
- "Polymorphic ord() function"
- if isinstance(c, str):
- return ord(c)
- else:
- return c
-
- def polychr(c):
- "Polymorphic chr() function"
- if isinstance(c, int):
- return chr(c)
- else:
- return c
-
- def string_escape(s):
- "Polymorphic string_escape/unicode_escape"
- # This hack is necessary because Unicode strings in Python 3 don't
- # have a decode method, so there's no simple way to ask it for the
- # equivalent of decode('string_escape') in Python 2. This function
- # assumes that it will be called with a Python 3 'str' instance
- return s.encode(master_encoding).decode('unicode_escape')
-
- def make_std_wrapper(stream):
- "Standard input/output wrapper factory function"
- # This ensures that the encoding of standard output and standard
- # error on Python 3 matches the master encoding we use to turn
- # bytes to Unicode in polystr above
- # line_buffering=True ensures that interactive
- # command sessions work as expected
- return io.TextIOWrapper(stream.buffer,
- encoding=master_encoding, newline="\n",
- line_buffering=True)
# Limit on packets in a single Mode 6 response. Increasing this value to
# 96 will marginally speed "mrulist" operation on lossless networks
@@ -348,7 +259,7 @@ class Packet:
@extension.setter
def extension(self, x):
- self.__extension = polybytes(x)
+ self.__extension = ntp.poly.polybytes(x)
def leap(self):
return ("no-leap", "add-leap", "del-leap",
@@ -399,7 +310,7 @@ class SyncPacket(Packet):
self.trusted = True
self.rescaled = False
if data:
- self.analyze(polybytes(data))
+ self.analyze(ntp.poly.polybytes(data))
def analyze(self, data):
datalen = len(data)
@@ -526,11 +437,11 @@ class SyncPacket(Packet):
def refid_as_string(self):
"Sometimes it's a clock name or KOD type"
- return polystr(struct.pack(*(("BBBB",) + self.refid_octets())))
+ return ntp.poly.polystr(struct.pack(*(("BBBB",) + self.refid_octets())))
def refid_as_address(self):
"Sometimes it's an IPV4 address."
- return polystr("%d.%d.%d.%d" % self.refid_octets())
+ return ntp.poly.polystr("%d.%d.%d.%d" % self.refid_octets())
def is_crypto_nak(self):
return len(self.mac) == 4
@@ -605,7 +516,7 @@ class ControlPacket(Packet):
return "%5d %5d\t%3d octets\n" % (self.offset, self.end(), self.count)
def analyze(self, rawdata):
- rawdata = polybytes(rawdata)
+ rawdata = ntp.poly.polybytes(rawdata)
(self.li_vn_mode,
self.r_e_m_op,
self.sequence,
@@ -686,7 +597,8 @@ def dump_hex_printable(xdata, outfp=sys.stdout):
# Output data in hex form
linelen = len(linedata)
line = "%02x " * linelen
- linedata = [polyord(x) for x in linedata] # Will need this later
+ # Will need linedata later
+ linedata = [ntp.poly.polyord(x) for x in linedata]
line %= tuple(linedata)
if linelen < rowsize: # Pad out the line to keep columns neat
line += " " * (rowsize - linelen)
@@ -907,7 +819,7 @@ class ControlSession:
raise ControlException(SERR_NOTRUST)
try:
if os.isatty(0):
- key_id = int(input("Keyid: "))
+ key_id = int(ntp.poly.polyinput("Keyid: "))
else:
key_id = 0
if key_id == 0 or key_id > MAX_KEYID:
@@ -938,7 +850,7 @@ class ControlSession:
"Sending %d octets. seq=%d"
% (len(xdata), self.sequence), self.debug, 3)
try:
- self.sock.sendall(polybytes(xdata))
+ self.sock.sendall(ntp.poly.polybytes(xdata))
except socket.error:
# On failure, we don't know how much data was actually received
if self.logfp is not None:
@@ -976,7 +888,7 @@ class ControlSession:
# If we have data, pad it out to a 32-bit boundary.
# Do not include these in the payload count.
if pkt.extension:
- pkt.extension = polybytes(pkt.extension)
+ pkt.extension = ntp.poly.polybytes(pkt.extension)
while ((ControlPacket.HEADER_LEN + len(pkt.extension)) & 3):
pkt.extension += b"\x00"
@@ -1000,7 +912,7 @@ class ControlSession:
if mac is None:
raise ControlException(SERR_NOKEY)
else:
- pkt.extension += polybytes(mac)
+ pkt.extension += ntp.poly.polybytes(mac)
return pkt.send()
def getresponse(self, opcode, associd, timeo):
@@ -1066,7 +978,7 @@ class ControlSession:
warndbg("At %s, socket read begins" % time.asctime(), 4)
try:
- rawdata = polybytes(self.sock.recv(4096))
+ rawdata = ntp.poly.polybytes(self.sock.recv(4096))
except socket.error: # pragma: no cover
# usually, errno 111: connection refused
raise ControlException(SERR_SOCKET)
@@ -1148,8 +1060,9 @@ class ControlSession:
% (f, len(fragments)), 1)
break
else:
- tempfraglist = [polystr(f.extension) for f in fragments]
- self.response = polybytes("".join(tempfraglist))
+ tempfraglist = [ntp.poly.polystr(f.extension) \
+ for f in fragments]
+ self.response = ntp.poly.polybytes("".join(tempfraglist))
warndbg("Fragment collection ends. %d bytes "
" in %d fragments"
% (len(self.response), len(fragments)), 1)
@@ -1275,9 +1188,9 @@ class ControlSession:
kvpairs = []
instring = False
response = ""
- self.response = polystr(self.response)
+ self.response = ntp.poly.polystr(self.response)
for c in self.response:
- cord = polyord(c)
+ cord = ntp.poly.polyord(c)
if c == '"':
response += c
instring = not instring
@@ -1341,7 +1254,7 @@ class ControlSession:
elif b"\x00" in self.response:
self.response = self.response[:self.response.index(b"\x00")]
self.response = self.response.rstrip()
- return self.response == polybytes("Config Succeeded")
+ return self.response == ntp.poly.polybytes("Config Succeeded")
def fetch_nonce(self):
"""
@@ -1352,8 +1265,8 @@ This combats source address spoofing
# retry 4 times
self.doquery(opcode=ntp.control.CTL_OP_REQ_NONCE)
self.nonce_xmit = time.time()
- if self.response.startswith(polybytes("nonce=")):
- return polystr(self.response.strip())
+ if self.response.startswith(ntp.poly.polybytes("nonce=")):
+ return ntp.poly.polystr(self.response.strip())
# maybe a delay between tries?
# uh, oh, no nonce seen
@@ -1700,7 +1613,7 @@ class Authenticator:
@staticmethod
def compute_mac(payload, keyid, keytype, passwd):
hasher = hashlib.new(keytype)
- hasher.update(polybytes(passwd))
+ hasher.update(ntp.poly.polybytes(passwd))
hasher.update(payload)
if hasher.digest_size == 0:
return None
@@ -1729,6 +1642,6 @@ class Authenticator:
hasher = hashlib.new(keytype)
hasher.update(passwd)
hasher.update(payload)
- return polybytes(hasher.digest()) == mac
+ return ntp.poly.polybytes(hasher.digest()) == mac
# end
=====================================
pylib/poly.py
=====================================
@@ -37,24 +37,22 @@ master_encoding = 'latin-1'
forced_utf8 = False
if str is bytes: # Python 2
- polystr = unicode
+ polystr = str
+ polyunicode = unicode
polybytes = bytes
polyord = ord
polychr = str
- input = raw_input
+ polyinput = raw_input
def string_escape(s):
"""String_escape/unicode_escape."""
return s.decode('string_escape')
- # This used to force UTF-8 encoding, but that breaks the readline system.
- # Unfortunately sometimes sys.stdout.encoding lies about the encoding,
- # so expect random false positives.
- ntp.util.check_unicode()
-
else: # Python 3
import io
+ polyinput = input
+
def polystr(o):
"""Polymorphic string factory function."""
if isinstance(o, str):
@@ -63,6 +61,8 @@ else: # Python 3
return str(o)
return str(o, encoding=master_encoding)
+ polyunicode = polystr
+
def polybytes(s):
"""Polymorphic string encoding function."""
if isinstance(s, bytes):
=====================================
pylib/util.py
=====================================
@@ -199,7 +199,6 @@ def parseConf(text):
elif text[i] == "\\": # Starting an escape sequence
i += 1
if text[i] in "'\"n\\":
- print(repr(text[i]))
current.append(eval("\'\\" + text[i] + "\'"))
else:
current.append(text[i])
=====================================
tests/pylib/test_util.py
=====================================
@@ -914,7 +914,7 @@ class TestPylibUtilMethods(unittest.TestCase):
return cdns_jig_returns.pop(0)
# Test init
- cls = c(False, debug=3)
+ cls = c(False, debug=3, logfp=sys.stderr)
self.assertEqual(cls.debug, 3)
self.assertEqual(cls.logfp, sys.stderr)
self.assertEqual(cls.now, None)
@@ -1027,7 +1027,7 @@ class TestPeerSummary(unittest.TestCase):
target = ntp.util.PeerSummary
def test___init__(self):
- cls = self.target("peers", 4, True, False)
+ cls = self.target("peers", 4, True, False, logfp=sys.stderr)
self.assertEqual(cls.displaymode, "peers")
self.assertEqual(cls.pktversion, 4)
self.assertEqual(cls.showhostnames, True)
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/9a93cda026083a2cb3fac1e81758bfce9e4c56bf...6bafe67ae210df08dabfc6a9a4acd2aa455b4672
--
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/9a93cda026083a2cb3fac1e81758bfce9e4c56bf...6bafe67ae210df08dabfc6a9a4acd2aa455b4672
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20181030/1348fa97/attachment-0001.html>
More information about the vc
mailing list