Source code for saltext.bmc.utils.ipmi

"""
IPMI transport for the ``saltext-bmc`` extension.

Thin wrapper over ``pyghmi.ipmi.command.Command``.  ``pyghmi`` is an
optional dependency — install with ``pip install saltext.bmc[ipmi]``.
The module imports it lazily so the extension still loads in
Redfish-only environments.

This module exposes a uniform interface that mirrors the Redfish one in
:mod:`saltext.bmc.utils.redfish`, taking canonical action / boot-device
names and translating them to IPMI semantics:

============================  ============================
Canonical action               IPMI translation
============================  ============================
``On``                         ``set_power('on')``
``ForceOff``                   ``set_power('off')``
``GracefulShutdown``           ``set_power('shutdown')``
``PowerCycle``                 IPMI chassis control 0x02
``GracefulRestart``            ``set_power('reset')``
``ForceRestart``               ``set_power('reset')``
============================  ============================

============================  ============================
Canonical boot device          pyghmi ``bootdev``
============================  ============================
``disk``                       ``hd``
``pxe``                        ``network``
``bios``                       ``setup``
``cd``                         ``cd``
``none``                       ``default``
``http`` / ``usb``              **not supported** (raises)
============================  ============================
"""

from __future__ import annotations

import logging

log = logging.getLogger(__name__)

DEFAULT_TIMEOUT = 30
DEFAULT_PORT = 623

# Canonical reset names → pyghmi set_power() values.
_RESET_TO_PYGHMI = {
    "On": "on",
    "ForceOff": "off",
    "GracefulShutdown": "shutdown",
    "GracefulRestart": "reset",
    "ForceRestart": "reset",
    # PowerCycle is handled via raw IPMI chassis control (see _do_cycle).
}

# Canonical boot device → pyghmi bootdev value.
_DEVICE_TO_PYGHMI = {
    "disk": "hd",
    "pxe": "network",
    "bios": "setup",
    "cd": "cd",
    "none": "default",
}
_DEVICE_FROM_PYGHMI = {v: k for k, v in _DEVICE_TO_PYGHMI.items()}
_DEVICE_FROM_PYGHMI.update(
    {
        "safe": "disk",  # boot to disk in safe mode → treat as disk
        "diag": "bios",  # diagnostic partition → bios is closest friendly
        "floppy": "cd",  # legacy
    }
)


