Style: convert to f-strings, add type hints to mmio, run Black

- Replace remaining .format() calls with f-strings (ruff UP032 plus
  manual cleanups for the formats ruff couldn't rewrite); replace
  the lone %-format in mmio.MMIO.__str__ as well.
- Hoist `UNDERVOLT.<source>` and `ICCMAX.<source>` section names to
  local variables in undervolt() / set_icc_max() so the f-string
  appears once.
- Add type hints to the public mmio.MMIO methods (the only real
  module boundary in the project).
- Run Black with the existing pyproject.toml config (line-length 120,
  skip-string-normalization). No semantic changes.

Signed-off-by: ooonea <35407790+ooonea@users.noreply.github.com>
This commit is contained in:
ooonea 2026-04-19 21:56:35 +02:00
parent 2fd89b747b
commit 1b815ea3fe
No known key found for this signature in database
GPG key ID: 3CD106C3A25275AD
2 changed files with 66 additions and 89 deletions

21
mmio.py
View file

@ -1,6 +1,7 @@
"""
Stripped down version from https://github.com/vsergeev/python-periphery/blob/master/periphery/mmio.py
"""
import mmap
import os
import struct
@ -11,7 +12,7 @@ class MMIOError(IOError):
class MMIO(object):
def __init__(self, physaddr, size):
def __init__(self, physaddr: int, size: int) -> None:
"""Instantiate an MMIO object and map the region of physical memory
specified by the address base `physaddr` and size `size` in bytes.
Args:
@ -35,7 +36,7 @@ class MMIO(object):
def __exit__(self, t, value, traceback):
self.close()
def _open(self, physaddr, size):
def _open(self, physaddr: int, size: int) -> None:
if not isinstance(physaddr, int):
raise TypeError("Invalid physaddr type, should be integer.")
if not isinstance(size, int):
@ -79,7 +80,7 @@ class MMIO(object):
if (offset + length) > self._aligned_size:
raise ValueError("Offset out of bounds.")
def read32(self, offset):
def read32(self, offset: int) -> int:
"""Read 32-bits from the specified `offset` in bytes, relative to the
base physical address of the MMIO region.
Args:
@ -95,9 +96,9 @@ class MMIO(object):
offset = self._adjust_offset(offset)
self._validate_offset(offset, 4)
return struct.unpack("=L", self.mapping[offset:offset + 4])[0]
return struct.unpack("=L", self.mapping[offset : offset + 4])[0]
def write32(self, offset, value):
def write32(self, offset: int, value: int) -> None:
"""Write 32-bits to the specified `offset` in bytes, relative to the
base physical address of the MMIO region.
Args:
@ -111,14 +112,14 @@ class MMIO(object):
raise TypeError("Invalid offset type, should be integer.")
if not isinstance(value, int):
raise TypeError("Invalid value type, should be integer.")
if value < 0 or value > 0xffffffff:
if value < 0 or value > 0xFFFFFFFF:
raise ValueError("Value out of bounds.")
offset = self._adjust_offset(offset)
self._validate_offset(offset, 4)
self.mapping[offset:offset + 4] = struct.pack("=L", value)
self.mapping[offset : offset + 4] = struct.pack("=L", value)
def close(self):
def close(self) -> None:
"""Unmap the MMIO object's mapped physical memory."""
if self.mapping is None:
return
@ -128,5 +129,5 @@ class MMIO(object):
# String representation
def __str__(self):
return "MMIO 0x%08x (size=%d)" % (self._physaddr, self._size)
def __str__(self) -> str:
return f"MMIO 0x{self._physaddr:08x} (size={self._size:d})"

View file

@ -197,10 +197,10 @@ ANSI_ESCAPE_RE = re.compile(r'\x1b\[[0-9;]*m')
def _format(prefix, msg):
tstamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
if args.log:
return '{:s}: {:s}{:s}'.format(tstamp, prefix, ANSI_ESCAPE_RE.sub('', msg))
return '{:s}{:s}'.format(prefix, msg)
tstamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
return f'{tstamp}: {prefix}{ANSI_ESCAPE_RE.sub("", msg)}'
return f'{prefix}{msg}'
def log(msg, oneshot=False, end='\n'):
@ -226,7 +226,7 @@ def warning(msg, oneshot=True, end='\n'):
def get_msr_list():
"""Return the per-CPU MSR device paths in CPU-index order."""
cpus = sorted(int(x) for x in os.listdir('/dev/cpu') if x.isdigit())
return ['/dev/cpu/{:d}/msr'.format(cpu) for cpu in cpus]
return [f'/dev/cpu/{cpu:d}/msr' for cpu in cpus]
def writemsr(msr, val):
@ -250,11 +250,11 @@ def writemsr(msr, val):
raise e
if e.errno == EPERM or e.errno == EACCES:
fatal(
'Unable to write to MSR {} ({:x}). Try to disable Secure Boot '
'and check if your kernel does not restrict access to MSR.'.format(msr, MSR_DICT[msr])
f'Unable to write to MSR {msr} ({MSR_DICT[msr]:x}). Try to disable Secure Boot '
'and check if your kernel does not restrict access to MSR.'
)
elif e.errno == EIO:
fatal('Unable to write to MSR {} ({:x}). Unknown error.'.format(msr, MSR_DICT[msr]))
fatal(f'Unable to write to MSR {msr} ({MSR_DICT[msr]:x}). Unknown error.')
else:
raise e
@ -286,23 +286,23 @@ def readmsr(msr, from_bit=0, to_bit=63, cpu=None, flatten=False):
output.append(get_value_for_bits(val, from_bit, to_bit))
if flatten:
if len(set(output)) > 1:
warning('Found multiple values for {:s} ({:x}). This should never happen.'.format(msr, MSR_DICT[msr]))
warning(f'Found multiple values for {msr:s} ({MSR_DICT[msr]:x}). This should never happen.')
return output[0]
return output[cpu] if cpu is not None else output
except (IOError, OSError) as e:
if TESTMSR:
raise e
if e.errno == EPERM or e.errno == EACCES:
fatal('Unable to read from MSR {} ({:x}). Try to disable Secure Boot.'.format(msr, MSR_DICT[msr]))
fatal(f'Unable to read from MSR {msr} ({MSR_DICT[msr]:x}). Try to disable Secure Boot.')
elif e.errno == EIO:
fatal('Unable to read to MSR {} ({:x}). Unknown error.'.format(msr, MSR_DICT[msr]))
fatal(f'Unable to read to MSR {msr} ({MSR_DICT[msr]:x}). Unknown error.')
else:
raise e
def get_value_for_bits(val, from_bit=0, to_bit=63):
"""Extract bits [from_bit, to_bit] (inclusive) from val."""
mask = sum(2 ** x for x in range(from_bit, to_bit + 1))
mask = sum(2**x for x in range(from_bit, to_bit + 1))
return (val & mask) >> from_bit
@ -331,7 +331,7 @@ def is_on_battery(config):
with open(path) as f:
return not bool(int(f.read()))
except (IOError, OSError, ValueError) as e:
warning('Sysfs_Power_Path read failed ({}). Trying upower method.'.format(e))
warning(f'Sysfs_Power_Path read failed ({e}). Trying upower method.')
else:
warning('No valid Sysfs_Power_Path found! Trying upower method')
try:
@ -400,9 +400,9 @@ def calc_time_window_vars(t):
"""Encode a time-window duration (s) as the (Y, Z) pair used by
MSR_PKG_POWER_LIMIT."""
time_unit = get_time_unit()
for Y in range(2 ** 5):
for Z in range(2 ** 2):
if t <= (2 ** Y) * (1.0 + Z / 4.0) * time_unit:
for Y in range(2**5):
for Z in range(2**2):
if t <= (2**Y) * (1.0 + Z / 4.0) * time_unit:
return (Y, Z)
raise ValueError('Unable to find a good combination!')
@ -441,14 +441,11 @@ def get_undervolt(plane=None, convert=False):
def undervolt(config):
"""Apply the undervolt offsets from the config to all voltage planes."""
if ('UNDERVOLT.{:s}'.format(power['source']) not in config and 'UNDERVOLT' not in config) or (
'UNDERVOLT' in UNSUPPORTED_FEATURES
):
section = f"UNDERVOLT.{power['source']}"
if (section not in config and 'UNDERVOLT' not in config) or 'UNDERVOLT' in UNSUPPORTED_FEATURES:
return
for plane in VOLTAGE_PLANES:
write_offset_mv = config.getfloat(
'UNDERVOLT.{:s}'.format(power['source']), plane, fallback=config.getfloat('UNDERVOLT', plane, fallback=0.0)
)
write_offset_mv = config.getfloat(section, plane, fallback=config.getfloat('UNDERVOLT', plane, fallback=0.0))
write_value = calc_undervolt_msr(plane, write_offset_mv)
writemsr('MSR_OC_MAILBOX', write_value)
if args.debug:
@ -457,9 +454,7 @@ def undervolt(config):
read_offset_mv = calc_undervolt_mv(read_value)
match = OK if write_value == read_value else ERR
log(
'[D] Undervolt plane {:s} - write {:.0f} mV ({:#x}) - read {:.0f} mV ({:#x}) - match {}'.format(
plane, write_offset_mv, write_value, read_offset_mv, read_value, match
)
f'[D] Undervolt plane {plane:s} - write {write_offset_mv:.0f} mV ({write_value:#x}) - read {read_offset_mv:.0f} mV ({read_value:#x}) - match {match}'
)
@ -492,10 +487,11 @@ def get_icc_max(plane=None, convert=False):
def set_icc_max(config):
"""Apply the IccMax limits from the config to all current planes."""
section = f"ICCMAX.{power['source']}"
for plane in CURRENT_PLANES:
try:
write_current_amp = config.getfloat(
'ICCMAX.{:s}'.format(power['source']), plane, fallback=config.getfloat('ICCMAX', plane, fallback=-1.0)
section, plane, fallback=config.getfloat('ICCMAX', plane, fallback=-1.0)
)
if write_current_amp > 0:
write_value = calc_icc_max_msr(plane, write_current_amp)
@ -506,9 +502,7 @@ def set_icc_max(config):
read_current_A = calc_icc_max_amp(read_value)
match = OK if write_value == read_value else ERR
log(
'[D] IccMax plane {:s} - write {:.2f} A ({:#x}) - read {:.2f} A ({:#x}) - match {}'.format(
plane, write_current_amp, write_value, read_current_A, read_value, match
)
f'[D] IccMax plane {plane:s} - write {write_current_amp:.2f} A ({write_value:#x}) - read {read_current_A:.2f} A ({read_value:#x}) - match {match}'
)
except (configparser.NoSectionError, configparser.NoOptionError):
pass
@ -534,9 +528,7 @@ def load_config():
if trip_temp != valid_trip_temp:
config.set(power_source, 'Trip_Temp_C', str(valid_trip_temp))
log(
'[!] Overriding invalid "Trip_Temp_C" value in "{:s}": {:.1f} -> {:.1f}'.format(
power_source, trip_temp, valid_trip_temp
)
f'[!] Overriding invalid "Trip_Temp_C" value in "{power_source:s}": {trip_temp:.1f} -> {valid_trip_temp:.1f}'
)
# fix any invalid value (ie. > 0) in the undervolt settings
@ -548,9 +540,7 @@ def load_config():
if value != valid_value:
config.set(key, plane, str(valid_value))
log(
'[!] Overriding invalid "{:s}" value in "{:s}" voltage plane: {:.0f} -> {:.0f}'.format(
key, plane, value, valid_value
)
f'[!] Overriding invalid "{key:s}" value in "{plane:s}" voltage plane: {value:.0f} -> {valid_value:.0f}'
)
# handle the case where only one of UNDERVOLT.AC, UNDERVOLT.BATTERY keys exists
@ -581,7 +571,7 @@ def load_config():
raise ValueError
iccmax_enabled = True
except ValueError:
warning('Invalid value for {:s} in {:s}'.format(plane, key), oneshot=False)
warning(f'Invalid value for {plane:s} in {key:s}', oneshot=False)
config.remove_option(key, plane)
except configparser.NoOptionError:
pass
@ -608,7 +598,7 @@ def calc_reg_values(platform_info, config):
trip_offset = int(round(critical_temp - Trip_Temp_C))
regs[power_source]['MSR_TEMPERATURE_TARGET'] = trip_offset << 24
else:
log('[I] {:s} trip temperature is disabled in config.'.format(power_source))
log(f'[I] {power_source:s} trip temperature is disabled in config.')
power_unit = get_power_unit()
@ -621,26 +611,26 @@ def calc_reg_values(platform_info, config):
cur_pkg_power_limits = get_cur_pkg_power_limits()
if PL1_Tdp_W is None:
PL1 = cur_pkg_power_limits['PL1']
log('[I] {:s} PL1_Tdp_W disabled in config.'.format(power_source))
log(f'[I] {power_source:s} PL1_Tdp_W disabled in config.')
else:
PL1 = int(round(PL1_Tdp_W / power_unit))
if PL1_Duration_s is None:
TW1 = cur_pkg_power_limits['TW1']
log('[I] {:s} PL1_Duration_s disabled in config.'.format(power_source))
log(f'[I] {power_source:s} PL1_Duration_s disabled in config.')
else:
Y, Z = calc_time_window_vars(PL1_Duration_s)
TW1 = Y | (Z << 5)
if PL2_Tdp_W is None:
PL2 = cur_pkg_power_limits['PL2']
log('[I] {:s} PL2_Tdp_W disabled in config.'.format(power_source))
log(f'[I] {power_source:s} PL2_Tdp_W disabled in config.')
else:
PL2 = int(round(PL2_Tdp_W / power_unit))
if PL2_Duration_s is None:
TW2 = cur_pkg_power_limits['TW2']
log('[I] {:s} PL2_Duration_s disabled in config.'.format(power_source))
log(f'[I] {power_source:s} PL2_Duration_s disabled in config.')
else:
Y, Z = calc_time_window_vars(PL2_Duration_s)
TW2 = Y | (Z << 5)
@ -649,7 +639,7 @@ def calc_reg_values(platform_info, config):
PL1 | (1 << 15) | (1 << 16) | (TW1 << 17) | (PL2 << 32) | (1 << 47) | (TW2 << 49)
)
else:
log('[I] {:s} package power limits are disabled in config.'.format(power_source))
log(f'[I] {power_source:s} package power limits are disabled in config.')
# cTDP
c_tdp_target_value = config.getint(power_source, 'cTDP', fallback=None)
@ -677,7 +667,7 @@ def set_hwp(performance_mode):
if args.debug:
read_value = readmsr('IA32_HWP_REQUEST', from_bit=24, to_bit=31)[0]
match = OK if hwp_mode == read_value else ERR
log('[D] HWP - write "{:#02x}" - read "{:#02x}" - match {}'.format(hwp_mode, read_value, match))
log(f'[D] HWP - write "{hwp_mode:#02x}" - read "{read_value:#02x}" - match {match}')
def set_disable_bdprochot():
@ -689,7 +679,7 @@ def set_disable_bdprochot():
if args.debug:
read_value = readmsr('MSR_POWER_CTL', from_bit=0, to_bit=0)[0]
match = OK if read_value == 0 else ERR
log('[D] BDPROCHOT - write "{:#02x}" - read "{:#02x}" - match {}'.format(0, read_value, match))
log(f'[D] BDPROCHOT - write "{0:#02x}" - read "{read_value:#02x}" - match {match}')
def get_config_write_time():
@ -718,7 +708,7 @@ def power_thread(config, regs, exit_event, cpuid):
except CalledProcessError:
warning('Please ensure that "setpci" is in path. This is typically provided by the "pciutils" package.')
warning('Trying to guess the MCHBAR address from the CPUID. This MIGHT NOT WORK!')
if cpuid in ((6, 140, 1),(6, 140, 2),(6, 141, 1),(6, 151, 2),(6, 151, 5), (6, 154, 3),(6, 154, 4)):
if cpuid in ((6, 140, 1), (6, 140, 2), (6, 141, 1), (6, 151, 2), (6, 151, 5), (6, 154, 3), (6, 154, 4)):
MCHBAR_BASE = 0xFEDC0001
else:
MCHBAR_BASE = 0xFED10001
@ -739,7 +729,7 @@ def power_thread(config, regs, exit_event, cpuid):
thermal_status = get_reset_thermal_status()
for index, core_thermal_status in enumerate(thermal_status):
for key, value in core_thermal_status.items():
log('[D] core {} thermal status: {} = {}'.format(index, key.replace("_", " "), value))
log(f'[D] core {index} thermal status: {key.replace("_", " ")} = {value}')
# Reload config on changes (unless it's deleted)
if config.getboolean('GENERAL', 'Autoreload', fallback=False):
@ -759,11 +749,7 @@ def power_thread(config, regs, exit_event, cpuid):
if args.debug:
read_value = readmsr('MSR_TEMPERATURE_TARGET', 24, 29, flatten=True)
match = OK if write_value >> 24 == read_value else ERR
log(
'[D] TEMPERATURE_TARGET - write {:#x} - read {:#x} - match {}'.format(
write_value >> 24, read_value, match
)
)
log(f'[D] TEMPERATURE_TARGET - write {write_value >> 24:#x} - read {read_value:#x} - match {match}')
# set cTDP
if 'MSR_CONFIG_TDP_CONTROL' in regs[power['source']]:
@ -772,11 +758,7 @@ def power_thread(config, regs, exit_event, cpuid):
if args.debug:
read_value = readmsr('MSR_CONFIG_TDP_CONTROL', 0, 1, flatten=True)
match = OK if write_value == read_value else ERR
log(
'[D] CONFIG_TDP_CONTROL - write {:#x} - read {:#x} - match {}'.format(
write_value, read_value, match
)
)
log(f'[D] CONFIG_TDP_CONTROL - write {write_value:#x} - read {read_value:#x} - match {match}')
# set PL1/2 on MSR
if 'MSR_PKG_POWER_LIMIT' in regs[power['source']]:
@ -785,11 +767,7 @@ def power_thread(config, regs, exit_event, cpuid):
if args.debug:
read_value = readmsr('MSR_PKG_POWER_LIMIT', 0, 55, flatten=True)
match = OK if write_value == read_value else ERR
log(
'[D] MSR PACKAGE_POWER_LIMIT - write {:#x} - read {:#x} - match {}'.format(
write_value, read_value, match
)
)
log(f'[D] MSR PACKAGE_POWER_LIMIT - write {write_value:#x} - read {read_value:#x} - match {match}')
if mchbar_mmio is not None:
# set MCHBAR register to the same PL1/2 values
mchbar_mmio.write32(0, write_value & 0xFFFFFFFF)
@ -798,9 +776,7 @@ def power_thread(config, regs, exit_event, cpuid):
read_value = mchbar_mmio.read32(0) | (mchbar_mmio.read32(4) << 32)
match = OK if write_value == read_value else ERR
log(
'[D] MCHBAR PACKAGE_POWER_LIMIT - write {:#x} - read {:#x} - match {}'.format(
write_value, read_value, match
)
f'[D] MCHBAR PACKAGE_POWER_LIMIT - write {write_value:#x} - read {read_value:#x} - match {match}'
)
# Disable BDPROCHOT
@ -832,7 +808,7 @@ def check_kernel():
kernel_config = None
try:
with open(os.path.join('/boot', 'config-{:s}'.format(uname()[2]))) as f:
with open(os.path.join('/boot', f'config-{uname()[2]:s}')) as f:
kernel_config = f.read()
except IOError:
config_gz_path = os.path.join('/proc', 'config.gz')
@ -883,12 +859,12 @@ def check_cpu():
'from /proc/cpuinfo.'
)
log('[I] Detected CPU architecture: Intel {:s}'.format(supported_cpus[cpuid]))
log(f'[I] Detected CPU architecture: Intel {supported_cpus[cpuid]:s}')
return cpuid
except SystemExit:
raise
except (OSError, KeyError, ValueError) as e:
fatal('Unable to identify CPU model: {}'.format(e))
fatal(f'Unable to identify CPU model: {e}')
def test_msr_rw_capabilities():
@ -930,21 +906,21 @@ def monitor(exit_event, wait):
}
undervolt_values = get_undervolt(convert=True)
undervolt_output = ' | '.join('{:s}: {:.2f} mV'.format(plane, undervolt_values[plane]) for plane in VOLTAGE_PLANES)
log('[D] Undervolt offsets: {:s}'.format(undervolt_output))
undervolt_output = ' | '.join(f'{plane:s}: {undervolt_values[plane]:.2f} mV' for plane in VOLTAGE_PLANES)
log(f'[D] Undervolt offsets: {undervolt_output:s}')
iccmax_values = get_icc_max(convert=True)
iccmax_output = ' | '.join('{:s}: {:.2f} A'.format(plane, iccmax_values[plane]) for plane in CURRENT_PLANES)
log('[D] IccMax: {:s}'.format(iccmax_output))
iccmax_output = ' | '.join(f'{plane:s}: {iccmax_values[plane]:.2f} A' for plane in CURRENT_PLANES)
log(f'[D] IccMax: {iccmax_output:s}')
log('[D] Realtime monitoring of throttling causes:\n')
while not exit_event.is_set():
value = readmsr('IA32_THERM_STATUS', from_bit=0, to_bit=15, cpu=0)
offsets = {'Thermal': 0, 'Power': 10, 'Current': 12, 'Cross-domain (e.g. GPU)': 14}
output = ('{:s}: {:s}'.format(cause, LIM if bool((value >> offsets[cause]) & 1) else OK) for cause in offsets)
output = (f'{cause:s}: {LIM if bool((value >> offsets[cause]) & 1) else OK:s}' for cause in offsets)
vcore = readmsr('IA32_PERF_STATUS', from_bit=32, to_bit=47, cpu=0) / (2.0 ** 13) * 1000
stats2 = {'VCore': '{:.0f} mV'.format(vcore)}
vcore = readmsr('IA32_PERF_STATUS', from_bit=32, to_bit=47, cpu=0) / (2.0**13) * 1000
stats2 = {'VCore': f'{vcore:.0f} mV'}
total = 0.0
for power_plane in ('Package', 'Graphics', 'DRAM'):
energy_j = readmsr(power_plane_msr[power_plane], cpu=0) * rapl_power_unit
@ -953,15 +929,15 @@ def monitor(exit_event, wait):
(energy_j, now),
(energy_j - prev_energy[power_plane][0]) / (now - prev_energy[power_plane][1]),
)
stats2[power_plane] = '{:.1f} W'.format(energy_w)
stats2[power_plane] = f'{energy_w:.1f} W'
total += energy_w
stats2['Total'] = '{:.1f} W'.format(total)
stats2['Total'] = f'{total:.1f} W'
output2 = ('{:s}: {:s}'.format(label, stats2[label]) for label in stats2)
output2 = (f'{label}: {stats2[label]}' for label in stats2)
terminator = '\n' if args.log else '\r'
log(
'[{}] {} || {}{}'.format(power['source'], ' - '.join(output), ' - '.join(output2), ' ' * 10),
f"[{power['source']}] {' - '.join(output)} || {' - '.join(output2)}{' ' * 10}",
end=terminator,
)
exit_event.wait(wait)
@ -992,7 +968,7 @@ def main():
args.log = open(args.log, 'w')
except OSError as e:
args.log = None
fatal('Unable to write to the log file: {}'.format(e))
fatal(f'Unable to write to the log file: {e}')
cpuid = None
if not args.force:
@ -1013,7 +989,7 @@ def main():
platform_info = get_cpu_platform_info()
if args.debug:
for key, value in platform_info.items():
log('[D] cpu platform info: {} = {}'.format(key.replace("_", " "), value))
log(f'[D] cpu platform info: {key.replace("_", " ")} = {value}')
regs = calc_reg_values(platform_info, config)
if not config.getboolean('GENERAL', 'Enabled'):