[Git][NTPsec/ntpsec][master] 3 commits: Added walkMIBTree generator to replace previous MIB tree handlers
Ian Bruene
gitlab at mg.gitlab.com
Sun Nov 5 19:04:49 UTC 2017
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
de630640 by Ian Bruene at 2017-11-05T13:04:19-06:00
Added walkMIBTree generator to replace previous MIB tree handlers
- - - - -
cd2c24c1 by Ian Bruene at 2017-11-05T13:04:19-06:00
Changed ntpsnmpd to use the new MIB tree system
- - - - -
ce7ecc34 by Ian Bruene at 2017-11-05T13:04:19-06:00
Removed obsolete MIB tree-list converters
- - - - -
3 changed files:
- ntpclients/ntpsnmpd
- pylib/agentx.py
- tests/pylib/test_agentx.py
Changes:
=====================================
ntpclients/ntpsnmpd
=====================================
--- a/ntpclients/ntpsnmpd
+++ b/ntpclients/ntpsnmpd
@@ -23,6 +23,9 @@ except ImportError as e:
sys.exit(1)
+# TODO This is either necessary, or a different workaround is.
+ntp.util.deunicode_units()
+
logfile = "ntpsnmpd.log"
logfp = sys.stderr
nofork = True # don't daemonize while still under construction
@@ -36,234 +39,276 @@ DEFHOST = "localhost" # For now only know how to talk to the local ntp
class DataSource: # This may be broken up in future to be less NTP-specific
def __init__(self):
+ node = ax.mibnode
# This is defined as a dict tree because simpler, and avoids
# certain edge cases
# OIDs are relative from ntp root
# ntpEntNotifications
- self.oidTree = {0: (None,
- # ntpEntNotifModeChange
- {1: (None, None),
- # ntpEntNotifStratumChange
- 2: (None, None),
- # ntpEntNotifSyspeerChange
- 3: (None, None),
- # ntpEntNotifAddAssociation
- 4: (None, None),
- # ntpEntNotifRemoveAsociation
- 5: (None, None),
- # ntpEntNotifConfigChanged
- 6: (None, None),
- # ntpEntNotifLeapSecondAnnounced
- 7: (None, None),
- # ntpEntNotifHeartbeat
- 8: (None, None)}),
- # ntpSnmpMIBObjects
- 1: (None,
- # ntpEntInfo
- {1: (None,
- # ntpNetSoftwareName utf8str
- {1: ((lambda oid:
- self.cb_systemInfo(oid, None,
- "name")),
- None),
- # ntpEntSoftwareVersion utf8str
- 2: ((lambda oid:
- self.cb_systemInfo(oid, None,
- "version")),
- None),
- # ntpEntSoftwareVendor utf8str
- 3: ((lambda oid:
- self.cb_systemInfo(oid, None,
- "vendor")),
- None),
- # ntpEntSystemType utf8str
- 4: ((lambda oid:
- self.cb_systemInfo(oid, None,
- "system")),
- None),
- # ntpEntTimeResolution uint32
- 5: (self.cb_timeResolution, None),
- # ntpEntTimePrecision int32
- 6: (self.cb_timePrecision, None),
- # ntpEntTimeDistance DisplayString
- 7: (self.cb_timeDistance, None)}),
- # ntpEntStatus
- 2: (None,
- # ntpEntStatusCurrentMode INTEGER {...}
- {1: (self.cb_statusCurrentMode, None),
- # ntpEntStatusStratum NtpStratum
- 2: (self.cb_statusStratum, None),
- # ntpEntStatusActiveRefSourceId
- # uint32 (0..99999)
- 3: (self.cb_statusActiveRefSourceID,
- None),
- # ntpEntStatusActiveRefSourceName utf8str
- 4: (self.cb_statusActiveRefSourceName,
- None),
- # ntpEntStatusActiveOffset DisplayString
- 5: (self.cb_statusActiveOffset, None),
- # ntpEntStatusNumberOfRefSources
- # unit32 (0..99)
- 6: (self.cb_statusNumRefSources, None),
- # ntpEntStatusDispersion DisplayString
- 7: (self.cb_statusDispersion, None),
- # ntpEntStatusEntityUptime TimeTicks
- 8: (self.cb_statusEntityUptime, None),
- # ntpEntStatusDateTime NtpDateTime
- 9: (self.cb_statusDateTime, None),
- # ntpEntStatusLeapSecond NtpDateTime
- 10: (self.cb_statusLeapSecond, None),
- # ntpEntStatusLeapSecondDirection
- # int32 (-1..1)
- 11: (self.cb_statusLeapSecDirection,
- None),
- # ntpEntStatusInPkts Counter32
- 12: (self.cb_statusInPkts, None),
- # ntpEntStatusOutPkts Counter32
- 13: (self.cb_statusOutPkts, None),
- # ntpEntStatusBadVersion Counter32
- 14: (self.cb_statusBadVersion, None),
- # ntpEntStatusProtocolError Counter32
- 15: (self.cb_statusProtocolError, None),
- # ntpEntStatusNotifications Counter32
- 16: (self.cb_statusNotifications, None),
- # ntpEntStatPktModeTable
- # SEQUENCE of NtpEntStatPktModeEntry
- 17: (None,
- # ntpEntStatPktModeEntry SEQUENCE {...}
- {1: (None,
- # ntpEntStatPktMode INTEGER {...}
- {1: (None, None),
- # ntpEntStatPktSent Counter32
- 2: (None, None),
- # ntpEntStatPktRecived Counter32
- 3: (None, None)})})}),
- # ntpAssociation
- 3: (None,
- # ntpAssociationTable
- # SEQUENCE of NtpAssociationEntry
- {1: (None,
- # ntpAssociationEntry SEQUENCE {...}
- {1: (None,
- # ntpAssocId uint32 (1..99999)
- {1: (self.cb_assocID, None),
- # ntpAssocName utf8str
- 2: (self.cb_assocName, None),
- # ntpAssocRefId DisplayString
- 3: (self.cb_assocRefID, None),
- # ntpAssocAddressType
- # InetAddressType
- 4: (self.cb_assocAddrType,
- None),
- # ntpAssocAddress
- # InetAddress SIZE (4|8|16|20)
- 5: (self.cb_assocAddr, None),
- # ntpAssocOffset DisplayString
- 6: (self.cb_assocOffset,
- None),
- # ntpAssocStratum NtpStratum
- 7: (self.cb_assocStratum,
- None),
- # ntpAssocStatusJitter
- # DisplayString
- 8: (self.cb_assocStatusJitter,
- None),
- # ntpAssocStatusDelay
- # DisplayString
- 9: (self.cb_assocStatusDelay,
- None),
- # ntpAssocStatusDispersion
- # DisplayString
- 10: (self.cb_assocStatusDisp,
- None)})}),
- # ntpAssociationStatisticsTable
- # SEQUENCE of ntpAssociationStatisticsEntry
- 2: (None,
- # ntpAssociationStatisticsEntry
- # SEQUENCE {...}
- {1: (None,
- # ntpAssocStatInPkts Counter32
- {1: (self.cb_assocStatInPkts,
- None),
- # ntpAssocStatOutPkts Counter32
- 2: (self.cb_assocStatOutPkts,
- None),
- # ntpAssocStatProtocolError
- # Counter32
- 3: (self.cb_assocStatProtoErr,
- None)})})}),
- # ntpEntControl
- 4: (None,
- # ntpEntHeartbeatInterval unit32
- {1: (self.cb_entHeartbeatInterval,
- None),
- # ntpEntNotifBits BITS {...}
- 2: (self.cb_entNotifBits, None)}),
- # ntpEntNotifObjects
- 5: (None,
- # ntpEntNotifMessage utf8str
- {1: (self.cb_entNotifMessage, None)})}),
- # ntpEntConformance
- 2: (None,
- # ntpEntCompliances
- {1: (None,
- # ntpEntNTPCompliance
- {1: (None, None),
- # ntpEntSNTPCompliance
- 2: (None, None)}),
- # ntpEntGroups
- 2: (None,
- # ntpEntObjectsGroup1 OBJECTS {...}
- {1: (None, None),
- # ntpEntObjectsGroup2 OBJECTS {...}
- 2: (None, None),
- # ntpEntNotifGroup NOTIFICATIONS {...}
- 3: (None, None)})})}
- self.oidList = ntp.agentx.mibTree2List(self.oidTree, ntpRootOID)
+ self.oidTree = {
+ 0:
+ node(True, None,
+ # ntpEntNotifModeChange
+ {1: node(True, None, None),
+ # ntpEntNotifStratumChange
+ 2: node(True, None, None),
+ # ntpEntNotifSyspeerChange
+ 3: node(True, None, None),
+ # ntpEntNotifAddAssociation
+ 4: node(True, None, None),
+ # ntpEntNotifRemoveAsociation
+ 5: node(True, None, None),
+ # ntpEntNotifConfigChanged
+ 6: node(True, None, None),
+ # ntpEntNotifLeapSecondAnnounced
+ 7: node(True, None, None),
+ # ntpEntNotifHeartbeat
+ 8: node(True, None, None)}),
+ # ntpSnmpMIBObjects
+ 1:
+ node(True, None,
+ # ntpEntInfo
+ {1:
+ node(True, None,
+ # ntpNetSoftwareName utf8str
+ {1: node(True,
+ (lambda oid:
+ self.cb_systemInfo(oid, None, "name")),
+ None),
+ # ntpEntSoftwareVersion utf8str
+ 2: node(True,
+ (lambda oid:
+ self.cb_systemInfo(oid, None, "version")),
+ None),
+ # ntpEntSoftwareVendor utf8str
+ 3: node(True,
+ (lambda oid:
+ self.cb_systemInfo(oid, None, "vendor")),
+ None),
+ # ntpEntSystemType utf8str
+ 4: node(True,
+ (lambda oid:
+ self.cb_systemInfo(oid, None, "system")),
+ None),
+ # ntpEntTimeResolution uint32
+ 5: node(True, self.cb_timeResolution, None),
+ # ntpEntTimePrecision int32
+ 6: node(True, self.cb_timePrecision, None),
+ # ntpEntTimeDistance DisplayString
+ 7: node(True, self.cb_timeDistance, None)}),
+ # ntpEntStatus
+ 2:
+ node(True, None,
+ # ntpEntStatusCurrentMode INTEGER {...}
+ {1: node(True, self.cb_statusCurrentMode, None),
+ # ntpEntStatusStratum NtpStratum
+ 2: node(True, self.cb_statusStratum, None),
+ # ntpEntStatusActiveRefSourceId
+ # uint32 (0..99999)
+ 3: node(True,self.cb_statusActiveRefSourceID,
+ None),
+ # ntpEntStatusActiveRefSourceName utf8str
+ 4: node(True, self.cb_statusActiveRefSourceName,
+ None),
+ # ntpEntStatusActiveOffset DisplayString
+ 5: node(True, self.cb_statusActiveOffset, None),
+ # ntpEntStatusNumberOfRefSources
+ # unit32 (0..99)
+ 6: node(True, self.cb_statusNumRefSources, None),
+ # ntpEntStatusDispersion DisplayString
+ 7: node(True, self.cb_statusDispersion, None),
+ # ntpEntStatusEntityUptime TimeTicks
+ 8: node(True, self.cb_statusEntityUptime, None),
+ # ntpEntStatusDateTime NtpDateTime
+ 9: node(True, self.cb_statusDateTime, None),
+ # ntpEntStatusLeapSecond NtpDateTime
+ 10: node(True, self.cb_statusLeapSecond, None),
+ # ntpEntStatusLeapSecondDirection
+ # int32 (-1..1)
+ 11: node(True, self.cb_statusLeapSecDirection, None),
+ # ntpEntStatusInPkts Counter32
+ 12: node(True, self.cb_statusInPkts, None),
+ # ntpEntStatusOutPkts Counter32
+ 13: node(True, self.cb_statusOutPkts, None),
+ # ntpEntStatusBadVersion Counter32
+ 14: node(True, self.cb_statusBadVersion, None),
+ # ntpEntStatusProtocolError Counter32
+ 15: node(True, self.cb_statusProtocolError, None),
+ # ntpEntStatusNotifications Counter32
+ 16: node(True, self.cb_statusNotifications, None),
+ # ntpEntStatPktModeTable
+ # SEQUENCE of NtpEntStatPktModeEntry
+ 17:
+ node(True, None,
+ # ntpEntStatPktModeEntry SEQUENCE {...}
+ {1:
+ node(True, None,
+ # ntpEntStatPktMode INTEGER {...}
+ {1: node(True, None, None),
+ # ntpEntStatPktSent Counter32
+ 2: node(True, None, None),
+ # ntpEntStatPktRecived Counter32
+ 3: node(True, None, None)})})}),
+ # ntpAssociation
+ 3:
+ node(True, None,
+ # ntpAssociationTable
+ # SEQUENCE of NtpAssociationEntry
+ {1:
+ node(True, None,
+ # ntpAssociationEntry SEQUENCE {...}
+ {1:
+ node(True, None,
+ # ntpAssocId uint32 (1..99999)
+ {1: node(True, self.cb_assocID, None),
+ # ntpAssocName utf8str
+ 2: node(True, self.cb_assocName, None),
+ # ntpAssocRefId DisplayString
+ 3: node(True, self.cb_assocRefID, None),
+ # ntpAssocAddressType
+ # InetAddressType
+ 4: node(True, self.cb_assocAddrType,
+ None),
+ # ntpAssocAddress
+ # InetAddress SIZE (4|8|16|20)
+ 5: node(True, self.cb_assocAddr, None),
+ # ntpAssocOffset DisplayString
+ 6: node(True, self.cb_assocOffset,
+ None),
+ # ntpAssocStratum NtpStratum
+ 7: node(True, self.cb_assocStratum,
+ None),
+ # ntpAssocStatusJitter
+ # DisplayString
+ 8: node(True, self.cb_assocStatusJitter,
+ None),
+ # ntpAssocStatusDelay
+ # DisplayString
+ 9: node(True, self.cb_assocStatusDelay,
+ None),
+ # ntpAssocStatusDispersion
+ # DisplayString
+ 10: node(True, self.cb_assocStatusDisp,
+ None)})}),
+ # ntpAssociationStatisticsTable
+ # SEQUENCE of ntpAssociationStatisticsEntry
+ 2:
+ node(True, None,
+ # ntpAssociationStatisticsEntry
+ # SEQUENCE {...}
+ {1:
+ node(True, None,
+ # ntpAssocStatInPkts Counter32
+ {1: node(True, self.cb_assocStatInPkts,
+ None),
+ # ntpAssocStatOutPkts Counter32
+ 2: node(True, self.cb_assocStatOutPkts,
+ None),
+ # ntpAssocStatProtocolError
+ # Counter32
+ 3: node(True, self.cb_assocStatProtoErr,
+ None)})})}),
+ # ntpEntControl
+ 4:
+ node(True, None,
+ # ntpEntHeartbeatInterval unit32
+ {1: node(True, self.cb_entHeartbeatInterval,
+ None),
+ # ntpEntNotifBits BITS {...}
+ 2: node(True, self.cb_entNotifBits, None)}),
+ # ntpEntNotifObjects
+ 5:
+ node(True, None,
+ # ntpEntNotifMessage utf8str
+ {1: node(True, self.cb_entNotifMessage, None)})}),
+ # ntpEntConformance
+ 2:
+ node(True, None,
+ # ntpEntCompliances
+ {1:
+ node(True, None,
+ # ntpEntNTPCompliance
+ {1: node(True, None, None),
+ # ntpEntSNTPCompliance
+ 2: node(True, None, None)}),
+ # ntpEntGroups
+ 2:
+ node(True, None,
+ # ntpEntObjectsGroup1 OBJECTS {...}
+ {1: node(True, None, None),
+ # ntpEntObjectsGroup2 OBJECTS {...}
+ 2: node(True, None, None),
+ # ntpEntNotifGroup NOTIFICATIONS {...}
+ 3: node(True, None, None)})})}
self.session = ntp.packet.ControlSession()
self.session.openhost(DEFHOST) # only local for now
- def getOID(self, oid):
- "Get the requested OID, or the next lexicographical OID"
- for node in self.oidList:
- if node[0] is None: # No read callback
- continue # skip over not yet implemented OIDs
- if (node[1] == oid):
- return node # (read_callback, write_callback, oid)
- # Nothing in the list
- return (None, None, None)
-
- def getNextOID(self, oid):
+ def getOID_core(self, nextP, searchoid, returnGenerator=False):
+ gen = ax.walkMIBTree(self.oidTree, ntpRootOID)
+ while True:
+ try:
+ oid, callback = gen.next()
+ if nextP is True:
+ oidhit = (oid > searchoid)
+ else:
+ oidhit = (oid.subids == searchoid.subids)
+ if oidhit and (callback is not None):
+ if returnGenerator is True:
+ return oid, callback, gen
+ else:
+ return oid, callback
+ except StopIteration:
+ if returnGenerator is True:
+ return None, None, None
+ else:
+ return None, None
+
+ # These exist instead of just getOID_core for clearer semantics
+ def getOID(self, searchoid, returnGenerator=False):
+ "Get the requested OID"
+ return self.getOID_core(False, searchoid, returnGenerator)
+
+ def getNextOID(self, searchoid, returnGenerator=False):
"Get the next lexicographical OID"
- for pos in range(len(self.oidList)):
- callback, current = self.oidList[pos]
- if callback is None:
- continue # skip over not yet implemented OIDs
- if current <= oid:
- continue
- else:
- return self.oidList[pos]
- # Nothing after the supplied oid
- return (None, None)
+ return self.getOID_core(True, searchoid, returnGenerator)
def getOIDsInRange(self, oidrange, firstOnly=False):
"Get a list of every (optionally the first) OID in a range"
oids = []
- for node in self.oidList:
- if node[0] is None: # No read callback
- continue # skip over not yet implemented OIDs
- elif node[1] > oidrange.start: # Past the start, in the body
- if (oidrange.end is not None) and (node[1] >= oidrange.end):
- break # Past the end of a bounded range
+ gen = ax.walkMIBTree(self.oidTree, ntpRootOID)
+ # Find the first OID
+ while True:
+ try:
+ oid, callback = gen.next()
+ if callback is None:
+ continue # skip unimplemented OIDs
+ elif oid.subids == oidrange.start.subids:
+ # ok, found the start, do we need to skip it?
+ if oidrange.start.include is True:
+ oids.append((oid, callback))
+ break
+ else:
+ continue
+ elif oid > oidrange.start:
+ # If we are here it means we hit the start but skipped
+ oids.append((oid, callback))
+ break
+ except StopIteration:
+ # Couldn't find *anything*
+ return []
+ if firstOnly is True:
+ return oids
+ # Start filling in the rest of the range
+ while True:
+ try:
+ oid, callback = gen.next()
+ if callback is None:
+ continue # skip unimplemented OIDs
+ elif (oidrange.end is not None) and (oid >= oidrange.end):
+ break # past the end of a bounded range
else:
- oids.append(node) # Found a match
- elif (node[1].subids == oidrange.start.subids) and \
- (oidrange.start.include is True):
- oids.append(node) # Inclusive search and a match at the start
- else:
- pass
- if (firstOnly is True) and (len(oids) > 0):
- break
+ oids.append((oid, callback))
+ except StopIteration:
+ break # We have run off the end of the MIB
return oids
# =============================================================
@@ -334,6 +379,7 @@ class DataSource: # This may be broken up in future to be less NTP-specific
data = self.session.readvar(0, ["koffset"], raw=True)
data = ntp.util.unitifyvar(data["koffset"][1], "koffset",
width=None, unitSpace=True)
+ print("\n\n\nactive offset:", repr(data))
return ax.Varbind(ax.VALUE_OCTET_STR, oid, data)
def cb_statusNumRefSources(self, oid, write=None):
@@ -640,7 +686,7 @@ class PacketControl:
binds = []
for oidr in packet.oidranges:
target = oidr.start
- callback, oid = self.database.getOID(target)
+ oid, callback = self.database.getOID(target)
if (oid != target) or (callback is None):
binds.append(ax.Varbind(ax.VALUE_NO_SUCH_OBJECT, target))
else:
@@ -656,10 +702,14 @@ class PacketControl:
binds = []
for oidr in packet.oidranges:
oids = self.database.getOIDsInRange(oidr, True)
+ print()
+ print("GetNext:", oidr)
+ print("result:", oids)
+ print()
if len(oids) == 0: # Nothing found
binds.append(ax.Varbind(ax.VALUE_END_OF_MIB_VIEW, oidr.start))
else:
- callback, oid = oids[0]
+ oid, callback = oids[0]
binds.append(callback(oid))
# Need to implement genError
resp = ax.ResponsePDU(True, self.sessionID, packet.transactionID,
@@ -676,7 +726,7 @@ class PacketControl:
if len(oids) == 0: # Nothing found
binds.append(ax.Varbind(ax.VALUE_END_OF_MIB_VIEW, oidr.start))
else:
- callback, oid = oids[0]
+ oid, callback = oids[0]
binds.append(callback(oid))
# Handle repeaters
for oidr in repeats:
@@ -684,7 +734,7 @@ class PacketControl:
if len(oids) == 0: # Nothing found
binds.append(ax.Varbind(ax.VALUE_END_OF_MIB_VIEW, oidr.start))
else:
- for callback, oid in oids[:packet.maxReps]:
+ for oid, callback in oids[:packet.maxReps]:
binds.append(callback(oid))
resp = ax.ResponsePDU(True, self.sessionID, packet.transactionID,
packet.packetID, 0, ax.ERR_NOERROR, 0, binds)
=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -1304,44 +1304,48 @@ def decode_packet(data):
return parsedPkt, newData
-def mibTree2List(mibtree, currentPath=()):
- "Takes a tree of nested dicts representing OIDs and flattens it to a list"
- if (mibtree is None) or (mibtree == {}):
- return () # Empty tree
- paths = []
- branches = list(mibtree.keys())
- branches.sort()
- for branch in branches:
- callback, tree = mibtree[branch]
- paths.append((callback, OID(currentPath + (branch,))))
- branchPath = currentPath + (branch,)
- paths += mibTree2List(tree, branchPath)
- return tuple(paths)
-
-
-def mibList2Tree(miblist, rootPath=()):
- "Takes a list of OIDs and inflates it into a tree"
- # Tree node format: (read callback, write callback, sub-nodes)
- tree = {}
- for mibnode in miblist:
- callback, oid = mibnode[0], mibnode[1].subids
- rootlen = len(rootPath)
- if oid[:rootlen] != rootPath: # OID not decended from the root, bail
- raise ValueError("Node %s does not have root %s" %
- (oid, rootPath))
- oid = oid[rootlen:] # clip the root off for the tree
- branch = tree
- oidPos = 0
- oidSize = len(oid)
- while oidPos < oidSize:
- subid = oid[oidPos]
- if subid not in branch: # First time at this position
- branch[subid] = (callback, None) # might be a leaf
- elif branch[subid][1] is None: # It isn't a leaf
- branch[subid] = (branch[subid][0], {})
- branch = branch[subid][1]
- oidPos += 1
- return tree
+def walkMIBTree(tree, rootpath=()):
+ # Tree node formats:
+ # {"static": True, "callback": <func>, "subids": {.blah.}}
+ # {"static": False, "callback": <func>, "subids": <func>}
+ # The "subids" function in dynamic nodes must return an MIB tree
+ nodeStack = []
+ oidStack = []
+ current = tree
+ currentKeys = list(current.keys())
+ currentKeys.sort()
+ keyID = 0
+ while True:
+ if keyID >= len(currentKeys):
+ if len(nodeStack) > 0:
+ # No more nodes this level, pop higher node
+ current, currentKeys, keyID, key = nodeStack.pop()
+ oidStack.pop()
+ keyID += 1
+ continue
+ else: # Out of tree, we are done
+ return
+ key = currentKeys[keyID]
+ oid = OID(rootpath + tuple(oidStack) + (key,))
+ yield (oid, current[key]["callback"])
+ if current[key]["subids"] is not None:
+ # Push current node, move down a level
+ nodeStack.append((current, currentKeys, keyID, key))
+ oidStack.append(key)
+ if current[key]["static"] is True:
+ current = current[key]["subids"]
+ else:
+ current = current[key]["subids"]() # Tree generator function
+ currentKeys = list(current.keys())
+ currentKeys.sort()
+ keyID = 0
+ key = currentKeys[keyID]
+ continue
+ keyID += 1
+
+
+def mibnode(static, callback, subs):
+ return {"static": static, "callback": callback, "subids": subs}
# Value types
=====================================
tests/pylib/test_agentx.py
=====================================
--- a/tests/pylib/test_agentx.py
+++ b/tests/pylib/test_agentx.py
@@ -2500,91 +2500,122 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
errored = e
self.assertEqual(errored.message, "PDU type 255 not in defined types")
- def test_mibTree2List(self):
+ def test_walkMIBTree(self):
x = ntp.agentx
- f = x.mibTree2List
+ f = x.walkMIBTree
# Test empty tree
- self.assertEqual(f({}), ())
- # Test flat tree
- self.assertEqual(f({0: (None, None), 1: (None, None),
- 3: (None, None), 4: (None, None)}),
- ((None, x.OID((0,))),
- (None, x.OID((1,))),
- (None, x.OID((3,))),
- (None, x.OID((4,)))))
- # Test flat tree with root path
- self.assertEqual(f({0: (None, None), 1: (None, None),
- 3: (None, None), 4: (None, None)},
- (42, 23)),
- ((None, x.OID((42, 23, 0))),
- (None, x.OID((42, 23, 1))),
- (None, x.OID((42, 23, 3))),
- (None, x.OID((42, 23, 4)))))
- # Test nested tree
- self.assertEqual(f({0: (None, None),
- 2: (None,
- {0: (None, None),
- 1: (None, None)}),
- 3: (None,
- {5: (None,
- {0: (None, None),
- 2: (None, None)}),
- 6: (None, None)}),
- 4: (None, None)}),
- ((None, x.OID((0,))),
- (None, x.OID((2,))),
- (None, x.OID((2, 0))),
- (None, x.OID((2, 1))),
- (None, x.OID((3,))),
- (None, x.OID((3, 5))),
- (None, x.OID((3, 5, 0))),
- (None, x.OID((3, 5, 2))),
- (None, x.OID((3, 6))),
- (None, x.OID((4,)))))
-
- def test_mibList2Tree(self):
- x = ntp.agentx
- f = x.mibList2Tree
-
- # Test empty tree
- self.assertEqual(f(tuple()), {})
- # Test flat tree
- self.assertEqual(f(((None, x.OID((0,))),
- (None, x.OID((1,))),
- (None, x.OID((3,))),
- (None, x.OID((4,))))),
- {0: (None, None), 1: (None, None),
- 3: (None, None), 4: (None, None)})
- # Test flat tree with root path
- self.assertEqual(f(((None, x.OID((42, 23, 0))),
- (None, x.OID((42, 23, 1))),
- (None, x.OID((42, 23, 3))),
- (None, x.OID((42, 23, 4)))),
- (42, 23)),
- {0: (None, None), 1: (None, None),
- 3: (None, None), 4: (None, None)})
- # Test nested tree
- self.assertEqual(f(((None, x.OID((0,))),
- (None, x.OID((2,))),
- (None, x.OID((2, 0))),
- (None, x.OID((2, 1))),
- (None, x.OID((3,))),
- (None, x.OID((3, 5))),
- (None, x.OID((3, 5, 0))),
- (None, x.OID((3, 5, 2))),
- (None, x.OID((3, 6))),
- (None, x.OID((4,))))),
- {0: (None, None),
- 2: (None,
- {0: (None, None),
- 1: (None, None)}),
- 3: (None,
- {5: (None,
- {0: (None, None),
- 2: (None, None)}),
- 6: (None, None)}),
- 4: (None, None)})
+ self.assertEqual(tuple(f({})), ())
+ # Test flat, fully static tree
+ self.assertEqual(tuple(f({0: {"static": True, "callback": None,
+ "subids": None},
+ 1: {"static": True, "callback": None,
+ "subids": None},
+ 2: {"static": True, "callback": None,
+ "subids": None},
+ 5: {"static": True, "callback": None,
+ "subids": None}})),
+ ((x.OID((0,)), None),
+ (x.OID((1,)), None),
+ (x.OID((2,)), None),
+ (x.OID((5,)), None)))
+ # Test nested, fully static tree
+ self.assertEqual(tuple(f({0: {"static": True,
+ "callback": None,
+ "subids": None},
+ 1: {"static": True,
+ "callback": None,
+ "subids":
+ {0: {"static": True,
+ "callback": None,
+ "subids": None},
+ 1: {"static": True,
+ "callback": None,
+ "subids":
+ {42: {"static": True,
+ "callback": None,
+ "subids": None}}}}},
+ 5: {"static": True,
+ "callback": None,
+ "subids": None}})),
+ ((x.OID((0,)), None),
+ (x.OID((1,)), None),
+ (x.OID((1, 0)), None),
+ (x.OID((1, 1)), None),
+ (x.OID((1, 1, 42)), None),
+ (x.OID((5,)), None)))
+ # Test nested, fully static tree, with rootpath
+ self.assertEqual(tuple(f({0: {"static": True,
+ "callback": None,
+ "subids": None},
+ 1: {"static": True,
+ "callback": None,
+ "subids":
+ {0: {"static": True,
+ "callback": None,
+ "subids": None},
+ 1: {"static": True,
+ "callback": None,
+ "subids":
+ {42: {"static": True,
+ "callback": None,
+ "subids": None}}}}},
+ 5: {"static": True,
+ "callback": None,
+ "subids": None}}, (23,))),
+ ((x.OID((23, 0)), None),
+ (x.OID((23, 1)), None),
+ (x.OID((23, 1, 0)), None),
+ (x.OID((23, 1, 1)), None),
+ (x.OID((23, 1, 1, 42)), None),
+ (x.OID((23, 5)), None)))
+ # subid lambda for dynamic tree testing
+ submaker = (lambda : {0: {"static": True,
+ "callback": None,
+ "subids": None},
+ 1: {"static": True,
+ "callback": None,
+ "subids":
+ {0: {"static": True,
+ "callback": None,
+ "subids": None}}},
+ 2: {"static": True,
+ "callback": None,
+ "subids": None}})
+ # Test tree with dynamic nodes
+ self.assertEqual(tuple(f({0: {"static": True,
+ "callback": None,
+ "subids": None},
+ 1: {"static": False,
+ "callback": None,
+ "subids": submaker},
+ 2: {"static": True,
+ "callback": None,
+ "subids": None}})),
+ ((x.OID((0,)), None),
+ (x.OID((1,)), None),
+ (x.OID((1, 0)), None),
+ (x.OID((1, 1)), None),
+ (x.OID((1, 1, 0)), None),
+ (x.OID((1, 2)), None),
+ (x.OID((2,)), None)))
+ # Test tree with dynamic nodes and root path
+ self.assertEqual(tuple(f({0: {"static": True,
+ "callback": None,
+ "subids": None},
+ 1: {"static": False,
+ "callback": None,
+ "subids": submaker},
+ 2: {"static": True,
+ "callback": None,
+ "subids": None}}, (23,))),
+ ((x.OID((23, 0)), None),
+ (x.OID((23, 1)), None),
+ (x.OID((23, 1, 0)), None),
+ (x.OID((23, 1, 1)), None),
+ (x.OID((23, 1, 1, 0)), None),
+ (x.OID((23, 1, 2)), None),
+ (x.OID((23, 2)), None)))
if __name__ == "__main__":
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/fc6f99bed61ba147e7db8cd598931fe3ae96924c...ce7ecc347c5214a0b56f6ff37c8be4f3014ad13a
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/fc6f99bed61ba147e7db8cd598931fe3ae96924c...ce7ecc347c5214a0b56f6ff37c8be4f3014ad13a
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20171105/088d648c/attachment.html>
More information about the vc
mailing list