Clarity: drop dead code, add docstrings, prune dev-commentary

- Remove `from __future__ import print_function` (Python 3 only).
- mmio.py: drop the Python 2 `long = int` shim and `(int, long)`
  isinstance checks; make __enter__ return self so `with MMIO(...)`
  works as expected.
- Drop the redundant `value = config.set(...)` assignment in
  load_config (config.set returns None).
- Replace the personal "for my CPU" comments and the
  "ugly code, just testing..." comment with brief docstrings that
  describe what each helper computes.
- Add concise one-line docstrings to the public-ish functions
  (readmsr/writemsr, get_value_for_bits, is_on_battery, the various
  MSR getters/setters, load_config, calc_reg_values, power_thread,
  monitor, main, etc.) so a reader can navigate without reading the
  body.

Signed-off-by: ooonea <35407790+ooonea@users.noreply.github.com>
This commit is contained in:
ooonea 2026-04-19 21:56:35 +02:00
parent a6a95bdc1c
commit 2fd89b747b
No known key found for this signature in database
GPG key ID: 3CD106C3A25275AD
2 changed files with 52 additions and 32 deletions

32
mmio.py
View file

@ -1,19 +1,13 @@
'''
"""
Stripped down version from https://github.com/vsergeev/python-periphery/blob/master/periphery/mmio.py
'''
"""
import mmap
import os
import struct
import sys
# Alias long to int on Python 3
if sys.version_info[0] >= 3:
long = int
class MMIOError(IOError):
"""Base class for MMIO errors."""
pass
class MMIO(object):
@ -21,8 +15,8 @@ class MMIO(object):
"""Instantiate an MMIO object and map the region of physical memory
specified by the address base `physaddr` and size `size` in bytes.
Args:
physaddr (int, long): base physical address of memory region.
size (int, long): size of memory region.
physaddr (int): base physical address of memory region.
size (int): size of memory region.
Returns:
MMIO: MMIO object.
Raises:
@ -36,15 +30,15 @@ class MMIO(object):
self.close()
def __enter__(self):
pass
return self
def __exit__(self, t, value, traceback):
self.close()
def _open(self, physaddr, size):
if not isinstance(physaddr, (int, long)):
if not isinstance(physaddr, int):
raise TypeError("Invalid physaddr type, should be integer.")
if not isinstance(size, (int, long)):
if not isinstance(size, int):
raise TypeError("Invalid size type, should be integer.")
pagesize = os.sysconf(os.sysconf_names['SC_PAGESIZE'])
@ -89,14 +83,14 @@ class MMIO(object):
"""Read 32-bits from the specified `offset` in bytes, relative to the
base physical address of the MMIO region.
Args:
offset (int, long): offset from base physical address, in bytes.
offset (int): offset from base physical address, in bytes.
Returns:
int: 32-bit value read.
Raises:
TypeError: if `offset` type is invalid.
ValueError: if `offset` is out of bounds.
"""
if not isinstance(offset, (int, long)):
if not isinstance(offset, int):
raise TypeError("Invalid offset type, should be integer.")
offset = self._adjust_offset(offset)
@ -107,15 +101,15 @@ class MMIO(object):
"""Write 32-bits to the specified `offset` in bytes, relative to the
base physical address of the MMIO region.
Args:
offset (int, long): offset from base physical address, in bytes.
value (int, long): 32-bit value to write.
offset (int): offset from base physical address, in bytes.
value (int): 32-bit value to write.
Raises:
TypeError: if `offset` or `value` type are invalid.
ValueError: if `offset` or `value` are out of bounds.
"""
if not isinstance(offset, (int, long)):
if not isinstance(offset, int):
raise TypeError("Invalid offset type, should be integer.")
if not isinstance(value, (int, long)):
if not isinstance(value, int):
raise TypeError("Invalid value type, should be integer.")
if value < 0 or value > 0xffffffff:
raise ValueError("Value out of bounds.")

View file

