diff --git a/Documentation/UserManual/SCPI_Examples/libreVNA.py b/Documentation/UserManual/SCPI_Examples/libreVNA.py index 9c9bee7..dcac991 100755 --- a/Documentation/UserManual/SCPI_Examples/libreVNA.py +++ b/Documentation/UserManual/SCPI_Examples/libreVNA.py @@ -1,13 +1,14 @@ +import re import socket from asyncio import IncompleteReadError # only import the exception class import time class SocketStreamReader: - def __init__(self, sock: socket.socket): + def __init__(self, sock: socket.socket, default_timeout=1): self._sock = sock self._sock.setblocking(0) self._recv_buffer = bytearray() - self.timeout = 1.0 + self.default_timeout = default_timeout def read(self, num_bytes: int = -1) -> bytes: raise NotImplementedError @@ -22,12 +23,14 @@ class SocketStreamReader: pos += n return bytes(buf) - def readline(self) -> bytes: - return self.readuntil(b"\n") + def readline(self, timeout=None) -> bytes: + return self.readuntil(b"\n", timeout=timeout) - def readuntil(self, separator: bytes = b"\n") -> bytes: + def readuntil(self, separator: bytes = b"\n", timeout=None) -> bytes: if len(separator) != 1: raise ValueError("Only separators of length 1 are supported.") + if timeout is None: + timeout = self.default_timeout chunk = bytearray(4096) start = 0 @@ -35,12 +38,12 @@ class SocketStreamReader: bytes_read = self._recv_into(memoryview(buf)) assert bytes_read == len(buf) - timeout = time.time() + self.timeout + time_limit = time.time() + timeout while True: idx = buf.find(separator, start) if idx != -1: break - elif time.time() > timeout: + elif time.time() > time_limit: raise Exception("Timed out waiting for response from GUI") start = len(self._recv_buffer) @@ -66,31 +69,54 @@ class SocketStreamReader: return bytes_read class libreVNA: - def __init__(self, host='localhost', port=19542): + def __init__(self, host='localhost', port=19542, + default_check_cmds=True, default_timeout=1): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self.sock.connect((host, port)) except: raise Exception("Unable to connect to LibreVNA-GUI. Make sure it is running and the TCP server is enabled.") - self.reader = SocketStreamReader(self.sock) + self.reader = SocketStreamReader(self.sock, + default_timeout=default_timeout) + self.default_check_cmds = default_check_cmds def __del__(self): self.sock.close() - def __read_response(self): - return self.reader.readline().decode().rstrip() + def __read_response(self, timeout=None): + return self.reader.readline(timeout=timeout).decode().rstrip() - def cmd(self, cmd): + def cmd(self, cmd, check=None, timeout=None): self.sock.sendall(cmd.encode()) self.sock.send(b"\n") - resp = self.__read_response() - if len(resp) > 0: - raise Exception("Expected empty response but got "+resp) + if check or (check is None and self.default_check_cmds): + status = self.get_status(timeout=timeout) + if self.get_status() & 0x20: + raise Exception("Command Error") + if self.get_status() & 0x10: + raise Exception("Execution Error") + if self.get_status() & 0x08: + raise Exception("Device Error") + if self.get_status() & 0x04: + raise Exception("Query Error") + return status + else: + return None - def query(self, query): + def query(self, query, timeout=None): self.sock.sendall(query.encode()) self.sock.send(b"\n") - return self.__read_response() + return self.__read_response(timeout=timeout) + + def get_status(self, timeout=None): + resp = self.query("*ESR?", timeout=timeout) + if not re.match(r'^\d+$', resp): + raise Exception("Expected numeric response from *ESR? but got " + f"'{resp}'") + status = int(resp) + if status < 0 or status > 255: + raise Exception(f"*ESR? returned invalid value {status}.") + return status @staticmethod def parse_VNA_trace_data(data): diff --git a/Software/Integrationtests/tests/TestBase.py b/Software/Integrationtests/tests/TestBase.py index 21edf90..983bd53 100644 --- a/Software/Integrationtests/tests/TestBase.py +++ b/Software/Integrationtests/tests/TestBase.py @@ -25,7 +25,7 @@ class TestBase(unittest.TestCase): self.vna = libreVNA('localhost', 19544) try: - self.vna.cmd(":DEV:CONN") + self.vna.cmd("*CLS;:DEV:CONN") except Exception as e: self.tearDown() raise e diff --git a/Software/Integrationtests/tests/TestCalibration.py b/Software/Integrationtests/tests/TestCalibration.py index 234613f..5fb7a14 100644 --- a/Software/Integrationtests/tests/TestCalibration.py +++ b/Software/Integrationtests/tests/TestCalibration.py @@ -7,11 +7,9 @@ class TestCalibration(TestBase): def cal_measure(self, number, timeout = 3): self.vna.cmd(":VNA:CAL:MEAS "+str(number)) # wait for the measurement to finish - stoptime = time.time() + timeout - while self.vna.query(":VNA:CAL:BUSY?") == "TRUE": - if time.time() > stoptime: - raise AssertionError("Calibration measurement timed out") - time.sleep(0.1) + assert self.vna.query(":VNA:CAL:BUSY?") == "TRUE" + self.vna.cmd("*WAI") + assert self.vna.query(":VNA:CAL:BUSY?") == "FALSE" def test_dummy_calibration(self): # This test just iterates through the calibration steps. As no actual standards @@ -29,7 +27,8 @@ class TestCalibration(TestBase): self.vna.cmd(":VNA:CAL:RESET") # No measurements yet, activating should fail - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_1"), "ERROR") + self.vna.cmd(":VNA:CAL:ACT SOLT_1", check=False) + self.assertTrue(self.vna.get_status() & 0x3C) # Load calibration kit self.assertEqual(self.vna.query(":VNA:CAL:KIT:LOAD? DUMMY.CALKIT"), "TRUE") @@ -47,11 +46,14 @@ class TestCalibration(TestBase): self.cal_measure(2) # SOLT_1 should now be available - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_1"), "") + self.vna.cmd(":VNA:CAL:ACT SOLT_1", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) # SOLT_2 and SOLT_12 should still be unavailable - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_2"), "ERROR") - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_12"), "ERROR") + self.vna.cmd(":VNA:CAL:ACT SOLT_2", check=False) + self.assertTrue(self.vna.get_status() & 0x3C) + self.vna.cmd(":VNA:CAL:ACT SOLT_12", check=False) + self.assertTrue(self.vna.get_status() & 0x3C) # Take measurements for SOLT_2 self.vna.cmd(":VNA:CAL:ADD OPEN") @@ -66,11 +68,14 @@ class TestCalibration(TestBase): self.cal_measure(5) # SOLT_1 and SOLT_2 should now be available - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_1"), "") - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_2"), "") - + self.vna.cmd(":VNA:CAL:ACT SOLT_1", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) + self.vna.cmd(":VNA:CAL:ACT SOLT_2", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) + # SOLT_12 should still be unavailable - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_12"), "ERROR") + self.vna.cmd(":VNA:CAL:ACT SOLT_12", check=False) + self.assertTrue(self.vna.get_status() & 0x3C) # Take the final through measurement for SOLT_12 self.vna.cmd(":VNA:CAL:ADD THROUGH") @@ -79,9 +84,12 @@ class TestCalibration(TestBase): self.cal_measure(6) # SOLT_1, SOLT_2 and SOLT_12 should now be available - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_1"), "") - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_2"), "") - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_12"), "") + self.vna.cmd(":VNA:CAL:ACT SOLT_1", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) + self.vna.cmd(":VNA:CAL:ACT SOLT_2", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) + self.vna.cmd(":VNA:CAL:ACT SOLT_12", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) def assertTrace_dB(self, trace, dB_nominal, dB_deviation): for S in trace: @@ -135,7 +143,8 @@ class TestCalibration(TestBase): self.cal_measure(6, 15) # activate calibration - self.assertEqual(self.vna.query(":VNA:CAL:ACT SOLT_12"), "") + self.vna.cmd(":VNA:CAL:ACT SOLT_12", check=False) + self.assertFalse(self.vna.get_status() & 0x3C) # switch in 6dB attenuator cal.setPort(cal.Standard.THROUGH, 1, 2) @@ -143,8 +152,9 @@ class TestCalibration(TestBase): # Start measurement and grab data self.vna.cmd(":VNA:ACQ:SINGLE TRUE") - while self.vna.query(":VNA:ACQ:FIN?") == "FALSE": - time.sleep(0.1) + self.assertEqual(self.vna.query(":VNA:ACQ:FIN?") == "FALSE") + self.vna.cmd("*WAI") + self.assertEqual(self.vna.query(":VNA:ACQ:FIN?") == "TRUE") cal.reset() @@ -162,5 +172,4 @@ class TestCalibration(TestBase): # Reflection should be below -10dB (much lower for most frequencies) self.assertTrace_dB(S11, -100, 90) self.assertTrace_dB(S22, -100, 90) - - \ No newline at end of file + diff --git a/Software/Integrationtests/tests/TestVNASweep.py b/Software/Integrationtests/tests/TestVNASweep.py index bae3143..3e0a87d 100644 --- a/Software/Integrationtests/tests/TestVNASweep.py +++ b/Software/Integrationtests/tests/TestVNASweep.py @@ -4,11 +4,9 @@ import time class TestVNASweep(TestBase): def waitSweepTimeout(self, timeout = 1): self.assertEqual(self.vna.query(":VNA:ACQ:FIN?"), "FALSE") - stoptime = time.time() + timeout - while self.vna.query(":VNA:ACQ:FIN?") == "FALSE": - if time.time() > stoptime: - raise AssertionError("Sweep timed out") - + self.vna.cmd("*WAI", timeout=timeout) + self.assertEqual(self.vna.query(":VNA:ACQ:FIN?"), "TRUE") + def test_sweep_frequency(self): self.vna.cmd(":DEV:MODE VNA") self.vna.cmd(":VNA:SWEEP FREQUENCY")