[Git][NTPsec/ntpsec][master] Added tests for PacketControl and MIBControl.
Ian Bruene
gitlab at mg.gitlab.com
Fri Mar 23 18:58:04 UTC 2018
Ian Bruene pushed to branch master at NTPsec / ntpsec
Commits:
cefda341 by Ian Bruene at 2018-03-23T18:56:26Z
Added tests for PacketControl and MIBControl.
- - - - -
3 changed files:
- pylib/agentx.py
- pylib/agentx_packet.py
- tests/pylib/test_agentx.py
Changes:
=====================================
pylib/agentx.py
=====================================
--- a/pylib/agentx.py
+++ b/pylib/agentx.py
@@ -214,8 +214,8 @@ class PacketControl:
"Received packet with incorrect session ID: %s" % packet,
3)
resp = ax.ResponsePDU(True, packet.sessionID,
- packet.transactioID, packet.packetID,
- 0, ax.REPERR_NOT_OPEN, 0)
+ packet.transactionID, packet.packetID,
+ 0, ax.RSPERR_NOT_OPEN, 0)
self.sendPacket(resp, False)
continue
ptype = packet.pduType
@@ -245,7 +245,7 @@ class PacketControl:
self.database.mib_upperBound(),
self.database.mib_context())
self.sendPacket(register, False)
- response = self.waitForResponse(register, True)
+ response = self.waitForResponse(register)
self.stillConnected = True
def waitForResponse(self, opkt, ignoreSID=False):
@@ -325,7 +325,7 @@ class PacketControl:
if resp is None: # Timed out. Need to restart the session.
# Er, problem: Can't handle reconnect from inside PacketControl
self.stillConnected = False
- self.sendPacket(pkt, True)
+ self.sendPacket(pkt, True, callback=callback)
def sendNotify(self, varbinds, context=None):
# DUMMY packetID, does this need to change? or does the pktID only
=====================================
pylib/agentx_packet.py
=====================================
--- a/pylib/agentx_packet.py
+++ b/pylib/agentx_packet.py
@@ -1036,7 +1036,7 @@ def decode_integer32(data, header):
def sanity_integer32(data):
if type(data) is not int:
- raise TypeError
+ raise TypeError("%s is not integer" % repr(data))
def encode_unsigned32(bigEndian, num):
=====================================
tests/pylib/test_agentx.py
=====================================
--- a/tests/pylib/test_agentx.py
+++ b/tests/pylib/test_agentx.py
@@ -3,10 +3,13 @@
import ntp.agentx as AX
import ntp.agentx_packet as AP
+import ntp.util
+import jigs
import unittest
import types
+
class TestMIBControl(unittest.TestCase):
def test___init__(self):
c = AX.MIBControl(1, 2, 3, 4, 5)
@@ -139,6 +142,590 @@ class TestMIBControl(unittest.TestCase):
(AP.OID((0, 4, 1, 0)), 50, 51)])
+class TestPacketControl(unittest.TestCase):
+ def test___init__(self):
+ p = AX.PacketControl("sock", "base")
+ self.assertEqual(p.socket, "sock")
+ self.assertEqual(0.0005 < p.spinGap < 0.0015, True)
+ self.assertEqual(p.packetLog, {})
+ self.assertEqual(p.loopCallback, None)
+ self.assertEqual(p.database, "base")
+ self.assertEqual(p.receivedData, "")
+ self.assertEqual(p.timeout, 30)
+ self.assertEqual(p.sessionID, None)
+ self.assertEqual(p.highestTransactionID, 0)
+ self.assertEqual(p.lastReception, None)
+ self.assertEqual(p.stillConnected, False)
+ self.assertEqual(p.pduHandlers,
+ {AP.PDU_GET: p.handle_GetPDU,
+ AP.PDU_GET_NEXT: p.handle_GetNextPDU,
+ AP.PDU_GET_BULK: p.handle_GetBulkPDU,
+ AP.PDU_TEST_SET: p.handle_TestSetPDU,
+ AP.PDU_COMMIT_SET: p.handle_CommitSetPDU,
+ AP.PDU_UNDO_SET: p.handle_UndoSetPDU,
+ AP.PDU_CLEANUP_SET: p.handle_CleanupSetPDU,
+ AP.PDU_RESPONSE: p.handle_ResponsePDU})
+
+ def test_mainloop(self):
+ stillCons = []
+ loop_calls = [0]
+ cb_calls = [0]
+
+ def loop_jig(self):
+ loop_calls[0] += 1
+ self.stillConnected = stillCons.pop(0)
+
+ def callback_jig(p):
+ cb_calls[0] += 1
+
+ p = AX.PacketControl(None, None)
+ p.spinGap = 0
+ p._doloop = (lambda: loop_jig(p))
+ p.stillConnected = False
+ # Test not connected
+ self.assertEqual(p.mainloop(False), False)
+ self.assertEqual(loop_calls, [0])
+ # Test single shot
+ p.stillConnected = True
+ stillCons = [True] * 2
+ self.assertEqual(p.mainloop(False), True)
+ self.assertEqual(stillCons, [True])
+ self.assertEqual(loop_calls, [1])
+ # Test loop
+ p.stillConnected = True
+ loop_calls[0] = 0
+ stillCons = [True] * 5 + [False]
+ self.assertEqual(p.mainloop(True), False)
+ self.assertEqual(stillCons, [])
+ self.assertEqual(loop_calls, [6])
+ # Test loop, with callback
+ p.stillConnected = True
+ loop_calls[0] = 0
+ p.loopCallback = callback_jig
+ stillCons = [True] * 5 + [False]
+ self.assertEqual(p.mainloop(True), False)
+ self.assertEqual(stillCons, [])
+ self.assertEqual(loop_calls, [6])
+ self.assertEqual(cb_calls, [6])
+
+ def test__doloop(self):
+ # need jigs for
+ # time
+ pkts = []
+ recpTimes = []
+ handler_calls = [0]
+
+ def packetEater_jig(self):
+ self.receivedPackets = pkts
+ self.lastReception = recpTime
+
+ def pinghandler_jig(self):
+ handler_calls[0] += 1
+
+ faketimemod = jigs.TimeModuleJig()
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, None)
+ p.packetEater = (lambda: packetEater_jig(p))
+ p.checkResponses = (lambda: None)
+ p.pduHandlers[AP.PDU_PING] = pinghandler_jig
+ p.sessionID = 42
+ try:
+ timetemp = AX.time
+ AX.time = faketimemod
+ # Test first time, no reception
+ recpTime = None
+ faketimemod.time_returns = []
+ p._doloop()
+ self.assertEqual(sock.data, [])
+ # Test wrong session ID
+ pkts = [AP.PingPDU(True, 13, 0, 0)]
+ recpTime = 50
+ faketimemod.time_returns = [55]
+ p._doloop()
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 13, 0, 0, 0, AP.RSPERR_NOT_OPEN, 0)
+ self.assertEqual(sentPkt, respPkt)
+ # Test unimplemented packet type
+ faketimemod.time_returns = [55]
+ sock.data = []
+ pkts = [AP.PingPDU(True, 42, 0, 0)]
+ pkts[0].pduType = 1337 # spoof the type
+ p._doloop()
+ self.assertEqual(sock.data, [])
+ self.assertEqual(handler_calls, [0])
+ # Test successful
+ faketimemod.time_returns = [55]
+ sock.data = []
+ pkts = [AP.PingPDU(True, 42, 0, 0)]
+ p._doloop()
+ self.assertEqual(sock.data, [])
+ self.assertEqual(handler_calls, [1])
+ # Test pinging
+ handler_calls = [0]
+ faketimemod.time_returns = [120]
+ sock.data = []
+ p._doloop()
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.PingPDU(True, 42, 5, 1)
+ self.assertEqual(sentPkt, respPkt)
+ self.assertEqual(handler_calls, [0])
+ finally:
+ AX.time = timetemp
+
+ def test_initNewSession(self):
+ resp_returns = []
+ resp_calls = []
+
+ def waitForResponse_jig(opkt, ignoreSID=False):
+ resp_calls.append((opkt, ignoreSID))
+ return resp_returns.pop(0)
+
+ mjig = AX.MIBControl(mibRoot=(0, 1, 2), rangeSubid=0, upperBound=None,
+ mibContext="foo")
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, mjig)
+ p.waitForResponse = waitForResponse_jig
+ resp_returns = [AP.ResponsePDU(True, 42, 1, 1, 0, 0, 0),
+ AP.ResponsePDU(True, 42, 1, 1, 0, 0, 0)]
+ p.initNewSession()
+ self.assertEqual(resp_returns, [])
+ self.assertEqual(len(sock.data), 2)
+ openpkt = AP.decode_packet(sock.data[0])[0]
+ registerpkt = AP.decode_packet(sock.data[1])[0]
+ self.assertEqual(openpkt, AP.OpenPDU(True, 23, 0, 0, 30, (),
+ "NTPsec SNMP subagent"))
+ self.assertEqual(registerpkt, AP.RegisterPDU(True, 42, 1, 1, 30, 1,
+ mjig.mib_rootOID(),
+ mjig.mib_rangeSubid(),
+ mjig.mib_upperBound(),
+ mjig.mib_context()))
+ self.assertEqual(p.sessionID, 42)
+ self.assertEqual(p.stillConnected, True)
+
+ def test_waitForResponse(self):
+ pkts = [AP.PingPDU(True, 1, 2, 3),
+ AP.ResponsePDU(True, 10, 20, 30, 0, 0, 0),
+ AP.ResponsePDU(True, 5, 23, 100, 0, 0, 0),
+ AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0),
+ AP.ResponsePDU(True, 100, 200, 300, 0, 0, 0)]
+ opkt = AP.PingPDU(True, 42, 23, 100)
+
+ def packetEater_jig(self):
+ self.receivedPackets.append(pkts.pop(0))
+
+ p = AX.PacketControl(None, None)
+ p.packetEater = (lambda : packetEater_jig(p))
+ p.spinGap = 0
+ # Test with session id match required
+ ret = p.waitForResponse(opkt)
+ self.assertEqual(pkts, [AP.ResponsePDU(True, 100, 200, 300, 0, 0, 0)])
+ self.assertEqual(ret, AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0))
+ # Test ignoring session id
+ pkts = [AP.PingPDU(True, 1, 2, 3),
+ AP.ResponsePDU(True, 10, 20, 30, 0, 0, 0),
+ AP.ResponsePDU(True, 5, 23, 100, 0, 0, 0),
+ AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0)]
+ ret = p.waitForResponse(opkt, True)
+ self.assertEqual(pkts, [AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0)])
+ self.assertEqual(ret, AP.ResponsePDU(True, 5, 23, 100, 0, 0, 0))
+
+ def test_checkResponses(self):
+ returns = []
+ callback = (lambda resp, pkt: returns.append((resp, pkt)))
+ faketimemod = jigs.TimeModuleJig()
+ p = AX.PacketControl(None, None)
+ p.packetLog = {(42, 23, 100):(25, "foo", callback),
+ (42, 24, 200):(50, "bar", callback)}
+ faketimemod.time_returns = [40]
+ try:
+ timetemp = AX.time
+ AX.time = faketimemod
+ p.checkResponses()
+ self.assertEqual(returns, [(None, "foo")])
+ self.assertEqual(p.packetLog,
+ {(42, 24, 200):(50, "bar", callback)})
+ finally:
+ AX.time = timetemp
+
+ def test_packetEater(self):
+ pkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0)
+ enc = pkt.encode()
+ part1, rest = ntp.util.slicedata(enc, 15)
+ part2, part3 = ntp.util.slicedata(rest, 6)
+ pollReturns = [part1, part2, part3 + "blah"]
+
+ def pollSocket_jig(self):
+ self.receivedData += pollReturns.pop(0)
+ p = AX.PacketControl(None, None)
+ p.sessionID = 42
+ p.pollSocket = (lambda : pollSocket_jig(p))
+ # Test incomplete header
+ p.packetEater()
+ self.assertEqual(p.receivedPackets, [])
+ self.assertEqual(p.receivedData, part1)
+ # Test incomplete packet
+ p.packetEater()
+ self.assertEqual(p.receivedPackets, [])
+ self.assertEqual(p.receivedData, part1 + part2)
+ # Test packet eaten
+ p.packetEater()
+ self.assertEqual(p.receivedPackets, [pkt])
+ self.assertEqual(p.receivedData, "blah")
+
+ def test_sendPacket(self):
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, None)
+ testpkt = AP.PingPDU(True, 0, 1, 2, "fake")
+ # Test not expecting reply
+ p.sendPacket(testpkt, False)
+ self.assertEqual(sock.data, ["\x01\r\x18\x00"
+ "\x00\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x08"
+ "\x00\x00\x00\x04fake"])
+ self.assertEqual(p.packetLog, {})
+ sock.data = []
+ # Test expecting reply
+ p.sendPacket(testpkt, True, callback="foo")
+ self.assertEqual(sock.data, ["\x01\r\x18\x00"
+ "\x00\x00\x00\x00\x00\x00\x00\x01"
+ "\x00\x00\x00\x02\x00\x00\x00\x08"
+ "\x00\x00\x00\x04fake"])
+ self.assertEqual(p.packetLog, {(0, 1, 2):(30.0, testpkt, "foo")})
+
+ def test_sendPing(self):
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, None)
+ p.sessionID = 42
+ p.sendPing()
+ self.assertEqual(sock.data, ["\x01\r\x10\x00"
+ "\x00\x00\x00*\x00\x00\x00\x05"
+ "\x00\x00\x00\x01\x00\x00\x00\x00"])
+ self.assertEqual(p.packetLog.keys(), [(42, 5, 1)])
+
+ def test_sendNotify(self):
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, None)
+ p.sessionID = 42
+ vb = AP.Varbind(AP.VALUE_INTEGER, (0, 1, 2), 23)
+ p.sendNotify([vb])
+ self.assertEqual(sock.data,
+ ["\x01\x0c\x10\x00"
+ "\x00\x00\x00*\x00\x00\x00\x05"
+ "\x00\x00\x00\x01\x00\x00\x00\x18"
+ "\x00\x02\x00\x00"
+ "\x03\x00\x00\x00\x00\x00\x00\x00"
+ "\x00\x00\x00\x01\x00\x00\x00\x02"
+ "\x00\x00\x00\x17"])
+ self.assertEqual(p.packetLog.keys(), [(42, 5, 1)])
+
+ def test_sendErrorResponse(self):
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, None)
+ p.sessionID = 42
+ header = {"flags":{"bigEndian":True}, "session_id":42,
+ "transaction_id":32, "packet_id":100}
+ p.sendErrorResponse(header, AP.ERR_GENERR, 12)
+ self.assertEqual(sock.data,
+ ["\x01\x12\x10\x00"
+ "\x00\x00\x00*\x00\x00\x00 "
+ "\x00\x00\x00d\x00\x00\x00\x08"
+ "\x00\x00\x00\x00\x00\x05\x00\x0c"])
+ self.assertEqual(p.packetLog, {})
+
+ def test_pollSocket(self):
+ fakeselectmod = jigs.SelectModuleJig()
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, None)
+ p.sessionID = 42
+ # Test
+ pkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0)
+ enc = pkt.encode()
+ part1, part2 = ntp.util.slicedata(enc, 15)
+ sock.return_data = [part1, part2, "blah"]
+ fakeselectmod.do_return = [True, True, True, False]
+ try:
+ selecttemp = AX.select
+ AX.select = fakeselectmod
+ p.pollSocket()
+ self.assertEqual(p.receivedData, enc + "blah")
+ finally:
+ AX.select = selecttemp
+
+ def test_handleGetPDU(self):
+ readReturns = [1]
+ read = (lambda oid: AP.Varbind(AP.VALUE_INTEGER,
+ oid, readReturns.pop(0)))
+ mjig = AX.MIBControl()
+ mjig.addNode((0, 0), read)
+ mjig.addNode((0, 1), (lambda x: None))
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, mjig)
+ p.sessionID = 42
+ # Test successful
+ pkt = AP.GetPDU(True, 42, 23, 100, [AP.SearchRange((0, 0), ())])
+ p.handle_GetPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0,
+ (AP.Varbind(AP.VALUE_INTEGER, (0, 0), 1),))
+ self.assertEqual(sentPkt, respPkt)
+ # Test non-existent
+ sock.data = []
+ pkt = AP.GetPDU(True, 42, 23, 100, [AP.SearchRange((1, 0), ())])
+ p.handle_GetPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0,
+ (AP.Varbind(AP.VALUE_NO_SUCH_OBJECT,
+ (1, 0)),))
+ self.assertEqual(sentPkt, respPkt)
+ # Test existing node whose reader returns None (expects a null varbind)
+ sock.data = []
+ pkt = AP.GetPDU(True, 42, 23, 100, [AP.SearchRange((0, 1), ())])
+ p.handle_GetPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0,
+ (AP.Varbind(AP.VALUE_NULL,
+ (0, 1)),))
+ self.assertEqual(sentPkt, respPkt)
+
+ def test_handleGetNextPDU(self):
+ readReturns = [1]
+ read = (lambda oid: AP.Varbind(AP.VALUE_INTEGER,
+ oid, readReturns.pop(0)))
+ mjig = AX.MIBControl()
+ mjig.addNode((0, 0), read)
+ mjig.addNode((0, 1), read)
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, mjig)
+ p.sessionID = 42
+ # Test successful
+ pkt = AP.GetNextPDU(True, 42, 23, 100, [AP.SearchRange((0, 0), ())])
+ p.handle_GetNextPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0,
+ (AP.Varbind(AP.VALUE_INTEGER, (0, 1), 1),))
+ self.assertEqual(sentPkt, respPkt)
+ # Test non-existent
+ sock.data = []
+ pkt = AP.GetNextPDU(True, 42, 23, 100, [AP.SearchRange((1, 0), ())])
+ p.handle_GetNextPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0,
+ (AP.Varbind(AP.VALUE_END_OF_MIB_VIEW,
+ (1, 0)),))
+ self.assertEqual(sentPkt, respPkt)
+
+ def test_handle_GetBulkPDU(self):
+ readReturns = [1, 2, 3, 4, 5, 6]
+ read = (lambda oid: AP.Varbind(AP.VALUE_INTEGER,
+ oid, readReturns.pop(0)))
+ mjig = AX.MIBControl()
+ # non-repeated
+ mjig.addNode((0, 0), read)
+ mjig.addNode((0, 1), read) # shouldn't see
+ mjig.addNode((1, 0), read)
+ mjig.addNode((1, 1), read) # shouldn't see
+ # repeated
+ mjig.addNode((2, 0), read)
+ mjig.addNode((2, 1), read)
+ mjig.addNode((2, 2), read) # shouldn't see
+ mjig.addNode((3, 0), read)
+ mjig.addNode((3, 1), read)
+ mjig.addNode((3, 2), read) # shouldn't see
+ mjig.addNode((4, 0), read) # shouldn't see
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, mjig)
+ p.sessionID = 42
+ pkt = AP.GetBulkPDU(True, 42, 23, 100, 2, 2,
+ [AP.SearchRange((0,), (1,), True),
+ AP.SearchRange((1,), (2,), True),
+ AP.SearchRange((2, 0), (3,), True),
+ AP.SearchRange((3, 0), (), True)])
+ p.handle_GetBulkPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0,
+ (AP.Varbind(AP.VALUE_INTEGER, (0, 0), 1),
+ AP.Varbind(AP.VALUE_INTEGER, (1, 0), 2),
+ AP.Varbind(AP.VALUE_INTEGER, (2, 0), 3),
+ AP.Varbind(AP.VALUE_INTEGER, (2, 1), 4),
+ AP.Varbind(AP.VALUE_INTEGER, (3, 0), 5),
+ AP.Varbind(AP.VALUE_INTEGER, (3, 1), 6)))
+ self.assertEqual(sentPkt, respPkt)
+
+ def test_handle_TestSetPDU(self):
+ mjig = AX.MIBControl()
+ readReturns = []
+ read = (lambda oid: AP.Varbind(AP.VALUE_INTEGER,
+ oid, readReturns.pop(0)))
+ writeCalls = []
+ writeReturns = []
+ write = (lambda a, vb: (writeCalls.append((a, vb)),
+ writeReturns.pop(0))[1])
+ mjig.addNode((0,), read, write)
+ mjig.addNode((1,), read)
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, mjig)
+ p.sessionID = 42
+ # Test success
+ readReturns = [10]
+ writeReturns = [AP.ERR_NOERROR]
+ pkt = AP.TestSetPDU(True, 42, 23, 100,
+ [AP.Varbind(AP.VALUE_INTEGER, (0,), 16)])
+ p.handle_TestSetPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0)
+ self.assertEqual(sentPkt, respPkt)
+ self.assertEqual(writeCalls,
+ [("test", AP.Varbind(AP.VALUE_INTEGER, (0,), 16))])
+ self.assertEqual(mjig.setVarbinds,
+ [AP.Varbind(AP.VALUE_INTEGER, (0,), 16)])
+ self.assertEqual(mjig.setHandlers, [write])
+ self.assertEqual(mjig.setUndoData,
+ [AP.Varbind(AP.VALUE_INTEGER, (0,), 10)])
+ # Test failure, no access
+ mjig.setVarbinds = []
+ mjig.setHandlers = []
+ mjig.setUndoData = []
+ sock.data = []
+ readReturns = [10]
+ writeReturns = [AP.ERR_NOERROR]
+ writeCalls = []
+ pkt = AP.TestSetPDU(True, 42, 23, 100,
+ [AP.Varbind(AP.VALUE_INTEGER, (0, 1), 16)])
+ p.handle_TestSetPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 6, 0)
+ self.assertEqual(sentPkt, respPkt)
+ self.assertEqual(writeCalls, [])
+ self.assertEqual(mjig.setVarbinds, [])
+ self.assertEqual(mjig.setHandlers, [])
+ self.assertEqual(mjig.setUndoData, [])
+ # Test failure, not writable
+ mjig.setVarbinds = []
+ mjig.setHandlers = []
+ mjig.setUndoData = []
+ sock.data = []
+ readReturns = [10]
+ writeReturns = [AP.ERR_NOERROR]
+ writeCalls = []
+ pkt = AP.TestSetPDU(True, 42, 23, 100,
+ [AP.Varbind(AP.VALUE_INTEGER, (1,), 16)])
+ p.handle_TestSetPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 17, 0)
+ self.assertEqual(sentPkt, respPkt)
+ self.assertEqual(writeCalls, [])
+ self.assertEqual(mjig.setVarbinds, [])
+ self.assertEqual(mjig.setHandlers, [])
+ self.assertEqual(mjig.setUndoData, [])
+
+ def test_handle_CommitSetPDU(self):
+ handlerCalls = []
+ handlerReturns = [AP.ERR_NOERROR, AP.ERR_NOERROR]
+ handler = (lambda a, vb: (handlerCalls.append((a, vb)),
+ handlerReturns.pop(0))[1])
+ mjig = AX.MIBControl()
+ mjig.setVarbinds = ["foo", "bar"]
+ mjig.setHandlers = [handler, handler]
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, mjig)
+ p.sessionID = 42
+ # Test success
+ pkt = AP.CommitSetPDU(True, 42, 23, 100)
+ p.handle_CommitSetPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0)
+ self.assertEqual(sentPkt, respPkt)
+ self.assertEqual(handlerCalls, [("commit", "foo"),
+ ("commit", "bar")])
+ # Test failure
+ sock.data = []
+ handlerCalls = []
+ handlerReturns = [AP.ERR_NOERROR, AP.ERR_GENERR]
+ p.handle_CommitSetPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 5, 1)
+ self.assertEqual(sentPkt, respPkt)
+ self.assertEqual(handlerCalls, [("commit", "foo"),
+ ("commit", "bar")])
+
+ def test_handle_UndoSetPDU(self):
+ handlerCalls = []
+ handlerReturns = [AP.ERR_NOERROR, AP.ERR_NOERROR]
+ handler = (lambda a, vb, ud: (handlerCalls.append((a, vb, ud)),
+ handlerReturns.pop(0))[1])
+ mjig = AX.MIBControl()
+ mjig.setVarbinds = ["foo", "bar"]
+ mjig.setHandlers = [handler, handler]
+ mjig.setUndoData = ["fey", "yar"]
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, mjig)
+ p.sessionID = 42
+ # Test success
+ pkt = AP.UndoSetPDU(True, 42, 23, 100)
+ p.handle_UndoSetPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 0, 0)
+ self.assertEqual(sentPkt, respPkt)
+ self.assertEqual(handlerCalls, [("undo", "foo", "fey"),
+ ("undo", "bar", "yar")])
+ # Test failure
+ sock.data = []
+ handlerCalls = []
+ handlerReturns = [AP.ERR_NOERROR, AP.ERR_GENERR]
+ p.handle_UndoSetPDU(pkt)
+ self.assertEqual(len(sock.data), 1)
+ sentPkt = AP.decode_packet(sock.data[0])[0]
+ respPkt = AP.ResponsePDU(True, 42, 23, 100, 0, 5, 1)
+ self.assertEqual(sentPkt, respPkt)
+ self.assertEqual(handlerCalls, [("undo", "foo", "fey"),
+ ('undo', 'bar', 'yar')])
+
+ def test_handle_CleanupSetPDU(self):
+ handlerCalls = []
+ handler = (lambda a, vb: handlerCalls.append((a, vb)))
+ mjig = AX.MIBControl()
+ mjig.setVarbinds = ["foo", "bar"]
+ mjig.setHandlers = [handler, handler]
+ p = AX.PacketControl(None, mjig)
+ p.sessionID = 42
+ pkt = AP.CleanupSetPDU(True, 42, 23, 100)
+ p.handle_CleanupSetPDU(pkt)
+ self.assertEqual(p.database.inSetP, False)
+ self.assertEqual(handlerCalls, [("clean", "foo"), ("clean", "bar")])
+
+ def test_handle_ResponsePDU(self):
+ cbreturn = []
+ def callback(pkt, origpkt):
+ cbreturn.append((pkt, origpkt))
+ opkt = AP.PingPDU(True, 42, 4, 50)
+ incorrect_rpkt = AP.ResponsePDU(True, 23, 1, 99, 0, 0, 0)
+ correct_rpkt = AP.ResponsePDU(True, 42, 4, 50, 0, 0, 0)
+ sock = jigs.SocketJig()
+ p = AX.PacketControl(sock, None)
+ p.sessionID = 42
+ p.packetLog = {(42, 4, 50):(30.0, opkt, callback)}
+ # Test non-expected response (likely to happen in a timeout)
+ p.handle_ResponsePDU(incorrect_rpkt)
+ self.assertEqual(p.packetLog, {(42, 4, 50):(30.0, opkt, callback)})
+ # Test expected response
+ p.handle_ResponsePDU(correct_rpkt)
+ self.assertEqual(p.packetLog, {})
+
+
class TestAgentx(unittest.TestCase):
def test_walkMIBTree(self):
f = AX.walkMIBTree
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/cefda3413166112d598a8e8ba9f4770dd1faab89
---
View it on GitLab: https://gitlab.com/NTPsec/ntpsec/commit/cefda3413166112d598a8e8ba9f4770dd1faab89
You're receiving this email because of your account on gitlab.com.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ntpsec.org/pipermail/vc/attachments/20180323/8b67d910/attachment.html>
More information about the vc mailing list