[Git][NTPsec/ntpsec][master] 3 commits: Added little endian tests for all PDUs
Ian Bruene
gitlab at mg.gitlab.com
Tue Aug 8 01:20:04 UTC 2017
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
f7fa1a95 by Ian Bruene at 2017-08-07T20:07:27-05:00
Added little endian tests for all PDUs
- - - - -
52f0357c by Ian Bruene at 2017-08-07T20:07:27-05:00
Added little endian tests where missing
- - - - -
910d0561 by Ian Bruene at 2017-08-07T20:07:27-05:00
Added test_pducore test helper, tests are now shorter and more complete
- - - - -
1 changed file:
- tests/pylib/test_agentx.py
Changes:
=====================================
tests/pylib/test_agentx.py
=====================================
--- a/tests/pylib/test_agentx.py
+++ b/tests/pylib/test_agentx.py
@@ -78,6 +78,14 @@ def makeFlags(iR, nI, aI, cP, bE):
"bigEndian": bE}}
+def test_pducore(tester, pdu, pduType, endian, sID, tactID, pktID):
+ tester.assertEqual(pdu.pduType, pduType)
+ tester.assertEqual(pdu.bigEndian, endian)
+ tester.assertEqual(pdu.sessionID, sID)
+ tester.assertEqual(pdu.transactionID, tactID)
+ tester.assertEqual(pdu.packetID, pktID)
+
+
class TestNtpclientsNtpsnmpd(unittest.TestCase):
#
# PDU tests
@@ -107,22 +115,17 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_OpenPDU(self):
dec = ntp.agentx.decode_OpenPDU
cls = ntp.agentx.OpenPDU
+ x = ntp.agentx
# Test PDU init, null packet
nullPkt = cls(True, 1, 2, 3, 4, (), "")
- self.assertEqual(nullPkt.bigEndian, True)
- self.assertEqual(nullPkt.sessionID, 1)
- self.assertEqual(nullPkt.transactionID, 2)
- self.assertEqual(nullPkt.packetID, 3)
+ test_pducore(self, nullPkt, x.PDU_OPEN, True, 1, 2, 3)
self.assertEqual(nullPkt.timeout, 4)
self.assertEqual(nullPkt.oid, ())
self.assertEqual(nullPkt.description, "")
# Test PDU init, basic packet
basicPkt = cls(False, 1, 2, 3, 4, (1, 2, 3, 4), "foo")
- self.assertEqual(basicPkt.bigEndian, False)
- self.assertEqual(basicPkt.sessionID, 1)
- self.assertEqual(basicPkt.transactionID, 2)
- self.assertEqual(basicPkt.packetID, 3)
+ test_pducore(self, basicPkt, x.PDU_OPEN, False, 1, 2, 3)
self.assertEqual(basicPkt.timeout, 4)
self.assertEqual(basicPkt.oid, (1, 2, 3, 4))
self.assertEqual(basicPkt.description, "foo")
@@ -148,10 +151,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
header, body = slicedata(nullPkt_str, 20)
header = decode_pduheader(header)
nullPkt_new = dec(body, header)
- self.assertEqual(nullPkt_new.bigEndian, True)
- self.assertEqual(nullPkt_new.sessionID, 1)
- self.assertEqual(nullPkt_new.transactionID, 2)
- self.assertEqual(nullPkt_new.packetID, 3)
+ test_pducore(self, nullPkt_new, x.PDU_OPEN, True, 1, 2, 3)
self.assertEqual(nullPkt_new.timeout, 4)
self.assertEqual(nullPkt_new.oid, {"subids": (), "include": False})
self.assertEqual(nullPkt_new.description, "")
@@ -159,10 +159,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
header, body = slicedata(basicPkt_str, 20)
header = decode_pduheader(header)
basicPkt_new = dec(body, header)
- self.assertEqual(basicPkt_new.bigEndian, False)
- self.assertEqual(basicPkt_new.sessionID, 1)
- self.assertEqual(basicPkt_new.transactionID, 2)
- self.assertEqual(basicPkt_new.packetID, 3)
+ test_pducore(self, basicPkt_new, x.PDU_OPEN, False, 1, 2, 3)
self.assertEqual(basicPkt_new.timeout, 4)
self.assertEqual(basicPkt_new.oid, {"subids": (1, 2, 3, 4),
"include": False})
@@ -185,11 +182,12 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
# Test init
pkt = cls(True, 1, 2, 3, x.RSN_OTHER)
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_CLOSE, True, 1, 2, 3)
self.assertEqual(pkt.reason, x.RSN_OTHER)
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3, x.RSN_OTHER)
+ test_pducore(self, pkt_LE, x.PDU_CLOSE, False, 1, 2, 3)
+ self.assertEqual(pkt_LE.reason, x.RSN_OTHER)
# Test encoding
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -197,15 +195,25 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x04"
"\x01\x00\x00\x00")
+ # Test encoding, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x02\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x01\x00\x00\x00")
# Test decoding
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_CLOSE, True, 1, 2, 3)
self.assertEqual(pkt_new.reason, x.RSN_OTHER)
+ # Test decoding, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_CLOSE, False, 1, 2, 3)
+ self.assertEqual(pkt_LE_new.reason, x.RSN_OTHER)
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 2,
@@ -218,26 +226,30 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_RegisterPDU(self):
dec = ntp.agentx.decode_xRegisterPDU
cls = ntp.agentx.RegisterPDU
+ x = ntp.agentx
# Test init, basic packet
basicPkt = cls(True, 1, 2, 3, 4, 5, (1, 2, 3))
- self.assertEqual(basicPkt.bigEndian, True)
- self.assertEqual(basicPkt.sessionID, 1)
- self.assertEqual(basicPkt.transactionID, 2)
- self.assertEqual(basicPkt.packetID, 3)
+ test_pducore(self, basicPkt, x.PDU_REGISTER, True, 1, 2, 3)
self.assertEqual(basicPkt.timeout, 4)
self.assertEqual(basicPkt.priority, 5)
self.assertEqual(basicPkt.subtree, (1, 2, 3))
self.assertEqual(basicPkt.rangeSubid, 0)
self.assertEqual(basicPkt.upperBound, None)
self.assertEqual(basicPkt.context, None)
+ # Test init, basic packet, little endian
+ basicPkt_LE = cls(False, 1, 2, 3, 4, 5, (1, 2, 3))
+ test_pducore(self, basicPkt_LE, x.PDU_REGISTER, False, 1, 2, 3)
+ self.assertEqual(basicPkt_LE.timeout, 4)
+ self.assertEqual(basicPkt_LE.priority, 5)
+ self.assertEqual(basicPkt_LE.subtree, (1, 2, 3))
+ self.assertEqual(basicPkt_LE.rangeSubid, 0)
+ self.assertEqual(basicPkt_LE.upperBound, None)
+ self.assertEqual(basicPkt_LE.context, None)
# Test init, fancy packet
fancyPkt = cls(True, 1, 2, 3, 4, 5, (1, 2, 3),
rangeSubid=5, upperBound=23, context="blah")
- self.assertEqual(fancyPkt.bigEndian, True)
- self.assertEqual(fancyPkt.sessionID, 1)
- self.assertEqual(fancyPkt.transactionID, 2)
- self.assertEqual(fancyPkt.packetID, 3)
+ test_pducore(self, fancyPkt, x.PDU_REGISTER, True, 1, 2, 3)
self.assertEqual(fancyPkt.timeout, 4)
self.assertEqual(fancyPkt.priority, 5)
self.assertEqual(fancyPkt.subtree, (1, 2, 3))
@@ -253,6 +265,15 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x04\x05\x00\x00"
"\x03\x00\x00\x00\x00\x00\x00\x01"
"\x00\x00\x00\x02\x00\x00\x00\x03")
+ # Test encode, basic packet, little endian
+ basicPkt_LE_str = basicPkt_LE.encode()
+ self.assertEqual(basicPkt_LE_str,
+ "\x01\x03\x01\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x14\x00\x00\x00"
+ "\x04\x05\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00")
# Test encode, fancy packet
fancyPkt_str = fancyPkt.encode()
self.assertEqual(fancyPkt_str,
@@ -268,10 +289,7 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
header, body = slicedata(basicPkt_str, 20)
header = decode_pduheader(header)
basicPkt_new = dec(body, header)
- self.assertEqual(basicPkt_new.bigEndian, True)
- self.assertEqual(basicPkt_new.sessionID, 1)
- self.assertEqual(basicPkt_new.transactionID, 2)
- self.assertEqual(basicPkt_new.packetID, 3)
+ test_pducore(self, basicPkt_new, x.PDU_REGISTER, True, 1, 2, 3)
self.assertEqual(basicPkt_new.timeout, 4)
self.assertEqual(basicPkt_new.priority, 5)
self.assertEqual(basicPkt_new.subtree, {"subids": (1, 2, 3),
@@ -279,14 +297,23 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
self.assertEqual(basicPkt_new.rangeSubid, 0)
self.assertEqual(basicPkt_new.upperBound, None)
self.assertEqual(basicPkt_new.context, None)
+ # Test decoding, basic packet, little endian
+ header, body = slicedata(basicPkt_LE_str, 20)
+ header = decode_pduheader(header)
+ basicPkt_LE_new = dec(body, header)
+ test_pducore(self, basicPkt_LE_new, x.PDU_REGISTER, False, 1, 2, 3)
+ self.assertEqual(basicPkt_LE_new.timeout, 4)
+ self.assertEqual(basicPkt_LE_new.priority, 5)
+ self.assertEqual(basicPkt_LE_new.subtree, {"subids": (1, 2, 3),
+ "include": False})
+ self.assertEqual(basicPkt_LE_new.rangeSubid, 0)
+ self.assertEqual(basicPkt_LE_new.upperBound, None)
+ self.assertEqual(basicPkt_LE_new.context, None)
# Test decoding, fancy packet
header, body = slicedata(fancyPkt_str, 20)
header = decode_pduheader(header)
fancyPkt_new = dec(body, header)
- self.assertEqual(fancyPkt_new.bigEndian, True)
- self.assertEqual(fancyPkt_new.sessionID, 1)
- self.assertEqual(fancyPkt_new.transactionID, 2)
- self.assertEqual(fancyPkt_new.packetID, 3)
+ test_pducore(self, fancyPkt_new, x.PDU_REGISTER, True, 1, 2, 3)
self.assertEqual(fancyPkt_new.timeout, 4)
self.assertEqual(fancyPkt_new.priority, 5)
self.assertEqual(fancyPkt_new.subtree, {"subids": (1, 2, 3),
@@ -311,25 +338,28 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_UnregisterPDU(self):
dec = ntp.agentx.decode_xRegisterPDU
cls = ntp.agentx.UnregisterPDU
+ x = ntp.agentx
# Test init, basic packet
basicPkt = cls(True, 1, 2, 3, 5, (1, 2, 3))
- self.assertEqual(basicPkt.bigEndian, True)
- self.assertEqual(basicPkt.sessionID, 1)
- self.assertEqual(basicPkt.transactionID, 2)
- self.assertEqual(basicPkt.packetID, 3)
+ test_pducore(self, basicPkt, x.PDU_UNREGISTER, True, 1, 2, 3)
self.assertEqual(basicPkt.priority, 5)
self.assertEqual(basicPkt.subtree, (1, 2, 3))
self.assertEqual(basicPkt.rangeSubid, 0)
self.assertEqual(basicPkt.upperBound, None)
self.assertEqual(basicPkt.context, None)
+ # Test init, basic packet, little endian
+ basicPkt_LE = cls(False, 1, 2, 3, 5, (1, 2, 3))
+ test_pducore(self, basicPkt_LE, x.PDU_UNREGISTER, False, 1, 2, 3)
+ self.assertEqual(basicPkt_LE.priority, 5)
+ self.assertEqual(basicPkt_LE.subtree, (1, 2, 3))
+ self.assertEqual(basicPkt_LE.rangeSubid, 0)
+ self.assertEqual(basicPkt_LE.upperBound, None)
+ self.assertEqual(basicPkt_LE.context, None)
# Test init, fancy packet
fancyPkt = cls(True, 1, 2, 3, 5, (1, 2, 3),
rangeSubid=5, upperBound=23, context="blah")
- self.assertEqual(fancyPkt.bigEndian, True)
- self.assertEqual(fancyPkt.sessionID, 1)
- self.assertEqual(fancyPkt.transactionID, 2)
- self.assertEqual(fancyPkt.packetID, 3)
+ test_pducore(self, fancyPkt, x.PDU_UNREGISTER, True, 1, 2, 3)
self.assertEqual(fancyPkt.priority, 5)
self.assertEqual(fancyPkt.subtree, (1, 2, 3))
self.assertEqual(fancyPkt.rangeSubid, 5)
@@ -344,6 +374,15 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x05\x00\x00"
"\x03\x00\x00\x00\x00\x00\x00\x01"
"\x00\x00\x00\x02\x00\x00\x00\x03")
+ # Test encode, basic packet, little endian
+ basicPkt_LE_str = basicPkt_LE.encode()
+ self.assertEqual(basicPkt_LE_str,
+ "\x01\x04\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x14\x00\x00\x00"
+ "\x00\x05\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00")
# Test encode, fancy packet
fancyPkt_str = fancyPkt.encode()
self.assertEqual(fancyPkt_str,
@@ -359,24 +398,29 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
header, body = slicedata(basicPkt_str, 20)
header = decode_pduheader(header)
basicPkt_new = dec(body, header)
- self.assertEqual(basicPkt_new.bigEndian, True)
- self.assertEqual(basicPkt_new.sessionID, 1)
- self.assertEqual(basicPkt_new.transactionID, 2)
- self.assertEqual(basicPkt_new.packetID, 3)
+ test_pducore(self, basicPkt_new, x.PDU_UNREGISTER, True, 1, 2, 3)
self.assertEqual(basicPkt_new.priority, 5)
self.assertEqual(basicPkt_new.subtree, {"subids": (1, 2, 3),
"include": False})
self.assertEqual(basicPkt_new.rangeSubid, 0)
self.assertEqual(basicPkt_new.upperBound, None)
self.assertEqual(basicPkt_new.context, None)
+ # Test decoding, basic packet, little endian
+ header, body = slicedata(basicPkt_LE_str, 20)
+ header = decode_pduheader(header)
+ basicPkt_LE_new = dec(body, header)
+ test_pducore(self, basicPkt_LE_new, x.PDU_UNREGISTER, False, 1, 2, 3)
+ self.assertEqual(basicPkt_LE_new.priority, 5)
+ self.assertEqual(basicPkt_LE_new.subtree, {"subids": (1, 2, 3),
+ "include": False})
+ self.assertEqual(basicPkt_LE_new.rangeSubid, 0)
+ self.assertEqual(basicPkt_LE_new.upperBound, None)
+ self.assertEqual(basicPkt_LE_new.context, None)
# Test decoding, fancy packet
header, body = slicedata(fancyPkt_str, 20)
header = decode_pduheader(header)
fancyPkt_new = dec(body, header)
- self.assertEqual(fancyPkt_new.bigEndian, True)
- self.assertEqual(fancyPkt_new.sessionID, 1)
- self.assertEqual(fancyPkt_new.transactionID, 2)
- self.assertEqual(fancyPkt_new.packetID, 3)
+ test_pducore(self, fancyPkt_new, x.PDU_UNREGISTER, True, 1, 2, 3)
self.assertEqual(fancyPkt_new.priority, 5)
self.assertEqual(fancyPkt_new.subtree, {"subids": (1, 2, 3),
"include": False})
@@ -399,13 +443,11 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_GetPDU(self):
dec = ntp.agentx.decode_xGetPDU
cls = ntp.agentx.GetPDU
+ x = ntp.agentx
# Test init, null packet
nullPkt = cls(True, 1, 2, 3, ())
- self.assertEqual(nullPkt.bigEndian, True)
- self.assertEqual(nullPkt.sessionID, 1)
- self.assertEqual(nullPkt.transactionID, 2)
- self.assertEqual(nullPkt.packetID, 3)
+ test_pducore(self, nullPkt, x.PDU_GET, True, 1, 2, 3)
self.assertEqual(nullPkt.oidranges, ())
self.assertEqual(nullPkt.context, None)
# Test init, full packet
@@ -413,13 +455,19 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
(((1, 2, 3), (1, 2, 5), False),
((10, 20), (30, 40), True)),
context="blah")
- self.assertEqual(fullPkt.bigEndian, True)
- self.assertEqual(fullPkt.sessionID, 1)
- self.assertEqual(fullPkt.transactionID, 2)
- self.assertEqual(fullPkt.packetID, 3)
+ test_pducore(self, fullPkt, x.PDU_GET, True, 1, 2, 3)
self.assertEqual(fullPkt.oidranges, (((1, 2, 3), (1, 2, 5), False),
((10, 20), (30, 40), True)))
self.assertEqual(fullPkt.context, "blah")
+ # Test init, full packet, little endian
+ fullPkt_LE = cls(False, 1, 2, 3,
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True)),
+ context="blah")
+ test_pducore(self, fullPkt_LE, x.PDU_GET, False, 1, 2, 3)
+ self.assertEqual(fullPkt_LE.oidranges, (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True)))
+ self.assertEqual(fullPkt_LE.context, "blah")
# Test encode, null packet
nullPkt_str = nullPkt.encode()
self.assertEqual(nullPkt_str,
@@ -441,30 +489,49 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x02\x00\x01\x00\x00\x00\x00\x0A\x00\x00\x00\x14"
"\x02\x00\x00\x00\x00\x00\x00\x1E\x00\x00\x00\x28"
"\x00\x00\x00\x00")
+ # Test encode, full packet, little endian
+ fullPkt_LE_str = fullPkt_LE.encode()
+ self.assertEqual(fullPkt_LE_str,
+ "\x01\x05\x08\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x44\x00\x00\x00"
+ "\x04\x00\x00\x00blah"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x05\x00\x00\x00"
+ "\x02\x00\x01\x00\x0A\x00\x00\x00\x14\x00\x00\x00"
+ "\x02\x00\x00\x00\x1E\x00\x00\x00\x28\x00\x00\x00"
+ "\x00\x00\x00\x00")
# Test decoding, null packet
header, body = slicedata(nullPkt_str, 20)
header = decode_pduheader(header)
nullPkt_new = dec(body, header)
- self.assertEqual(nullPkt_new.bigEndian, True)
- self.assertEqual(nullPkt_new.sessionID, 1)
- self.assertEqual(nullPkt_new.transactionID, 2)
- self.assertEqual(nullPkt_new.packetID, 3)
+ test_pducore(self, nullPkt_new, x.PDU_GET, True, 1, 2, 3)
self.assertEqual(nullPkt_new.oidranges, ())
self.assertEqual(nullPkt_new.context, None)
# Test decoding, full packet
header, body = slicedata(fullPkt_str, 20)
header = decode_pduheader(header)
fullPkt_new = dec(body, header)
- self.assertEqual(fullPkt_new.bigEndian, True)
- self.assertEqual(fullPkt_new.sessionID, 1)
- self.assertEqual(fullPkt_new.transactionID, 2)
- self.assertEqual(fullPkt_new.packetID, 3)
+ test_pducore(self, fullPkt_new, x.PDU_GET, True, 1, 2, 3)
self.assertEqual(fullPkt_new.oidranges,
({"start": {"subids": (1, 2, 3), "include": False},
"end": {"subids": (1, 2, 5), "include": False}},
{"start": {"subids": (10, 20), "include": True},
"end": {"subids": (30, 40), "include": False}}))
self.assertEqual(fullPkt_new.context, "blah")
+ # Test decoding, full packet, little endian
+ header, body = slicedata(fullPkt_LE_str, 20)
+ header = decode_pduheader(header)
+ fullPkt_LE_new = dec(body, header)
+ test_pducore(self, fullPkt_LE_new, x.PDU_GET, False, 1, 2, 3)
+ self.assertEqual(fullPkt_LE_new.oidranges,
+ ({"start": {"subids": (1, 2, 3), "include": False},
+ "end": {"subids": (1, 2, 5), "include": False}},
+ {"start": {"subids": (10, 20), "include": True},
+ "end": {"subids": (30, 40), "include": False}}))
+ self.assertEqual(fullPkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(nullPkt_new.packetVars(),
{"pduType": 5,
@@ -478,13 +545,11 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_GetNextPDU(self):
dec = ntp.agentx.decode_xGetPDU
cls = ntp.agentx.GetNextPDU
+ x = ntp.agentx
# Test init, null packet
nullPkt = cls(True, 1, 2, 3, ())
- self.assertEqual(nullPkt.bigEndian, True)
- self.assertEqual(nullPkt.sessionID, 1)
- self.assertEqual(nullPkt.transactionID, 2)
- self.assertEqual(nullPkt.packetID, 3)
+ test_pducore(self, nullPkt, x.PDU_GET_NEXT, True, 1, 2, 3)
self.assertEqual(nullPkt.oidranges, ())
self.assertEqual(nullPkt.context, None)
# Test init, full packet
@@ -492,13 +557,19 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
(((1, 2, 3), (1, 2, 5), False),
((10, 20), (30, 40), True)),
context="blah")
- self.assertEqual(fullPkt.bigEndian, True)
- self.assertEqual(fullPkt.sessionID, 1)
- self.assertEqual(fullPkt.transactionID, 2)
- self.assertEqual(fullPkt.packetID, 3)
+ test_pducore(self, fullPkt, x.PDU_GET_NEXT, True, 1, 2, 3)
self.assertEqual(fullPkt.oidranges, (((1, 2, 3), (1, 2, 5), False),
((10, 20), (30, 40), True)))
self.assertEqual(fullPkt.context, "blah")
+ # Test init, full packet, little endian
+ fullPkt_LE = cls(False, 1, 2, 3,
+ (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True)),
+ context="blah")
+ test_pducore(self, fullPkt_LE, x.PDU_GET_NEXT, False, 1, 2, 3)
+ self.assertEqual(fullPkt_LE.oidranges, (((1, 2, 3), (1, 2, 5), False),
+ ((10, 20), (30, 40), True)))
+ self.assertEqual(fullPkt_LE.context, "blah")
# Test encode, null packet
nullPkt_str = nullPkt.encode()
self.assertEqual(nullPkt_str,
@@ -518,30 +589,48 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x02\x00\x00\x00\x05"
"\x02\x00\x01\x00\x00\x00\x00\x0A\x00\x00\x00\x14"
"\x02\x00\x00\x00\x00\x00\x00\x1E\x00\x00\x00\x28")
+ # Test encode, full packet, little endian
+ fullPkt_LE_str = fullPkt_LE.encode()
+ self.assertEqual(fullPkt_LE_str,
+ "\x01\x06\x08\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x40\x00\x00\x00"
+ "\x04\x00\x00\x00blah"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x05\x00\x00\x00"
+ "\x02\x00\x01\x00\x0A\x00\x00\x00\x14\x00\x00\x00"
+ "\x02\x00\x00\x00\x1E\x00\x00\x00\x28\x00\x00\x00")
# Test decoding, null packet
header, body = slicedata(nullPkt_str, 20)
header = decode_pduheader(header)
nullPkt_new = dec(body, header)
- self.assertEqual(nullPkt_new.bigEndian, True)
- self.assertEqual(nullPkt_new.sessionID, 1)
- self.assertEqual(nullPkt_new.transactionID, 2)
- self.assertEqual(nullPkt_new.packetID, 3)
+ test_pducore(self, nullPkt_new, x.PDU_GET_NEXT, True, 1, 2, 3)
self.assertEqual(nullPkt_new.oidranges, ())
self.assertEqual(nullPkt_new.context, None)
# Test decoding, full packet
header, body = slicedata(fullPkt_str, 20)
header = decode_pduheader(header)
fullPkt_new = dec(body, header)
- self.assertEqual(fullPkt_new.bigEndian, True)
- self.assertEqual(fullPkt_new.sessionID, 1)
- self.assertEqual(fullPkt_new.transactionID, 2)
- self.assertEqual(fullPkt_new.packetID, 3)
+ test_pducore(self, fullPkt_new, x.PDU_GET_NEXT, True, 1, 2, 3)
self.assertEqual(fullPkt_new.oidranges,
({"start": {"subids": (1, 2, 3), "include": False},
"end": {"subids": (1, 2, 5), "include": False}},
{"start": {"subids": (10, 20), "include": True},
"end": {"subids": (30, 40), "include": False}}))
self.assertEqual(fullPkt_new.context, "blah")
+ # Test decoding, full packet, little endian
+ header, body = slicedata(fullPkt_LE_str, 20)
+ header = decode_pduheader(header)
+ fullPkt_LE_new = dec(body, header)
+ test_pducore(self, fullPkt_LE_new, x.PDU_GET_NEXT, False, 1, 2, 3)
+ self.assertEqual(fullPkt_LE_new.oidranges,
+ ({"start": {"subids": (1, 2, 3), "include": False},
+ "end": {"subids": (1, 2, 5), "include": False}},
+ {"start": {"subids": (10, 20), "include": True},
+ "end": {"subids": (30, 40), "include": False}}))
+ self.assertEqual(fullPkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(nullPkt_new.packetVars(),
{"pduType": 6,
@@ -555,22 +644,32 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_GetBulkPDU(self):
dec = ntp.agentx.decode_GetBulkPDU
cls = ntp.agentx.GetBulkPDU
+ x = ntp.agentx
# Test init
pkt = cls(True, 1, 2, 3, 1, 5,
(((1, 2), (3, 4), False),
((6, 7), (8, 9), True)),
context="blah")
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_GET_BULK, True, 1, 2, 3)
self.assertEqual(pkt.nonReps, 1)
self.assertEqual(pkt.maxReps, 5)
self.assertEqual(pkt.oidranges,
(((1, 2), (3, 4), False),
((6, 7), (8, 9), True)))
self.assertEqual(pkt.context, "blah")
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3, 1, 5,
+ (((1, 2), (3, 4), False),
+ ((6, 7), (8, 9), True)),
+ context="blah")
+ test_pducore(self, pkt_LE, x.PDU_GET_BULK, False, 1, 2, 3)
+ self.assertEqual(pkt_LE.nonReps, 1)
+ self.assertEqual(pkt_LE.maxReps, 5)
+ self.assertEqual(pkt_LE.oidranges,
+ (((1, 2), (3, 4), False),
+ ((6, 7), (8, 9), True)))
+ self.assertEqual(pkt_LE.context, "blah")
# Test encoding
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -583,14 +682,23 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
"\x02\x00\x01\x00\x00\x00\x00\x06\x00\x00\x00\x07"
"\x02\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x09")
+ # Test encoding, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x07\x08\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x3C\x00\x00\x00"
+ "\x04\x00\x00\x00blah"
+ "\x01\x00\x05\x00"
+ "\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x02\x00\x01\x00\x06\x00\x00\x00\x07\x00\x00\x00"
+ "\x02\x00\x00\x00\x08\x00\x00\x00\x09\x00\x00\x00")
# Test decoding
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_GET_BULK, True, 1, 2, 3)
self.assertEqual(pkt_new.nonReps, 1)
self.assertEqual(pkt_new.maxReps, 5)
self.assertEqual(pkt_new.oidranges,
@@ -599,6 +707,19 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
{"start": {"subids": (6, 7), "include": True},
"end": {"subids": (8, 9), "include": False}}))
self.assertEqual(pkt_new.context, "blah")
+ # Test decoding, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_GET_BULK, False, 1, 2, 3)
+ self.assertEqual(pkt_LE_new.nonReps, 1)
+ self.assertEqual(pkt_LE_new.maxReps, 5)
+ self.assertEqual(pkt_LE_new.oidranges,
+ ({"start": {"subids": (1, 2), "include": False},
+ "end": {"subids": (3, 4), "include": False}},
+ {"start": {"subids": (6, 7), "include": True},
+ "end": {"subids": (8, 9), "include": False}}))
+ self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 7,
@@ -628,14 +749,21 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
((x.OID, (1, 2, 3), (4, 5, 6), False),
(x.OCTET_STR, (1, 2, 4), "blah")),
context="blah")
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_TEST_SET, True, 1, 2, 3)
self.assertEqual(pkt.varbinds,
((x.OID, (1, 2, 3), (4, 5, 6), False),
(x.OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt.context, "blah")
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")),
+ context="blah")
+ test_pducore(self, pkt_LE, x.PDU_TEST_SET, False, 1, 2, 3)
+ self.assertEqual(pkt_LE.varbinds,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")))
+ self.assertEqual(pkt_LE.context, "blah")
# Test encoding
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -652,14 +780,27 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x03\x00\x00\x00\x00\x00\x00\x01"
"\x00\x00\x00\x02\x00\x00\x00\x04"
"\x00\x00\x00\x04blah")
+ # Test encoding, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x08\x08\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x48\x00\x00\x00"
+ "\x04\x00\x00\x00blah"
+ "\x06\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00blah")
# Test decoding
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_TEST_SET, True, 1, 2, 3)
self.assertEqual(pkt_new.varbinds,
({"type": x.OID,
"name": {"subids": (1, 2, 3), "include": False},
@@ -668,6 +809,19 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"name": {"subids": (1, 2, 4), "include": False},
"data": "blah"}))
self.assertEqual(pkt_new.context, "blah")
+ # Test decoding, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_TEST_SET, False, 1, 2, 3)
+ self.assertEqual(pkt_LE_new.varbinds,
+ ({"type": x.OID,
+ "name": {"subids": (1, 2, 3), "include": False},
+ "data": {"subids": (4, 5, 6), "include": False}},
+ {"type": x.OCTET_STR,
+ "name": {"subids": (1, 2, 4), "include": False},
+ "data": "blah"}))
+ self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 8,
@@ -689,27 +843,36 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_CommitSetPDU(self):
dec = ntp.agentx.decode_CommitSetPDU
cls = ntp.agentx.CommitSetPDU
+ x = ntp.agentx
# Test init
pkt = cls(True, 1, 2, 3)
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_COMMIT_SET, True, 1, 2, 3)
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3)
+ test_pducore(self, pkt_LE, x.PDU_COMMIT_SET, False, 1, 2, 3)
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
"\x01\x09\x10\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x00")
+ # Test encode, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x09\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x00")
# Test decode
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_COMMIT_SET, True, 1, 2, 3)
+ # Test decode, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_COMMIT_SET, False, 1, 2, 3)
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 9,
@@ -721,27 +884,36 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_UndoSetPDU(self):
dec = ntp.agentx.decode_UndoSetPDU
cls = ntp.agentx.UndoSetPDU
+ x = ntp.agentx
# Test init
pkt = cls(True, 1, 2, 3)
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_UNDO_SET, True, 1, 2, 3)
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3)
+ test_pducore(self, pkt_LE, x.PDU_UNDO_SET, False, 1, 2, 3)
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
"\x01\x0A\x10\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x00")
+ # Test encode, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x0A\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x00")
# Test decode
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_UNDO_SET, True, 1, 2, 3)
+ # Test decode, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_UNDO_SET, False, 1, 2, 3)
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 10,
@@ -753,27 +925,36 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_CleanupSetPDU(self):
dec = ntp.agentx.decode_CleanupSetPDU
cls = ntp.agentx.CleanupSetPDU
+ x = ntp.agentx
# Test init
pkt = cls(True, 1, 2, 3)
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_CLEANUP_SET, True, 1, 2, 3)
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3)
+ test_pducore(self, pkt_LE, x.PDU_CLEANUP_SET, False, 1, 2, 3)
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
"\x01\x0B\x10\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x00")
+ # Test encode, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x0B\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x00")
# Test decode
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_CLEANUP_SET, True, 1, 2, 3)
+ # Test decode, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_CLEANUP_SET, False, 1, 2, 3)
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 11,
@@ -785,14 +966,16 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_PingPDU(self):
dec = ntp.agentx.decode_PingPDU
cls = ntp.agentx.PingPDU
+ x = ntp.agentx
# Test init
pkt = cls(True, 1, 2, 3, "blah")
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_PING, True, 1, 2, 3)
self.assertEqual(pkt.context, "blah")
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3, "blah")
+ test_pducore(self, pkt_LE, x.PDU_PING, False, 1, 2, 3)
+ self.assertEqual(pkt_LE.context, "blah")
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -800,15 +983,25 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x08"
"\x00\x00\x00\x04blah")
+ # Test encode, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x0D\x08\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x08\x00\x00\x00"
+ "\x04\x00\x00\x00blah")
# Test decode
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_PING, True, 1, 2, 3)
self.assertEqual(pkt_new.context, "blah")
+ # Test decode, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_PING, False, 1, 2, 3)
+ self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 13,
@@ -828,14 +1021,21 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
((x.OID, (1, 2, 3), (4, 5, 6), False),
(x.OCTET_STR, (1, 2, 4), "blah")),
context="blah")
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_NOTIFY, True, 1, 2, 3)
self.assertEqual(pkt.varbinds,
((6, (1, 2, 3), (4, 5, 6), False),
(4, (1, 2, 4), 'blah')))
self.assertEqual(pkt.context, "blah")
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")),
+ context="blah")
+ test_pducore(self, pkt_LE, x.PDU_NOTIFY, False, 1, 2, 3)
+ self.assertEqual(pkt_LE.varbinds,
+ ((6, (1, 2, 3), (4, 5, 6), False),
+ (4, (1, 2, 4), 'blah')))
+ self.assertEqual(pkt_LE.context, "blah")
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -852,14 +1052,27 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x03\x00\x00\x00\x00\x00\x00\x01"
"\x00\x00\x00\x02\x00\x00\x00\x04"
"\x00\x00\x00\x04blah")
+ # Test encode, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x0C\x08\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x48\x00\x00\x00"
+ "\x04\x00\x00\x00blah"
+ "\x06\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00blah")
# Test decode
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_NOTIFY, True, 1, 2, 3)
self.assertEqual(pkt_new.varbinds,
({"type": x.OID,
"name": {"subids": (1, 2, 3), "include": False},
@@ -868,6 +1081,19 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"name": {"subids": (1, 2, 4), "include": False},
"data": "blah"}))
self.assertEqual(pkt_new.context, "blah")
+ # Test decode, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_NOTIFY, False, 1, 2, 3)
+ self.assertEqual(pkt_LE_new.varbinds,
+ ({"type": x.OID,
+ "name": {"subids": (1, 2, 3), "include": False},
+ "data": {"subids": (4, 5, 6), "include": False}},
+ {"type": x.OCTET_STR,
+ "name": {"subids": (1, 2, 4), "include": False},
+ "data": "blah"}))
+ self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 12,
@@ -896,16 +1122,25 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
((x.OID, (1, 2, 3), (4, 5, 6), False),
(x.OCTET_STR, (1, 2, 4), "blah")),
context="blah")
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_INDEX_ALLOC, True, 1, 2, 3)
self.assertEqual(pkt.newIndex, True)
self.assertEqual(pkt.anyIndex, True)
self.assertEqual(pkt.varbinds,
((x.OID, (1, 2, 3), (4, 5, 6), False),
(x.OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt.context, "blah")
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3, True, True,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")),
+ context="blah")
+ test_pducore(self, pkt_LE, x.PDU_INDEX_ALLOC, False, 1, 2, 3)
+ self.assertEqual(pkt_LE.newIndex, True)
+ self.assertEqual(pkt_LE.anyIndex, True)
+ self.assertEqual(pkt_LE.varbinds,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")))
+ self.assertEqual(pkt_LE.context, "blah")
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -922,16 +1157,29 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x03\x00\x00\x00\x00\x00\x00\x01"
"\x00\x00\x00\x02\x00\x00\x00\x04"
"\x00\x00\x00\x04blah")
+ # Test encode, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x0E\x0E\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x48\x00\x00\x00"
+ "\x04\x00\x00\x00blah"
+ "\x06\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00blah")
# Test decode
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
- self.assertEqual(pkt.newIndex, True)
- self.assertEqual(pkt.anyIndex, True)
+ test_pducore(self, pkt_new, x.PDU_INDEX_ALLOC, True, 1, 2, 3)
+ self.assertEqual(pkt_new.newIndex, True)
+ self.assertEqual(pkt_new.anyIndex, True)
self.assertEqual(pkt_new.varbinds,
({"type": x.OID,
"name": {"subids": (1, 2, 3), "include": False},
@@ -939,7 +1187,21 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
{"type": x.OCTET_STR,
"name": {"subids": (1, 2, 4), "include": False},
"data": "blah"}))
- self.assertEqual(pkt_new.context, "blah")
+ # Test decode, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_INDEX_ALLOC, False, 1, 2, 3)
+ self.assertEqual(pkt_LE_new.newIndex, True)
+ self.assertEqual(pkt_LE_new.anyIndex, True)
+ self.assertEqual(pkt_LE_new.varbinds,
+ ({"type": x.OID,
+ "name": {"subids": (1, 2, 3), "include": False},
+ "data": {"subids": (4, 5, 6), "include": False}},
+ {"type": x.OCTET_STR,
+ "name": {"subids": (1, 2, 4), "include": False},
+ "data": "blah"}))
+ self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 14,
@@ -970,16 +1232,25 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
((x.OID, (1, 2, 3), (4, 5, 6), False),
(x.OCTET_STR, (1, 2, 4), "blah")),
context="blah")
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_INDEX_DEALLOC, True, 1, 2, 3)
self.assertEqual(pkt.newIndex, True)
self.assertEqual(pkt.anyIndex, True)
self.assertEqual(pkt.varbinds,
((x.OID, (1, 2, 3), (4, 5, 6), False),
(x.OCTET_STR, (1, 2, 4), "blah")))
self.assertEqual(pkt.context, "blah")
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3, True, True,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")),
+ context="blah")
+ test_pducore(self, pkt_LE, x.PDU_INDEX_DEALLOC, False, 1, 2, 3)
+ self.assertEqual(pkt_LE.newIndex, True)
+ self.assertEqual(pkt_LE.anyIndex, True)
+ self.assertEqual(pkt_LE.varbinds,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")))
+ self.assertEqual(pkt_LE.context, "blah")
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -996,16 +1267,29 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x03\x00\x00\x00\x00\x00\x00\x01"
"\x00\x00\x00\x02\x00\x00\x00\x04"
"\x00\x00\x00\x04blah")
+ # Test encode, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x0F\x0E\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x48\x00\x00\x00"
+ "\x04\x00\x00\x00blah"
+ "\x06\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00blah")
# Test decode
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
- self.assertEqual(pkt.newIndex, True)
- self.assertEqual(pkt.anyIndex, True)
+ test_pducore(self, pkt_new, x.PDU_INDEX_DEALLOC, True, 1, 2, 3)
+ self.assertEqual(pkt_new.newIndex, True)
+ self.assertEqual(pkt_new.anyIndex, True)
self.assertEqual(pkt_new.varbinds,
({"type": x.OID,
"name": {"subids": (1, 2, 3), "include": False},
@@ -1014,6 +1298,21 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"name": {"subids": (1, 2, 4), "include": False},
"data": "blah"}))
self.assertEqual(pkt_new.context, "blah")
+ # Test decode, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_INDEX_DEALLOC, False, 1, 2, 3)
+ self.assertEqual(pkt_LE_new.newIndex, True)
+ self.assertEqual(pkt_LE_new.anyIndex, True)
+ self.assertEqual(pkt_LE_new.varbinds,
+ ({"type": x.OID,
+ "name": {"subids": (1, 2, 3), "include": False},
+ "data": {"subids": (4, 5, 6), "include": False}},
+ {"type": x.OCTET_STR,
+ "name": {"subids": (1, 2, 4), "include": False},
+ "data": "blah"}))
+ self.assertEqual(pkt_LE_new.context, "blah")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 15,
@@ -1037,16 +1336,20 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_AddAgentCapsPDU(self):
dec = ntp.agentx.decode_AddAgentCapsPDU
cls = ntp.agentx.AddAgentCapsPDU
+ x = ntp.agentx
# Test init
pkt = cls(True, 1, 2, 3, (4, 5, 6), "blah", context="bluh")
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_ADD_AGENT_CAPS, True, 1, 2, 3)
self.assertEqual(pkt.oid, (4, 5, 6))
self.assertEqual(pkt.description, "blah")
self.assertEqual(pkt.context, "bluh")
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3, (4, 5, 6), "blah", context="bluh")
+ test_pducore(self, pkt_LE, x.PDU_ADD_AGENT_CAPS, False, 1, 2, 3)
+ self.assertEqual(pkt_LE.oid, (4, 5, 6))
+ self.assertEqual(pkt_LE.description, "blah")
+ self.assertEqual(pkt_LE.context, "bluh")
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -1057,18 +1360,34 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x03\x00\x00\x00\x00\x00\x00\x04"
"\x00\x00\x00\x05\x00\x00\x00\x06"
"\x00\x00\x00\x04blah")
+ # Test encode, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x10\x08\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x20\x00\x00\x00"
+ "\x04\x00\x00\x00bluh"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00blah")
# Test decode
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_ADD_AGENT_CAPS, True, 1, 2, 3)
self.assertEqual(pkt_new.oid, {"subids": (4, 5, 6),
"include": False})
self.assertEqual(pkt_new.description, "blah")
self.assertEqual(pkt_new.context, "bluh")
+ # Test decode, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_ADD_AGENT_CAPS, False, 1, 2, 3)
+ self.assertEqual(pkt_LE_new.oid, {"subids": (4, 5, 6),
+ "include": False})
+ self.assertEqual(pkt_LE_new.description, "blah")
+ self.assertEqual(pkt_LE_new.context, "bluh")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 16,
@@ -1083,15 +1402,18 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
def test_RMAgentCapsPDU(self):
dec = ntp.agentx.decode_RMAgentCapsPDU
cls = ntp.agentx.RMAgentCapsPDU
+ x = ntp.agentx
# Test init
pkt = cls(True, 1, 2, 3, (4, 5, 6), context="bluh")
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_RM_AGENT_CAPS, True, 1, 2, 3)
self.assertEqual(pkt.oid, (4, 5, 6))
self.assertEqual(pkt.context, "bluh")
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3, (4, 5, 6), context="bluh")
+ test_pducore(self, pkt_LE, x.PDU_RM_AGENT_CAPS, False, 1, 2, 3)
+ self.assertEqual(pkt_LE.oid, (4, 5, 6))
+ self.assertEqual(pkt_LE.context, "bluh")
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -1101,17 +1423,31 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x04bluh"
"\x03\x00\x00\x00\x00\x00\x00\x04"
"\x00\x00\x00\x05\x00\x00\x00\x06")
+ # Test encode, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x11\x08\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x18\x00\x00\x00"
+ "\x04\x00\x00\x00bluh"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00")
# Test decode
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_RM_AGENT_CAPS, True, 1, 2, 3)
self.assertEqual(pkt_new.oid, {"subids": (4, 5, 6),
"include": False})
self.assertEqual(pkt_new.context, "bluh")
+ # Test decode, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_RM_AGENT_CAPS, False, 1, 2, 3)
+ self.assertEqual(pkt_LE_new.oid, {"subids": (4, 5, 6),
+ "include": False})
+ self.assertEqual(pkt_LE_new.context, "bluh")
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 17,
@@ -1131,16 +1467,24 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
pkt = cls(True, 1, 2, 3, 4, 5, 6,
((x.OID, (1, 2, 3), (4, 5, 6), False),
(x.OCTET_STR, (1, 2, 4), "blah")))
- self.assertEqual(pkt.bigEndian, True)
- self.assertEqual(pkt.sessionID, 1)
- self.assertEqual(pkt.transactionID, 2)
- self.assertEqual(pkt.packetID, 3)
+ test_pducore(self, pkt, x.PDU_RESPONSE, True, 1, 2, 3)
self.assertEqual(pkt.sysUptime, 4)
self.assertEqual(pkt.resError, 5)
self.assertEqual(pkt.resIndex, 6)
self.assertEqual(pkt.varbinds,
((6, (1, 2, 3), (4, 5, 6), False),
(4, (1, 2, 4), 'blah')))
+ # Test init, little endian
+ pkt_LE = cls(False, 1, 2, 3, 4, 5, 6,
+ ((x.OID, (1, 2, 3), (4, 5, 6), False),
+ (x.OCTET_STR, (1, 2, 4), "blah")))
+ test_pducore(self, pkt_LE, x.PDU_RESPONSE, False, 1, 2, 3)
+ self.assertEqual(pkt_LE.sysUptime, 4)
+ self.assertEqual(pkt_LE.resError, 5)
+ self.assertEqual(pkt_LE.resIndex, 6)
+ self.assertEqual(pkt_LE.varbinds,
+ ((6, (1, 2, 3), (4, 5, 6), False),
+ (4, (1, 2, 4), 'blah')))
# Test encode
pkt_str = pkt.encode()
self.assertEqual(pkt_str,
@@ -1157,14 +1501,27 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x03\x00\x00\x00\x00\x00\x00\x01"
"\x00\x00\x00\x02\x00\x00\x00\x04"
"\x00\x00\x00\x04blah")
+ # Test encode, little endian
+ pkt_LE_str = pkt_LE.encode()
+ self.assertEqual(pkt_LE_str,
+ "\x01\x12\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x48\x00\x00\x00"
+ "\x04\x00\x00\x00\x05\x00\x06\x00"
+ "\x06\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x03\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00blah")
# Test decode
header, body = slicedata(pkt_str, 20)
header = decode_pduheader(header)
pkt_new = dec(body, header)
- self.assertEqual(pkt_new.bigEndian, True)
- self.assertEqual(pkt_new.sessionID, 1)
- self.assertEqual(pkt_new.transactionID, 2)
- self.assertEqual(pkt_new.packetID, 3)
+ test_pducore(self, pkt_new, x.PDU_RESPONSE, True, 1, 2, 3)
self.assertEqual(pkt_new.sysUptime, 4)
self.assertEqual(pkt_new.resError, 5)
self.assertEqual(pkt_new.resIndex, 6)
@@ -1175,6 +1532,21 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
{"type": x.OCTET_STR,
"name": {"subids": (1, 2, 4), "include": False},
"data": "blah"}))
+ # Test decode, little endian
+ header, body = slicedata(pkt_LE_str, 20)
+ header = decode_pduheader(header)
+ pkt_LE_new = dec(body, header)
+ test_pducore(self, pkt_LE_new, x.PDU_RESPONSE, False, 1, 2, 3)
+ self.assertEqual(pkt_LE_new.sysUptime, 4)
+ self.assertEqual(pkt_LE_new.resError, 5)
+ self.assertEqual(pkt_LE_new.resIndex, 6)
+ self.assertEqual(pkt_LE_new.varbinds,
+ ({"type": x.OID,
+ "name": {"subids": (1, 2, 3), "include": False},
+ "data": {"subids": (4, 5, 6), "include": False}},
+ {"type": x.OCTET_STR,
+ "name": {"subids": (1, 2, 4), "include": False},
+ "data": "blah"}))
# Test packetVars
self.assertEqual(pkt_new.packetVars(),
{"pduType": 18,
@@ -1198,247 +1570,255 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
#
# Data type tests
#
- def test_encode_integer32(self):
- f = ntp.agentx.encode_integer32
-
- # Test
- self.assertEqual(f(True, 42), "\x00\x00\x00\x2A")
-
- def test_decode_integer32(self):
- f = ntp.agentx.decode_integer32
-
- # Test
- self.assertEqual(f("\x00\x00\x00\x2A" + extraData, standardFlags),
+ def test_integer32(self):
+ enc = ntp.agentx.encode_integer32
+ dec = ntp.agentx.decode_integer32
+
+ # Encode
+ self.assertEqual(enc(True, 42), "\x00\x00\x00\x2A")
+ # Encode, little endian
+ self.assertEqual(enc(False, 42), "\x2A\x00\x00\x00")
+ # Decode
+ self.assertEqual(dec("\x00\x00\x00\x2A" + extraData, standardFlags),
(42, extraData))
- # Test little endian
- self.assertEqual(f("\x2A\x00\x00\x00" + extraData, lilEndianFlags),
+ # Decode, little endian
+ self.assertEqual(dec("\x2A\x00\x00\x00" + extraData, lilEndianFlags),
(42, extraData))
- def test_encode_nullvalue(self):
- f = ntp.agentx.encode_nullvalue
-
- # Test
- self.assertEqual(f(True), "")
-
- def test_decode_nullvalue(self):
- f = ntp.agentx.decode_nullvalue
-
- # Test
- self.assertEqual(f(extraData, standardFlags), (None, extraData))
-
- def test_encode_integer64(self):
- f = ntp.agentx.encode_integer64
-
- # Test
- self.assertEqual(f(True, 42), "\x00\x00\x00\x00\x00\x00\x00\x2A")
-
- def test_decode_integer64(self):
- f = ntp.agentx.decode_integer64
-
- # Test
- self.assertEqual(f("\x00\x00\x00\x00\x00\x00\x00\x2A" + extraData,
- standardFlags),
+ def test_nullvalue(self):
+ enc = ntp.agentx.encode_nullvalue
+ dec = ntp.agentx.decode_nullvalue
+
+ # Encode
+ self.assertEqual(enc(True), "")
+ # Decode
+ self.assertEqual(dec(extraData, standardFlags), (None, extraData))
+
+ def test_integer64(self):
+ enc = ntp.agentx.encode_integer64
+ dec = ntp.agentx.decode_integer64
+
+ # Encode
+ self.assertEqual(enc(True, 42), "\x00\x00\x00\x00\x00\x00\x00\x2A")
+ # Encode, little endian
+ self.assertEqual(enc(False, 42), "\x2A\x00\x00\x00\x00\x00\x00\x00")
+ # Decode
+ self.assertEqual(dec("\x00\x00\x00\x00\x00\x00\x00\x2A" + extraData,
+ standardFlags),
(42, extraData))
- # Test
- self.assertEqual(f("\x2A\x00\x00\x00\x00\x00\x00\x00" + extraData,
- lilEndianFlags),
+ # Decode, little endian
+ self.assertEqual(dec("\x2A\x00\x00\x00\x00\x00\x00\x00" + extraData,
+ lilEndianFlags),
(42, extraData))
- def test_encode_ipaddr(self):
- f = ntp.agentx.encode_ipaddr
+ def test_ipaddr(self):
+ enc = ntp.agentx.encode_ipaddr
+ dec = ntp.agentx.decode_ipaddr
- # Test correct
- self.assertEqual(f(True, (1, 2, 3, 4)),
+ # Encode correct
+ self.assertEqual(enc(True, (1, 2, 3, 4)),
"\x00\x00\x00\x04\x01\x02\x03\x04")
- # Test incorrect
+ # Encode correct, little endian
+ self.assertEqual(enc(False, (1, 2, 3, 4)),
+ "\x04\x00\x00\x00\x01\x02\x03\x04")
+ # Encode incorrect
try:
- f(True, (1, 2, 3, 4, 5))
+ enc(True, (1, 2, 3, 4, 5))
errored = False
except ValueError:
errored = True
self.assertEqual(errored, True)
-
- def test_decode_ipaddr(self):
- f = ntp.agentx.decode_ipaddr
-
- # Test
- self.assertEqual(f("\x00\x00\x00\x04\x01\x02\x03\x04" + extraData,
- standardFlags),
+ # Decode
+ self.assertEqual(dec("\x00\x00\x00\x04\x01\x02\x03\x04" + extraData,
+ standardFlags),
((1, 2, 3, 4), extraData))
- # Test little endian
- self.assertEqual(f("\x04\x00\x00\x00\x01\x02\x03\x04" + extraData,
- lilEndianFlags),
+ # Decode, little endian
+ self.assertEqual(dec("\x04\x00\x00\x00\x01\x02\x03\x04" + extraData,
+ lilEndianFlags),
((1, 2, 3, 4), extraData))
- def test_encode_oid(self):
- f = ntp.agentx.encode_oid
+ def test_oid(self):
+ enc = ntp.agentx.encode_oid
+ dec = ntp.agentx.decode_oid
- # Test empty OID
- self.assertEqual(f(True, (), False), "\x00\x00\x00\x00")
- # Test basic OID
- self.assertEqual(f(True, (1, 2, 3, 4, 5), False),
+ # Encode empty OID
+ self.assertEqual(enc(True, (), False), "\x00\x00\x00\x00")
+ # Encode basic OID
+ self.assertEqual(enc(True, (1, 2, 3, 4, 5), False),
"\x05\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x04"
"\x00\x00\x00\x05")
- # Test basic OID, little endian
- self.assertEqual(f(False, (1, 2, 3, 4, 5), False),
+ # Encode basic OID, little endian
+ self.assertEqual(enc(False, (1, 2, 3, 4, 5), False),
"\x05\x00\x00\x00"
"\x01\x00\x00\x00\x02\x00\x00\x00"
"\x03\x00\x00\x00\x04\x00\x00\x00"
"\x05\x00\x00\x00")
- # Test prefixed OID
- self.assertEqual(f(True, (1, 3, 6, 1, 23, 1, 2, 3), False),
+ # Encode prefixed OID
+ self.assertEqual(enc(True, (1, 3, 6, 1, 23, 1, 2, 3), False),
"\x03\x17\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03")
- # Test include
- self.assertEqual(f(True, (1, 2), True),
+ # Encode include
+ self.assertEqual(enc(True, (1, 2), True),
"\x02\x00\x01\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02")
- # Test together
- self.assertEqual(f(True, (1, 3, 6, 1, 1, 3, 4, 5, 6), True),
+ # Encode together
+ self.assertEqual(enc(True, (1, 3, 6, 1, 1, 3, 4, 5, 6), True),
"\x04\x01\x01\x00"
"\x00\x00\x00\x03\x00\x00\x00\x04"
"\x00\x00\x00\x05\x00\x00\x00\x06")
- # Test maximum size
- self.assertEqual(f(True, maximumOIDsubs, False), maximumOIDstr)
- # Test over maximum size
+ # Encode maximum size
+ self.assertEqual(enc(True, maximumOIDsubs, False), maximumOIDstr)
+ # Encode over maximum size
try:
- f(True, maximumOIDsubs + (42,), False)
+ enc(True, maximumOIDsubs + (42,), False)
fail = False
except ValueError:
fail = True
self.assertEqual(fail, True)
-
- def test_decode_oid(self):
- f = ntp.agentx.decode_oid
-
- # Test empty OID, extra data
- self.assertEqual(f("\x00\x00\x00\x00" + extraData, standardFlags),
+ # Decode empty OID, extra data
+ self.assertEqual(dec("\x00\x00\x00\x00" + extraData, standardFlags),
({"subids": (), "include": False}, extraData))
- # Test basic OID, extra data
- self.assertEqual(f("\x05\x00\x00\x00\x00\x00\x00\x01"
- "\x00\x00\x00\x02\x00\x00\x00\x03"
- "\x00\x00\x00\x04\x00\x00\x00\x05" + extraData,
- standardFlags),
+ # Decode basic OID, extra data
+ self.assertEqual(dec("\x05\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x00\x00\x00\x04\x00\x00\x00\x05" + extraData,
+ standardFlags),
({"subids": (1, 2, 3, 4, 5), "include": False},
extraData))
- # Test basic OID, little endian
- self.assertEqual(f("\x05\x00\x00\x00\x01\x00\x00\x00"
- "\x02\x00\x00\x00\x03\x00\x00\x00"
- "\x04\x00\x00\x00\x05\x00\x00\x00", lilEndianFlags),
+ # Decode basic OID, little endian
+ self.assertEqual(dec("\x05\x00\x00\x00\x01\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x04\x00\x00\x00\x05\x00\x00\x00",
+ lilEndianFlags),
({"subids": (1, 2, 3, 4, 5), "include": False},
- ""))
- # Test prefixed OID
- self.assertEqual(f("\x03\x17\x00\x00\x00\x00\x00\x01"
- "\x00\x00\x00\x02\x00\x00\x00\x03", standardFlags),
+ ""))
+ # Decode prefixed OID
+ self.assertEqual(dec("\x03\x17\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x03", standardFlags),
({"subids": (1, 3, 6, 1, 23, 1, 2, 3),
"include": False},
""))
- # Test include
- self.assertEqual(f("\x02\x00\x05\x00\x00\x00\x00\x01\x00\x00\x00\x02",
- standardFlags),
+ # Decode include
+ self.assertEqual(dec("\x02\x00\x05\x00\x00\x00\x00\x01\x00\x00\x00\x02",
+ standardFlags),
({"subids": (1, 2), "include": True}, ""))
- # Test together
- self.assertEqual(f("\x04\x01\x02\x00\x00\x00\x00\x03"
- "\x00\x00\x00\x04\x00\x00\x00\x05\x00\x00\x00\x06",
- standardFlags),
+ # Decode together
+ self.assertEqual(dec("\x04\x01\x02\x00\x00\x00\x00\x03"
+ "\x00\x00\x00\x04\x00\x00\x00\x05\x00\x00\x00\x06",
+ standardFlags),
({"subids": (1, 3, 6, 1, 1, 3, 4, 5, 6),
"include": True},
""))
- # Test maximum size
- self.assertEqual(f(maximumOIDstr, standardFlags),
+ # Decode maximum size
+ self.assertEqual(dec(maximumOIDstr, standardFlags),
({"subids": maximumOIDsubs, "include": False}, ""))
- # Test over maximum size
+ # Decode over maximum size
# Need to replace the hardcoded n_subid=128 with 129
fatOID = "\x81" + maximumOIDstr[1:] + "\xDE\xAD\xBE\xEF"
try:
- f(fatOID, standardFlags)
+ dec(fatOID, standardFlags)
fail = False
except ValueError:
fail = True
self.assertEqual(fail, True)
- def test_encode_searchrange(self):
- f = ntp.agentx.encode_searchrange
+ def test_searchrange(self):
+ enc = ntp.agentx.encode_searchrange
+ dec = ntp.agentx.decode_searchrange
- # Test minimum size
- self.assertEqual(f(True, (), (), False),
+ # Encode minimum size
+ self.assertEqual(enc(True, (), (), False),
"\x00\x00\x00\x00\x00\x00\x00\x00")
- # Test inclusive
- self.assertEqual(f(True, (1, 2, 3, 4), (5, 6, 7, 8), True),
+ # Encode inclusive
+ self.assertEqual(enc(True, (1, 2, 3, 4), (5, 6, 7, 8), True),
"\x04\x00\x01\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x04"
"\x04\x00\x00\x00"
"\x00\x00\x00\x05\x00\x00\x00\x06"
"\x00\x00\x00\x07\x00\x00\x00\x08")
- # Test exclusive
- self.assertEqual(f(True, (1, 2, 3, 4), (5, 6, 7, 8), False),
+ # Encode exclusive
+ self.assertEqual(enc(True, (1, 2, 3, 4), (5, 6, 7, 8), False),
"\x04\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02"
"\x00\x00\x00\x03\x00\x00\x00\x04"
"\x04\x00\x00\x00"
"\x00\x00\x00\x05\x00\x00\x00\x06"
"\x00\x00\x00\x07\x00\x00\x00\x08")
-
- def test_decode_searchrange(self):
- f = ntp.agentx.decode_searchrange
-
- # Test minimum size, extra data
- self.assertEqual(f("\x00\x00\x00\x00\x00\x00\x00\x00" + extraData,
- standardFlags),
+ # Encode exclusive, little endian
+ self.assertEqual(enc(False, (1, 2, 3, 4), (5, 6, 7, 8), False),
+ "\x04\x00\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x07\x00\x00\x00\x08\x00\x00\x00")
+ # Decode minimum size, extra data
+ self.assertEqual(dec("\x00\x00\x00\x00\x00\x00\x00\x00" + extraData,
+ standardFlags),
({"start": {"subids": (), "include": False},
"end": {"subids": (), "include": False}},
extraData))
- # Test inclusive
- self.assertEqual(f("\x04\x00\x01\x00"
- "\x00\x00\x00\x01\x00\x00\x00\x02"
- "\x00\x00\x00\x03\x00\x00\x00\x04"
- "\x04\x00\x00\x00"
- "\x00\x00\x00\x05\x00\x00\x00\x06"
- "\x00\x00\x00\x07\x00\x00\x00\x08", standardFlags),
+ # Decode inclusive
+ self.assertEqual(dec("\x04\x00\x01\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x04\x00\x00\x00"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x07\x00\x00\x00\x08",
+ standardFlags),
({"start": {"subids": (1, 2, 3, 4), "include": True},
"end": {"subids": (5, 6, 7, 8), "include": False}},
""))
- # Test exclusive
- self.assertEqual(f("\x04\x00\x00\x00"
- "\x00\x00\x00\x01\x00\x00\x00\x02"
- "\x00\x00\x00\x03\x00\x00\x00\x04"
- "\x04\x00\x00\x00"
- "\x00\x00\x00\x05\x00\x00\x00\x06"
- "\x00\x00\x00\x07\x00\x00\x00\x08", standardFlags),
+ # Decode exclusive
+ self.assertEqual(dec("\x04\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x04\x00\x00\x00"
+ "\x00\x00\x00\x05\x00\x00\x00\x06"
+ "\x00\x00\x00\x07\x00\x00\x00\x08",
+ standardFlags),
({"start": {"subids": (1, 2, 3, 4), "include": False},
"end": {"subids": (5, 6, 7, 8), "include": False}},
""))
- # Test little endian
- self.assertEqual(f("\x04\x00\x01\x00"
- "\x01\x00\x00\x00\x02\x00\x00\x00"
- "\x03\x00\x00\x00\x04\x00\x00\x00"
- "\x04\x00\x00\x00"
- "\x05\x00\x00\x00\x06\x00\x00\x00"
- "\x07\x00\x00\x00\x08\x00\x00\x00", lilEndianFlags),
+ # Decode little endian
+ self.assertEqual(dec("\x04\x00\x01\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x04\x00\x00\x00"
+ "\x05\x00\x00\x00\x06\x00\x00\x00"
+ "\x07\x00\x00\x00\x08\x00\x00\x00",
+ lilEndianFlags),
({"start": {"subids": (1, 2, 3, 4), "include": True},
"end": {"subids": (5, 6, 7, 8), "include": False}},
""))
def test_encode_searchrange_list(self):
- f = ntp.agentx.encode_searchrange_list
+ enc = ntp.agentx.encode_searchrange_list
- # Test
- self.assertEqual(f(True, (((1, 2), (1, 2), True),
- ((2, 3), (3, 4), False))),
+ # Encode
+ self.assertEqual(enc(True, (((1, 2), (1, 2), True),
+ ((2, 3), (3, 4), False))),
"\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
"\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
"\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
"\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04")
+ # Encode, little endian
+ self.assertEqual(enc(False, (((1, 2), (1, 2), True),
+ ((2, 3), (3, 4), False))),
+ "\x02\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00")
# Test, null terminated
- self.assertEqual(f(True,
- (((1, 2), (1, 2), True),
- ((2, 3), (3, 4), False)),
- nullTerminate=True),
+ self.assertEqual(enc(True,
+ (((1, 2), (1, 2), True),
+ ((2, 3), (3, 4), False)),
+ nullTerminate=True),
"\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
"\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
"\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
@@ -1446,140 +1826,146 @@ class TestNtpclientsNtpsnmpd(unittest.TestCase):
"\x00\x00\x00\x00")
def test_decode_searchrange_list(self):
- f = ntp.agentx.decode_searchrange_list
-
- # Test
- self.assertEqual(f("\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
- "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
- "\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
- "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04",
- standardFlags),
+ dec = ntp.agentx.decode_searchrange_list
+
+ # Decode
+ self.assertEqual(dec("\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04",
+ standardFlags),
({"start": {"subids": (1, 2), "include": True},
"end": {"subids": (1, 2), "include": False}},
{"start": {"subids": (2, 3), "include": False},
"end": {"subids": (3, 4), "include": False}}))
# Test, little endian
- self.assertEqual(f("\x02\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00"
- "\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
- "\x02\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
- "\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00",
- lilEndianFlags),
+ self.assertEqual(dec("\x02\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00",
+ lilEndianFlags),
({"start": {"subids": (1, 2), "include": True},
"end": {"subids": (1, 2), "include": False}},
{"start": {"subids": (2, 3), "include": False},
"end": {"subids": (3, 4), "include": False}}))
-
+
def test_decode_searchrange_list_nullterm(self):
- f = ntp.agentx.decode_searchrange_list_nullterm
-
- # Test
- self.assertEqual(f("\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
- "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
- "\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
- "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
- "\x00\x00\x00\x00" + extraData, standardFlags),
+ dec = ntp.agentx.decode_searchrange_list_nullterm
+
+ # Decode
+ self.assertEqual(dec("\x02\x00\x01\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x02\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x03"
+ "\x02\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x04"
+ "\x00\x00\x00\x00" + extraData, standardFlags),
(({"start": {"subids": (1, 2), "include": True},
- "end": {"subids": (1, 2), "include": False}},
- {"start": {"subids": (2, 3), "include": False},
- "end": {"subids": (3, 4), "include": False}}),
+ "end": {"subids": (1, 2), "include": False}},
+ {"start": {"subids": (2, 3), "include": False},
+ "end": {"subids": (3, 4), "include": False}}),
extraData))
# Test, little endian
- self.assertEqual(f("\x02\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00"
- "\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
- "\x02\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
- "\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00"
- "\x00\x00\x00\x00" + extraData, lilEndianFlags),
+ self.assertEqual(dec("\x02\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
+ "\x02\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00"
+ "\x00\x00\x00\x00" + extraData, lilEndianFlags),
(({"start": {"subids": (1, 2), "include": True},
- "end": {"subids": (1, 2), "include": False}},
- {"start": {"subids": (2, 3), "include": False},
- "end": {"subids": (3, 4), "include": False}}),
+ "end": {"subids": (1, 2), "include": False}},
+ {"start": {"subids": (2, 3), "include": False},
+ "end": {"subids": (3, 4), "include": False}}),
extraData))
def test_encode_octetstr(self):
- f = ntp.agentx.encode_octetstr
+ enc = ntp.agentx.encode_octetstr
+ dec = ntp.agentx.decode_octetstr
- # Test empty
- self.assertEqual(f(True, ()), "\x00\x00\x00\x00")
- # Test word multiple
- self.assertEqual(f(True, (1, 2, 3, 4)),
+ # Encode empty
+ self.assertEqual(enc(True, ()), "\x00\x00\x00\x00")
+ # Encode word multiple
+ self.assertEqual(enc(True, (1, 2, 3, 4)),
"\x00\x00\x00\x04\x01\x02\x03\x04")
- # Test non word multiple
- self.assertEqual(f(True, (1, 2, 3, 4, 5)),
+ # Encode non word multiple
+ self.assertEqual(enc(True, (1, 2, 3, 4, 5)),
"\x00\x00\x00\x05\x01\x02\x03\x04\x05\x00\x00\x00")
- # Test string
- self.assertEqual(f(True, "blah"), "\x00\x00\x00\x04blah")
-
- def test_decode_octetstr(self):
- f = ntp.agentx.decode_octetstr
-
- # Test empty
- self.assertEqual(f("\x00\x00\x00\x00", standardFlags), ("", ""))
- # Test word multiple, extra data
- self.assertEqual(f("\x00\x00\x00\x04blah" + extraData, standardFlags),
+ # Encode string
+ self.assertEqual(enc(True, "blah"), "\x00\x00\x00\x04blah")
+ # Encode string, little endian
+ self.assertEqual(enc(False, "blah"), "\x04\x00\x00\x00blah")
+ # Decode empty
+ self.assertEqual(dec("\x00\x00\x00\x00", standardFlags), ("", ""))
+ # Decode word multiple, extra data
+ self.assertEqual(dec("\x00\x00\x00\x04blah" + extraData, standardFlags),
("blah", extraData))
- # Test word multiple, little endian
- self.assertEqual(f("\x04\x00\x00\x00blah", lilEndianFlags),
+ # Decode word multiple, little endian
+ self.assertEqual(dec("\x04\x00\x00\x00blah", lilEndianFlags),
("blah", ""))
- # Test non word multiple, extra data
- self.assertEqual(f("\x00\x00\x00\x05"
- "blarg\x00\x00\x00" + extraData, standardFlags),
+ # Decode non word multiple, extra data
+ self.assertEqual(dec("\x00\x00\x00\x05"
+ "blarg\x00\x00\x00" + extraData,
+ standardFlags),
("blarg", extraData))
def test_encode_varbind(self):
- f = ntp.agentx.encode_varbind
+ enc = ntp.agentx.encode_varbind
a = ntp.agentx
# Test payloadless types
- self.assertEqual(f(True, a.NULL, (1, 2, 3)),
+ self.assertEqual(enc(True, a.NULL, (1, 2, 3)),
"\x00\x05\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
- self.assertEqual(f(True, a.NO_SUCH_OBJECT, (1, 2, 3)),
+ self.assertEqual(enc(True, a.NO_SUCH_OBJECT, (1, 2, 3)),
"\x00\x80\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
- self.assertEqual(f(True, a.NO_SUCH_INSTANCE, (1, 2, 3)),
+ self.assertEqual(enc(True, a.NO_SUCH_INSTANCE, (1, 2, 3)),
"\x00\x81\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
- self.assertEqual(f(True, a.END_OF_MIB_VIEW, (1, 2, 3)),
+ self.assertEqual(enc(True, a.END_OF_MIB_VIEW, (1, 2, 3)),
"\x00\x82\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03")
# Test octet based types
- self.assertEqual(f(True, a.OCTET_STR, (1, 2, 3), (1, 2, 3, 4, 5)),
+ self.assertEqual(enc(True, a.OCTET_STR, (1, 2, 3), (1, 2, 3, 4, 5)),
"\x00\x04\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x05"
"\x01\x02\x03\x04\x05\x00\x00\x00")
- self.assertEqual(f(True, a.IP_ADDR, (1, 2, 3), (16, 32, 48, 64)),
+ self.assertEqual(enc(True, a.IP_ADDR, (1, 2, 3), (16, 32, 48, 64)),
"\x00\x40\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x04\x10\x20\x30\x40")
# Test integer32 types
- self.assertEqual(f(True, a.INTEGER, (1, 2, 3), 42),
+ self.assertEqual(enc(True, a.INTEGER, (1, 2, 3), 42),
"\x00\x02\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A")
- self.assertEqual(f(True, a.COUNTER32, (1, 2, 3), 42),
+ self.assertEqual(enc(True, a.COUNTER32, (1, 2, 3), 42),
"\x00\x41\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A")
- self.assertEqual(f(True, a.GAUGE32, (1, 2, 3), 42),
+ self.assertEqual(enc(True, a.GAUGE32, (1, 2, 3), 42),
"\x00\x42\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A")
- self.assertEqual(f(True, a.TIME_TICKS, (1, 2, 3), 42),
+ self.assertEqual(enc(True, a.TIME_TICKS, (1, 2, 3), 42),
"\x00\x43\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x2A")
# Test integer64 type
- self.assertEqual(f(True, a.COUNTER64, (1, 2, 3), 42),
+ self.assertEqual(enc(True, a.COUNTER64, (1, 2, 3), 42),
"\x00\x46\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x00\x00\x00\x00\x00\x00\x00\x2A")
# Test oid type
- self.assertEqual(f(True, a.OID, (1, 2, 3), (16, 42, 256), False),
+ self.assertEqual(enc(True, a.OID, (1, 2, 3), (16, 42, 256), False),
"\x00\x06\x00\x00\x03\x00\x00\x00"
"\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03"
"\x03\x00\x00\x00\x00\x00\x00\x10"
"\x00\x00\x00\x2A\x00\x00\x01\x00")
+ # Test oid type, little endian
+ self.assertEqual(enc(False, a.OID, (1, 2, 3), (16, 42, 256), False),
+ "\x06\x00\x00\x00\x03\x00\x00\x00"
+ "\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00"
+ "\x03\x00\x00\x00\x10\x00\x00\x00"
+ "\x2A\x00\x00\x00\x00\x01\x00\x00")
def test_decode_varbind(self):
f = ntp.agentx.decode_varbind
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/ec2abf6fbf4bcf12be8ea520a796ffc8d3b5c1b5...910d05617f38bce8729c871bd08f3e390dc574ed
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/compare/ec2abf6fbf4bcf12be8ea520a796ffc8d3b5c1b5...910d05617f38bce8729c871bd08f3e390dc574ed
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20170808/f3259625/attachment.html>
More information about the vc mailing list