[Git][NTPsec/ntpsec][master] 2 commits: Added functions to convert between octet bitstreams and lists of bools
Ian Bruene
gitlab at mg.gitlab.com
Sun Nov 12 21:24:58 UTC 2017
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
631046e6 by Ian Bruene at 2017-11-12T15:24:18-06:00
Added functions to convert between octet bitstreams and lists of bools
- - - - -
06b1eaed by Ian Bruene at 2017-11-12T15:24:18-06:00
Added basic support for the SNMP set command
NTPv4-MIB::ntpEntNotifBits can now be set
Validation checks are still very crude: only one variable can be changed,
and new variables cannot be created.
- - - - -
3 changed files:
- ntpclients/ntpsnmpd
- pylib/agentx.py
- tests/pylib/test_agentx.py
Changes:
=====================================
ntpclients/ntpsnmpd
=====================================
--- a/ntpclients/ntpsnmpd
+++ b/ntpclients/ntpsnmpd
@@ -71,68 +71,68 @@ class DataSource: # This may be broken up in future to be less NTP-specific
node(None, None, True,
# ntpNetSoftwareName utf8str
{1: node((lambda oid:
- self.cb_systemInfo(oid, None, "name")),
+ self.cbr_systemInfo(oid, None, "name")),
None, True, None),
# ntpEntSoftwareVersion utf8str
2: node((lambda oid:
- self.cb_systemInfo(oid, None, "version")),
+ self.cbr_systemInfo(oid, None, "version")),
None, True, None),
# ntpEntSoftwareVendor utf8str
3: node((lambda oid:
- self.cb_systemInfo(oid, None, "vendor")),
+ self.cbr_systemInfo(oid, None, "vendor")),
None, True, None),
# ntpEntSystemType utf8str
4: node((lambda oid:
- self.cb_systemInfo(oid, None, "system")),
+ self.cbr_systemInfo(oid, None, "system")),
None, True, None),
# ntpEntTimeResolution uint32
- 5: node(self.cb_timeResolution, None, True, None),
+ 5: node(self.cbr_timeResolution, None, True, None),
# ntpEntTimePrecision int32
- 6: node(self.cb_timePrecision, None, True, None),
+ 6: node(self.cbr_timePrecision, None, True, None),
# ntpEntTimeDistance DisplayString
- 7: node(self.cb_timeDistance, None, True, None)}),
+ 7: node(self.cbr_timeDistance, None, True, None)}),
# ntpEntStatus
2:
node(None, None, True,
# ntpEntStatusCurrentMode INTEGER {...}
- {1: node(self.cb_statusCurrentMode, None, True, None),
+ {1: node(self.cbr_statusCurrentMode, None, True, None),
# ntpEntStatusStratum NtpStratum
- 2: node(self.cb_statusStratum, None, True, None),
+ 2: node(self.cbr_statusStratum, None, True, None),
# ntpEntStatusActiveRefSourceId
# uint32 (0..99999)
- 3: node(self.cb_statusActiveRefSourceID,
+ 3: node(self.cbr_statusActiveRefSourceID,
None, True, None),
# ntpEntStatusActiveRefSourceName utf8str
- 4: node(self.cb_statusActiveRefSourceName,
+ 4: node(self.cbr_statusActiveRefSourceName,
None, True, None),
# ntpEntStatusActiveOffset DisplayString
- 5: node(self.cb_statusActiveOffset, None, True, None),
+ 5: node(self.cbr_statusActiveOffset, None, True, None),
# ntpEntStatusNumberOfRefSources
# unit32 (0..99)
- 6: node(self.cb_statusNumRefSources, None, True, None),
+ 6: node(self.cbr_statusNumRefSources, None, True, None),
# ntpEntStatusDispersion DisplayString
- 7: node(self.cb_statusDispersion, None, True, None),
+ 7: node(self.cbr_statusDispersion, None, True, None),
# ntpEntStatusEntityUptime TimeTicks
- 8: node(self.cb_statusEntityUptime, None, True, None),
+ 8: node(self.cbr_statusEntityUptime, None, True, None),
# ntpEntStatusDateTime NtpDateTime
- 9: node(self.cb_statusDateTime, None, True, None),
+ 9: node(self.cbr_statusDateTime, None, True, None),
# ntpEntStatusLeapSecond NtpDateTime
- 10: node(self.cb_statusLeapSecond, None, True, None),
+ 10: node(self.cbr_statusLeapSecond, None, True, None),
# ntpEntStatusLeapSecondDirection
# int32 (-1..1)
- 11: node(self.cb_statusLeapSecDirection,
+ 11: node(self.cbr_statusLeapSecDirection,
None, True, None),
# ntpEntStatusInPkts Counter32
- 12: node(self.cb_statusInPkts, None, True, None),
+ 12: node(self.cbr_statusInPkts, None, True, None),
# ntpEntStatusOutPkts Counter32
- 13: node(self.cb_statusOutPkts, None, True, None),
+ 13: node(self.cbr_statusOutPkts, None, True, None),
# ntpEntStatusBadVersion Counter32
- 14: node(self.cb_statusBadVersion, None, True, None),
+ 14: node(self.cbr_statusBadVersion, None, True, None),
# ntpEntStatusProtocolError Counter32
- 15: node(self.cb_statusProtocolError,
+ 15: node(self.cbr_statusProtocolError,
None, True, None),
# ntpEntStatusNotifications Counter32
- 16: node(self.cb_statusNotifications,
+ 16: node(self.cbr_statusNotifications,
None, True, None),
# ntpEntStatPktModeTable
# SEQUENCE of NtpEntStatPktModeEntry
@@ -158,38 +158,38 @@ class DataSource: # This may be broken up in future to be less NTP-specific
{1:
node(None, None, True,
# ntpAssocId uint32 (1..99999)
- {1: node(self.cb_assocID, None, True, None),
+ {1: node(self.cbr_assocID, None, True, None),
# ntpAssocName utf8str
- 2: node(self.cb_assocName,
+ 2: node(self.cbr_assocName,
None, True, None),
# ntpAssocRefId DisplayString
- 3: node(self.cb_assocRefID,
+ 3: node(self.cbr_assocRefID,
None, True, None),
# ntpAssocAddressType
# InetAddressType
- 4: node(self.cb_assocAddrType,
+ 4: node(self.cbr_assocAddrType,
None, True, None),
# ntpAssocAddress
# InetAddress SIZE (4|8|16|20)
- 5: node(self.cb_assocAddr,
+ 5: node(self.cbr_assocAddr,
None, True, None),
# ntpAssocOffset DisplayString
- 6: node(self.cb_assocOffset,
+ 6: node(self.cbr_assocOffset,
None, True, None),
# ntpAssocStratum NtpStratum
- 7: node(self.cb_assocStratum,
+ 7: node(self.cbr_assocStratum,
None, True, None),
# ntpAssocStatusJitter
# DisplayString
- 8: node(self.cb_assocStatusJitter,
+ 8: node(self.cbr_assocStatusJitter,
None, True, None),
# ntpAssocStatusDelay
# DisplayString
- 9: node(self.cb_assocStatusDelay,
+ 9: node(self.cbr_assocStatusDelay,
None, True, None),
# ntpAssocStatusDispersion
# DisplayString
- 10: node(self.cb_assocStatusDisp,
+ 10: node(self.cbr_assocStatusDisp,
None, True, None)})}),
# ntpAssociationStatisticsTable
# SEQUENCE of ntpAssociationStatisticsEntry
@@ -200,28 +200,29 @@ class DataSource: # This may be broken up in future to be less NTP-specific
{1:
node(None, None, True,
# ntpAssocStatInPkts Counter32
- {1: node(self.cb_assocStatInPkts,
+ {1: node(self.cbr_assocStatInPkts,
None, True, None),
# ntpAssocStatOutPkts Counter32
- 2: node(self.cb_assocStatOutPkts,
+ 2: node(self.cbr_assocStatOutPkts,
None, True, None),
# ntpAssocStatProtocolError
# Counter32
- 3: node(self.cb_assocStatProtoErr,
+ 3: node(self.cbr_assocStatProtoErr,
None, True, None)})})}),
# ntpEntControl
4:
node(None, None, True,
# ntpEntHeartbeatInterval unit32
- {1: node(self.cb_entHeartbeatInterval,
+ {1: node(self.cbr_entHeartbeatInterval,
None, True, None),
# ntpEntNotifBits BITS {...}
- 2: node(self.cb_entNotifBits, None, True, None)}),
+ 2: node(self.cbr_entNotifBits, self.cbw_entNotifBits,
+ True, None)}),
# ntpEntNotifObjects
5:
node(None, None, True,
# ntpEntNotifMessage utf8str
- {1: node(self.cb_entNotifMessage, None, True, None)})}),
+ {1: node(self.cbr_entNotifMessage, None, True, None)})}),
# ntpEntConformance
2:
node(None, None, True,
@@ -243,6 +244,22 @@ class DataSource: # This may be broken up in future to be less NTP-specific
3: node(None, None, True, None)})})}
self.session = ntp.packet.ControlSession()
self.session.openhost(DEFHOST) # only local for now
+ # I am currently assuming that the undo system is only for the last
+ # set operation.
+ self.inSetP = False # Are we currently in the set procedure?
+ self.setVarbinds = [] # Varbind of the current set operation
+ self.setHandlers = [] # Handlers for commit/undo/cleanup of set
+ self.setUndoData = [] # Previous values for undoing
+ # Notify bits, these should be saved to disk
+ # Also they currently have no effect
+ self.notifyModeChange = False # 1
+ self.notifyStratumChange = False # 2
+ self.notifySyspeerChange = False # 3
+ self.notifyAddAssociation = False # 4
+ self.notifyRMAssociation = False # 5
+ self.notifyConfigChange = False # 6
+ self.notifyLeapSecondAnnounced = False # 7
+ self.notifyHeartbeat = False # 8
def getOID_core(self, nextP, searchoid, returnGenerator=False):
gen = ax.walkMIBTree(self.oidTree, ntpRootOID)
@@ -255,14 +272,14 @@ class DataSource: # This may be broken up in future to be less NTP-specific
oidhit = (oid.subids == searchoid.subids)
if oidhit and (reader is not None):
if returnGenerator is True:
- return oid, reader, gen
+ return oid, reader, writer, gen
else:
- return oid, reader
+ return oid, reader, writer
except StopIteration:
if returnGenerator is True:
- return None, None, None
+ return None, None, None, None
else:
- return None, None
+ return None, None, None
# These exist instead of just getOID_core for clearer semantics
def getOID(self, searchoid, returnGenerator=False):
@@ -286,13 +303,13 @@ class DataSource: # This may be broken up in future to be less NTP-specific
elif oid.subids == oidrange.start.subids:
# ok, found the start, do we need to skip it?
if oidrange.start.include is True:
- oids.append((oid, reader))
+ oids.append((oid, reader, writer))
break
else:
continue
elif oid > oidrange.start:
# If we are here it means we hit the start but skipped
- oids.append((oid, reader))
+ oids.append((oid, reader, writer))
break
except StopIteration:
# Couldn't find *anything*
@@ -308,20 +325,19 @@ class DataSource: # This may be broken up in future to be less NTP-specific
elif (oidrange.end is not None) and (oid >= oidrange.end):
break # past the end of a bounded range
else:
- oids.append((oid, reader))
+ oids.append((oid, reader, writer))
except StopIteration:
break # We have run off the end of the MIB
return oids
# =============================================================
- # Data retrevial callbacks start here
+ # Data read callbacks start here
# comment divider lines represent not yet implemented callbacks
- # If writing will return True on success
# =============================================================
#########################
- def cb_systemInfo(self, oid, write=None, category=None):
+ def cbr_systemInfo(self, oid, write=None, category=None):
if write is None:
if category == "name": # The product name of the running NTP
data = "NTPsec"
@@ -336,46 +352,46 @@ class DataSource: # This may be broken up in future to be less NTP-specific
vb = ax.Varbind(ax.VALUE_OCTET_STR, oid, data)
return vb
- def cb_timeResolution(self, oid, write=None): # DUMMY
+ def cbr_timeResolution(self, oid, write=None): # DUMMY
# Uinteger32
if write is None:
return ax.Varbind(ax.VALUE_GAUGE32, oid, 42)
- def cb_timePrecision(self, oid, write=None):
+ def cbr_timePrecision(self, oid, write=None):
if write is None:
data = self.session.readvar(0, ["precision"])
return ax.Varbind(ax.VALUE_INTEGER, oid, data["precision"])
- def cb_timeDistance(self, oid, write=None): # DUMMY
+ def cbr_timeDistance(self, oid, write=None): # DUMMY
# Displaystring
if write is None:
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "foo")
#############################
- def cb_statusCurrentMode(self, oid, write=None): # DUMMY
+ def cbr_statusCurrentMode(self, oid, write=None): # DUMMY
# Range of integers
if write is None:
return ax.Varbind(ax.VALUE_INTEGER, oid, 15)
- def cb_statusStratum(self, oid, write=None):
+ def cbr_statusStratum(self, oid, write=None):
# NTPstratum
if write is None:
data = self.session.readvar(0, ["stratum"])
return ax.Varbind(ax.VALUE_GAUGE32, oid, data["stratum"])
- def cb_statusActiveRefSourceID(self, oid, write=None): # DUMMY
+ def cbr_statusActiveRefSourceID(self, oid, write=None): # DUMMY
# range of uint32
if write is None:
return ax.Varbind(ax.VALUE_GAUGE32, oid, 1024)
- def cb_statusActiveRefSourceName(self, oid, write=None):
+ def cbr_statusActiveRefSourceName(self, oid, write=None):
# utf8
if write is None: # TODO: pretty sure this is right, just need DNS
data = self.session.readvar(0, ["peeradr"])
return ax.Varbind(ax.VALUE_OCTET_STR, oid, data["peeradr"])
- def cb_statusActiveOffset(self, oid, write=None):
+ def cbr_statusActiveOffset(self, oid, write=None):
# DisplayString
if write is None:
data = self.session.readvar(0, ["koffset"], raw=True)
@@ -384,24 +400,24 @@ class DataSource: # This may be broken up in future to be less NTP-specific
print("\n\n\nactive offset:", repr(data))
return ax.Varbind(ax.VALUE_OCTET_STR, oid, data)
- def cb_statusNumRefSources(self, oid, write=None):
+ def cbr_statusNumRefSources(self, oid, write=None):
# range of uint32
if write is None:
data = self.session.readstat()
return ax.Varbind(ax.VALUE_GAUGE32, oid, len(data))
- def cb_statusDispersion(self, oid, write=None):
+ def cbr_statusDispersion(self, oid, write=None):
# DisplayString
if write is None:
data = self.session.readvar(0, ["rootdisp"], raw=True)
return ax.Varbind(ax.VALUE_OCTET_STR, oid, data["rootdisp"][1])
- def cb_statusEntityUptime(self, oid, write=None): # DUMMY
+ def cbr_statusEntityUptime(self, oid, write=None): # DUMMY
# TimeTicks
if write is None:
return ax.Varbind(ax.VALUE_TIME_TICKS, oid, 8)
- def cb_statusDateTime(self, oid, write=None):
+ def cbr_statusDateTime(self, oid, write=None):
# NtpDateTime
if write is None:
data = self.session.readvar(0, ["reftime"])
@@ -410,122 +426,164 @@ class DataSource: # This may be broken up in future to be less NTP-specific
txt = "".join(txt.split(".")) # Strip '.'
return ax.Varbind(ax.VALUE_OCTET_STR, oid, txt)
- def cb_statusLeapSecond(self, oid, write=None): # DUMMY
+ def cbr_statusLeapSecond(self, oid, write=None): # DUMMY
# NtpDateTime
if write is None:
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "blah")
- def cb_statusLeapSecDirection(self, oid, write=None): # DUMMY
+ def cbr_statusLeapSecDirection(self, oid, write=None): # DUMMY
# range of int32
if write is None:
return ax.Varbind(ax.VALUE_INTEGER, oid, -1)
- def cb_statusInPkts(self, oid, write=None):
+ def cbr_statusInPkts(self, oid, write=None):
if write is None:
data = self.session.readvar(0, ["io_received"])
return ax.Varbind(ax.VALUE_COUNTER32, oid, data["io_received"])
- def cb_statusOutPkts(self, oid, write=None):
+ def cbr_statusOutPkts(self, oid, write=None):
if write is None:
data = self.session.readvar(0, ["io_sent"])
return ax.Varbind(ax.VALUE_COUNTER32, oid, data["io_sent"])
- def cb_statusBadVersion(self, oid, write=None):
+ def cbr_statusBadVersion(self, oid, write=None):
if write is None:
data = self.session.readvar(0, ["ss_oldver"])
return ax.Varbind(ax.VALUE_COUNTER32, oid, data["ss_oldver"])
- def cb_statusProtocolError(self, oid, write=None): # DUMMY
+ def cbr_statusProtocolError(self, oid, write=None): # DUMMY
if write is None:
return ax.Varbind(ax.VALUE_COUNTER32, oid, 400)
- def cb_statusNotifications(self, oid, write=None): # DUMMY
+ def cbr_statusNotifications(self, oid, write=None): # DUMMY
if write is None:
return ax.Varbind(ax.VALUE_COUNTER32, oid, 500)
##############################
- def cb_assocID(self, oid, write=None): # DUMMY
+ def cbr_assocID(self, oid, write=None): # DUMMY
if write is None:
return ax.Varbind(ax.VALUE_GAUGE32, oid, 1)
- def cb_assocName(self, oid, write=None): # DUMMY
+ def cbr_assocName(self, oid, write=None): # DUMMY
if write is None:
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "It")
- def cb_assocRefID(self, oid, write=None): # DUMMY
+ def cbr_assocRefID(self, oid, write=None): # DUMMY
# DisplayString
if write is None:
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "says;")
- def cb_assocAddrType(self, oid, write=None): # DUMMY
+ def cbr_assocAddrType(self, oid, write=None): # DUMMY
# InetAddressType (range of ints)
if write is None:
return ax.Varbind(ax.VALUE_INTEGER, oid, 3)
- def cb_assocAddr(self, oid, write=None): # DUMMY
+ def cbr_assocAddr(self, oid, write=None): # DUMMY
# InetAddress
if write is None:
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "\x01\x02\x03\x04")
- def cb_assocOffset(self, oid, write=None): # DUMMY
+ def cbr_assocOffset(self, oid, write=None): # DUMMY
# DisplayString
if write is None:
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "Would")
- def cb_assocStratum(self, oid, write=None): # DUMMY
+ def cbr_assocStratum(self, oid, write=None): # DUMMY
# NTPStratum
if write is None:
return ax.Varbind(ax.VALUE_GAUGE32, oid, 12)
- def cb_assocStatusJitter(self, oid, write=None): # DUMMY
+ def cbr_assocStatusJitter(self, oid, write=None): # DUMMY
# DisplayString
if write is None:
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "You")
- def cb_assocStatusDelay(self, oid, write=None): # DUMMY
+ def cbr_assocStatusDelay(self, oid, write=None): # DUMMY
# DisplayString
if write is None:
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "Kindly?")
- def cb_assocStatusDisp(self, oid, write=None): # DUMMY
+ def cbr_assocStatusDisp(self, oid, write=None): # DUMMY
# DisplayString
if write is None:
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "*thunk*")
- def cb_assocStatInPkts(self, oid, write=None): # DUMMY
+ def cbr_assocStatInPkts(self, oid, write=None): # DUMMY
if write is None:
return ax.Varbind(ax.VALUE_COUNTER32, oid, 2)
- def cb_assocStatOutPkts(self, oid, write=None): # DUMMY
+ def cbr_assocStatOutPkts(self, oid, write=None): # DUMMY
if write is None:
return ax.Varbind(ax.VALUE_COUNTER32, oid, 4)
- def cb_assocStatProtoErr(self, oid, write=None): # DUMMY
+ def cbr_assocStatProtoErr(self, oid, write=None): # DUMMY
if write is None:
return ax.Varbind(ax.VALUE_COUNTER32, oid, 8)
#########################
- def cb_entHeartbeatInterval(self, oid, write=None): # DUMMY
+ def cbr_entHeartbeatInterval(self, oid, write=None): # DUMMY
# uint32
if write is None:
return ax.Varbind(ax.VALUE_GAUGE32, oid, 16)
- def cb_entNotifBits(self, oid, write=None): # DUMMY
+ def cbr_entNotifBits(self, oid, write=None): # DUMMY
# BITS
if write is None:
- return ax.Varbind(ax.VALUE_OCTET_STR, oid, "\x10\x20")
+ data = ax.bools2Bits((self.notifyModeChange,
+ self.notifyStratumChange,
+ self.notifySyspeerChange,
+ self.notifyAddAssociation,
+ self.notifyRMAssociation,
+ self.notifyConfigChange,
+ self.notifyLeapSecondAnnounced,
+ self.notifyHeartbeat))
+ return ax.Varbind(ax.VALUE_OCTET_STR, oid, data)
##########################
- def cb_entNotifMessage(self, oid, write=None): # DUMMY
+ def cbr_entNotifMessage(self, oid, write=None): # DUMMY
# utf8str
if write is None:
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "jabber")
#########################
+ # =====================================
+ # Data write callbacks
+ # Returns an error value (or noError)
+ # Must check that the value is valid for the bind; this does not include
+ # checking the type, which the master agent handles
+ # Actions: test, undo, commit, cleanup
+ # =====================================
+
+ def cbw_entNotifBits(self, action, varbind, oldData=None): # DUMMY
+ if action == "test":
+ return ax.ERR_NOERROR
+ elif action == "commit":
+ (self.notifyModeChange,
+ self.notifyStratumChange,
+ self.notifySyspeerChange,
+ self.notifyAddAssociation,
+ self.notifyRMAssociation,
+ self.notifyConfigChange,
+ self.notifyLeapSecondAnnounced,
+ self.notifyHeartbeat) = ax.bits2Bools(varbind.payload, 8)
+ return ax.ERR_NOERROR
+ elif action == "undo":
+ (self.notifyModeChange,
+ self.notifyStratumChange,
+ self.notifySyspeerChange,
+ self.notifyAddAssociation,
+ self.notifyRMAssociation,
+ self.notifyConfigChange,
+ self.notifyLeapSecondAnnounced,
+ self.notifyHeartbeat) = ax.bits2Bools(oldData, 8)
+ return ax.ERR_NOERROR
+ elif action == "cleanup":
+ pass
+
def dolog(text, level):
if debug >= level:
@@ -560,7 +618,11 @@ class PacketControl:
# indexed on pdu code
self.pduHandlers = {ax.PDU_GET: self.handle_GetPDU,
ax.PDU_GET_NEXT: self.handle_GetNextPDU,
- ax.PDU_GET_BULK: self.handle_GetBulkPDU}
+ ax.PDU_GET_BULK: self.handle_GetBulkPDU,
+ ax.PDU_TEST_SET: self.handle_TestSetPDU,
+ ax.PDU_COMMIT_SET: self.handle_CommitSetPDU,
+ ax.PDU_UNDO_SET: self.handle_UndoSetPDU,
+ ax.PDU_CLEANUP_SET: self.handle_CleanupSetPDU}
def mainloop(self, runforever):
if runforever:
@@ -693,11 +755,11 @@ class PacketControl:
binds = []
for oidr in packet.oidranges:
target = oidr.start
- oid, callback = self.database.getOID(target)
- if (oid != target) or (callback is None):
+ oid, reader, _ = self.database.getOID(target)
+ if (oid != target) or (reader is None):
binds.append(ax.Varbind(ax.VALUE_NO_SUCH_OBJECT, target))
else:
- binds.append(callback(oid))
+ binds.append(reader(oid))
# There should also be a situation that leads to noSuchInstance
# but I do not understand the requirements for that
# Need to implement genError
@@ -709,15 +771,11 @@ class PacketControl:
binds = []
for oidr in packet.oidranges:
oids = self.database.getOIDsInRange(oidr, True)
- print()
- print("GetNext:", oidr)
- print("result:", oids)
- print()
if len(oids) == 0: # Nothing found
binds.append(ax.Varbind(ax.VALUE_END_OF_MIB_VIEW, oidr.start))
else:
- oid, callback = oids[0]
- binds.append(callback(oid))
+ oid, reader, _ = oids[0]
+ binds.append(reader(oid))
# Need to implement genError
resp = ax.ResponsePDU(True, self.sessionID, packet.transactionID,
packet.packetID, 0, ax.ERR_NOERROR, 0, binds)
@@ -733,20 +791,109 @@ class PacketControl:
if len(oids) == 0: # Nothing found
binds.append(ax.Varbind(ax.VALUE_END_OF_MIB_VIEW, oidr.start))
else:
- oid, callback = oids[0]
- binds.append(callback(oid))
+ oid, reader, _ = oids[0]
+ binds.append(reader(oid))
# Handle repeaters
for oidr in repeats:
oids = self.database.getOIDsInRange(oidr)
if len(oids) == 0: # Nothing found
binds.append(ax.Varbind(ax.VALUE_END_OF_MIB_VIEW, oidr.start))
else:
- for oid, callback in oids[:packet.maxReps]:
- binds.append(callback(oid))
+ for oid, reader, _ in oids[:packet.maxReps]:
+ binds.append(reader(oid))
resp = ax.ResponsePDU(True, self.sessionID, packet.transactionID,
packet.packetID, 0, ax.ERR_NOERROR, 0, binds)
self.sendPacket(resp, False)
+ def handle_TestSetPDU(self, packet): # WIP
+ # Be advised: MOST OF THE VALIDATION IS DUMMY CODE OR DOESN'T EXIST
+ # According to the RFC this is one of the most demanding parts and
+ # *has* to be gotten right
+ if self.database.inSetP is True:
+ pass # Is this an error?
+ # if (inSetP is True) is an error these will go in an else block
+ self.database.inSetP = True
+ self.database.setVarbinds = []
+ self.database.setHandlers = []
+ self.database.setUndoData = []
+ error = None
+ clearables = []
+ for bindIndex in range(len(packet.varbinds)):
+ varbind = packet.varbinds[bindIndex]
+ # Find an OID, then validate it
+ oid, reader, writer = self.database.getOID(varbind.oid)
+ if oid is None: # doesn't exist, can we create it?
+ # Dummy, assume we can't create anything
+ error = ax.ERR_NO_ACCESS
+ break
+ elif writer is None: # exists, writing not implemented
+ error = ax.ERR_NOT_WRITABLE
+ break
+ # Ok, we have an existing or new OID, assemble the orders
+ # If we created a new bind undoData is None, must delete it
+ undoData = reader(oid)
+ error = writer("test", varbind)
+ if error != ax.ERR_NOERROR:
+ break
+ self.database.setVarbinds.append(varbind)
+ self.database.setHandlers.append(writer)
+ self.database.setUndoData.append(undoData)
+ if error != ax.ERR_NOERROR:
+ resp = ax.ResponsePDU(True, self.sessionID, packet.transactionID,
+ packet.packetID, 0, error, bindIndex)
+ self.sendPacket(resp, False)
+ for i in range(bindIndex):
+ # Errored out, clear the successful ones
+ self.database.setHandlers[i]("clear",
+ self.database.setVarbinds[i])
+ self.database.inSetP = False
+ else:
+ resp = ax.ResponsePDU(True, self.sessionID, packet.transactionID,
+ packet.packetID, 0, ax.ERR_NOERROR, 0)
+ self.sendPacket(resp, False)
+
+ def handle_CommitSetPDU(self, packet):
+ if self.database.inSetP is False:
+ pass # how to handle this?
+ varbinds = self.database.setVarbinds
+ handlers = self.database.setHandlers
+ for i in range(len(varbinds)):
+ error = handlers[i]("commit", varbinds[i])
+ if error != ax.ERR_NOERROR:
+ break
+ if error != ax.ERR_NOERROR:
+ resp = ax.ResponsePDU(True, self.sessionID, packet.transactionID,
+ packet.packetID, 0, error, i)
+ else:
+ resp = ax.ResponsePDU(True, self.sessionID, packet.transactionID,
+ packet.packetID, 0, ax.ERR_NOERROR, 0)
+ print("About to send:", resp)
+ self.sendPacket(resp, False)
+
+ def handle_UndoSetPDU(self, packet):
+ varbinds = self.database.setVarbinds
+ handlers = self.database.setHandlers
+ undoData = self.database.setUndoData
+ for i in range(len(varbinds)):
+ error = handlers[i]("undo", varbinds[i], undoData[i])
+ if error != ax.ERR_NOERROR:
+ break
+ if error != ax.ERR_NOERROR:
+ resp = ax.ResponsePDU(True, self.sessionID, packet.transactionID,
+ packet.packetID, o, error, i)
+ else:
+ resp = ax.ResponsePDU(True, self.sessionID, packet.transactionID,
+ packet.packetID, 0, ax.ERR_NOERROR, 0)
+ self.sendPacket(resp, False)
+
+ def handle_CleanupSetPDU(self, packet):
+ varbinds = self.database.setVarbinds
+ handlers = self.database.setHandlers
+ undoData = self.database.setUndoData
+ for i in range(len(varbinds)):
+ handlers[i]("clean", varbinds[i])
+ self.database.inSetP = False
+
def mainloop():
dolog("initing loop\n", 1)
=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -1349,6 +1349,40 @@ def mibnode(reader, writer, static, subs):
"static": static, "subids": subs}
+def bits2Bools(bitString, cropLength=None):
+ bits = []
+ for octet in bitString:
+ octet = ord(octet)
+ bits.append(bool(octet & 0x80)) # Yes, these are backwards, that is
+ bits.append(bool(octet & 0x40)) # how SNMP wants them. It does make
+ bits.append(bool(octet & 0x20)) # sense if you think about it as a
+ bits.append(bool(octet & 0x10)) # stream of bits instead of octets.
+ bits.append(bool(octet & 0x08))
+ bits.append(bool(octet & 0x04)) # If you don't like it go yell at
+ bits.append(bool(octet & 0x02)) # the SNMP designers.
+ bits.append(bool(octet & 0x01))
+ if cropLength is not None: # used when a bitfield is not a multiple of 8
+ bits = bits[:cropLength]
+ return bits
+
+def bools2Bits(bits):
+ bitCounter = 0
+ octets = []
+ current = 0
+ for bit in bits:
+ current += (int(bit) << (7 - bitCounter))
+ bitCounter += 1
+ if bitCounter >= 8: # end of byte
+ bitCounter = 0
+ octets.append(chr(current))
+ current = 0
+ else:
+ if bitCounter != 0:
+ octets.append(chr(current))
+ octets = "".join(octets)
+ return octets
+
+
# Value types
VALUE_INTEGER = 2
VALUE_OCTET_STR = 4
@@ -1465,6 +1499,8 @@ ERR_WRONG_VALUE = 10
ERR_NO_CREATION = 11
ERR_INCONSISTENT_VALUE = 12
ERR_RESOURCE_UNAVAILABLE = 13
+ERR_COMMIT_FAILED = 14
+ERR_UNDO_FAILED = 15
ERR_NOT_WRITABLE = 17
ERR_INCONSISTENT_NAME = 18
definedErrors = (ERR_NOERROR, ERR_GENERR, ERR_NO_ACCESS, ERR_WRONG_TYPE,
=====================================
tests/pylib/test_agentx.py
=====================================
--- a/tests/pylib/test_agentx.py
+++ b/tests/pylib/test_agentx.py
@@ -2601,6 +2601,40 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
(x.OID((23, 1, 2)), None, None),
(x.OID((23, 2)), None, None)))
+ def test_bits2Bools(self):
+ bits2bool = ntp.agentx.bits2Bools
+
+ # Test empty
+ self.assertEqual(bits2bool(""), [])
+ # Test round bytes
+ self.assertEqual(bits2bool("\xFA\xCE"),
+ [True, True, True, True,
+ True, False, True, False,
+ True, True, False, False,
+ True, True, True, False])
+ # Test partial bytes
+ self.assertEqual(bits2bool("\xFF\xE1", 12), # The extra bit is to
+ [True, True, True, True, # confirm crop
+ True, True, True, True,
+ True, True, True, False])
+
+ def test_bools2bits(self):
+ bool2bits = ntp.agentx.bools2Bits
+
+ # Test empty
+ self.assertEqual(bool2bits([]), "")
+ # Test round bytes
+ self.assertEqual(bool2bits([True, True, True, True,
+ True, False, True, False,
+ True, True, False, False,
+ True, True, True, False]),
+ "\xFA\xCE")
+ # Test partial bytes
+ self.assertEqual(bool2bits([True, True, True, True,
+ True, True, True, True,
+ True, True, True, False]),
+ "\xFF\xE0")
+
if __name__ == "__main__":
unittest.main()
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/7caec8f0b5ad27d598dd193a39e77a7a03326242...06b1eaedb731ffa1ec64a1522b3e2eba9253423f
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/7caec8f0b5ad27d598dd193a39e77a7a03326242...06b1eaedb731ffa1ec64a1522b3e2eba9253423f
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20171112/629aac08/attachment.html>
More information about the vc
mailing list