[Git][NTPsec/ntpsec][master] 3 commits: Compacted duplicate code into dynamicCallbackPeerdata method.

Ian Bruene gitlab at mg.gitlab.com
Wed Feb 21 01:37:11 UTC 2018


Ian Bruene pushed to branch master at NTPsec / ntpsec


Commits:
285b969d by Ian Bruene at 2018-02-20T10:23:22-06:00
Compacted duplicate code into dynamicCallbackPeerdata method.

- - - - -
94133a06 by Ian Bruene at 2018-02-20T12:45:18-06:00
Added Ping and response checking functionality.

- - - - -
256b4e29 by Ian Bruene at 2018-02-20T19:35:27-06:00
PacketControl now breaks loop upon disconnection.

- - - - -


2 changed files:

- ntpclients/ntpsnmpd.py
- pylib/agentx.py


Changes:

=====================================
ntpclients/ntpsnmpd.py
=====================================
--- a/ntpclients/ntpsnmpd.py
+++ b/ntpclients/ntpsnmpd.py
@@ -555,14 +555,8 @@ class DataSource(ntp.agentx.MIBControl):
         return self.dynamicCallbackSkeleton(handler)
 
     def sub_assocName(self):
-        def handler(oid, associd):
-            pdata = self.misc_getPeerData()
-            if pdata is None:
-                return None
-            peername = pdata[associd]["srcadr"][1]
-            peername = ntp.util.canonicalize_dns(peername)
-            return ax.Varbind(ax.VALUE_OCTET_STR, oid, peername)
-        return self.dynamicCallbackSkeleton(handler)
+        return self.dynamicCallbackPeerdata("srcadr", True,
+                                            ax.VALUE_OCTET_STR)
 
     def sub_assocRefID(self):
         def handler(oid, associd):
@@ -647,40 +641,20 @@ class DataSource(ntp.agentx.MIBControl):
         return self.dynamicCallbackSkeleton(handler)
 
     def sub_assocStratum(self):
-        def handler(oid, associd):
-            pdata = self.misc_getPeerData()
-            if pdata is None:
-                return None
-            stratum = pdata[associd]["stratum"][0]
-            return ax.Varbind(ax.VALUE_GAUGE32, oid, stratum)
-        return self.dynamicCallbackSkeleton(handler)
+        return self.dynamicCallbackPeerdata("stratum", False,
+                                            ax.VALUE_GAUGE32)
 
     def sub_assocJitter(self):
-        def handler(oid, associd):
-            pdata = self.misc_getPeerData()
-            if pdata is None:
-                return None
-            jitter = pdata[associd]["jitter"][1]
-            return ax.Varbind(ax.VALUE_OCTET_STR, oid, jitter)
-        return self.dynamicCallbackSkeleton(handler)
+        return self.dynamicCallbackPeerdata("jitter", True,
+                                            ax.VALUE_OCTET_STR)
 
     def sub_assocDelay(self):
-        def handler(oid, associd):
-            pdata = self.misc_getPeerData()
-            if pdata is None:
-                return None
-            delay = pdata[associd]["delay"][1]
-            return ax.Varbind(ax.VALUE_OCTET_STR, oid, delay)
-        return self.dynamicCallbackSkeleton(handler)
+        return self.dynamicCallbackPeerdata("delay", True,
+                                            ax.VALUE_OCTET_STR)
 
     def sub_assocDispersion(self):
-        def handler(oid, associd):
-            pdata = self.misc_getPeerData()
-            if pdata is None:
-                return None
-            dispersion = pdata[associd]["rootdisp"][1]
-            return ax.Varbind(ax.VALUE_OCTET_STR, oid, dispersion)
-        return self.dynamicCallbackSkeleton(handler)
+        return self.dynamicCallbackPeerdata("rootdisp", True,
+                                            ax.VALUE_OCTET_STR)
 
     def sub_assocStatInPkts(self):
         def handler(oid, associd):
@@ -991,6 +965,16 @@ class DataSource(ntp.agentx.MIBControl):
         except ntp.packet.ControlException:
             return None
 
+    def dynamicCallbackPeerdata(self, variable, raw, valueType):
+        rawindex = 1 if raw is True else 0
+        def handler(oid, associd):
+            pdata = self.misc_getPeerData()
+            if pdata is None:
+                return None
+            value = pdata[associd][variable][rawindex]
+            return ax.Varbind(valueType, oid, value)
+        return self.dynamicCallbackSkeleton(handler)
+
     def dynamicCallbackSkeleton(self, handler):
         # Build a dynamic MIB tree, installing the provided handler in it
         def readCallback(oid):


=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -18,6 +18,7 @@ except ImportError as e:
 
 
 defaultTimeout = 30
+pingTime = 60
 
 
 class MIBControl:
@@ -139,6 +140,8 @@ class PacketControl:
         self.timeout = timeout
         self.sessionID = None  # need this for all packets
         self.highestTransactionID = 0  # used for exchanges we start
+        self.lastReception = None
+        self.stillConnected = False
         # indexed on pdu code
         self.pduHandlers = {ax.PDU_GET: self.handle_GetPDU,
                             ax.PDU_GET_NEXT: self.handle_GetNextPDU,
@@ -146,17 +149,21 @@ class PacketControl:
                             ax.PDU_TEST_SET: self.handle_TestSetPDU,
                             ax.PDU_COMMIT_SET: self.handle_CommitSetPDU,
                             ax.PDU_UNDO_SET: self.handle_UndoSetPDU,
-                            ax.PDU_CLEANUP_SET: self.handle_CleanupSetPDU}
+                            ax.PDU_CLEANUP_SET: self.handle_CleanupSetPDU,
+                            ax.PDU_RESPONSE: self.handle_ResponsePDU}
 
     def mainloop(self, runforever):
