# Copyright (C) 2017 Red Hat, Inc., Bryn M. Reeves <bmr@redhat.com>
#
# bootloader.py - Boom BLS bootloader manager
#
# This file is part of the boom project.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""The ``boom.bootloader`` module defines classes for working with
on-disk boot loader entries: the ``BootEntry`` class represents an
individual boot loader entry, and the ``BootParams`` class
encapsulates the parameters needed to boot an instance of the
operating system. The kernel version and root device configuration
of an existing ``BootEntry`` may be changed by modifying or
substituting its ``BootParams`` object (this may also be used to
'clone' configuration from one entry to another).
Functions are provided to read and write boot loader entries from an
on-disk store (normally located at ``/boot/loader/entries``), and to
retrieve particular ``BootEntry`` objects based on a variety of
selection criteria.
The ``BootEntry`` class includes named properties for each boot entry
attribute ("entry key"). In addition, the class serves as a container
type, allowing attributes to be accessed via dictionary-style indexing.
This simplifies iteration over a profile's key / value pairs and allows
straightforward access to all members in scripts and the Python shell.
All entry key names are made available as named members of the module:
``BOOT_ENTRY_*``, and the ``ENTRY_KEYS`` list. A map of Boom key names
to BLS keys is available in the ``KEY_MAP`` dictionary (a reverse map
is also provided in the ``MAP_KEY`` member).
"""
from boom import *
from boom.osprofile import *
from os.path import basename, exists as path_exists, join as path_join
from tempfile import mkstemp
from os import listdir, rename, fdopen, chmod, unlink, fdatasync, stat, dup
from stat import S_ISBLK
from hashlib import sha1
import logging
import re
#: The path to the BLS boot entries directory relative to /boot
ENTRIES_PATH = "loader/entries"
#: The format used to construct entry file names.
BOOT_ENTRIES_FORMAT = "%s-%s-%s.conf"
#: A regular expression matching the boom file name format.
BOOT_ENTRIES_PATTERN = r"(\w*)-(\w{1,7})-([a-zA-Z0-9.\-_]*)"
#: The file mode with which BLS entries should be created.
BOOT_ENTRY_MODE = 0o644
#: The ``BootEntry`` title key.
BOOT_TITLE = "BOOT_TITLE"
#: The ``BootEntry`` version key.
BOOT_VERSION = "BOOT_VERSION"
#: The ``BootEntry`` machine_id key.
BOOT_MACHINE_ID = "BOOT_MACHINE_ID"
#: The ``BootEntry`` linux key.
BOOT_LINUX = "BOOT_LINUX"
#: The ``BootEntry`` initrd key.
BOOT_INITRD = "BOOT_INITRD"
#: The ``BootEntry`` efi key.
BOOT_EFI = "BOOT_EFI"
#: The ``BootEntry`` options key.
BOOT_OPTIONS = "BOOT_OPTIONS"
#: The ``BootEntry`` device tree key.
BOOT_DEVICETREE = "BOOT_DEVICETREE"
#: The ``BootEntry`` boot identifier key.
BOOT_ID = "BOOT_ID"
#: An ordered list of all possible ``BootEntry`` keys.
ENTRY_KEYS = [
# We require a title for each entry (BLS does not)
BOOT_TITLE,
# MACHINE_ID is optional in BLS, however, since the standard suggests
# that it form part of the file name for compliant snippets, it is
# effectively mandatory.
BOOT_MACHINE_ID,
BOOT_VERSION,
# One of either BOOT_LINUX or BOOT_EFI must be present.
BOOT_LINUX, BOOT_EFI,
BOOT_INITRD, BOOT_OPTIONS,
BOOT_DEVICETREE
]
#: Map Boom entry names to BLS keys
KEY_MAP = {
BOOT_TITLE: "title",
BOOT_VERSION: "version",
BOOT_MACHINE_ID: "machine_id",
BOOT_LINUX: "linux",
BOOT_INITRD: "initrd",
BOOT_EFI: "efi",
BOOT_OPTIONS: "options",
BOOT_DEVICETREE: "devicetree"
}
def __make_map_key(key_map):
"""Compatibility function to generate a reverse dictionary on
Python 2.6 which does not support dictionary comprehension
notation.
"""
map_key = {}
for k, v in key_map.items():
map_key[v] = k
return map_key
#: Map BLS entry keys to Boom names
MAP_KEY = __make_map_key(KEY_MAP)
# Module logging configuration
_log = logging.getLogger(__name__)
_log.set_debug_mask(BOOM_DEBUG_ENTRY)
_log_debug = _log.debug
_log_debug_entry = _log.debug_masked
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error
#: The global list of boot entries.
_entries = None
#: Pattern for forming root device paths from LVM2 names.
DEV_PATTERN = "/dev/%s"
[docs]def boom_entries_path():
"""Return the path to the boom profiles directory.
:returns: The boom profiles path.
:returntype: str
"""
return path_join(get_boot_path(), ENTRIES_PATH)
#: Private constants for Grub2 integration checks
__grub_cfg = "grub2/grub.cfg"
__etc_grub_d = "/etc/grub.d"
__boom_grub_d = "42_boom"
__etc_default = "/etc/default"
__boom_defaults = "boom"
[docs]def check_bootloader():
"""Check the configuration state of the system bootloader to ensure
that Boom integration is enabled. Currently only Grub2 with the
Red Hat BLS patches is supported.
"""
grub_cfg = path_join(get_boot_path(), __grub_cfg)
if not path_exists(grub_cfg):
_log_warn("No Grub2 configuration file found")
return False
boom_grub_d = path_join(__etc_grub_d, __boom_grub_d)
if not path_exists(boom_grub_d):
_log_warn("Boom grub2 script missing from '%s'" % __etc_grub_d)
return False
defaults_file = path_join(__etc_default, __boom_defaults)
if not path_exists(defaults_file):
_log_warn("Boom configuration file missing from '%s'" % defaults_file)
return False
def is_yes(val):
return val == "y" or val == "yes"
submenu_enabled = False
with open(defaults_file, "r") as dfile:
for line in dfile:
(name, value) = _parse_name_value(line)
if name == "BOOM_ENABLE_GRUB" and not is_yes(value):
_log_warn("Boom grub2 integration is disabled in '%s'" %
defaults_file)
if name == "BOOM_USE_SUBMENU" and is_yes(value):
_log_info("Boom grub2 submenu support enabled")
submenu_enabled = True
if name == "BOOM_SUBMENU_NAME" and submenu_enabled:
_log_info("Boom grub2 submenu name is '%s'" % value)
found_boom_grub = False
found_bls = False
blscfg = "blscfg"
with open(grub_cfg) as gfile:
for line in gfile:
words = line.split()
if blscfg in line:
_log_info("Found BLS import statement in '%s'" % grub_cfg)
found_bls = True
if "BEGIN" in line and boom_grub_d in line:
_log_info("Found Boom Grub2 integration in '%s'" % grub_cfg)
found_boom_grub = True
return found_boom_grub or found_bls
[docs]class BoomRootDeviceError(BoomError):
"""Boom exception indicating an invalid root device.
"""
pass
def check_root_device(dev):
"""Test for the presence of root device ``dev`` and return if it
exists in the configured /dev directory and is a valid block
device, or raise ``BoomRootDeviceError`` otherwise.
The exception string indicates the class of error: missing
path or not a block device.
:param dev: the root device to check for.
:raises: BoomRootDeviceError if ``dev`` is invalid.
:returns: None
"""
if not path_exists(dev):
raise BoomRootDeviceError("Device '%s' not found." % dev)
st = stat(dev)
if not S_ISBLK(st.st_mode):
raise BoomRootDeviceError("Path '%s' is not a block device." % dev)
[docs]class BootParams(object):
"""The ``BootParams`` class encapsulates the information needed to
boot an instance of the operating system: the kernel version,
root device, and root device options.
A ``BootParams`` object is used to configure a ``BootEntry``
and to generate configuration keys for the entry based on an
attached OsProfile.
"""
#: The kernel version of the instance.
version = None
#: The path to the root device
root_device = None
#: The LVM2 logical volume containing the root file system
lvm_root_lv = None
#: The BTRFS subvolume path to be used as the root file system.
btrfs_subvol_path = None
#: The ID of the BTRFS subvolume to be used as the root file system.
btrfs_subvol_id = None
def __str(self, quote=False, prefix="", suffix=""):
"""Format BootParams as a string.
Format this ``BootParams`` object as a string, with optional
prefix, suffix, and value quoting.
:param quote: A bool indicating whether to quote values.
:param prefix: An optional prefix string to be concatenated
with the start of the formatted string.
:param suffix: An optional suffix string to be concatenated
with the end of the formatted string.
:returns: a formatted representation of this ``BootParams``.
:returntype: string
"""
bp_str = prefix
fields = ["version", "root_device", "lvm_root_lv",
"btrfs_subvol_path", "btrfs_subvol_id"]
params = (
self.root_device,
self.lvm_root_lv,
self.btrfs_subvol_path, self.btrfs_subvol_id
)
# arg
bp_str += self.version if not quote else '"%s"' % self.version
bp_str += ", "
# kwargs
bp_fmt = "%s=%s, " if not quote else '%s="%s", '
for fv in [fv for fv in zip(fields[1:], params) if fv[1]]:
bp_str += bp_fmt % fv
return bp_str.rstrip(", ") + suffix
[docs] def __str__(self):
"""Format BootParams as a human-readable string.
Format this ``BootParams`` object as a human-readable string.
:returns: A human readable string representation of this
``BootParams`` object.
:returntype: string
"""
return self.__str()
[docs] def __repr__(self):
"""Format BootParams as a machine-readable string.
Format this ``BootParams`` object as a machine-readable
string. The string returned is in the form of a call to the
``BootParams`` constructor.
:returns: a machine readable string represenatation of this
``BootParams`` object.
"""
return self.__str(quote=True, prefix="BootParams(", suffix=")")
[docs] def __init__(self, version, root_device=None, lvm_root_lv=None,
btrfs_subvol_path=None, btrfs_subvol_id=None):
"""Initialise a new ``BootParams`` object.
The root device is specified via the ``root_device``
argument as a path relative to the root file system.
The LVM2 logical volume containing the root file system is
specified using ``lvm_root_lv`` if LVM2 is used.
For instances using LVM2, if the ``lvm_root_lv`` argument is
set and ``root_device`` is unset, ``root_device`` is assumed
to be the normal path of the logical volume specified by the
``lvm_root_lv`` argument.
For instances using BTRFS, the ``root_device`` argument is
always required.
Instances using BTRFS may select a subvolume to be mounted
as the root file system by specifying either the subvolume
path or id via ``btrfs_subvol_path`` and
``btrfs_subvol_id``.
``BootParams()`` raises ValueError if a required argument is
missing, or if conflicting arguments are present.
:param version: The version string for this BootParams
object.
:param root_device: The root device for this BootParams
object.
:param lvm_root_lv: The LVM2 logical volume containing the
root file system, for systems that use
LVM.
:param btrfs_subvol_path: The BTRFS subvolume path
containing the root file system,
for systems using BTRFS.
:param btrfs_subvol_id: The BTRFS subvolume ID containing
the root file system, for systems
using BTRFS.
:returns: a newly initialised BootParams object.
:returntype: class BootParams
:raises: ValueError
"""
if not version:
raise ValueError("version argument is required.")
self.version = version
if root_device:
self.root_device = root_device
if lvm_root_lv:
if not root_device:
self.root_device = DEV_PATTERN % lvm_root_lv
self.lvm_root_lv = lvm_root_lv
if btrfs_subvol_path and btrfs_subvol_id:
raise ValueError("Only one of btrfs_subvol_path and "
"btrfs_subvol_id allowed.")
if btrfs_subvol_path:
self.btrfs_subvol_path = btrfs_subvol_path
if btrfs_subvol_id:
self.btrfs_subvol_id = btrfs_subvol_id
_log_debug_entry("Initialised %s" % repr(self))
[docs] def has_btrfs(self):
"""Return ``True`` if this BootParams object is configured to
use BTRFS.
:returns: True if BTRFS is in use, or False otherwise
:returntype: bool
"""
return any((self.btrfs_subvol_id, self.btrfs_subvol_path))
[docs] def has_lvm2(self):
"""Return ``True`` if this BootParams object is configured to
use LVM2.
:returns: True if LVM2 is in use, or False otherwise
:returntype: bool
"""
return self.lvm_root_lv is not None and len(self.lvm_root_lv)
[docs] @classmethod
def from_entry(cls, be):
"""Recover BootParams from BootEntry.
Recover BootParams values from a templated BootEntry: each
key subject to template substitution is transformed into a
regular expression, matching the element and capturing the
corresponding BootParams value.
A BootEntry object that has no attached OsProfile cannot be
reversed since no templates exist to match the entry against:
in this case None is returned but no exception is raised.
The entry may be modified and re-written, but no templating
is possible unless a new, valid, OsProfile is attached.
:param be: The BootEntry to recover BootParams from.
:returns: A newly initialised BootParams object.
:returntype: ``BootParams``
:raises: ValueError if expected values cannot be matched.
"""
osp = be._osp
# Version is written directly from BootParams
version = be.version
bp = BootParams(version)
_log_debug_entry("Initialising BootParams() from "
"BootEntry(boot_id='%s')" % be.boot_id)
opts_regex_words = osp.make_format_regexes(osp.options)
if not opts_regex_words:
return None
_log_debug_entry("Matching options regex list with %d entries" %
len(opts_regex_words))
_log_debug_entry("Options regex list: %s" % str(opts_regex_words))
for rgx_word in opts_regex_words:
# FIXME: capture options not present in template
name = rgx_word[0]
exp = rgx_word[1]
for word in be.options.split():
match = re.search(exp, word)
if match:
if len(match.groups()):
value = match.group(1)
_log_debug_entry("Matched: '%s' (%s)" %
(value, name))
setattr(bp, name, value)
_log_debug_entry("Parsed %s" % repr(bp))
return bp
def _add_entry(entry):
"""Add a new entry to the list of loaded on-disk entries.
:param entry: The ``BootEntry`` to add.
"""
global _entries
if _entries is None:
load_entries()
if entry not in _entries:
_entries.append(entry)
def _del_entry(entry):
"""Remove a ``BootEntry`` from the list of loaded entries.
:param entry: The ``BootEntry`` to remove.
"""
global _entries
_entries.remove(entry)
[docs]def load_entries(machine_id=None):
""" Load boot entries into memory.
Load boot entries from ``boom.bootloader.boom_entries_path()``.
If ``machine_id`` is specified only matching entries will be
considered.
:param machine_id: A ``machine_id`` value to match.
"""
global _entries
if not profiles_loaded():
load_profiles()
entries_path = boom_entries_path()
_log_info("Loading boot entries from '%s'" % entries_path)
_entries = []
for entry in listdir(entries_path):
if not entry.endswith(".conf"):
continue
if machine_id and machine_id not in entry:
_log_debug_entry("Skipping entry with machine_id!='%s'",
machine_id)
continue
entry_path = path_join(entries_path, entry)
try:
_add_entry(BootEntry(entry_file=entry_path))
except Exception as e:
_log_info("Could not load BootEntry '%s': %s" %
(entry_path, e))
_log_info("Loaded %d entries" % len(_entries))
[docs]def write_entries():
"""Write out boot entries.
Write all currently loaded boot entries to
``boom.bootloader.boom_entries_path()``.
"""
global _entries
for be in _entries:
try:
be.write_entry()
except Exception as e:
_log_warn("Could not write BootEntry(boot_id='%s'): %s" %
(be.disp_boot_id, e))
[docs]def min_boot_id_width():
"""Calculate the minimum unique width for boot_id values.
Calculate the minimum width to ensure uniqueness when displaying
boot_id values.
:returns: the minimum boot_id width.
:returntype: int
"""
min_prefix = 7
if not _entries:
return min_prefix
shas = set()
for be in _entries:
shas.add(be.boot_id)
return _find_minimum_sha_prefix(shas, min_prefix)
def select_params(s, bp):
"""Test BootParams against Selection criteria.
Test the supplied ``BootParams`` against the selection criteria
in ``s`` and return ``True`` if it passes, or ``False``
otherwise.
:param bp: The BootParams to test
:returntype: bool
:returns: True if BootParams passes selection or ``False``
otherwise.
"""
if s.root_device and s.root_device != bp.root_device:
return False
if s.lvm_root_lv and s.lvm_root_lv != bp.lvm_root_lv:
return False
if s.btrfs_subvol_path and s.btrfs_subvol_path != bp.btrfs_subvol_path:
return False
if s.btrfs_subvol_id and s.btrfs_subvol_id != bp.btrfs_subvol_id:
return False
return True
def select_entry(s, be):
"""Test BootEntry against Selection criteria.
Test the supplied ``BootEntry`` against the selection criteria
in ``s`` and return ``True`` if it passes, or ``False``
otherwise.
:param bp: The BootEntry to test
:returntype: bool
:returns: True if BootEntry passes selection or ``False``
otherwise.
"""
if not select_profile(s, be._osp):
return False
if s.boot_id and not be.boot_id.startswith(s.boot_id):
return False
if s.title and be.title != s.title:
return False
if s.version and be.version != s.version:
return False
if s.machine_id and be.machine_id != s.machine_id:
return False
if not select_params(s, be.bp):
return False
return True
[docs]def find_entries(selection=None):
"""Find boot entries matching selection criteria.
Return a list of ``BootEntry`` objects matching the specified
criteria. Matching proceeds as the logical 'and' of all criteria.
Criteria that are unset (``None``) are ignored.
If no ``BootEntry`` matches the specified criteria the empty list
is returned.
Boot entries will be automatically loaded from disk if they are
not already in memory.
:param selection: A ``Selection`` object specifying the match
criteria for the operation.
:returns: a list of ``BootEntry`` objects.
:returntype: list
"""
global _entries
if not _entries:
load_entries()
matches = []
# Use null search criteria if unspecified
selection = selection if selection else Selection()
selection.check_valid_selection(entry=True, params=True, profile=True)
_log_debug_entry("Finding entries for %s" % repr(selection))
for be in _entries:
if select_entry(selection, be):
matches.append(be)
_log_debug_entry("Found %d entries" % len(matches))
return matches
def _transform_key(key_name):
"""Transform key characters between Boom and BLS notation.
Transform all occurrences of '_' in ``key_name`` to '-' or vice
versa.
Key names on-disk use a hyphen as the word separator, for e.g.
"machine-id". We cannot use this character for Python attributes
since it collides with the subtraction operator.
:param key_name: The key name to be transformed.
:returns: The transformed key name.
:returntype: string
"""
if "_" in key_name:
return key_name.replace("_", "-")
if "-" in key_name:
return key_name.replace("-", "_")
return key_name
[docs]class BootEntry(object):
"""A class representing a BLS compliant boot entry.
A ``BootEntry`` exposes two sets of properties that are the
keys of a BootLoader Specification boot entry.
The properties of a ``BootEntry`` that is not associated with an
``OsProfile`` (for e.g. one read from disk) are the literal
values read from a file or set through the API.
When an ``OSProfile`` is attached to a ``BootEntry``, it is used
as a template to fill out the values of keys for properties
including the kernel and initramfs file name. This is used to
create new ``BootEntry`` objects to be written to disk.
An ``OsProfile`` can be attached to a ``BootEntry`` when it is
created, or at a later time by calling the ``set_os_profile()``
method.
"""
_entry_data = None
_unwritten = False
_comments = None
_osp = None
bp = None
# boot_id cache
__boot_id = None
def __str(self, quote=False, prefix="", suffix="", tail="\n",
sep=" ", bls=True, no_boot_id=False):
"""Format BootEntry as a string.
Return a human or machine readable representation of this
BootEntry.
:param quote: True if values should be quoted or False otherwise.
:param prefix:An optional prefix string to be concatenated with
with the start of the formatted string.
:param suffix: An optional suffix string to be concatenated
with the end of the formatted string.
:param tail: A string to be concatenated between subsequent
records in the formatted string.
:param sep: A separator to be inserted between each name and
value. Normally either ' ' or '='.
:param bls: Generate output using BootLoader Specification
syntax and key names.
:param no_boot_id: Do not include the BOOT_ID key in the
returned string. Used internally in
order to avoid recursion when calculating
the BOOT_ID checksum.
:returns: A string representation.
:returntype: string
"""
be_str = prefix
for key in [k for k in ENTRY_KEYS if getattr(self, KEY_MAP[k])]:
attr = KEY_MAP[key]
key_fmt = '%s%s"%s"' if quote else '%s%s%s'
key_fmt += tail
attr_val = getattr(self, attr)
if bls:
key_data = (_transform_key(attr), sep, attr_val)
else:
key_data = (key, sep, attr_val)
be_str += key_fmt % key_data
# BOOT_ID requires special handling to avoid recursion from the
# boot_id property method (which uses the string representation
# of the object to calculate the checksum).
if not bls and not no_boot_id:
key_fmt = ('%s%s"%s"' if quote else '%s%s%s') + tail
boot_id_data = [BOOT_ID, sep, self.boot_id]
if bls:
boot_id_data[0] = _transform_key(BOOT_ID)
be_str += key_fmt % tuple(boot_id_data)
return be_str.rstrip(tail) + suffix
[docs] def __str__(self):
"""Format BootEntry as a human-readable string in BLS notation.
Format this BootEntry as a string containing a BLS
configuration snippet.
:returns: a BLS configuration snippet corresponding to this entry.
:returntype: string
"""
return self.__str()
[docs] def __repr__(self):
"""Format BootEntry as a machine-readable string.
Return a machine readable representation of this BootEntry,
in constructor notation.
:returns: A string in BootEntry constructor syntax.
:returntype: str
"""
return self.__str(quote=True, prefix="BootEntry(entry_data={",
suffix="})", tail=", ", sep=": ", bls=False)
[docs] def __len__(self):
"""Return the length (key count) of this ``BootEntry``.
:returns: the ``BootEntry`` length as an integer.
:returntype: ``int``
"""
return len(self._entry_data)
[docs] def __getitem__(self, key):
"""Return an item from this ``BootEntry``.
:returns: the item corresponding to the key requested.
:returntype: the corresponding type of the requested key.
:raises: TypeError if ``key`` is of an invalid type.
KeyError if ``key`` is valid but not present.
"""
if not isinstance(key, str):
raise TypeError("BootEntry key must be a string.")
if key in self._entry_data:
return self._entry_data[key]
if key == BOOT_LINUX:
return self.linux
if key == BOOT_INITRD:
return self.initrd
if key == BOOT_OPTIONS:
return self.options
if key == BOOT_DEVICETREE:
return self.devicetree
if key == BOOT_EFI:
return self.efi
if key == BOOT_ID:
return self.boot_id
if self.bp and key == BOOT_VERSION:
return self.bp.version
raise KeyError("BootEntry key %s not present." % key)
[docs] def __setitem__(self, key, value):
"""Set the specified ``BootEntry`` key to the given value.
:param key: the ``BootEntry`` key to be set.
:param value: the value to set for the specified key.
"""
if not isinstance(key, str):
raise TypeError("BootEntry key must be a string.")
if key == BOOT_VERSION and self.bp:
self.bp.version = value
elif key == BOOT_LINUX and self.bp:
self.linux = value
elif key == BOOT_INITRD and self.bp:
self.initrd = value
elif key == BOOT_OPTIONS and self.bp:
self.options = value
elif key == BOOT_DEVICETREE and self.bp:
self.devicetree = value
elif key == BOOT_EFI and self.bp:
self.efi = value
elif key == BOOT_ID:
raise TypeError("'boot_id' property does not support assignment")
elif key in self._entry_data:
self._entry_data[key] = value
else:
raise KeyError("BootEntry key %s not present." % key)
[docs] def keys(self):
"""Return the list of keys for this ``BootEntry``.
Return a copy of this ``BootEntry``'s keys as a list of
key name strings.
:returns: the current list of ``BotoEntry`` keys.
:returntype: list of str
"""
keys = list(self._entry_data.keys())
add_keys = [BOOT_LINUX, BOOT_INITRD, BOOT_OPTIONS]
# Sort the item list to give stable list ordering on Py3.
keys = sorted(keys, reverse=True)
if self.bp:
add_keys.append(BOOT_VERSION)
for k in add_keys:
if k not in self._entry_data:
keys.append(k)
return keys
[docs] def values(self):
"""Return the list of values for this ``BootEntry``.
Return a copy of this ``BootEntry``'s values as a list.
:returns: the current list of ``BotoEntry`` values.
:returntype: list
"""
values = list(self._entry_data.values())
add_values = [self.linux, self.initrd, self.options]
# Sort the item list to give stable list ordering on Py3.
values = sorted(values, reverse=True)
if self.bp:
add_values.append(self.version)
return values + add_values
[docs] def items(self):
"""Return the items list for this BootEntry.
Return a copy of this ``BootEntry``'s ``(key, value)``
pairs as a list.
:returns: the current list of ``BotoEntry`` items.
:returntype: list of ``(key, value)`` tuples.
"""
items = list(self._entry_data.items())
add_items = [
(BOOT_LINUX, self.linux),
(BOOT_INITRD, self.initrd),
(BOOT_OPTIONS, self.options)
]
if self.bp:
add_items.append((BOOT_VERSION, self.version))
# Sort the item list to give stable list ordering on Py3.
items = sorted(items, key=lambda i:i[0], reverse=True)
return items + add_items
[docs] def _dirty(self):
"""Mark this ``BootEntry`` as needing to be written to disk.
A newly created ``BootEntry`` object is always dirty and
a call to its ``write_entry()`` method will always write
a new boot entry file. Writes may be avoided for entries
that are not marked as dirty.
A clean ``BootEntry`` is marked as dirty if a new value
is written to any of its writable properties.
:returntype: None
"""
self._unwritten = True
def __os_id_from_comment(self, comment):
"""Retrive OsProfile from BootEntry comment.
Attempt to set this BootEntry's OsProfile using a comment
string stored in the entry file. The comment must be of the
form "OsIdentifier: <os_id>". If found the value is treated
as authoritative and a reference to the corresponding
``OsProfile`` is stored in the object's ``_osp`` member.
Any comment lines that do not contain an OsIdentifier tag
are returned as a multi-line string.
:param comment: The comment to attempt to parse
:returns: Comment lines not containing an OsIdentifier
:returntype: str
"""
if "OsIdentifier:" not in comment:
return
outlines = ""
for line in comment.splitlines():
(key, os_id) = line.split(":")
os_id = os_id.strip()
osp = get_os_profile_by_id(os_id)
# An OsIdentifier comment is automatically added to the
# entry when it is written: do not add the read value to
# the comment list.
if not self._osp and osp:
self._osp = osp
_log_debug_entry("Parsed os_id='%s' from comment" %
osp.disp_os_id)
else:
outlines += line + "\n"
return outlines
def __match_os_profile(self):
"""Attempt to find a matching OsProfile for this BootEntry.
Attempt to guess the correct ``OsProfile`` to use with
this ``BootEntry`` by probing each loaded ``OsProfile``
in turn until a profile recognises the entry. If no match
is found the entrie's ``OsProfile`` is set to ``None``.
Probing is only used in the case that a loaded entry has
no embedded OsIdentifier string. All entries written by
Boom include the OsIdentifier value: probing is primarily
useful for entries that have been manually written or
edited.
"""
self._osp = match_os_profile(self)
def __from_data(self, entry_data, boot_params):
"""Initialise a new BootEntry from in-memory data.
Initialise a new ``BootEntry`` object with data from the
dictionary ``entry_data`` (and optionally the supplied
``BootParams`` object). The supplied dictionary should be
indexed by Boom entry key names (``BOOT_*``).
Raises ``ValueError`` if required keys are missing
(``BOOT_TITLE``, and either ``BOOT_LINUX`` or ``BOOT_EFI``).
This method should not be called directly: to build a new
``BootEntry`` object from in-memory data, use the class
initialiser with the ``entry_data`` argument.
:param entry_data: A dictionary mapping Boom boot entry key
names to values
:param boot_params: Optional BootParams to attach to the new
BootEntry object
:returns: None
:returntype: None
:raises: ValueError
"""
if BOOT_TITLE not in entry_data:
raise ValueError("BootEntry missing BOOT_TITLE")
if BOOT_LINUX not in entry_data and BOOT_EFI not in entry_data:
raise ValueError("BootEntry missing BOOT_LINUX or BOOT_EFI")
self._entry_data = {}
for key in [k for k in ENTRY_KEYS if k in entry_data]:
self._entry_data[key] = entry_data[key]
if not self._osp:
self.__match_os_profile()
if boot_params:
self.bp = boot_params
# boot_params is always authoritative
self._entry_data[BOOT_VERSION] = self.bp.version
else:
# Attempt to recover BootParams from entry data
self.bp = BootParams.from_entry(self)
if self.bp:
def _pop_if_set(key):
if key in self._entry_data:
if _entry_data[key] == getattr(self, KEY_MAP[key]):
_entry_data.pop(key)
_entry_data = self._entry_data
self._entry_data = {}
# Clear templated keys from _entry_data and if the value
# read from entry_data is identical to that generated by the
# current OsProfile and BootParams.
_pop_if_set(BOOT_VERSION)
_pop_if_set(BOOT_LINUX)
_pop_if_set(BOOT_INITRD)
_pop_if_set(BOOT_OPTIONS)
self._entry_data = _entry_data
def __from_file(self, entry_file, boot_params):
"""Initialise a new BootEntry from on-disk data.
Initialise a new ``BootEntry`` using the entry data in
``entry_file`` (and optionally the supplied ``BootParams``
object).
Raises ``ValueError`` if required keys are missing
(``BOOT_TITLE``, and either ``BOOT_LINUX`` or ``BOOT_EFI``).
This method should not be called directly: to build a new
``BootEntry`` object from entry file data, use the class
initialiser with the ``entry_file`` argument.
:param entry_file: The path to a file containing a BLS boot
entry
:param boot_params: Optional BootParams to attach to the new
BootEntry object
:returns: None
:returntype: None
:raises: ValueError
"""
entry_data = {}
comments = {}
comment = ""
entry_basename = basename(entry_file)
_log_debug("Loading BootEntry from '%s'" % entry_basename)
with open(entry_file, "r") as ef:
for line in ef:
if _blank_or_comment(line):
comment += line if line else ""
else:
bls_key, value = _parse_name_value(line, separator=None)
# Convert BLS key name to Boom notation
key = _transform_key(bls_key)
if key not in MAP_KEY:
raise LookupError("Unknown BLS key '%s'" % bls_key)
key = MAP_KEY[_transform_key(bls_key)]
entry_data[key] = value
if comment:
comment = self.__os_id_from_comment(comment)
if not comment:
continue
comments[key] = comment
comment = ""
self._comments = comments
self.__from_data(entry_data, boot_params)
match = re.match(BOOT_ENTRIES_PATTERN, entry_basename)
if not match or len(match.groups()) <= 1:
_log_warn("Unknown boot entry file: %s" % entry_basename)
else:
if self.disp_boot_id != match.group(2):
_log_info("Entry file name does not match boot_id: %s" %
entry_basename)
[docs] def __init__(self, title=None, machine_id=None, osprofile=None,
boot_params=None, entry_file=None, entry_data=None,
allow_no_dev=False):
"""Initialise new BootEntry.
Initialise a new ``BootEntry`` object from the specified
file or using the supplied values.
If ``osprofile`` is specified the profile is attached to the
new ``BootEntry`` and will be used to supply templates for
``BootEntry`` values.
A ``BootParams`` object may be supplied using the
``boot_params`` keyword argument. The object will be used to
provide values for subsitution using the patterns defined by
the configured ``OsProfile``.
If ``entry_file`` is specified the ``BootEntry`` will be
initialised from the values found in the file, which should
contain a valid BLS snippet in UTF-8 encoding. The file may
contain blank lines and comments (lines beginning with '#'),
and these will be preserved if the entry is re-written.
If ``entry_file`` is not specified, both ``title`` and
``machine_id`` must be given.
The ``entry_data`` keyword argument is an optional argument
used to initialise a ``BootEntry`` from a dictionary mapping
``BOOT_*`` keys to ``BootEntry`` values. It may be used to
initialised a new ``BootEntry`` using the strings obtained
from a call to ``BootEntry.__repr__()``.
:param title: The title for this ``BootEntry``.
:param machine_id: The ``machine_id`` of this ``BootEntry``.
:param osprofile: An optional ``OsProfile`` to attach to
this ``BootEntry``.
:param boot_params: An optional ``BootParams`` object to
initialise this ``BooyEntry``.
:param entry_file: An optional path to a file in the file
system containing a boot entry in BLS
notation.
:param entry_data: An optional dictionary of ``BootEntry``
key to value mappings to initialise
this ``BootEntry`` from.
:returns: A new ``BootEntry`` object.
:returntype: BootEntry
"""
# An osprofile kwarg always takes precedent over either an
# 'OsIdentifier' comment or a matched osprofile value.
self._osp = osprofile
if entry_data:
return self.__from_data(entry_data, boot_params)
if entry_file:
return self.__from_file(entry_file, boot_params)
self._unwritten = True
if not title or not machine_id:
raise ValueError("BootEntry title and machine_id cannot be None")
self.bp = boot_params
# The BootEntry._entry_data dictionary contains data for an existing
# BootEntry that has been read from disk, as well as any overridden
# fields for a new BootEntry with an OsProfile attached.
self._entry_data = {}
self.title = title
self.machine_id = machine_id
if not self._osp:
self.__match_os_profile()
if self.bp:
if not allow_no_dev:
check_root_device(self.bp.root_device)
[docs] def _apply_format(self, fmt):
"""Apply key format string substitution.
Apply format key substitution to format string ``fmt``,
using values provided by an attached ``BootParams`` object,
and string patterns from either an associated ``OsProfile``
object, or values set directly in this ``BootEntry``.
If the source of data for a key is empty or None, the
string is returned unchanged.
The currently defined format keys are:
* ``%{version}`` The kernel version string.
* ``%{lvm_root_lv}`` The LVM2 logical volume containing the
root file system.
* ``%{btrfs_subvolume}`` The root flags specifying the BTRFS
subvolume containing the root file system.
* ``%{root_device}`` The device containing the root file
system.
* ``%{root_opts}`` The command line options required for the
root file system.
* ``%{linux}`` The linux image to boot
:param fmt: The string to be formatted.
:returns: The formatted string
:returntype: str
"""
orig = fmt
key_format = "%%{%s}"
bp = self.bp
if not fmt:
return ""
version = None
if not bp and self.version:
version = self.version
elif bp:
version = bp.version
key = key_format % FMT_VERSION
if key in fmt and version:
fmt = fmt.replace(key, version)
key = key_format % FMT_LVM_ROOT_OPTS
if key in fmt and self._osp:
value_fmt = self._osp.root_opts_lvm2
value = self._apply_format(value_fmt)
fmt = fmt.replace(key, value)
key = key_format % FMT_LVM_ROOT_LV
if bp and key in fmt and bp.lvm_root_lv:
fmt = fmt.replace(key, bp.lvm_root_lv)
key = key_format % FMT_BTRFS_ROOT_OPTS
if bp and self._osp and key in fmt and bp.has_btrfs():
value_fmt = self._osp.root_opts_btrfs
value = self._apply_format(value_fmt)
fmt = fmt.replace(key, value)
key = key_format % FMT_BTRFS_SUBVOLUME
if bp and key in fmt and bp.has_btrfs():
if bp.btrfs_subvol_id:
subvolume = "subvolid=%s" % bp.btrfs_subvol_id
if bp.btrfs_subvol_path:
subvolume = "subvol=%s" % bp.btrfs_subvol_path
fmt = fmt.replace(key, subvolume)
key = key_format % FMT_ROOT_DEVICE
if bp and key in fmt:
if bp.root_device:
fmt = fmt.replace(key, bp.root_device)
key = key_format % FMT_ROOT_OPTS
if bp and key in fmt:
root_opts = self._apply_format(self.root_opts)
fmt = fmt.replace(key, root_opts)
key = key_format % FMT_KERNEL
if bp and key in fmt:
fmt = fmt.replace(key, self.linux)
key = key_format % FMT_INITRAMFS
if bp and key in fmt:
fmt = fmt.replace(key, self.initrd)
return fmt
def __generate_boot_id(self):
"""Generate a new boot_id value.
Generate a new sha1 profile identifier for this entry,
using the title, version, root_device and any defined
LVM2 or BTRFS snapshot parameters.
:returns: A ``boot_id`` string
:returntype: str
"""
# The default ``str()`` and ``repr()`` behaviour for
# ``BootEntry`` objects includes the ``boot_id`` value. This
# must be disabled in order to generate the ``boot_id`` to
# avoid recursing into __generate_boot_id() from the string
# formatting methods.
#
# Call the underlying ``__str()`` method directly and disable
# the inclusion of the ``boot_id``.
#
# Other callers should always rely on the standard methods.
boot_id = sha1(self.__str(no_boot_id=True).encode('utf-8')).hexdigest()
_log_debug_entry("Generated new boot_id='%s'" % boot_id)
return boot_id
[docs] def _entry_data_property(self, name):
"""Return property value from entry data.
:param name: The boom key name of the property to return
:returns: The property value from the entry data dictionary
"""
if self._entry_data and name in self._entry_data:
return self._entry_data[name]
return None
@property
def disp_boot_id(self):
"""The display boot_id of this entry.
Return the shortest prefix of this BootEntry's boot_id that
is unique within the current set of loaded entries.
:getter: return this BootEntry's boot_id.
:type: str
"""
return self.boot_id[:min_boot_id_width()]
@property
def boot_id(self):
"""A SHA1 digest that uniquely identifies this ``BootEntry``.
:getter: return this ``BootEntry``'s ``boot_id``.
:type: string
"""
if not self.__boot_id or self._unwritten:
self.__boot_id = self.__generate_boot_id()
return self.__boot_id
@property
def root_opts(self):
"""The root options that should be used for this ``BootEntry``.
:getter: Returns the root options string for this ``BootEntry``.
:type: string
"""
if not self._osp or not self.bp:
return ""
bp = self.bp
osp = self._osp
root_opts = "%s%s%s"
lvm_opts = ""
if bp.lvm_root_lv:
lvm_opts = self._apply_format(osp.root_opts_lvm2)
btrfs_opts = ""
if bp.btrfs_subvol_id or bp.btrfs_subvol_path:
btrfs_opts += self._apply_format(osp.root_opts_btrfs)
spacer = " " if lvm_opts and btrfs_opts else ""
return root_opts % (lvm_opts, spacer, btrfs_opts)
@property
def title(self):
"""The title of this ``BootEntry``.
:getter: returns the ``BootEntry`` title.
:setter: sets this ``BootEntry`` object's title.
:type: string
"""
return self._entry_data_property(BOOT_TITLE)
@title.setter
def title(self, title):
self._entry_data[BOOT_TITLE] = title
self._dirty()
@property
def machine_id(self):
"""The machine_id of this ``BootEntry``.
:getter: returns this ``BootEntry`` object's ``machine_id``.
:setter: sets this ``BootEntry`` object's ``machine_id``.
:type: string
"""
return self._entry_data_property(BOOT_MACHINE_ID)
@machine_id.setter
def machine_id(self, machine_id):
self._entry_data[BOOT_MACHINE_ID] = machine_id
self._dirty()
@property
def version(self):
"""The version string associated with this ``BootEntry``.
:getter: returns this ``BootEntry`` object's ``version``.
:setter: sets this ``BootEntry`` object's ``version``.
:type: string
"""
if self.bp and BOOT_VERSION not in self._entry_data:
return self.bp.version
return self._entry_data_property(BOOT_VERSION)
@version.setter
def version(self, version):
self._entry_data[BOOT_VERSION] = version
self._dirty()
@property
def options(self):
"""The command line options for this ``BootEntry``.
:getter: returns the command line for this ``BootEntry``.
:setter: sets the command line for this ``BootEntry``.
:type: string
"""
if not self._osp or BOOT_OPTIONS in self._entry_data:
return self._entry_data_property(BOOT_OPTIONS)
return self._apply_format(self._osp.options)
@options.setter
def options(self, options):
self._entry_data[BOOT_OPTIONS] = options
self._dirty()
@property
def linux(self):
"""The bootable Linux image for this ``BootEntry``.
:getter: returns the configured ``linux`` image.
:setter: sets the configured ``linux`` image.
:type: string
"""
if not self._osp or BOOT_LINUX in self._entry_data:
return self._entry_data_property(BOOT_LINUX)
kernel_path = self._apply_format(self._osp.kernel_pattern)
return kernel_path
@linux.setter
def linux(self, linux):
self._entry_data[BOOT_LINUX] = linux
self._dirty()
@property
def initrd(self):
"""The loadable initramfs image for this ``BootEntry``.
:getter: returns the configured ``initrd`` image.
:getter: sets the configured ``initrd`` image.
:type: string
"""
if not self._osp or BOOT_INITRD in self._entry_data:
return self._entry_data_property(BOOT_INITRD)
initramfs_path = self._apply_format(self._osp.initramfs_pattern)
return initramfs_path
@initrd.setter
def initrd(self, initrd):
self._entry_data[BOOT_INITRD] = initrd
self._dirty()
@property
def efi(self):
"""The loadable EFI image for this ``BootEntry``.
:getter: returns the configured EFI application image.
:getter: sets the configured EFI application image.
:type: string
"""
return self._entry_data_property(BOOT_EFI)
@efi.setter
def efi(self, efi):
self._entry_data[BOOT_EFI] = efi
self._dirty()
@property
def devicetree(self):
"""The devicetree archive for this ``BootEntry``.
:getter: returns the configured device tree archive.
:getter: sets the configured device tree archive.
:type: string
"""
return self._entry_data_property(BOOT_DEVICETREE)
@devicetree.setter
def devicetree(self, devicetree):
self._entry_data[BOOT_DEVICETREE] = devicetree
self._dirty()
@property
def _entry_path(self):
id_tuple = (self.machine_id, self.boot_id[0:7], self.version)
file_name = BOOT_ENTRIES_FORMAT % id_tuple
return path_join(boom_entries_path(), file_name)
[docs] def write_entry(self, force=False):
"""Write out entry to disk.
Write out this ``BootEntry``'s data to a file in BLS
format to the path specified by ``boom_entries_path()``.
The file will be named according to the entry's key values,
and the value of the ``BOOT_ENTRIES_FORMAT`` constant.
Currently the ``machine_id`` and ``version`` keys are used
to contstuct the file name.
If the value of ``force`` is ``False`` and the ``OsProfile``
is not currently marked as dirty (either new, or modified
since the last load operation) the write will be skipped.
:param force: Force this entry to be written to disk even
if the entry is unmodified.
:raises: ``OSError`` if the temporary entry file cannot be
renamed, or if setting file permissions on the
new entry file fails.
:returntype: None
"""
entry_path = self._entry_path
(tmp_fd, tmp_path) = mkstemp(prefix="boom", dir=boom_entries_path())
with fdopen(tmp_fd, "w") as f:
# Our original file descriptor will be closed on exit from the
# fdopen with statement: save a copy so that we can call fdatasync
# once at the end of writing rather than on each loop iteration.
tmp_fd = dup(tmp_fd)
if self._osp:
# Insert OsIdentifier comment at top-of-file
f.write("#OsIdentifier: %s\n" % self._osp.os_id)
for key in [k for k in ENTRY_KEYS if getattr(self, KEY_MAP[k])]:
if self._comments and key in self._comments:
f.write(self._comments[key].rstrip() + '\n')
# Map Boom key names to BLS entry keys
key = KEY_MAP[key]
key_fmt = "%s %s\n"
key_data = (_transform_key(key), getattr(self, key))
f.write(key_fmt % key_data)
f.flush()
try:
fdatasync(tmp_fd)
rename(tmp_path, entry_path)
chmod(entry_path, BOOT_ENTRY_MODE)
except Exception as e:
_log_error("Error writing entry file %s: %s" %
(entry_path, e))
try:
unlink(tmp_path)
except:
pass
raise e
# Add this entry to the list of known on-disk entries
_add_entry(self)
[docs] def delete_entry(self):
"""Remove on-disk BootEntry file.
Remove the on-disk entry corresponding to this ``BootEntry``
object. This will permanently erase the current file
(although the current data may be re-written at any time by
calling ``write_entry()``).
:returntype: ``NoneType``
:raises: ``OsError`` if an error occurs removing the file or
``ValueError`` if the entry does not exist.
"""
if not path_exists(self._entry_path):
raise ValueError("Entry does not exist: %s" % self._entry_path)
try:
unlink(self._entry_path)
except Exception as e:
_log_error("Error removing entry file %s: %s" %
(entry_path, e))
raise
if not self._unwritten:
_del_entry(self)
__all__ = [
# Module constants
'BOOT_ENTRIES_FORMAT',
'BOOT_ENTRY_MODE',
# BootEntry keys
'BOOT_TITLE',
'BOOT_VERSION',
'BOOT_MACHINE_ID',
'BOOT_LINUX',
'BOOT_INITRD',
'BOOT_EFI',
'BOOT_OPTIONS',
'BOOT_DEVICETREE',
# Root device pattern
'DEV_PATTERN',
# Boom root device error class
'BoomRootDeviceError',
# BootParams and BootEntry objects
'BootParams', 'BootEntry',
# Path configuration
'boom_entries_path',
# Entry lookup, load, and write functions
'load_entries', 'write_entries', 'find_entries',
# Formatting
'min_boot_id_width',
# Bootloader integration check
'check_bootloader'
]
# vim: set et ts=4 sw=4 :