[Git][NTPsec/ntpsec][master] AgentX code and tests now work in python 3.

Ian Bruene gitlab at mg.gitlab.com
Wed Sep 27 17:17:13 UTC 2017


Ian Bruene pushed to branch master at NTPsec / ntpsec


Commits:
c69555cd by Ian Bruene at 2017-09-27T12:13:51-05:00
AgentX code and tests now work in python 3.

- - - - -


2 changed files:

- pylib/agentx.py
- tests/pylib/test_agentx.py


Changes:

=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -7,6 +7,84 @@ from __future__ import print_function
 import struct
 from ntp.util import slicedata
 
+master_encoding = 'latin-1'
+
+if str is bytes:  # Python 2
+    polystr = str
+    polybytes = bytes
+    polyord = ord
+    polychr = str
+    input = raw_input
+
+    def string_escape(s):
+        return s.decode('string_escape')
+
+    def make_wrapper(fp):
+        return fp
+
+else:  # Python 3
+    import io
+
+    def polystr(o):
+        "Polymorphic string factory function"
+        if isinstance(o, str):
+            return o
+        if not isinstance(o, bytes):
+            return str(o)
+        return str(o, encoding=master_encoding)
+
+    def polybytes(s):
+        "Polymorphic string encoding function"
+        if isinstance(s, bytes):
+            return s
+        if not isinstance(s, str):
+            return bytes(s)
+        return bytes(s, encoding=master_encoding)
+
+    def polyord(c):
+        "Polymorphic ord() function"
+        if isinstance(c, str):
+            return ord(c)
+        else:
+            return c
+
+    def polychr(c):
+        "Polymorphic chr() function"
+        if isinstance(c, int):
+            return chr(c)
+        else:
+            return c
+
+    def string_escape(s):
+        "Polymorphic string_escape/unicode_escape"
+        # This hack is necessary because Unicode strings in Python 3 don't
+        # have a decode method, so there's no simple way to ask it for the
+        # equivalent of decode('string_escape') in Python 2. This function
+        # assumes that it will be called with a Python 3 'str' instance
+        return s.encode(master_encoding).decode('unicode_escape')
+
+    def make_wrapper(fp):
+        "Wrapper factory function to enforce master encoding"
+        # This can be used to wrap normally binary streams for API
+        # compatibility with functions that need a text stream in
+        # Python 3; it ensures that the binary bytes are decoded using
+        # the master encoding we use to turn bytes to Unicode in
+        # polystr above
+        # newline="\n" ensures that Python 3 won't mangle line breaks
+        return io.TextIOWrapper(fp, encoding=master_encoding, newline="\n")
+
+    def make_std_wrapper(stream):
+        "Standard input/output wrapper factory function"
+        # This ensures that the encoding of standard output and standard
+        # error on Python 3 matches the master encoding we use to turn
+        # bytes to Unicode in polystr above
+        # line_buffering=True ensures that interactive
+        # command sessions work as expected
+        return io.TextIOWrapper(stream.buffer,
+                                encoding=master_encoding, newline="\n",
+                                line_buffering=True)
+
+
 internetPrefix = (1, 3, 6, 1)  # Used by the prefix option of OID headers
 prefixCount = len(internetPrefix)
 
@@ -77,7 +155,7 @@ class AgentXPDU:
         s = self.__class__.__name__ + "("
         v = []
         myvars = self.packetVars()
-        keys = myvars.keys()
+        keys = list(myvars.keys())
         keys.sort()  # they will always be in the same order: testable
         for name in keys:
             value = myvars[name]
@@ -109,7 +187,7 @@ class AgentXPDU:
 def decode_OpenPDU(data, header):
     flags = header["flags"]
     temp, data = slicedata(data, 4)
