diff --git a/mmio.py b/mmio.py index cfd610f..e07411a 100644 --- a/mmio.py +++ b/mmio.py @@ -1,6 +1,7 @@ """ Stripped down version from https://github.com/vsergeev/python-periphery/blob/master/periphery/mmio.py """ + import mmap import os import struct @@ -11,7 +12,7 @@ class MMIOError(IOError): class MMIO(object): - def __init__(self, physaddr, size): + def __init__(self, physaddr: int, size: int) -> None: """Instantiate an MMIO object and map the region of physical memory specified by the address base `physaddr` and size `size` in bytes. Args: @@ -35,7 +36,7 @@ class MMIO(object): def __exit__(self, t, value, traceback): self.close() - def _open(self, physaddr, size): + def _open(self, physaddr: int, size: int) -> None: if not isinstance(physaddr, int): raise TypeError("Invalid physaddr type, should be integer.") if not isinstance(size, int): @@ -79,7 +80,7 @@ class MMIO(object): if (offset + length) > self._aligned_size: raise ValueError("Offset out of bounds.") - def read32(self, offset): + def read32(self, offset: int) -> int: """Read 32-bits from the specified `offset` in bytes, relative to the base physical address of the MMIO region. Args: @@ -95,9 +96,9 @@ class MMIO(object): offset = self._adjust_offset(offset) self._validate_offset(offset, 4) - return struct.unpack("=L", self.mapping[offset:offset + 4])[0] + return struct.unpack("=L", self.mapping[offset : offset + 4])[0] - def write32(self, offset, value): + def write32(self, offset: int, value: int) -> None: """Write 32-bits to the specified `offset` in bytes, relative to the base physical address of the MMIO region. Args: @@ -111,14 +112,14 @@ class MMIO(object): raise TypeError("Invalid offset type, should be integer.") if not isinstance(value, int): raise TypeError("Invalid value type, should be integer.") - if value < 0 or value > 0xffffffff: + if value < 0 or value > 0xFFFFFFFF: raise ValueError("Value out of bounds.") offset = self._adjust_offset(offset) self._validate_offset(offset, 4) - self.mapping[offset:offset + 4] = struct.pack("=L", value) + self.mapping[offset : offset + 4] = struct.pack("=L", value) - def close(self): + def close(self) -> None: """Unmap the MMIO object's mapped physical memory.""" if self.mapping is None: return @@ -128,5 +129,5 @@ class MMIO(object): # String representation - def __str__(self): - return "MMIO 0x%08x (size=%d)" % (self._physaddr, self._size) + def __str__(self) -> str: + return f"MMIO 0x{self._physaddr:08x} (size={self._size:d})" diff --git a/throttled.py b/throttled.py index 96e634e..574a228 100755 --- a/throttled.py +++ b/throttled.py @@ -197,10 +197,10 @@ ANSI_ESCAPE_RE = re.compile(r'\x1b\[[0-9;]*m') def _format(prefix, msg): - tstamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] if args.log: - return '{:s}: {:s}{:s}'.format(tstamp, prefix, ANSI_ESCAPE_RE.sub('', msg)) - return '{:s}{:s}'.format(prefix, msg) + tstamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + return f'{tstamp}: {prefix}{ANSI_ESCAPE_RE.sub("", msg)}' + return f'{prefix}{msg}' def log(msg, oneshot=False, end='\n'): @@ -226,7 +226,7 @@ def warning(msg, oneshot=True, end='\n'): def get_msr_list(): """Return the per-CPU MSR device paths in CPU-index order.""" cpus = sorted(int(x) for x in os.listdir('/dev/cpu') if x.isdigit()) - return ['/dev/cpu/{:d}/msr'.format(cpu) for cpu in cpus] + return [f'/dev/cpu/{cpu:d}/msr' for cpu in cpus] def writemsr(msr, val): @@ -250,11 +250,11 @@ def writemsr(msr, val): raise e if e.errno == EPERM or e.errno == EACCES: fatal( - 'Unable to write to MSR {} ({:x}). Try to disable Secure Boot ' - 'and check if your kernel does not restrict access to MSR.'.format(msr, MSR_DICT[msr]) + f'Unable to write to MSR {msr} ({MSR_DICT[msr]:x}). Try to disable Secure Boot ' + 'and check if your kernel does not restrict access to MSR.' ) elif e.errno == EIO: - fatal('Unable to write to MSR {} ({:x}). Unknown error.'.format(msr, MSR_DICT[msr])) + fatal(f'Unable to write to MSR {msr} ({MSR_DICT[msr]:x}). Unknown error.') else: raise e @@ -286,23 +286,23 @@ def readmsr(msr, from_bit=0, to_bit=63, cpu=None, flatten=False): output.append(get_value_for_bits(val, from_bit, to_bit)) if flatten: if len(set(output)) > 1: - warning('Found multiple values for {:s} ({:x}). This should never happen.'.format(msr, MSR_DICT[msr])) + warning(f'Found multiple values for {msr:s} ({MSR_DICT[msr]:x}). This should never happen.') return output[0] return output[cpu] if cpu is not None else output except (IOError, OSError) as e: if TESTMSR: raise e if e.errno == EPERM or e.errno == EACCES: - fatal('Unable to read from MSR {} ({:x}). Try to disable Secure Boot.'.format(msr, MSR_DICT[msr])) + fatal(f'Unable to read from MSR {msr} ({MSR_DICT[msr]:x}). Try to disable Secure Boot.') elif e.errno == EIO: - fatal('Unable to read to MSR {} ({:x}). Unknown error.'.format(msr, MSR_DICT[msr])) + fatal(f'Unable to read to MSR {msr} ({MSR_DICT[msr]:x}). Unknown error.') else: raise e def get_value_for_bits(val, from_bit=0, to_bit=63): """Extract bits [from_bit, to_bit] (inclusive) from val.""" - mask = sum(2 ** x for x in range(from_bit, to_bit + 1)) + mask = sum(2**x for x in range(from_bit, to_bit + 1)) return (val & mask) >> from_bit @@ -331,7 +331,7 @@ def is_on_battery(config): with open(path) as f: return not bool(int(f.read())) except (IOError, OSError, ValueError) as e: - warning('Sysfs_Power_Path read failed ({}). Trying upower method.'.format(e)) + warning(f'Sysfs_Power_Path read failed ({e}). Trying upower method.') else: warning('No valid Sysfs_Power_Path found! Trying upower method') try: @@ -400,9 +400,9 @@ def calc_time_window_vars(t): """Encode a time-window duration (s) as the (Y, Z) pair used by MSR_PKG_POWER_LIMIT.""" time_unit = get_time_unit() - for Y in range(2 ** 5): - for Z in range(2 ** 2): - if t <= (2 ** Y) * (1.0 + Z / 4.0) * time_unit: + for Y in range(2**5): + for Z in range(2**2): + if t <= (2**Y) * (1.0 + Z / 4.0) * time_unit: return (Y, Z) raise ValueError('Unable to find a good combination!') @@ -441,14 +441,11 @@ def get_undervolt(plane=None, convert=False): def undervolt(config): """Apply the undervolt offsets from the config to all voltage planes.""" - if ('UNDERVOLT.{:s}'.format(power['source']) not in config and 'UNDERVOLT' not in config) or ( - 'UNDERVOLT' in UNSUPPORTED_FEATURES - ): + section = f"UNDERVOLT.{power['source']}" + if (section not in config and 'UNDERVOLT' not in config) or 'UNDERVOLT' in UNSUPPORTED_FEATURES: return for plane in VOLTAGE_PLANES: - write_offset_mv = config.getfloat( - 'UNDERVOLT.{:s}'.format(power['source']), plane, fallback=config.getfloat('UNDERVOLT', plane, fallback=0.0) - ) + write_offset_mv = config.getfloat(section, plane, fallback=config.getfloat('UNDERVOLT', plane, fallback=0.0)) write_value = calc_undervolt_msr(plane, write_offset_mv) writemsr('MSR_OC_MAILBOX', write_value) if args.debug: @@ -457,9 +454,7 @@ def undervolt(config): read_offset_mv = calc_undervolt_mv(read_value) match = OK if write_value == read_value else ERR log( - '[D] Undervolt plane {:s} - write {:.0f} mV ({:#x}) - read {:.0f} mV ({:#x}) - match {}'.format( - plane, write_offset_mv, write_value, read_offset_mv, read_value, match - ) + f'[D] Undervolt plane {plane:s} - write {write_offset_mv:.0f} mV ({write_value:#x}) - read {read_offset_mv:.0f} mV ({read_value:#x}) - match {match}' ) @@ -492,10 +487,11 @@ def get_icc_max(plane=None, convert=False): def set_icc_max(config): """Apply the IccMax limits from the config to all current planes.""" + section = f"ICCMAX.{power['source']}" for plane in CURRENT_PLANES: try: write_current_amp = config.getfloat( - 'ICCMAX.{:s}'.format(power['source']), plane, fallback=config.getfloat('ICCMAX', plane, fallback=-1.0) + section, plane, fallback=config.getfloat('ICCMAX', plane, fallback=-1.0) ) if write_current_amp > 0: write_value = calc_icc_max_msr(plane, write_current_amp) @@ -506,9 +502,7 @@ def set_icc_max(config): read_current_A = calc_icc_max_amp(read_value) match = OK if write_value == read_value else ERR log( - '[D] IccMax plane {:s} - write {:.2f} A ({:#x}) - read {:.2f} A ({:#x}) - match {}'.format( - plane, write_current_amp, write_value, read_current_A, read_value, match - ) + f'[D] IccMax plane {plane:s} - write {write_current_amp:.2f} A ({write_value:#x}) - read {read_current_A:.2f} A ({read_value:#x}) - match {match}' ) except (configparser.NoSectionError, configparser.NoOptionError): pass @@ -534,9 +528,7 @@ def load_config(): if trip_temp != valid_trip_temp: config.set(power_source, 'Trip_Temp_C', str(valid_trip_temp)) log( - '[!] Overriding invalid "Trip_Temp_C" value in "{:s}": {:.1f} -> {:.1f}'.format( - power_source, trip_temp, valid_trip_temp - ) + f'[!] Overriding invalid "Trip_Temp_C" value in "{power_source:s}": {trip_temp:.1f} -> {valid_trip_temp:.1f}' ) # fix any invalid value (ie. > 0) in the undervolt settings @@ -548,9 +540,7 @@ def load_config(): if value != valid_value: config.set(key, plane, str(valid_value)) log( - '[!] Overriding invalid "{:s}" value in "{:s}" voltage plane: {:.0f} -> {:.0f}'.format( - key, plane, value, valid_value - ) + f'[!] Overriding invalid "{key:s}" value in "{plane:s}" voltage plane: {value:.0f} -> {valid_value:.0f}' ) # handle the case where only one of UNDERVOLT.AC, UNDERVOLT.BATTERY keys exists @@ -581,7 +571,7 @@ def load_config(): raise ValueError iccmax_enabled = True except ValueError: - warning('Invalid value for {:s} in {:s}'.format(plane, key), oneshot=False) + warning(f'Invalid value for {plane:s} in {key:s}', oneshot=False) config.remove_option(key, plane) except configparser.NoOptionError: pass @@ -608,7 +598,7 @@ def calc_reg_values(platform_info, config): trip_offset = int(round(critical_temp - Trip_Temp_C)) regs[power_source]['MSR_TEMPERATURE_TARGET'] = trip_offset << 24 else: - log('[I] {:s} trip temperature is disabled in config.'.format(power_source)) + log(f'[I] {power_source:s} trip temperature is disabled in config.') power_unit = get_power_unit() @@ -621,26 +611,26 @@ def calc_reg_values(platform_info, config): cur_pkg_power_limits = get_cur_pkg_power_limits() if PL1_Tdp_W is None: PL1 = cur_pkg_power_limits['PL1'] - log('[I] {:s} PL1_Tdp_W disabled in config.'.format(power_source)) + log(f'[I] {power_source:s} PL1_Tdp_W disabled in config.') else: PL1 = int(round(PL1_Tdp_W / power_unit)) if PL1_Duration_s is None: TW1 = cur_pkg_power_limits['TW1'] - log('[I] {:s} PL1_Duration_s disabled in config.'.format(power_source)) + log(f'[I] {power_source:s} PL1_Duration_s disabled in config.') else: Y, Z = calc_time_window_vars(PL1_Duration_s) TW1 = Y | (Z << 5) if PL2_Tdp_W is None: PL2 = cur_pkg_power_limits['PL2'] - log('[I] {:s} PL2_Tdp_W disabled in config.'.format(power_source)) + log(f'[I] {power_source:s} PL2_Tdp_W disabled in config.') else: PL2 = int(round(PL2_Tdp_W / power_unit)) if PL2_Duration_s is None: TW2 = cur_pkg_power_limits['TW2'] - log('[I] {:s} PL2_Duration_s disabled in config.'.format(power_source)) + log(f'[I] {power_source:s} PL2_Duration_s disabled in config.') else: Y, Z = calc_time_window_vars(PL2_Duration_s) TW2 = Y | (Z << 5) @@ -649,7 +639,7 @@ def calc_reg_values(platform_info, config): PL1 | (1 << 15) | (1 << 16) | (TW1 << 17) | (PL2 << 32) | (1 << 47) | (TW2 << 49) ) else: - log('[I] {:s} package power limits are disabled in config.'.format(power_source)) + log(f'[I] {power_source:s} package power limits are disabled in config.') # cTDP c_tdp_target_value = config.getint(power_source, 'cTDP', fallback=None) @@ -677,7 +667,7 @@ def set_hwp(performance_mode): if args.debug: read_value = readmsr('IA32_HWP_REQUEST', from_bit=24, to_bit=31)[0] match = OK if hwp_mode == read_value else ERR - log('[D] HWP - write "{:#02x}" - read "{:#02x}" - match {}'.format(hwp_mode, read_value, match)) + log(f'[D] HWP - write "{hwp_mode:#02x}" - read "{read_value:#02x}" - match {match}') def set_disable_bdprochot(): @@ -689,7 +679,7 @@ def set_disable_bdprochot(): if args.debug: read_value = readmsr('MSR_POWER_CTL', from_bit=0, to_bit=0)[0] match = OK if read_value == 0 else ERR - log('[D] BDPROCHOT - write "{:#02x}" - read "{:#02x}" - match {}'.format(0, read_value, match)) + log(f'[D] BDPROCHOT - write "{0:#02x}" - read "{read_value:#02x}" - match {match}') def get_config_write_time(): @@ -718,7 +708,7 @@ def power_thread(config, regs, exit_event, cpuid): except CalledProcessError: warning('Please ensure that "setpci" is in path. This is typically provided by the "pciutils" package.') warning('Trying to guess the MCHBAR address from the CPUID. This MIGHT NOT WORK!') - if cpuid in ((6, 140, 1),(6, 140, 2),(6, 141, 1),(6, 151, 2),(6, 151, 5), (6, 154, 3),(6, 154, 4)): + if cpuid in ((6, 140, 1), (6, 140, 2), (6, 141, 1), (6, 151, 2), (6, 151, 5), (6, 154, 3), (6, 154, 4)): MCHBAR_BASE = 0xFEDC0001 else: MCHBAR_BASE = 0xFED10001 @@ -739,7 +729,7 @@ def power_thread(config, regs, exit_event, cpuid): thermal_status = get_reset_thermal_status() for index, core_thermal_status in enumerate(thermal_status): for key, value in core_thermal_status.items(): - log('[D] core {} thermal status: {} = {}'.format(index, key.replace("_", " "), value)) + log(f'[D] core {index} thermal status: {key.replace("_", " ")} = {value}') # Reload config on changes (unless it's deleted) if config.getboolean('GENERAL', 'Autoreload', fallback=False): @@ -759,11 +749,7 @@ def power_thread(config, regs, exit_event, cpuid): if args.debug: read_value = readmsr('MSR_TEMPERATURE_TARGET', 24, 29, flatten=True) match = OK if write_value >> 24 == read_value else ERR - log( - '[D] TEMPERATURE_TARGET - write {:#x} - read {:#x} - match {}'.format( - write_value >> 24, read_value, match - ) - ) + log(f'[D] TEMPERATURE_TARGET - write {write_value >> 24:#x} - read {read_value:#x} - match {match}') # set cTDP if 'MSR_CONFIG_TDP_CONTROL' in regs[power['source']]: @@ -772,11 +758,7 @@ def power_thread(config, regs, exit_event, cpuid): if args.debug: read_value = readmsr('MSR_CONFIG_TDP_CONTROL', 0, 1, flatten=True) match = OK if write_value == read_value else ERR - log( - '[D] CONFIG_TDP_CONTROL - write {:#x} - read {:#x} - match {}'.format( - write_value, read_value, match - ) - ) + log(f'[D] CONFIG_TDP_CONTROL - write {write_value:#x} - read {read_value:#x} - match {match}') # set PL1/2 on MSR if 'MSR_PKG_POWER_LIMIT' in regs[power['source']]: @@ -785,11 +767,7 @@ def power_thread(config, regs, exit_event, cpuid): if args.debug: read_value = readmsr('MSR_PKG_POWER_LIMIT', 0, 55, flatten=True) match = OK if write_value == read_value else ERR - log( - '[D] MSR PACKAGE_POWER_LIMIT - write {:#x} - read {:#x} - match {}'.format( - write_value, read_value, match - ) - ) + log(f'[D] MSR PACKAGE_POWER_LIMIT - write {write_value:#x} - read {read_value:#x} - match {match}') if mchbar_mmio is not None: # set MCHBAR register to the same PL1/2 values mchbar_mmio.write32(0, write_value & 0xFFFFFFFF) @@ -798,9 +776,7 @@ def power_thread(config, regs, exit_event, cpuid): read_value = mchbar_mmio.read32(0) | (mchbar_mmio.read32(4) << 32) match = OK if write_value == read_value else ERR log( - '[D] MCHBAR PACKAGE_POWER_LIMIT - write {:#x} - read {:#x} - match {}'.format( - write_value, read_value, match - ) + f'[D] MCHBAR PACKAGE_POWER_LIMIT - write {write_value:#x} - read {read_value:#x} - match {match}' ) # Disable BDPROCHOT @@ -832,7 +808,7 @@ def check_kernel(): kernel_config = None try: - with open(os.path.join('/boot', 'config-{:s}'.format(uname()[2]))) as f: + with open(os.path.join('/boot', f'config-{uname()[2]:s}')) as f: kernel_config = f.read() except IOError: config_gz_path = os.path.join('/proc', 'config.gz') @@ -883,12 +859,12 @@ def check_cpu(): 'from /proc/cpuinfo.' ) - log('[I] Detected CPU architecture: Intel {:s}'.format(supported_cpus[cpuid])) + log(f'[I] Detected CPU architecture: Intel {supported_cpus[cpuid]:s}') return cpuid except SystemExit: raise except (OSError, KeyError, ValueError) as e: - fatal('Unable to identify CPU model: {}'.format(e)) + fatal(f'Unable to identify CPU model: {e}') def test_msr_rw_capabilities(): @@ -930,21 +906,21 @@ def monitor(exit_event, wait): } undervolt_values = get_undervolt(convert=True) - undervolt_output = ' | '.join('{:s}: {:.2f} mV'.format(plane, undervolt_values[plane]) for plane in VOLTAGE_PLANES) - log('[D] Undervolt offsets: {:s}'.format(undervolt_output)) + undervolt_output = ' | '.join(f'{plane:s}: {undervolt_values[plane]:.2f} mV' for plane in VOLTAGE_PLANES) + log(f'[D] Undervolt offsets: {undervolt_output:s}') iccmax_values = get_icc_max(convert=True) - iccmax_output = ' | '.join('{:s}: {:.2f} A'.format(plane, iccmax_values[plane]) for plane in CURRENT_PLANES) - log('[D] IccMax: {:s}'.format(iccmax_output)) + iccmax_output = ' | '.join(f'{plane:s}: {iccmax_values[plane]:.2f} A' for plane in CURRENT_PLANES) + log(f'[D] IccMax: {iccmax_output:s}') log('[D] Realtime monitoring of throttling causes:\n') while not exit_event.is_set(): value = readmsr('IA32_THERM_STATUS', from_bit=0, to_bit=15, cpu=0) offsets = {'Thermal': 0, 'Power': 10, 'Current': 12, 'Cross-domain (e.g. GPU)': 14} - output = ('{:s}: {:s}'.format(cause, LIM if bool((value >> offsets[cause]) & 1) else OK) for cause in offsets) + output = (f'{cause:s}: {LIM if bool((value >> offsets[cause]) & 1) else OK:s}' for cause in offsets) - vcore = readmsr('IA32_PERF_STATUS', from_bit=32, to_bit=47, cpu=0) / (2.0 ** 13) * 1000 - stats2 = {'VCore': '{:.0f} mV'.format(vcore)} + vcore = readmsr('IA32_PERF_STATUS', from_bit=32, to_bit=47, cpu=0) / (2.0**13) * 1000 + stats2 = {'VCore': f'{vcore:.0f} mV'} total = 0.0 for power_plane in ('Package', 'Graphics', 'DRAM'): energy_j = readmsr(power_plane_msr[power_plane], cpu=0) * rapl_power_unit @@ -953,15 +929,15 @@ def monitor(exit_event, wait): (energy_j, now), (energy_j - prev_energy[power_plane][0]) / (now - prev_energy[power_plane][1]), ) - stats2[power_plane] = '{:.1f} W'.format(energy_w) + stats2[power_plane] = f'{energy_w:.1f} W' total += energy_w - stats2['Total'] = '{:.1f} W'.format(total) + stats2['Total'] = f'{total:.1f} W' - output2 = ('{:s}: {:s}'.format(label, stats2[label]) for label in stats2) + output2 = (f'{label}: {stats2[label]}' for label in stats2) terminator = '\n' if args.log else '\r' log( - '[{}] {} || {}{}'.format(power['source'], ' - '.join(output), ' - '.join(output2), ' ' * 10), + f"[{power['source']}] {' - '.join(output)} || {' - '.join(output2)}{' ' * 10}", end=terminator, ) exit_event.wait(wait) @@ -992,7 +968,7 @@ def main(): args.log = open(args.log, 'w') except OSError as e: args.log = None - fatal('Unable to write to the log file: {}'.format(e)) + fatal(f'Unable to write to the log file: {e}') cpuid = None if not args.force: @@ -1013,7 +989,7 @@ def main(): platform_info = get_cpu_platform_info() if args.debug: for key, value in platform_info.items(): - log('[D] cpu platform info: {} = {}'.format(key.replace("_", " "), value)) + log(f'[D] cpu platform info: {key.replace("_", " ")} = {value}') regs = calc_reg_values(platform_info, config) if not config.getboolean('GENERAL', 'Enabled'):