LibreVNA/Documentation/UserManual/SCPI_Examples/libreVNA.py
Scott Guthridge 106832624a Shorten class libreVNA check_cmds, timeout arguments
default_check_cmds -> check_cmds
default_timeout -> timeout
2024-04-19 23:27:08 -07:00

152 lines
5.6 KiB
Python
Executable file

import re
import select
import socket
import time
from asyncio import IncompleteReadError  # only import the exception class
class SocketStreamReader:
    """Buffered, synchronous reader on top of a non-blocking socket.

    The method names mirror ``asyncio.StreamReader`` (``readexactly``,
    ``readline``, ``readuntil``), but the calls block the calling thread
    until the requested data has arrived, the peer has closed the
    connection, or a timeout has expired.  Waiting is done with
    ``select`` so no CPU is burned while the socket is idle.

    Bytes that were received but not yet returned to a caller are kept in
    an internal buffer; a timeout never discards them.
    """

    # Maximum number of bytes requested from the socket per recv() call.
    _CHUNK_SIZE = 4096

    def __init__(self, sock: socket.socket, default_timeout=1):
        """Wrap *sock* and switch it to non-blocking mode.

        Args:
            sock: Connected stream socket to read from.
            default_timeout: Timeout in seconds used by the read methods
                when they are called without an explicit ``timeout``.
        """
        self._sock = sock
        self._sock.setblocking(False)
        self._recv_buffer = bytearray()
        # Set once recv() has returned b"", i.e. the peer closed the stream.
        self._eof = False
        self.default_timeout = default_timeout

    def read(self, num_bytes: int = -1) -> bytes:
        """Not supported; present only to mirror the stream-reader API."""
        raise NotImplementedError

    def readexactly(self, num_bytes: int, timeout=None) -> bytes:
        """Return exactly *num_bytes* bytes.

        Args:
            num_bytes: Number of bytes to return; must not be negative.
            timeout: Seconds to wait for missing data.  ``None`` selects
                ``default_timeout``.

        Raises:
            ValueError: If *num_bytes* is negative.
            IncompleteReadError: If the peer closed the connection or the
                timeout expired first.  The bytes received so far are
                available as ``partial`` on the exception and are removed
                from the internal buffer.
        """
        if num_bytes < 0:
            raise ValueError("readexactly size can not be less than zero")
        deadline = self._deadline(timeout)
        while len(self._recv_buffer) < num_bytes:
            remaining = deadline - time.monotonic()
            if self._eof or remaining < 0:
                partial = bytes(self._recv_buffer)
                self._recv_buffer.clear()
                raise IncompleteReadError(partial, num_bytes)
            self._fill_buffer(remaining)
        return self._take(num_bytes)

    def readline(self, timeout=None) -> bytes:
        """Return the next line including its terminating ``\\n``.

        See :meth:`readuntil` for the timeout and error behaviour.
        """
        return self.readuntil(b"\n", timeout=timeout)

    def readuntil(self, separator: bytes = b"\n", timeout=None) -> bytes:
        """Return all bytes up to and including the first *separator*.

        Args:
            separator: Single-byte delimiter to look for.
            timeout: Seconds to wait for the separator.  ``None`` selects
                ``default_timeout``.

        Raises:
            ValueError: If *separator* is not exactly one byte long.
            TimeoutError: If the separator did not arrive in time.  Data
                received so far stays buffered for the next read.
            IncompleteReadError: If the peer closed the connection before
                the separator was seen; ``partial`` holds the leftover
                bytes.
        """
        if len(separator) != 1:
            raise ValueError("Only separators of length 1 are supported.")
        deadline = self._deadline(timeout)
        search_from = 0
        while True:
            idx = self._recv_buffer.find(separator, search_from)
            if idx != -1:
                return self._take(idx + 1)
            # Everything buffered so far has been scanned; only look at
            # newly received bytes on the next pass.
            search_from = len(self._recv_buffer)
            if self._eof:
                partial = bytes(self._recv_buffer)
                self._recv_buffer.clear()
                raise IncompleteReadError(partial, None)
            remaining = deadline - time.monotonic()
            if remaining < 0:
                raise TimeoutError("Timed out waiting for response from GUI")
            self._fill_buffer(remaining)

    def _deadline(self, timeout) -> float:
        """Translate a relative timeout into a monotonic-clock deadline."""
        if timeout is None:
            timeout = self.default_timeout
        return time.monotonic() + timeout

    def _take(self, count: int) -> bytes:
        """Remove and return the first *count* buffered bytes."""
        data = bytes(self._recv_buffer[:count])
        del self._recv_buffer[:count]
        return data

    def _fill_buffer(self, timeout: float) -> int:
        """Wait up to *timeout* seconds and append received bytes.

        Returns the number of bytes appended; 0 means nothing arrived in
        time or the peer closed the connection (``_eof`` is set then).
        Socket errors other than "would block" propagate to the caller.
        """
        if self._eof:
            return 0
        readable, _, _ = select.select([self._sock], [], [], max(timeout, 0.0))
        if not readable:
            return 0
        try:
            chunk = self._sock.recv(self._CHUNK_SIZE)
        except (BlockingIOError, InterruptedError):
            # Spurious wake-up: nothing to read after all.
            return 0
        if not chunk:
            self._eof = True
            return 0
        self._recv_buffer += chunk
        return len(chunk)
