systemrescue-zfs/airootfs/etc/systemd/scripts/sysrescue-initialize.py
Gerd v. Egidy afb77e30c5 sysrescue-initialize.py: Ensure the values given in the config file have the correct types
With the new config file merging, the user could accidentally overwrite the
config values with wrong types, for example a boolean with a dict or list.
This could lead to the script aborting with an exception.

Use explicit type casting and default values to ensure correct operation
in this case. This is the same as recently implemented for autorun.

Implement a dedicated conversion function for booleans so that, for example,
a string "0" is treated as False; Python by default would interpret it as True.
2022-05-22 15:43:10 +02:00

298 lines
13 KiB
Python
Executable file

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
import subprocess
import json
import glob
import os
import sys
import re
import tempfile
# pythons os.symlink bails when a file already exists, this function also handles overwrites
def symlink_overwrite(target, link_file):
    """Create the symlink link_file pointing to target, replacing an existing entry.

    The symlink is first created under a temporary name in the directory of
    link_file and then moved over link_file with os.replace().

    Args:
        target: path the symlink should point to.
        link_file: path of the symlink to create or overwrite.

    Raises:
        OSError: if no temporary name can be created in the directory of
            link_file, or if moving the symlink over link_file fails. In the
            latter case the temporary symlink is removed again first.
    """
    link_dir = os.path.dirname(link_file)

    while True:
        # get a tmp filename in the same dir as link_file
        with tempfile.NamedTemporaryFile(delete=True, dir=link_dir) as tmp:
            tmp_name = tmp.name
        # the tmp file is deleted again at this point, only its name is kept

        # os.symlink aborts when a file with the same name already exists
        # someone could have created a new file with the tmp name right in this moment
        # so we need to loop and try again in this case
        try:
            os.symlink(target, tmp_name)
            break
        except FileExistsError:
            pass

    try:
        os.replace(tmp_name, link_file)
    except OSError:
        # do not leave the temporary symlink behind when the final move fails
        try:
            os.unlink(tmp_name)
        except OSError:
            # best-effort cleanup: the original error below is the one worth reporting
            pass
        raise
def strtobool(val):
    """Interpret val as a truth value and return True or False.

    val is converted with str() and lowercased before it is looked up.
    Accepted as True:  'y', 'yes', 't', 'true', 'on', '1'.
    Accepted as False: 'n', 'no', 'f', 'false', 'off', '0'.

    Raises:
        ValueError: if val is none of the strings listed above.

    Function adapted from Pythons distutils.util.py because it will be deprecated soon
    Copyright (c) Python Software Foundation; All Rights Reserved
    """
    truth_table = {
        'y': True, 'yes': True, 't': True, 'true': True, 'on': True, '1': True,
        'n': False, 'no': False, 'f': False, 'false': False, 'off': False, '0': False,
    }
    text = str(val).lower()
    result = truth_table.get(text)
    if result is None:
        raise ValueError("invalid truth value %r" % (text,))
    return result
# ==============================================================================
# Initialization
# ==============================================================================
print(f"====> Script {sys.argv[0]} starting ...")

# number of failed steps, incremented by the sections below
errcnt = 0

# ==============================================================================
# Read the effective configuration file
# ==============================================================================
print("====> Read the effective configuration file ...")

effectivecfg = "/run/archiso/config/sysrescue-effective-config.json"

# nothing can be configured without the effective configuration: give up right away
if not os.path.exists(effectivecfg):
    print(f"Failed to find effective configuration file in {effectivecfg}")
    sys.exit(1)

with open(effectivecfg) as cfg_file:
    config = json.load(cfg_file)

