[Git][NTPsec/ntpsec][master] Added agentx library and tests
Ian Bruene
gitlab at mg.gitlab.com
Mon Jun 26 16:28:54 UTC 2017
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
68533351 by Ian Bruene at 2017-06-26T11:16:41-05:00
Added agentx library and tests
- - - - -
2 changed files:
- + pylib/agentx.py
- + tests/pylib/test_agentx.py
Changes:
=====================================
pylib/agentx.py
=====================================
--- /dev/null
+++ b/pylib/agentx.py
@@ -0,0 +1,708 @@
+# -*- coding: utf-8 -*-
+# Common utility functions
+# SPDX-License-Identifier: BSD-2-clause
+
+from __future__ import print_function
+
+import struct
+
+# Endianess is selectable in the AgentX packet headers. In this program
+# *ALL* values will be transmitted in big / network endianess.
+
+internetPrefix = (1, 3, 6, 1) # Used by the prefix option of OID headers
+prefixCount = len(internetPrefix)
+
+# =======================================================================
+#
+# Function types fall into the following categories:
+# Data type encoder/decoders
+# Packet body encoder/decoders
+# Glue/Utility/Misc functions
+#
+# To encode a packet, call the relevant encode_*pdu function.
+#
+# To decode a packet, call the decode_packet function, which will select
+# the correct decoder for that packet type.
+#
+# =======================================================================
+
+
+# =======================================================================
+#
+# Packet body encoders / decoders
+#
+# Encoders take information for both the header and body and
+# return a complete packet.
+#
+# Decoders take just the body sliced away from everything else,
+# and a context flag. They return tuples containing the data.
+# They do not return extra data as they are never supposed to
+# receive it.
+# Decoders are not meant to be called directly by external code.
+#
+# =======================================================================
+
+
+def encode_openpdu(sID, tactID, pktID, timeout, oid, description):
+ payload = struct.pack("Bxxx", timeout)
+ payload += encode_oid(oid, False)
+ payload += encode_octetstr(description)
+ header = encode_pduheader(PDU_OPEN, False, False, False, False,
+ sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def decode_openpdu(data, flags):
+ temp, data = slicedata(data, 4)
+ timeout = struct.unpack("Bxxx", temp)[0]
+ oid, data = decode_oid(data, flags)
+ octets = decode_octetstr(data, flags)[0]
+ return (timeout, oid, octets)
+
+
+def encode_closepdu(sID, tactID, pktID, reason):
+ if reason not in definedReasons:
+ raise ValueError("Close reason %s not in defined types" % reason)
+ payload = struct.pack("Bxxx", reason)
+ header = encode_pduheader(PDU_CLOSE, False, False, False, False,
+ sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def decode_closepdu(data, flags):
+ reason = ord(data[0]) # Bxxx
+ return reason
+
+
+def encode_registerpdu_core(isregister, sID, tactID, pktID,
+ timeout, priority, subtree,
+ rangeSubid=0, upperBound=None, context=None):
+ if isregister:
+ pkttype = PDU_REGISTER
+ regbit = True
+ else:
+ pkttype = PDU_UNREGISTER
+ timeout = 0
+ regbit = False
+ contextP, payload = encode_context(context)
+ payload += struct.pack(">BBBx", timeout, priority, rangeSubid)
+ payload += encode_oid(subtree, False)
+ if rangeSubid != 0:
+ if upperBound is None:
+ raise ValueError("upperBound must be set if rangeSubid is set")
+ payload += struct.pack(">I", upperBound)
+ header = encode_pduheader(pkttype, regbit, False, False, contextP,
+ sID, tactID, pktID, len(payload))
+ packet = header + payload
+ return packet
+
+
+def decode_registerpdu_core(data, flags):
+ endianToken = getendian(flags)
+ context, data = decode_context(data, flags)
+ temp, data = slicedata(data, 4)
+ timeout, priority, rangeSubid = struct.unpack(endianToken + "BBBx", temp)
+ oid, data = decode_oid(data, flags)
+ if rangeSubid != 0:
+ temp, data = slicedata(data, 4)
+ upperBound = struct.unpack(endianToken + "I", temp)[0]
+ else:
+ upperBound = None
+ result = (oid, timeout, priority, context, rangeSubid, upperBound)
+ return result
+
+
+def encode_registerpdu(sID, tactID, pktID, timeout, priority, subtree,
+ rangeSubid=0, upperBound=None, context=None):
+ return encode_registerpdu_core(True, sID, tactID, pktID, timeout,
+ priority, subtree, rangeSubid,
+ upperBound, context)
+
+
+def encode_unregisterpdu(sID, tactID, pktID, priority, subtree,
+ rangeSubid=0, upperBound=None, context=None):
+ return encode_registerpdu_core(False, sID, tactID, pktID, 0,
+ priority, subtree, rangeSubid,
+ upperBound, context)
+
+
+decode_registerpdu = decode_registerpdu_core
+
+
+def decode_unregisterpdu(data, flags):
+ result = decode_registerpdu_core(data, flags)
+ result = (result[0],) + result[2:] # Remove the timeout
+ return result
+
+
+def encode_getpdu_core(isnext, sID, tactID, pktID, oidranges, context=None):
+ if isnext is True:
+ pkttype = PDU_GET_NEXT
+ else:
+ pkttype = PDU_GET
+ contextP, payload = encode_context(context)
+ payload += encode_searchrange_list(oidranges)
+ header = encode_pduheader(pkttype, False, False, False, contextP,
+ sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def decode_getpdu_core(data, flags):
+ context, data = decode_context(data, flags)
+ oidranges, data = decode_searchrange_list(data, flags)
+ return (context, oidranges)
+
+
+def encode_getpdu(sID, tactID, pktID, oidranges, context=None):
+ return encode_getpdu_core(False, sID, tactID, pktID, oidranges, context)
+
+
+def encode_getnextpdu(sID, tactID, pktID, oidranges, context=None):
+ return encode_getpdu_core(True, sID, tactID, pktID, oidranges, context)
+
+
+decode_getpdu = decode_getnextpdu = decode_getpdu_core
+
+
+def encode_getbulkpdu(sID, tactID, pktID, nonReps, maxReps,
+ oidranges, context=None):
+ contextP, payload = encode_context(context)
+ payload += struct.pack(">HH", nonReps, maxReps)
+ payload += encode_searchrange_list(oidranges)
+ header = encode_pduheader(PDU_GET_BULK, False, False, False, contextP,
+ sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def decode_getbulkpdu(data, flags):
+ endianToken = getendian(flags)
+ context, data = decode_context(data, flags)
+ temp, data = slicedata(data, 4)
+ nonReps, maxReps = struct.unpack(endianToken + "HH", temp)
+ oidranges, data = decode_searchrange_list(data, flags)
+ result = (context, nonReps, maxReps, oidranges)
+ return result
+
+
+def encode_testsetpdu(sID, tactID, pktID, varbinds, context=None):
+ contextP, payload = encode_context(context)
+ payload += encode_varbindlist(varbinds)
+ header = encode_pduheader(PDU_TEST_SET, False, False, False, contextP,
+ sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def decode_testsetpdu(data, flags):
+ context, data = decode_context(data, flags)
+ varbinds = decode_varbindlist(data, flags)
+ return context, varbinds
+
+
+# CommitSet, UndoSet, and CleanupSet are bare headers. Don't need decoders
+
+def encode_commitsetpdu(sID, tactID, pktID):
+ return encode_pduheader(PDU_COMMIT_SET, False, False, False, False,
+ sID, tactID, pktID, 0)
+
+
+def encode_undosetpdu(sID, tactID, pktID):
+ return encode_pduheader(PDU_UNDO_SET, False, False, False, False,
+ sID, tactID, pktID, 0)
+
+
+def encode_cleanupsetpdu(sID, tactID, pktID):
+ return encode_pduheader(PDU_CLEANUP_SET, False, False, False, False,
+ sID, tactID, pktID, 0)
+
+
+def encode_pingpdu(sID, tactID, pktID, context=None):
+ contextP, payload = encode_context(context)
+ header = encode_pduheader(PDU_PING, False, False, False, contextP,
+ sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def decode_pingpdu(data, flags):
+ context, data = decode_context(data, flags)
+ return context
+
+
+def encode_notifypdu(sID, tactID, pktID, varbinds, context=None):
+ contextP, payload = encode_context(context)
+ payload += encode_varbindlist(varbinds)
+ header = encode_pduheader(PDU_NOTIFY, False, False, False, contextP,
+ sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def decode_notifypdu(data, flags):
+ context, data = decode_context(data, flags)
+ varbinds = decode_varbindlist(data, flags)
+ return (context, varbinds)
+
+
+def encode_indexallocpdu_core(isdealloc,
+ sID, tactID, pktID, newIndex, anyIndex,
+ varbinds, context=None):
+ pkttype = PDU_INDEX_DEALLOC if isdealloc is True else PDU_INDEX_ALLOC
+ contextP, payload = encode_context(context)
+ payload += encode_varbindlist(varbinds)
+ header = encode_pduheader(pkttype, False, newIndex, anyIndex,
+ contextP, sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def encode_indexallocpdu(sID, tactID, pktID, newIndex, anyIndex,
+ varbinds, context=None):
+ return encode_indexallocpdu_core(False,
+ sID, tactID, pktID, newIndex, anyIndex,
+ varbinds, context)
+
+
+def encode_indexdeallocpdu(sID, tactID, pktID, newIndex, anyIndex,
+ varbinds, context=None):
+ return encode_indexallocpdu_core(True,
+ sID, tactID, pktID, newIndex, anyIndex,
+ varbinds, context)
+
+
+def decode_indexallocpdu(data, flags):
+ context, data = decode_context(data, flags)
+ varbinds = decode_varbindlist(data, flags)
+ return (context, varbinds)
+
+
+decode_indexdeallocpdu = decode_indexallocpdu
+
+
+def encode_addagentcapspdu(sID, tactID, pktID, oid, descr, context=None):
+ contextP, payload = encode_context(context)
+ payload += encode_oid(oid, False)
+ payload += encode_octetstr(descr)
+ header = encode_pduheader(PDU_ADD_AGENT_CAPS,
+ False, False, False, contextP,
+ sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def decode_addagentcapspdu(data, flags):
+ context, data = decode_context(data, flags)
+ oid, data = decode_oid(data, flags)
+ oid = oid[0] # we don't need the include field here
+ descr = decode_octetstr(data, flags)[0]
+ return (context, oid, descr)
+
+
+def encode_rmagentcapspdu(sID, tactID, pktID, oid, context=None):
+ contextP, payload = encode_context(context)
+ payload += encode_oid(oid, False)
+ header = encode_pduheader(PDU_RM_AGENT_CAPS,
+ False, False, False, contextP,
+ sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def decode_rmagentcapspdu(data, flags):
+ context, data = decode_context(data, flags)
+ oid, data = decode_oid(data, flags)
+ oid = oid[0] # we don't need the include here
+ return (context, oid)
+
+
+def encode_responsepdu(sID, tactID, pktID,
+ sysUpTime, resError, resIndex,
+ varbinds=None):
+ payload = struct.pack(">IHH", sysUpTime, resError, resIndex)
+ if varbinds is not None:
+ payload += encode_varbindlist(varbinds)
+ header = encode_pduheader(PDU_RESPONSE, False, False, False, False,
+ sID, tactID, pktID, len(payload))
+ return header + payload
+
+
+def decode_responsepdu(data, flags):
+ endianToken = getendian(flags)
+ temp, data = slicedata(data, 8)
+ sysUpTime, resError, resIndex = struct.unpack(endianToken + "IHH", temp)
+ if len(data) > 0:
+ varbinds = decode_varbindlist(data, flags)
+ else:
+ varbinds = None
+ return (sysUpTime, resError, resIndex, varbinds)
+
+
+# ========================================================================
+#
+# Data type encoders / decoders
+#
+# Encoders take data relevant to the type, return encoded string.
+#
+# Decoders take encoded string, return a tuple of (value, restOfData).
+#
+# ========================================================================
+
+def encode_oid(subids, include):
+ subids = tuple(subids)
+ numsubs = len(subids)
+ if not (0 <= numsubs <= 128): # Packet can have a maximum of 128 sections
+ raise ValueError("OID has too many subids")
+ if subids[:prefixCount] == internetPrefix:
+ if numsubs > prefixCount:
+ prefix = subids[prefixCount] # the number after the prefix
+ subids = subids[prefixCount + 1:] # +1 to strip prefix arg
+ else: # Have a prefix, but nothing else. Leave as is
+ prefix = 0
+ else:
+ prefix = 0
+ n_subid = len(subids)
+ include = int(bool(include)) # force integer bool
+ body = struct.pack(">BBBx", n_subid, prefix, include)
+ for subid in subids:
+ body += struct.pack(">I", subid)
+ return body
+
+
+def decode_oid(data, flags):
+ # Need to split off the header to get the subid count
+ header, data = slicedata(data, 4)
+ n_subid, prefix, include = struct.unpack("BBBx", header)
+ if prefix != 0:
+ subids = internetPrefix + (prefix,)
+ else:
+ subids = ()
+ totalLen = len(subids) + n_subid
+ if not (0 <= totalLen <= 128):
+ raise ValueError("OID has too many subids")
+ include = bool(include) # could be anything, force bool
+ # Now have the count, need to split the subids from the rest of the packet
+ byteCount = n_subid * 4
+ data, rest = slicedata(data, byteCount)
+ endianToken = getendian(flags)
+ formatString = endianToken + ("I" * n_subid)
+ subids += struct.unpack(formatString, data)
+ return ((subids, include), rest)
+
+
+def encode_octetstr(octets):
+ numoctets = len(octets)
+ header = struct.pack(">I", numoctets)
+ pad = (numoctets % 4)
+ if pad > 0: # Pad out the data to word boundary
+ pad = 4 - pad
+ pad = "\x00" * pad
+ if type(octets) is str:
+ data = header + octets + pad
+ else:
+ fmt = "B" * numoctets
+ data = struct.pack(fmt, *octets)
+ data = header + data + pad
+ return data
+
+
+def decode_octetstr(data, flags):
+ header, data = slicedata(data, 4)
+ endianToken = getendian(flags)
+ numoctets = struct.unpack(endianToken + "I", header)[0]
+ if len(data) < numoctets:
+ raise ValueError("Octet string shorter than length")
+ pad = numoctets % 4
+ if pad > 0: # Pad out the data to word boundary
+ pad = 4 - pad
+ return data[:numoctets], data[numoctets + pad:]
+
+
+def encode_varbind(valType, oid, *payload):
+ if valType not in definedValueTypes.keys():
+ raise ValueError("Value type %s not in defined types" % valType)
+ header = struct.pack(">Hxx", valType)
+ name = encode_oid(oid, False)
+ handlers = definedValueTypes[valType]
+ if handlers is None:
+ data = header + name
+ else:
+ data = header + name + handlers[0](*payload)
+ return data
+
+
+def decode_varbind(data, flags):
+ header, data = slicedata(data, 4)
+ endianToken = getendian(flags)
+ valType = struct.unpack(endianToken + "Hxx", header)[0]
+ (name, _), data = decode_oid(data, flags)
+ if valType not in definedValueTypes.keys():
+ raise ValueError("Value type %s not in defined types" % valType)
+ handlers = definedValueTypes[valType]
+ payload, data = handlers[1](data, flags)
+ result = (valType, name, payload)
+ return result, data
+
+
+def encode_integer32(num):
+ return struct.pack(">I", num)
+
+
+def decode_integer32(data, flags):
+ endianToken = getendian(flags)
+ num, data = slicedata(data, 4)
+ num = struct.unpack(endianToken + "I", num)[0]
+ return (num, data)
+
+
+def encode_nullvalue():
+ return ""
+
+
+def decode_nullvalue(data, flags):
+ return (None, data)
+
+
+def encode_integer64(num):
+ return struct.pack(">Q", num)
+
+
+def decode_integer64(data, flags):
+ endianToken = getendian(flags)
+ num, data = slicedata(data, 8)
+ num = struct.unpack(endianToken + "Q", num)[0]
+ return (num, data)
+
+
+def encode_ipaddr(octets):
+ if len(octets) != 4:
+ raise ValueError("IP Address incorrect length")
+ return encode_octetstr(octets)
+
+
+def decode_ipaddr(data, flags):
+ addr, data = decode_octetstr(data, flags)
+ addr = struct.unpack("BBBB", addr)
+ return addr, data
+
+
+def encode_searchrange(startOID, endOID, inclusiveP):
+ startOIDstr = encode_oid(startOID, inclusiveP)
+ endOIDstr = encode_oid(endOID, False)
+ return startOIDstr + endOIDstr
+
+
+def decode_searchrange(data, flags):
+ startOID, data = decode_oid(data, flags)
+ endOID, data = decode_oid(data, flags)
+ return (startOID, endOID, data)
+
+
+def encode_searchrange_list(oidranges):
+ encoded = []
+ for oran in oidranges:
+ encoded.append(encode_searchrange(*oran))
+ encoded.append(encode_oid(tuple(), False))
+ encoded = "".join(encoded)
+ return encoded
+
+
+def decode_searchrange_list(data, flags):
+ oidranges = []
+ while True:
+ one, data = decode_oid(data, flags)
+ if isnullOID(one):
+ break
+ two, data = decode_oid(data, flags)
+ oidranges.append((one[0], two[0], one[1])) # oid, oid, inclusive
+ return tuple(oidranges), data
+
+
+# =========================================
+# Utilities, glue, and misc
+# =========================================
+
+def getendian(flags):
+ return ">" if flags["bigEndian"] is True else "<"
+
+
+def slicedata(data, slicepoint):
+ return data[:slicepoint], data[slicepoint:]
+
+
+def isnullOID(oid):
+ if (len(oid[0]) == 0) and (oid[1] is False):
+ return True
+ return False
+
+
+def encode_varbindlist(varbinds):
+ payload = ""
+ for varbind in varbinds:
+ payload += encode_varbind(*varbind)
+ return payload
+
+
+def decode_varbindlist(data, flags):
+ if len(data) > 0:
+ varbinds = []
+ while len(data) > 0:
+ vb, data = decode_varbind(data, flags)
+ vb = tuple(vb)
+ varbinds.append(vb)
+ varbinds = tuple(varbinds)
+ else:
+ varbinds = None
+ return varbinds
+
+
+def encode_pduheader(pduType, instanceRegistration, newIndex,
+ anyIndex, nonDefaultContext, sessionID,
+ transactionID, packetID, payloadLength):
+ # version type flags reserved
+ # sessionID
+ # transactionID
+ # packetID
+ # payload_length
+ if pduType not in definedPDUTypes:
+ raise ValueError("PDU type %s not in defined types" % pduType)
+ flags = 0
+ flags |= instanceRegistration
+ flags |= newIndex << 1
+ flags |= anyIndex << 2
+ flags |= nonDefaultContext << 3
+ flags |= True << 4 # network byte order, we always send big endian
+ data = struct.pack(">BBBxIIII", 1, pduType, flags,
+ sessionID,
+ transactionID,
+ packetID,
+ payloadLength)
+ return data
+
+
+def decode_pduheader(data): # Endianess is controlled from the PDU header
+ lineone, data = slicedata(data, 4)
+ version, pduType, flags = struct.unpack(">BBBx", lineone)
+ if pduType not in definedPDUTypes:
+ raise ValueError("PDU type %s not in defined types" % pduType)
+ # Slice up the flags
+ flagDict = {}
+ flagDict["instReg"] = bool(flags & 0x1)
+ flagDict["newIndex"] = bool(flags & 0x2)
+ flagDict["anyIndex"] = bool(flags & 0x4)
+ flagDict["contextP"] = bool(flags & 0x8)
+ flagDict["bigEndian"] = bool(flags & 0x10)
+ # Chop the remaining parts of the header from the rest of the datastream
+ # then parse them
+ fmt = getendian(flagDict) + "IIII"
+ linen, data = slicedata(data, 16) # 4 x 4-byte variables
+ sID, tactionID, pktID, dataLen = struct.unpack(fmt, linen)
+ return (version, pduType, flagDict, sID, tactionID, pktID, dataLen)
+
+
+def encode_context(context):
+ if context is not None:
+ contextP = True
+ payload = encode_octetstr(context)
+ else:
+ contextP = False
+ payload = ""
+ return (contextP, payload)
+
+
+def decode_context(data, flags):
+ if flags["contextP"] is True:
+ context, data = decode_octetstr(data, flags)
+ else:
+ context = None
+ return (context, data)
+
+
+def decode_packet(data):
+ header, data = slicedata(data, 20)
+ header = decode_pduheader(header)
+ version, pduType, flags, sID, tactID, pktID, dataLen = header
+ if version != 1:
+ raise ValueError("Unknown packet version", version)
+ if pduType not in definedPDUTypes.keys():
+ raise ValueError("PDU type %s not in defined types" % pduType)
+ headerData = (pduType, flags, sID, tactID, pktID)
+ decoder = definedPDUTypes[pduType]
+ if decoder is None:
+ parsedPkt = None
+ else:
+ packetSlice, data = slicedata(data, dataLen)
+ parsedPkt = decoder(packetSlice, flags)
+ return headerData + (parsedPkt,)
+
+
+# Value types
+INTEGER = 2
+OCTET_STR = 4
+NULL = 5
+OID = 6
+IP_ADDR = 64
+COUNTER32 = 65
+GAUGE32 = 66
+TIME_TICKS = 67
+OPAQUE = 68
+COUNTER64 = 70
+NO_SUCH_OBJECT = 128
+NO_SUCH_INSTANCE = 129
+END_OF_MIB_VIEW = 130
+definedValueTypes = { # Used by the varbind functions
+ INTEGER: (encode_integer32, decode_integer32),
+ OCTET_STR: (encode_octetstr, decode_octetstr),
+ NULL: (encode_nullvalue, decode_nullvalue),
+ OID: (encode_oid, decode_oid),
+ IP_ADDR: (encode_ipaddr, decode_ipaddr),
+ COUNTER32: (encode_integer32, decode_integer32),
+ GAUGE32: (encode_integer32, decode_integer32),
+ TIME_TICKS: (encode_integer32, decode_integer32),
+ OPAQUE: (encode_octetstr, decode_octetstr),
+ COUNTER64: (encode_integer64, decode_integer64),
+ NO_SUCH_OBJECT: (encode_nullvalue, decode_nullvalue),
+ NO_SUCH_INSTANCE: (encode_nullvalue, decode_nullvalue),
+ END_OF_MIB_VIEW: (encode_nullvalue, decode_nullvalue)}
+
+# PDU types
+PDU_OPEN = 1
+PDU_CLOSE = 2
+PDU_REGISTER = 3
+PDU_UNREGISTER = 4
+PDU_GET = 5
+PDU_GET_NEXT = 6
+PDU_GET_BULK = 7
+PDU_TEST_SET = 8
+PDU_COMMIT_SET = 9
+PDU_UNDO_SET = 10
+PDU_CLEANUP_SET = 11
+PDU_NOTIFY = 12
+PDU_PING = 13
+PDU_INDEX_ALLOC = 14
+PDU_INDEX_DEALLOC = 15
+PDU_ADD_AGENT_CAPS = 16
+PDU_RM_AGENT_CAPS = 17
+PDU_RESPONSE = 18
+definedPDUTypes = { # Used by the decode_packet function
+ PDU_OPEN: decode_openpdu,
+ PDU_CLOSE: decode_closepdu,
+ PDU_REGISTER: decode_registerpdu,
+ PDU_UNREGISTER: decode_unregisterpdu,
+ PDU_GET: decode_getpdu,
+ PDU_GET_NEXT: decode_getnextpdu,
+ PDU_GET_BULK: decode_getbulkpdu,
+ PDU_TEST_SET: decode_testsetpdu,
+ PDU_COMMIT_SET: None,
+ PDU_UNDO_SET: None,
+ PDU_CLEANUP_SET: None,
+ PDU_NOTIFY: decode_notifypdu,
+ PDU_PING: decode_pingpdu,
+ PDU_INDEX_ALLOC: decode_indexallocpdu,
+ PDU_INDEX_DEALLOC: decode_indexdeallocpdu,
+ PDU_ADD_AGENT_CAPS: decode_addagentcapspdu,
+ PDU_RM_AGENT_CAPS: decode_rmagentcapspdu,
+ PDU_RESPONSE: decode_responsepdu}
+
+# Closing reasons
+RSN_OTHER = 1
+RSN_PARSE_ERROR = 2
+RSN_PROTOCOL_ERROR = 3
+RSN_TIMEOUT = 4
+RSN_SHUTDOWN = 5
+RSN_BY_MANAGER = 6
+definedReasons = (RSN_OTHER, RSN_PARSE_ERROR, RSN_PROTOCOL_ERROR,
+ RSN_TIMEOUT, RSN_SHUTDOWN, RSN_BY_MANAGER)
=====================================
tests/pylib/test_agentx.py
=====================================
--- /dev/null
+++ b/tests/pylib/test_agentx.py
@@ -0,0 +1,1684 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import unittest
+import ntp.agentx
+
+extraData = "Would you kindly ignore this?"
+
+maximumOIDsubs = tuple(range(1, 129))
+maximumOIDstr = "\x80\x00\x00\x00\
+\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00\x04\
+\x00\x00\x00\x05\x00\x00\x00\x06\x00\x00\x00\x07\x00\x00\x00\x08\
+\x00\x00\x00\x09\x00\x00\x00\x0A\x00\x00\x00\x0B\x00\x00\x00\x0C\
+\x00\x00\x00\x0D\x00\x00\x00\x0E\x00\x00\x00\x0F\x00\x00\x00\x10\
+\
+\x00\x00\x00\x11\x00\x00\x00\x12\x00\x00\x00\x13\x00\x00\x00\x14\
+\x00\x00\x00\x15\x00\x00\x00\x16\x00\x00\x00\x17\x00\x00\x00\x18\
+\x00\x00\x00\x19\x00\x00\x00\x1A\x00\x00\x00\x1B\x00\x00\x00\x1C\
+\x00\x00\x00\x1D\x00\x00\x00\x1E\x00\x00\x00\x1F\x00\x00\x00\x20\
+\
+\x00\x00\x00\x21\x00\x00\x00\x22\x00\x00\x00\x23\x00\x00\x00\x24\
+\x00\x00\x00\x25\x00\x00\x00\x26\x00\x00\x00\x27\x00\x00\x00\x28\
+\x00\x00\x00\x29\x00\x00\x00\x2A\x00\x00\x00\x2B\x00\x00\x00\x2C\
+\x00\x00\x00\x2D\x00\x00\x00\x2E\x00\x00\x00\x2F\x00\x00\x00\x30\
+\
+\x00\x00\x00\x31\x00\x00\x00\x32\x00\x00\x00\x33\x00\x00\x00\x34\
+\x00\x00\x00\x35\x00\x00\x00\x36\x00\x00\x00\x37\x00\x00\x00\x38\
+\x00\x00\x00\x39\x00\x00\x00\x3A\x00\x00\x00\x3B\x00\x00\x00\x3C\
+\x00\x00\x00\x3D\x00\x00\x00\x3E\x00\x00\x00\x3F\x00\x00\x00\x40\
+\
+\x00\x00\x00\x41\x00\x00\x00\x42\x00\x00\x00\x43\x00\x00\x00\x44\
+\x00\x00\x00\x45\x00\x00\x00\x46\x00\x00\x00\x47\x00\x00\x00\x48\
+\x00\x00\x00\x49\x00\x00\x00\x4A\x00\x00\x00\x4B\x00\x00\x00\x4C\
+\x00\x00\x00\x4D\x00\x00\x00\x4E\x00\x00\x00\x4F\x00\x00\x00\x50\
+\
+\x00\x00\x00\x51\x00\x00\x00\x52\x00\x00\x00\x53\x00\x00\x00\x54\
+\x00\x00\x00\x55\x00\x00\x00\x56\x00\x00\x00\x57\x00\x00\x00\x58\
+\x00\x00\x00\x59\x00\x00\x00\x5A\x00\x00\x00\x5B\x00\x00\x00\x5C\
+\x00\x00\x00\x5D\x00\x00\x00\x5E\x00\x00\x00\x5F\x00\x00\x00\x60\
+\
+\x00\x00\x00\x61\x00\x00\x00\x62\x00\x00\x00\x63\x00\x00\x00\x64\
+\x00\x00\x00\x65\x00\x00\x00\x66\x00\x00\x00\x67\x00\x00\x00\x68\
+\x00\x00\x00\x69\x00\x00\x00\x6A\x00\x00\x00\x6B\x00\x00\x00\x6C\
+\x00\x00\x00\x6D\x00\x00\x00\x6E\x00\x00\x00\x6F\x00\x00\x00\x70\
+\
+\x00\x00\x00\x71\x00\x00\x00\x72\x00\x00\x00\x73\x00\x00\x00\x74\
+\x00\x00\x00\x75\x00\x00\x00\x76\x00\x00\x00\x77\x00\x00\x00\x78\
+\x00\x00\x00\x79\x00\x00\x00\x7A\x00\x00\x00\x7B\x00\x00\x00\x7C\
+\x00\x00\x00\x7D\x00\x00\x00\x7E\x00\x00\x00\x7F\x00\x00\x00\x80\
+"
+
+# The most commonly used flag setups, some tests use custom flags
+standardFlags = {"instReg": False,
+ "newIndex": False,
+ "anyIndex": False,
+ "contextP": False,
+ "bigEndian": True}
+lilEndianFlags = {"instReg": False,
+ "newIndex": False,
+ "anyIndex": False,
+ "contextP": False,
+ "bigEndian": False}
+contextFlags = {"instReg": False,
+ "newIndex": False,
+ "anyIndex": False,
+ "contextP": True,
+ "bigEndian": True}
+
+
+def makeFlags(iR, nI, aI, cP, bE):
+ return {"instReg": iR,
+ "newIndex": nI,
+ "anyIndex": aI,
+ "contextP": cP,
+ "bigEndian": bE}
+
+
+class TestNtpclientsNtpsnmpd(unittest.TestCase):
+ #
+ # PDU tests
+ #
+ def test_encode_openpdu(self):
+ f = ntp.agentx.encode_openpdu
+
+ # Test null packet
+ self.assertEqual(f(0, 0, 0, 0, (), ""),
+ "\x01\x01\x10\x00"
+ "\x00\x00\x00\x00\x00\x00\x00\x00"
+ "\x00\x00\x00\x00\x00\x00\x00\x0C"
+ "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
+ # Test basic packet
+ self.assertEqual(f(12, 34, 56, 78, (1, 2, 3, 4), "foo"),
+ "\x01\x01\x10\x00"
+ "\x00\x00\x00\x0C\x00\x00\x00\x22"
+ "\x00\x00\x00\x38\x00\x00\x00\x20"
+ "\x4E\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x03foo\x00")
+
+ def test_decode_openpdu(self):
+ f = ntp.agentx.decode_openpdu
+
+ # Test null packet
+ self.assertEqual(f("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
+ standardFlags),
+ (0, ((), False), ""))
+ # Test basic packet
+ self.assertEqual(f("\x4E\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x03foo\x00", standardFlags),
+ (78, ((1, 2, 3, 4), False), "foo"))
+ # Test basic packet, little endian
+ self.assertEqual(f("\x4E\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x03\x00\x00\x00foo\x00", lilEndianFlags),
+ (78, ((1, 2, 3, 4), False), "foo"))
+
+ def test_encode_closepdu(self):
+ f = ntp.agentx.encode_closepdu
+ a = ntp.agentx
+
+ # Test
+ self.assertEqual(f(1, 2, 3, a.RSN_OTHER),
+ "\x01\x02\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x01\x00\x00\x00")
+
+ def test_decode_closepdu(self):
+ f = ntp.agentx.decode_closepdu
+
+ # Test
+ self.assertEqual(f("\x01\x00\x00\x00", standardFlags), 1)
+
+ def test_encode_registerpdu(self):
+ f = ntp.agentx.encode_registerpdu
+
+ # Test basic packet
+ self.assertEqual(f(1, 2, 3, 4, 5, (1, 2, 3)),
+ "\x01\x03\x11\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x14"
+ "\x04\x05\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03")
+ # Test with context
+ self.assertEqual(f(1, 2, 3, 4, 5, (1, 2, 3), context="blah"),
+ "\x01\x03\x19\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x1C"
+ "\x00\x00\x00\x04blah"
+ "\x04\x05\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03")
+ # Test with range
+ self.assertEqual(f(1, 2, 3, 4, 5, (1, 2, 3),
+ rangeSubid=5, upperBound=23),
+ "\x01\x03\x11\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x18"
+ "\x04\x05\x05\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x17")
+ # Test with context and range
+ self.assertEqual(f(1, 2, 3, 4, 5, (1, 2, 3), context="blah",
+ rangeSubid=5, upperBound=23),
+ "\x01\x03\x19\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x20"
+ "\x00\x00\x00\x04blah"
+ "\x04\x05\x05\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x17")
+
+ def test_decode_registerpdu(self):
+ f = ntp.agentx.decode_registerpdu
+
+ # Test basic packet
+ self.assertEqual(f("\x04\x05\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03", standardFlags),
+ (((1, 2, 3), False), 4, 5, None, 0, None))
+ # Test basic, little endian
+ self.assertEqual(f("\x04\x05\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00", lilEndianFlags),
+ (((1, 2, 3), False), 4, 5, None, 0, None))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04blah\x04\x05\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03", contextFlags),
+ (((1, 2, 3), False), 4, 5, "blah", 0, None))
+ # Test with range
+ self.assertEqual(f("\x04\x05\x05\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x17", standardFlags),
+ (((1, 2, 3), False), 4, 5, None, 5, 23))
+ # Test with context and range
+ self.assertEqual(f("\x00\x00\x00\x04blah\x04\x05\x05\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x17", contextFlags),
+ (((1, 2, 3), False), 4, 5, "blah", 5, 23))
+
+ def test_encode_unregisterpdu(self):
+ f = ntp.agentx.encode_unregisterpdu
+
+ # Test basic packet
+ self.assertEqual(f(1, 2, 3, 5, (1, 2, 3)),
+ "\x01\x04\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x14"
+ "\x00\x05\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03")
+ # Test with context
+ self.assertEqual(f(1, 2, 3, 5, (1, 2, 3), context="blah"),
+ "\x01\x04\x18\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x1C"
+ "\x00\x00\x00\x04blah"
+ "\x00\x05\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03")
+ # Test with range
+ self.assertEqual(f(1, 2, 3, 5, (1, 2, 3),
+ rangeSubid=5, upperBound=23),
+ "\x01\x04\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x18"
+ "\x00\x05\x05\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x17")
+ # Test with context and range
+ self.assertEqual(f(1, 2, 3, 5, (1, 2, 3), context="blah",
+ rangeSubid=5, upperBound=23),
+ "\x01\x04\x18\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x20"
+ "\x00\x00\x00\x04blah"
+ "\x00\x05\x05\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x17")
+
+ def test_decode_unregisterpdu(self):
+ f = ntp.agentx.decode_unregisterpdu
+
+ # Test basic packet
+ self.assertEqual(f("\x00\x05\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03", standardFlags),
+ (((1, 2, 3), False), 5, None, 0, None))
+ # Test basic, little endian
+ self.assertEqual(f("\x00\x05\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00", lilEndianFlags),
+ (((1, 2, 3), False), 5, None, 0, None))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04blah\x00\x05\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03", contextFlags),
+ (((1, 2, 3), False), 5, "blah", 0, None))
+ # Test with range
+ self.assertEqual(f("\x04\x05\x05\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x17", standardFlags),
+ (((1, 2, 3), False), 5, None, 5, 23))
+ # Test with context and range
+ self.assertEqual(f("\x00\x00\x00\x04blah\x04\x05\x05\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x17", contextFlags),
+ (((1, 2, 3), False), 5, "blah", 5, 23))
+
+ def test_encode_getpdu(self):
+ f = ntp.agentx.encode_getpdu
+
+ # Test null
+ self.assertEqual(f(1, 2, 3, ()),
+ "\x01\x05\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x00")
+ # Test basic
+ self.assertEqual(f(1, 2, 3,
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True))),
+ "\x01\x05\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x3C"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x05"
+ "\x02\x00\x01\x00\x00\x00\x00\x0A\x00\x00\x00\x14"
+ "\x02\x00\x00\x00\x00\x00\x00\x1E\x00\x00\x00\x28"
+ "\x00\x00\x00\x00")
+ # Test with context
+ self.assertEqual(f(1, 2, 3,
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True)),
+ context="blah"),
+ "\x01\x05\x18\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x44"
+ "\x00\x00\x00\x04blah"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x05"
+ "\x02\x00\x01\x00\x00\x00\x00\x0A\x00\x00\x00\x14"
+ "\x02\x00\x00\x00\x00\x00\x00\x1E\x00\x00\x00\x28"
+ "\x00\x00\x00\x00")
+
+ def test_decode_getpdu(self):
+ f = ntp.agentx.decode_getpdu
+
+ # Test null
+ self.assertEqual(f("\x00\x00\x00\x00", standardFlags), (None, ()))
+ # Test basic
+ self.assertEqual(f(
+ "\x03\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x05"
+ "\x02\x00\x01\x00\x00\x00\x00\x0A\x00\x00\x00\x14"
+ "\x02\x00\x00\x00\x00\x00\x00\x1E\x00\x00\x00\x28"
+ "\x00\x00\x00\x00", standardFlags),
+ (None,
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True))))
+ # Test basic, little endian
+ self.assertEqual(f(
+ "\x03\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x05\x00\x00\x00"
+ "\x02\x00\x01\x00\x0A\x00\x00\x00\x14\x00\x00\x00"
+ "\x02\x00\x00\x00\x1E\x00\x00\x00\x28\x00\x00\x00"
+ "\x00\x00\x00\x00", lilEndianFlags),
+ (None,
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True))))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04blah"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x05"
+ "\x02\x00\x01\x00\x00\x00\x00\x0A\x00\x00\x00\x14"
+ "\x02\x00\x00\x00\x00\x00\x00\x1E\x00\x00\x00\x28"
+ "\x00\x00\x00\x00", contextFlags),
+ ("blah",
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True))))
+
+ def test_encode_getnextpdu(self):
+ f = ntp.agentx.encode_getnextpdu
+
+ # Test null
+ self.assertEqual(f(1, 2, 3, ()),
+ "\x01\x06\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x00")
+ # Test basic
+ self.assertEqual(f(1, 2, 3,
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True))),
+ "\x01\x06\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x3C"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x05"
+ "\x02\x00\x01\x00\x00\x00\x00\x0A\x00\x00\x00\x14"
+ "\x02\x00\x00\x00\x00\x00\x00\x1E\x00\x00\x00\x28"
+ "\x00\x00\x00\x00")
+ # Test with context
+ self.assertEqual(f(1, 2, 3,
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True)),
+ context="blah"),
+ "\x01\x06\x18\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x44"
+ "\x00\x00\x00\x04blah"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x05"
+ "\x02\x00\x01\x00\x00\x00\x00\x0A\x00\x00\x00\x14"
+ "\x02\x00\x00\x00\x00\x00\x00\x1E\x00\x00\x00\x28"
+ "\x00\x00\x00\x00")
+
+ def test_decode_getnextpdu(self):
+ f = ntp.agentx.decode_getnextpdu
+
+ # Test null
+ self.assertEqual(f("\x00\x00\x00\x00", standardFlags), (None, ()))
+ # Test basic
+ self.assertEqual(f(
+ "\x03\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x05"
+ "\x02\x00\x01\x00\x00\x00\x00\x0A\x00\x00\x00\x14"
+ "\x02\x00\x00\x00\x00\x00\x00\x1E\x00\x00\x00\x28"
+ "\x00\x00\x00\x00", standardFlags),
+ (None,
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True))))
+ # Test basic, little endian
+ self.assertEqual(f(
+ "\x03\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x05\x00\x00\x00"
+ "\x02\x00\x01\x00\x0A\x00\x00\x00\x14\x00\x00\x00"
+ "\x02\x00\x00\x00\x1E\x00\x00\x00\x28\x00\x00\x00"
+ "\x00\x00\x00\x00", lilEndianFlags),
+ (None,
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True))))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04blah"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x05"
+ "\x02\x00\x01\x00\x00\x00\x00\x0A\x00\x00\x00\x14"
+ "\x02\x00\x00\x00\x00\x00\x00\x1E\x00\x00\x00\x28"
+ "\x00\x00\x00\x00", contextFlags),
+ ("blah",
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True))))
+
+ def test_encode_getbulkpdu(self):
+ f = ntp.agentx.encode_getbulkpdu
+
+ # Test basic
+ self.assertEqual(f(1, 2, 3, 1, 5,
+ (((1, 2), (3, 4), False),
+ ((6, 7), (8, 9), True))),
+ "\x01\x07\x10\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x38"
+ "\x00\x01\x00\x05"
+ "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x02\x00\x01\x00\x00\x00\x00\x06\x00\x00\x00\x07"
+ "\x02\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x09"
+ "\x00\x00\x00\x00")
+ # Test with context
+ self.assertEqual(f(1, 2, 3, 1, 5,
+ (((1, 2), (3, 4), False),
+ ((6, 7), (8, 9), True)),
+ context="blah"),
+ "\x01\x07\x18\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x40"
+ "\x00\x00\x00\x04blah"
+ "\x00\x01\x00\x05"
+ "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x02\x00\x01\x00\x00\x00\x00\x06\x00\x00\x00\x07"
+ "\x02\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x09"
+ "\x00\x00\x00\x00")
+
+ def test_decode_getbulkpdu(self):
+ f = ntp.agentx.decode_getbulkpdu
+
+ # Test basic
+ self.assertEqual(f("\x00\x01\x00\x05"
+ "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x02\x00\x01\x00\x00\x00\x00\x06\x00\x00\x00\x07"
+ "\x02\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x09"
+ "\x00\x00\x00\x00", standardFlags),
+ (None, 1, 5,
+ (((1, 2), (3, 4), False),
+ ((6, 7), (8, 9), True))))
+ # Test basic, little endian
+ self.assertEqual(f("\x01\x00\x05\x00"
+ "\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x02\x00\x01\x00\x06\x00\x00\x00\x07\x00\x00\x00"
+ "\x02\x00\x00\x00\x08\x00\x00\x00\x09\x00\x00\x00"
+ "\x00\x00\x00\x00", lilEndianFlags),
+ (None, 1, 5,
+ (((1, 2), (3, 4), False),
+ ((6, 7), (8, 9), True))))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04blah"
+ "\x00\x01\x00\x05"
+ "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x02\x00\x01\x00\x00\x00\x00\x06\x00\x00\x00\x07"
+ "\x02\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x09"
+ "\x00\x00\x00\x00", contextFlags),
+ ("blah", 1, 5,
+ (((1, 2), (3, 4), False),
+ ((6, 7), (8, 9), True))))
+
+ def test_encode_testsetpdu(self):
+ f = ntp.agentx.encode_testsetpdu
+ x = ntp.agentx
+
+ # Test
+ self.assertEqual(f(1, 2, 3,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah"))),
+ "\x01\x08\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x40"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah")
+ # Test with context
+ self.assertEqual(f(1, 2, 3,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")), context="blah"),
+ "\x01\x08\x18\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x48"
+ "\x00\x00\x00\x04blah"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah")
+
+ def test_decode_testsetpdu(self):
+ f = ntp.agentx.decode_testsetpdu
+ x = ntp.agentx
+
+ # Test
+ self.assertEqual(f("\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah", standardFlags),
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+ # Test, little endian
+ self.assertEqual(f("\x06\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00blah", lilEndianFlags),
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04blah"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah", contextFlags),
+ ("blah",
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+
+ def test_encode_commitsetpdu(self):
+ f = ntp.agentx.encode_commitsetpdu
+
+ # Test
+ self.assertEqual(f(1, 2, 3), "\x01\x09\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x00")
+
+ def test_encode_undosetpdu(self):
+ f = ntp.agentx.encode_undosetpdu
+
+ # Test
+ self.assertEqual(f(1, 2, 3), "\x01\x0A\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x00")
+
+ def test_encode_cleanupsetpdu(self):
+ f = ntp.agentx.encode_cleanupsetpdu
+
+ # Test
+ self.assertEqual(f(1, 2, 3), "\x01\x0B\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x00")
+
+ def test_encode_pingpdu(self):
+ f = ntp.agentx.encode_pingpdu
+
+ # Test
+ self.assertEqual(f(1, 2, 3), "\x01\x0D\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x00")
+ # Test with context
+ self.assertEqual(f(1, 2, 3, "blah"), "\x01\x0D\x18\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x08"
+ "\x00\x00\x00\x04blah")
+
+ def test_decode_pingpdu(self):
+ f = ntp.agentx.decode_pingpdu
+
+ # Test
+ self.assertEqual(f("", standardFlags), None)
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04blah", contextFlags), "blah")
+
+ def test_encode_notifypdu(self):
+ f = ntp.agentx.encode_notifypdu
+ x = ntp.agentx
+
+ # Test
+ self.assertEqual(f(1, 2, 3,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah"))),
+ "\x01\x0C\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x40"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah")
+ # Test with context
+ self.assertEqual(f(1, 2, 3,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")), context="blah"),
+ "\x01\x0C\x18\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x48"
+ "\x00\x00\x00\x04blah"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah")
+
+ def test_decode_notifypdu(self):
+ f = ntp.agentx.decode_notifypdu
+ x = ntp.agentx
+
+ # Test
+ self.assertEqual(f("\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah", standardFlags),
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+ # Test, little endian
+ self.assertEqual(f("\x06\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00blah", lilEndianFlags),
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04blah"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah", contextFlags),
+ ("blah",
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+
+ def test_encode_indexallocpdu(self):
+ f = ntp.agentx.encode_indexallocpdu
+ x = ntp.agentx
+
+ # Test
+ self.assertEqual(f(1, 2, 3, True, True,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah"))),
+ "\x01\x0E\x16\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x40"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah")
+ # Test with context
+ self.assertEqual(f(1, 2, 3, True, True,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")), context="blah"),
+ "\x01\x0E\x1E\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x48"
+ "\x00\x00\x00\x04blah"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah")
+
+ def test_decode_indexallocpdu(self):
+ f = ntp.agentx.decode_indexallocpdu
+ x = ntp.agentx
+
+ # Test
+ self.assertEqual(f("\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah", standardFlags),
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+ # Test, little endian
+ self.assertEqual(f("\x06\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00blah", lilEndianFlags),
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04blah"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah", contextFlags),
+ ("blah",
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+
+ def test_encode_indexdeallocpdu(self):
+ f = ntp.agentx.encode_indexdeallocpdu
+ x = ntp.agentx
+
+ # Test
+ self.assertEqual(f(1, 2, 3, True, True,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah"))),
+ "\x01\x0F\x16\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x40"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah")
+ # Test with context
+ self.assertEqual(f(1, 2, 3, True, True,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")), context="blah"),
+ "\x01\x0F\x1E\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x48"
+ "\x00\x00\x00\x04blah"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah")
+
+ def test_decode_indexdeallocpdu(self):
+ f = ntp.agentx.decode_indexdeallocpdu
+ x = ntp.agentx
+
+ # Test
+ self.assertEqual(f("\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah", standardFlags),
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+ # Test, little endian
+ self.assertEqual(f("\x06\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00blah", lilEndianFlags),
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04blah"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah", contextFlags),
+ ("blah",
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+
+ def test_encode_addagentcapspdu(self):
+ f = ntp.agentx.encode_addagentcapspdu
+
+ # Test
+ self.assertEqual(f(1, 2, 3, (4, 5, 6), "blah"),
+ "\x01\x10\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x18"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x04blah")
+ # Test with context
+ self.assertEqual(f(1, 2, 3, (4, 5, 6), "blah", context="bluh"),
+ "\x01\x10\x18\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x20"
+ "\x00\x00\x00\x04bluh"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x04blah")
+
+ def test_decode_addagentcapspdu(self):
+ f = ntp.agentx.decode_addagentcapspdu
+
+ # Test
+ self.assertEqual(f("\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x04blah", standardFlags),
+ (None, (4, 5, 6), "blah"))
+ # Test, little endian
+ self.assertEqual(f("\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00blah", lilEndianFlags),
+ (None, (4, 5, 6), "blah"))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04bluh"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x04blah", contextFlags),
+ ("bluh", (4, 5, 6), "blah"))
+
+ def test_encode_rmagentcapspdu(self):
+ f = ntp.agentx.encode_rmagentcapspdu
+
+ # Test
+ self.assertEqual(f(1, 2, 3, (4, 5, 6)),
+ "\x01\x11\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x10"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06")
+ # Test with context
+ self.assertEqual(f(1, 2, 3, (4, 5, 6), context="bluh"),
+ "\x01\x11\x18\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x18"
+ "\x00\x00\x00\x04bluh"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06")
+
+ def test_decode_rmagentcapspdu(self):
+ f = ntp.agentx.decode_rmagentcapspdu
+
+ # Test
+ self.assertEqual(f("\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06", standardFlags),
+ (None, (4, 5, 6)))
+ # Test, little endian
+ self.assertEqual(f("\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00", lilEndianFlags),
+ (None, (4, 5, 6)))
+ # Test with context
+ self.assertEqual(f("\x00\x00\x00\x04bluh"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06", contextFlags),
+ ("bluh", (4, 5, 6)))
+
+ def test_encode_responsepdu(self):
+ f = ntp.agentx.encode_responsepdu
+ x = ntp.agentx
+
+ # Test
+ self.assertEqual(f(1, 2, 3, 4, 5, 6),
+ "\x01\x12\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x08"
+ "\x00\x00\x00\x04\x00\x05\x00\x06")
+ # Test with varbinds
+ self.assertEqual(f(1, 2, 3, 4, 5, 6,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah"))),
+ "\x01\x12\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x48"
+ "\x00\x00\x00\x04\x00\x05\x00\x06"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah")
+
+ def test_decode_responsepdu(self):
+ f = ntp.agentx.decode_responsepdu
+ x = ntp.agentx
+
+ # Test
+ self.assertEqual(f("\x00\x00\x00\x04\x00\x05\x00\x06", standardFlags),
+ (4, 5, 6, None))
+ # Test, little endian
+ self.assertEqual(f("\x04\x00\x00\x00\x05\x00\x06\x00", lilEndianFlags),
+ (4, 5, 6, None))
+ # Test with varbinds
+ self.assertEqual(f("\x00\x00\x00\x04\x00\x05\x00\x06"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah", standardFlags),
+ (4, 5, 6,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah"))))
+
+ #
+ # Data type tests
+ #
+ def test_encode_integer32(self):
+ f = ntp.agentx.encode_integer32
+
+ # Test
+ self.assertEqual(f(42), "\x00\x00\x00\x2A")
+
+ def test_decode_integer32(self):
+ f = ntp.agentx.decode_integer32
+
+ # Test
+ self.assertEqual(f("\x00\x00\x00\x2A" + extraData, standardFlags),
+ (42, extraData))
+ # Test little endian
+ self.assertEqual(f("\x2A\x00\x00\x00" + extraData, lilEndianFlags),
+ (42, extraData))
+
+ def test_encode_nullvalue(self):
+ f = ntp.agentx.encode_nullvalue
+
+ # Test
+ self.assertEqual(f(), "")
+
+ def test_decode_nullvalue(self):
+ f = ntp.agentx.decode_nullvalue
+
+ # Test
+ self.assertEqual(f(extraData, standardFlags), (None, extraData))
+
+ def test_encode_integer64(self):
+ f = ntp.agentx.encode_integer64
+
+ # Test
+ self.assertEqual(f(42), "\x00\x00\x00\x00\x00\x00\x00\x2A")
+
+ def test_decode_integer64(self):
+ f = ntp.agentx.decode_integer64
+
+ # Test
+ self.assertEqual(f("\x00\x00\x00\x00\x00\x00\x00\x2A" + extraData,
+ standardFlags),
+ (42, extraData))
+ # Test
+ self.assertEqual(f("\x2A\x00\x00\x00\x00\x00\x00\x00" + extraData,
+ lilEndianFlags),
+ (42, extraData))
+
+ def test_encode_ipaddr(self):
+ f = ntp.agentx.encode_ipaddr
+
+ # Test correct
+ self.assertEqual(f((1, 2, 3, 4)),
+ "\x00\x00\x00\x04\x01\x02\x03\x04")
+ # Test incorrect
+ try:
+ f((1, 2, 3, 4, 5))
+ errored = False
+ except ValueError:
+ errored = True
+ self.assertEqual(errored, True)
+
+ def test_decode_ipaddr(self):
+ f = ntp.agentx.decode_ipaddr
+
+ # Test
+ self.assertEqual(f("\x00\x00\x00\x04\x01\x02\x03\x04" + extraData,
+ standardFlags),
+ ((1, 2, 3, 4), extraData))
+ # Test little endian
+ self.assertEqual(f("\x04\x00\x00\x00\x01\x02\x03\x04" + extraData,
+ lilEndianFlags),
+ ((1, 2, 3, 4), extraData))
+
+ def test_encode_oid(self):
+ f = ntp.agentx.encode_oid
+
+ # Test empty OID
+ self.assertEqual(f((), False), "\x00\x00\x00\x00")
+ # Test basic OID
+ self.assertEqual(f((1, 2, 3, 4, 5), False),
+ "\x05\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x05")
+ # Test prefixed OID
+ self.assertEqual(f((1, 3, 6, 1, 23, 1, 2, 3), False),
+ "\x03\x17\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03")
+ # Test include
+ self.assertEqual(f((1, 2), True),
+ "\x02\x00\x01\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02")
+ # Test together
+ self.assertEqual(f((1, 3, 6, 1, 1, 3, 4, 5, 6), True),
+ "\x04\x01\x01\x00"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06")
+
+ # Test maximum size
+ self.assertEqual(f(maximumOIDsubs, False), maximumOIDstr)
+ # Test over maximum size
+ try:
+ f(maximumOIDsubs + (42,), False)
+ fail = False
+ except ValueError:
+ fail = True
+ self.assertEqual(fail, True)
+
+ def test_decode_oid(self):
+ f = ntp.agentx.decode_oid
+
+ # Test empty OID, extra data
+ self.assertEqual(f("\x00\x00\x00\x00" + extraData, standardFlags),
+ (((), False), extraData))
+ # Test basic OID, extra data
+ self.assertEqual(f("\x05\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x04\x00\x00\x00\x05" + extraData,
+ standardFlags),
+ (((1, 2, 3, 4, 5), False), extraData))
+ # Test basic OID, little endian
+ self.assertEqual(f("\x05\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x04\x00\x00\x00\x05\x00\x00\x00", lilEndianFlags),
+ (((1, 2, 3, 4, 5), False), ""))
+ # Test prefixed OID
+ self.assertEqual(f("\x03\x17\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03", standardFlags),
+ (((1, 3, 6, 1, 23, 1, 2, 3), False), ""))
+ # Test include
+ self.assertEqual(f("\x02\x00\x05\x00\x00\x00\x00\x01\x00\x00\x00\x02",
+ standardFlags),
+ (((1, 2), True), ""))
+ # Test together
+ self.assertEqual(f("\x04\x01\x02\x00\x00\x00\x00\x03"
+ "\x00\x00\x00\x04\x00\x00\x00\x05\x00\x00\x00\x06",
+ standardFlags),
+ (((1, 3, 6, 1, 1, 3, 4, 5, 6), True), ""))
+ # Test maximum size
+ self.assertEqual(f(maximumOIDstr, standardFlags),
+ ((maximumOIDsubs, False), ""))
+ # Test over maximum size
+ # Need to replace the hardcoded n_subid=128 with 129
+ fatOID = "\x81" + maximumOIDstr[1:] + "\xDE\xAD\xBE\xEF"
+ try:
+ f(fatOID, standardFlags)
+ fail = False
+ except ValueError:
+ fail = True
+ self.assertEqual(fail, True)
+
+ def test_encode_searchrange(self):
+ f = ntp.agentx.encode_searchrange
+
+ # Test minimum size
+ self.assertEqual(f((), (), False), "\x00\x00\x00\x00\x00\x00\x00\x00")
+ # Test inclusive
+ self.assertEqual(f((1, 2, 3, 4), (5, 6, 7, 8), True),
+ "\x04\x00\x01\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x04\x00\x00\x00"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x07\x00\x00\x00\x08")
+ # Test exclusive
+ self.assertEqual(f((1, 2, 3, 4), (5, 6, 7, 8), False),
+ "\x04\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x04\x00\x00\x00"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x07\x00\x00\x00\x08")
+
+ def test_decode_searchrange(self):
+ f = ntp.agentx.decode_searchrange
+
+ # Test minimum size, extra data
+ self.assertEqual(f("\x00\x00\x00\x00\x00\x00\x00\x00" + extraData,
+ standardFlags),
+ (((), False), ((), False), extraData))
+ # Test inclusive
+ self.assertEqual(f("\x04\x00\x01\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x04\x00\x00\x00"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x07\x00\x00\x00\x08", standardFlags),
+ (((1, 2, 3, 4), True), ((5, 6, 7, 8), False), ""))
+ # Test exclusive
+ self.assertEqual(f("\x04\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x04\x00\x00\x00"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x07\x00\x00\x00\x08", standardFlags),
+ (((1, 2, 3, 4), False), ((5, 6, 7, 8), False), ""))
+ # Test little endian
+ self.assertEqual(f("\x04\x00\x01\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x07\x00\x00\x00\x08\x00\x00\x00", lilEndianFlags),
+ (((1, 2, 3, 4), True), ((5, 6, 7, 8), False), ""))
+
+ def test_encode_searchrange_list(self):
+ f = ntp.agentx.encode_searchrange_list
+
+ # Test
+ self.assertEqual(f((((1, 2), (1, 2), True),
+ ((2, 3), (3, 4), False))),
+ "\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x00")
+
+ def test_decode_searchrange_list(self):
+ f = ntp.agentx.decode_searchrange_list
+
+ # Test
+ self.assertEqual(f("\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x00" + extraData, standardFlags),
+ ((
+ ((1, 2), (1, 2), True),
+ ((2, 3), (3, 4), False)
+ ), extraData))
+ # Test, little endian
+ self.assertEqual(f("\x02\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x00\x00\x00\x00" + extraData, lilEndianFlags),
+ ((
+ ((1, 2), (1, 2), True),
+ ((2, 3), (3, 4), False)
+ ), extraData))
+
+ def test_encode_octetstr(self):
+ f = ntp.agentx.encode_octetstr
+
+ # Test empty
+ self.assertEqual(f(()), "\x00\x00\x00\x00")
+ # Test word multiple
+ self.assertEqual(f((1, 2, 3, 4)), "\x00\x00\x00\x04\x01\x02\x03\x04")
+ # Test non word multiple
+ self.assertEqual(f((1, 2, 3, 4, 5)),
+ "\x00\x00\x00\x05\x01\x02\x03\x04\x05\x00\x00\x00")
+ # Test string
+ self.assertEqual(f("blah"), "\x00\x00\x00\x04blah")
+
+ def test_decode_octetstr(self):
+ f = ntp.agentx.decode_octetstr
+
+ # Test empty
+ self.assertEqual(f("\x00\x00\x00\x00", standardFlags), ("", ""))
+ # Test word multiple, extra data
+ self.assertEqual(f("\x00\x00\x00\x04blah" + extraData, standardFlags),
+ ("blah", extraData))
+ # Test word multiple, little endian
+ self.assertEqual(f("\x04\x00\x00\x00blah", lilEndianFlags),
+ ("blah", ""))
+ # Test non word multiple, extra data
+ self.assertEqual(f("\x00\x00\x00\x05"
+ "blarg\x00\x00\x00" + extraData, standardFlags),
+ ("blarg", extraData))
+
+ def test_encode_varbind(self):
+ f = ntp.agentx.encode_varbind
+ a = ntp.agentx
+
+ # Test payloadless types
+ self.assertEqual(f(a.NULL, (1, 2, 3)),
+ "\x00\x05\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
+ self.assertEqual(f(a.NO_SUCH_OBJECT, (1, 2, 3)),
+ "\x00\x80\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
+ self.assertEqual(f(a.NO_SUCH_INSTANCE, (1, 2, 3)),
+ "\x00\x81\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
+ self.assertEqual(f(a.END_OF_MIB_VIEW, (1, 2, 3)),
+ "\x00\x82\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
+ # Test octet based types
+ self.assertEqual(f(a.OCTET_STR, (1, 2, 3), (1, 2, 3, 4, 5)),
+ "\x00\x04\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x05"
+ "\x01\x02\x03\x04\x05\x00\x00\x00")
+ self.assertEqual(f(a.IP_ADDR, (1, 2, 3), (16, 32, 48, 64)),
+ "\x00\x40\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x04\x10\x20\x30\x40")
+ # Test integer32 types
+ self.assertEqual(f(a.INTEGER, (1, 2, 3), 42),
+ "\x00\x02\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x2A")
+ self.assertEqual(f(a.COUNTER32, (1, 2, 3), 42),
+ "\x00\x41\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x2A")
+ self.assertEqual(f(a.GAUGE32, (1, 2, 3), 42),
+ "\x00\x42\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x2A")
+ self.assertEqual(f(a.TIME_TICKS, (1, 2, 3), 42),
+ "\x00\x43\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x2A")
+ # Test integer64 type
+ self.assertEqual(f(a.COUNTER64, (1, 2, 3), 42),
+ "\x00\x46\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x00\x00\x00\x00\x2A")
+ # Text oid type
+ self.assertEqual(f(a.OID, (1, 2, 3), (16, 42, 256), False),
+ "\x00\x06\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x10"
+ "\x00\x00\x00\x2A\x00\x00\x01\x00")
+
+ def test_decode_varbind(self):
+ f = ntp.agentx.decode_varbind
+ a = ntp.agentx
+
+ # Test payloadless types
+ self.assertEqual(f("\x00\x05\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03",
+ standardFlags),
+ ((a.NULL, (1, 2, 3), None), ""))
+ self.assertEqual(f("\x00\x80\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03",
+ standardFlags),
+ ((a.NO_SUCH_OBJECT, (1, 2, 3), None), ""))
+ self.assertEqual(f("\x00\x81\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03",
+ standardFlags),
+ ((a.NO_SUCH_INSTANCE, (1, 2, 3), None), ""))
+ self.assertEqual(f("\x00\x82\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03",
+ standardFlags),
+ ((a.END_OF_MIB_VIEW, (1, 2, 3), None), ""))
+ # Test octet based types
+ self.assertEqual(f("\x00\x04\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x0512345\x00\x00\x00",
+ standardFlags),
+ ((a.OCTET_STR, (1, 2, 3), "12345"), ""))
+ self.assertEqual(f("\x00\x40\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x04\x10\x20\x30\x40", standardFlags),
+ ((a.IP_ADDR, (1, 2, 3), (16, 32, 48, 64)), ""))
+ # Test integer32 types
+ self.assertEqual(f("\x00\x02\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x2A", standardFlags),
+ ((a.INTEGER, (1, 2, 3), 42), ""))
+ self.assertEqual(f("\x00\x41\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x2A", standardFlags),
+ ((a.COUNTER32, (1, 2, 3), 42), ""))
+ self.assertEqual(f("\x00\x42\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x2A", standardFlags),
+ ((a.GAUGE32, (1, 2, 3), 42), ""))
+ self.assertEqual(f("\x00\x43\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x2A", standardFlags),
+ ((a.TIME_TICKS, (1, 2, 3), 42), ""))
+ # Test integer64 type
+ self.assertEqual(f("\x00\x46\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x00\x00\x00\x00\x2A", standardFlags),
+ ((a.COUNTER64, (1, 2, 3), 42), ""))
+ # Test oid type
+ self.assertEqual(f("\x00\x06\x00\x00\x03\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x10"
+ "\x00\x00\x00\x2A\x00\x00\x01\x00", standardFlags),
+ ((a.OID, (1, 2, 3), ((16, 42, 256), False)), ""))
+ # Test integer32 with little endian
+ self.assertEqual(f("\x43\x00\x00\x00\x03\x00\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x2A\x00\x00\x00", lilEndianFlags),
+ ((a.TIME_TICKS, (1, 2, 3), 42), ""))
+
+ #
+ # Misc tests
+ #
+ def test_getendian(self):
+ f = ntp.agentx.getendian
+
+ # Test big endian
+ self.assertEqual(f(standardFlags), ">")
+ # Test little endian
+ self.assertEqual(f(lilEndianFlags), "<")
+
+ def test_slicedata(self):
+ f = ntp.agentx.slicedata
+
+ # Test
+ self.assertEqual(f("foobaz", 2), ("fo", "obaz"))
+
+ def test_encode_pduheader(self):
+ f = ntp.agentx.encode_pduheader
+ a = ntp.agentx
+
+ # Test "empty" header
+ self.assertEqual(f(a.PDU_OPEN,
+ False, False, False, False,
+ 0xDEADBEEF, 0xCAFEBABE, 0xFACEF00D, 0),
+ "\x01\x01\x10\x00"
+ "\xDE\xAD\xBE\xEF\xCA\xFE\xBA\xBE"
+ "\xFA\xCE\xF0\x0D\x00\x00\x00\x00")
+ # Test flags
+ self.assertEqual(f(a.PDU_OPEN,
+ True, True, True, True,
+ 0xDEADBEEF, 0xCAFEBABE, 0xFACEF00D, 0),
+ "\x01\x01\x1F\x00"
+ "\xDE\xAD\xBE\xEF\xCA\xFE\xBA\xBE"
+ "\xFA\xCE\xF0\x0D\x00\x00\x00\x00")
+
+ def test_decode_pduheader(self):
+ f = ntp.agentx.decode_pduheader
+ a = ntp.agentx
+
+ # Test "empty" header
+ self.assertEqual(f("\x01\x01\x10\x00"
+ "\xDE\xAD\xBE\xEF\xCA\xFE\xBA\xBE"
+ "\xFA\xCE\xF0\x0D\x00\x00\x00\x00"),
+ (1, a.PDU_OPEN, {"instReg": False,
+ "newIndex": False,
+ "anyIndex": False,
+ "contextP": False,
+ "bigEndian": True},
+ 0xDEADBEEF, 0xCAFEBABE, 0xFACEF00D, 0), "")
+ # Test "empty" header, little endian
+ self.assertEqual(f("\x01\x01\x00\x00"
+ "\xEF\xBE\xAD\xDE\xBE\xBA\xFE\xCA"
+ "\x0D\xF0\xCE\xFA\x00\x00\x00\x00"),
+ (1, a.PDU_OPEN, {"instReg": False,
+ "newIndex": False,
+ "anyIndex": False,
+ "contextP": False,
+ "bigEndian": False},
+ 0xDEADBEEF, 0xCAFEBABE, 0xFACEF00D, 0), "")
+ # Test "empty" header, extra data
+ self.assertEqual(f("\x01\x01\x10\x00"
+ "\xDE\xAD\xBE\xEF\xCA\xFE\xBA\xBE"
+ "\xFA\xCE\xF0\x0D\x00\x00\x00\x00" + extraData),
+ (1, a.PDU_OPEN, {"instReg": False,
+ "newIndex": False,
+ "anyIndex": False,
+ "contextP": False,
+ "bigEndian": True},
+ 0xDEADBEEF, 0xCAFEBABE, 0xFACEF00D, 0), extraData)
+ # Test flags
+ self.assertEqual(f("\x01\x01\x1F\x00"
+ "\xDE\xAD\xBE\xEF\xCA\xFE\xBA\xBE"
+ "\xFA\xCE\xF0\x0D\x00\x00\x00\x00"),
+ (1, a.PDU_OPEN, {"instReg": True,
+ "newIndex": True,
+ "anyIndex": True,
+ "contextP": True,
+ "bigEndian": True},
+ 0xDEADBEEF, 0xCAFEBABE, 0xFACEF00D, 0), "")
+
+ def test_decode_packet(self):
+ f = ntp.agentx.decode_packet
+ x = ntp.agentx
+ # Not testing all the variants of each packet type, that is
+ # the job of the other tests.
+
+ # Test open
+ self.assertEqual(f("\x01\x01\x10\x00"
+ "\x00\x00\x00\x0C\x00\x00\x00\x22"
+ "\x00\x00\x00\x38\x00\x00\x00\x20"
+ "\x4E\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x03foo\x00"),
+ (x.PDU_OPEN, standardFlags, 12, 34, 56,
+ (78, ((1, 2, 3, 4), False), "foo")))
+ # Test close
+ self.assertEqual(f("\x01\x02\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x01\x00\x00\x00"),
+ (x.PDU_CLOSE, standardFlags, 1, 2, 3,
+ x.RSN_OTHER))
+ # Test register
+ self.assertEqual(f("\x01\x03\x11\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x14"
+ "\x04\x05\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"),
+ (x.PDU_REGISTER,
+ makeFlags(True, False, False, False, True),
+ 1, 2, 3,
+ (((1, 2, 3), False), 4, 5, None, 0, None)))
+ # Test unregister
+ self.assertEqual(f("\x01\x04\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x14"
+ "\x00\x05\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"),
+ (x.PDU_UNREGISTER, standardFlags, 1, 2, 3,
+ (((1, 2, 3), False), 5, None, 0, None)))
+ # Test get
+ self.assertEqual(f("\x01\x05\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x00"),
+ (x.PDU_GET, standardFlags, 1, 2, 3,
+ (None, ())))
+ # Test get next
+ self.assertEqual(f("\x01\x06\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x00"),
+ (x.PDU_GET_NEXT, standardFlags, 1, 2, 3,
+ (None, ())))
+ # Test get bulk
+ self.assertEqual(f("\x01\x07\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x38"
+ "\x00\x01\x00\x05"
+ "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x02\x00\x01\x00\x00\x00\x00\x06\x00\x00\x00\x07"
+ "\x02\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x09"
+ "\x00\x00\x00\x00"),
+ (x.PDU_GET_BULK, standardFlags, 1, 2, 3,
+ (None, 1, 5,
+ (((1, 2), (3, 4), False),
+ ((6, 7), (8, 9), True)))))
+ # Test test set
+ self.assertEqual(f("\x01\x08\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x40"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah"),
+ (x.PDU_TEST_SET, standardFlags, 1, 2, 3,
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah")))))
+ # Test commit set
+ self.assertEqual(f("\x01\x09\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x00"),
+ (x.PDU_COMMIT_SET, standardFlags, 1, 2, 3,
+ None))
+ # Test undo set
+ self.assertEqual(f("\x01\x0A\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x00"),
+ (x.PDU_UNDO_SET, standardFlags, 1, 2, 3,
+ None))
+ # Test cleanup set
+ self.assertEqual(f("\x01\x0B\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x00"),
+ (x.PDU_CLEANUP_SET, standardFlags, 1, 2, 3,
+ None))
+ # Test notify
+ self.assertEqual(f("\x01\x0C\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x40"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah"),
+ (x.PDU_NOTIFY, standardFlags, 1, 2, 3,
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah")))))
+ # Test ping
+ self.assertEqual(f("\x01\x0D\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x00"),
+ (x.PDU_PING, standardFlags, 1, 2, 3, None))
+ # Test index alloc
+ self.assertEqual(f("\x01\x0E\x16\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x40"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah"),
+ (x.PDU_INDEX_ALLOC,
+ makeFlags(False, True, True, False, True),
+ 1, 2, 3,
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah")))))
+ # Test index dealloc
+ self.assertEqual(f("\x01\x0F\x16\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x40"
+ "\x00\x06\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x04\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x04"
+ "\x00\x00\x00\x04blah"),
+ (x.PDU_INDEX_DEALLOC,
+ makeFlags(False, True, True, False, True),
+ 1, 2, 3,
+ (None,
+ ((x.OID, (1, 2, 3), ((4, 5, 6), False)),
+ (x.OCTET_STR, (1, 2, 4), "blah")))))
+ # Test add agent caps
+ self.assertEqual(f("\x01\x10\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x18"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x04blah"),
+ (x.PDU_ADD_AGENT_CAPS, standardFlags, 1, 2, 3,
+ (None, (4, 5, 6), "blah")))
+ # Test rm agent caps
+ self.assertEqual(f("\x01\x11\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x10"
+ "\x03\x00\x00\x00\x00\x00\x00\x04"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"),
+ (x.PDU_RM_AGENT_CAPS, standardFlags, 1, 2, 3,
+ (None, (4, 5, 6))))
+ # Test response
+ self.assertEqual(f("\x01\x12\x10\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x08"
+ "\x00\x00\x00\x04\x00\x05\x00\x06"),
+ (x.PDU_RESPONSE, standardFlags, 1, 2, 3,
+ (4, 5, 6, None)))
+
+
+if __name__ == "__main__":
+ unittest.main()
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/68533351cf175da79bd803bd31b735c88c0028cf
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/68533351cf175da79bd803bd31b735c88c0028cf
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170626/a7f77216/attachment.html>
More information about the vc
mailing list