Source code for saltext.nebula.states.nebula

"""
Nebula state module.

Manages Nebula VPN certificates on minions.  All platform-specific logic
lives in the ``nebula`` execution module; these states are thin
orchestration wrappers.

:depends: nebula execution module
"""

import logging
import os
import platform
import shutil
import time
from pathlib import Path

try:
    import grp
    import pwd

    HAS_UNIX_PERMISSIONS = True
except ImportError:
    pwd = None  # pylint: disable=invalid-name
    grp = None  # pylint: disable=invalid-name
    HAS_UNIX_PERMISSIONS = False

log = logging.getLogger(__name__)

__virtualname__ = "nebula"

DEFAULT_RENEWAL_BUFFER_DAYS = 30


[docs] def __virtual__(): """Only load if the nebula execution module is available.""" if "nebula.detect_paths" in __salt__: return __virtualname__ return (False, "nebula execution module not available")
# ============================================================================= # Internal helpers # ============================================================================= def _setup_directories(cert_dir): """Ensure certificate and backup directories exist with proper permissions.""" cert_dir = Path(cert_dir) backup_dir = cert_dir / "backups" if platform.system() == "Windows": cert_dir.mkdir(parents=True, exist_ok=True) backup_dir.mkdir(exist_ok=True) else: cert_dir.mkdir(parents=True, exist_ok=True, mode=0o750) backup_dir.mkdir(exist_ok=True, mode=0o750) def _backup_cert(cert_path): """Back up a single certificate file. Returns the backup path or None.""" cert_path = Path(cert_path) if not cert_path.exists(): return None try: backup_dir = cert_path.parent / "backups" backup_dir.mkdir(exist_ok=True, mode=0o750 if platform.system() != "Windows" else 0o777) backup_name = f"{cert_path.name}.{int(time.time())}" backup_path = backup_dir / backup_name shutil.copy2(cert_path, backup_path) log.info("Backed up %s to %s", cert_path, backup_path) return str(backup_path) except Exception as e: # pylint: disable=broad-exception-caught log.warning("Failed to backup %s: %s", cert_path, e) return None def _set_permissions(file_path, is_private_key=False): """Set file permissions appropriate for the platform.""" file_path = Path(file_path) if platform.system() == "Windows": import subprocess # pylint: disable=import-outside-toplevel args = [ "icacls", str(file_path), "/inheritance:r", "/grant:r", "SYSTEM:(F)", "/grant:r", "Administrators:(F)", ] if not is_private_key: args.extend(["/grant:r", "Users:(R)"]) try: subprocess.run(args, check=True, capture_output=True) except Exception as e: # pylint: disable=broad-exception-caught log.warning("Failed to set Windows permissions for %s: %s", file_path, e) else: file_path.chmod(0o600 if is_private_key else 0o644) def _set_ownership(file_path, user="nebula", group="nebula"): """Set file ownership (Unix only).""" if platform.system() == "Windows" or not HAS_UNIX_PERMISSIONS: return try: uid = pwd.getpwnam(user).pw_uid gid = grp.getgrnam(group).gr_gid os.chown(str(file_path), uid, gid) except (KeyError, OSError) as e: log.warning("Could not set %s:%s on %s: %s", user, group, file_path, e) # ============================================================================= # States # =============================================================================
[docs] def certificates_present( name, minion_id=None, cert_dir=None, force_regenerate=False, auto_renew=True, renewal_threshold_days=DEFAULT_RENEWAL_BUFFER_DAYS, backup_old_certs=True, validate_after_deploy=True, ): """ Ensure Nebula certificates are present and valid. Retrieves certificates from the Salt master file server. Certificates must first be generated on the master with the nebula runner: .. code-block:: bash salt-run nebula.get_certificate minion_id=<id> name Unique state name. minion_id Minion ID for certificates. Defaults to current minion. cert_dir Certificate directory. Auto-detected if omitted. force_regenerate Force retrieval even if certificates are valid. auto_renew Renew certificates approaching expiration. Default: True renewal_threshold_days Days before expiry to trigger renewal. Default: 30 backup_old_certs Back up existing certificates before replacement. Default: True validate_after_deploy Validate the certificate chain after deployment. Default: True Example state: .. code-block:: yaml nebula_certificates: nebula.certificates_present: - auto_renew: true - renewal_threshold_days: 30 """ ret = {"name": name, "changes": {}, "result": True, "comment": ""} if not minion_id: minion_id = __grains__["id"] paths = __salt__["nebula.detect_paths"]() if not cert_dir: cert_dir = paths["cert_dir"] cert_dir = Path(cert_dir) ca_path = cert_dir / "ca.crt" cert_path = cert_dir / f"{minion_id}.crt" key_path = cert_dir / f"{minion_id}.key" try: if __opts__["test"]: ret["comment"] = ( f"Would ensure certificates for {minion_id} " f"({paths['install_method']} at {cert_dir})" ) ret["result"] = None return ret _setup_directories(cert_dir) # --- Determine whether we need new certificates --- need_certs = force_regenerate reason = "" if not need_certs: missing = [str(p) for p in (ca_path, cert_path, key_path) if not p.exists()] if missing: need_certs = True reason = f"Missing: {', '.join(missing)}" elif auto_renew: check = __salt__["nebula.cert_needs_renewal"]( cert_path=str(cert_path), buffer_days=renewal_threshold_days ) if check["needs_renewal"]: need_certs = True reason = check["reason"] if force_regenerate: reason = "Force regeneration requested" if not need_certs: # Report current status expiry = __salt__["nebula.parse_cert_expiry"](cert_path=str(cert_path)) if expiry["success"]: ret["comment"] = ( f"Certificates for {minion_id} are up to date " f"(expires in {expiry['days_until_expiry']} days)" ) else: ret["comment"] = f"Certificates for {minion_id} are present" return ret # --- Retrieve new certificates --- log.info("Requesting certificates for %s: %s", minion_id, reason) changes = {} # Backup existing if backup_old_certs: backups = {} for label, path in (("ca", ca_path), ("cert", cert_path), ("key", key_path)): bp = _backup_cert(path) if bp: backups[label] = bp if backups: changes["backups"] = backups # Fetch from master ca_ok = __salt__["cp.get_file"]("salt://nebula/certs/ca.crt", str(ca_path)) cert_ok = __salt__["cp.get_file"](f"salt://nebula/certs/{minion_id}.crt", str(cert_path)) key_ok = __salt__["cp.get_file"](f"salt://nebula/certs/{minion_id}.key", str(key_path)) if not all((ca_ok, cert_ok, key_ok)): ret["result"] = False ret["comment"] = ( f"Certificate files for {minion_id} not found on master. " f"Run 'salt-run nebula.get_certificate minion_id={minion_id}' first." ) return ret # Set permissions _set_permissions(ca_path) _set_permissions(cert_path) _set_permissions(key_path, is_private_key=True) if platform.system() != "Windows": _set_ownership(ca_path) _set_ownership(cert_path) _set_ownership(key_path) changes["ca_cert"] = "Retrieved from master" changes["host_cert"] = f"Retrieved for {minion_id}" changes["private_key"] = f"Retrieved for {minion_id}" # Validate if validate_after_deploy: validation = __salt__["nebula.validate_certificate"]( cert_path=str(cert_path), ca_path=str(ca_path) ) if validation["valid"]: changes["validation"] = "Passed" else: ret["result"] = False ret["comment"] = f"Validation failed: {validation.get('error', 'Unknown')}" return ret ret["changes"] = changes ret["comment"] = f"Updated certificates for {minion_id}" if reason: ret["comment"] += f" ({reason})" return ret except Exception as e: # pylint: disable=broad-exception-caught ret["result"] = False ret["comment"] = f"Error managing certificates for {minion_id}: {e}" log.error("Nebula certificate state error: %s", e) return ret
[docs] def certificate_info(name, cert_path=None, minion_id=None): """ Display Nebula certificate information. Informational state -- reports status without making changes. name Unique state name. cert_path Path to certificate. Auto-detected if omitted. minion_id Minion ID. Defaults to current minion. Example state: .. code-block:: yaml show_cert_info: nebula.certificate_info: - name: cert_info """ ret = {"name": name, "changes": {}, "result": True, "comment": ""} if not minion_id: minion_id = __grains__["id"] status = __salt__["nebula.check_certificate_status"](cert_path=cert_path) paths = __salt__["nebula.detect_paths"]() lines = [ f"Minion: {minion_id}", f"Platform: {platform.system()}", f"Install method: {paths['install_method']}", f"Binary: {paths['binary_path']}", f"Config dir: {paths['config_dir']}", f"Cert dir: {paths['cert_dir']}", "", f"Certificate exists: {status.get('cert_exists', False)}", f"Private key exists: {status.get('key_exists', False)}", f"CA exists: {status.get('ca_exists', False)}", ] if status.get("cert_exists"): lines.append(f"Valid: {status.get('cert_valid', 'Unknown')}") if "expires_at" in status: days = status["days_until_expiry"] lines.append(f"Expires: {status['expires_at']} ({days} days)") if days <= DEFAULT_RENEWAL_BUFFER_DAYS: lines.append(f"WARNING: Renewal needed ({days} days remaining)") if status.get("ca_exists"): v = __salt__["nebula.validate_certificate"]() lines.append(f"Chain validation: {'Valid' if v['valid'] else 'INVALID'}") if not v["valid"]: lines.append(f" Error: {v.get('error', 'Unknown')}") ret["comment"] = "\n".join(lines) return ret