# ==============================================================================
# Sanitize config, initialize variables
# Make sysrescue-initialize work safely without them being defined or have a wrong type
# Also show the effective configuration
# ==============================================================================
print("====> Showing the effective global configuration (except clear passwords) ...")
def read_cfg_value(scope, name, defaultval, printval):
    """Return config[scope][name] converted to the type of defaultval.

    Args:
        scope: top-level section of the configuration, for example 'global'.
        name: name of the option inside that section.
        defaultval: value returned when the option is missing, null or cannot
            be converted; its type selects the conversion that is applied.
        printval: if true, print the resulting value (and the rejected value
            when the conversion fails). Pass False for values such as
            passwords that must not be printed.

    Returns:
        The configured value converted to type(defaultval), or defaultval.
    """
    # a section that was overwritten with a non-dict (null, list, string, ...) is
    # handled like a missing section instead of aborting the script
    section = config.get(scope) if isinstance(config, dict) else None
    chkval = section.get(name) if isinstance(section, dict) else None

    if chkval is None:
        # option not given or explicitly null: without this check a null would be
        # converted to the string "None" for string options
        val = defaultval
    else:
        try:
            if isinstance(chkval, (list, dict)):
                raise TypeError(f"must be a {type(defaultval)}, not a {type(chkval)}")
            elif isinstance(defaultval, bool) and not isinstance(chkval, bool):
                # bool("0") would be True, so booleans get a dedicated conversion
                val = strtobool(chkval)
            else:
                val = type(defaultval)(chkval)
        except (TypeError, ValueError) as e:
            if printval:
                print(f"config['{scope}']['{name}'] with {chkval} is not the same type as defaultval: {e}")
            else:
                print(f"config['{scope}']['{name}'] is not the same type as defaultval: {e}")
            val = defaultval

    if printval:
        print(f"config['{scope}']['{name}']={val}")
    return val
# All options live in the 'global' scope. The default value selects the expected type.
# Passwords are read with printval=False so that their values are not printed.
setkmap = read_cfg_value('global', 'setkmap', defaultval="", printval=True)
rootshell = read_cfg_value('global', 'rootshell', defaultval="", printval=True)
rootpass = read_cfg_value('global', 'rootpass', defaultval="", printval=False)
rootcryptpass = read_cfg_value('global', 'rootcryptpass', defaultval="", printval=False)
nofirewall = read_cfg_value('global', 'nofirewall', defaultval=False, printval=True)
noautologin = read_cfg_value('global', 'noautologin', defaultval=False, printval=True)
dostartx = read_cfg_value('global', 'dostartx', defaultval=False, printval=True)
dovnc = read_cfg_value('global', 'dovnc', defaultval=False, printval=True)
vncpass = read_cfg_value('global', 'vncpass', defaultval="", printval=False)
late_load_srm = read_cfg_value('global', 'late_load_srm', defaultval="", printval=True)
# ==============================================================================
# Apply the effective configuration
# ==============================================================================
print("====> Applying configuration ...")

# Keyboard layout: only touched when a keymap was configured
if setkmap:
    keymap_result = subprocess.run(["localectl", "set-keymap", setkmap], text=True)
    if keymap_result.returncode != 0:
        print("Failed to change keymap")
        errcnt += 1
    else:
        print("Have changed the keymap successfully")
# Login shell of root: only touched when a shell was configured
if rootshell:
    chsh_result = subprocess.run(["chsh", "--shell", rootshell, "root"], text=True)
    if chsh_result.returncode != 0:
        print("Failed to change the root shell")
        errcnt += 1
    else:
        print("Have changed the root shell successfully")
# Root password given in clear text: chpasswd reads "user:password" from stdin
if rootpass:
    chpasswd_result = subprocess.run(
        ["chpasswd", "--crypt-method", "SHA512"],
        text=True,
        input=f"root:{rootpass}",
    )
    if chpasswd_result.returncode != 0:
        print("Failed to change the root password")
        errcnt += 1
    else:
        print("Have changed the root password successfully")
# Root password given as an already encrypted hash
# Such a hash can be produced with a one-line python3 command such as:
# python3 -c 'import crypt; print(crypt.crypt("MyPassWord123", crypt.mksalt(crypt.METHOD_SHA512)))'
if rootcryptpass:
    chpasswd_result = subprocess.run(
        ["chpasswd", "--encrypted"],
        text=True,
        input=f"root:{rootcryptpass}",
    )
    if chpasswd_result.returncode != 0:
        print("Failed to change the root password")
        errcnt += 1
    else:
        print("Have changed the root password successfully")