@ -1,6 +1,4 @@
#!/usr/bin/env python3
from __future__ import print_function
import argparse
import configparser
import glob
@ -226,10 +224,13 @@ def warning(msg, oneshot=True, end='\n'):
def get_msr_list():
"""Return the per-CPU MSR device paths in CPU-index order."""
cpus = sorted(int(x) for x in os.listdir('/dev/cpu') if x.isdigit())
return ['/dev/cpu/{:d}/msr'.format(cpu) for cpu in cpus]
def writemsr(msr, val):
"""Write a 64-bit value to the named MSR on every online CPU."""
msr_list = get_msr_list()
if not os.path.exists(msr_list[0]):
try:
@ -258,8 +259,12 @@ def writemsr(msr, val):
raise e
# returns the value between from_bit and to_bit as unsigned long
def readmsr(msr, from_bit=0, to_bit=63, cpu=None, flatten=False):
"""Read the named MSR and return the [from_bit, to_bit] field as
an unsigned integer. By default returns one value per CPU; with
cpu=N returns just CPU N, with flatten=True returns the shared value
(warning if CPUs disagree).
"""
assert cpu is None or cpu in range(cpu_count())
if from_bit > to_bit:
fatal('Wrong readmsr bit params')
@ -296,11 +301,13 @@ def readmsr(msr, from_bit=0, to_bit=63, cpu=None, flatten=False):
def get_value_for_bits(val, from_bit=0, to_bit=63):
"""Extract bits [from_bit, to_bit] (inclusive) from val."""
mask = sum(2 ** x for x in range(from_bit, to_bit + 1))
return (val & mask) >> from_bit
def set_msr_allow_writes():
"""Try to enable msr.allow_writes; tolerate kernels that don't expose it."""
log('[I] Trying to unlock MSR allow_writes.')
if not os.path.exists('/sys/module/msr'):
try:
@ -316,6 +323,9 @@ def set_msr_allow_writes():
def is_on_battery(config):
"""Return True if the system is on battery power; falls back to UPower
over D-Bus if the configured sysfs path is unreadable.
"""
try:
for path in glob.glob(config.get('GENERAL', 'Sysfs_Power_Path', fallback=DEFAULT_SYSFS_POWER_PATH)):
with open(path) as f:
@ -337,6 +347,7 @@ def is_on_battery(config):
def get_cpu_platform_info():
"""Decode MSR_PLATFORM_INFO into a dict of named feature bits."""
features_msr_value = readmsr('MSR_PLATFORM_INFO', cpu=0)
cpu_platform_info = {}
for key, value in platform_info_bits.items():
@ -345,7 +356,7 @@ def get_cpu_platform_info():
def get_reset_thermal_status():
# read thermal status
"""Read IA32_THERM_STATUS for every CPU, then clear the sticky log bits."""
thermal_status_msr_value = readmsr('IA32_THERM_STATUS')
thermal_status = []
for core in range(cpu_count()):
@ -359,23 +370,23 @@ def get_reset_thermal_status():
def get_time_unit():
# 0.000977 is the time unit of my CPU
# TODO formula might be different for other CPUs
"""Return the RAPL time unit in seconds (Intel SDM Vol. 4, MSR 0x606)."""
return 1.0 / 2 ** readmsr('MSR_RAPL_POWER_UNIT', 16, 19, cpu=0)
def get_power_unit():
# 0.125 is the power unit of my CPU
# TODO formula might be different for other CPUs
"""Return the RAPL power unit in watts (Intel SDM Vol. 4, MSR 0x606)."""
return 1.0 / 2 ** readmsr('MSR_RAPL_POWER_UNIT', 0, 3, cpu=0)
def get_critical_temp():
# the critical temperature for my CPU is 100 'C
"""Return the package critical temperature offset in degrees Celsius."""
return readmsr('MSR_TEMPERATURE_TARGET', 16, 23, cpu=0)
def get_cur_pkg_power_limits():
"""Return the current PL1/PL2 power and time-window fields from
MSR_PKG_POWER_LIMIT."""
value = readmsr('MSR_PKG_POWER_LIMIT', 0, 55, flatten=True)
return {
'PL1': get_value_for_bits(value, 0, 14),
@ -386,6 +397,8 @@ def get_cur_pkg_power_limits():
def calc_time_window_vars(t):
"""Encode a time-window duration (s) as the (Y, Z) pair used by
MSR_PKG_POWER_LIMIT."""
time_unit = get_time_unit()
for Y in range(2 ** 5):
for Z in range(2 ** 2):
@ -413,6 +426,7 @@ def calc_undervolt_mv(msr_value):
def get_undervolt(plane=None, convert=False):
"""Read the current undervolt offset from one or all voltage planes."""
if 'UNDERVOLT' in UNSUPPORTED_FEATURES:
return 0
planes = [plane] if plane in VOLTAGE_PLANES else VOLTAGE_PLANES
@ -426,6 +440,7 @@ def get_undervolt(plane=None, convert=False):
def undervolt(config):
"""Apply the undervolt offsets from the config to all voltage planes."""
if ('UNDERVOLT.{:s}'.format(power['source']) not in config and 'UNDERVOLT' not in config) or (
'UNDERVOLT' in UNSUPPORTED_FEATURES
):
@ -464,6 +479,7 @@ def calc_icc_max_amp(msr_value):
def get_icc_max(plane=None, convert=False):
"""Read the IccMax setting from one or all current planes."""
planes = [plane] if plane in CURRENT_PLANES else CURRENT_PLANES
out = {}
for plane in planes:
@ -475,6 +491,7 @@ def get_icc_max(plane=None, convert=False):
def set_icc_max(config):
"""Apply the IccMax limits from the config to all current planes."""
for plane in CURRENT_PLANES:
try:
write_current_amp = config.getfloat(
@ -498,6 +515,7 @@ def set_icc_max(config):
def load_config():
"""Parse the config file, validating and clamping out-of-range values."""
config = configparser.ConfigParser()
config.read(args.config)
@ -506,7 +524,7 @@ def load_config():
for option in ('Update_Rate_s', 'PL1_Tdp_W', 'PL1_Duration_s', 'PL2_Tdp_W', 'PL2_Duration_S'):
value = config.getfloat(power_source, option, fallback=None)
if value is not None:
value = config.set(power_source, option, str(max(0.001, value)))
config.set(power_source, option, str(max(0.001, value)))
elif option == 'Update_Rate_s':
fatal('The mandatory "Update_Rate_s" parameter is missing.')
@ -574,12 +592,12 @@ def load_config():
def calc_reg_values(platform_info, config):
"""Compute the MSR values to apply for each power source from the config."""
regs = defaultdict(dict)
for power_source in ('AC', 'BATTERY'):
if platform_info['feature_programmable_temperature_target'] != 1:
warning("Setting temperature target is not supported by this CPU")
else:
# the critical temperature for my CPU is 100 'C
critical_temp = get_critical_temp()
# update the allowed temp range to keep at least 3 'C from the CPU critical temperature
global TRIP_TEMP_RANGE
@ -647,6 +665,7 @@ def calc_reg_values(platform_info, config):
def set_hwp(performance_mode):
"""Set the IA32_HWP_REQUEST energy/performance preference field."""
if performance_mode not in (True, False) or 'HWP' in UNSUPPORTED_FEATURES:
return
# set HWP energy performance preference
@ -662,7 +681,7 @@ def set_hwp(performance_mode):
def set_disable_bdprochot():
# Disable BDPROCHOT
"""Clear bit 0 of MSR_POWER_CTL to disable BDPROCHOT."""
cur_val = readmsr('MSR_POWER_CTL', flatten=True)
new_val = cur_val & 0xFFFFFFFFFFFFFFFE
@ -674,6 +693,7 @@ def set_disable_bdprochot():
def get_config_write_time():
"""Return the config file's mtime, or None if it doesn't exist."""
try:
return os.stat(args.config).st_mtime
except FileNotFoundError:
@ -681,6 +701,7 @@ def get_config_write_time():
def reload_config():
"""Re-read the config and re-apply undervolt, IccMax and HWP settings."""
config = load_config()
regs = calc_reg_values(get_cpu_platform_info(), config)
undervolt(config)
@ -691,6 +712,7 @@ def reload_config():
def power_thread(config, regs, exit_event, cpuid):
"""Daemon main loop: periodically (re-)apply throttling MSRs."""
try:
MCHBAR_BASE = int(check_output(('setpci', '-s', '0:0.0', '48.l')), 16)
except CalledProcessError:
@ -804,6 +826,7 @@ def power_thread(config, regs, exit_event, cpuid):
def check_kernel():
"""Verify we run as root and that the kernel exposes MSR/devmem."""
if os.geteuid() != 0:
fatal('No root no party. Try again with sudo.')
@ -830,6 +853,7 @@ def check_kernel():
def check_cpu():
"""Identify the CPU from /proc/cpuinfo and refuse to run on unsupported models."""
try:
with open('/proc/cpuinfo') as f:
cpuinfo = {}
@ -868,6 +892,7 @@ def check_cpu():
def test_msr_rw_capabilities():
"""Probe undervolt and HWP support; mark unavailable features as such."""
global TESTMSR
TESTMSR = True
try:
@ -890,6 +915,7 @@ def test_msr_rw_capabilities():
def monitor(exit_event, wait):
"""Live-display throttling causes and per-domain power until exit_event is set."""
wait = max(0.1, wait)
rapl_power_unit = 0.5 ** readmsr('MSR_RAPL_POWER_UNIT', from_bit=8, to_bit=12, cpu=0)
power_plane_msr = {
@ -917,7 +943,6 @@ def monitor(exit_event, wait):
offsets = {'Thermal': 0, 'Power': 10, 'Current': 12, 'Cross-domain (e.g. GPU)': 14}
output = ('{:s}: {:s}'.format(cause, LIM if bool((value >> offsets[cause]) & 1) else OK) for cause in offsets)
# ugly code, just testing...
vcore = readmsr('IA32_PERF_STATUS', from_bit=32, to_bit=47, cpu=0) / (2.0 ** 13) * 1000
stats2 = {'VCore': '{:.0f} mV'.format(vcore)}
total = 0.0
@ -943,6 +968,7 @@ def monitor(exit_event, wait):
def main():
"""Daemon entrypoint: parse args, validate platform, start power thread."""
global args
parser = argparse.ArgumentParser()