[docs] class IpmiError(Exception): """An IPMI operation failed."""
[docs] class IpmiAuthError(IpmiError): """Authentication against the IPMI endpoint failed."""
# Candidate key names by normalized output field, in priority order. # pyghmi's FRU info dict (from get_inventory_of_component) uses title-case # keys with spaces ('Product name', 'Serial Number', 'Manufacturer', ...). # Older callers / test fixtures use snake_case ('product_name', ...). _FRU_KEY_ALIASES = { "manufacturer": ["Manufacturer", "product_manufacturer", "manufacturer"], "model": ["Product name", "Model", "product_name", "model", "name"], "serial_number": ["Serial Number", "product_serial", "serial_number"], "uuid": ["UUID", "uuid"], "sku": ["SKU", "product_sku", "sku"], } def _pick(src: dict, candidates: list[str]): """First non-empty value from ``src`` for any of ``candidates`` (case-insensitive fallback).""" for key in candidates: value = src.get(key) if value: return value lower = {str(k).lower(): v for k, v in src.items()} for key in candidates: value = lower.get(key.lower()) if value: return value return None def _select_fru_product(fru) -> dict: """ Return normalized inventory fields from a pyghmi FRU dict. pyghmi's :meth:`get_inventory_of_component` returns a flat dict with title-case keys. Older / nested shapes are still accepted for backwards compatibility with callers feeding us pre-normalized data. """ if not isinstance(fru, dict) or not fru: return {} # Nested 'product' shape (legacy) if isinstance(fru.get("product"), dict): return _normalize_fru_nested(fru) # Dict-of-FRUs keyed by id (legacy) — pick the most informative entry. if not any(k in fru for k in ("Product name", "product_name", "Manufacturer", "manufacturer")): for entry in fru.values(): if isinstance(entry, dict): got = _select_fru_product(entry) if got: return got return {} return _normalize_fru_flat(fru) def _normalize_fru_flat(fru: dict) -> dict: return { "manufacturer": _pick(fru, _FRU_KEY_ALIASES["manufacturer"]), "model": _pick(fru, _FRU_KEY_ALIASES["model"]), "serial_number": _pick(fru, _FRU_KEY_ALIASES["serial_number"]), "uuid": _pick(fru, _FRU_KEY_ALIASES["uuid"]), "sku": _pick(fru, _FRU_KEY_ALIASES["sku"]), } def _normalize_fru_nested(fru: dict) -> dict: product = fru.get("product") or {} return { "manufacturer": _pick(product, _FRU_KEY_ALIASES["manufacturer"]), "model": _pick(product, _FRU_KEY_ALIASES["model"]), "serial_number": _pick(product, _FRU_KEY_ALIASES["serial_number"]), "uuid": _pick(fru, _FRU_KEY_ALIASES["uuid"]) or _pick(product, _FRU_KEY_ALIASES["uuid"]), "sku": _pick(product, _FRU_KEY_ALIASES["sku"]), } def _attr(obj, name, default=None): """Get attribute from object or key from dict.""" if isinstance(obj, dict): return obj.get(name, default) return getattr(obj, name, default) def _load_command_cls(): """Lazy-load ``pyghmi.ipmi.command.Command``. Raises a clear :class:`IpmiError` if ``pyghmi`` is not installed. """ try: from pyghmi.ipmi import command as _command # pylint: disable=import-outside-toplevel except ImportError as exc: # pragma: no cover - exercised via dependency mocking raise IpmiError( "pyghmi is required for the IPMI backend. " "Install with: pip install 'saltext.bmc[ipmi]'" ) from exc return _command.Command def _load_fru_cls(): """Lazy-load ``pyghmi.ipmi.fru.FRU`` for raw-bytes parsing.""" try: from pyghmi.ipmi import fru as _fru # pylint: disable=import-outside-toplevel except ImportError as exc: # pragma: no cover raise IpmiError("pyghmi is required for the IPMI backend.") from exc return _fru.FRU # IPMI completion codes that signal "chunk size too large". pyghmi's # fetch_fru only retries on 0xC9 / 0xCA, which leaves older BMCs that # return 0xC7 ("Request data length invalid") for >16-byte reads broken. # See pyghmi/ipmi/fru.py fetch_fru(). _FRU_CHUNK_TOO_BIG_CODES = (0xC7, 0xC9, 0xCA) # IPMI completion codes that mean "no FRU device" (treat as empty). _FRU_ABSENT_CODES = (0xC1, 0xC3, 0xCB) def _fetch_system_fru(cmd, fruid: int = 0) -> dict: """ Read FRU data via raw IPMI commands and parse via ``pyghmi.ipmi.fru.FRU``. Replaces pyghmi's :meth:`Command.get_inventory_of_component` for the system FRU. Pyghmi's built-in reader starts at a 224-byte chunk size and only backs off on completion codes ``0xC9``/``0xCA``; some legacy BMCs return ``0xC7`` instead, which propagates as an unhandled exception. This implementation backs off on all three codes, so it works on both modern hardware (succeeds at 224 bytes) and older hardware that needs 16 bytes or less. Returns the parsed FRU info dict (the same shape pyghmi would return), or ``{}`` if the BMC does not have FRU ``fruid``. """ info_resp = cmd.raw_command(netfn=0x0A, command=0x10, data=[fruid]) code = info_resp.get("code", 0) if code in _FRU_ABSENT_CODES: return {} if code != 0: raise IpmiError( f"FRU inventory area read failed (code {code:#x}): " f"{info_resp.get('error', 'unknown')}" ) data = info_resp["data"] frusize = data[0] | (data[1] << 8) if frusize == 0: return {} rawfru = bytearray() chunksize = 224 # pyghmi's tested ceiling; we back off on errors offset = 0 while offset < frusize: want = min(chunksize, frusize - offset) resp = cmd.raw_command( netfn=0x0A, command=0x11, data=[fruid, offset & 0xFF, (offset >> 8) & 0xFF, want], ) code = resp.get("code", 0) if code in _FRU_CHUNK_TOO_BIG_CODES: if chunksize <= 3: raise IpmiError( f"FRU read failed at offset {offset}: BMC rejects even minimal chunk size" ) # Same back-off formula pyghmi uses: roughly halve, plus 2 to # avoid the chunksize=3 lower bound on the next iteration. chunksize = max(3, chunksize // 2 + 2) continue if code != 0: raise IpmiError( f"FRU read failed at offset {offset} (code {code:#x}): " f"{resp.get('error', 'unknown')}" ) returned = resp["data"][0] if returned == 0: break rawfru.extend(resp["data"][1 : 1 + returned]) offset += returned fru_cls = _load_fru_cls() return fru_cls(rawdata=rawfru).info or {} def _wrap_pyghmi_errors(method): """Re-raise pyghmi exceptions as IpmiError / IpmiAuthError.""" def wrapper(self, *args, **kwargs): try: return method(self, *args, **kwargs) except IpmiError: raise except Exception as exc: # pylint: disable=broad-except # pyghmi raises various IpmiException subclasses + socket errors. msg = str(exc).lower() if "unauthorized" in msg or "permission" in msg or "credentials" in msg: raise IpmiAuthError(f"IPMI auth failed: {exc}") from exc raise IpmiError(f"IPMI operation failed: {exc}") from exc return wrapper
[docs] class IpmiClient: """ Thin IPMI client wrapping :class:`pyghmi.ipmi.command.Command`. Use as a context manager:: with IpmiClient(host, user, password) as c: state = c.power_status() """ def __init__( self, host: str, username: str, password: str, *, port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT, ) -> None: self.host = host self.username = username self.password = password self.port = port self.timeout = timeout self._cmd = None def __enter__(self) -> IpmiClient: cmd_cls = _load_command_cls() try: self._cmd = cmd_cls( bmc=self.host, userid=self.username, password=self.password, port=self.port, ) except Exception as exc: # pylint: disable=broad-except msg = str(exc).lower() if "unauthorized" in msg or "permission" in msg or "credentials" in msg: raise IpmiAuthError(f"IPMI session to {self.host} rejected: {exc}") from exc raise IpmiError(f"Cannot reach IPMI endpoint at {self.host}: {exc}") from exc return self def __exit__(self, exc_type, exc, tb) -> None: # pyghmi.Command holds an internal Session; explicit logout is # exposed as `ipmi_session.logout()`, but pyghmi cleans up on GC. # We drop our reference so the session can close. self._cmd = None # ------------------------------------------------------------------ # Power # ------------------------------------------------------------------ @_wrap_pyghmi_errors def power_status(self) -> str: result = self._cmd.get_power() state = (result or {}).get("powerstate", "") s = state.lower() return s if s in ("on", "off") else "unknown" @_wrap_pyghmi_errors def do_reset(self, reset_type: str) -> dict: if reset_type == "PowerCycle": return self._do_cycle() native = _RESET_TO_PYGHMI.get(reset_type) if native is None: raise IpmiError(f"Unknown reset type {reset_type!r} for IPMI backend.") self._cmd.set_power(native) return {"action": reset_type, "result": True} def _do_cycle(self) -> dict: """Issue an IPMI Chassis Control 0x02 (Power Cycle).""" # netfn 0x00 (chassis), command 0x02 (chassis control), data 0x02 (power cycle) self._cmd.raw_command(netfn=0x00, command=0x02, data=[0x02]) return {"action": "PowerCycle", "result": True} # ------------------------------------------------------------------ # Boot device # ------------------------------------------------------------------ @_wrap_pyghmi_errors def get_boot(self) -> dict: raw = self._cmd.get_bootdev() or {} native = raw.get("bootdev", "default") friendly = _DEVICE_FROM_PYGHMI.get(native, "none") persistent = bool(raw.get("persistent", False)) if native == "default": enabled = "Disabled" else: enabled = "Continuous" if persistent else "Once" return { "device": friendly, "redfish_target": None, # not applicable "native_target": native, "enabled": enabled, } @_wrap_pyghmi_errors def get_system_info(self) -> dict: """ Return a normalised inventory dict (some fields may be ``None``). Reads ``get_inventory_of_component('System')`` for FRU-derived product info, ``get_firmware()`` for the BMC firmware version, and ``get_power()`` for ``power_state``. Each piece is best-effort — BMCs with broken or absent FRU data (common on older hardware) will return ``None`` for the affected fields rather than failing the whole call. """ # We read the FRU ourselves rather than calling pyghmi's # get_inventory_of_component('System'), which is broken on legacy # BMCs that return IPMI completion code 0xC7 for >16-byte chunks. try: sys_fru = _fetch_system_fru(self._cmd) except Exception: # pylint: disable=broad-except sys_fru = {} product = _select_fru_product(sys_fru) # get_firmware() yields (component_name, {'version': str, ...}) tuples. # The generic OEM handler always yields a 'BMC Version' entry; vendor # OEMs may yield BIOS and component firmware too. fw_version = None try: for name, info in self._cmd.get_firmware() or []: if name and "bmc" in name.lower(): fw_version = (info or {}).get("version") break except Exception: # pylint: disable=broad-except pass try: power_raw = (self._cmd.get_power() or {}).get("powerstate", "") except Exception: # pylint: disable=broad-except power_raw = "" power = power_raw.lower() if power_raw.lower() in ("on", "off") else None return { "manufacturer": product.get("manufacturer"), "model": product.get("model"), "serial_number": product.get("serial_number"), "uuid": product.get("uuid"), "sku": product.get("sku"), "host_name": None, # IPMI does not expose this directly "bios_version": None, # IPMI does not expose this directly "firmware_version": fw_version, "power_state": power, } @_wrap_pyghmi_errors def get_sensors(self) -> dict: """ Return ``{"temperatures": [...], "fans": [...], "voltages": [...]}`` bucketed by pyghmi's unit hint. """ temperatures: list[dict] = [] fans: list[dict] = [] voltages: list[dict] = [] for reading in self._cmd.get_sensor_data() or []: unit = _attr(reading, "units", default="") or "" entry = { "name": _attr(reading, "name", default=None), "reading": _attr(reading, "value", default=None), "unit": unit, "status": _attr(reading, "health", default=None) or _attr(reading, "state", default=None), } u = unit.lower() if "celsius" in u or u in ("c", "°c", "degrees c"): entry["unit"] = "C" temperatures.append(entry) elif "rpm" in u: entry["unit"] = "RPM" fans.append(entry) elif "volt" in u or u == "v": entry["unit"] = "V" voltages.append(entry) return {"temperatures": temperatures, "fans": fans, "voltages": voltages} @_wrap_pyghmi_errors def set_boot(self, device: str, persistent: bool = False) -> dict: native = _DEVICE_TO_PYGHMI.get(device) if native is None: raise IpmiError( f"Boot device {device!r} is not supported over IPMI. " "Use the Redfish backend, or pick one of: " f"{', '.join(sorted(_DEVICE_TO_PYGHMI))}." ) # uefiboot is best-effort; most modern systems accept it. self._cmd.set_bootdev(native, persist=persistent, uefiboot=False) enabled = "Continuous" if persistent else "Once" if native == "default": enabled = "Disabled" return { "device": device, "redfish_target": None, "native_target": native, "enabled": enabled, "result": True, }