[Git][NTPsec/ntpsec][master] 2 commits: Checkpoint before implementing MRU-list cooked mode.
Eric S. Raymond
gitlab at mg.gitlab.com
Wed Nov 2 14:33:27 UTC 2016
Eric S. Raymond pushed to branch master at NTPsec / ntpsec
Commits:
3ccc3c9c by Eric S. Raymond at 2016-11-01T18:51:22-04:00
Checkpoint before implementing MRU-list cooked mode.
- - - - -
209dbc18 by Eric S. Raymond at 2016-11-02T10:32:12-04:00
In pyntpq, first successful retrieval of an MRU span.
Front end display code still needs to be written.
- - - - -
2 changed files:
- ntpq/pyntpq
- pylib/packet.py
Changes:
=====================================
ntpq/pyntpq
=====================================
--- a/ntpq/pyntpq
+++ b/ntpq/pyntpq
@@ -1,9 +1,12 @@
#!/usr/bin/env python
#
-# pyntpq - query an NTP server using mode 6 commands
+# ntpq - query an NTP server using mode 6 commands
#
-# This is a direct translation of the ntpq C code, initially written to work as
-# much like it as possible. Eventually it will replace the C version.
+# Freely translated from the old C ntpq code by ESR. The idea was to
+# cleanly separate ntpq-that-was into a thin front-end layer handling
+# mainly command interpretation and a back-end that presents the take
+# from ntpd as objects that can be re-used by other front
+# ends. Reusable pieces live in pylib.
#
# SPDX-License-Identifier: BSD-2-clause
from __future__ import print_function, division
@@ -176,7 +179,7 @@ usage: help [ command ]
try:
self.peers = self.session.readstat()
except Mode6Exception as e:
- print(e.message)
+ self.warn(e.message + "\n")
return False
except IOError as e:
print(e.strerror)
@@ -300,7 +303,7 @@ usage: help [ command ]
try:
variables = self.session.readvar(peer.associd)
except Mode6Exception as e:
- print(e.message)
+ self.warn(e.message + "\n")
return
except IOError as e:
print(e.strerror)
@@ -358,7 +361,8 @@ usage: help [ command ]
return ()
return (lo, hi)
- def __printvars(self, variables, dtype, quiet):
+ def printvars(self, variables, dtype, quiet):
+ "Dump variables in raw (actually, semi-cooked) mode."
if self.rawmode:
if not quiet:
self.say("status=0x%04x,\n" % self.session.rstatus)
@@ -443,7 +447,7 @@ usage: help [ command ]
try:
variables = self.session.readvar(associd, varlist, op)
except Mode6Exception as e:
- print(e.message)
+ self.warn(e.message + "\n")
return False
except IOError as e:
print(e.strerror)
@@ -461,7 +465,7 @@ usage: help [ command ]
return True
if not quiet:
self.say("associd=%d " % associd);
- self.__printvars(variables, type, quiet)
+ self.printvars(variables, type, quiet)
return True;
# Unexposed helper tables and functions end here
@@ -492,7 +496,7 @@ usage: timeout [ msec ]
try:
queried = self.session.readvar(associd, [v[0] for v in variables])
except Mode6Exception as e:
- print(e.message)
+ self.warn(e.message + "\n")
return
except IOError as e:
print(e.strerror)
@@ -594,7 +598,7 @@ usage: poll [ n ] [ verbose ]
try:
self.session.password()
except Mode6Exception as e:
- print(e.message)
+ self.warn(e.message + "\n")
def help_passwd(self):
self.say("""\
@@ -1144,7 +1148,7 @@ usage: lopeers
try:
self.session.password()
except Mode6Exception as e:
- print(e.message)
+ self.warn(e.message + "\n")
return
if self.debug > 2:
self.warn("In Config\nKeyword = :config\nCommand = %s\n" % line)
@@ -1162,7 +1166,7 @@ usage: lopeers
self.say("^\n")
self.say(self.session.response + "\n")
except Mode6Exception as e:
- print(e.message)
+ self.warn(e.message + "\n")
def help_config(self):
self.say("""\
@@ -1187,7 +1191,20 @@ usage: config_from_file <configuration filename>
def do_mrulist(self, line):
"display the list of most recently seen source addresses, tags mincount=... resall=0x... resany=0x..."
self.say("Ctrl-C will stop MRU retrieval and display partial results.\n")
- self.session.mrulist()
+ if self.rawmode:
+ mruhook = lambda v: self.printvars(variables=v,
+ dtype=TYPE_SYS,
+ quiet=True)
+ else:
+ mruhook = None
+ try:
+ self.session.mrulist(mruhook)
+ except Mode6Exception as e:
+ # Giving up after 8 restarts from the beginning.
+ # With high-traffic NTP servers, this can occur if the
+ # MRU list is limited to less than about 16 seconds' of
+ # entries. See the 'mru' ntp.conf entry.
+ self.warn(e.message + "\n")
def help_mrulist(self):
self.say("""\
@@ -1200,7 +1217,7 @@ usage: mrulist [ tag=value ] [ tag=value ] [ tag=value ] [ tag=value ]
try:
self.session.password()
except Mode6Exception as e:
- print(e.message)
+ self.warn(e.message + "\n")
return
pass
@@ -1215,7 +1232,7 @@ usage: ifstats
try:
self.session.password()
except Mode6Exception as e:
- print(e.message)
+ self.warn(e.message + "\n")
return
pass
=====================================
pylib/packet.py
=====================================
--- a/pylib/packet.py
+++ b/pylib/packet.py
@@ -1,6 +1,12 @@
#
# packet.py - definitions and classes for Python querying of NTP
#
+# Freely translated from the old C ntpq code by ESR, with comments
+# preserved. The idea was to cleanly separate ntpq-that-was into a
+# thin front-end layer handling mainly command interpretation and a
+# back-end that presents the take from ntpd as objects that can be
+# re-used by other front ends. Other reusable pieces live in util.py.
+#
# This code should be Python2-vs-Python-3 agnostic. Keep it that way!
#
# SPDX-License-Identifier: BSD-2-clause
@@ -244,6 +250,9 @@ SERR_NOKEY = "***Key not found"
SERR_BADNONCE = "***Unexpected nonce response format"
SERR_BADPARM = "***Unknown parameter %s"
SERR_NOCRED = "***No credentials"
+SERR_SERVER = "***Server error code"
+SERR_STALL = "***No response, probably high-traffic server with low MRU limit"
+SERR_BADTAG = "***Bad MRU tag %s"
def dump_hex_printable(xdata):
"Dump a packet in hex, in a familiar hex format"
@@ -268,9 +277,42 @@ def dump_hex_printable(xdata):
sys.stdout.write("\n")
llen -= rowlen
+class MRUEntry:
+ "A traffic entry for an MRU list."
+ def __init__(self):
+ self.addr = None # text of IPv4 or IPv6 address and port
+ self.last = None # timestamp of last receipt
+ self.first = None # timestamp of first receipt
+ self.ct = 0 # count of packets received
+ self.mv = None # mode and version
+ self.rs = None # restriction mask (RES_* bits)
+ def matches(self, other):
+ return self.last == other.last and self.addr == other.addr
+ def __repr__(self):
+ return "<MRUentry: " + repr(self.__dict__) + ">"
+
+class MRUSpan:
+ "A sequence of address-timespan pairs returned by ntpd in one response."
+ def __init__(self):
+ self.older = MRUEntry() # If not None, an MRUEntry object
+ self.entries = [] # A list of MRUEntry objects
+ self.now = None # server timestamp marking end of operation
+ self.last_newest = None # timestamp same as last.# of final entry
+ def breadcrumb(self, i):
+ e = self.entries[i]
+ return ", addr.%d=%s, last.%d=%s" % (i, e.addr, i, e.last)
+
+ def is_complete(self):
+ "Is the server done shipping entries for this span?"
+ return self.last_newest is not None
+ def __repr__(self):
+ return "<MRUSpan: older=%s entries=%s now=%s last_newest=%s>" \
+ % (self.older, self.entries, self.now, self.last_newest)
+
class Mode6Exception(BaseException):
- def __init__(self, message):
+ def __init__(self, message, errorcode=0):
self.message = message
+ self.errorcode = errorcode
class Mode6Session:
"A session to a host"
@@ -293,7 +335,7 @@ class Mode6Session:
self.sequence = 0
self.response = ""
self.rstatus = 0
- self.mrustats = []
+ self.mruspans = []
self.ntpd_row_limit = Mode6Session.MRU_ROW_LIMIT
def close(self):
@@ -483,7 +525,6 @@ class Mode6Session:
# on how long we're willing to spend here.
bail += 1
if bail >= (2*MAXFRAGS):
- warn("too many packets in response; bailing out\n")
raise Mode6Exception(SERR_TOOMUCH)
if len(fragments) == 0:
@@ -494,17 +535,14 @@ class Mode6Session:
try:
(rd, _, _) = select.select([self.sock], [], [], tvo)
except select.error as msg:
- warn("select failed: %s\n" % msg[1])
raise Mode6Exception(SERR_SELECT)
if not rd:
# Timed out. Return what we have
if len(fragments) == 0:
if timeo:
- warn("%s: timed out, nothing received\n" % self.hostname)
raise Mode6Exception(SERR_TIMEOUT)
if timeo:
- warn("%s: timed out with incomplete data\n" % self.hostname)
if self.debug:
sys.stderr.write("ERR_INCOMPLETE: Received fragments:\n")
for (i, frag) in enumerate(fragments):
@@ -520,7 +558,6 @@ class Mode6Session:
try:
rpkt.analyze(rawdata)
except struct.error as reason:
- warn("packet analysis failed: %s\n" % reason)
raise Mode6Exception(SERR_UNSPEC)
if rpkt.version() > NTP_VERSION or rpkt.version() < NTP_OLDVERSION:
@@ -554,7 +591,7 @@ class Mode6Session:
if rpkt.more():
warn("Error %d received on non-final packet\n" %
rpkt.errcode())
- return rpkt.errcode()
+ raise Mode6Exception(SERR_SERVER, rpkt.errcode())
# Check the association ID to make sure it matches what we expect
if rpkt.associd != associd:
@@ -628,9 +665,7 @@ class Mode6Session:
retry = True
while True:
# Ship the request
- res = self.sendrequest(opcode, associd, qdata, auth)
- if res is not None:
- return res
+ self.sendrequest(opcode, associd, qdata, auth)
# Get the response.
try:
res = self.getresponse(opcode, associd, not retry)
@@ -658,15 +693,10 @@ class Mode6Session:
idlist.sort(key=lambda a: a.associd)
return idlist
- def readvar(self, associd=0, varlist=None, opcode=CTL_OP_READVAR):
- "Read system vars from the host as a dict, or throw an exception."
- if varlist == None:
- qdata = ""
- else:
- qdata = ",".join(varlist)
- self.doquery(opcode, associd=associd, qdata=qdata)
- response = self.response
+ def __parse_varlist(self):
+ "Parse a response as a textual varlist."
# Trim trailing NULs from the text
+ response = self.response
while response.endswith(b"\x00"):
response = response[:-1]
response = response.rstrip()
@@ -692,6 +722,15 @@ class Mode6Session:
items.append((pair, ""))
return collections.OrderedDict(items)
+ def readvar(self, associd=0, varlist=None, opcode=CTL_OP_READVAR):
+ "Read system vars from the host as a dict, or throw an exception."
+ if varlist == None:
+ qdata = ""
+ else:
+ qdata = ",".join(varlist)
+ self.doquery(opcode, associd=associd, qdata=qdata)
+ return self.__parse_varlist()
+
def config(self, configtext):
"Send configuration text to the daemon. Return True if accepted."
self.doquery(opcode=CTL_OP_CONFIGURE, qdata=configtext, auth=True)
@@ -704,37 +743,177 @@ class Mode6Session:
self.response = self.response.rstrip()
return self.response == "Config Succeeded"
- def mrulist(self, variables):
- "Retrieve MRU list data"
+ def fetch_nonce(self):
self.doquery(opcode=CTL_OP_REQ_NONCE)
if not self.response.startswith("nonce="):
raise Mode6Exception(SERR_BADNONCE)
- nonce = self.response
+ return self.response.strip()
+
+ def mrulist(self, variables, rawhook=None):
+ "Retrieve MRU list data"
nonce_uses = 0
restarted_count = 0
- mru_count = 0
- c_mru_l_rc = False
- list_complete = False
- have_now = False
cap_frags = True
- got = 0
- ri = 0
+ warn = sys.stderr.write
+
+ if variables:
+ for k in list(variables.keys()):
+ if k in ("mincount", "resall", "resany",
+ "maxlstint", "laddr", "sort"):
+ continue
+ else:
+ raise Mode6Exception(SERR_BADPARAM % k)
+ # FIXME: Do the reslist parameter mappings from the C version
+
+ nonce = self.fetch_nonce()
- for k in list(variables.keys()):
- if k in ("mincount","resall","resany","maxlstint","laddr","sort"):
- continue
- else:
- raise Mode6Exception(SERR_BADPARAM % k)
mrulist_interrupted = False
try:
- next_report = time.time() + MRU_REPORT_SECS
+ # Form the initial request
+ #next_report = time.time() + MRU_REPORT_SECS
limit = min(3 * MAXFRAGS, self.ntpd_row_limit)
frags = MAXFRAGS;
- req_buf = "nonce=%s, frags=%d" % (nonce, frags)
- if varlist:
- req_buf += ", " + ",".join([("%s=%s" % it) for it in list(variables.items())])
+ req_buf = "%s, frags=%d" % (nonce, frags)
+ if variables:
+ parms = ", " + ",".join([("%s=%s" % it) for it in list(variables.items())])
+ else:
+ parms = ""
+ req_buf += parms
+
while True:
- self.doquery(opcode=CTL_OP_REQ_NONCE, qdata=req_buf)
+ # Request additions to the MRU list
+ try:
+ self.doquery(opcode=CTL_OP_READ_MRU, qdata=req_buf)
+ recoverable_read_errors = False
+ except Mode6Exception as e:
+ recoverable_read_errors = True
+ if e.errorcode is None:
+ raise e
+ elif e.errorcode == CERR_UNKNOWNVAR:
+ # None of the supplied prior entries match, so
+ # toss them from our list and try again.
+ if self.debug:
+ warn("no overlap between %d prior entries and server MRU list\n" % len(self.mrustats))
+ self.mrustats = []
+ restarted_count += 1
+ if restarted_count > 8:
+ raise Mode6Exception(SERR_STALL)
+ if self.debug:
+ warn("---> Restarting from the beginning, retry #%u\n" % restarted_count)
+ elif e.errorcode == CERR_UNKNOWNVAR:
+ e.message = "CERR_UNKNOWNVAR from ntpd but no priors given."
+ raise e
+ elif e.errorcode == CERR_BADVALUE:
+ if cap_frags:
+ cap_frags = False;
+ if self.debug:
+ warn("Reverted to row limit from fragments limit.\n");
+ else:
+ # ntpd has lower cap on row limit
+ self.ntpd_row_limit -= 1
+ limit = min(limit, ntpd_row_limit)
+ if self.debug:
+ warn("Row limit reduced to %d following CERR_BADVALUE.\n" % limit)
+ elif e.errorcode in (ERR_INCOMPLETE, ERR_TIMEOUT):
+ # Reduce the number of rows/frags requested by
+ # half to recover from lost response fragments.
+ if cap_frags:
+ frags = max(2, frags / 2)
+ if self.debug:
+ warn("Frag limit reduced to %d following incomplete response.\n"% frags)
+ else:
+ limit = max(2, limit / 2);
+ if self.debug:
+ warn("Row limit reduced to %d following incomplete response.\n" % limit)
+ elif e.errorcode:
+ raise e
+
+ # Comment from the C code:
+ # This is a cheap cop-out implementation of rawmode
+ # output for mrulist. A better approach would be to
+ # dump similar output after the list is collected by
+ # ntpq with a continuous sequence of indexes. This
+ # cheap approach has indexes resetting to zero for
+ # each query/response, and duplicates are not
+ # coalesced.
+ variables = self.__parse_varlist()
+ if rawhook:
+ rawhook(variables)
+ continue
+
+ # Deserialize the contents of one response
+ span = MRUSpan()
+ for (tag, val) in variables.items():
+ if tag =="addr.older":
+ if span.older.last is None:
+ if self.debug:
+ warn("addr.older %s before last.older\n" % val)
+ return False
+ span.older.addr = val
+ continue
+ elif tag =="last.older":
+ span.older.addr = val
+ continue
+ elif tag =="now":
+ span.now = val
+ continue
+ elif tag =="last.newest":
+ span.last_newest = val
+ continue
+ for prefix in ("addr", "last", "ct", "mv", "rs"):
+ if tag.startswith(prefix + "."):
+ (member, idx) = tag.split(".")
+ try:
+ idx = int(idx)
+ except ValueError:
+ raise Mode6Exception(SERR_BADTAG % tag)
+ if idx >= len(span.entries):
+ span.entries.append(MRUEntry())
+ setattr(span.entries[-1], prefix, val)
+
+ # Now try to glue it to the history
+ # FIXME: The following enables an eyeball check of the parse
+ print(repr(span))
+
+ # If we've seen the end sentinel on the span, break out
+ if span.is_complete():
+ break
+
+ # Snooze for a bit between queries to let ntpd catch
+ # up with other duties.
+ time.sleep(0.05)
+
+ # If there were no errors, increase the number of rows
+ # to a maximum of 3 * MAXFRAGS (the most packets ntpq
+ # can handle in one response), on the assumption that
+ # no less than 3 rows fit in each packet, capped at
+ # our best guess at the server's row limit.
+ if not recoverable_read_errors:
+ if cap_frags:
+ frags = min(MAXFRAGS, frags + 1)
+ else:
+ limit = min(3 * MAXFRAGS,
+ ntpd_row_limit,
+ max(limit + 1,
+ limit * 33 / 32))
+
+ # prepare next query with as many address and last-seen
+ # timestamps as will fit in a single packet.
+ req_buf = "%s, %s=%d%s" % \
+ (nonce,
+ "frags" if cap_frags else "limit",
+ frags if cap_frags else limit,
+ parms)
+ nonce_uses += 1
+ if nonce_uses >= 4:
+ nonce = fetch_nonce()
+ nonce_uses = 0
+ for i in range(len(span.entries)):
+ incr = span.breadcrumb(i)
+ if len(req_buf) + len(incr) >= CTL_MAX_DATA_LEN:
+ break
+ else:
+ req_buf += incr
except KeyboardInterrupt:
mrulist_interrupted = True
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/aaa15862988fa24906be1e6103d1530caa4de2f3...209dbc185c8b62ad018c67ec6e912d2eb634f702
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.ntpsec.org/pipermail/vc/attachments/20161102/95c262a2/attachment.html>
More information about the vc
mailing list