-    timeout = struct.unpack("Bxxx", temp)[0]
+    timeout = struct.unpack("Bxxx", polybytes(temp))[0]
     oid, data = decode_OID(data, header)
     description = decode_octetstr(data, header)[0]
     result = OpenPDU(flags["bigEndian"], header["session_id"],
@@ -150,7 +228,9 @@ class OpenPDU(AgentXPDU):
 
 def decode_ClosePDU(data, header):
     flags = header["flags"]
-    reason = ord(data[0])  # Bxxx
+    reason = data[0]  # Bxxx
+    if isinstance(reason, str):
+        reason = ord(reason)
     result = ClosePDU(flags["bigEndian"], header["session_id"],
                       header["transaction_id"], header["packet_id"],
                       reason)
@@ -185,11 +265,12 @@ def decode_xRegisterPDU(data, header):
     endianToken = getendian(flags["bigEndian"])
     context, data = decode_context(data, header)
     temp, data = slicedata(data, 4)
-    timeout, priority, rangeSubid = struct.unpack(endianToken + "BBBx", temp)
+    timeout, priority, rangeSubid = struct.unpack(endianToken + "BBBx",
+                                                  polybytes(temp))
     oid, data = decode_OID(data, header)
     if rangeSubid != 0:
         temp, data = slicedata(data, 4)
-        upperBound = struct.unpack(endianToken + "I", temp)[0]
+        upperBound = struct.unpack(endianToken + "I", polybytes(temp))[0]
     else:
         upperBound = None
     if header["type"] == PDU_REGISTER:
@@ -320,7 +401,7 @@ def decode_GetBulkPDU(data, header):
     endianToken = getendian(flags["bigEndian"])
     context, data = decode_context(data, header)
     temp, data = slicedata(data, 4)
-    nonReps, maxReps = struct.unpack(endianToken + "HH", temp)
+    nonReps, maxReps = struct.unpack(endianToken + "HH", polybytes(temp))
     oidranges = decode_searchrange_list(data, header)
     result = GetBulkPDU(flags["bigEndian"], header["session_id"],
                         header["transaction_id"], header["packet_id"],
@@ -638,7 +719,8 @@ def decode_ResponsePDU(data, header):
     flags = header["flags"]
     endianToken = getendian(flags["bigEndian"])
     temp, data = slicedata(data, 8)
-    sysUptime, resError, resIndex = struct.unpack(endianToken + "IHH", temp)
+    sysUptime, resError, resIndex = struct.unpack(endianToken + "IHH",
+                                                  polybytes(temp))
     if len(data) > 0:
         varbinds = decode_varbindlist(data, header)
     else:
@@ -706,7 +788,7 @@ def decode_OID(data, header):
     flags = header["flags"]
     # Need to split off the header to get the subid count
     header, data = slicedata(data, 4)
-    n_subid, prefix, include = struct.unpack("BBBx", header)
+    n_subid, prefix, include = struct.unpack("BBBx", polybytes(header))
     if prefix != 0:
         subids = internetPrefix + (prefix,)
     else:
@@ -720,7 +802,7 @@ def decode_OID(data, header):
     data, rest = slicedata(data, byteCount)
     endianToken = getendian(flags["bigEndian"])
     formatString = endianToken + ("I" * n_subid)
-    subids += struct.unpack(formatString, data)
+    subids += struct.unpack(formatString, polybytes(data))
     result = OID(subids, include)
     return (result, rest)
 
@@ -766,7 +848,8 @@ class OID:
             if x[i] == y[i]:
                 continue
             else:
-                c = cmp(x[i], y[i])
+                # this is the Py3 version of c = cmp(x[i], y[i])
+                c = (x[i] > y[i]) - (x[i] < y[i])
                 c = -c if flipped is True else c
                 return c
         # Only reach this if shorter, and each index is equal
@@ -818,8 +901,9 @@ def encode_octetstr(bigEndian, octets):
     pad = (numoctets % 4)
     if pad > 0:  # Pad out the data to word boundary
         pad = 4 - pad
-    pad = "\x00" * pad
+    pad = b"\x00" * pad
     if type(octets) is str:
+        octets = polybytes(octets)
         data = header + octets + pad
     else:
         fmt = "B" * numoctets
@@ -832,13 +916,13 @@ def decode_octetstr(data, header):
     flags = header["flags"]
     header, data = slicedata(data, 4)
     endianToken = getendian(flags["bigEndian"])
-    numoctets = struct.unpack(endianToken + "I", header)[0]
+    numoctets = struct.unpack(endianToken + "I", polybytes(header))[0]
     if len(data) < numoctets:
         raise ValueError("Octet string shorter than length")
     pad = numoctets % 4
     if pad > 0:  # Pad out the data to word boundary
         pad = 4 - pad
-    return data[:numoctets], data[numoctets + pad:]
+    return polystr(data[:numoctets]), data[numoctets + pad:]
 
 
 def sanity_octetstr(data):
@@ -856,7 +940,7 @@ def decode_Varbind(data, header):
     flags = header["flags"]
     bindheader, data = slicedata(data, 4)
     endianToken = getendian(flags["bigEndian"])
-    valType = struct.unpack(endianToken + "Hxx", bindheader)[0]
+    valType = struct.unpack(endianToken + "Hxx", polybytes(bindheader))[0]
     name, data = decode_OID(data, header)
     if valType not in definedValueTypes.keys():
         raise ValueError("Value type %s not in defined types" % valType)
@@ -933,7 +1017,7 @@ def sanity_integer32(data):
 
 
 def encode_nullvalue(bigEndian, data):
-    return ""
+    return b""
 
 
 def decode_nullvalue(data, header):
@@ -953,7 +1037,7 @@ def decode_integer64(data, header):
     flags = header["flags"]
     endianToken = getendian(flags["bigEndian"])
     num, data = slicedata(data, 8)
-    num = struct.unpack(endianToken + "Q", num)[0]
+    num = struct.unpack(endianToken + "Q", polybytes(num))[0]
     return (num, data)
 
 
@@ -969,7 +1053,7 @@ def encode_ipaddr(bigEndian, octets):
 
 def decode_ipaddr(data, header):
     addr, data = decode_octetstr(data, header)
-    addr = struct.unpack("BBBB", addr)
+    addr = struct.unpack("BBBB", polybytes(addr))
     return addr, data
 
 
@@ -1031,7 +1115,7 @@ def encode_searchrange_list(bigEndian, searchranges, nullTerminate=False):
     if nullTerminate:
         noid = OID(())
         encoded.append(noid.encode(bigEndian))
-    encoded = "".join(encoded)
+    encoded = b"".join(encoded)
     return encoded
 
 
@@ -1055,7 +1139,7 @@ def decode_searchrange_list_nullterm(data, header):
 
 
 def encode_varbindlist(bigEndian, varbinds):
-    payload = ""
+    payload = b""
     for varbind in varbinds:
         payload += varbind.encode(bigEndian)
     return payload
@@ -1108,7 +1192,7 @@ def encode_pduheader(pduType, instanceRegistration, newIndex,
 
 def decode_pduheader(data):  # Endianness is controlled from the PDU header
     lineone, data = slicedata(data, 4)
-    version, pduType, flags = struct.unpack(">BBBx", lineone)
+    version, pduType, flags = struct.unpack(">BBBx", polybytes(lineone))
     if pduType not in definedPDUTypes:
         raise ValueError("PDU type %s not in defined types" % pduType)
     # Slice up the flags
@@ -1122,7 +1206,7 @@ def decode_pduheader(data):  # Endianness is controlled from the PDU header
     # then parse them
     fmt = getendian(flagDict["bigEndian"]) + "IIII"
     linen, data = slicedata(data, 16)  # 4 x 4-byte variables
-    sID, tactionID, pktID, dataLen = struct.unpack(fmt, linen)
+    sID, tactionID, pktID, dataLen = struct.unpack(fmt, polybytes(linen))
     result = {"version": version, "type": pduType, "flags": flagDict,
               "session_id": sID, "transaction_id": tactionID,
               "packet_id": pktID, "length": dataLen}
@@ -1135,7 +1219,7 @@ def encode_context(bigEndian, context):
         payload = encode_octetstr(bigEndian, context)
     else:
         contextP = False
-        payload = ""
+        payload = b""
     return (contextP, payload)
 
 


=====================================
tests/pylib/test_agentx.py
=====================================
The diff for this file was not included because it is too large.


View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/c69555cdc1351e6497d136ead43e4dda36d933b1

---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/c69555cdc1351e6497d136ead43e4dda36d933b1
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170927/59ea0c74/attachment.html>


More information about the vc mailing list