Merge pull request #390 from ooonea/contrib/improvements

Multiple fixes and code cleanup
This commit is contained in:
Francesco Palmarini 2026-04-20 21:49:42 +02:00 committed by GitHub
commit 9813d814ae
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 206 additions and 180 deletions

85
mmio.py
View file

@ -1,28 +1,23 @@
'''
"""
Stripped down version from https://github.com/vsergeev/python-periphery/blob/master/periphery/mmio.py
'''
"""
import mmap
import os
import struct
import sys
# Alias long to int on Python 3
if sys.version_info[0] >= 3:
long = int
class MMIOError(IOError):
"""Base class for MMIO errors."""
pass
class MMIO(object):
def __init__(self, physaddr, size):
def __init__(self, physaddr: int, size: int) -> None:
"""Instantiate an MMIO object and map the region of physical memory
specified by the address base `physaddr` and size `size` in bytes.
Args:
physaddr (int, long): base physical address of memory region.
size (int, long): size of memory region.
physaddr (int): base physical address of memory region.
size (int): size of memory region.
Returns:
MMIO: MMIO object.
Raises:
@ -36,15 +31,15 @@ class MMIO(object):
self.close()
def __enter__(self):
pass
return self
def __exit__(self, t, value, traceback):
self.close()
def _open(self, physaddr, size):
if not isinstance(physaddr, (int, long)):
def _open(self, physaddr: int, size: int) -> None:
if not isinstance(physaddr, int):
raise TypeError("Invalid physaddr type, should be integer.")
if not isinstance(size, (int, long)):
if not isinstance(size, int):
raise TypeError("Invalid size type, should be integer.")
pagesize = os.sysconf(os.sysconf_names['SC_PAGESIZE'])
@ -61,8 +56,14 @@ class MMIO(object):
try:
self.mapping = mmap.mmap(
fd, self._aligned_size, flags=mmap.MAP_SHARED, prot=mmap.PROT_WRITE, offset=self._aligned_physaddr)
fd,
self._aligned_size,
flags=mmap.MAP_SHARED,
prot=mmap.PROT_READ | mmap.PROT_WRITE,
offset=self._aligned_physaddr,
)
except OSError as e:
os.close(fd)
raise MMIOError(e.errno, "Mapping /dev/mem: " + e.strerror)
try:
@ -79,46 +80,68 @@ class MMIO(object):
if (offset + length) > self._aligned_size:
raise ValueError("Offset out of bounds.")
def read32(self, offset):
def read32(self, offset: int) -> int:
"""Read 32-bits from the specified `offset` in bytes, relative to the
base physical address of the MMIO region.
Args:
offset (int, long): offset from base physical address, in bytes.
offset (int): offset from base physical address, in bytes.
Returns:
int: 32-bit value read.
Raises:
TypeError: if `offset` type is invalid.
ValueError: if `offset` is out of bounds.
"""
if not isinstance(offset, (int, long)):
if not isinstance(offset, int):
raise TypeError("Invalid offset type, should be integer.")
offset = self._adjust_offset(offset)
self._validate_offset(offset, 4)
return struct.unpack("=L", self.mapping[offset:offset + 4])[0]
return struct.unpack("=L", self.mapping[offset : offset + 4])[0]
def write32(self, offset, value):
def write32(self, offset: int, value: int) -> None:
"""Write 32-bits to the specified `offset` in bytes, relative to the
base physical address of the MMIO region.
Args:
offset (int, long): offset from base physical address, in bytes.
value (int, long): 32-bit value to write.
offset (int): offset from base physical address, in bytes.
value (int): 32-bit value to write.
Raises:
TypeError: if `offset` or `value` type are invalid.
ValueError: if `offset` or `value` are out of bounds.
"""
if not isinstance(offset, (int, long)):
if not isinstance(offset, int):
raise TypeError("Invalid offset type, should be integer.")
if not isinstance(value, (int, long)):
if not isinstance(value, int):
raise TypeError("Invalid value type, should be integer.")
if value < 0 or value > 0xffffffff:
if value < 0 or value > 0xFFFFFFFF:
raise ValueError("Value out of bounds.")
offset = self._adjust_offset(offset)
self._validate_offset(offset, 4)
self.mapping[offset:offset + 4] = struct.pack("=L", value)
self.mapping[offset : offset + 4] = struct.pack("=L", value)
def close(self):
def read64(self, offset: int) -> int:
"""Read 64-bits from the specified `offset` in bytes, relative to the base physical address of the MMIO region."""
if not isinstance(offset, int):
raise TypeError("Invalid offset type, should be integer.")
offset = self._adjust_offset(offset)
self._validate_offset(offset, 8)
return struct.unpack("=Q", self.mapping[offset : offset + 8])[0]
def write64(self, offset: int, value: int) -> None:
"""Write 64-bits to the specified `offset` in bytes, relative to the base physical address of the MMIO region."""
if not isinstance(offset, int):
raise TypeError("Invalid offset type, should be integer.")
if not isinstance(value, int):
raise TypeError("Invalid value type, should be integer.")
if value < 0 or value > 0xFFFFFFFFFFFFFFFF:
raise ValueError("Value out of bounds.")
offset = self._adjust_offset(offset)
self._validate_offset(offset, 8)
self.mapping[offset : offset + 8] = struct.pack("=Q", value)
def close(self) -> None:
"""Unmap the MMIO object's mapped physical memory."""
if self.mapping is None:
return
@ -126,9 +149,7 @@ class MMIO(object):
self.mapping.close()
self.mapping = None
self._fd = None
# String representation
def __str__(self):
return "MMIO 0x%08x (size=%d)" % (self.base, self.size)
def __str__(self) -> str:
return f"MMIO 0x{self._physaddr:08x} (size={self._size:d})"

