[Git][NTPsec/ntpsec][master] 2 commits: Checkpoint befor implementing MRU-list cooked mode.

Eric S. Raymond gitlab at mg.gitlab.com
Wed Nov 2 14:33:27 UTC 2016


Eric S. Raymond pushed to branch master at NTPsec / ntpsec


Commits:
3ccc3c9c by Eric S. Raymond at 2016-11-01T18:51:22-04:00
Checkpoint befor implementing MRU-list cooked mode.

- - - - -
209dbc18 by Eric S. Raymond at 2016-11-02T10:32:12-04:00
In pyntpq, first successful retrieval of an MRU span.

Front end display code still needs to be written.

- - - - -


2 changed files:

- ntpq/pyntpq
- pylib/packet.py


Changes:

=====================================
ntpq/pyntpq
=====================================
--- a/ntpq/pyntpq
+++ b/ntpq/pyntpq
@@ -1,9 +1,12 @@
 #!/usr/bin/env python
 #
-# pyntpq - query an NTP server using mode 6 commands
+# ntpq - query an NTP server using mode 6 commands
 #
-# This is a direct translation of the ntpq C code, initially written to work as
-# much like it as possible. Eventually it will replace the C version.
+# Freely translated from the old C ntpq code by ESR.  The idea was to
+# cleanly separate ntpq-that-was into a thin front-end layer handling
+# mainly command interpretation and a back-end that presents the take
+# from ntpd as objects that can be re-used by other front
+# ends. Reusable pieces live in pylib.
 #
 # SPDX-License-Identifier: BSD-2-clause
 from __future__ import print_function, division
@@ -176,7 +179,7 @@ usage: help [ command ]
         try:
             self.peers = self.session.readstat()
         except Mode6Exception as e:
-            print(e.message)
+            self.warn(e.message + "\n")
             return False
         except IOError as e:
             print(e.strerror)
@@ -300,7 +303,7 @@ usage: help [ command ]
             try:
                 variables = self.session.readvar(peer.associd)
             except Mode6Exception as e:
-                print(e.message)
+                self.warn(e.message + "\n")
                 return
             except IOError as e:
                 print(e.strerror)
@@ -358,7 +361,8 @@ usage: help [ command ]
             return ()
         return (lo, hi)
 
-    def __printvars(self, variables, dtype, quiet):
+    def printvars(self, variables, dtype, quiet):
+        "Dump variables in raw (actually, semi-cooked) mode."
         if self.rawmode:
             if not quiet:
                 self.say("status=0x%04x,\n" % self.session.rstatus)
@@ -443,7 +447,7 @@ usage: help [ command ]
         try:
             variables = self.session.readvar(associd, varlist, op)
         except Mode6Exception as e:
-            print(e.message)
+            self.warn(e.message + "\n")
             return False
         except IOError as e:
             print(e.strerror)
@@ -461,7 +465,7 @@ usage: help [ command ]
             return True
 	if not quiet:
             self.say("associd=%d " % associd);
-	self.__printvars(variables, type, quiet)
+	self.printvars(variables, type, quiet)
 	return True;
 
     # Unexposed helper tables and functions end here
@@ -492,7 +496,7 @@ usage: timeout [ msec ]
         try:
             queried = self.session.readvar(associd, [v[0] for v in variables])
         except Mode6Exception as e:
-            print(e.message)
+            self.warn(e.message + "\n")
             return
         except IOError as e:
             print(e.strerror)
@@ -594,7 +598,7 @@ usage: poll [ n ] [ verbose ]
         try:
             self.session.password()
         except Mode6Exception as e:
-            print(e.message)
+            self.warn(e.message + "\n")
 
     def help_passwd(self):
         self.say("""\
@@ -1144,7 +1148,7 @@ usage: lopeers
         try:
             self.session.password()
         except Mode6Exception as e:
-            print(e.message)
+            self.warn(e.message + "\n")
             return
         if self.debug > 2:
             self.warn("In Config\nKeyword = :config\nCommand = %s\n" % line)
@@ -1162,7 +1166,7 @@ usage: lopeers
                 self.say("^\n")
             self.say(self.session.response + "\n")
         except Mode6Exception as e:
-            print(e.message)
+            self.warn(e.message + "\n")
 
     def help_config(self):
         self.say("""\
@@ -1187,7 +1191,20 @@ usage: config_from_file <configuration filename>
     def do_mrulist(self, line):
         "display the list of most recently seen source addresses, tags mincount=... resall=0x... resany=0x..."
 	self.say("Ctrl-C will stop MRU retrieval and display partial results.\n")
-        self.session.mrulist()
+        if self.rawmode:
+            mruhook = lambda v: self.printvars(variables=v,
+                                               dtype=TYPE_SYS,
+                                               quiet=True)
+        else:
+            mruhook = None
+        try:
+            self.session.mrulist(mruhook)
+        except Mode6Exception as e:
+            # Giving up after 8 restarts from the beginning.
+            # With high-traffic NTP servers, this can occur if the
+            # MRU list is limited to less than about 16 seconds' of
+            # entries.  See the 'mru' ntp.conf entry.
+            self.warn(e.message + "\n")
 
     def help_mrulist(self):
         self.say("""\
@@ -1200,7 +1217,7 @@ usage: mrulist [ tag=value ] [ tag=value ] [ tag=value ] [ tag=value ]
         try:
             self.session.password()
         except Mode6Exception as e:
-            print(e.message)
+            self.warn(e.message + "\n")
             return
         pass
 
@@ -1215,7 +1232,7 @@ usage: ifstats
         try:
             self.session.password()
         except Mode6Exception as e:
-            print(e.message)
+            self.warn(e.message + "\n")
             return
         pass
 


=====================================
pylib/packet.py
=====================================
--- a/pylib/packet.py
+++ b/pylib/packet.py
@@ -1,6 +1,12 @@
 #
 # packet.py - definitions and classes for Python querying of NTP
 #
+# Freely translated from the old C ntpq code by ESR, with comments
+# preserved.  The idea was to cleanly separate ntpq-that-was into a
+# thin front-end layer handling mainly command interpretation and a
+# back-end that presents the take from ntpd as objects that can be
+# re-used by other front ends. Other reusable pieces live in util.py.
+#
 # This code should be Python2-vs-Python-3 agnostic.  Keep it that way!
 #
 # SPDX-License-Identifier: BSD-2-clause
@@ -244,6 +250,9 @@ SERR_NOKEY = "***Key not found"
 SERR_BADNONCE = "***Unexpected nonce response format"
 SERR_BADPARM = "***Unknown parameter %s"
 SERR_NOCRED = "***No credentials"
+SERR_SERVER = "***Server error code"
+SERR_STALL = "***No response, probably high-traffic server with low MRU limit"
+SERR_BADTAG = "***Bad MRU tag %s"
 
 def dump_hex_printable(xdata):
     "Dump a packet in hex, in a familiar hex format"
@@ -268,9 +277,42 @@ def dump_hex_printable(xdata):
         sys.stdout.write("\n")
         llen -= rowlen
 
+class MRUEntry:
+    "A traffic entry for an MRU list."
+    def __init__(self):
+        self.addr = None	# text of IPv4 or IPv6 address and port
+        self.last = None	# timestamp of last receipt
+        self.first = None	# timestamp of first receipt
+        self.ct = 0		# count of packets received
+        self.mv = None		# mode and version
+        self.rs = None		# restriction mask (RES_* bits)
+    def matches(self, other):
+        return self.last == other.last and self.addr == other.addr
+    def __repr__(self):
+        return "<MRUentry: " + repr(self.__dict__) + ">"
+
+class MRUSpan:
+    "A sequence of address-timespan pairs returned by ntpd in one response."
+    def __init__(self):
+        self.older = MRUEntry()	# If not None, an MRUEntry object
+        self.entries = []	# A list of MRUEntry objects
+        self.now = None		# server timestamp marking end of operation
+        self.last_newest = None	# timestamp same as last.# of final entry
+    def breadcrumb(self, i):
+        e = self.entries[i]
+        return ", addr.%d=%s, last.%d=%s" % (i, e.addr, i, e.last)
+
+    def is_complete(self):
+        "Is the server done shipping entries for this span?"
+        return self.last_newest is not None
+    def __repr__(self):
+        return "<MRUSpan: older=%s entries=%s now=%s last_newest=%s>" \
+               % (self.older, self.entries, self.now, self.last_newest)
+
 class Mode6Exception(BaseException):
-    def __init__(self, message):
+    def __init__(self, message, errorcode=0):
         self.message = message
+        self.errorcode = errorcode
 
 class Mode6Session:
     "A session to a host"
@@ -293,7 +335,7 @@ class Mode6Session:
         self.sequence = 0
         self.response = ""
         self.rstatus = 0
-        self.mrustats = []
+        self.mruspans = []
         self.ntpd_row_limit = Mode6Session.MRU_ROW_LIMIT
 
     def close(self):
@@ -483,7 +525,6 @@ class Mode6Session:
             #  on how long we're willing to spend here.
             bail += 1
             if bail >= (2*MAXFRAGS):
-                warn("too many packets in response; bailing out\n")
                 raise Mode6Exception(SERR_TOOMUCH)
 
             if len(fragments) == 0:
@@ -494,17 +535,14 @@ class Mode6Session:
             try:
                 (rd, _, _) = select.select([self.sock], [], [], tvo)
             except select.error as msg:
-                warn("select failed: %s\n" % msg[1])
                 raise Mode6Exception(SERR_SELECT)
 
             if not rd:
                 # Timed out.  Return what we have
                 if len(fragments) == 0:
                     if timeo:
-                        warn("%s: timed out, nothing received\n" % self.hostname)
                         raise Mode6Exception(SERR_TIMEOUT)
                 if timeo:
-                    warn("%s: timed out with incomplete data\n" % self.hostname)
                     if self.debug:
                         sys.stderr.write("ERR_INCOMPLETE: Received fragments:\n")
                         for (i, frag) in enumerate(fragments):
@@ -520,7 +558,6 @@ class Mode6Session:
             try:
                 rpkt.analyze(rawdata)
             except struct.error as reason:
-                warn("packet analysis failed: %s\n" % reason)
                 raise Mode6Exception(SERR_UNSPEC)
 
             if rpkt.version() > NTP_VERSION or rpkt.version() < NTP_OLDVERSION:
@@ -554,7 +591,7 @@ class Mode6Session:
                 if rpkt.more():
                     warn("Error %d received on non-final packet\n" %
                          rpkt.errcode())
-                return rpkt.errcode()
+                raise Mode6Exception(SERR_SERVER, rpkt.errcode())
 
             # Check the association ID to make sure it matches what we expect
             if rpkt.associd != associd:
@@ -628,9 +665,7 @@ class Mode6Session:
         retry = True
         while True:
             # Ship the request
-            res = self.sendrequest(opcode, associd, qdata, auth)
-            if res is not None:
-                return res
+            self.sendrequest(opcode, associd, qdata, auth)
             # Get the response.
             try:
                 res = self.getresponse(opcode, associd, not retry)
@@ -658,15 +693,10 @@ class Mode6Session:
         idlist.sort(key=lambda a: a.associd)
         return idlist
 
-    def readvar(self, associd=0, varlist=None, opcode=CTL_OP_READVAR):
-        "Read system vars from the host as a dict, or throw an exception."
-        if varlist == None:
-            qdata = ""
-        else:
-            qdata = ",".join(varlist)
-        self.doquery(opcode, associd=associd, qdata=qdata)
-        response = self.response
+    def __parse_varlist(self):
+        "Parse a response as a textual varlist."
         # Trim trailing NULs from the text
+        response = self.response
         while response.endswith(b"\x00"):
             response = response[:-1]
         response = response.rstrip()
@@ -692,6 +722,15 @@ class Mode6Session:
                     items.append((pair, ""))
         return collections.OrderedDict(items)
 
+    def readvar(self, associd=0, varlist=None, opcode=CTL_OP_READVAR):
+        "Read system vars from the host as a dict, or throw an exception."
+        if varlist == None:
+            qdata = ""
+        else:
+            qdata = ",".join(varlist)
+        self.doquery(opcode, associd=associd, qdata=qdata)
+        return self.__parse_varlist()
+
     def config(self, configtext):
         "Send configuration text to the daemon. Return True if accepted."
         self.doquery(opcode=CTL_OP_CONFIGURE, qdata=configtext, auth=True)
@@ -704,37 +743,177 @@ class Mode6Session:
         self.response = self.response.rstrip()
         return self.response == "Config Succeeded"
 
-    def mrulist(self, variables):
-        "Retrieve MRU list data"
+    def fetch_nonce(self):
         self.doquery(opcode=CTL_OP_REQ_NONCE)
         if not self.response.startswith("nonce="):
             raise Mode6Exception(SERR_BADNONCE)
-        nonce = self.response
+        return self.response.strip()
+
+    def mrulist(self, variables, rawhook=None):
+        "Retrieve MRU list data"
 	nonce_uses = 0
 	restarted_count = 0
-	mru_count = 0
-	c_mru_l_rc = False
-	list_complete = False
-	have_now = False
 	cap_frags = True
-	got = 0
-	ri = 0
+        warn = sys.stderr.write
+
+        if variables:
+            for k in list(variables.keys()):
+                if k in ("mincount", "resall", "resany",
+                         "maxlstint", "laddr", "sort"):
+                    continue
+                else:
+                    raise Mode6Exception(SERR_BADPARAM % k)
+        # FIXME: Do the reslist parameter mappings from the C version
+
+        nonce = self.fetch_nonce()
 
-        for k in list(variables.keys()):
-            if k in ("mincount","resall","resany","maxlstint","laddr","sort"):
-                continue
-            else:
-                raise Mode6Exception(SERR_BADPARAM % k)
 	mrulist_interrupted = False
         try:
-            next_report = time.time() + MRU_REPORT_SECS
+            # Form the initial request
+            #next_report = time.time() + MRU_REPORT_SECS
             limit = min(3 * MAXFRAGS, self.ntpd_row_limit)
             frags = MAXFRAGS;
-            req_buf = "nonce=%s, frags=%d" % (nonce, frags)
-            if varlist:
-                req_buf += ", " + ",".join([("%s=%s" % it) for it in list(variables.items())])
+            req_buf = "%s, frags=%d" % (nonce, frags)
+            if variables:
+                parms = ", " + ",".join([("%s=%s" % it) for it in list(variables.items())])
+            else:
+                parms = ""
+            req_buf += parms
+
             while True:
-                self.doquery(opcode=CTL_OP_REQ_NONCE, qdata=req_buf)
+                # Request additions to the MRU list
+                try:
+                    self.doquery(opcode=CTL_OP_READ_MRU, qdata=req_buf)
+                    recoverable_read_errors = False
+                except Mode6Exception as e:
+                    recoverable_read_errors = True
+                    if e.errorcode is None:
+                        raise e
+                    elif e.errorcode == CERR_UNKNOWNVAR:
+                        # None of the supplied prior entries match, so
+                        # toss them from our list and try again.
+                        if self.debug:
+                            warn("no overlap between %d prior entries and server MRU list\n" % len(self.mrustats))
+                        self.mrustats = []
+                        restarted_count += 1
+                        if restarted_count > 8:
+                            raise Mode6Exception(SERR_STALL)
+                        if self.debug:
+                            warn("--->   Restarting from the beginning, retry #%u\n" % restarted_count)
+                    elif e.errorcode == CERR_UNKNOWNVAR:
+                        e.message = "CERR_UNKNOWNVAR from ntpd but no priors given."
+                        raise e
+                    elif e.errorcode == CERR_BADVALUE:
+                        if cap_frags:
+                            cap_frags = False;
+                            if self.debug:
+                                warn("Reverted to row limit from fragments limit.\n");
+			else:
+                            # ntpd has lower cap on row limit
+                            self.ntpd_row_limit -= 1
+                            limit = min(limit, ntpd_row_limit)
+                            if self.debug:
+                                warn("Row limit reduced to %d following CERR_BADVALUE.\n" % limit)
+                    elif e.errorcode in (ERR_INCOMPLETE, ERR_TIMEOUT):
+			 # Reduce the number of rows/frags requested by
+			 # half to recover from lost response fragments.
+			if cap_frags:
+                            frags = max(2, frags / 2)
+                            if self.debug:
+                                warn("Frag limit reduced to %d following incomplete response.\n"% frags)
+			else:
+                            limit = max(2, limit / 2);
+                            if self.debug:
+                                warn("Row limit reduced to %d following incomplete response.\n" % limit)
+                    elif e.errorcode:
+                        raise e
+
+                # Comment from the C code:
+		# This is a cheap cop-out implementation of rawmode
+		# output for mrulist.  A better approach would be to
+		# dump similar output after the list is collected by
+		# ntpq with a continuous sequence of indexes.  This
+		# cheap approach has indexes resetting to zero for
+		# each query/response, and duplicates are not
+		# coalesced.
+                variables = self.__parse_varlist()
+                if rawhook:
+                    rawhook(variables)
+                    continue
+
+                # Deserialize the contents of one response
+                span = MRUSpan()
+                for (tag, val) in variables.items():
+                    if tag =="addr.older":
+                        if span.older.last is None:
+                            if self.debug:
+                                warn("addr.older %s before last.older\n" % val)
+                            return False
+                        span.older.addr = val
+                        continue
+                    elif tag =="last.older":
+                        span.older.addr = val
+                        continue
+                    elif tag =="now":
+                        span.now = val
+                        continue
+                    elif tag =="last.newest":
+                        span.last_newest = val
+                        continue
+                    for prefix in ("addr", "last", "ct", "mv", "rs"):
+                        if tag.startswith(prefix + "."):
+                            (member, idx) = tag.split(".")
+                            try:
+                                idx = int(idx)
+                            except ValueError:
+                                raise Mode6Exception(SERR_BADTAG % tag)
+                            if idx >= len(span.entries):
+                                span.entries.append(MRUEntry())
+                            setattr(span.entries[-1], prefix, val)
+
+                # Now try to glue it to the history
+                # FIXME: The following enables an eyeball check of the parse
+                print(repr(span))
+
+                # If we've seen the end sentinel on the span, break out
+                if span.is_complete():
+                    break
+
+		# Snooze for a bit between queries to let ntpd catch
+		# up with other duties.
+                time.sleep(0.05)
+
+		# If there were no errors, increase the number of rows
+		# to a maximum of 3 * MAXFRAGS (the most packets ntpq
+		# can handle in one response), on the assumption that
+		# no less than 3 rows fit in each packet, capped at 
+		# our best guess at the server's row limit.
+                if not recoverable_read_errors:
+                    if cap_frags:
+                        frags = min(MAXFRAGS, frags + 1)
+                    else:
+                        limit = min(3 * MAXFRAGS,
+                                    ntpd_row_limit,
+                                    max(limit + 1,
+                                        limit * 33 / 32))
+
+		# prepare next query with as many address and last-seen
+		# timestamps as will fit in a single packet.
+		req_buf = "%s, %s=%d%s" % \
+                          (nonce,
+                           "frags" if cap_frags else "limit",
+                           frags if cap_frags else limit,
+                           parms)
+		nonce_uses += 1
+                if nonce_uses >= 4:
+                    nonce = fetch_nonce()
+                nonce_uses = 0
+                for i in range(len(span.entries)):
+                    incr = span.breadcrumb(i)
+                    if len(req_buf) + len(incr) >= CTL_MAX_DATA_LEN:
+                        break
+                    else:
+                        req_buf += incr
         except KeyboardInterrupt:
             mrulist_interrupted = True
 



View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/aaa15862988fa24906be1e6103d1530caa4de2f3...209dbc185c8b62ad018c67ec6e912d2eb634f702
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.ntpsec.org/pipermail/vc/attachments/20161102/95c262a2/attachment.html>


More information about the vc mailing list