+        if self.stillConnected is not True:
+            return False
         if runforever:
-            while True:
+            while self.stillConnected is True:
                 self._doloop()
                 if self.loopCallback is not None:
                     self.loopCallback(self)
                 time.sleep(self.spinGap)
         else:
             self._doloop()
+        return self.stillConnected
 
     def _doloop(self):
         # loop body split out to separate the one-shot/run-forever switches
@@ -177,8 +184,13 @@ class PacketControl:
             else:
                 self.log("dropping packet type %i, not implemented\n" % ptype,
                          1)
+        self.checkResponses()
+        if self.lastReception is not None:
+            currentTime = time.time()
+            if (currentTime - self.lastReception) > pingTime:
+                self.sendPing()
 
-    def initNewSession(self): # WORKING
+    def initNewSession(self):
         self.log("init new session...\n", 1)
         # We already have a connection, need to open a session.
         openpkt = ax.OpenPDU(True, 23, 0, 0, self.timeout, (),
@@ -196,6 +208,7 @@ class PacketControl:
         self.sendPacket(register, False)
         self.log("Sent registration\n", 1)
         response = self.waitForResponse(register, True)
+        self.stillConnected = True
 
     def waitForResponse(self, opkt, ignoreSID=False):
         "Wait for a response to a specific packet, dropping everything else"
@@ -214,6 +227,16 @@ class PacketControl:
                     return packet
             time.sleep(self.spinGap)
 
+    def checkResponses(self):
+        "Check for expected responses that have timed out"
+        currentTime = time.time()
+        for key in self.packetLog.keys():
+            timeout, originalPkt, callback = self.packetLog[key]
+            if currentTime > timeout:
+                if callback is not None:
+                    callback(None, originalPkt)
+                del self.packetLog[key]
+
     def packetEater(self):
         "Slurps data from the input buffer and tries to parse packets from it"
         self.pollSocket()
@@ -241,7 +264,8 @@ class PacketControl:
                 #  whole buffer if too many failures in a row?
                 self.receivedData = e.remainingData
 
-    def sendPacket(self, packet, expectsReply):
+    def sendPacket(self, packet, expectsReply, replyTimeout=defaultTimeout,
+                   callback=None):
         encoded = packet.encode()
         self.log("\nsending packet: %s\n%s \n" % (repr(packet), repr(encoded)),
                  4)
@@ -250,7 +274,20 @@ class PacketControl:
             index = (packet.sessionID,
                      packet.transactionID,
                      packet.packetID)
-            self.packetLog[index] = packet
+            self.packetLog[index] = (replyTimeout, packet, callback)
+
+    def sendPing(self):
+        # DUMMY transactionID, does this count for Pings?
+        # DUMMY packetID, does this need to change? or does the pktID only
+        # count relative to a given transaction ID?
+        tid = self.highestTransactionID + 5  # +5 to avoid collisions
+        self.highestTransactionID = tid
+        pkt = ax.PingPDU(True, self.sessionID, tid, 1)
+        def callback(resp, orig):
+            if resp is None:  # Timed out. Need to restart the session.
+                # Er, problem: Can't handle reconnect from inside PacketControl
+                self.stillConnected = False
+        self.sendPacket(pkt, True)
 
     def sendNotify(self, varbinds, context=None):
         # DUMMY packetID, does this need to change? or does the pktID only
@@ -258,7 +295,7 @@ class PacketControl:
         tid = self.highestTransactionID + 5  # +5 to avoid collisions
         self.highestTransactionID = tid
         pkt = ax.NotifyPDU(True, self.sessionID, tid, 1, varbinds, context)
-        self.sendPacket(pkt, True)
+        self.sendPacket(pkt, True)  # TODO: callback
 
     def sendErrorResponse(self, errorHeader, errorType, errorIndex):
         err = ax.ResponsePDU(errorHeader["flags"]["bigEndian"],
@@ -280,6 +317,7 @@ class PacketControl:
             if len(newdata) > 0:
                 self.log("Received data: " + repr(newdata) + "\n", 4)
                 data += newdata
+                self.lastReception = time.time()
             else:
                 break
         self.recievedData += data
@@ -445,3 +483,15 @@ class PacketControl:
         for i in range(len(varbinds)):
             handlers[i]("clean", varbinds[i])
         self.database.inSetP = False
+
+    def handle_ResponsePDU(self, packet):
+        index = (packet.sessionID, packet.transactionID, packet.packetID)
+        if index in self.packetLog:
+            timeout, originalPkt, callback = self.packetLog[index]
+            del self.packetLog[index]
+            if callback is not None:
+                callback(packet, originalPkt)
+        else:
+            # Ok, response with no associated packet.
+            # Probably something that timed out.
+            pass



View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/f3c7978b932249665d35f7d909dbc4f6decf3962...256b4e298c967f5314f47b59c3eb6abdae9bc82c

---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/f3c7978b932249665d35f7d909dbc4f6decf3962...256b4e298c967f5314f47b59c3eb6abdae9bc82c
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20180221/2e898d2c/attachment.html>


More information about the vc mailing list