Migrated to named columns to prevent any accidential re-order

Signed-off-by: simonmicro <simon@simonmicro.de>
This commit is contained in:
simonmicro 2025-12-06 21:25:05 +01:00
parent 6c57a8e5b4
commit 1c28865f3d
No known key found for this signature in database
GPG key ID: 033A4D4CE4E063D6

View file

@ -7,17 +7,7 @@ import logging
#--------------------------------------------------------------------------------------------------------------------------------------------------------
loggersrv = logging.getLogger('logsrv')
_column_name_to_index = {
'clientMachineId': 0,
'machineName': 1,
'applicationId': 2,
'skuId': 3,
'licenseStatus': 4,
'lastRequestTime': 5,
'kmsEpid': 6,
'requestCount': 7,
'lastRequestIP': 8,
}
_column_names = ('clientMachineId', 'machineName', 'applicationId', 'skuId', 'licenseStatus', 'lastRequestTime', 'kmsEpid', 'requestCount', 'lastRequestIP')
# sqlite3 is optional.
available = False
@ -68,17 +58,18 @@ def sql_get_all(dbName):
if not os.path.isfile(dbName):
return None
with sqlite3.connect(dbName) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
cur.execute(f"SELECT {', '.join(_column_name_to_index.keys())} FROM clients")
cur.execute(f"SELECT {', '.join(_column_names)} FROM clients")
clients = []
for row in cur.fetchall():
loggersrv.debug(f"Row: {row}")
obj = {}
for col_name, index in _column_name_to_index.items():
for col_name in _column_names:
if col_name == "lastRequestTime":
obj[col_name] = datetime.fromtimestamp(row[_column_name_to_index['lastRequestTime']]).isoformat()
obj[col_name] = datetime.fromtimestamp(row['lastRequestTime']).isoformat()
else:
obj[col_name] = row[index]
obj[col_name] = row[col_name]
loggersrv.debug(f"Obj: {obj}")
clients.append(obj)
return clients
@ -88,36 +79,37 @@ def sql_update(dbName, infoDict):
return
# make sure all column names are present
for col_name in _column_name_to_index.keys():
for col_name in _column_names:
if col_name in ["requestCount", "kmsEpid"]:
continue
if col_name not in infoDict:
raise ValueError(f"infoDict is missing required column: {col_name}")
with sqlite3.connect(dbName) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
cur.execute(f"SELECT {', '.join(_column_name_to_index.keys())} FROM clients WHERE clientMachineId=:clientMachineId AND applicationId=:applicationId;", infoDict)
cur.execute(f"SELECT {', '.join(_column_names)} FROM clients WHERE clientMachineId=:clientMachineId AND applicationId=:applicationId;", infoDict)
data = cur.fetchone()
if not data:
# Insert new row with all given info
infoDict["kmsEpid"] = "" # Default empty value
infoDict["requestCount"] = 1
cur.execute(f"""INSERT INTO clients ({', '.join(_column_name_to_index.keys())})
VALUES ({', '.join(':' + col for col in _column_name_to_index.keys())});""", infoDict)
cur.execute(f"""INSERT INTO clients ({', '.join(_column_names)})
VALUES ({', '.join(':' + col for col in _column_names)});""", infoDict)
else:
# Update only changed columns
common_postfix = "WHERE clientMachineId=:clientMachineId AND applicationId=:applicationId"
def update_column_if_changed(column_name, new_value):
assert "clientMachineId" in infoDict and "applicationId" in infoDict, "infoDict must contain 'clientMachineId' and 'applicationId'"
if column_name not in _column_name_to_index:
if column_name not in _column_names:
raise ValueError(f"Unknown column name: {column_name}")
if data[_column_name_to_index[column_name]] != new_value:
if data[column_name] != new_value:
query = f"UPDATE clients SET {column_name}=:value {common_postfix}"
cur.execute(query, {"value": new_value, "clientMachineId": infoDict['clientMachineId'], "applicationId": infoDict['applicationId']})
# Dynamically check and maybe update all columns
for column_name in _column_name_to_index.keys():
for column_name in _column_names:
if column_name in ["clientMachineId", "applicationId", "requestCount"]:
continue # Skip these columns
if column_name == "kmsEpid":