mirror of
https://github.com/jankae/LibreVNA.git
synced 2025-12-06 07:12:10 +01:00
libreVNA.cmd() no longer expects blank line responses. By default, it checks the status after each command to match the previous behavior. The check can be disabled by an optional cmd() parameter for handling expected failure cases. Commands such as *WAI may take longer than any previous command, so it is now possible to override the default timeout in cmd() and query(). Change poll loops in TestCalibration and TestVNASweep to *WAI calls.
152 lines
5.6 KiB
Python
Executable file
152 lines
5.6 KiB
Python
Executable file
import re
import select
import socket
import time
from asyncio import IncompleteReadError  # only import the exception class
|
|
|
|
class SocketStreamReader:
    """Buffered, timeout-aware reader on top of a non-blocking stream socket.

    The socket handed to the constructor is switched to non-blocking mode.
    Bytes received beyond what a read call returns are kept in an internal
    buffer and served first by the next call.
    """

    def __init__(self, sock: socket.socket, default_timeout=1):
        """Wrap *sock* and remember the default timeout (in seconds)."""
        self._sock = sock
        self._sock.setblocking(0)
        self._recv_buffer = bytearray()
        self.default_timeout = default_timeout

    def read(self, num_bytes: int = -1) -> bytes:
        """Not supported by this reader."""
        raise NotImplementedError

    def readexactly(self, num_bytes: int, timeout=None) -> bytes:
        """Return exactly *num_bytes* bytes.

        Waits up to *timeout* seconds (``default_timeout`` when None) for the
        data to arrive.

        Raises:
            IncompleteReadError: if the peer closed the connection or the
                timeout expired first; ``partial`` holds what was received.
        """
        if timeout is None:
            timeout = self.default_timeout

        buf = bytearray(num_bytes)
        view = memoryview(buf)
        deadline = time.monotonic() + timeout

        # Serve previously buffered bytes before touching the socket.
        pos = self._take_buffered(view)
        while pos < num_bytes:
            received = self._sock_recv_into(view[pos:])
            if received is None:
                # Nothing pending yet: sleep in select() instead of spinning.
                remaining = deadline - time.monotonic()
                if remaining <= 0 or not self._wait_readable(remaining):
                    raise IncompleteReadError(bytes(buf[:pos]), num_bytes)
            elif received == 0:
                # Orderly shutdown by the peer; no more data will ever come.
                raise IncompleteReadError(bytes(buf[:pos]), num_bytes)
            else:
                pos += received
        return bytes(buf)

    def readline(self, timeout=None) -> bytes:
        """Return the next line, including its trailing ``\\n``."""
        return self.readuntil(b"\n", timeout=timeout)

    def readuntil(self, separator: bytes = b"\n", timeout=None) -> bytes:
        """Return all bytes up to and including the first *separator*.

        Waits up to *timeout* seconds (``default_timeout`` when None). At
        least one receive attempt is made even if the timeout is zero. When
        the call fails, bytes received so far stay buffered for later reads.

        Raises:
            ValueError: if *separator* is not exactly one byte long.
            IncompleteReadError: if the peer closed the connection before a
                separator arrived.
            Exception: if no separator arrived before the timeout expired.
        """
        if len(separator) != 1:
            raise ValueError("Only separators of length 1 are supported.")
        if timeout is None:
            timeout = self.default_timeout

        # Take ownership of everything buffered so far.
        buf = bytearray(self._recv_buffer)
        self._recv_buffer = bytearray()

        chunk = bytearray(4096)
        view = memoryview(chunk)
        deadline = time.monotonic() + timeout
        start = 0          # bytes before this offset were already searched
        attempted = False  # guarantees one receive attempt per call
        idx = -1
        try:
            while True:
                idx = buf.find(separator, start)
                if idx != -1:
                    break
                start = len(buf)

                remaining = deadline - time.monotonic()
                if attempted and remaining < 0:
                    raise Exception("Timed out waiting for response from GUI")
                attempted = True

                received = self._sock_recv_into(view)
                if received is None:
                    # Nothing pending: block in select() rather than spin.
                    self._wait_readable(max(remaining, 0))
                elif received == 0:
                    raise IncompleteReadError(bytes(buf), None)
                else:
                    buf += view[:received]
        finally:
            if idx == -1:
                # Failure path: do not lose the partial data.
                self._recv_buffer = buf

        result = bytes(buf[: idx + 1])
        self._recv_buffer = buf[idx + 1:]
        return result

    def _take_buffered(self, view: memoryview) -> int:
        """Move up to ``len(view)`` buffered bytes into *view*; return the count."""
        count = min(len(view), len(self._recv_buffer))
        view[:count] = self._recv_buffer[:count]
        del self._recv_buffer[:count]
        return count

    def _sock_recv_into(self, view: memoryview):
        """Receive into *view* without blocking.

        Returns the number of bytes received, 0 if the peer closed the
        connection, or None if no data is available right now. Other socket
        errors propagate to the caller.
        """
        try:
            return self._sock.recv_into(view, 0)
        except (BlockingIOError, InterruptedError):
            return None

    def _wait_readable(self, timeout: float) -> bool:
        """Block until the socket is readable or *timeout* seconds elapse."""
        readable, _, _ = select.select([self._sock], [], [], max(timeout, 0))
        return bool(readable)

    def _recv_into(self, view: memoryview) -> int:
        """Fill *view* from the buffer, then from the socket if data is pending.

        Returns the number of bytes written to *view*; never blocks. Not used
        by the read methods of this class but kept for existing callers.
        """
        bytes_read = self._take_buffered(view)
        if bytes_read == len(view):
            return bytes_read
        received = self._sock_recv_into(view[bytes_read:])
        return bytes_read + (received or 0)