View file

@ -1,6 +1,4 @@
#!/usr/bin/env python3
from __future__ import print_function
import argparse
import configparser
import glob
@ -195,37 +193,44 @@ LIM = bcolors.YELLOW + bcolors.BOLD + 'LIM' + bcolors.RESET
log_history = set()
ANSI_ESCAPE_RE = re.compile(r'\x1b\[[0-9;]*m')
def _format(prefix, msg):
if args.log:
tstamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
return f'{tstamp}: {prefix}{ANSI_ESCAPE_RE.sub("", msg)}'
return f'{prefix}{msg}'
def log(msg, oneshot=False, end='\n'):
outfile = args.log if args.log else sys.stdout
if msg.strip() not in log_history or oneshot is False:
tstamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
full_msg = '{:s}: {:s}'.format(tstamp, msg) if args.log else msg
print(full_msg, file=outfile, end=end)
print(_format('', msg), file=outfile, end=end)
log_history.add(msg.strip())
def fatal(msg, code=1, end='\n'):
outfile = args.log if args.log else sys.stderr
tstamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
full_msg = '{:s}: [E] {:s}'.format(tstamp, msg) if args.log else '[E] {:s}'.format(msg)
print(full_msg, file=outfile, end=end)
print(_format('[E] ', msg), file=outfile, end=end)
sys.exit(code)
def warning(msg, oneshot=True, end='\n'):
outfile = args.log if args.log else sys.stderr
if msg.strip() not in log_history or oneshot is False:
tstamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
full_msg = '{:s}: [W] {:s}'.format(tstamp, msg) if args.log else '[W] {:s}'.format(msg)
print(full_msg, file=outfile, end=end)
print(_format('[W] ', msg), file=outfile, end=end)
log_history.add(msg.strip())
def get_msr_list():
return ['/dev/cpu/{:d}/msr'.format(int(x)) for x in os.listdir("/dev/cpu")]
"""Return the per-CPU MSR device paths in CPU-index order."""
cpus = sorted(int(x) for x in os.listdir('/dev/cpu') if x.isdigit())
return [f'/dev/cpu/{cpu:d}/msr' for cpu in cpus]
def writemsr(msr, val):
"""Write a 64-bit value to the named MSR on every online CPU."""
msr_list = get_msr_list()
if not os.path.exists(msr_list[0]):
try:
@ -235,25 +240,31 @@ def writemsr(msr, val):
try:
for addr in msr_list:
f = os.open(addr, os.O_WRONLY)
os.lseek(f, MSR_DICT[msr], os.SEEK_SET)
os.write(f, struct.pack('Q', val))
os.close(f)
try:
os.lseek(f, MSR_DICT[msr], os.SEEK_SET)
os.write(f, struct.pack('Q', val))
finally:
os.close(f)
except (IOError, OSError) as e:
if TESTMSR:
raise e
if e.errno == EPERM or e.errno == EACCES:
fatal(
'Unable to write to MSR {} ({:x}). Try to disable Secure Boot '
'and check if your kernel does not restrict access to MSR.'.format(msr, MSR_DICT[msr])
f'Unable to write to MSR {msr} ({MSR_DICT[msr]:x}). Try to disable Secure Boot '
'and check if your kernel does not restrict access to MSR.'
)
elif e.errno == EIO:
fatal('Unable to write to MSR {} ({:x}). Unknown error.'.format(msr, MSR_DICT[msr]))
fatal(f'Unable to write to MSR {msr} ({MSR_DICT[msr]:x}). Unknown error.')
else:
raise e
# returns the value between from_bit and to_bit as unsigned long
def readmsr(msr, from_bit=0, to_bit=63, cpu=None, flatten=False):
"""Read the named MSR and return the [from_bit, to_bit] field as
an unsigned integer. By default returns one value per CPU; with
cpu=N returns just CPU N, with flatten=True returns the shared value
(warning if CPUs disagree).
"""
assert cpu is None or cpu in range(cpu_count())
if from_bit > to_bit:
fatal('Wrong readmsr bit params')
@ -267,32 +278,36 @@ def readmsr(msr, from_bit=0, to_bit=63, cpu=None, flatten=False):
output = []
for addr in msr_list:
f = os.open(addr, os.O_RDONLY)
os.lseek(f, MSR_DICT[msr], os.SEEK_SET)
val = struct.unpack('Q', os.read(f, 8))[0]
os.close(f)
try:
os.lseek(f, MSR_DICT[msr], os.SEEK_SET)
val = struct.unpack('Q', os.read(f, 8))[0]
finally:
os.close(f)
output.append(get_value_for_bits(val, from_bit, to_bit))
if flatten:
if len(set(output)) > 1:
warning('Found multiple values for {:s} ({:x}). This should never happen.'.format(msr, MSR_DICT[msr]))
warning(f'Found multiple values for {msr:s} ({MSR_DICT[msr]:x}). This should never happen.')
return output[0]
return output[cpu] if cpu is not None else output
except (IOError, OSError) as e:
if TESTMSR:
raise e
if e.errno == EPERM or e.errno == EACCES:
fatal('Unable to read from MSR {} ({:x}). Try to disable Secure Boot.'.format(msr, MSR_DICT[msr]))
fatal(f'Unable to read from MSR {msr} ({MSR_DICT[msr]:x}). Try to disable Secure Boot.')
elif e.errno == EIO:
fatal('Unable to read to MSR {} ({:x}). Unknown error.'.format(msr, MSR_DICT[msr]))
fatal(f'Unable to read to MSR {msr} ({MSR_DICT[msr]:x}). Unknown error.')
else:
raise e
def get_value_for_bits(val, from_bit=0, to_bit=63):
mask = sum(2 ** x for x in range(from_bit, to_bit + 1))
"""Extract bits [from_bit, to_bit] (inclusive) from val."""
mask = sum(2**x for x in range(from_bit, to_bit + 1))
return (val & mask) >> from_bit
def set_msr_allow_writes():
"""Try to enable msr.allow_writes; tolerate kernels that don't expose it."""
log('[I] Trying to unlock MSR allow_writes.')
if not os.path.exists('/sys/module/msr'):
try:
@ -303,24 +318,28 @@ def set_msr_allow_writes():
try:
with open('/sys/module/msr/parameters/allow_writes', 'w') as f:
f.write('on')
except:
except OSError:
warning('Unable to set MSR allow_writes to on. You might experience warnings in kernel logs.')
def is_on_battery(config):
"""Return True if the system is on battery power; falls back to UPower
over D-Bus if the configured sysfs path is unreadable.
"""
try:
for path in glob.glob(config.get('GENERAL', 'Sysfs_Power_Path', fallback=DEFAULT_SYSFS_POWER_PATH)):
with open(path) as f:
return not bool(int(f.read()))
raise
except:
except (IOError, OSError, ValueError) as e:
warning(f'Sysfs_Power_Path read failed ({e}). Trying upower method.')
else:
warning('No valid Sysfs_Power_Path found! Trying upower method')
try:
bus = dbus.SystemBus()
proxy = bus.get_object('org.freedesktop.UPower', '/org/freedesktop/UPower')
iface = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
return iface.Get('org.freedesktop.UPower', 'OnBattery')
except:
return bool(iface.Get('org.freedesktop.UPower', 'OnBattery'))
except dbus.DBusException:
pass
warning('No valid power detection methods found. Assuming that the system is running on battery power.')
@ -328,6 +347,7 @@ def is_on_battery(config):
def get_cpu_platform_info():
"""Decode MSR_PLATFORM_INFO into a dict of named feature bits."""
features_msr_value = readmsr('MSR_PLATFORM_INFO', cpu=0)
cpu_platform_info = {}
for key, value in platform_info_bits.items():
@ -336,7 +356,7 @@ def get_cpu_platform_info():
def get_reset_thermal_status():
# read thermal status
"""Read IA32_THERM_STATUS for every CPU, then clear the sticky log bits."""
thermal_status_msr_value = readmsr('IA32_THERM_STATUS')
thermal_status = []
for core in range(cpu_count()):
@ -350,23 +370,23 @@ def get_reset_thermal_status():
def get_time_unit():
# 0.000977 is the time unit of my CPU
# TODO formula might be different for other CPUs
"""Return the RAPL time unit in seconds (Intel SDM Vol. 4, MSR 0x606)."""
return 1.0 / 2 ** readmsr('MSR_RAPL_POWER_UNIT', 16, 19, cpu=0)
def get_power_unit():
# 0.125 is the power unit of my CPU
# TODO formula might be different for other CPUs
"""Return the RAPL power unit in watts (Intel SDM Vol. 4, MSR 0x606)."""
return 1.0 / 2 ** readmsr('MSR_RAPL_POWER_UNIT', 0, 3, cpu=0)
def get_critical_temp():
# the critical temperature for my CPU is 100 'C
"""Return the package critical temperature offset in degrees Celsius."""
return readmsr('MSR_TEMPERATURE_TARGET', 16, 23, cpu=0)
def get_cur_pkg_power_limits():
"""Return the current PL1/PL2 power and time-window fields from
MSR_PKG_POWER_LIMIT."""
value = readmsr('MSR_PKG_POWER_LIMIT', 0, 55, flatten=True)
return {
'PL1': get_value_for_bits(value, 0, 14),
@ -377,10 +397,12 @@ def get_cur_pkg_power_limits():
def calc_time_window_vars(t):
"""Encode a time-window duration (s) as the (Y, Z) pair used by
MSR_PKG_POWER_LIMIT."""
time_unit = get_time_unit()
for Y in range(2 ** 5):
for Z in range(2 ** 2):
if t <= (2 ** Y) * (1.0 + Z / 4.0) * time_unit:
for Y in range(2**5):
for Z in range(2**2):
if t <= (2**Y) * (1.0 + Z / 4.0) * time_unit:
return (Y, Z)
raise ValueError('Unable to find a good combination!')
@ -404,6 +426,7 @@ def calc_undervolt_mv(msr_value):
def get_undervolt(plane=None, convert=False):
"""Read the current undervolt offset from one or all voltage planes."""
if 'UNDERVOLT' in UNSUPPORTED_FEATURES:
return 0
planes = [plane] if plane in VOLTAGE_PLANES else VOLTAGE_PLANES
@ -417,14 +440,12 @@ def get_undervolt(plane=None, convert=False):
def undervolt(config):
if ('UNDERVOLT.{:s}'.format(power['source']) not in config and 'UNDERVOLT' not in config) or (
'UNDERVOLT' in UNSUPPORTED_FEATURES
):
"""Apply the undervolt offsets from the config to all voltage planes."""
section = f"UNDERVOLT.{power['source']}"
if (section not in config and 'UNDERVOLT' not in config) or 'UNDERVOLT' in UNSUPPORTED_FEATURES:
return
for plane in VOLTAGE_PLANES:
write_offset_mv = config.getfloat(
'UNDERVOLT.{:s}'.format(power['source']), plane, fallback=config.getfloat('UNDERVOLT', plane, fallback=0.0)
)
write_offset_mv = config.getfloat(section, plane, fallback=config.getfloat('UNDERVOLT', plane, fallback=0.0))
write_value = calc_undervolt_msr(plane, write_offset_mv)
writemsr('MSR_OC_MAILBOX', write_value)
if args.debug:
@ -433,9 +454,7 @@ def undervolt(config):
read_offset_mv = calc_undervolt_mv(read_value)
match = OK if write_value == read_value else ERR
log(
'[D] Undervolt plane {:s} - write {:.0f} mV ({:#x}) - read {:.0f} mV ({:#x}) - match {}'.format(
plane, write_offset_mv, write_value, read_offset_mv, read_value, match
)
f'[D] Undervolt plane {plane:s} - write {write_offset_mv:.0f} mV ({write_value:#x}) - read {read_offset_mv:.0f} mV ({read_value:#x}) - match {match}'
)
@ -455,6 +474,7 @@ def calc_icc_max_amp(msr_value):
def get_icc_max(plane=None, convert=False):
"""Read the IccMax setting from one or all current planes."""
planes = [plane] if plane in CURRENT_PLANES else CURRENT_PLANES
out = {}
for plane in planes:
@ -466,10 +486,12 @@ def get_icc_max(plane=None, convert=False):
def set_icc_max(config):
"""Apply the IccMax limits from the config to all current planes."""
section = f"ICCMAX.{power['source']}"
for plane in CURRENT_PLANES:
try:
write_current_amp = config.getfloat(
'ICCMAX.{:s}'.format(power['source']), plane, fallback=config.getfloat('ICCMAX', plane, fallback=-1.0)
section, plane, fallback=config.getfloat('ICCMAX', plane, fallback=-1.0)
)
if write_current_amp > 0:
write_value = calc_icc_max_msr(plane, write_current_amp)
@ -480,15 +502,14 @@ def set_icc_max(config):
read_current_A = calc_icc_max_amp(read_value)
match = OK if write_value == read_value else ERR
log(
'[D] IccMax plane {:s} - write {:.2f} A ({:#x}) - read {:.2f} A ({:#x}) - match {}'.format(
plane, write_current_amp, write_value, read_current_A, read_value, match
)
f'[D] IccMax plane {plane:s} - write {write_current_amp:.2f} A ({write_value:#x}) - read {read_current_A:.2f} A ({read_value:#x}) - match {match}'
)
except (configparser.NoSectionError, configparser.NoOptionError):
pass
def load_config():
"""Parse the config file, validating and clamping out-of-range values."""
config = configparser.ConfigParser()
config.read(args.config)
@ -497,7 +518,7 @@ def load_config():
for option in ('Update_Rate_s', 'PL1_Tdp_W', 'PL1_Duration_s', 'PL2_Tdp_W', 'PL2_Duration_S'):
value = config.getfloat(power_source, option, fallback=None)
if value is not None:
value = config.set(power_source, option, str(max(0.001, value)))
config.set(power_source, option, str(max(0.001, value)))
elif option == 'Update_Rate_s':
fatal('The mandatory "Update_Rate_s" parameter is missing.')
@ -507,9 +528,7 @@ def load_config():
if trip_temp != valid_trip_temp:
config.set(power_source, 'Trip_Temp_C', str(valid_trip_temp))
log(
'[!] Overriding invalid "Trip_Temp_C" value in "{:s}": {:.1f} -> {:.1f}'.format(
power_source, trip_temp, valid_trip_temp
)
f'[!] Overriding invalid "Trip_Temp_C" value in "{power_source:s}": {trip_temp:.1f} -> {valid_trip_temp:.1f}'
)
# fix any invalid value (ie. > 0) in the undervolt settings
@ -521,9 +540,7 @@ def load_config():
if value != valid_value:
config.set(key, plane, str(valid_value))
log(
'[!] Overriding invalid "{:s}" value in "{:s}" voltage plane: {:.0f} -> {:.0f}'.format(
key, plane, value, valid_value
)
f'[!] Overriding invalid "{key:s}" value in "{plane:s}" voltage plane: {value:.0f} -> {valid_value:.0f}'
)
# handle the case where only one of UNDERVOLT.AC, UNDERVOLT.BATTERY keys exists
@ -554,7 +571,7 @@ def load_config():
raise ValueError
iccmax_enabled = True
except ValueError:
warning('Invalid value for {:s} in {:s}'.format(plane, key), oneshot=False)
warning(f'Invalid value for {plane:s} in {key:s}', oneshot=False)
config.remove_option(key, plane)
except configparser.NoOptionError:
pass
@ -565,12 +582,12 @@ def load_config():
def calc_reg_values(platform_info, config):
"""Compute the MSR values to apply for each power source from the config."""
regs = defaultdict(dict)
for power_source in ('AC', 'BATTERY'):
if platform_info['feature_programmable_temperature_target'] != 1:
warning("Setting temperature target is not supported by this CPU")
else:
# the critical temperature for my CPU is 100 'C
critical_temp = get_critical_temp()
# update the allowed temp range to keep at least 3 'C from the CPU critical temperature
global TRIP_TEMP_RANGE
@ -581,7 +598,7 @@ def calc_reg_values(platform_info, config):
trip_offset = int(round(critical_temp - Trip_Temp_C))
regs[power_source]['MSR_TEMPERATURE_TARGET'] = trip_offset << 24
else:
log('[I] {:s} trip temperature is disabled in config.'.format(power_source))
log(f'[I] {power_source:s} trip temperature is disabled in config.')
power_unit = get_power_unit()
@ -594,26 +611,26 @@ def calc_reg_values(platform_info, config):
cur_pkg_power_limits = get_cur_pkg_power_limits()
if PL1_Tdp_W is None:
PL1 = cur_pkg_power_limits['PL1']
log('[I] {:s} PL1_Tdp_W disabled in config.'.format(power_source))
log(f'[I] {power_source:s} PL1_Tdp_W disabled in config.')
else:
PL1 = int(round(PL1_Tdp_W / power_unit))
if PL1_Duration_s is None:
TW1 = cur_pkg_power_limits['TW1']
log('[I] {:s} PL1_Duration_s disabled in config.'.format(power_source))
log(f'[I] {power_source:s} PL1_Duration_s disabled in config.')
else:
Y, Z = calc_time_window_vars(PL1_Duration_s)
TW1 = Y | (Z << 5)
if PL2_Tdp_W is None:
PL2 = cur_pkg_power_limits['PL2']
log('[I] {:s} PL2_Tdp_W disabled in config.'.format(power_source))
log(f'[I] {power_source:s} PL2_Tdp_W disabled in config.')
else:
PL2 = int(round(PL2_Tdp_W / power_unit))
if PL2_Duration_s is None:
TW2 = cur_pkg_power_limits['TW2']
log('[I] {:s} PL2_Duration_s disabled in config.'.format(power_source))
log(f'[I] {power_source:s} PL2_Duration_s disabled in config.')
else:
Y, Z = calc_time_window_vars(PL2_Duration_s)
TW2 = Y | (Z << 5)
@ -622,7 +639,7 @@ def calc_reg_values(platform_info, config):
PL1 | (1 << 15) | (1 << 16) | (TW1 << 17) | (PL2 << 32) | (1 << 47) | (TW2 << 49)
)
else:
log('[I] {:s} package power limits are disabled in config.'.format(power_source))
log(f'[I] {power_source:s} package power limits are disabled in config.')
# cTDP
c_tdp_target_value = config.getint(power_source, 'cTDP', fallback=None)
@ -638,6 +655,7 @@ def calc_reg_values(platform_info, config):
def set_hwp(performance_mode):
"""Set the IA32_HWP_REQUEST energy/performance preference field."""
if performance_mode not in (True, False) or 'HWP' in UNSUPPORTED_FEATURES:
return
# set HWP energy performance preference
@ -649,22 +667,23 @@ def set_hwp(performance_mode):
if args.debug:
read_value = readmsr('IA32_HWP_REQUEST', from_bit=24, to_bit=31)[0]
match = OK if hwp_mode == read_value else ERR
log('[D] HWP - write "{:#02x}" - read "{:#02x}" - match {}'.format(hwp_mode, read_value, match))
log(f'[D] HWP - write "{hwp_mode:#02x}" - read "{read_value:#02x}" - match {match}')
def set_disable_bdprochot():
# Disable BDPROCHOT
"""Clear bit 0 of MSR_POWER_CTL to disable BDPROCHOT."""
cur_val = readmsr('MSR_POWER_CTL', flatten=True)
new_val = cur_val & 0xFFFFFFFFFFFFFFFE
writemsr('MSR_POWER_CTL', new_val)
if args.debug:
read_value = readmsr('MSR_POWER_CTL', from_bit=0, to_bit=0)[0]
match = OK if ~read_value else ERR
log('[D] BDPROCHOT - write "{:#02x}" - read "{:#02x}" - match {}'.format(0, read_value, match))
match = OK if read_value == 0 else ERR
log(f'[D] BDPROCHOT - write "{0:#02x}" - read "{read_value:#02x}" - match {match}')
def get_config_write_time():
"""Return the config file's mtime, or None if it doesn't exist."""
try:
return os.stat(args.config).st_mtime
except FileNotFoundError:
@ -672,6 +691,7 @@ def get_config_write_time():
def reload_config():
"""Re-read the config and re-apply undervolt, IccMax and HWP settings."""
config = load_config()
regs = calc_reg_values(get_cpu_platform_info(), config)
undervolt(config)
@ -682,12 +702,13 @@ def reload_config():
def power_thread(config, regs, exit_event, cpuid):
"""Daemon main loop: periodically (re-)apply throttling MSRs."""
try:
MCHBAR_BASE = int(check_output(('setpci', '-s', '0:0.0', '48.l')), 16)
except CalledProcessError:
warning('Please ensure that "setpci" is in path. This is typically provided by the "pciutils" package.')
warning('Trying to guess the MCHBAR address from the CPUID. This MIGHT NOT WORK!')
if cpuid in ((6, 140, 1),(6, 140, 2),(6, 141, 1),(6, 151, 2),(6, 151, 5), (6, 154, 3),(6, 154, 4)):
if cpuid in ((6, 140, 1), (6, 140, 2), (6, 141, 1), (6, 151, 2), (6, 151, 5), (6, 154, 3), (6, 154, 4)):
MCHBAR_BASE = 0xFEDC0001
else:
MCHBAR_BASE = 0xFED10001
@ -708,7 +729,7 @@ def power_thread(config, regs, exit_event, cpuid):
thermal_status = get_reset_thermal_status()
for index, core_thermal_status in enumerate(thermal_status):
for key, value in core_thermal_status.items():
log('[D] core {} thermal status: {} = {}'.format(index, key.replace("_", " "), value))
log(f'[D] core {index} thermal status: {key.replace("_", " ")} = {value}')
# Reload config on changes (unless it's deleted)
if config.getboolean('GENERAL', 'Autoreload', fallback=False):
@ -728,11 +749,7 @@ def power_thread(config, regs, exit_event, cpuid):
if args.debug:
read_value = readmsr('MSR_TEMPERATURE_TARGET', 24, 29, flatten=True)
match = OK if write_value >> 24 == read_value else ERR
log(
'[D] TEMPERATURE_TARGET - write {:#x} - read {:#x} - match {}'.format(
write_value >> 24, read_value, match
)
)
log(f'[D] TEMPERATURE_TARGET - write {write_value >> 24:#x} - read {read_value:#x} - match {match}')
# set cTDP
if 'MSR_CONFIG_TDP_CONTROL' in regs[power['source']]:
@ -741,35 +758,25 @@ def power_thread(config, regs, exit_event, cpuid):
if args.debug:
read_value = readmsr('MSR_CONFIG_TDP_CONTROL', 0, 1, flatten=True)
match = OK if write_value == read_value else ERR
log(
'[D] CONFIG_TDP_CONTROL - write {:#x} - read {:#x} - match {}'.format(
write_value, read_value, match
)
)
log(f'[D] CONFIG_TDP_CONTROL - write {write_value:#x} - read {read_value:#x} - match {match}')
# set PL1/2 on MSR
write_value = regs[power['source']]['MSR_PKG_POWER_LIMIT']
writemsr('MSR_PKG_POWER_LIMIT', write_value)
if args.debug:
read_value = readmsr('MSR_PKG_POWER_LIMIT', 0, 55, flatten=True)
match = OK if write_value == read_value else ERR
log(
'[D] MSR PACKAGE_POWER_LIMIT - write {:#x} - read {:#x} - match {}'.format(
write_value, read_value, match
)
)
if mchbar_mmio is not None:
# set MCHBAR register to the same PL1/2 values
mchbar_mmio.write32(0, write_value & 0xFFFFFFFF)
mchbar_mmio.write32(4, write_value >> 32)
if 'MSR_PKG_POWER_LIMIT' in regs[power['source']]:
write_value = regs[power['source']]['MSR_PKG_POWER_LIMIT']
writemsr('MSR_PKG_POWER_LIMIT', write_value)
if args.debug:
read_value = mchbar_mmio.read32(0) | (mchbar_mmio.read32(4) << 32)
read_value = readmsr('MSR_PKG_POWER_LIMIT', 0, 55, flatten=True)
match = OK if write_value == read_value else ERR
log(
'[D] MCHBAR PACKAGE_POWER_LIMIT - write {:#x} - read {:#x} - match {}'.format(
write_value, read_value, match
log(f'[D] MSR PACKAGE_POWER_LIMIT - write {write_value:#x} - read {read_value:#x} - match {match}')
if mchbar_mmio is not None:
# set MCHBAR register to the same PL1/2 values
mchbar_mmio.write64(0, write_value)
if args.debug:
read_value = mchbar_mmio.read64(0)
match = OK if write_value == read_value else ERR
log(
f'[D] MCHBAR PACKAGE_POWER_LIMIT - write {write_value:#x} - read {read_value:#x} - match {match}'
)
)
# Disable BDPROCHOT
disable_bdprochot = config.getboolean(power['source'], 'Disable_BDPROCHOT', fallback=None)
@ -779,28 +786,21 @@ def power_thread(config, regs, exit_event, cpuid):
wait_t = config.getfloat(power['source'], 'Update_Rate_s')
enable_hwp_mode = config.getboolean('AC', 'HWP_Mode', fallback=None)
# set HWP less frequently. Just to be safe since (e.g.) TLP might reset this value
if (
enable_hwp_mode
and next_hwp_write <= time()
and (
(power['method'] == 'dbus' and power['source'] == 'AC')
or (power['method'] == 'polling' and not is_on_battery(config))
)
):
if enable_hwp_mode and next_hwp_write <= time() and power['source'] == 'AC':
set_hwp(enable_hwp_mode)
next_hwp_write = time() + HWP_INTERVAL
else:
exit_event.wait(wait_t)
exit_event.wait(wait_t)
def check_kernel():
"""Verify we run as root and that the kernel exposes MSR/devmem."""
if os.geteuid() != 0:
fatal('No root no party. Try again with sudo.')
kernel_config = None
try:
with open(os.path.join('/boot', 'config-{:s}'.format(uname()[2]))) as f:
with open(os.path.join('/boot', f'config-{uname()[2]:s}')) as f:
kernel_config = f.read()
except IOError:
config_gz_path = os.path.join('/proc', 'config.gz')
@ -821,6 +821,7 @@ def check_kernel():
def check_cpu():
"""Identify the CPU from /proc/cpuinfo and refuse to run on unsupported models."""
try:
with open('/proc/cpuinfo') as f:
cpuinfo = {}
@ -850,36 +851,39 @@ def check_cpu():
'from /proc/cpuinfo.'
)
log('[I] Detected CPU architecture: Intel {:s}'.format(supported_cpus[cpuid]))
log(f'[I] Detected CPU architecture: Intel {supported_cpus[cpuid]:s}')
return cpuid
except SystemExit:
sys.exit(1)
except:
fatal('Unable to identify CPU model.')
raise
except (OSError, KeyError, ValueError) as e:
fatal(f'Unable to identify CPU model: {e}')
def test_msr_rw_capabilities():
"""Probe undervolt and HWP support; mark unavailable features as such."""
global TESTMSR
TESTMSR = True
try:
log('[I] Testing if undervolt is supported...')
get_undervolt()
except:
warning('Undervolt seems not to be supported by your system, disabling.')
UNSUPPORTED_FEATURES.append('UNDERVOLT')
try:
log('[I] Testing if undervolt is supported...')
get_undervolt()
except (IOError, OSError):
warning('Undervolt seems not to be supported by your system, disabling.')
UNSUPPORTED_FEATURES.append('UNDERVOLT')
try:
log('[I] Testing if HWP is supported...')
cur_val = readmsr('IA32_HWP_REQUEST', cpu=0)
writemsr('IA32_HWP_REQUEST', cur_val)
except:
warning('HWP seems not to be supported by your system, disabling.')
UNSUPPORTED_FEATURES.append('HWP')
TESTMSR = False
try:
log('[I] Testing if HWP is supported...')
cur_val = readmsr('IA32_HWP_REQUEST', cpu=0)
writemsr('IA32_HWP_REQUEST', cur_val)
except (IOError, OSError):
warning('HWP seems not to be supported by your system, disabling.')
UNSUPPORTED_FEATURES.append('HWP')
finally:
TESTMSR = False
def monitor(exit_event, wait):
"""Live-display throttling causes and per-domain power until exit_event is set."""
wait = max(0.1, wait)
rapl_power_unit = 0.5 ** readmsr('MSR_RAPL_POWER_UNIT', from_bit=8, to_bit=12, cpu=0)
power_plane_msr = {
@ -894,22 +898,21 @@ def monitor(exit_event, wait):
}
undervolt_values = get_undervolt(convert=True)
undervolt_output = ' | '.join('{:s}: {:.2f} mV'.format(plane, undervolt_values[plane]) for plane in VOLTAGE_PLANES)
log('[D] Undervolt offsets: {:s}'.format(undervolt_output))
undervolt_output = ' | '.join(f'{plane:s}: {undervolt_values[plane]:.2f} mV' for plane in VOLTAGE_PLANES)
log(f'[D] Undervolt offsets: {undervolt_output:s}')
iccmax_values = get_icc_max(convert=True)
iccmax_output = ' | '.join('{:s}: {:.2f} A'.format(plane, iccmax_values[plane]) for plane in CURRENT_PLANES)
log('[D] IccMax: {:s}'.format(iccmax_output))
iccmax_output = ' | '.join(f'{plane:s}: {iccmax_values[plane]:.2f} A' for plane in CURRENT_PLANES)
log(f'[D] IccMax: {iccmax_output:s}')
log('[D] Realtime monitoring of throttling causes:\n')
while not exit_event.is_set():
value = readmsr('IA32_THERM_STATUS', from_bit=0, to_bit=15, cpu=0)
offsets = {'Thermal': 0, 'Power': 10, 'Current': 12, 'Cross-domain (e.g. GPU)': 14}
output = ('{:s}: {:s}'.format(cause, LIM if bool((value >> offsets[cause]) & 1) else OK) for cause in offsets)
output = (f'{cause:s}: {LIM if bool((value >> offsets[cause]) & 1) else OK:s}' for cause in offsets)
# ugly code, just testing...
vcore = readmsr('IA32_PERF_STATUS', from_bit=32, to_bit=47, cpu=0) / (2.0 ** 13) * 1000
stats2 = {'VCore': '{:.0f} mV'.format(vcore)}
vcore = readmsr('IA32_PERF_STATUS', from_bit=32, to_bit=47, cpu=0) / (2.0**13) * 1000
stats2 = {'VCore': f'{vcore:.0f} mV'}
total = 0.0
for power_plane in ('Package', 'Graphics', 'DRAM'):
energy_j = readmsr(power_plane_msr[power_plane], cpu=0) * rapl_power_unit
@ -918,21 +921,22 @@ def monitor(exit_event, wait):
(energy_j, now),
(energy_j - prev_energy[power_plane][0]) / (now - prev_energy[power_plane][1]),
)
stats2[power_plane] = '{:.1f} W'.format(energy_w)
stats2[power_plane] = f'{energy_w:.1f} W'
total += energy_w
stats2['Total'] = '{:.1f} W'.format(total)
stats2['Total'] = f'{total:.1f} W'
output2 = ('{:s}: {:s}'.format(label, stats2[label]) for label in stats2)
output2 = (f'{label}: {stats2[label]}' for label in stats2)
terminator = '\n' if args.log else '\r'
log(
'[{}] {} || {}{}'.format(power['source'], ' - '.join(output), ' - '.join(output2), ' ' * 10),
f"[{power['source']}] {' - '.join(output)} || {' - '.join(output2)}{' ' * 10}",
end=terminator,
)
exit_event.wait(wait)
def main():
"""Daemon entrypoint: parse args, validate platform, start power thread."""
global args
parser = argparse.ArgumentParser()
@ -954,10 +958,11 @@ def main():
if args.log:
try:
args.log = open(args.log, 'w')
except:
except OSError as e:
args.log = None
fatal('Unable to write to the log file!')
fatal(f'Unable to write to the log file: {e}')
cpuid = None
if not args.force:
check_kernel()
cpuid = check_cpu()
@ -976,7 +981,7 @@ def main():
platform_info = get_cpu_platform_info()
if args.debug:
for key, value in platform_info.items():
log('[D] cpu platform info: {} = {}'.format(key.replace("_", " "), value))
log(f'[D] cpu platform info: {key.replace("_", " ")} = {value}')
regs = calc_reg_values(platform_info, config)
if not config.getboolean('GENERAL', 'Enabled'):