[Git][NTPsec/ntpsec][6auth] 4 commits: Try to make HMAC/CMAC code work on Python 2
James Browning
gitlab at mg.gitlab.com
Tue May 5 15:29:02 UTC 2020
James Browning pushed to branch 6auth at NTPsec / ntpsec
Commits:
27a13e4c by Hal Murray at 2020-05-05T08:13:18-07:00
Try to make HMAC/CMAC code work on Python 2
- - - - -
95fe097d by James Browning at 2020-05-05T08:25:44-07:00
Auth: Fix auth for mode 6 etc ...
but only if you can provide
- the point the packet ends
- the point the mac begins
this is usually 48 for most modes
- mode 0 is reserved
- modes 1, 2, 5 and 7 are no longer supported
- mode 3 is the servers problem IIRC
- mode 4 could be a problem w/ notional extensions IMO
- mode 6 seems taken care of
- Shorten long macs to work with existing code
- Dehexify long passwords
- Add a function docstring
- Remove per packet auth success message
- - - - -
a757c751 by James Browning at 2020-05-05T08:25:44-07:00
client/ntpq: polystr() output that should not be bytes anymore
- - - - -
86d95752 by James Browning at 2020-05-05T08:25:44-07:00
Change over to crypto from ntp.ntpc
- - - - -
5 changed files:
- libntp/pymodule.c
- ntpclients/ntpdig.py
- ntpclients/ntpq.py
- pylib/packet.py
- tests/pylib/test_packet.py
Changes:
=====================================
libntp/pymodule.c
=====================================
@@ -159,6 +159,15 @@ ntpc_step_systime(PyObject *self, PyObject *args)
#define EVP_MD_CTX_reset(ctx) EVP_MD_CTX_init(ctx)
#endif
+/* Needed on old versions of OpenSSL */
+static void SSL_init(void) {
+ static bool init_done = false;
+ if (init_done)
+ return;
+ init_done = true;
+ OpenSSL_add_all_ciphers();
+ OpenSSL_add_all_digests();
+}
/* xx = ntp.ntpc.checkname(name)
* returns None if algorithm name is invalid. */
@@ -172,6 +181,8 @@ ntpc_checkname(PyObject *self, PyObject *args)
const EVP_CIPHER *cipher;
UNUSED_ARG(self);
+ SSL_init();
+
if (!PyArg_ParseTuple(args, "s", &name))
return NULL;
strlcpy(upcase, name, sizeof(upcase));
@@ -217,8 +228,16 @@ ntpc_mac(PyObject *self, PyObject *args)
const EVP_MD *digest;
const EVP_CIPHER *cipher;
int cipherlen;
+
+ SSL_init();
+
+#if PY_MAJOR_VERSION >= 3
if (!PyArg_ParseTuple(args, "y#y#s",
&data, &datalen, &key, &keylen, &name))
+#else
+ if (!PyArg_ParseTuple(args, "s#s#s",
+ &data, &datalen, &key, &keylen, &name))
+#endif
Py_RETURN_NONE;
strlcpy(upcase, name, sizeof(upcase));
@@ -240,7 +259,11 @@ ntpc_mac(PyObject *self, PyObject *args)
EVP_DigestFinal_ex(digest_ctx, mac, &maclenint);
if (MAX_BARE_MAC_LENGTH < maclenint)
maclenint = MAX_BARE_MAC_LENGTH;
+#if PY_MAJOR_VERSION >= 3
return Py_BuildValue("y#", &mac, maclenint);
+#else
+ return Py_BuildValue("s#", &mac, maclenint);
+#endif
}
if ((strcmp(upcase, "AES") == 0) || (strcmp(upcase, "AES128CMAC") == 0)) {
@@ -273,7 +296,11 @@ ntpc_mac(PyObject *self, PyObject *args)
CMAC_Final(cmac_ctx, mac, &maclen);
if (MAX_BARE_MAC_LENGTH < maclen)
maclen = MAX_BARE_MAC_LENGTH;
+#if PY_MAJOR_VERSION >= 3
return Py_BuildValue("y#", &mac, maclen);
+#else
+ return Py_BuildValue("s#", &mac, maclen);
+#endif
}
/* List of functions defined in the module */
=====================================
ntpclients/ntpdig.py
=====================================
@@ -63,7 +63,7 @@ def read_append(s, packets, packet, sockaddr):
if not ntp.packet.Authenticator.have_mac(d):
if debug:
log("no MAC on reply from %s" % packet.hostname)
- if not credentials.verify_mac(d):
+ if not credentials.verify_mac(d, packet_end=48, mac_begin=48):
packet.trusted = False
log("MAC verification on reply from %s failed"
% sockaddr[0])
=====================================
ntpclients/ntpq.py
=====================================
@@ -1171,6 +1171,7 @@ usage: lopeers
self.warn("In Config\nKeyword = :config\nCommand = %s" % line)
try:
self.session.config(line)
+ self.session.response = ntp.poly.polystr(self.session.response)
m = re.match("column ([0-9]+) syntax error", self.session.response)
if m:
col = int(m.group(1))
=====================================
pylib/packet.py
=====================================
@@ -205,7 +205,7 @@ A Mode 6 packet cannot have extension fields.
# SPDX-License-Identifier: BSD-2-Clause
from __future__ import print_function, division
import getpass
-import hashlib
+import hmac
import os
import select
import socket
@@ -253,6 +253,13 @@ DEFSTIMEOUT = 3000
# The maximum keyid for authentication, keyid is a 16-bit field
MAX_KEYID = 0xFFFF
+# Some constants (in Bytes) for mode6 and authentication
+MODE_SIX_HEADER_LENGTH = 12
+MINIMUM_MAC_LENGTH = 16
+KEYID_LENGTH = 4
+MODE_SIX_ALIGNMENT = 8
+MAX_BARE_MAC_LENGTH = 20
+
class Packet:
"Encapsulate an NTP fragment"
@@ -698,6 +705,7 @@ class ControlException(BaseException):
class ControlSession:
"A session to a host"
MRU_ROW_LIMIT = 256
+ _authpass = True
server_errors = {
ntp.control.CERR_UNSPEC: "UNSPEC",
ntp.control.CERR_PERMISSION: "PERMISSION",
@@ -1024,6 +1032,16 @@ class ControlSession:
continue
# Someday, perhaps, check authentication here
+ if self._authpass and self.auth:
+ _pend = rpkt.count + MODE_SIX_HEADER_LENGTH
+ _pend += (-_pend % MODE_SIX_ALIGNMENT)
+ if len(rawdata) < (_pend + KEYID_LENGTH + MINIMUM_MAC_LENGTH):
+ self.logfp.write('AUTH - packet too short for MAC %d < %d\n' %
+ (len(rawdata), (_pend + KEYID_LENGTH + MINIMUM_MAC_LENGTH)))
+ self._authpass = False
+ elif not self.auth.verify_mac(rawdata, packet_end=_pend,
+ mac_begin=_pend):
+ self._authpass = False
# Clip off the MAC, if any
rpkt.extension = rpkt.extension[:rpkt.count]
@@ -1108,6 +1126,8 @@ class ControlSession:
warn("First line:\n%s\n" % repr(firstline))
return None
break
+ if not self._authpass:
+ warn('AUTH: Content untrusted due to authentication failure!\n')
def __validate_packet(self, rpkt, rawdata, opcode, associd):
# TODO: refactor to simplify while retaining semantic info
@@ -1553,15 +1573,15 @@ def parse_mru_variables(variables):
"addr": lambda e: e.sortaddr(),
# IPv6 desc. then IPv4 desc.
"-addr": lambda e: e.sortaddr(),
- # hit count ascending
+ # hit count ascending
"count": lambda e: -e.ct,
# hit count descending
"-count": lambda e: e.ct,
- # score ascending
+ # score ascending
"score": lambda e: -e.sc,
# score descending
"-score": lambda e: e.sc,
- # drop count ascending
+ # drop count ascending
"drop": lambda e: -e.dr,
# drop count descending
"-drop": lambda e: e.dr,
@@ -1675,16 +1695,20 @@ class Authenticator:
if not line:
continue
(keyid, keytype, passwd) = line.split()
+ if len(passwd) > 20:
+ passwd = ntp.util.hexstr2octets(passwd)
self.passwords[int(keyid)] = (keytype, passwd)
def __len__(self):
+ 'return the number of keytype/passwd tuples stored'
return len(self.passwords)
def __getitem__(self, keyid):
+ 'get a keytype/passwd tuple by keyid'
return self.passwords.get(keyid)
def control(self, keyid=None):
- "Get a keyid/passwd pair that is trusted on localhost"
+ "Get the keytype/passwd tuple that controls localhost and its id"
if keyid is not None:
if keyid in self.passwords:
return (keyid,) + self.passwords[keyid]
@@ -1700,19 +1724,18 @@ class Authenticator:
if len(passwd) > 20:
passwd = ntp.util.hexstr2octets(passwd)
return (keyid, keytype, passwd)
- else:
- # No control lines found
- raise ValueError
+ # No control lines found
+ raise ValueError
@staticmethod
def compute_mac(payload, keyid, keytype, passwd):
- hasher = hashlib.new(keytype)
- hasher.update(ntp.poly.polybytes(passwd))
- hasher.update(payload)
- if hasher.digest_size == 0:
- return None
- else:
- return struct.pack("!I", keyid) + hasher.digest()
+ 'Create the authentication payload to send'
+ if not ntp.ntpc.checkname(keytype):
+ return False
+ mac2 = ntp.ntpc.mac(ntp.poly.polybytes(payload), ntp.poly.polybytes(passwd), keytype)
+ if not mac2 or len(mac2) == 0:
+ return b''
+ return struct.pack("!I", keyid) + mac2
@staticmethod
def have_mac(packet):
@@ -1722,20 +1745,21 @@ class Authenticator:
# On those you have to go in and look at the count.
return len(packet) > ntp.magic.LEN_PKT_NOMAC
- def verify_mac(self, packet):
+ def verify_mac(self, packet, packet_end=48, mac_begin=48):
"Does the MAC on this packet verify according to credentials we have?"
- # FIXME: Someday, figure out how to handle SHA1?
- HASHLEN = 16 # Length of MD5 hash.
- payload = packet[:-HASHLEN-4]
- keyid = packet[-HASHLEN-4:-HASHLEN]
- mac = packet[-HASHLEN:]
+ payload = packet[:packet_end]
+ keyid = packet[mac_begin:mac_begin+KEYID_LENGTH]
+ mac = packet[mac_begin+KEYID_LENGTH:]
(keyid,) = struct.unpack("!I", keyid)
if keyid not in self.passwords:
+ print('AUTH: No key %08x...' % keyid)
return False
(keytype, passwd) = self.passwords[keyid]
- hasher = hashlib.new(keytype)
- hasher.update(passwd)
- hasher.update(payload)
- return ntp.poly.polybytes(hasher.digest()) == mac
+ if ntp.ntpc.checkname(keytype) not in (1, len(passwd)):
+ return False
+ mac2 = ntp.ntpc.mac(ntp.poly.polybytes(payload), ntp.poly.polybytes(passwd), keytype)
+ if not mac2:
+ return False
+ return hmac.compare_digest(mac, mac2)
# end
=====================================
tests/pylib/test_packet.py
=====================================
@@ -3,17 +3,17 @@
from __future__ import print_function, division
-import unittest
-import ntp.packet
-import ntp.control
-import ntp.util
-import ntp.magic
-import socket
+import getpass
import select
+import socket
import sys
-import getpass
+import unittest
import jigs
+import ntp.control
+import ntp.magic
+import ntp.packet
import ntp.poly
+import ntp.util
odict = ntp.util.OrderedDict
@@ -2082,17 +2082,13 @@ class TestAuthenticator(unittest.TestCase):
def test_compute_mac(self):
f = self.target.compute_mac
- try:
- temphash = ntpp.hashlib
- fakehashlibmod = jigs.HashlibModuleJig()
- ntpp.hashlib = fakehashlibmod
- # Test no digest
- self.assertEqual(f("", 0, None, ntp.poly.polybytes("")), None)
- # Test with digest
- self.assertEqual(f("foo", 0x42, "bar", "quux"),
- ntp.poly.polybytes("\x00\x00\x00\x42blahblahblahblah"))
- finally:
- ntpp.hashlib = temphash
+ pkt = ntp.util.hexstr2octets('240300e8000012ce0000091941138e89' + 'e25b102e9fe94dc9e25b1175bd5a3000' + 'e25b1175bd6cf48ee25b1175bd70e594')
+ mac1 = b'\x00\x00\x00\rL\x7f\xc1\xd1\xe9\xd3\xf8\xec\x91\xdf\xecS\x89e\xc5\xf3'
+ key1 = ntp.util.hexstr2octets('2f3badbb640bf975fec519df8a83e829')
+ key2 = ''
+ self.assertEqual(f(pkt, 0x0e, 'neun', key2), False)
+ # FIXME Find out why the following test works
+ self.assertEqual(f(pkt, 0x0d, 'aes', key1), mac1)
def test_have_mac(self):
f = self.target.have_mac
@@ -2105,19 +2101,13 @@ class TestAuthenticator(unittest.TestCase):
def test_verify_mac(self):
cls = self.target()
- cls.passwords[0x23] = ("a", "z")
- good_pkt = "foobar\x00\x00\x00\x23blahblahblahblah"
- bad_pkt = "foobar\xDE\xAD\xDE\xAFblahblahblah"
- try:
- temphash = ntpp.hashlib
- fakehashlibmod = jigs.HashlibModuleJig()
- ntpp.hashlib = fakehashlibmod
- # Test good
- self.assertEqual(cls.verify_mac(ntp.poly.polybytes(good_pkt)), True)
- # Test bad
- self.assertEqual(cls.verify_mac(ntp.poly.polybytes(bad_pkt)), False)
- finally:
- ntpp.hashlib = temphash
+ cls.passwords[0x20] = ('aria-192', ntp.util.hexstr2octets('2f3badbb640bf975fec519df8a83e8292f3badbb640bf975'))
+ good_pkt = '240300e8000012ce0000091941138e89' + 'e25b102e9fe94dc9e25b1176280a8000' + 'e25b1176281c2321e25b11762820e836' + '00000020157ccbe4b0d3081bd7853463' + '89f7690c'
+ bad_pkt = '240300e8000012ce0000091941138e89' + 'e25b102e9fe94dc9e25b11763a84a000' + 'e25b11763a94efe6e25b11763a99c162' + '00000020fac63503ca24039ad658938d' + '5aad2f4b'
+ # Test good
+ self.assertEqual(cls.verify_mac(ntp.poly.polybytes(ntp.util.hexstr2octets(good_pkt)), packet_end=48, mac_begin=48), True)
+ # Test bad
+ self.assertEqual(cls.verify_mac(ntp.poly.polybytes(ntp.util.hexstr2octets(bad_pkt)), packet_end=48, mac_begin=48), False)
if __name__ == "__main__":
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/-/compare/041bd1ad2beb1375530ea7decdfd0405306c5c0d...86d957529b51dce7af1ed5d4e57153e68e3c2eb2
--
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/-/compare/041bd1ad2beb1375530ea7decdfd0405306c5c0d...86d957529b51dce7af1ed5d4e57153e68e3c2eb2
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20200505/c6f3a3ad/attachment-0001.htm>
More information about the vc
mailing list