From 1c28865f3de5fee456b84cd06814952ae828e05d Mon Sep 17 00:00:00 2001 From: simonmicro Date: Sat, 6 Dec 2025 21:25:05 +0100 Subject: [PATCH] Migrated to named columns to prevent any accidential re-order Signed-off-by: simonmicro --- py-kms/pykms_Sql.py | 36 ++++++++++++++---------------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/py-kms/pykms_Sql.py b/py-kms/pykms_Sql.py index 0f6f73c..07d6548 100644 --- a/py-kms/pykms_Sql.py +++ b/py-kms/pykms_Sql.py @@ -7,17 +7,7 @@ import logging #-------------------------------------------------------------------------------------------------------------------------------------------------------- loggersrv = logging.getLogger('logsrv') -_column_name_to_index = { - 'clientMachineId': 0, - 'machineName': 1, - 'applicationId': 2, - 'skuId': 3, - 'licenseStatus': 4, - 'lastRequestTime': 5, - 'kmsEpid': 6, - 'requestCount': 7, - 'lastRequestIP': 8, -} +_column_names = ('clientMachineId', 'machineName', 'applicationId', 'skuId', 'licenseStatus', 'lastRequestTime', 'kmsEpid', 'requestCount', 'lastRequestIP') # sqlite3 is optional. available = False @@ -68,17 +58,18 @@ def sql_get_all(dbName): if not os.path.isfile(dbName): return None with sqlite3.connect(dbName) as con: + con.row_factory = sqlite3.Row cur = con.cursor() - cur.execute(f"SELECT {', '.join(_column_name_to_index.keys())} FROM clients") + cur.execute(f"SELECT {', '.join(_column_names)} FROM clients") clients = [] for row in cur.fetchall(): loggersrv.debug(f"Row: {row}") obj = {} - for col_name, index in _column_name_to_index.items(): + for col_name in _column_names: if col_name == "lastRequestTime": - obj[col_name] = datetime.fromtimestamp(row[_column_name_to_index['lastRequestTime']]).isoformat() + obj[col_name] = datetime.fromtimestamp(row['lastRequestTime']).isoformat() else: - obj[col_name] = row[index] + obj[col_name] = row[col_name] loggersrv.debug(f"Obj: {obj}") clients.append(obj) return clients @@ -88,36 +79,37 @@ def sql_update(dbName, infoDict): return # make sure all column names are present - for col_name in _column_name_to_index.keys(): + for col_name in _column_names: if col_name in ["requestCount", "kmsEpid"]: continue if col_name not in infoDict: raise ValueError(f"infoDict is missing required column: {col_name}") with sqlite3.connect(dbName) as con: + con.row_factory = sqlite3.Row cur = con.cursor() - cur.execute(f"SELECT {', '.join(_column_name_to_index.keys())} FROM clients WHERE clientMachineId=:clientMachineId AND applicationId=:applicationId;", infoDict) + cur.execute(f"SELECT {', '.join(_column_names)} FROM clients WHERE clientMachineId=:clientMachineId AND applicationId=:applicationId;", infoDict) data = cur.fetchone() if not data: # Insert new row with all given info infoDict["kmsEpid"] = "" # Default empty value infoDict["requestCount"] = 1 - cur.execute(f"""INSERT INTO clients ({', '.join(_column_name_to_index.keys())}) - VALUES ({', '.join(':' + col for col in _column_name_to_index.keys())});""", infoDict) + cur.execute(f"""INSERT INTO clients ({', '.join(_column_names)}) + VALUES ({', '.join(':' + col for col in _column_names)});""", infoDict) else: # Update only changed columns common_postfix = "WHERE clientMachineId=:clientMachineId AND applicationId=:applicationId" def update_column_if_changed(column_name, new_value): assert "clientMachineId" in infoDict and "applicationId" in infoDict, "infoDict must contain 'clientMachineId' and 'applicationId'" - if column_name not in _column_name_to_index: + if column_name not in _column_names: raise ValueError(f"Unknown column name: {column_name}") - if data[_column_name_to_index[column_name]] != new_value: + if data[column_name] != new_value: query = f"UPDATE clients SET {column_name}=:value {common_postfix}" cur.execute(query, {"value": new_value, "clientMachineId": infoDict['clientMachineId'], "applicationId": infoDict['applicationId']}) # Dynamically check and maybe update all columns - for column_name in _column_name_to_index.keys(): + for column_name in _column_names: if column_name in ["clientMachineId", "applicationId", "requestCount"]: continue # Skip these columns if column_name == "kmsEpid":