[Git][NTPsec/ntpsec][master] AgentX code and tests now work in python 3.
Ian Bruene
gitlab at mg.gitlab.com
Wed Sep 27 17:17:13 UTC 2017
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
c69555cd by Ian Bruene at 2017-09-27T12:13:51-05:00
AgentX code and tests now work in python 3.
- - - - -
2 changed files:
- pylib/agentx.py
- tests/pylib/test_agentx.py
Changes:
=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -7,6 +7,84 @@ from __future__ import print_function
import struct
from ntp.util import slicedata
+master_encoding = 'latin-1'
+
+if str is bytes: # Python 2
+ polystr = str
+ polybytes = bytes
+ polyord = ord
+ polychr = str
+ input = raw_input
+
+ def string_escape(s):
+ return s.decode('string_escape')
+
+ def make_wrapper(fp):
+ return fp
+
+else: # Python 3
+ import io
+
+ def polystr(o):
+ "Polymorphic string factory function"
+ if isinstance(o, str):
+ return o
+ if not isinstance(o, bytes):
+ return str(o)
+ return str(o, encoding=master_encoding)
+
+ def polybytes(s):
+ "Polymorphic string encoding function"
+ if isinstance(s, bytes):
+ return s
+ if not isinstance(s, str):
+ return bytes(s)
+ return bytes(s, encoding=master_encoding)
+
+ def polyord(c):
+ "Polymorphic ord() function"
+ if isinstance(c, str):
+ return ord(c)
+ else:
+ return c
+
+ def polychr(c):
+ "Polymorphic chr() function"
+ if isinstance(c, int):
+ return chr(c)
+ else:
+ return c
+
+ def string_escape(s):
+ "Polymorphic string_escape/unicode_escape"
+ # This hack is necessary because Unicode strings in Python 3 don't
+ # have a decode method, so there's no simple way to ask it for the
+ # equivalent of decode('string_escape') in Python 2. This function
+ # assumes that it will be called with a Python 3 'str' instance
+ return s.encode(master_encoding).decode('unicode_escape')
+
+ def make_wrapper(fp):
+ "Wrapper factory function to enforce master encoding"
+ # This can be used to wrap normally binary streams for API
+ # compatibility with functions that need a text stream in
+ # Python 3; it ensures that the binary bytes are decoded using
+ # the master encoding we use to turn bytes to Unicode in
+ # polystr above
+ # newline="\n" ensures that Python 3 won't mangle line breaks
+ return io.TextIOWrapper(fp, encoding=master_encoding, newline="\n")
+
+ def make_std_wrapper(stream):
+ "Standard input/output wrapper factory function"
+ # This ensures that the encoding of standard output and standard
+ # error on Python 3 matches the master encoding we use to turn
+ # bytes to Unicode in polystr above
+ # line_buffering=True ensures that interactive
+ # command sessions work as expected
+ return io.TextIOWrapper(stream.buffer,
+ encoding=master_encoding, newline="\n",
+ line_buffering=True)
+
+
internetPrefix = (1, 3, 6, 1) # Used by the prefix option of OID headers
prefixCount = len(internetPrefix)
@@ -77,7 +155,7 @@ class AgentXPDU:
s = self.__class__.__name__ + "("
v = []
myvars = self.packetVars()
- keys = myvars.keys()
+ keys = list(myvars.keys())
keys.sort() # they will always be in the same order: testable
for name in keys:
value = myvars[name]
@@ -109,7 +187,7 @@ class AgentXPDU:
def decode_OpenPDU(data, header):
flags = header["flags"]
temp, data = slicedata(data, 4)
- timeout = struct.unpack("Bxxx", temp)[0]
+ timeout = struct.unpack("Bxxx", polybytes(temp))[0]
oid, data = decode_OID(data, header)
description = decode_octetstr(data, header)[0]
result = OpenPDU(flags["bigEndian"], header["session_id"],
@@ -150,7 +228,9 @@ class OpenPDU(AgentXPDU):
def decode_ClosePDU(data, header):
flags = header["flags"]
- reason = ord(data[0]) # Bxxx
+ reason = data[0] # Bxxx
+ if isinstance(reason, str):
+ reason = ord(reason)
result = ClosePDU(flags["bigEndian"], header["session_id"],
header["transaction_id"], header["packet_id"],
reason)
@@ -185,11 +265,12 @@ def decode_xRegisterPDU(data, header):
endianToken = getendian(flags["bigEndian"])
context, data = decode_context(data, header)
temp, data = slicedata(data, 4)
- timeout, priority, rangeSubid = struct.unpack(endianToken + "BBBx", temp)
+ timeout, priority, rangeSubid = struct.unpack(endianToken + "BBBx",
+ polybytes(temp))
oid, data = decode_OID(data, header)
if rangeSubid != 0:
temp, data = slicedata(data, 4)
- upperBound = struct.unpack(endianToken + "I", temp)[0]
+ upperBound = struct.unpack(endianToken + "I", polybytes(temp))[0]
else:
upperBound = None
if header["type"] == PDU_REGISTER:
@@ -320,7 +401,7 @@ def decode_GetBulkPDU(data, header):
endianToken = getendian(flags["bigEndian"])
context, data = decode_context(data, header)
temp, data = slicedata(data, 4)
- nonReps, maxReps = struct.unpack(endianToken + "HH", temp)
+ nonReps, maxReps = struct.unpack(endianToken + "HH", polybytes(temp))
oidranges = decode_searchrange_list(data, header)
result = GetBulkPDU(flags["bigEndian"], header["session_id"],
header["transaction_id"], header["packet_id"],
@@ -638,7 +719,8 @@ def decode_ResponsePDU(data, header):
flags = header["flags"]
endianToken = getendian(flags["bigEndian"])
temp, data = slicedata(data, 8)
- sysUptime, resError, resIndex = struct.unpack(endianToken + "IHH", temp)
+ sysUptime, resError, resIndex = struct.unpack(endianToken + "IHH",
+ polybytes(temp))
if len(data) > 0:
varbinds = decode_varbindlist(data, header)
else:
@@ -706,7 +788,7 @@ def decode_OID(data, header):
flags = header["flags"]
# Need to split off the header to get the subid count
header, data = slicedata(data, 4)
- n_subid, prefix, include = struct.unpack("BBBx", header)
+ n_subid, prefix, include = struct.unpack("BBBx", polybytes(header))
if prefix != 0:
subids = internetPrefix + (prefix,)
else:
@@ -720,7 +802,7 @@ def decode_OID(data, header):
data, rest = slicedata(data, byteCount)
endianToken = getendian(flags["bigEndian"])
formatString = endianToken + ("I" * n_subid)
- subids += struct.unpack(formatString, data)
+ subids += struct.unpack(formatString, polybytes(data))
result = OID(subids, include)
return (result, rest)
@@ -766,7 +848,8 @@ class OID:
if x[i] == y[i]:
continue
else:
- c = cmp(x[i], y[i])
+ # this is the Py3 version of c = cmp(x[i], y[i])
+ c = (x[i] > y[i]) - (x[i] < y[i])
c = -c if flipped is True else c
return c
# Only reach this if shorter, and each index is equal
@@ -818,8 +901,9 @@ def encode_octetstr(bigEndian, octets):
pad = (numoctets % 4)
if pad > 0: # Pad out the data to word boundary
pad = 4 - pad
- pad = "\x00" * pad
+ pad = b"\x00" * pad
if type(octets) is str:
+ octets = polybytes(octets)
data = header + octets + pad
else:
fmt = "B" * numoctets
@@ -832,13 +916,13 @@ def decode_octetstr(data, header):
flags = header["flags"]
header, data = slicedata(data, 4)
endianToken = getendian(flags["bigEndian"])
- numoctets = struct.unpack(endianToken + "I", header)[0]
+ numoctets = struct.unpack(endianToken + "I", polybytes(header))[0]
if len(data) < numoctets:
raise ValueError("Octet string shorter than length")
pad = numoctets % 4
if pad > 0: # Pad out the data to word boundary
pad = 4 - pad
- return data[:numoctets], data[numoctets + pad:]
+ return polystr(data[:numoctets]), data[numoctets + pad:]
def sanity_octetstr(data):
@@ -856,7 +940,7 @@ def decode_Varbind(data, header):
flags = header["flags"]
bindheader, data = slicedata(data, 4)
endianToken = getendian(flags["bigEndian"])
- valType = struct.unpack(endianToken + "Hxx", bindheader)[0]
+ valType = struct.unpack(endianToken + "Hxx", polybytes(bindheader))[0]
name, data = decode_OID(data, header)
if valType not in definedValueTypes.keys():
raise ValueError("Value type %s not in defined types" % valType)
@@ -933,7 +1017,7 @@ def sanity_integer32(data):
def encode_nullvalue(bigEndian, data):
- return ""
+ return b""
def decode_nullvalue(data, header):
@@ -953,7 +1037,7 @@ def decode_integer64(data, header):
flags = header["flags"]
endianToken = getendian(flags["bigEndian"])
num, data = slicedata(data, 8)
- num = struct.unpack(endianToken + "Q", num)[0]
+ num = struct.unpack(endianToken + "Q", polybytes(num))[0]
return (num, data)
@@ -969,7 +1053,7 @@ def encode_ipaddr(bigEndian, octets):
def decode_ipaddr(data, header):
addr, data = decode_octetstr(data, header)
- addr = struct.unpack("BBBB", addr)
+ addr = struct.unpack("BBBB", polybytes(addr))
return addr, data
@@ -1031,7 +1115,7 @@ def encode_searchrange_list(bigEndian, searchranges, nullTerminate=False):
if nullTerminate:
noid = OID(())
encoded.append(noid.encode(bigEndian))
- encoded = "".join(encoded)
+ encoded = b"".join(encoded)
return encoded
@@ -1055,7 +1139,7 @@ def decode_searchrange_list_nullterm(data, header):
def encode_varbindlist(bigEndian, varbinds):
- payload = ""
+ payload = b""
for varbind in varbinds:
payload += varbind.encode(bigEndian)
return payload
@@ -1108,7 +1192,7 @@ def encode_pduheader(pduType, instanceRegistration, newIndex,
def decode_pduheader(data): # Endianness is controlled from the PDU header
lineone, data = slicedata(data, 4)
- version, pduType, flags = struct.unpack(">BBBx", lineone)
+ version, pduType, flags = struct.unpack(">BBBx", polybytes(lineone))
if pduType not in definedPDUTypes:
raise ValueError("PDU type %s not in defined types" % pduType)
# Slice up the flags
@@ -1122,7 +1206,7 @@ def decode_pduheader(data): # Endianness is controlled from the PDU header
# then parse them
fmt = getendian(flagDict["bigEndian"]) + "IIII"
linen, data = slicedata(data, 16) # 4 x 4-byte variables
- sID, tactionID, pktID, dataLen = struct.unpack(fmt, linen)
+ sID, tactionID, pktID, dataLen = struct.unpack(fmt, polybytes(linen))
result = {"version": version, "type": pduType, "flags": flagDict,
"session_id": sID, "transaction_id": tactionID,
"packet_id": pktID, "length": dataLen}
@@ -1135,7 +1219,7 @@ def encode_context(bigEndian, context):
payload = encode_octetstr(bigEndian, context)
else:
contextP = False
- payload = ""
+ payload = b""
return (contextP, payload)
=====================================
tests/pylib/test_agentx.py
=====================================
The diff for this file was not included because it is too large.
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/c69555cdc1351e6497d136ead43e4dda36d933b1
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/c69555cdc1351e6497d136ead43e4dda36d933b1
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170927/59ea0c74/attachment.html>
More information about the vc mailing list