From 95e59769fdcaedbb97a9df2ea304e15066418241 Mon Sep 17 00:00:00 2001 From: Scott Guthridge Date: Thu, 18 Apr 2024 00:48:44 -0700 Subject: [PATCH] Update integration tests for SCPI improvements libraVNA.cmd() no longer expects blank line responses. By default, it checks status after each command to match the previous behavior. Check can be disabled by optional cmd() parameter for handling expected failure cases. Commands such as *WAI may take longer than anything previously. Make it possible to override the default time-out in cmd() and query(). Change poll loops in TestCalibration and TestVNASweep to *WAI calls. --- .../UserManual/SCPI_Examples/libreVNA.py | 60 +++++++++++++------ Software/Integrationtests/tests/TestBase.py | 2 +- .../Integrationtests/tests/TestCalibration.py | 51 +++++++++------- .../Integrationtests/tests/TestVNASweep.py | 8 +-- 4 files changed, 77 insertions(+), 44 deletions(-) diff --git a/Documentation/UserManual/SCPI_Examples/libreVNA.py b/Documentation/UserManual/SCPI_Examples/libreVNA.py index 9c9bee7..dcac991 100755 --- a/Documentation/UserManual/SCPI_Examples/libreVNA.py +++ b/Documentation/UserManual/SCPI_Examples/libreVNA.py @@ -1,13 +1,14 @@ +import re import socket from asyncio import IncompleteReadError # only import the exception class import time class SocketStreamReader: - def __init__(self, sock: socket.socket): + def __init__(self, sock: socket.socket, default_timeout=1): self._sock = sock self._sock.setblocking(0) self._recv_buffer = bytearray() - self.timeout = 1.0 + self.default_timeout = default_timeout def read(self, num_bytes: int = -1) -> bytes: raise NotImplementedError @@ -22,12 +23,14 @@ class SocketStreamReader: pos += n return bytes(buf) - def readline(self) -> bytes: - return self.readuntil(b"\n") + def readline(self, timeout=None) -> bytes: + return self.readuntil(b"\n", timeout=timeout) - def readuntil(self, separator: bytes = b"\n") -> bytes: + def readuntil(self, separator: bytes = b"\n", timeout=None) -> bytes: if len(separator) != 1: raise ValueError("Only separators of length 1 are supported.") + if timeout is None: + timeout = self.default_timeout chunk = bytearray(4096) start = 0 @@ -35,12 +38,12 @@ class SocketStreamReader: bytes_read = self._recv_into(memoryview(buf)) assert bytes_read == len(buf) - timeout = time.time() + self.timeout + time_limit = time.time() + timeout while True: idx = buf.find(separator, start) if idx != -1: break - elif time.time() > timeout: + elif time.time() > time_limit: raise Exception("Timed out waiting for response from GUI") start = len(self._recv_buffer) @@ -66,31 +69,54 @@ class SocketStreamReader: return bytes_read class libreVNA: - def __init__(self, host='localhost', port=19542): + def __init__(self, host='localhost', port=19542, + default_check_cmds=True, default_timeout=1): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self.sock.connect((host, port)) except: raise Exception("Unable to connect to LibreVNA-GUI. Make sure it is running and the TCP server is enabled.") - self.reader = SocketStreamReader(self.sock) + self.reader = SocketStreamReader(self.sock, + default_timeout=default_timeout) + self.default_check_cmds = default_check_cmds def __del__(self): self.sock.close() - def __read_response(self): - return self.reader.readline().decode().rstrip() + def __read_response(self, timeout=None): + return self.reader.readline(timeout=timeout).decode().rstrip() - def cmd(self, cmd): + def cmd(self, cmd, check=None, timeout=None): self.sock.sendall(cmd.encode()) self.sock.send(b"\n") - resp = self.__read_response() - if len(resp) > 0: - raise Exception("Expected empty response but got "+resp) + if check or (check is None and self.default_check_cmds): + status = self.get_status(timeout=timeout) + if self.get_status() & 0x20: + raise Exception("Command Error") + if self.get_status() & 0x10: + raise Exception("Execution Error") + if self.get_status() & 0x08: + raise Exception("Device Error") + if self.get_status() & 0x04: + raise Exception("Query Error") + return status + else: + return None - def query(self, query): + def query(self, query, timeout=None): self.sock.sendall(query.encode()) self.sock.send(b"\n") - return self.__read_response() + return self.__read_response(timeout=timeout) + + def get_status(self, timeout=None): + resp = self.query("*ESR?", timeout=timeout) + if not re.match(r'^\d+$', resp): + raise Exception("Expected numeric response from *ESR? but got " + f"'{resp}'") + status = int(resp) + if status < 0 or status > 255: + raise Exception(f"*ESR? returned invalid value {status}.") + return status @staticmethod def parse_VNA_trace_data(data): diff --git a/Software/Integrationtests/tests/TestBase.py b/Software/Integrationtests/tests/TestBase.py index 21edf90..983bd53 100644 --- a/Software/Integrationtests/tests/TestBase.py +++ b/Software/Integrationtests/tests/TestBase.py @@ -25,7 +25,7 @@ class TestBase(unittest.TestCase): self.vna = libreVNA('localhost', 19544) try: - self.vna.cmd(":DEV:CONN") + self.vna.cmd("*CLS;:DEV:CONN") except Exception as e: self.tearDown() raise e diff --git a/Software/Integrationtests/tests/TestCalibration.py b/Software/Integrationtests/tests/TestCalibration.py index 234613f..5fb7a14 100644 --- a/Software/Integrationtests/tests/TestCalibration.py +++ b/Software/Integrationtests/tests/TestCalibration.py @@ -7,11 +7,9 @@ class TestCalibration(TestBase): def cal_measure(self, number, timeout = 3): self.vna.cmd(":VNA:CAL:MEAS "+str(number)) # wait for the measurement to finish - stoptime = time.time() + timeout - while self.vna.query(":VNA:CAL:BUSY?") == "TRUE": - if time.time() > stoptime: - raise AssertionError("Calibration measurement timed out") - time.sleep(0.1) + assert self.vna.query(":VNA:CAL:BUSY?") == "TRUE" + self.vna.cmd("*WAI") + assert self.vna.query(":VNA:CAL:BUSY?") == "FALSE" def test_dummy_calibration(self): # This test just iterates through the calibration steps. As no actual standards @@ -29,7 +27,8 @@ class TestCalibration(TestBase): self.vna.cmd(":VNA:CAL:RESET") # No measurements yet, activating should fail - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_1"), "ERROR") + self.vna.cmd(":VNA:CAL:ACT SOLT_1", check=False) + self.assertTrue(self.vna.get_status() & 0x3C) # Load calibration kit self.assertEqual(self.vna.query(":VNA:CAL:KIT:LOAD? DUMMY.CALKIT"), "TRUE") @@ -47,11 +46,14 @@ class TestCalibration(TestBase): self.cal_measure(2) # SOLT_1 should now be available - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_1"), "") + self.vna.cmd(":VNA:CAL:ACT SOLT_1", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) # SOLT_2 and SOLT_12 should still be unavailable - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_2"), "ERROR") - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_12"), "ERROR") + self.vna.cmd(":VNA:CAL:ACT SOLT_2", check=False) + self.assertTrue(self.vna.get_status() & 0x3C) + self.vna.cmd(":VNA:CAL:ACT SOLT_12", check=False) + self.assertTrue(self.vna.get_status() & 0x3C) # Take measurements for SOLT_2 self.vna.cmd(":VNA:CAL:ADD OPEN") @@ -66,11 +68,14 @@ class TestCalibration(TestBase): self.cal_measure(5) # SOLT_1 and SOLT_2 should now be available - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_1"), "") - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_2"), "") - + self.vna.cmd(":VNA:CAL:ACT SOLT_1", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) + self.vna.cmd(":VNA:CAL:ACT SOLT_2", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) + # SOLT_12 should still be unavailable - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_12"), "ERROR") + self.vna.cmd(":VNA:CAL:ACT SOLT_12", check=False) + self.assertTrue(self.vna.get_status() & 0x3C) # Take the final through measurement for SOLT_12 self.vna.cmd(":VNA:CAL:ADD THROUGH") @@ -79,9 +84,12 @@ class TestCalibration(TestBase): self.cal_measure(6) # SOLT_1, SOLT_2 and SOLT_12 should now be available - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_1"), "") - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_2"), "") - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_12"), "") + self.vna.cmd(":VNA:CAL:ACT SOLT_1", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) + self.vna.cmd(":VNA:CAL:ACT SOLT_2", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) + self.vna.cmd(":VNA:CAL:ACT SOLT_12", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) def assertTrace_dB(self, trace, dB_nominal, dB_deviation): for S in trace: @@ -135,7 +143,8 @@ class TestCalibration(TestBase): self.cal_measure(6, 15) # activate calibration - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_12"), "") + self.vna.cmd(":VNA:CAL:ACT SOLT_12", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) # switch in 6dB attenuator cal.setPort(cal.Standard.THROUGH, 1, 2) @@ -143,8 +152,9 @@ class TestCalibration(TestBase): # Start measurement and grab data self.vna.cmd(":VNA:ACQ:SINGLE TRUE") - while self.vna.query(":VNA:ACQ:FIN?") == "FALSE": - time.sleep(0.1) + self.assertEqual(self.vna.query(":VNA:ACQ:FIN?") == "FALSE") + self.vna.cmd("*WAI") + self.assertEqual(self.vna.query(":VNA:ACQ:FIN?") == "TRUE") cal.reset() @@ -162,5 +172,4 @@ class TestCalibration(TestBase): # Reflection should be below -10dB (much lower for most frequencies) self.assertTrace_dB(S11, -100, 90) self.assertTrace_dB(S22, -100, 90) - - \ No newline at end of file + diff --git a/Software/Integrationtests/tests/TestVNASweep.py b/Software/Integrationtests/tests/TestVNASweep.py index bae3143..3e0a87d 100644 --- a/Software/Integrationtests/tests/TestVNASweep.py +++ b/Software/Integrationtests/tests/TestVNASweep.py @@ -4,11 +4,9 @@ import time class TestVNASweep(TestBase): def waitSweepTimeout(self, timeout = 1): self.assertEqual(self.vna.query(":VNA:ACQ:FIN?"), "FALSE") - stoptime = time.time() + timeout - while self.vna.query(":VNA:ACQ:FIN?") == "FALSE": - if time.time() > stoptime: - raise AssertionError("Sweep timed out") - + self.vna.cmd("*WAI", timeout=timeout) + self.assertEqual(self.vna.query(":VNA:ACQ:FIN?"), "TRUE") + def test_sweep_frequency(self): self.vna.cmd(":DEV:MODE VNA") self.vna.cmd(":VNA:SWEEP FREQUENCY")