# Firewall: disable and stop it when requested
if nofirewall:
    # The firewall service(s) must be in the Before-section of sysrescue-initialize.service
    firewall_units = ["iptables.service", "ip6tables.service"]
    firewall_result = subprocess.run(["systemctl", "disable", "--now", *firewall_units], text=True)
    if firewall_result.returncode != 0:
        print("Failed to disable the firewall")
        errcnt += 1
    else:
        print("Have disabled the firewall successfully")
# Auto-start the graphical environment (tty1 only)
if dostartx:
    # Deliberately not named "str": rebinding the builtin at module level would make
    # every later str() call in this script (e.g. inside strtobool) fail
    startx_cmd = '[[ ! $DISPLAY ]] && [[ ! $SSH_TTY ]] && [[ $XDG_VTNR == 1 ]] && startx'

    # bash: append the line only once, keep whatever else is in the profile
    try:
        with open("/root/.bash_profile", "r") as bash_profile:
            already_present = startx_cmd in bash_profile.read()
    except FileNotFoundError:
        already_present = False
    if not already_present:
        with open("/root/.bash_profile", "a") as bash_profile:
            bash_profile.write(f"{startx_cmd}\n")

    # zsh: the file is rewritten from scratch on every run
    with open("/root/.zlogin", "w") as zlogin:
        zlogin.write(f"{startx_cmd}\n")
# Console login: revert the getty units when authentication is required
if noautologin:
    getty_units = ["getty@.service", "serial-getty@.service"]
    revert_result = subprocess.run(["systemctl", "revert", *getty_units], text=True)
    if revert_result.returncode != 0:
        print("Failed to enable authenticated console access")
        errcnt += 1
    else:
        print("Have enabled authenticated console access successfully")
# VNC password given in clear text: stored by x11vnc in /root/.vnc/passwd
if vncpass:
    os.makedirs("/root/.vnc", exist_ok=True)
    # NOTE(review): the clear password is handed to x11vnc as a command line argument
    vnc_result = subprocess.run(["x11vnc", "-storepasswd", vncpass, "/root/.vnc/passwd"], text=True)
    if vnc_result.returncode != 0:
        print("Failed to change the vnc password")
        errcnt += 1
    else:
        print("Have changed the vnc password successfully")
# Auto-start x11vnc with the graphical environment
if dovnc:
    print("Enabling VNC Server in /root/.xprofile ...")
    # the context manager closes the file even when a write fails
    # /root/.xprofile is rewritten from scratch on every run
    with open("/root/.xprofile", "w") as xprofile:
        # use the stored password if there is one, otherwise run without password
        xprofile.write('[ -f ~/.vnc/passwd ] && pwopt="-usepw" || pwopt="-nopw"\n')
        xprofile.write('x11vnc $pwopt -nevershared -forever -logfile /var/log/x11vnc.log &\n')
# ==============================================================================
# Configure custom CA certificates
# ==============================================================================
ca_anchor_path = "/etc/ca-certificates/trust-source/anchors/"

# expect a dict with name: certificate text, each entry is written to <name>.pem
# look the value up defensively: a section overwritten with a wrong type must not abort the script
sysconfig = config.get('sysconfig') if isinstance(config, dict) else None
ca_trust = sysconfig.get('ca-trust') if isinstance(sysconfig, dict) else None

if ca_trust and not isinstance(ca_trust, dict):
    print(f"config['sysconfig']['ca-trust'] must be a dict, not a {type(ca_trust)}")
    errcnt += 1
elif ca_trust:
    print("====> Adding trusted CA certificates ...")

    for name, cert in sorted(ca_trust.items()):
        if not isinstance(cert, str):
            # writing a non-string would raise and leave an empty .pem file behind
            print(f"Ignoring certificate '{name}': must be a string, not a {type(cert)}")
            errcnt += 1
            continue
        print(f"Adding certificate '{name}' ...")
        with open(os.path.join(ca_anchor_path, name + ".pem"), "w") as certfile:
            certfile.write(cert)

    print("Updating CA trust configuration ...")
    p = subprocess.run(["update-ca-trust"], text=True)
    # count a failure like every other configuration step does
    if p.returncode != 0:
        print("Failed to update CA trust configuration")
        errcnt += 1
