[Git][NTPsec/ntpsec][ntp.util-codacy] Codacy Python codestyle: solve some issues with the worst 4-6
James Browning
gitlab at mg.gitlab.com
Wed Oct 14 14:34:48 UTC 2020
James Browning pushed to branch ntp.util-codacy at NTPsec / ntpsec
Commits:
d3639ebb by James Browning at 2020-10-14T07:30:48-07:00
Codacy Python codestyle: solve some issues with the worst 4-6
- - - - -
3 changed files:
- ntpclients/ntpdig.py
- ntpclients/ntpmon.py
- ntpclients/ntpsnmpd.py
Changes:
=====================================
ntpclients/ntpdig.py
=====================================
@@ -55,8 +55,9 @@ except ImportError as e:
# The one new option in this version is -p, borrowed from ntpdate.
+
def read_append(s, packets, packet, sockaddr):
- d, a = s.recvfrom(1024)
+ d, _ = s.recvfrom(1024)
if debug >= 2:
ntp.packet.dump_hex_printable(d)
if credentials:
@@ -70,10 +71,10 @@ def read_append(s, packets, packet, sockaddr):
elif debug:
log("MAC verification on reply from %s succeeded"
% sockaddr[0])
- pkt = ntp.packet.SyncPacket(d)
- pkt.hostname = server
- pkt.resolved = sockaddr[0]
- packets.append(pkt)
+ pkt2 = ntp.packet.SyncPacket(d)
+ pkt2.hostname = server
+ pkt2.resolved = sockaddr[0]
+ packets.append(pkt2)
return packets
@@ -92,13 +93,13 @@ def queryhost(server, concurrent, timeout=5, port=123):
request.transmit_timestamp = ntp.packet.SyncPacket.posix_to_ntp(
time.time())
packet = request.flatten()
- needgap = (len(iptuples) > 1) and (gap > 0)
- firstloop = True
- for (family, socktype, proto, canonname, sockaddr) in iptuples:
- if needgap and not firstloop:
+ needgap_i = (len(iptuples) > 1) and (gap > 0)
+ firstloop_i = True
+ for (family, socktype, _, canonname, sockaddr) in iptuples:
+ if needgap_i and not firstloop_i:
time.sleep(gap)
- if firstloop:
- firstloop = False
+ if firstloop_i:
+ firstloop_i = False
if debug:
log("querying %s (%s)" % (sockaddr[0], server))
s = socket.socket(family, socktype)
@@ -110,8 +111,7 @@ def queryhost(server, concurrent, timeout=5, port=123):
if mac is None:
log("MAC generation failed while querying %s" % server)
raise SystemExit(1)
- else:
- packet += mac
+ packet += mac
try:
s.sendto(packet, sockaddr)
except socket.error as e:
@@ -151,33 +151,34 @@ def clock_select(packets):
#
filtered = []
for response in packets:
- def drop(msg):
- log("%s: Response dropped: %s" % (response.hostname, msg))
+ _ = response.hostname
+ def drop(host, msg):
+ log("%s: Response dropped: %s" % (host, msg))
if response.stratum > NTP_INFIN:
- drop("stratum too high")
+ drop(_, "stratum too high")
continue
if response.version() < ntp.magic.NTP_OLDVERSION:
- drop("response version %d is too old" % response.version())
+ drop(_, "response version %d is too old" % response.version())
continue
if response.mode() != ntp.magic.MODE_SERVER:
- drop("unexpected response mode %d" % response.mode())
+ drop(_, "unexpected response mode %d" % response.mode())
continue
if response.version() > ntp.magic.NTP_VERSION:
- drop("response version %d is too new" % response.version())
+ drop(_, "response version %d is too new" % response.version())
continue
if response.stratum == 0:
# FIXME: Do some kind of semi-useful diagnostic dump here
- drop("stratum 0, probable KOD packet")
+ drop(_, "stratum 0, probable '%s' KOD packet" % response.refid)
continue
if response.leap() == "unsync":
- drop("leap not in sync")
+ drop(_, "leap not in sync")
continue
if not response.trusted:
- drop("request was authenticated but server is untrusted")
+ drop(_, "request was authenticated but server is untrusted")
continue
# Bypass this test if we ever support broadcast-client mode again
if response.origin_timestamp == 0:
- drop("unexpected response timestamp")
+ drop(_, "unexpected response timestamp")
continue
filtered.append(response)
@@ -399,22 +400,22 @@ if __name__ == '__main__':
returned = []
needgap = (samples > 1) and (gap > 0)
firstloop = True
- for s in range(samples):
+ for _ in range(samples):
if needgap and not firstloop:
time.sleep(gap)
if firstloop:
firstloop = False
- for server in concurrent_hosts:
+ for oserver in concurrent_hosts:
try:
- returned += queryhost(server=server,
+ returned += queryhost(server=oserver,
concurrent=True,
timeout=timeout)
except ntp.packet.SyncException as e:
log(str(e))
continue
- for server in arguments:
+ for oserver in arguments:
try:
- returned += queryhost(server=server,
+ returned += queryhost(server=oserver,
concurrent=False,
timeout=timeout)
except ntp.packet.SyncException as e:
@@ -451,10 +452,7 @@ if __name__ == '__main__':
rc = ntp.ntpc.step_systime(offset)
elif slew:
rc = ntp.ntpc.adj_systime(offset)
- if rc:
- raise SystemExit(0)
- else:
- raise SystemExit(1)
+ raise SystemExit(0 if rc else 1)
else:
log("no eligible servers")
raise SystemExit(1)
=====================================
ntpclients/ntpmon.py
=====================================
@@ -74,7 +74,7 @@ def iso8601(t):
return time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(t))
-def statline(_peerlist, _mrulist, nyquist):
+def statline(_peerlist, _mrulist, nyquist2):
"""Generate a status line."""
# We don't use stdversion here because the presence of a date is confusing
leader = sysvars['version'][0]
@@ -82,24 +82,24 @@ def statline(_peerlist, _mrulist, nyquist):
if span.entries:
trailer = "Updated: %s (%s)" \
% (iso8601(int(ntp.ntpc.lfptofloat(span.entries[0].last))),
- ntp.util.PeerSummary.prettyinterval(nyquist))
+ ntp.util.PeerSummary.prettyinterval(nyquist2))
else:
trailer = ""
spacer = ((peer_report.termwidth - 1) - len(leader) - len(trailer)) * " "
return leader + spacer + trailer
-def peer_detail(variables, showunits=False):
+def peer_detail(variables2, showunits=False):
"""Show the things a peer summary doesn't, cooked slightly differently."""
# All of an rv display except refid, reach, delay, offset, jitter.
# One of the goals here is to emit field values at fixed positions
# on the 2D display, so that changes in the details are easier to spot.
vcopy = {}
vcopyraw = {}
- vcopy.update(variables)
+ vcopy.update(variables2)
width = ntp.util.termsize().width - 2
# Need to separate the casted from the raw
- for key in vcopy.keys():
+ for key in vcopy:
vcopyraw[key] = vcopy[key][1]
vcopy[key] = vcopy[key][0]
vcopy["leap"] = ("no-leap", "add-leap", "del-leap",
@@ -154,8 +154,8 @@ filtdelay = %(filtdelay)s
filtoffset = %(filtoffset)s
filtdisp = %(filtdisp)s
"""
- str = peerfmt % vcopy
- return str.expandtabs()
+ out = peerfmt % vcopy
+ return out.expandtabs()
class Fatal(Exception):
@@ -409,9 +409,9 @@ if __name__ == '__main__':
try:
helpmode = False
key = stdscr.getkey()
- if key == 'q' or key == 'x':
+ if key in ('q', 'x'):
raise SystemExit(0)
- elif key == 'a':
+ if key == 'a':
peer_report.displaymode = 'apeers'
elif key == 'd':
if not selectmode:
@@ -453,11 +453,12 @@ if __name__ == '__main__':
peer_report.displaymode = 'opeers'
else:
peer_report.displaymode = 'peers'
- elif key == 'j' or key == "KEY_DOWN":
+
+ elif key in ('j', "KEY_DOWN"):
if showpeers:
selected += 1
selected %= len(peers)
- elif key == 'k' or key == "KEY_UP":
+ elif key in ('k', "KEY_UP"):
if showpeers:
selected += len(peers) - 1
selected %= len(peers)
=====================================
ntpclients/ntpsnmpd.py
=====================================
@@ -43,7 +43,7 @@ DEFLOG = "ntpsnmpd.log"
class DataSource(ntp.agentx.MIBControl):
- def __init__(self, hostname=DEFHOST, settingsFile=None, notifySpin=0.1):
+ def __init__(self, ihostname=DEFHOST, settingsFile=None, notifySpin=0.1):
# This is defined as a dict tree because it is simpler, and avoids
# certain edge cases
# OIDs are relative from ntp root
@@ -152,7 +152,7 @@ class DataSource(ntp.agentx.MIBControl):
# print(repr(self.oidTree))
# print(self.oidTree[1]["subids"][1][1][0])
self.session = ntp.packet.ControlSession()
- self.hostname = hostname if hostname else DEFHOST
+ self.hostname = ihostname if ihostname else DEFHOST
self.session.openhost(self.hostname)
self.settingsFilename = settingsFile
# Cache so we don't hammer ntpd, default 1 second timeout
@@ -356,8 +356,8 @@ class DataSource(ntp.agentx.MIBControl):
if data is None:
return None
protoerr = 0
- for key in data.keys():
- protoerr += data[key]
+ for ikey in data.keys():
+ protoerr += data[ikey]
return ax.Varbind(ax.VALUE_COUNTER32, oid, protoerr)
def cbr_statusNotifications(self, oid):
@@ -537,7 +537,7 @@ class DataSource(ntp.agentx.MIBControl):
try:
sockinfo = socket.getaddrinfo(srcadr, None)[0][-1]
addr = sockinfo[0]
- ipv6 = True if len(sockinfo) == 4 else False
+ ipv6 = bool(len(sockinfo) == 4)
except socket.gaierror:
addr = None # how to handle?
ipv6 = None
@@ -615,8 +615,8 @@ class DataSource(ntp.agentx.MIBControl):
if pvars is None:
return None
protoerr = 0
- for key in pvars.keys():
- protoerr += pvars[key]
+ for ikey in pvars.keys():
+ protoerr += pvars[ikey]
return ax.Varbind(ax.VALUE_COUNTER32, oid, protoerr)
return self.dynamicCallbackSkeleton(handler)
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/-/commit/d3639ebb81f3fe119e435425125f6e63b12cf795
--
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/-/commit/d3639ebb81f3fe119e435425125f6e63b12cf795
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20201014/28173d8d/attachment-0001.htm>
More information about the vc
mailing list