[Git][NTPsec/ntpsec][master] 5 commits: Changed OIDs to be classes instead of dicts
Ian Bruene
gitlab at mg.gitlab.com
Wed Sep 27 14:31:40 UTC 2017
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
90ab8314 by Ian Bruene at 2017-09-27T09:30:03-05:00
Changed OIDs to be classes instead of dicts
- - - - -
5f39cbae by Ian Bruene at 2017-09-27T09:30:03-05:00
Transplanted OID comparison function into OID class
- - - - -
701b81dc by Ian Bruene at 2017-09-27T09:30:03-05:00
Converted Search Ranges to a class
- - - - -
b5b92d45 by Ian Bruene at 2017-09-27T09:30:03-05:00
Converted Varbinds into a class
- - - - -
f6507e3f by Ian Bruene at 2017-09-27T09:30:03-05:00
Rearrangement of some functions, and pep8/pyflakes cleanup
- - - - -
2 changed files:
- pylib/agentx.py
- tests/pylib/test_agentx.py
Changes:
=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -110,7 +110,7 @@ def decode_OpenPDU(data, header):
flags = header["flags"]
temp, data = slicedata(data, 4)
timeout = struct.unpack("Bxxx", temp)[0]
- oid, data = decode_oid(data, header)
+ oid, data = decode_OID(data, header)
description = decode_octetstr(data, header)[0]
result = OpenPDU(flags["bigEndian"], header["session_id"],
header["transaction_id"], header["packet_id"],
@@ -123,7 +123,7 @@ class OpenPDU(AgentXPDU):
timeout, oid, description):
AgentXPDU.__init__(self, PDU_OPEN, bigEndian, sID, tactID, pktID)
self.timeout = timeout
- self.oid = oid
+ self.oid = classifyOID(oid)
self.description = description
def __eq__(self, other):
@@ -139,7 +139,7 @@ class OpenPDU(AgentXPDU):
def encode(self):
payload = struct.pack("Bxxx", self.timeout)
- payload += encode_oid(self.bigEndian, self.oid, False)
+ payload += self.oid.encode(self.bigEndian)
payload += encode_octetstr(self.bigEndian, self.description)
header = encode_pduheader(self.pduType,
False, False, False, False, self.bigEndian,
@@ -186,7 +186,7 @@ def decode_xRegisterPDU(data, header):
context, data = decode_context(data, header)
temp, data = slicedata(data, 4)
timeout, priority, rangeSubid = struct.unpack(endianToken + "BBBx", temp)
- oid, data = decode_oid(data, header)
+ oid, data = decode_OID(data, header)
if rangeSubid != 0:
temp, data = slicedata(data, 4)
upperBound = struct.unpack(endianToken + "I", temp)[0]
@@ -212,7 +212,7 @@ class RegisterPDU(AgentXPDU):
bigEndian, sID, tactID, pktID, True, context)
self.timeout = timeout
self.priority = priority
- self.subtree = subtree
+ self.subtree = classifyOID(subtree)
self.rangeSubid = rangeSubid
self.upperBound = upperBound
self._instReg = True # so we don't have to write two encode()s
@@ -242,7 +242,7 @@ class RegisterPDU(AgentXPDU):
else:
payload += struct.pack(endianToken + "xBBx",
self.priority, self.rangeSubid)
- payload += encode_oid(self.bigEndian, self.subtree, False)
+ payload += self.subtree.encode(self.bigEndian)
if self.rangeSubid != 0:
if self.upperBound is None:
raise ValueError("upperBound must be set if rangeSubid is set")
@@ -565,7 +565,7 @@ class IndexDeallocPDU(IndexAllocPDU):
def decode_AddAgentCapsPDU(data, header):
flags = header["flags"]
context, data = decode_context(data, header)
- oid, data = decode_oid(data, header)
+ oid, data = decode_OID(data, header)
descr = decode_octetstr(data, header)[0]
result = AddAgentCapsPDU(flags["bigEndian"], header["session_id"],
header["transaction_id"], header["packet_id"],
@@ -578,7 +578,7 @@ class AddAgentCapsPDU(AgentXPDU):
oid, description, context=None):
AgentXPDU.__init__(self, PDU_ADD_AGENT_CAPS,
bigEndian, sID, tactID, pktID, True, context)
- self.oid = oid
+ self.oid = classifyOID(oid)
self.description = description
def __eq__(self, other):
@@ -592,7 +592,7 @@ class AddAgentCapsPDU(AgentXPDU):
def encode(self):
contextP, payload = encode_context(self.bigEndian, self.context)
- payload += encode_oid(self.bigEndian, self.oid, False)
+ payload += self.oid.encode(self.bigEndian)
payload += encode_octetstr(self.bigEndian, self.description)
header = encode_pduheader(self.pduType, False, False, False,
contextP, self.bigEndian,
@@ -604,7 +604,7 @@ class AddAgentCapsPDU(AgentXPDU):
def decode_RMAgentCapsPDU(data, header):
flags = header["flags"]
context, data = decode_context(data, header)
- oid, data = decode_oid(data, header)
+ oid, data = decode_OID(data, header)
result = RMAgentCapsPDU(flags["bigEndian"], header["session_id"],
header["transaction_id"], header["packet_id"],
oid, context)
@@ -615,7 +615,7 @@ class RMAgentCapsPDU(AgentXPDU):
def __init__(self, bigEndian, sID, tactID, pktID, oid, context=None):
AgentXPDU.__init__(self, PDU_RM_AGENT_CAPS,
bigEndian, sID, tactID, pktID, True, context)
- self.oid = oid
+ self.oid = classifyOID(oid)
def __eq__(self, other):
if AgentXPDU.__eq__(self, other) is not True:
@@ -626,7 +626,7 @@ class RMAgentCapsPDU(AgentXPDU):
def encode(self):
contextP, payload = encode_context(self.bigEndian, self.context)
- payload += encode_oid(self.bigEndian, self.oid, False)
+ payload += self.oid.encode(self.bigEndian)
header = encode_pduheader(self.pduType, False, False, False,
contextP, self.bigEndian,
self.sessionID, self.transactionID,
@@ -694,29 +694,15 @@ class ResponsePDU(AgentXPDU):
#
# ========================================================================
-def encode_oid(bigEndian, subids, include):
- subids = tuple(subids)
- numsubs = len(subids)
- if not (0 <= numsubs <= 128): # Packet can have a maximum of 128 sections
- raise ValueError("OID has too many subids")
- if subids[:prefixCount] == internetPrefix:
- if numsubs > prefixCount:
- prefix = subids[prefixCount] # the number after the prefix
- subids = subids[prefixCount + 1:] # +1 to strip prefix arg
- else: # Have a prefix, but nothing else. Leave as is
- prefix = 0
- else:
- prefix = 0
- n_subid = len(subids)
- include = int(bool(include)) # force integer bool
- endianToken = getendian(bigEndian)
- body = struct.pack(endianToken + "BBBx", n_subid, prefix, include)
- for subid in subids:
- body += struct.pack(endianToken + "I", subid)
- return body
+
+def classifyOID(oid):
+ "utility function to allow the user to send a bare tuple for some cases"
+ if isinstance(oid, OID):
+ return oid
+ return OID(oid, False)
-def decode_oid(data, header):
+def decode_OID(data, header):
flags = header["flags"]
# Need to split off the header to get the subid count
header, data = slicedata(data, 4)
@@ -735,10 +721,96 @@ def decode_oid(data, header):
endianToken = getendian(flags["bigEndian"])
formatString = endianToken + ("I" * n_subid)
subids += struct.unpack(formatString, data)
- result = {"subids": subids, "include": include}
+ result = OID(subids, include)
return (result, rest)
+class OID:
+ def __init__(self, subids, include=False):
+ self.subids = subids
+ self.include = include
+ self.sanity()
+
+ def __eq__(self, other):
+ if not (self.subids == other.subids):
+ return False
+ elif self.include != other.include:
+ return False
+ return True
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __lt__(self, other):
+ return self.compareOID(other) == -1
+
+ def __gt__(self, other):
+ return self.compareOID(other) == 1
+
+ def compareOID(self, other):
+ if self.subids == other.subids:
+ return 0
+ lself = len(self.subids)
+ lother = len(other.subids)
+ if lself > lother:
+ x = other.subids
+ y = self.subids
+ lx = lother
+ flipped = True
+ else:
+ x = self.subids
+ y = other.subids
+ lx = lself
+ flipped = False
+ for i in range(lx):
+ if x[i] == y[i]:
+ continue
+ else:
+ c = cmp(x[i], y[i])
+ c = -c if flipped is True else c
+ return c
+ # Only reach this if shorter, and each index is equal
+ if flipped is True:
+ return 1
+ else:
+ return -1
+
+ def __repr__(self):
+ return "OID(%s, %s)" % (repr(self.subids), repr(self.include))
+
+ def sanity(self):
+ if not isinstance(self.subids, (tuple, list)):
+ raise TypeError
+ if len(self.subids) > 128:
+ raise ValueError
+
+ def isNull(self):
+ if (len(self.subids) == 0) and (self.include is False):
+ return True
+ return False
+
+ def encode(self, bigEndian):
+ subids = self.subids[:] # we may need to modify the copy
+ numsubs = len(subids)
+ if not (0 <= numsubs <= 128): # OIDs can have a maximum of 128 subs
+ raise ValueError("OID has too many subids")
+ if subids[:prefixCount] == internetPrefix:
+ if numsubs > prefixCount:
+ prefix = subids[prefixCount] # the number after the prefix
+ subids = subids[prefixCount + 1:] # +1 to strip prefix arg
+ else: # Have a prefix but nothing else, leave it as is.
+ prefix = 0
+ else:
+ prefix = 0
+ n_subid = len(subids)
+ include = int(bool(self.include)) # force integer bool
+ endianToken = getendian(bigEndian)
+ body = struct.pack(endianToken + "BBBx", n_subid, prefix, include)
+ for subid in subids:
+ body += struct.pack(endianToken + "I", subid)
+ return body
+
+
def encode_octetstr(bigEndian, octets):
numoctets = len(octets)
endianToken = getendian(bigEndian)
@@ -769,34 +841,79 @@ def decode_octetstr(data, header):
return data[:numoctets], data[numoctets + pad:]
-def encode_varbind(bigEndian, valType, oid, *payload):
- if valType not in definedValueTypes.keys():
- raise ValueError("Value type %s not in defined types" % valType)
- endianToken = getendian(bigEndian)
- header = struct.pack(endianToken + "Hxx", valType)
- name = encode_oid(bigEndian, oid, False)
- handlers = definedValueTypes[valType]
- if handlers is None:
- data = header + name
- else:
- data = header + name + handlers[0](bigEndian, *payload)
- return data
+def sanity_octetstr(data):
+ if isinstance(data, str):
+ return
+ if isinstance(data, (list, tuple)):
+ for c in data:
+ if not (0 <= c < 256):
+ raise ValueError
+ return
+ raise TypeError
-def decode_varbind(data, header):
+def decode_Varbind(data, header):
flags = header["flags"]
bindheader, data = slicedata(data, 4)
endianToken = getendian(flags["bigEndian"])
valType = struct.unpack(endianToken + "Hxx", bindheader)[0]
- name, data = decode_oid(data, header)
+ name, data = decode_OID(data, header)
if valType not in definedValueTypes.keys():
raise ValueError("Value type %s not in defined types" % valType)
handlers = definedValueTypes[valType]
- payload, data = handlers[1](data, header)
- result = {"type": valType, "name": name, "data": payload}
+ payload, data = handlers[2](data, header)
+ result = Varbind(valType, name, payload)
return result, data
+class Varbind:
+ def __init__(self, vtype, oid, payload=None):
+ self.valueType = vtype
+ self.oid = classifyOID(oid)
+ self.payload = payload # payload=None exists for Null types
+ self.sanity()
+
+ def __eq__(self, other):
+ if self.valueType != other.valueType:
+ return False
+ if self.oid != other.oid:
+ return False
+ if self.payload != other.payload:
+ return False
+ return True
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __repr__(self):
+ fmt = "Varbind(vtype=%s, oid=%s, payload=%s)"
+ r = fmt % (repr(self.valueType), repr(self.oid), repr(self.payload))
+ return r
+
+ def sanity(self):
+ self.oid.sanity()
+ vt = self.valueType
+ if vt not in definedValueTypes.keys():
+ raise ValueError("Value type %s not in defined types" % vt)
+ sanifyer, encoder, decoder = definedValueTypes[vt]
+ if sanifyer is None: # it is a class
+ self.payload.sanity()
+ else:
+ sanifyer(self.payload)
+
+ def encode(self, bigEndian):
+ endianToken = getendian(bigEndian)
+ header = struct.pack(endianToken + "Hxx", self.valueType)
+ name = self.oid.encode(bigEndian)
+ handlers = definedValueTypes[self.valueType]
+ sanifyer, encoder, decoder = handlers
+ if sanifyer is None: # Classes have the .sanity() method
+ data = header + name + self.payload.encode(bigEndian)
+ else: # Non-classes have an associated sanity_<vartype>() function
+ data = header + name + encoder(bigEndian, self.payload)
+ return data
+
+
def encode_integer32(bigEndian, num):
endianToken = getendian(bigEndian)
return struct.pack(endianToken + "I", num)
@@ -810,7 +927,12 @@ def decode_integer32(data, header):
return (num, data)
-def encode_nullvalue(bigEndian):
+def sanity_integer32(data):
+ if data != (data & 0xFFFFFFFF):
+ raise ValueError
+
+
+def encode_nullvalue(bigEndian, data):
return ""
@@ -818,6 +940,10 @@ def decode_nullvalue(data, header):
return (None, data)
+def sanity_nullvalue(data):
+ pass # Seriously?
+
+
def encode_integer64(bigEndian, num):
endianToken = getendian(bigEndian)
return struct.pack(endianToken + "Q", num)
@@ -831,9 +957,13 @@ def decode_integer64(data, header):
return (num, data)
+def sanity_integer64(data):
+ if data != (data & 0xFFFFFFFFFFFFFFFF):
+ raise ValueError
+
+
def encode_ipaddr(bigEndian, octets):
- if len(octets) != 4:
- raise ValueError("IP Address incorrect length")
+ sanity_ipaddr(octets)
return encode_octetstr(bigEndian, octets)
@@ -843,25 +973,64 @@ def decode_ipaddr(data, header):
return addr, data
-def encode_searchrange(bigEndian, startOID, endOID, inclusiveP):
- startOIDstr = encode_oid(bigEndian, startOID, inclusiveP)
- endOIDstr = encode_oid(bigEndian, endOID, False)
- return startOIDstr + endOIDstr
+def sanity_ipaddr(data):
+ if len(data) not in (4, 16):
+ raise ValueError
-def decode_searchrange(data, header):
- startOID, data = decode_oid(data, header)
- endOID, data = decode_oid(data, header)
- result = {"start": startOID, "end": endOID}
+def decode_SearchRange(data, header):
+ startOID, data = decode_OID(data, header)
+ endOID, data = decode_OID(data, header)
+ result = SearchRange(startOID, endOID)
return result, data
-def encode_searchrange_list(bigEndian, oidranges, nullTerminate=False):
+class SearchRange:
+ def __init__(self, start, end, include=None):
+ self.start = classifyOID(start)
+ self.end = classifyOID(end)
+ self.end.include = False # sanify
+ if include is not None:
+ # if the user does not provide include it defaults to whatever
+ # start is already set to
+ self.start.include = include
+ self.sanity()
+
+ def __eq__(self, other):
+ if self.start != other.start:
+ return False
+ if self.end != other.end:
+ return False
+ return True
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __repr__(self):
+ r = "SearchRange("
+ r += repr(self.start) + ", "
+ r += repr(self.end) + ")"
+ return r
+
+ def sanity(self):
+ self.start.sanity()
+ self.end.sanity()
+ if self.end.include is True:
+ raise ValueError
+
+ def encode(self, bigEndian):
+ startOIDstr = self.start.encode(bigEndian)
+ endOIDstr = self.end.encode(bigEndian)
+ return startOIDstr + endOIDstr
+
+
+def encode_searchrange_list(bigEndian, searchranges, nullTerminate=False):
encoded = []
- for oran in oidranges:
- encoded.append(encode_searchrange(bigEndian, *oran))
+ for sran in searchranges:
+ encoded.append(sran.encode(bigEndian))
if nullTerminate:
- encoded.append(encode_oid(bigEndian, tuple(), False))
+ noid = OID(())
+ encoded.append(noid.encode(bigEndian))
encoded = "".join(encoded)
return encoded
@@ -869,7 +1038,7 @@ def encode_searchrange_list(bigEndian, oidranges, nullTerminate=False):
def decode_searchrange_list(data, header): # Cannot handle extra data
oidranges = []
while len(data) > 0:
- oids, data = decode_searchrange(data, header)
+ oids, data = decode_SearchRange(data, header)
oidranges.append(oids)
return tuple(oidranges)
@@ -877,62 +1046,18 @@ def decode_searchrange_list(data, header): # Cannot handle extra data
def decode_searchrange_list_nullterm(data, header):
oidranges = []
while len(data) > 0:
- one, data = decode_oid(data, header)
- if isnullOID(one):
+ one, data = decode_OID(data, header)
+ if one.isNull():
break
- two, data = decode_oid(data, header)
- oidranges.append({"start": one, "end": two})
+ two, data = decode_OID(data, header)
+ oidranges.append(SearchRange(one, two))
return tuple(oidranges), data
-# =========================================
-# Utilities, glue, and misc
-# =========================================
-
-
-def compareOID(one, two):
- if one == two: # Behold! The magic of a high level language
- return 0
- lone = len(one)
- ltwo = len(two)
- if lone > ltwo:
- x = two
- y = one
- lx = ltwo
- flipped = True
- else:
- x = one
- y = two
- lx = lone
- flipped = False
- for i in range(lx):
- if x[i] == y[i]:
- continue
- else:
- c = cmp(x[i], y[i])
- c = -c if flipped is True else c
- return c
- # Only reach this if shorter, and each index is equal
- if flipped is True:
- return 1
- else:
- return -1
-
-
-def getendian(bigEndian):
- return ">" if bigEndian is True else "<"
-
-
-def isnullOID(oid):
- if (len(oid["subids"]) == 0) and (oid["include"] is False):
- return True
- return False
-
-
def encode_varbindlist(bigEndian, varbinds):
payload = ""
for varbind in varbinds:
- payload += encode_varbind(bigEndian, *varbind)
+ payload += varbind.encode(bigEndian)
return payload
@@ -940,7 +1065,7 @@ def decode_varbindlist(data, header):
if len(data) > 0:
varbinds = []
while len(data) > 0:
- vb, data = decode_varbind(data, header)
+ vb, data = decode_Varbind(data, header)
varbinds.append(vb)
varbinds = tuple(varbinds)
else:
@@ -948,6 +1073,15 @@ def decode_varbindlist(data, header):
return varbinds
+# =========================================
+# Utilities, glue, and misc
+# =========================================
+
+
+def getendian(bigEndian):
+ return ">" if bigEndian is True else "<"
+
+
def encode_pduheader(pduType, instanceRegistration, newIndex,
anyIndex, nonDefaultContext, bigEndian,
sessionID, transactionID, packetID, payloadLength):
@@ -1034,33 +1168,60 @@ def decode_packet(data):
# Value types
-INTEGER = 2
-OCTET_STR = 4
-NULL = 5
-OID = 6
-IP_ADDR = 64
-COUNTER32 = 65
-GAUGE32 = 66
-TIME_TICKS = 67
-OPAQUE = 68
-COUNTER64 = 70
-NO_SUCH_OBJECT = 128
-NO_SUCH_INSTANCE = 129
-END_OF_MIB_VIEW = 130
+VALUE_INTEGER = 2
+VALUE_OCTET_STR = 4
+VALUE_NULL = 5
+VALUE_OID = 6
+VALUE_IP_ADDR = 64
+VALUE_COUNTER32 = 65
+VALUE_GAUGE32 = 66
+VALUE_TIME_TICKS = 67
+VALUE_OPAQUE = 68
+VALUE_COUNTER64 = 70
+VALUE_NO_SUCH_OBJECT = 128
+VALUE_NO_SUCH_INSTANCE = 129
+VALUE_END_OF_MIB_VIEW = 130
+# Sub-tuple format: (sanityChecker, encoder/class, decoder)
+# if the sanityChecker is None it means the data type is held in
+# a class, which has its own .sanity() method
definedValueTypes = { # Used by the varbind functions
- INTEGER: (encode_integer32, decode_integer32),
- OCTET_STR: (encode_octetstr, decode_octetstr),
- NULL: (encode_nullvalue, decode_nullvalue),
- OID: (encode_oid, decode_oid),
- IP_ADDR: (encode_ipaddr, decode_ipaddr),
- COUNTER32: (encode_integer32, decode_integer32),
- GAUGE32: (encode_integer32, decode_integer32),
- TIME_TICKS: (encode_integer32, decode_integer32),
- OPAQUE: (encode_octetstr, decode_octetstr),
- COUNTER64: (encode_integer64, decode_integer64),
- NO_SUCH_OBJECT: (encode_nullvalue, decode_nullvalue),
- NO_SUCH_INSTANCE: (encode_nullvalue, decode_nullvalue),
- END_OF_MIB_VIEW: (encode_nullvalue, decode_nullvalue)}
+ VALUE_INTEGER: (sanity_integer32,
+ encode_integer32,
+ decode_integer32),
+ VALUE_OCTET_STR: (sanity_octetstr,
+ encode_octetstr,
+ decode_octetstr),
+ VALUE_NULL: (sanity_nullvalue,
+ encode_nullvalue,
+ decode_nullvalue),
+ VALUE_OID: (None, OID, decode_OID),
+ VALUE_IP_ADDR: (sanity_ipaddr,
+ encode_ipaddr,
+ decode_ipaddr),
+ VALUE_COUNTER32: (sanity_integer32,
+ encode_integer32,
+ decode_integer32),
+ VALUE_GAUGE32: (sanity_integer32,
+ encode_integer32,
+ decode_integer32),
+ VALUE_TIME_TICKS: (sanity_integer32,
+ encode_integer32,
+ decode_integer32),
+ VALUE_OPAQUE: (sanity_octetstr,
+ encode_octetstr,
+ decode_octetstr),
+ VALUE_COUNTER64: (sanity_integer64,
+ encode_integer64,
+ decode_integer64),
+ VALUE_NO_SUCH_OBJECT: (sanity_nullvalue,
+ encode_nullvalue,
+ decode_nullvalue),
+ VALUE_NO_SUCH_INSTANCE: (sanity_nullvalue,
+ encode_nullvalue,
+ decode_nullvalue),
+ VALUE_END_OF_MIB_VIEW: (sanity_nullvalue,
+ encode_nullvalue,
+ decode_nullvalue)}
# PDU types
PDU_OPEN = 1
=====================================
tests/pylib/test_agentx.py
=====================================
--- a/tests/pylib/test_agentx.py
+++ b/tests/pylib/test_agentx.py
@@ -121,13 +121,13 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
nullPkt = cls(True, 1, 2, 3, 4, (), "")
test_pducore(self, nullPkt, x.PDU_OPEN, True, 1, 2, 3)
self.assertEqual(nullPkt.timeout, 4)
- self.assertEqual(nullPkt.oid, ())
+ self.assertEqual(nullPkt.oid, x.OID((), False))
self.assertEqual(nullPkt.description, "")
# Test PDU init, basic packet
basicPkt = cls(False, 1, 2, 3, 4, (1, 2, 3, 4), "foo")
test_pducore(self, basicPkt, x.PDU_OPEN, False, 1, 2, 3)
self.assertEqual(basicPkt.timeout, 4)
- self.assertEqual(basicPkt.oid, (1, 2, 3, 4))
+ self.assertEqual(basicPkt.oid, x.OID((1, 2, 3, 4), False))
self.assertEqual(basicPkt.description, "foo")
# Test encoding, null packet
nullPkt_str = nullPkt.encode()
@@ -153,7 +153,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
nullPkt_new = dec(body, header)
test_pducore(self, nullPkt_new, x.PDU_OPEN, True, 1, 2, 3)
self.assertEqual(nullPkt_new.timeout, 4)
- self.assertEqual(nullPkt_new.oid, {"subids": (), "include": False})
+ self.assertEqual(nullPkt_new.oid, x.OID((), False))
self.assertEqual(nullPkt_new.description, "")
# Test decoding, basic packet
header, body = slicedata(basicPkt_str, 20)
@@ -161,8 +161,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
basicPkt_new = dec(body, header)
test_pducore(self, basicPkt_new, x.PDU_OPEN, False, 1, 2, 3)
self.assertEqual(basicPkt_new.timeout, 4)
- self.assertEqual(basicPkt_new.oid, {"subids": (1, 2, 3, 4),
- "include": False})
+ self.assertEqual(basicPkt_new.oid, x.OID((1, 2, 3, 4), False))
self.assertEqual(basicPkt_new.description, "foo")
# Test packetVars
self.assertEqual(basicPkt_new.packetVars(),
@@ -172,7 +171,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"transactionID": 2,
"packetID": 3,
"timeout": 4,
- "oid": {"subids": (1, 2, 3, 4), "include": False},
+ "oid": x.OID((1, 2, 3, 4), False),
"description": "foo"})
def test_ClosePDU(self):
@@ -233,7 +232,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
test_pducore(self, basicPkt, x.PDU_REGISTER, True, 1, 2, 3)
self.assertEqual(basicPkt.timeout, 4)
self.assertEqual(basicPkt.priority, 5)
- self.assertEqual(basicPkt.subtree, (1, 2, 3))
+ self.assertEqual(basicPkt.subtree, x.OID((1, 2, 3), False))
self.assertEqual(basicPkt.rangeSubid, 0)
self.assertEqual(basicPkt.upperBound, None)
self.assertEqual(basicPkt.context, None)
@@ -242,7 +241,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
test_pducore(self, basicPkt_LE, x.PDU_REGISTER, False, 1, 2, 3)
self.assertEqual(basicPkt_LE.timeout, 4)
self.assertEqual(basicPkt_LE.priority, 5)
- self.assertEqual(basicPkt_LE.subtree, (1, 2, 3))
+ self.assertEqual(basicPkt_LE.subtree, x.OID((1, 2, 3), False))
self.assertEqual(basicPkt_LE.rangeSubid, 0)
self.assertEqual(basicPkt_LE.upperBound, None)
self.assertEqual(basicPkt_LE.context, None)
@@ -252,7 +251,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
test_pducore(self, fancyPkt, x.PDU_REGISTER, True, 1, 2, 3)
self.assertEqual(fancyPkt.timeout, 4)
self.assertEqual(fancyPkt.priority, 5)
- self.assertEqual(fancyPkt.subtree, (1, 2, 3))
+ self.assertEqual(fancyPkt.subtree, x.OID((1, 2, 3), False))
self.assertEqual(fancyPkt.rangeSubid, 5)
self.assertEqual(fancyPkt.upperBound, 23)
self.assertEqual(fancyPkt.context, "blah")
@@ -292,8 +291,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
test_pducore(self, basicPkt_new, x.PDU_REGISTER, True, 1, 2, 3)
self.assertEqual(basicPkt_new.timeout, 4)
self.assertEqual(basicPkt_new.priority, 5)
- self.assertEqual(basicPkt_new.subtree, {"subids": (1, 2, 3),
- "include": False})
+ self.assertEqual(basicPkt_new.subtree, x.OID((1, 2, 3), False))
self.assertEqual(basicPkt_new.rangeSubid, 0)
self.assertEqual(basicPkt_new.upperBound, None)
self.assertEqual(basicPkt_new.context, None)
@@ -304,8 +302,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
test_pducore(self, basicPkt_LE_new, x.PDU_REGISTER, False, 1, 2, 3)
self.assertEqual(basicPkt_LE_new.timeout, 4)
self.assertEqual(basicPkt_LE_new.priority, 5)
- self.assertEqual(basicPkt_LE_new.subtree, {"subids": (1, 2, 3),
- "include": False})
+ self.assertEqual(basicPkt_LE_new.subtree, x.OID((1, 2, 3), False))
self.assertEqual(basicPkt_LE_new.rangeSubid, 0)
self.assertEqual(basicPkt_LE_new.upperBound, None)
self.assertEqual(basicPkt_LE_new.context, None)
@@ -316,8 +313,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
test_pducore(self, fancyPkt_new, x.PDU_REGISTER, True, 1, 2, 3)
self.assertEqual(fancyPkt_new.timeout, 4)
self.assertEqual(fancyPkt_new.priority, 5)
- self.assertEqual(fancyPkt_new.subtree, {"subids": (1, 2, 3),
- "include": False})
+ self.assertEqual(fancyPkt_new.subtree, x.OID((1, 2, 3), False))
self.assertEqual(fancyPkt_new.rangeSubid, 5)
self.assertEqual(fancyPkt_new.upperBound, 23)
self.assertEqual(fancyPkt_new.context, "blah")
@@ -330,7 +326,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"packetID": 3,
"timeout": 4,
"priority": 5,
- "subtree": {"subids": (1, 2, 3), "include": False},
+ "subtree": x.OID((1, 2, 3), False),
"rangeSubid": 0,
"upperBound": None,
"context": None})
@@ -344,7 +340,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
basicPkt = cls(True, 1, 2, 3, 5, (1, 2, 3))
test_pducore(self, basicPkt, x.PDU_UNREGISTER, True, 1, 2, 3)
self.assertEqual(basicPkt.priority, 5)
- self.assertEqual(basicPkt.subtree, (1, 2, 3))
+ self.assertEqual(basicPkt.subtree, x.OID((1, 2, 3), False))
self.assertEqual(basicPkt.rangeSubid, 0)
self.assertEqual(basicPkt.upperBound, None)
self.assertEqual(basicPkt.context, None)
@@ -352,7 +348,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
basicPkt_LE = cls(False, 1, 2, 3, 5, (1, 2, 3))
test_pducore(self, basicPkt_LE, x.PDU_UNREGISTER, False, 1, 2, 3)
self.assertEqual(basicPkt_LE.priority, 5)
- self.assertEqual(basicPkt_LE.subtree, (1, 2, 3))
+ self.assertEqual(basicPkt_LE.subtree, x.OID((1, 2, 3), False))
self.assertEqual(basicPkt_LE.rangeSubid, 0)
self.assertEqual(basicPkt_LE.upperBound, None)
self.assertEqual(basicPkt_LE.context, None)
@@ -361,7 +357,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
rangeSubid=5, upperBound=23, context="blah")
test_pducore(self, fancyPkt, x.PDU_UNREGISTER, True, 1, 2, 3)
self.assertEqual(fancyPkt.priority, 5)
- self.assertEqual(fancyPkt.subtree, (1, 2, 3))
+ self.assertEqual(fancyPkt.subtree, x.OID((1, 2, 3), False))
self.assertEqual(fancyPkt.rangeSubid, 5)
self.assertEqual(fancyPkt.upperBound, 23)
self.assertEqual(fancyPkt.context, "blah")
@@ -400,8 +396,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
basicPkt_new = dec(body, header)
test_pducore(self, basicPkt, x.PDU_UNREGISTER, True, 1, 2, 3)
self.assertEqual(basicPkt_new.priority, 5)
- self.assertEqual(basicPkt_new.subtree, {"subids": (1, 2, 3),
- "include": False})
+ self.assertEqual(basicPkt_new.subtree, x.OID((1, 2, 3), False))
self.assertEqual(basicPkt_new.rangeSubid, 0)
self.assertEqual(basicPkt_new.upperBound, None)
self.assertEqual(basicPkt_new.context, None)
@@ -411,8 +406,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
basicPkt_LE_new = dec(body, header)
test_pducore(self, basicPkt_LE, x.PDU_UNREGISTER, False, 1, 2, 3)
self.assertEqual(basicPkt_LE_new.priority, 5)
- self.assertEqual(basicPkt_LE_new.subtree, {"subids": (1, 2, 3),
- "include": False})
+ self.assertEqual(basicPkt_LE_new.subtree, x.OID((1, 2, 3), False))
self.assertEqual(basicPkt_LE_new.rangeSubid, 0)
self.assertEqual(basicPkt_LE_new.upperBound, None)
self.assertEqual(basicPkt_LE_new.context, None)
@@ -422,8 +416,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
fancyPkt_new = dec(body, header)
test_pducore(self, fancyPkt_new, x.PDU_UNREGISTER, True, 1, 2, 3)
self.assertEqual(fancyPkt_new.priority, 5)
- self.assertEqual(fancyPkt_new.subtree, {"subids": (1, 2, 3),
- "include": False})
+ self.assertEqual(fancyPkt_new.subtree, x.OID((1, 2, 3), False))
self.assertEqual(fancyPkt_new.rangeSubid, 5)
self.assertEqual(fancyPkt_new.upperBound, 23)
self.assertEqual(fancyPkt_new.context, "blah")
@@ -435,7 +428,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"transactionID": 2,
"packetID": 3,
"priority": 5,
- "subtree": {"subids": (1, 2, 3), "include": False},
+ "subtree": x.OID((1, 2, 3), False),
"rangeSubid": 0,
"upperBound": None,
"context": None})
@@ -444,6 +437,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
dec = ntp.agentx.decode_xGetPDU
cls = ntp.agentx.GetPDU
x = ntp.agentx
+ srch = x.SearchRange
# Test init, null packet
nullPkt = cls(True, 1, 2, 3, ())
@@ -452,21 +446,23 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(nullPkt.context, None)
# Test init, full packet
fullPkt = cls(True, 1, 2, 3,
- (((1, 2, 3), (1, 2, 5), False),
- ((10, 20), (30, 40), True)),
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)),
context="blah")
test_pducore(self, fullPkt, x.PDU_GET, True, 1, 2, 3)
- self.assertEqual(fullPkt.oidranges, (((1, 2, 3), (1, 2, 5), False),
- ((10, 20), (30, 40), True)))
+ self.assertEqual(fullPkt.oidranges,
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)))
self.assertEqual(fullPkt.context, "blah")
# Test init, full packet, little endian
fullPkt_LE = cls(False, 1, 2, 3,
- (((1, 2, 3), (1, 2, 5), False),
- ((10, 20), (30, 40), True)),
- context="blah")
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)),
+ context="blah")
test_pducore(self, fullPkt_LE, x.PDU_GET, False, 1, 2, 3)
- self.assertEqual(fullPkt_LE.oidranges, (((1, 2, 3), (1, 2, 5), False),
- ((10, 20), (30, 40), True)))
+ self.assertEqual(fullPkt_LE.oidranges,
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)))
self.assertEqual(fullPkt_LE.context, "blah")
# Test encode, null packet
nullPkt_str = nullPkt.encode()
@@ -516,10 +512,8 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
fullPkt_new = dec(body, header)
test_pducore(self, fullPkt_new, x.PDU_GET, True, 1, 2, 3)
self.assertEqual(fullPkt_new.oidranges,
- ({"start": {"subids": (1, 2, 3), "include": False},
- "end": {"subids": (1, 2, 5), "include": False}},
- {"start": {"subids": (10, 20), "include": True},
- "end": {"subids": (30, 40), "include": False}}))
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)))
self.assertEqual(fullPkt_new.context, "blah")
# Test decoding, full packet, little endian
header, body = slicedata(fullPkt_LE_str, 20)
@@ -527,10 +521,8 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
fullPkt_LE_new = dec(body, header)
test_pducore(self, fullPkt_LE_new, x.PDU_GET, False, 1, 2, 3)
self.assertEqual(fullPkt_LE_new.oidranges,
- ({"start": {"subids": (1, 2, 3), "include": False},
- "end": {"subids": (1, 2, 5), "include": False}},
- {"start": {"subids": (10, 20), "include": True},
- "end": {"subids": (30, 40), "include": False}}))
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)))
self.assertEqual(fullPkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(nullPkt_new.packetVars(),
@@ -546,6 +538,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
dec = ntp.agentx.decode_xGetPDU
cls = ntp.agentx.GetNextPDU
x = ntp.agentx
+ srch = x.SearchRange
# Test init, null packet
nullPkt = cls(True, 1, 2, 3, ())
@@ -554,21 +547,23 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(nullPkt.context, None)
# Test init, full packet
fullPkt = cls(True, 1, 2, 3,
- (((1, 2, 3), (1, 2, 5), False),
- ((10, 20), (30, 40), True)),
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)),
context="blah")
test_pducore(self, fullPkt, x.PDU_GET_NEXT, True, 1, 2, 3)
- self.assertEqual(fullPkt.oidranges, (((1, 2, 3), (1, 2, 5), False),
- ((10, 20), (30, 40), True)))
+ self.assertEqual(fullPkt.oidranges,
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)))
self.assertEqual(fullPkt.context, "blah")
# Test init, full packet, little endian
fullPkt_LE = cls(False, 1, 2, 3,
- (((1, 2, 3), (1, 2, 5), False),
- ((10, 20), (30, 40), True)),
- context="blah")
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)),
+ context="blah")
test_pducore(self, fullPkt_LE, x.PDU_GET_NEXT, False, 1, 2, 3)
- self.assertEqual(fullPkt_LE.oidranges, (((1, 2, 3), (1, 2, 5), False),
- ((10, 20), (30, 40), True)))
+ self.assertEqual(fullPkt_LE.oidranges,
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)))
self.assertEqual(fullPkt_LE.context, "blah")
# Test encode, null packet
nullPkt_str = nullPkt.encode()
@@ -615,10 +610,8 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
fullPkt_new = dec(body, header)
test_pducore(self, fullPkt_new, x.PDU_GET_NEXT, True, 1, 2, 3)
self.assertEqual(fullPkt_new.oidranges,
- ({"start": {"subids": (1, 2, 3), "include": False},
- "end": {"subids": (1, 2, 5), "include": False}},
- {"start": {"subids": (10, 20), "include": True},
- "end": {"subids": (30, 40), "include": False}}))
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)))
self.assertEqual(fullPkt_new.context, "blah")
# Test decoding, full packet, little endian
header, body = slicedata(fullPkt_LE_str, 20)
@@ -626,10 +619,8 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
fullPkt_LE_new = dec(body, header)
test_pducore(self, fullPkt_LE_new, x.PDU_GET_NEXT, False, 1, 2, 3)
self.assertEqual(fullPkt_LE_new.oidranges,
- ({"start": {"subids": (1, 2, 3), "include": False},
- "end": {"subids": (1, 2, 5), "include": False}},
- {"start": {"subids": (10, 20), "include": True},
- "end": {"subids": (30, 40), "include": False}}))
+ (srch((1, 2, 3), (1, 2, 5), False),
+ srch((10, 20), (30, 40), True)))
self.assertEqual(fullPkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(nullPkt_new.packetVars(),
@@ -645,30 +636,31 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
dec = ntp.agentx.decode_GetBulkPDU
cls = ntp.agentx.GetBulkPDU
x = ntp.agentx
+ srch = x.SearchRange
# Test init
pkt = cls(True, 1, 2, 3, 1, 5,
- (((1, 2), (3, 4), False),
- ((6, 7), (8, 9), True)),
+ (srch((1, 2), (3, 4), False),
+ srch((6, 7), (8, 9), True)),
context="blah")
test_pducore(self, pkt, x.PDU_GET_BULK, True, 1, 2, 3)
self.assertEqual(pkt.nonReps, 1)
self.assertEqual(pkt.maxReps, 5)
self.assertEqual(pkt.oidranges,
- (((1, 2), (3, 4), False),
- ((6, 7), (8, 9), True)))
+ (srch((1, 2), (3, 4), False),
+ srch((6, 7), (8, 9), True)))
self.assertEqual(pkt.context, "blah")
# Test init, little endian
pkt_LE = cls(False, 1, 2, 3, 1, 5,
- (((1, 2), (3, 4), False),
- ((6, 7), (8, 9), True)),
- context="blah")
+ (srch((1, 2), (3, 4), False),
+ srch((6, 7), (8, 9), True)),
+ context="blah")
test_pducore(self, pkt_LE, x.PDU_GET_BULK, False, 1, 2, 3)
self.assertEqual(pkt_LE.nonReps, 1)
self.assertEqual(pkt_LE.maxReps, 5)
self.assertEqual(pkt_LE.oidranges,
- (((1, 2), (3, 4), False),
- ((6, 7), (8, 9), True)))
+ (srch((1, 2), (3, 4), False),
+ srch((6, 7), (8, 9), True)))
self.assertEqual(pkt_LE.context, "blah")
# Test encoding
pkt_str = pkt.encode()
@@ -702,10 +694,8 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(pkt_new.nonReps, 1)
self.assertEqual(pkt_new.maxReps, 5)
self.assertEqual(pkt_new.oidranges,
- ({"start": {"subids": (1, 2), "include": False},
- "end": {"subids": (3, 4), "include": False}},
- {"start": {"subids": (6, 7), "include": True},
- "end": {"subids": (8, 9), "include": False}}))
+ ((srch((1, 2), (3, 4), False),
+ srch((6, 7), (8, 9), True))))
self.assertEqual(pkt_new.context, "blah")
# Test decoding, little endian
header, body = slicedata(pkt_LE_str, 20)
@@ -715,10 +705,8 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(pkt_LE_new.nonReps, 1)
self.assertEqual(pkt_LE_new.maxReps, 5)
self.assertEqual(pkt_LE_new.oidranges,
- ({"start": {"subids": (1, 2), "include": False},
- "end": {"subids": (3, 4), "include": False}},
- {"start": {"subids": (6, 7), "include": True},
- "end": {"subids": (8, 9), "include": False}}))
+ ((srch((1, 2), (3, 4), False),
+ srch((6, 7), (8, 9), True))))
self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
@@ -729,14 +717,8 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"packetID": 3,
"nonReps": 1,
"maxReps": 5,
- "oidranges": ({"start": {"subids": (1, 2),
- "include": False},
- "end": {"subids": (3, 4),
- "include": False}},
- {"start": {"subids": (6, 7),
- "include": True},
- "end": {"subids": (8, 9),
- "include": False}}),
+ "oidranges": (srch((1, 2), (3, 4), False),
+ srch((6, 7), (8, 9), True)),
"context": "blah"})
def test_TestSetPDU(self):
@@ -746,23 +728,26 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# Test init
pkt = cls(True, 1, 2, 3,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")),
+ (x.Varbind(x.VALUE_OID, (1, 2, 3), x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")),
context="blah")
test_pducore(self, pkt, x.PDU_TEST_SET, True, 1, 2, 3)
self.assertEqual(pkt.varbinds,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt.context, "blah")
# Test init, little endian
pkt_LE = cls(False, 1, 2, 3,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")),
- context="blah")
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")),
+ context="blah")
test_pducore(self, pkt_LE, x.PDU_TEST_SET, False, 1, 2, 3)
self.assertEqual(pkt_LE.varbinds,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt_LE.context, "blah")
# Test encoding
pkt_str = pkt.encode()
@@ -802,12 +787,12 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
pkt_new = dec(body, header)
test_pducore(self, pkt_new, x.PDU_TEST_SET, True, 1, 2, 3)
self.assertEqual(pkt_new.varbinds,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": {"subids": (4, 5, 6), "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4), "include": False},
- "data": "blah"}))
+ (x.Varbind(x.VALUE_OID,
+ x.OID((1, 2, 3), False),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR,
+ x.OID((1, 2, 4), False),
+ "blah")))
self.assertEqual(pkt_new.context, "blah")
# Test decoding, little endian
header, body = slicedata(pkt_LE_str, 20)
@@ -815,12 +800,12 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
pkt_LE_new = dec(body, header)
test_pducore(self, pkt_LE_new, x.PDU_TEST_SET, False, 1, 2, 3)
self.assertEqual(pkt_LE_new.varbinds,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": {"subids": (4, 5, 6), "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4), "include": False},
- "data": "blah"}))
+ (x.Varbind(x.VALUE_OID,
+ x.OID((1, 2, 3), False),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR,
+ x.OID((1, 2, 4), False),
+ "blah")))
self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
@@ -829,16 +814,13 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"sessionID": 1,
"transactionID": 2,
"packetID": 3,
- "varbinds": ({"type": x.OID,
- "name": {"subids": (1, 2, 3),
- "include": False},
- "data": {"subids": (4, 5, 6),
- "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4),
- "include": False},
- "data": "blah"}),
- "context": "blah"})
+ "varbinds": (x.Varbind(x.VALUE_OID,
+ x.OID((1, 2, 3), False),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR,
+ x.OID((1, 2, 4), False),
+ "blah")),
+ "context": "blah"})
def test_CommitSetPDU(self):
dec = ntp.agentx.decode_CommitSetPDU
@@ -1018,23 +1000,26 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# Test init
pkt = cls(True, 1, 2, 3,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")),
+ (x.Varbind(x.VALUE_OID, (1, 2, 3), x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")),
context="blah")
test_pducore(self, pkt, x.PDU_NOTIFY, True, 1, 2, 3)
self.assertEqual(pkt.varbinds,
- ((6, (1, 2, 3), (4, 5, 6), False),
- (4, (1, 2, 4), 'blah')))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt.context, "blah")
# Test init, little endian
pkt_LE = cls(False, 1, 2, 3,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")),
- context="blah")
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")),
+ context="blah")
test_pducore(self, pkt_LE, x.PDU_NOTIFY, False, 1, 2, 3)
self.assertEqual(pkt_LE.varbinds,
- ((6, (1, 2, 3), (4, 5, 6), False),
- (4, (1, 2, 4), 'blah')))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt_LE.context, "blah")
# Test encode
pkt_str = pkt.encode()
@@ -1074,12 +1059,9 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
pkt_new = dec(body, header)
test_pducore(self, pkt_new, x.PDU_NOTIFY, True, 1, 2, 3)
self.assertEqual(pkt_new.varbinds,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": {"subids": (4, 5, 6), "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4), "include": False},
- "data": "blah"}))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt_new.context, "blah")
# Test decode, little endian
header, body = slicedata(pkt_LE_str, 20)
@@ -1087,12 +1069,9 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
pkt_LE_new = dec(body, header)
test_pducore(self, pkt_LE_new, x.PDU_NOTIFY, False, 1, 2, 3)
self.assertEqual(pkt_LE_new.varbinds,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": {"subids": (4, 5, 6), "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4), "include": False},
- "data": "blah"}))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
@@ -1101,15 +1080,10 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"sessionID": 1,
"transactionID": 2,
"packetID": 3,
- "varbinds": ({"type": x.OID,
- "name": {"subids": (1, 2, 3),
- "include": False},
- "data": {"subids": (4, 5, 6),
- "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4),
- "include": False},
- "data": "blah"}),
+ "varbinds": (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4),
+ "blah")),
"context": "blah"})
def test_IndexAllocPDU(self):
@@ -1119,27 +1093,30 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# Test init
pkt = cls(True, 1, 2, 3, True, True,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")),
+ (x.Varbind(x.VALUE_OID, (1, 2, 3), x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")),
context="blah")
test_pducore(self, pkt, x.PDU_INDEX_ALLOC, True, 1, 2, 3)
self.assertEqual(pkt.newIndex, True)
self.assertEqual(pkt.anyIndex, True)
self.assertEqual(pkt.varbinds,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt.context, "blah")
# Test init, little endian
pkt_LE = cls(False, 1, 2, 3, True, True,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")),
- context="blah")
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")),
+ context="blah")
test_pducore(self, pkt_LE, x.PDU_INDEX_ALLOC, False, 1, 2, 3)
self.assertEqual(pkt_LE.newIndex, True)
self.assertEqual(pkt_LE.anyIndex, True)
self.assertEqual(pkt_LE.varbinds,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt_LE.context, "blah")
# Test encode
pkt_str = pkt.encode()
@@ -1181,12 +1158,9 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(pkt_new.newIndex, True)
self.assertEqual(pkt_new.anyIndex, True)
self.assertEqual(pkt_new.varbinds,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": {"subids": (4, 5, 6), "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4), "include": False},
- "data": "blah"}))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
# Test decode, little endian
header, body = slicedata(pkt_LE_str, 20)
header = decode_pduheader(header)
@@ -1195,12 +1169,9 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(pkt_LE_new.newIndex, True)
self.assertEqual(pkt_LE_new.anyIndex, True)
self.assertEqual(pkt_LE_new.varbinds,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": {"subids": (4, 5, 6), "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4), "include": False},
- "data": "blah"}))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
@@ -1211,15 +1182,10 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"packetID": 3,
"newIndex": True,
"anyIndex": True,
- "varbinds": ({"type": x.OID,
- "name": {"subids": (1, 2, 3),
- "include": False},
- "data": {"subids": (4, 5, 6),
- "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4),
- "include": False},
- "data": "blah"}),
+ "varbinds": (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4),
+ "blah")),
"context": "blah"})
def test_IndexDeallocPDU(self):
@@ -1229,27 +1195,30 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# Test init
pkt = cls(True, 1, 2, 3, True, True,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")),
+ (x.Varbind(x.VALUE_OID, (1, 2, 3), x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")),
context="blah")
test_pducore(self, pkt, x.PDU_INDEX_DEALLOC, True, 1, 2, 3)
self.assertEqual(pkt.newIndex, True)
self.assertEqual(pkt.anyIndex, True)
self.assertEqual(pkt.varbinds,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt.context, "blah")
# Test init, little endian
pkt_LE = cls(False, 1, 2, 3, True, True,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")),
- context="blah")
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")),
+ context="blah")
test_pducore(self, pkt_LE, x.PDU_INDEX_DEALLOC, False, 1, 2, 3)
self.assertEqual(pkt_LE.newIndex, True)
self.assertEqual(pkt_LE.anyIndex, True)
self.assertEqual(pkt_LE.varbinds,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt_LE.context, "blah")
# Test encode
pkt_str = pkt.encode()
@@ -1291,12 +1260,9 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(pkt_new.newIndex, True)
self.assertEqual(pkt_new.anyIndex, True)
self.assertEqual(pkt_new.varbinds,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": {"subids": (4, 5, 6), "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4), "include": False},
- "data": "blah"}))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt_new.context, "blah")
# Test decode, little endian
header, body = slicedata(pkt_LE_str, 20)
@@ -1306,12 +1272,9 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(pkt_LE_new.newIndex, True)
self.assertEqual(pkt_LE_new.anyIndex, True)
self.assertEqual(pkt_LE_new.varbinds,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": {"subids": (4, 5, 6), "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4), "include": False},
- "data": "blah"}))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
@@ -1322,15 +1285,10 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"packetID": 3,
"newIndex": True,
"anyIndex": True,
- "varbinds": ({"type": x.OID,
- "name": {"subids": (1, 2, 3),
- "include": False},
- "data": {"subids": (4, 5, 6),
- "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4),
- "include": False},
- "data": "blah"}),
+ "varbinds": (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4),
+ "blah")),
"context": "blah"})
def test_AddAgentCapsPDU(self):
@@ -1341,13 +1299,13 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# Test init
pkt = cls(True, 1, 2, 3, (4, 5, 6), "blah", context="bluh")
test_pducore(self, pkt, x.PDU_ADD_AGENT_CAPS, True, 1, 2, 3)
- self.assertEqual(pkt.oid, (4, 5, 6))
+ self.assertEqual(pkt.oid, x.OID((4, 5, 6), False))
self.assertEqual(pkt.description, "blah")
self.assertEqual(pkt.context, "bluh")
# Test init, little endian
pkt_LE = cls(False, 1, 2, 3, (4, 5, 6), "blah", context="bluh")
test_pducore(self, pkt_LE, x.PDU_ADD_AGENT_CAPS, False, 1, 2, 3)
- self.assertEqual(pkt_LE.oid, (4, 5, 6))
+ self.assertEqual(pkt_LE.oid, x.OID((4, 5, 6), False))
self.assertEqual(pkt_LE.description, "blah")
self.assertEqual(pkt_LE.context, "bluh")
# Test encode
@@ -1375,8 +1333,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
header = decode_pduheader(header)
pkt_new = dec(body, header)
test_pducore(self, pkt_new, x.PDU_ADD_AGENT_CAPS, True, 1, 2, 3)
- self.assertEqual(pkt_new.oid, {"subids": (4, 5, 6),
- "include": False})
+ self.assertEqual(pkt_new.oid, x.OID((4, 5, 6), False))
self.assertEqual(pkt_new.description, "blah")
self.assertEqual(pkt_new.context, "bluh")
# Test decode, little endian
@@ -1384,8 +1341,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
header = decode_pduheader(header)
pkt_LE_new = dec(body, header)
test_pducore(self, pkt_LE_new, x.PDU_ADD_AGENT_CAPS, False, 1, 2, 3)
- self.assertEqual(pkt_LE_new.oid, {"subids": (4, 5, 6),
- "include": False})
+ self.assertEqual(pkt_LE_new.oid, x.OID((4, 5, 6), False))
self.assertEqual(pkt_LE_new.description, "blah")
self.assertEqual(pkt_LE_new.context, "bluh")
# Test packetVars
@@ -1395,7 +1351,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"sessionID": 1,
"transactionID": 2,
"packetID": 3,
- "oid": {"subids": (4, 5, 6), "include": False},
+ "oid": x.OID((4, 5, 6), False),
"description": "blah",
"context": "bluh"})
@@ -1407,12 +1363,12 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# Test init
pkt = cls(True, 1, 2, 3, (4, 5, 6), context="bluh")
test_pducore(self, pkt, x.PDU_RM_AGENT_CAPS, True, 1, 2, 3)
- self.assertEqual(pkt.oid, (4, 5, 6))
+ self.assertEqual(pkt.oid, x.OID((4, 5, 6), False))
self.assertEqual(pkt.context, "bluh")
# Test init, little endian
pkt_LE = cls(False, 1, 2, 3, (4, 5, 6), context="bluh")
test_pducore(self, pkt_LE, x.PDU_RM_AGENT_CAPS, False, 1, 2, 3)
- self.assertEqual(pkt_LE.oid, (4, 5, 6))
+ self.assertEqual(pkt_LE.oid, x.OID((4, 5, 6), False))
self.assertEqual(pkt_LE.context, "bluh")
# Test encode
pkt_str = pkt.encode()
@@ -1437,16 +1393,14 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
header = decode_pduheader(header)
pkt_new = dec(body, header)
test_pducore(self, pkt_new, x.PDU_RM_AGENT_CAPS, True, 1, 2, 3)
- self.assertEqual(pkt_new.oid, {"subids": (4, 5, 6),
- "include": False})
+ self.assertEqual(pkt_new.oid, x.OID((4, 5, 6), False))
self.assertEqual(pkt_new.context, "bluh")
# Test decode, little endian
header, body = slicedata(pkt_LE_str, 20)
header = decode_pduheader(header)
pkt_LE_new = dec(body, header)
test_pducore(self, pkt_LE_new, x.PDU_RM_AGENT_CAPS, False, 1, 2, 3)
- self.assertEqual(pkt_LE_new.oid, {"subids": (4, 5, 6),
- "include": False})
+ self.assertEqual(pkt_LE_new.oid, x.OID((4, 5, 6), False))
self.assertEqual(pkt_LE_new.context, "bluh")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
@@ -1455,7 +1409,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"sessionID": 1,
"transactionID": 2,
"packetID": 3,
- "oid": {"subids": (4, 5, 6), "include": False},
+ "oid": x.OID((4, 5, 6), False),
"context": "bluh"})
def test_ResponsePDU(self):
@@ -1465,26 +1419,29 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# Test init
pkt = cls(True, 1, 2, 3, 4, 5, 6,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3), x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
test_pducore(self, pkt, x.PDU_RESPONSE, True, 1, 2, 3)
self.assertEqual(pkt.sysUptime, 4)
self.assertEqual(pkt.resError, 5)
self.assertEqual(pkt.resIndex, 6)
self.assertEqual(pkt.varbinds,
- ((6, (1, 2, 3), (4, 5, 6), False),
- (4, (1, 2, 4), 'blah')))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
# Test init, little endian
pkt_LE = cls(False, 1, 2, 3, 4, 5, 6,
- ((x.OID, (1, 2, 3), (4, 5, 6), False),
- (x.OCTET_STR, (1, 2, 4), "blah")))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
test_pducore(self, pkt_LE, x.PDU_RESPONSE, False, 1, 2, 3)
self.assertEqual(pkt_LE.sysUptime, 4)
self.assertEqual(pkt_LE.resError, 5)
self.assertEqual(pkt_LE.resIndex, 6)
self.assertEqual(pkt_LE.varbinds,
- ((6, (1, 2, 3), (4, 5, 6), False),
- (4, (1, 2, 4), 'blah')))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -1526,12 +1483,9 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(pkt_new.resError, 5)
self.assertEqual(pkt_new.resIndex, 6)
self.assertEqual(pkt_new.varbinds,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": {"subids": (4, 5, 6), "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4), "include": False},
- "data": "blah"}))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
# Test decode, little endian
header, body = slicedata(pkt_LE_str, 20)
header = decode_pduheader(header)
@@ -1541,12 +1495,9 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(pkt_LE_new.resError, 5)
self.assertEqual(pkt_LE_new.resIndex, 6)
self.assertEqual(pkt_LE_new.varbinds,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": {"subids": (4, 5, 6), "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4), "include": False},
- "data": "blah"}))
+ (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4), "blah")))
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 18,
@@ -1557,15 +1508,10 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"sysUptime": 4,
"resError": 5,
"resIndex": 6,
- "varbinds": ({"type": x.OID,
- "name": {"subids": (1, 2, 3),
- "include": False},
- "data": {"subids": (4, 5, 6),
- "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4),
- "include": False},
- "data": "blah"})})
+ "varbinds": (x.Varbind(x.VALUE_OID, (1, 2, 3),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR, (1, 2, 4),
+ "blah"))})
#
# Data type tests
@@ -1590,7 +1536,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
dec = ntp.agentx.decode_nullvalue
# Encode
- self.assertEqual(enc(True), "")
+ self.assertEqual(enc(True, "this is ignored"), "")
# Decode
self.assertEqual(dec(extraData, standardFlags), (None, extraData))
@@ -1637,104 +1583,181 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
lilEndianFlags),
((1, 2, 3, 4), extraData))
- def test_oid(self):
- enc = ntp.agentx.encode_oid
- dec = ntp.agentx.decode_oid
+ def test_OID(self):
+ target = ntp.agentx.OID
+ dec = ntp.agentx.decode_OID
# Encode empty OID
- self.assertEqual(enc(True, (), False), "\x00\x00\x00\x00")
+ cls = target((), False)
+ self.assertEqual(cls.encode(True), "\x00\x00\x00\x00")
# Encode basic OID
- self.assertEqual(enc(True, (1, 2, 3, 4, 5), False),
+ cls = target((1, 2, 3, 4, 5), False)
+ self.assertEqual(cls.encode(True),
"\x05\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x04"
"\x00\x00\x00\x05")
# Encode basic OID, little endian
- self.assertEqual(enc(False, (1, 2, 3, 4, 5), False),
+ self.assertEqual(cls.encode(False),
"\x05\x00\x00\x00"
"\x01\x00\x00\x00\x02\x00\x00\x00"
"\x03\x00\x00\x00\x04\x00\x00\x00"
"\x05\x00\x00\x00")
# Encode prefixed OID
- self.assertEqual(enc(True, (1, 3, 6, 1, 23, 1, 2, 3), False),
+ cls = target((1, 3, 6, 1, 23, 1, 2, 3), False)
+ self.assertEqual(cls.encode(True),
"\x03\x17\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03")
# Encode include
- self.assertEqual(enc(True, (1, 2), True),
+ cls = target((1, 2), True)
+ self.assertEqual(cls.encode(True),
"\x02\x00\x01\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02")
# Encode together
- self.assertEqual(enc(True, (1, 3, 6, 1, 1, 3, 4, 5, 6), True),
+ cls = target((1, 3, 6, 1, 1, 3, 4, 5, 6), True)
+ self.assertEqual(cls.encode(True),
"\x04\x01\x01\x00"
"\x00\x00\x00\x03\x00\x00\x00\x04"
"\x00\x00\x00\x05\x00\x00\x00\x06")
-
# Encode maximum size
- self.assertEqual(enc(True, maximumOIDsubs, False), maximumOIDstr)
+ cls = target(maximumOIDsubs, False)
+ self.assertEqual(cls.encode(True), maximumOIDstr)
# Encode over maximum size
try:
- enc(True, maximumOIDsubs + (42,), False)
- fail = False
+ cls = target(maximumOIDsubs + (42,), False)
+ cls.encode(True)
+ errored = False
except ValueError:
- fail = True
- self.assertEqual(fail, True)
+ errored = True
+ self.assertEqual(errored, True)
+
# Decode empty OID, extra data
- self.assertEqual(dec("\x00\x00\x00\x00" + extraData, standardFlags),
- ({"subids": (), "include": False}, extraData))
+ cls, xtr = dec("\x00\x00\x00\x00" + extraData, standardFlags)
+ self.assertEqual(isinstance(cls, target), True)
+ self.assertEqual(cls.subids, ())
+ self.assertEqual(cls.include, False)
+ self.assertEqual(xtr, extraData)
# Decode basic OID, extra data
- self.assertEqual(dec("\x05\x00\x00\x00\x00\x00\x00\x01"
- "\x00\x00\x00\x02\x00\x00\x00\x03"
- "\x00\x00\x00\x04\x00\x00\x00\x05" + extraData,
- standardFlags),
- ({"subids": (1, 2, 3, 4, 5), "include": False},
- extraData))
+ cls, xtr = dec("\x05\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x04\x00\x00\x00\x05" + extraData,
+ standardFlags)
+ self.assertEqual(isinstance(cls, target), True)
+ self.assertEqual(cls.subids, (1, 2, 3, 4, 5))
+ self.assertEqual(cls.include, False)
+ self.assertEqual(xtr, extraData)
# Decode basic OID, little endian
- self.assertEqual(dec("\x05\x00\x00\x00\x01\x00\x00\x00"
- "\x02\x00\x00\x00\x03\x00\x00\x00"
- "\x04\x00\x00\x00\x05\x00\x00\x00",
- lilEndianFlags),
- ({"subids": (1, 2, 3, 4, 5), "include": False},
- ""))
+ cls, xtr = dec("\x05\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x04\x00\x00\x00\x05\x00\x00\x00",
+ lilEndianFlags)
+ self.assertEqual(isinstance(cls, target), True)
+ self.assertEqual(cls.subids, (1, 2, 3, 4, 5))
+ self.assertEqual(cls.include, False)
+ self.assertEqual(xtr, "")
# Decode prefixed OID
- self.assertEqual(dec("\x03\x17\x00\x00\x00\x00\x00\x01"
- "\x00\x00\x00\x02\x00\x00\x00\x03", standardFlags),
- ({"subids": (1, 3, 6, 1, 23, 1, 2, 3),
- "include": False},
- ""))
+ cls, xtr = dec("\x03\x17\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03", standardFlags)
+ self.assertEqual(isinstance(cls, target), True)
+ self.assertEqual(cls.subids, (1, 3, 6, 1, 23, 1, 2, 3))
+ self.assertEqual(cls.include, False)
+ self.assertEqual(xtr, "")
# Decode include
- self.assertEqual(dec("\x02\x00\x05\x00\x00\x00\x00\x01\x00\x00\x00\x02",
- standardFlags),
- ({"subids": (1, 2), "include": True}, ""))
+ cls, xtr = dec("\x02\x00\x05\x00\x00\x00\x00\x01\x00\x00\x00\x02",
+ standardFlags)
+ self.assertEqual(isinstance(cls, target), True)
+ self.assertEqual(cls.subids, (1, 2))
+ self.assertEqual(cls.include, True)
+ self.assertEqual(xtr, "")
# Decode together
- self.assertEqual(dec("\x04\x01\x02\x00\x00\x00\x00\x03"
- "\x00\x00\x00\x04\x00\x00\x00\x05\x00\x00\x00\x06",
- standardFlags),
- ({"subids": (1, 3, 6, 1, 1, 3, 4, 5, 6),
- "include": True},
- ""))
+ cls, xtr = dec("\x04\x01\x02\x00\x00\x00\x00\x03"
+ "\x00\x00\x00\x04\x00\x00\x00\x05\x00\x00\x00\x06",
+ standardFlags)
+ self.assertEqual(isinstance(cls, target), True)
+ self.assertEqual(cls.subids, (1, 3, 6, 1, 1, 3, 4, 5, 6))
+ self.assertEqual(cls.include, True)
+ self.assertEqual(xtr, "")
# Decode maximum size
- self.assertEqual(dec(maximumOIDstr, standardFlags),
- ({"subids": maximumOIDsubs, "include": False}, ""))
+ cls, xtr = dec(maximumOIDstr, standardFlags)
+ self.assertEqual(isinstance(cls, target), True)
+ self.assertEqual(cls.subids, maximumOIDsubs)
+ self.assertEqual(cls.include, False)
+ self.assertEqual(xtr, "")
# Decode over maximum size
# Need to replace the hardcoded n_subid=128 with 129
fatOID = "\x81" + maximumOIDstr[1:] + "\xDE\xAD\xBE\xEF"
try:
- dec(fatOID, standardFlags)
- fail = False
+ cls, xtr = dec(fatOID, standardFlags)
+ errored = False
except ValueError:
- fail = True
- self.assertEqual(fail, True)
+ errored = True
+ self.assertEqual(errored, True)
+ # Test compareOID
+ # Test equal
+ a = target((1, 2, 3, 4))
+ b = target((1, 2, 3, 4))
+ self.assertEqual(a.compareOID(b), 0)
+
+ # Test equal length, one < two
+ b = target((1, 2, 3, 5))
+ self.assertEqual(a.compareOID(b), -1)
+ # Test equal length, one > two
+ b = target((1, 2, 3, 0))
+ self.assertEqual(a.compareOID(b), 1)
+ # Test one shorter, less than two, equal for length
+ a = target((1, 2, 3))
+ b = target((1, 2, 3, 4))
+ self.assertEqual(a.compareOID(b), -1)
+ # Test one shorter, less than two
+ b = target((1, 2, 4, 5))
+ self.assertEqual(a.compareOID(b), -1)
+ # Test one shorter, greater than two
+ b = target((1, 2, 2, 4))
+ self.assertEqual(a.compareOID(b), 1)
+ # Test two shorter, less than one, equal for length
+ a = target((1, 2, 3, 4))
+ b = target((1, 2, 3))
+ self.assertEqual(a.compareOID(b), 1)
+ # Test two shorter, less than one
+ a = target((1, 2, 4, 5))
+ self.assertEqual(a.compareOID(b), 1)
+ # Test two shorter, greater than one
+ a = target((1, 2, 2, 4))
+ b = target((1, 2, 3))
+ self.assertEqual(a.compareOID(b), -1)
def test_searchrange(self):
- enc = ntp.agentx.encode_searchrange
- dec = ntp.agentx.decode_searchrange
+ target = ntp.agentx.SearchRange
+ dec = ntp.agentx.decode_SearchRange
+ oid = ntp.agentx.OID
+ # Test init
+ # Basic
+ cls = target(oid((1, 2), True), oid((3, 4), False))
+ self.assertEqual(cls.start.subids, (1, 2))
+ self.assertEqual(cls.start.include, True)
+ self.assertEqual(cls.end.subids, (3, 4))
+ self.assertEqual(cls.end.include, False)
+ # Override
+ cls = target(oid((1, 2), True), oid((3, 4), True), False)
+ self.assertEqual(cls.start.subids, (1, 2))
+ self.assertEqual(cls.start.include, False)
+ self.assertEqual(cls.end.subids, (3, 4))
+ self.assertEqual(cls.end.include, False)
+ # Turn tuples into OIDs
+ cls = target((1, 2), (3, 4), True)
+ self.assertEqual(cls.start.subids, (1, 2))
+ self.assertEqual(cls.start.include, True)
+ self.assertEqual(cls.end.subids, (3, 4))
+ self.assertEqual(cls.end.include, False)
+ # Test encoding
# Encode minimum size
- self.assertEqual(enc(True, (), (), False),
- "\x00\x00\x00\x00\x00\x00\x00\x00")
+ cls = target((), (), False)
+ self.assertEqual(cls.encode(True), "\x00\x00\x00\x00\x00\x00\x00\x00")
# Encode inclusive
- self.assertEqual(enc(True, (1, 2, 3, 4), (5, 6, 7, 8), True),
+ cls = target((1, 2, 3, 4), (5, 6, 7, 8), True)
+ self.assertEqual(cls.encode(True),
"\x04\x00\x01\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x04"
@@ -1742,7 +1765,8 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x05\x00\x00\x00\x06"
"\x00\x00\x00\x07\x00\x00\x00\x08")
# Encode exclusive
- self.assertEqual(enc(True, (1, 2, 3, 4), (5, 6, 7, 8), False),
+ cls = target((1, 2, 3, 4), (5, 6, 7, 8), False)
+ self.assertEqual(cls.encode(True),
"\x04\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x04"
@@ -1750,19 +1774,18 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x05\x00\x00\x00\x06"
"\x00\x00\x00\x07\x00\x00\x00\x08")
# Encode exclusive, little endian
- self.assertEqual(enc(False, (1, 2, 3, 4), (5, 6, 7, 8), False),
+ self.assertEqual(cls.encode(False),
"\x04\x00\x00\x00"
"\x01\x00\x00\x00\x02\x00\x00\x00"
"\x03\x00\x00\x00\x04\x00\x00\x00"
"\x04\x00\x00\x00"
"\x05\x00\x00\x00\x06\x00\x00\x00"
"\x07\x00\x00\x00\x08\x00\x00\x00")
+ # Test decode
# Decode minimum size, extra data
self.assertEqual(dec("\x00\x00\x00\x00\x00\x00\x00\x00" + extraData,
standardFlags),
- ({"start": {"subids": (), "include": False},
- "end": {"subids": (), "include": False}},
- extraData))
+ (target((), (), False), extraData))
# Decode inclusive
self.assertEqual(dec("\x04\x00\x01\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
@@ -1771,9 +1794,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x05\x00\x00\x00\x06"
"\x00\x00\x00\x07\x00\x00\x00\x08",
standardFlags),
- ({"start": {"subids": (1, 2, 3, 4), "include": True},
- "end": {"subids": (5, 6, 7, 8), "include": False}},
- ""))
+ (target((1, 2, 3, 4), (5, 6, 7, 8), True), ""))
# Decode exclusive
self.assertEqual(dec("\x04\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
@@ -1782,9 +1803,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x05\x00\x00\x00\x06"
"\x00\x00\x00\x07\x00\x00\x00\x08",
standardFlags),
- ({"start": {"subids": (1, 2, 3, 4), "include": False},
- "end": {"subids": (5, 6, 7, 8), "include": False}},
- ""))
+ (target((1, 2, 3, 4), (5, 6, 7, 8), False), ""))
# Decode little endian
self.assertEqual(dec("\x04\x00\x01\x00"
"\x01\x00\x00\x00\x02\x00\x00\x00"
@@ -1793,32 +1812,30 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x05\x00\x00\x00\x06\x00\x00\x00"
"\x07\x00\x00\x00\x08\x00\x00\x00",
lilEndianFlags),
- ({"start": {"subids": (1, 2, 3, 4), "include": True},
- "end": {"subids": (5, 6, 7, 8), "include": False}},
- ""))
+ (target((1, 2, 3, 4), (5, 6, 7, 8), True), ""))
def test_encode_searchrange_list(self):
enc = ntp.agentx.encode_searchrange_list
+ srch = ntp.agentx.SearchRange
# Encode
- self.assertEqual(enc(True, (((1, 2), (1, 2), True),
- ((2, 3), (3, 4), False))),
+ self.assertEqual(enc(True, (srch((1, 2), (1, 2), True),
+ srch((2, 3), (3, 4)))),
"\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
"\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
"\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
"\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04")
# Encode, little endian
- self.assertEqual(enc(False, (((1, 2), (1, 2), True),
- ((2, 3), (3, 4), False))),
+ self.assertEqual(enc(False, (srch((1, 2), (1, 2), True),
+ srch((2, 3), (3, 4)))),
"\x02\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00"
"\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
"\x02\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
"\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00")
# Test, null terminated
- self.assertEqual(enc(True,
- (((1, 2), (1, 2), True),
- ((2, 3), (3, 4), False)),
- nullTerminate=True),
+ self.assertEqual(enc(True, (srch((1, 2), (1, 2), True),
+ srch((2, 3), (3, 4))),
+ nullTerminate=True),
"\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
"\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
"\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
@@ -1827,52 +1844,62 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_decode_searchrange_list(self):
dec = ntp.agentx.decode_searchrange_list
+ srch = ntp.agentx.SearchRange
# Decode
- self.assertEqual(dec("\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
- "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
- "\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
- "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04",
+ self.assertEqual(dec("\x02\x00\x01\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x02\x00\x00\x00"
+ "\x00\x00\x00\x03\x00\x00\x00\x04",
standardFlags),
- ({"start": {"subids": (1, 2), "include": True},
- "end": {"subids": (1, 2), "include": False}},
- {"start": {"subids": (2, 3), "include": False},
- "end": {"subids": (3, 4), "include": False}}))
+ (srch((1, 2), (1, 2), True),
+ srch((2, 3), (3, 4), False)))
# Test, little endian
- self.assertEqual(dec("\x02\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00"
- "\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
- "\x02\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
- "\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00",
+ self.assertEqual(dec("\x02\x00\x01\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00",
lilEndianFlags),
- ({"start": {"subids": (1, 2), "include": True},
- "end": {"subids": (1, 2), "include": False}},
- {"start": {"subids": (2, 3), "include": False},
- "end": {"subids": (3, 4), "include": False}}))
+ (srch((1, 2), (1, 2), True),
+ srch((2, 3), (3, 4), False)))
def test_decode_searchrange_list_nullterm(self):
dec = ntp.agentx.decode_searchrange_list_nullterm
+ srch = ntp.agentx.SearchRange
# Decode
- self.assertEqual(dec("\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
- "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
- "\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
- "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
- "\x00\x00\x00\x00" + extraData, standardFlags),
- (({"start": {"subids": (1, 2), "include": True},
- "end": {"subids": (1, 2), "include": False}},
- {"start": {"subids": (2, 3), "include": False},
- "end": {"subids": (3, 4), "include": False}}),
- extraData))
+ self.assertEqual(dec("\x02\x00\x01\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x02\x00\x00\x00"
+ "\x00\x00\x00\x03\x00\x00\x00\x04",
+ standardFlags),
+ ((srch((1, 2), (1, 2), True),
+ srch((2, 3), (3, 4), False)),
+ ""))
# Test, little endian
- self.assertEqual(dec("\x02\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00"
- "\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
- "\x02\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
- "\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00"
+ self.assertEqual(dec("\x02\x00\x01\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
"\x00\x00\x00\x00" + extraData, lilEndianFlags),
- (({"start": {"subids": (1, 2), "include": True},
- "end": {"subids": (1, 2), "include": False}},
- {"start": {"subids": (2, 3), "include": False},
- "end": {"subids": (3, 4), "include": False}}),
+ ((srch((1, 2), (1, 2), True),
+ srch((2, 3), (3, 4), False)),
extraData))
def test_encode_octetstr(self):
@@ -1894,7 +1921,8 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# Decode empty
self.assertEqual(dec("\x00\x00\x00\x00", standardFlags), ("", ""))
# Decode word multiple, extra data
- self.assertEqual(dec("\x00\x00\x00\x04blah" + extraData, standardFlags),
+ self.assertEqual(dec("\x00\x00\x00\x04blah" + extraData,
+ standardFlags),
("blah", extraData))
# Decode word multiple, little endian
self.assertEqual(dec("\x04\x00\x00\x00blah", lilEndianFlags),
@@ -1905,172 +1933,187 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
standardFlags),
("blarg", extraData))
- def test_encode_varbind(self):
- enc = ntp.agentx.encode_varbind
+ def test_Varbind(self):
+ target = ntp.agentx.Varbind
a = ntp.agentx
+ # Test init
+ cls = target(a.VALUE_INTEGER, (1, 2, 3), 42)
+ self.assertEqual(cls.valueType, a.VALUE_INTEGER)
+ self.assertEqual(cls.oid, a.OID((1, 2, 3), False))
+ self.assertEqual(cls.payload, 42)
+ # Test repr
+ self.assertEqual(repr(cls),
+ "Varbind(vtype=2, oid=OID((1, 2, 3), False), "
+ "payload=42)")
# Test payloadless types
- self.assertEqual(enc(True, a.NULL, (1, 2, 3)),
+ cls = target(a.VALUE_NULL, (1, 2, 3))
+ self.assertEqual(cls.encode(True),
"\x00\x05\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
- self.assertEqual(enc(True, a.NO_SUCH_OBJECT, (1, 2, 3)),
+ cls = target(a.VALUE_NO_SUCH_OBJECT, (1, 2, 3))
+ self.assertEqual(cls.encode(True),
"\x00\x80\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
- self.assertEqual(enc(True, a.NO_SUCH_INSTANCE, (1, 2, 3)),
+ cls = target(a.VALUE_NO_SUCH_INSTANCE, (1, 2, 3))
+ self.assertEqual(cls.encode(True),
"\x00\x81\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
- self.assertEqual(enc(True, a.END_OF_MIB_VIEW, (1, 2, 3)),
+ cls = target(a.VALUE_END_OF_MIB_VIEW, (1, 2, 3))
+ self.assertEqual(cls.encode(True),
"\x00\x82\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
# Test octet based types
- self.assertEqual(enc(True, a.OCTET_STR, (1, 2, 3), (1, 2, 3, 4, 5)),
+ cls = target(a.VALUE_OCTET_STR, (1, 2, 3), (1, 2, 3, 4, 5))
+ self.assertEqual(cls.encode(True),
"\x00\x04\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x05"
"\x01\x02\x03\x04\x05\x00\x00\x00")
- self.assertEqual(enc(True, a.IP_ADDR, (1, 2, 3), (16, 32, 48, 64)),
+ cls = target(a.VALUE_IP_ADDR, (1, 2, 3), (16, 32, 48, 64))
+ self.assertEqual(cls.encode(True),
"\x00\x40\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x04\x10\x20\x30\x40")
# Test integer32 types
- self.assertEqual(enc(True, a.INTEGER, (1, 2, 3), 42),
+ cls = target(a.VALUE_INTEGER, (1, 2, 3), 42)
+ self.assertEqual(cls.encode(True),
"\x00\x02\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A")
- self.assertEqual(enc(True, a.COUNTER32, (1, 2, 3), 42),
+ cls = target(a.VALUE_COUNTER32, (1, 2, 3), 42)
+ self.assertEqual(cls.encode(True),
"\x00\x41\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A")
- self.assertEqual(enc(True, a.GAUGE32, (1, 2, 3), 42),
+ cls = target(a.VALUE_GAUGE32, (1, 2, 3), 42)
+ self.assertEqual(cls.encode(True),
"\x00\x42\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A")
- self.assertEqual(enc(True, a.TIME_TICKS, (1, 2, 3), 42),
+ cls = target(a.VALUE_TIME_TICKS, (1, 2, 3), 42)
+ self.assertEqual(cls.encode(True),
"\x00\x43\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A")
# Test integer64 type
- self.assertEqual(enc(True, a.COUNTER64, (1, 2, 3), 42),
+ cls = target(a.VALUE_COUNTER64, (1, 2, 3), 42)
+ self.assertEqual(cls.encode(True),
"\x00\x46\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x00\x00\x00\x00\x2A")
# Test oid type
- self.assertEqual(enc(True, a.OID, (1, 2, 3), (16, 42, 256), False),
+ cls = target(a.VALUE_OID, (1, 2, 3), a.OID((16, 42, 256), False))
+ self.assertEqual(cls.encode(True),
"\x00\x06\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x03\x00\x00\x00\x00\x00\x00\x10"
"\x00\x00\x00\x2A\x00\x00\x01\x00")
# Test oid type, little endian
- self.assertEqual(enc(False, a.OID, (1, 2, 3), (16, 42, 256), False),
+ cls = target(a.VALUE_OID, (1, 2, 3), a.OID((16, 42, 256), False))
+ self.assertEqual(cls.encode(False),
"\x06\x00\x00\x00\x03\x00\x00\x00"
"\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
"\x03\x00\x00\x00\x10\x00\x00\x00"
"\x2A\x00\x00\x00\x00\x01\x00\x00")
def test_decode_varbind(self):
- f = ntp.agentx.decode_varbind
+ f = ntp.agentx.decode_Varbind
a = ntp.agentx
# Test payloadless types
self.assertEqual(f("\x00\x05\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03",
standardFlags),
- ({"type": a.NULL,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": None},
+ (a.Varbind(a.VALUE_NULL, a.OID((1, 2, 3), False)),
""))
self.assertEqual(f("\x00\x80\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03",
standardFlags),
- ({"type": a.NO_SUCH_OBJECT,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": None},
+ (a.Varbind(a.VALUE_NO_SUCH_OBJECT,
+ a.OID((1, 2, 3), False)),
""))
self.assertEqual(f("\x00\x81\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03",
standardFlags),
- ({"type": a.NO_SUCH_INSTANCE,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": None},
+ (a.Varbind(a.VALUE_NO_SUCH_INSTANCE,
+ a.OID((1, 2, 3), False)),
""))
self.assertEqual(f("\x00\x82\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03",
standardFlags),
- ({"type": a.END_OF_MIB_VIEW,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": None},
+ (a.Varbind(a.VALUE_END_OF_MIB_VIEW,
+ a.OID((1, 2, 3), False)),
""))
# Test octet based types
self.assertEqual(f("\x00\x04\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x0512345\x00\x00\x00",
standardFlags),
- ({"type": a.OCTET_STR,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": "12345"},
+ (a.Varbind(a.VALUE_OCTET_STR,
+ a.OID((1, 2, 3), False),
+ "12345"),
""))
self.assertEqual(f("\x00\x40\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x04\x10\x20\x30\x40", standardFlags),
- ({"type": a.IP_ADDR,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": (16, 32, 48, 64)},
+ (a.Varbind(a.VALUE_IP_ADDR,
+ a.OID((1, 2, 3), False),
+ (16, 32, 48, 64)),
""))
# Test integer32 types
self.assertEqual(f("\x00\x02\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A", standardFlags),
- ({"type": a.INTEGER,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": 42},
+ (a.Varbind(a.VALUE_INTEGER,
+ a.OID((1, 2, 3), False),
+ 42),
""))
self.assertEqual(f("\x00\x41\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A", standardFlags),
- ({"type": a.COUNTER32,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": 42},
+ (a.Varbind(a.VALUE_COUNTER32,
+ a.OID((1, 2, 3), False),
+ 42),
""))
self.assertEqual(f("\x00\x42\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A", standardFlags),
- ({"type": a.GAUGE32,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": 42},
+ (a.Varbind(a.VALUE_GAUGE32,
+ a.OID((1, 2, 3), False),
+ 42),
""))
self.assertEqual(f("\x00\x43\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A", standardFlags),
- ({"type": a.TIME_TICKS,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": 42},
+ (a.Varbind(a.VALUE_TIME_TICKS,
+ a.OID((1, 2, 3), False),
+ 42),
""))
# Test integer64 type
self.assertEqual(f("\x00\x46\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x00\x00\x00\x00\x2A", standardFlags),
- ({"type": a.COUNTER64,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": 42},
+ (a.Varbind(a.VALUE_COUNTER64,
+ a.OID((1, 2, 3), False),
+ 42),
""))
# Test oid type
self.assertEqual(f("\x00\x06\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x03\x00\x00\x00\x00\x00\x00\x10"
"\x00\x00\x00\x2A\x00\x00\x01\x00", standardFlags),
- ({"type": a.OID,
- "name": {"subids": (1, 2, 3),
- "include": False},
- "data": {"subids": (16, 42, 256),
- "include": False}},
+ (a.Varbind(a.VALUE_OID,
+ a.OID((1, 2, 3), False),
+ a.OID((16, 42, 256), False)),
""))
# Test integer32 with little endian
self.assertEqual(f("\x43\x00\x00\x00\x03\x00\x00\x00"
"\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
"\x2A\x00\x00\x00", lilEndianFlags),
- ({"type": a.TIME_TICKS,
- "name": {"subids": (1, 2, 3), "include": False},
- "data": 42},
+ (a.Varbind(a.VALUE_TIME_TICKS,
+ a.OID((1, 2, 3), False),
+ 42),
""))
#
@@ -2171,6 +2214,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_decode_packet(self):
f = ntp.agentx.decode_packet
x = ntp.agentx
+ srch = x.SearchRange
# Not testing all the variants of each packet type, that is
# the job of the other tests.
self.maxDiff = None
@@ -2185,8 +2229,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x03\x00\x00\x00\x04"
"\x00\x00\x00\x03foo\x00"),
(x.OpenPDU(True, 12, 34, 56, 78,
- {"subids": (1, 2, 3, 4),
- "include": False},
+ x.OID((1, 2, 3, 4), False),
"foo"),
""))
# Test open, extraData
@@ -2199,8 +2242,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x03\x00\x00\x00\x04"
"\x00\x00\x00\x03foo\x00" + extraData),
(x.OpenPDU(True, 12, 34, 56, 78,
- {"subids": (1, 2, 3, 4),
- "include": False},
+ x.OID((1, 2, 3, 4), False),
"foo"),
extraData))
# Test close
@@ -2218,8 +2260,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x03\x00\x00\x00\x00\x00\x00\x01"
"\x00\x00\x00\x02\x00\x00\x00\x03"),
(x.RegisterPDU(True, 1, 2, 3, 4, 5,
- {"subids": (1, 2, 3),
- "include": False},
+ x.OID((1, 2, 3), False),
0, None, None),
""))
# Test unregister
@@ -2230,8 +2271,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x03\x00\x00\x00\x00\x00\x00\x01"
"\x00\x00\x00\x02\x00\x00\x00\x03"),
(x.UnregisterPDU(True, 1, 2, 3, 5,
- {"subids": (1, 2, 3),
- "include": False},
+ x.OID((1, 2, 3), False),
0, None, None),
""))
# Test get
@@ -2257,14 +2297,8 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x02\x00\x01\x00\x00\x00\x00\x06\x00\x00\x00\x07"
"\x02\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x09"),
(x.GetBulkPDU(True, 1, 2, 3, 1, 5,
- ({"start": {"subids": (1, 2),
- "include": False},
- "end": {"subids": (3, 4),
- "include": False}},
- {"start": {"subids": (6, 7),
- "include": True},
- "end": {"subids": (8, 9),
- "include": False}})),
+ (srch((1, 2), (3, 4), False),
+ srch((6, 7), (8, 9), True))),
""))
# Test test set
self.assertEqual(f("\x01\x08\x10\x00"
@@ -2280,15 +2314,12 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x02\x00\x00\x00\x04"
"\x00\x00\x00\x04blah"),
(x.TestSetPDU(True, 1, 2, 3,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3),
- "include": False},
- "data": {"subids": (4, 5, 6),
- "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4),
- "include": False},
- "data": "blah"})),
+ (x.Varbind(x.VALUE_OID,
+ x.OID((1, 2, 3), False),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR,
+ x.OID((1, 2, 4), False),
+ "blah"))),
""))
# Test commit set
self.assertEqual(f("\x01\x09\x10\x00"
@@ -2322,15 +2353,12 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x02\x00\x00\x00\x04"
"\x00\x00\x00\x04blah"),
(x.NotifyPDU(True, 1, 2, 3,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3),
- "include": False},
- "data": {"subids": (4, 5, 6),
- "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4),
- "include": False},
- "data": "blah"})),
+ (x.Varbind(x.VALUE_OID,
+ x.OID((1, 2, 3), False),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR,
+ x.OID((1, 2, 4), False),
+ "blah"))),
""))
# Test ping
self.assertEqual(f("\x01\x0D\x10\x00"
@@ -2352,15 +2380,12 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x02\x00\x00\x00\x04"
"\x00\x00\x00\x04blah"),
(x.IndexAllocPDU(True, 1, 2, 3, True, True,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3),
- "include": False},
- "data": {"subids": (4, 5, 6),
- "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4),
- "include": False},
- "data": "blah"})),
+ (x.Varbind(x.VALUE_OID,
+ x.OID((1, 2, 3), False),
+ x.OID((4, 5, 6), False)),
+ x.Varbind(x.VALUE_OCTET_STR,
+ x.OID((1, 2, 4), False),
+ "blah"))),
""))
# Test index dealloc
self.assertEqual(f("\x01\x0F\x16\x00"
@@ -2376,15 +2401,15 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x02\x00\x00\x00\x04"
"\x00\x00\x00\x04blah"),
(x.IndexDeallocPDU(True, 1, 2, 3, True, True,
- ({"type": x.OID,
- "name": {"subids": (1, 2, 3),
- "include": False},
- "data": {"subids": (4, 5, 6),
- "include": False}},
- {"type": x.OCTET_STR,
- "name": {"subids": (1, 2, 4),
- "include": False},
- "data": "blah"})),
+ (x.Varbind(x.VALUE_OID,
+ x.OID((1, 2, 3),
+ False),
+ x.OID((4, 5, 6),
+ False)),
+ x.Varbind(x.VALUE_OCTET_STR,
+ x.OID((1, 2, 4),
+ False),
+ "blah"))),
""))
# Test add agent caps
self.assertEqual(f("\x01\x10\x10\x00"
@@ -2394,8 +2419,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x05\x00\x00\x00\x06"
"\x00\x00\x00\x04blah"),
(x.AddAgentCapsPDU(True, 1, 2, 3,
- {"subids": (4, 5, 6),
- "include": False},
+ x.OID((4, 5, 6), False),
"blah"),
""))
# Test rm agent caps
@@ -2405,8 +2429,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x03\x00\x00\x00\x00\x00\x00\x04"
"\x00\x00\x00\x05\x00\x00\x00\x06"),
(x.RMAgentCapsPDU(True, 1, 2, 3,
- {"subids": (4, 5, 6),
- "include": False}),
+ x.OID((4, 5, 6), False)),
""))
# Test response
self.assertEqual(f("\x01\x12\x10\x00"
@@ -2426,29 +2449,6 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
fail = True
self.assertEqual(fail, True)
- def test_compareOID(self):
- f = ntp.agentx.compareOID
-
- # Test equal
- self.assertEqual(f((1, 2, 3, 4), (1, 2, 3, 4)), 0)
-
- # Test equal length, one < two
- self.assertEqual(f((1, 2, 3, 4), (1, 2, 3, 5)), -1)
- # Test equal length, one > two
- self.assertEqual(f((1, 2, 3, 4), (1, 2, 3, 0)), 1)
- # Test one shorter, less than two, equal for length
- self.assertEqual(f((1, 2, 3), (1, 2, 3, 4)), -1)
- # Test one shorter, less than two
- self.assertEqual(f((1, 2, 3), (1, 2, 4, 5)), -1)
- # Test one shorter, greater than two
- self.assertEqual(f((1, 2, 3), (1, 2, 2, 4)), 1)
- # Test two shorter, less than one, equal for length
- self.assertEqual(f((1, 2, 3, 4), (1, 2, 3)), 1)
- # Test two shorter, less than one
- self.assertEqual(f((1, 2, 4, 5), (1, 2, 3)), 1)
- # Test two shorter, greater than one
- self.assertEqual(f((1, 2, 2, 4), (1, 2, 3)), -1)
-
if __name__ == "__main__":
unittest.main()
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/993078379661418a893ed218a76c813ec45de6cb...f6507e3f80e599411aa53eade23d93024fea8d1c
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/993078379661418a893ed218a76c813ec45de6cb...f6507e3f80e599411aa53eade23d93024fea8d1c
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170927/8d81616d/attachment.html>
More information about the vc mailing list