class libreVNA:
    """Small SCPI client for the TCP server of the LibreVNA-GUI.

    Commands and queries are sent as newline-terminated text lines;
    responses are read back one line at a time.
    """

    def __init__(self, host='localhost', port=19542,
                 check_cmds=True, timeout=1):
        """Connect to the LibreVNA-GUI TCP server.

        Args:
            host: Host name or address of the machine running the GUI.
            port: TCP port of the GUI's SCPI server.
            check_cmds: Default for the ``check`` argument of :meth:`cmd`.
            timeout: Default response timeout in seconds.

        Raises:
            Exception: If the connection cannot be established.
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((host, port))
        except Exception as err:
            # Do not leak the descriptor of a socket that never connected.
            self.sock.close()
            raise Exception("Unable to connect to LibreVNA-GUI. Make sure it is running and the TCP server is enabled.") from err
        self.reader = SocketStreamReader(self.sock,
                                         default_timeout=timeout)
        self.default_check_cmds = check_cmds

    def __del__(self):
        # __init__ may have failed before self.sock was assigned.
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def __read_response(self, timeout=None):
        """Read one response line and return it decoded, without the
        trailing line terminator/whitespace."""
        return self.reader.readline(timeout=timeout).decode().rstrip()

    def _send_line(self, text):
        """Send *text* followed by a newline in a single write."""
        self.sock.sendall(text.encode() + b"\n")

    def cmd(self, cmd, check=None, timeout=None):
        """Send a command that produces no response.

        Args:
            cmd: Command text without the trailing newline.
            check: When true, query the event status register afterwards
                and raise on any error bit.  ``None`` selects the
                ``check_cmds`` value given to the constructor.
            timeout: Seconds to wait for the status response.

        Returns:
            The event status register value when checking is enabled,
            otherwise ``None``.

        Raises:
            Exception: "Command Error" (bit 0x20), "Execution Error"
                (0x10), "Device Error" (0x08) or "Query Error" (0x04),
                tested in that order.
        """
        self._send_line(cmd)
        if check is None:
            check = self.default_check_cmds
        if not check:
            return None
        # Read the register exactly once and test that value: every extra
        # *ESR? is another round trip, and an event status register that
        # clears on read (as in IEEE 488.2) would report 0 the second
        # time, hiding the error.
        status = self.get_status(timeout=timeout)
        if status & 0x20:
            raise Exception("Command Error")
        if status & 0x10:
            raise Exception("Execution Error")
        if status & 0x08:
            raise Exception("Device Error")
        if status & 0x04:
            raise Exception("Query Error")
        return status

    def query(self, query, timeout=None):
        """Send a query and return its one-line response as a string.

        Args:
            query: Query text without the trailing newline.
            timeout: Seconds to wait for the response.
        """
        self._send_line(query)
        return self.__read_response(timeout=timeout)

    def get_status(self, timeout=None):
        """Query ``*ESR?`` and return the result as an int in 0..255.

        Raises:
            Exception: If the response is not a decimal number or is
                outside the 8-bit range.
        """
        resp = self.query("*ESR?", timeout=timeout)
        if not re.match(r'^\d+$', resp):
            raise Exception("Expected numeric response from *ESR? but got "
                            f"'{resp}'")
        status = int(resp)
        if status < 0 or status > 255:
            raise Exception(f"*ESR? returned invalid value {status}.")
        return status

    @staticmethod
    def parse_VNA_trace_data(data):
        """Parse VNA trace data into ``(frequency, complex)`` tuples.

        Args:
            data: Comma-separated numbers, optionally grouped with square
                brackets, in the order frequency, real, imaginary.

        Returns:
            List of ``(float frequency, complex value)`` tuples.

        Raises:
            Exception: If the number of values is not a multiple of three.
            ValueError: If a value cannot be converted to ``float``.
        """
        # Remove brackets (order of data implicitly known)
        values = data.replace(']', '').replace('[', '').split(',')
        if len(values) % 3 != 0:
            # number of values must be a multiple of three (frequency, real, imaginary)
            raise Exception("Invalid input data: expected tuples of three values each")
        return [
            (float(values[i]),
             complex(float(values[i + 1]), float(values[i + 2])))
            for i in range(0, len(values), 3)
        ]

    @staticmethod
    def parse_SA_trace_data(data):
        """Parse spectrum analyzer trace data into ``(frequency, dBm)`` tuples.

        Args:
            data: Comma-separated numbers, optionally grouped with square
                brackets, in the order frequency, dBm.

        Returns:
            List of ``(float frequency, float dBm)`` tuples.

        Raises:
            Exception: If the number of values is not a multiple of two.
            ValueError: If a value cannot be converted to ``float``.
        """
        # Remove brackets (order of data implicitly known)
        values = data.replace(']', '').replace('[', '').split(',')
        if len(values) % 2 != 0:
            # number of values must be a multiple of two (frequency, dBm)
            raise Exception("Invalid input data: expected tuples of two values each")
        return [
            (float(values[i]), float(values[i + 1]))
            for i in range(0, len(values), 2)
        ]