|
|
|
|
class libreVNA:
    """Small SCPI-over-TCP client for the LibreVNA-GUI remote control server.

    Commands and queries are sent as newline-terminated text lines; responses
    are read line by line through a ``SocketStreamReader``.
    """

    # Error bits of the *ESR? response, checked in this order by cmd().
    _ESR_ERRORS = (
        (0x20, "Command Error"),
        (0x10, "Execution Error"),
        (0x08, "Device Error"),
        (0x04, "Query Error"),
    )

    def __init__(self, host='localhost', port=19542,
                 default_check_cmds=True, default_timeout=1):
        """Connect to the GUI's TCP server.

        Args:
            host: host name or IPv4 address of the machine running the GUI.
            port: TCP port of the server.
            default_check_cmds: whether cmd() checks the status after each
                command when its ``check`` argument is left at None.
            default_timeout: default response timeout in seconds.

        Raises:
            Exception: if the connection cannot be established.
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((host, port))
        except OSError as err:
            # Do not leak the descriptor of a socket that never connected.
            self.sock.close()
            raise Exception("Unable to connect to LibreVNA-GUI. Make sure it is running and the TCP server is enabled.") from err
        self.reader = SocketStreamReader(self.sock,
                                         default_timeout=default_timeout)
        self.default_check_cmds = default_check_cmds

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the connection; safe to call more than once."""
        # 'sock' is missing if __init__ failed before creating the socket.
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def __send_line(self, text):
        """Send *text* followed by a newline in a single sendall() call."""
        self.sock.sendall(text.encode() + b"\n")

    def __read_response(self, timeout=None):
        """Read one response line and return it decoded and right-stripped."""
        return self.reader.readline(timeout=timeout).decode().rstrip()

    def cmd(self, cmd, check=None, timeout=None):
        """Send a command and, optionally, verify that it succeeded.

        Args:
            cmd: command text without the trailing newline.
            check: True to query the status afterwards, False to skip it,
                None to use ``default_check_cmds``.
            timeout: seconds to wait for the status response; None uses the
                reader's default.

        Returns:
            The *ESR? status value when checked, otherwise None.

        Raises:
            Exception: "Command Error", "Execution Error", "Device Error" or
                "Query Error" if the matching status bit is set.
        """
        self.__send_line(cmd)
        if check is None:
            check = self.default_check_cmds
        if not check:
            return None

        # Query the status exactly once and test the bits of that one value.
        # Per IEEE 488.2 a *ESR? read clears the register, so a second query
        # would no longer report the error (and would ignore 'timeout').
        status = self.get_status(timeout=timeout)
        for mask, message in self._ESR_ERRORS:
            if status & mask:
                raise Exception(message)
        return status

    def query(self, query, timeout=None):
        """Send a query and return its one-line response as a string."""
        self.__send_line(query)
        return self.__read_response(timeout=timeout)

    def get_status(self, timeout=None):
        """Query ``*ESR?`` and return the result as an int in 0..255.

        Raises:
            Exception: if the response is not a decimal number or is out of
                range.
        """
        resp = self.query("*ESR?", timeout=timeout)
        if not re.fullmatch(r'\d+', resp):
            raise Exception("Expected numeric response from *ESR? but got "
                            f"'{resp}'")
        status = int(resp)
        if status < 0 or status > 255:
            raise Exception(f"*ESR? returned invalid value {status}.")
        return status

    @staticmethod
    def _split_trace_values(data, group_size, group_name):
        """Strip brackets from *data* and return its values as floats.

        An empty (or whitespace-only) response yields an empty list. The
        number of values is validated before any of them is converted.

        Raises:
            Exception: if the value count is not a multiple of *group_size*.
            ValueError: if a value is not a valid float.
        """
        # Remove brackets (order of data implicitly known)
        flat = data.replace(']', '').replace('[', '').strip()
        if not flat:
            return []
        values = flat.split(',')
        if len(values) % group_size:
            raise Exception(f"Invalid input data: expected tuples of {group_name} values each")
        return [float(value) for value in values]

    @staticmethod
    def parse_VNA_trace_data(data):
        """Parse VNA trace data into a list of ``(frequency, complex)`` tuples.

        *data* is a comma-separated list of (frequency, real, imaginary)
        triples, optionally wrapped in square brackets.
        """
        numbers = iter(libreVNA._split_trace_values(data, 3, "three"))
        # zip() over the same iterator consumes the values three at a time.
        return [(freq, complex(real, imag))
                for freq, real, imag in zip(numbers, numbers, numbers)]

    @staticmethod
    def parse_SA_trace_data(data):
        """Parse spectrum analyzer trace data into ``(frequency, dBm)`` tuples.

        *data* is a comma-separated list of (frequency, dBm) pairs,
        optionally wrapped in square brackets.
        """
        numbers = iter(libreVNA._split_trace_values(data, 2, "two"))
        # zip() over the same iterator consumes the values two at a time.
        return list(zip(numbers, numbers))
|
|
|