# ==============================================================================
# late-load a SystemRescueModule (SRM)
# ==============================================================================
if late_load_srm:
    print("====> Late-loading SystemRescueModule (SRM) ...")
    srm_commands = (
        ["/usr/share/sysrescue/bin/load-srm", late_load_srm],
        # the SRM could contain changes to systemd units -> let them take effect
        ["/usr/bin/systemctl", "daemon-reload"],
        # trigger start of multi-user.target: the SRM could have added something to its "Wants"
        # systemd doesn't re-evaluate the dependencies on daemon-reload while running a transaction
        # so we have to do this manually. Note: only affects multi-user.target, nothing else
        ["/usr/bin/systemctl", "--no-block", "start", "multi-user.target"],
    )
    # run strictly in this order, the exit status of the commands is not evaluated
    for srm_command in srm_commands:
        subprocess.run(srm_command)
# ==============================================================================
# autoterminal: programs that take over a virtual terminal for user interaction
# ==============================================================================
# expect a dict with terminal-name: command, like config['autoterminal']['tty2'] = "/usr/bin/setkmap"
# a terminal name prefixed with "serial:" selects the serial template and masks serial-getty@ instead
autoterminal_cfg = config.get('autoterminal') if isinstance(config, dict) else None
if isinstance(autoterminal_cfg, dict):
    print("====> Configuring autoterminal ...")

    with open('/usr/share/sysrescue/template/autoterminal.service', 'r') as template_file:
        conf_template = template_file.read()
    with open('/usr/share/sysrescue/template/serial-autoterminal.service', 'r') as template_file:
        serial_conf_template = template_file.read()

    start_services = []
    for terminal, command in sorted(autoterminal_cfg.items()):
        # fullmatch instead of match with a trailing "$": "$" also matches in front of a
        # final newline, which would let a name like "tty2\n" end up in unit file names
        if m := re.fullmatch(r"serial:([a-zA-Z0-9_-]+)", terminal):
            serial = True
            terminal = m.group(1)
        elif re.fullmatch(r"[a-zA-Z0-9_-]+", terminal):
            serial = False
        else:
            print(f"Ignoring invalid terminal name '{terminal}'")
            errcnt += 1
            continue

        # a command of the wrong type would raise in str.replace() below and abort the script
        if not isinstance(command, str):
            print(f"Ignoring terminal '{terminal}': command must be a string, not a {type(command)}")
            errcnt += 1
            continue

        # do not check if terminal or command exists: an autorun could create them later on
        if serial:
            print(f"setting serial terminal '{terminal}' to '{command}'")
        else:
            print(f"setting terminal '{terminal}' to '{command}'")

        service_name = f"autoterminal-{terminal}.service"
        service_path = f"/etc/systemd/system/{service_name}"

        # write service config, based on the template config we loaded above
        # don't use getty@{terminal}.service name to not use autovt@{terminal}.service on-demand logic
        template = serial_conf_template if serial else conf_template
        conf_data = template.replace("%TTY%", terminal).replace("%EXEC%", command)
        with open(service_path, "w") as terminal_conf:
            terminal_conf.write(conf_data)

        # enable service: always start it, do not wait for the user to switch to the terminal
        # means other programs (like X.org) can't allocate it away, also allows for longer running init sequences
        symlink_overwrite(service_path, f"/etc/systemd/system/getty.target.wants/{service_name}")

        # mask the regular getty for this terminal
        if serial:
            symlink_overwrite("/dev/null", f"/etc/systemd/system/serial-getty@{terminal}.service")
        else:
            symlink_overwrite("/dev/null", f"/etc/systemd/system/getty@{terminal}.service")
            symlink_overwrite("/dev/null", f"/etc/systemd/system/autovt@{terminal}.service")

        start_services.append(service_name)

    # reload systemd to allow the new config to take effect
    subprocess.run(["/usr/bin/systemctl", "daemon-reload"])

    # explicitly start new services (after daemon-reload): systemd can't update dependencies while starting
    for s in start_services:
        subprocess.run(["/usr/bin/systemctl", "--no-block", "start", s])
# ==============================================================================
# End of the script
# ==============================================================================
print(f"====> Script {sys.argv[0]} completed with {errcnt} errors ...")
# The exit status is the number of errors, capped at 255: larger values would be
# truncated to 8 bits, so that for example 256 errors would be reported as success
sys.exit(min(errcnt, 255))