[Git][NTPsec/ntpsec][master] 3 commits: Compacted duplicate code into dynamicCallbackPeerdata method.
Ian Bruene
gitlab at mg.gitlab.com
Wed Feb 21 01:37:11 UTC 2018
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
285b969d by Ian Bruene at 2018-02-20T10:23:22-06:00
Compacted duplicate code into dynamicCallbackPeerdata method.
- - - - -
94133a06 by Ian Bruene at 2018-02-20T12:45:18-06:00
Added Ping and response checking functionality.
- - - - -
256b4e29 by Ian Bruene at 2018-02-20T19:35:27-06:00
PacketControl now breaks loop upon disconnection.
- - - - -
2 changed files:
- ntpclients/ntpsnmpd.py
- pylib/agentx.py
Changes:
=====================================
ntpclients/ntpsnmpd.py
=====================================
--- a/ntpclients/ntpsnmpd.py
+++ b/ntpclients/ntpsnmpd.py
@@ -555,14 +555,8 @@ class DataSource(ntp.agentx.MIBControl):
return self.dynamicCallbackSkeleton(handler)
def sub_assocName(self):
- def handler(oid, associd):
- pdata = self.misc_getPeerData()
- if pdata is None:
- return None
- peername = pdata[associd]["srcadr"][1]
- peername = ntp.util.canonicalize_dns(peername)
- return ax.Varbind(ax.VALUE_OCTET_STR, oid, peername)
- return self.dynamicCallbackSkeleton(handler)
+ return self.dynamicCallbackPeerdata("srcadr", True,
+ ax.VALUE_OCTET_STR)
def sub_assocRefID(self):
def handler(oid, associd):
@@ -647,40 +641,20 @@ class DataSource(ntp.agentx.MIBControl):
return self.dynamicCallbackSkeleton(handler)
def sub_assocStratum(self):
- def handler(oid, associd):
- pdata = self.misc_getPeerData()
- if pdata is None:
- return None
- stratum = pdata[associd]["stratum"][0]
- return ax.Varbind(ax.VALUE_GAUGE32, oid, stratum)
- return self.dynamicCallbackSkeleton(handler)
+ return self.dynamicCallbackPeerdata("stratum", False,
+ ax.VALUE_GAUGE32)
def sub_assocJitter(self):
- def handler(oid, associd):
- pdata = self.misc_getPeerData()
- if pdata is None:
- return None
- jitter = pdata[associd]["jitter"][1]
- return ax.Varbind(ax.VALUE_OCTET_STR, oid, jitter)
- return self.dynamicCallbackSkeleton(handler)
+ return self.dynamicCallbackPeerdata("jitter", True,
+ ax.VALUE_OCTET_STR)
def sub_assocDelay(self):
- def handler(oid, associd):
- pdata = self.misc_getPeerData()
- if pdata is None:
- return None
- delay = pdata[associd]["delay"][1]
- return ax.Varbind(ax.VALUE_OCTET_STR, oid, delay)
- return self.dynamicCallbackSkeleton(handler)
+ return self.dynamicCallbackPeerdata("delay", True,
+ ax.VALUE_OCTET_STR)
def sub_assocDispersion(self):
- def handler(oid, associd):
- pdata = self.misc_getPeerData()
- if pdata is None:
- return None
- dispersion = pdata[associd]["rootdisp"][1]
- return ax.Varbind(ax.VALUE_OCTET_STR, oid, dispersion)
- return self.dynamicCallbackSkeleton(handler)
+ return self.dynamicCallbackPeerdata("rootdisp", True,
+ ax.VALUE_OCTET_STR)
def sub_assocStatInPkts(self):
def handler(oid, associd):
@@ -991,6 +965,16 @@ class DataSource(ntp.agentx.MIBControl):
except ntp.packet.ControlException:
return None
+ def dynamicCallbackPeerdata(self, variable, raw, valueType):
+ rawindex = 1 if raw is True else 0
+ def handler(oid, associd):
+ pdata = self.misc_getPeerData()
+ if pdata is None:
+ return None
+ value = pdata[associd][variable][rawindex]
+ return ax.Varbind(valueType, oid, value)
+ return self.dynamicCallbackSkeleton(handler)
+
def dynamicCallbackSkeleton(self, handler):
# Build a dynamic MIB tree, installing the provided handler in it
def readCallback(oid):
=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -18,6 +18,7 @@ except ImportError as e:
defaultTimeout = 30
+pingTime = 60
class MIBControl:
@@ -139,6 +140,8 @@ class PacketControl:
self.timeout = timeout
self.sessionID = None # need this for all packets
self.highestTransactionID = 0 # used for exchanges we start
+ self.lastReception = None
+ self.stillConnected = False
# indexed on pdu code
self.pduHandlers = {ax.PDU_GET: self.handle_GetPDU,
ax.PDU_GET_NEXT: self.handle_GetNextPDU,
@@ -146,17 +149,21 @@ class PacketControl:
ax.PDU_TEST_SET: self.handle_TestSetPDU,
ax.PDU_COMMIT_SET: self.handle_CommitSetPDU,
ax.PDU_UNDO_SET: self.handle_UndoSetPDU,
- ax.PDU_CLEANUP_SET: self.handle_CleanupSetPDU}
+ ax.PDU_CLEANUP_SET: self.handle_CleanupSetPDU,
+ ax.PDU_RESPONSE: self.handle_ResponsePDU}
def mainloop(self, runforever):
+ if self.stillConnected is not True:
+ return False
if runforever:
- while True:
+ while self.stillConnected is True:
self._doloop()
if self.loopCallback is not None:
self.loopCallback(self)
time.sleep(self.spinGap)
else:
self._doloop()
+ return self.stillConnected
def _doloop(self):
# loop body split out to separate the one-shot/run-forever switches
@@ -177,8 +184,13 @@ class PacketControl:
else:
self.log("dropping packet type %i, not implemented\n" % ptype,
1)
+ self.checkResponses()
+ if self.lastReception is not None:
+ currentTime = time.time()
+ if (currentTime - self.lastReception) > pingTime:
+ self.sendPing()
- def initNewSession(self): # WORKING
+ def initNewSession(self):
self.log("init new session...\n", 1)
# We already have a connection, need to open a session.
openpkt = ax.OpenPDU(True, 23, 0, 0, self.timeout, (),
@@ -196,6 +208,7 @@ class PacketControl:
self.sendPacket(register, False)
self.log("Sent registration\n", 1)
response = self.waitForResponse(register, True)
+ self.stillConnected = True
def waitForResponse(self, opkt, ignoreSID=False):
"Wait for a response to a specific packet, dropping everything else"
@@ -214,6 +227,16 @@ class PacketControl:
return packet
time.sleep(self.spinGap)
+ def checkResponses(self):
+ "Check for expected responses that have timed out"
+ currentTime = time.time()
+ for key in self.packetLog.keys():
+ timeout, originalPkt, callback = self.packetLog[key]
+ if currentTime > timeout:
+ if callback is not None:
+ callback(None, originalPkt)
+ del self.packetLog[key]
+
def packetEater(self):
"Slurps data from the input buffer and tries to parse packets from it"
self.pollSocket()
@@ -241,7 +264,8 @@ class PacketControl:
# whole buffer if too many failures in a row?
self.receivedData = e.remainingData
- def sendPacket(self, packet, expectsReply):
+ def sendPacket(self, packet, expectsReply, replyTimeout=defaultTimeout,
+ callback=None):
encoded = packet.encode()
self.log("\nsending packet: %s\n%s \n" % (repr(packet), repr(encoded)),
4)
@@ -250,7 +274,20 @@ class PacketControl:
index = (packet.sessionID,
packet.transactionID,
packet.packetID)
- self.packetLog[index] = packet
+ self.packetLog[index] = (replyTimeout, packet, callback)
+
+ def sendPing(self):
+ # DUMMY transactionID, does this count for Pings?
+ # DUMMY packetID, does this need to change? or does the pktID only
+ # count relative to a given transaction ID?
+ tid = self.highestTransactionID + 5 # +5 to avoid collisions
+ self.highestTransactionID = tid
+ pkt = ax.PingPDU(True, self.sessionID, tid, 1)
+ def callback(resp, orig):
+ if resp is None: # Timed out. Need to restart the session.
+ # Er, problem: Can't handle reconnect from inside PacketControl
+ self.stillConnected = False
+ self.sendPacket(pkt, True)
def sendNotify(self, varbinds, context=None):
# DUMMY packetID, does this need to change? or does the pktID only
@@ -258,7 +295,7 @@ class PacketControl:
tid = self.highestTransactionID + 5 # +5 to avoid collisions
self.highestTransactionID = tid
pkt = ax.NotifyPDU(True, self.sessionID, tid, 1, varbinds, context)
- self.sendPacket(pkt, True)
+ self.sendPacket(pkt, True) # TODO: callback
def sendErrorResponse(self, errorHeader, errorType, errorIndex):
err = ax.ResponsePDU(errorHeader["flags"]["bigEndian"],
@@ -280,6 +317,7 @@ class PacketControl:
if len(newdata) > 0:
self.log("Received data: " + repr(newdata) + "\n", 4)
data += newdata
+ self.lastReception = time.time()
else:
break
self.recievedData += data
@@ -445,3 +483,15 @@ class PacketControl:
for i in range(len(varbinds)):
handlers[i]("clean", varbinds[i])
self.database.inSetP = False
+
+ def handle_ResponsePDU(self, packet):
+ index = (packet.sessionID, packet.transactionID, packet.packetID)
+ if index in self.packetLog:
+ timeout, originalPkt, callback = self.packetLog[index]
+ del self.packetLog[index]
+ if callback is not None:
+ callback(packet, originalPkt)
+ else:
+ # Ok, response with no associated packet.
+ # Probably something that timed out.
+ pass
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/f3c7978b932249665d35f7d909dbc4f6decf3962...256b4e298c967f5314f47b59c3eb6abdae9bc82c
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/f3c7978b932249665d35f7d909dbc4f6decf3962...256b4e298c967f5314f47b59c3eb6abdae9bc82c
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20180221/2e898d2c/attachment.html>
More information about the vc
mailing list