Source code for saltext.nebula.runners.nebula

"""
Nebula certificate management runner for Salt master.

This runner handles Nebula VPN certificate generation and management.
It reads minion configuration from pillar, generates certificates using
the nebula-cert binary, and makes them available via the Salt file server
for minion retrieval.

:depends: nebula-cert binary on the Salt master
:configuration: The following options can be set in the master config:

    .. code-block:: yaml

        # Path configuration
        nebula.cert_dir: /etc/nebula/certs
        nebula.ca_key: /etc/nebula/ca/ca.key
        nebula.ca_crt: /etc/nebula/ca/ca.crt
        nebula.salt_cert_dir: /srv/salt/nebula/certs

        # CA configuration
        nebula.ca_name: "My Nebula Network"
        nebula.ca_duration: "87600h"    # 10 years
        nebula.ca_encrypt: true         # Encrypt CA private key
        nebula.ca_passphrase: "secure-passphrase-here"
"""

import logging
import os
import subprocess
import time

try:
    import pty
    import select

    HAS_PTY = True
except ImportError:
    HAS_PTY = False
from datetime import datetime
from pathlib import Path

log = logging.getLogger(__name__)

# Default configuration values
_DEFAULTS = {
    # Path configuration
    "cert_dir": "/etc/nebula/certs",
    "ca_key": "/etc/nebula/ca/ca.key",
    "ca_crt": "/etc/nebula/ca/ca.crt",
    "salt_cert_dir": "/srv/salt/nebula/certs",
    # CA configuration
    "ca_name": "Salt Managed Nebula Network",
    "ca_duration": "87600h",  # 10 years
    "ca_encrypt": False,  # Default to unencrypted until passphrase is configured
    "ca_passphrase": None,
}


def _get_config():
    """
    Get Nebula runner configuration from master config with defaults.

    Configuration options (set in master config):
        nebula.cert_dir
            Directory where generated certificates are stored on the master.
            Default: /etc/nebula/certs

        nebula.ca_key
            Path to the CA private key for signing certificates.
            Default: /etc/nebula/ca/ca.key

        nebula.ca_crt
            Path to the CA certificate.
            Default: /etc/nebula/ca/ca.crt

        nebula.salt_cert_dir
            Directory in Salt file_roots where certificates are copied
            for minion retrieval via cp.get_file.
            Default: /srv/salt/nebula/certs

        nebula.ca_name
            Name for the CA certificate (used during ca_init).
            Default: Salt Managed Nebula Network

        nebula.ca_duration
            Validity duration for the CA certificate.
            Default: 87600h (10 years)

        nebula.ca_encrypt
            Whether to encrypt the CA private key with a passphrase.
            Default: False

        nebula.ca_passphrase
            Passphrase for encrypted CA private key. Required if ca_encrypt
            is True or if the existing CA key is encrypted.
            Default: None
    """
    config = {
        # Paths
        "cert_dir": __opts__.get("nebula.cert_dir", _DEFAULTS["cert_dir"]),
        "ca_key": __opts__.get("nebula.ca_key", _DEFAULTS["ca_key"]),
        "ca_crt": __opts__.get("nebula.ca_crt", _DEFAULTS["ca_crt"]),
        "salt_cert_dir": __opts__.get("nebula.salt_cert_dir", _DEFAULTS["salt_cert_dir"]),
        # CA settings
        "ca_name": __opts__.get("nebula.ca_name", _DEFAULTS["ca_name"]),
        "ca_duration": __opts__.get("nebula.ca_duration", _DEFAULTS["ca_duration"]),
        "ca_encrypt": __opts__.get("nebula.ca_encrypt", _DEFAULTS["ca_encrypt"]),
        "ca_passphrase": __opts__.get("nebula.ca_passphrase", _DEFAULTS["ca_passphrase"]),
    }

    # Warn if encryption is enabled but no passphrase is set
    if config["ca_encrypt"] and not config["ca_passphrase"]:
        log.warning(
            "nebula.ca_encrypt is True but nebula.ca_passphrase is not set. "
            "CA operations requiring encryption will fail."
        )

    return config


def _ensure_cert_directory():
    """Ensure certificate directory exists"""
    config = _get_config()
    Path(config["cert_dir"]).mkdir(parents=True, exist_ok=True, mode=0o750)


def _run_nebula_cert_command(cmd_args, timeout=30):
    """Run nebula-cert command with error handling (non-interactive)"""
    config = _get_config()
    try:
        result = subprocess.run(
            cmd_args,
            input="",
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=config["cert_dir"],
            check=False,
        )

        if result.returncode != 0:
            raise subprocess.CalledProcessError(
                result.returncode, cmd_args, result.stdout, result.stderr
            )

        return result
    except subprocess.TimeoutExpired as exc:
        raise subprocess.TimeoutExpired(
            cmd_args, timeout, f"Command timed out after {timeout}s"
        ) from exc
    except subprocess.CalledProcessError as exc:
        raise subprocess.CalledProcessError(
            exc.returncode, cmd_args, exc.stdout, exc.stderr
        ) from exc


def _run_nebula_cert_with_pty(cmd_args, passphrase, timeout=30):
    """
    Run nebula-cert command with PTY for interactive passphrase entry.
    """
    if not HAS_PTY:
        raise RuntimeError(
            "PTY support is not available on this platform. "
            "Encrypted CA keys require a Unix-like system."
        )
    master_fd, subordinate_fd = pty.openpty()

    try:
        proc = subprocess.Popen(  # pylint: disable=consider-using-with
            cmd_args,
            stdin=subordinate_fd,
            stdout=subordinate_fd,
            stderr=subordinate_fd,
            close_fds=True,
        )

        os.close(subordinate_fd)
        subordinate_fd = None

        output = b""
        passphrase_sent = 0
        start_time = time.time()

        while proc.poll() is None:
            if time.time() - start_time > timeout:
                proc.kill()
                raise subprocess.TimeoutExpired(cmd_args, timeout)

            readable, _, _ = select.select([master_fd], [], [], 1.0)

            if readable:
                try:
                    data = os.read(master_fd, 1024)
                    if data:
                        output += data
                        log.debug(f"PTY received: {data}")
                        if b"passphrase:" in output.lower() and passphrase_sent < 2:
                            time.sleep(0.1)
                            os.write(master_fd, f"{passphrase}\n".encode())
                            passphrase_sent += 1
                            log.debug(f"Sent passphrase ({passphrase_sent}/2)")
                            output = b""
                except OSError:
                    # PTY closed - process likely exited, this is normal
                    break

        # Ensure we get the return code
        proc.wait()

        # Read any remaining output (ignore errors - PTY may be closed)
        try:
            while True:
                readable, _, _ = select.select([master_fd], [], [], 0.1)
                if not readable:
                    break
                data = os.read(master_fd, 1024)
                if not data:
                    break
                output += data
        except OSError:
            pass  # Expected when PTY is closed

        return proc.returncode, output.decode("utf-8", errors="replace")

    finally:
        os.close(master_fd)
        if subordinate_fd is not None:
            os.close(subordinate_fd)


def _is_ca_key_encrypted(ca_key_path):
    """
    Check if a CA key file is encrypted.

    Args:
        ca_key_path: Path to the CA key file

    Returns:
        bool: True if the key appears to be encrypted
    """
    try:
        key_content = Path(ca_key_path).read_text(encoding="utf-8")
        # Encrypted keys contain "ENCRYPTED" in the PEM header
        return "ENCRYPTED" in key_content
    except Exception:  # pylint: disable=broad-exception-caught
        return False


[docs] def get_certificate(minion_id, auto_generate=True, validate_existing=True, **_kwargs): """ Get or generate a Nebula certificate for a minion. Retrieves the minion's Nebula configuration from pillar and either returns an existing valid certificate or generates a new one. Generated certificates are automatically copied to the Salt file server for minion retrieval. minion_id The minion ID to generate a certificate for. Must have corresponding configuration in pillar under nebula:hosts:<minion_id>. auto_generate Whether to automatically generate a certificate if one doesn't exist or validation fails. Default: True validate_existing Whether to validate existing certificates before returning them. If validation fails and auto_generate is True, a new certificate will be generated. Default: True CLI Example: .. code-block:: bash salt-run nebula.get_certificate minion_id=web01 salt-run nebula.get_certificate minion_id=web01 auto_generate=False salt-run nebula.get_certificate minion_id=web01 validate_existing=False Required pillar structure: .. code-block:: yaml nebula: hosts: web01: ip: "172.25.1.10/20" groups: - webservers - managed duration: "720h" Returns: dict: Dictionary containing: - success: Whether the operation succeeded - cert_content: PEM-encoded certificate (if successful) - key_content: PEM-encoded private key (if successful) - ca_content: PEM-encoded CA certificate (if successful) - ip: Assigned Nebula IP address - groups: List of groups the certificate is valid for - source: 'existing' or 'generated' - error: Error message (if failed) """ config = _get_config() try: log.info(f"Certificate request for minion: {minion_id}") # Ensure certificate directory exists _ensure_cert_directory() # Get minion configuration from pillar # Use the correct runner interface to get pillar data try: log.debug(f"Getting pillar data for {minion_id}") pillar_data = __salt__["pillar.show_pillar"](minion_id) nebula_config = pillar_data.get("nebula", {}) host_config = nebula_config.get("hosts", {}).get(minion_id, {}) log.debug(f"Pillar data retrieved successfully for {minion_id}") except Exception as e: # pylint: disable=broad-exception-caught log.error(f"Failed to get pillar data for {minion_id}: {e}") return {"success": False, "error": f"Failed to get pillar data for {minion_id}: {e}"} if not host_config: return { "success": False, "error": f"No configuration found for minion {minion_id} in pillar data", } # Extract configuration ip = host_config.get("ip") groups = host_config.get("groups", []) subnets = host_config.get("subnets", []) duration = host_config.get("duration", "720h") dns_name = nebula_config.get("dns_name") # global mesh tld cert_name = f"{minion_id}.{dns_name}" if dns_name else minion_id log.info(f"Host config for {minion_id}: ip={ip}, groups={groups}, duration={duration}") if not ip: return {"success": False, "error": f"No IP address configured for minion {minion_id}"} # Certificate paths cert_path = Path(config["cert_dir"]) / f"{minion_id}.crt" key_path = Path(config["cert_dir"]) / f"{minion_id}.key" ca_crt_path = Path(config["ca_crt"]) ca_key_path = Path(config["ca_key"]) # Check if certificates exist and are valid if validate_existing and cert_path.exists() and key_path.exists(): log.info(f"Existing certificate found for {minion_id}, validating...") try: # Validate certificate _run_nebula_cert_command( ["nebula-cert", "verify", "-ca", str(ca_crt_path), "-crt", str(cert_path)] ) log.info(f"Existing certificate for {minion_id} is valid") # Read certificate contents cert_content = cert_path.read_text(encoding="utf-8") key_content = key_path.read_text(encoding="utf-8") ca_content = ca_crt_path.read_text(encoding="utf-8") # Also ensure certificates are available in Salt file server location try: salt_cert_dir = Path(config["salt_cert_dir"]) salt_cert_dir.mkdir(parents=True, exist_ok=True) # Copy CA certificate (salt_cert_dir / "ca.crt").write_text(ca_content, encoding="utf-8") # Copy minion certificate and key (salt_cert_dir / f"{minion_id}.crt").write_text(cert_content, encoding="utf-8") (salt_cert_dir / f"{minion_id}.key").write_text(key_content, encoding="utf-8") # Set proper permissions (salt_cert_dir / "ca.crt").chmod(0o644) (salt_cert_dir / f"{minion_id}.crt").chmod(0o644) (salt_cert_dir / f"{minion_id}.key").chmod(0o600) log.info(f"Ensured certificates available in Salt file server for {minion_id}") except Exception as e: # pylint: disable=broad-exception-caught log.warning(f"Failed to copy existing certificates to Salt file server: {e}") return { "success": True, "cert_content": cert_content, "key_content": key_content, "ca_content": ca_content, "ip": ip, "groups": groups, "subnets": subnets, "duration": duration, "source": "existing", } except Exception as e: # pylint: disable=broad-exception-caught log.warning(f"Existing certificate validation failed: {e}") if not auto_generate: return { "success": False, "error": f"Existing certificate invalid and auto_generate=False: {e}", } # Generate new certificate if needed if auto_generate: log.info(f"Generating new certificate for {minion_id}") # Check if CA key is encrypted ca_encrypted = _is_ca_key_encrypted(ca_key_path) ca_passphrase = config["ca_passphrase"] if ca_encrypted and not ca_passphrase: return { "success": False, "error": ( "CA key is encrypted but no passphrase configured. " "Set nebula.ca_passphrase in master config." ), } try: # Prepare command arguments cmd_args = [ "nebula-cert", "sign", "-ca-crt", str(ca_crt_path), "-ca-key", str(ca_key_path), "-name", cert_name, "-ip", ip, "-duration", duration, "-out-crt", str(cert_path), "-out-key", str(key_path), ] # Add groups if specified if groups: cmd_args.extend(["-groups", ",".join(groups)]) # Add subnets if specified if subnets: cmd_args.extend(["-subnets", ",".join(subnets)]) # Generate certificate - use PTY if CA is encrypted if ca_encrypted: log.info("Using PTY for encrypted CA key signing") returncode, output = _run_nebula_cert_with_pty(cmd_args, ca_passphrase) if returncode != 0: return { "success": False, "error": f"Certificate generation failed: {output}", } else: result = _run_nebula_cert_command(cmd_args) log.debug(f"nebula-cert output: {result.stdout}") log.info(f"Certificate generated successfully for {minion_id}") # Read generated certificate contents cert_content = cert_path.read_text(encoding="utf-8") key_content = key_path.read_text(encoding="utf-8") ca_content = ca_crt_path.read_text(encoding="utf-8") # Also copy certificates to Salt file server location for minion access try: salt_cert_dir = Path(config["salt_cert_dir"]) salt_cert_dir.mkdir(parents=True, exist_ok=True) # Copy CA certificate (salt_cert_dir / "ca.crt").write_text(ca_content, encoding="utf-8") # Copy minion certificate and key (salt_cert_dir / f"{minion_id}.crt").write_text(cert_content, encoding="utf-8") (salt_cert_dir / f"{minion_id}.key").write_text(key_content, encoding="utf-8") # Set proper permissions (salt_cert_dir / "ca.crt").chmod(0o644) (salt_cert_dir / f"{minion_id}.crt").chmod(0o644) (salt_cert_dir / f"{minion_id}.key").chmod(0o600) log.info(f"Copied certificates to Salt file server location for {minion_id}") except Exception as e: # pylint: disable=broad-exception-caught log.warning(f"Failed to copy certificates to Salt file server: {e}") return { "success": True, "cert_content": cert_content, "key_content": key_content, "ca_content": ca_content, "ip": ip, "groups": groups, "subnets": subnets, "duration": duration, "source": "generated", "generated_at": datetime.now().isoformat(), } except subprocess.TimeoutExpired: return {"success": False, "error": "Certificate generation timed out"} except Exception as e: # pylint: disable=broad-exception-caught log.error(f"Certificate generation failed for {minion_id}: {e}") return {"success": False, "error": f"Certificate generation failed: {e}"} return {"success": False, "error": "No valid certificate found and auto_generate=False"} except Exception as e: # pylint: disable=broad-exception-caught log.error(f"Unexpected error in get_certificate: {e}", exc_info=True) return {"success": False, "error": f"Unexpected error: {e}"}
[docs] def list_certificates(): """ List all Nebula certificates managed by this runner. Scans the certificate directory for all .crt files (excluding the CA) and returns information about each certificate. CLI Example: .. code-block:: bash salt-run nebula.list_certificates Returns: dict: Dictionary containing: - success: Whether the operation succeeded - certificates: List of certificate info dictionaries - total: Total number of certificates found - error: Error message (if failed) Each certificate dictionary contains: - minion_id: The minion ID (derived from filename) - cert_path: Full path to the certificate file - key_path: Full path to the corresponding key file - key_exists: Whether the key file exists - cert_size: Size of the certificate file in bytes - modified: ISO format timestamp of last modification """ config = _get_config() try: cert_dir = Path(config["cert_dir"]) certificates = [] if cert_dir.exists(): for cert_file in cert_dir.glob("*.crt"): if cert_file.name != "ca.crt": minion_id = cert_file.stem key_file = cert_dir / f"{minion_id}.key" certificates.append( { "minion_id": minion_id, "cert_path": str(cert_file), "key_path": str(key_file), "key_exists": key_file.exists(), "cert_size": cert_file.stat().st_size, "modified": datetime.fromtimestamp( cert_file.stat().st_mtime ).isoformat(), } ) return {"success": True, "certificates": certificates, "total": len(certificates)} except Exception as e: # pylint: disable=broad-exception-caught return {"success": False, "error": f"Failed to list certificates: {e}"}
[docs] def ca_init(name=None, duration=None, encrypt=None, passphrase=None, force=False): """ Initialize a new Nebula Certificate Authority. Creates a new CA certificate and private key for signing host certificates. This should be run once during initial setup of the Nebula network. name Name for the CA certificate. Defaults to nebula.ca_name from config or "Salt Managed Nebula Network". duration Validity duration for the CA certificate. Defaults to nebula.ca_duration from config or "87600h" (10 years). encrypt Whether to encrypt the CA private key with a passphrase. Defaults to nebula.ca_encrypt from config or False. passphrase Passphrase for encrypting the CA private key. Required if encrypt=True. Defaults to nebula.ca_passphrase from config. force If True, overwrite existing CA files. Default: False. WARNING: This will invalidate all existing certificates! CLI Example: .. code-block:: bash # Basic CA initialization (uses config defaults) salt-run nebula.ca_init # With custom name and duration salt-run nebula.ca_init name="Production Nebula" duration="43800h" # With encryption salt-run nebula.ca_init encrypt=True passphrase="secure-passphrase" # Force regeneration (WARNING: invalidates all certs!) salt-run nebula.ca_init force=True Returns: dict: Dictionary containing: - success: Whether the operation succeeded - ca_crt: Path to the CA certificate - ca_key: Path to the CA private key - name: Name of the CA - duration: Validity duration - encrypted: Whether the key is encrypted - error: Error message (if failed) """ config = _get_config() # Apply defaults from config ca_name = name or config["ca_name"] ca_duration = duration or config["ca_duration"] ca_encrypt = encrypt if encrypt is not None else config["ca_encrypt"] ca_passphrase = passphrase or config["ca_passphrase"] ca_key_path = Path(config["ca_key"]) ca_crt_path = Path(config["ca_crt"]) try: # Check if CA already exists if ca_key_path.exists() or ca_crt_path.exists(): if not force: return { "success": False, "error": ( f"CA already exists at {ca_key_path} and/or {ca_crt_path}. " "Use force=True to overwrite (WARNING: this invalidates all existing certificates!)" ), "ca_key_exists": ca_key_path.exists(), "ca_crt_exists": ca_crt_path.exists(), } else: log.warning( "Force regenerating CA - all existing certificates will be invalidated!" ) # Validate encryption settings if ca_encrypt and not ca_passphrase: return { "success": False, "error": "encrypt=True requires a passphrase. Set passphrase parameter or nebula.ca_passphrase in config.", } # Ensure CA directory exists ca_key_path.parent.mkdir(parents=True, exist_ok=True, mode=0o700) # Build command cmd_args = [ "nebula-cert", "ca", "-name", ca_name, "-duration", ca_duration, "-out-crt", str(ca_crt_path), "-out-key", str(ca_key_path), ] # Add encryption flag if ca_encrypt: cmd_args.append("-encrypt") else: cmd_args.append("-encrypt=false") log.info( f'Initializing Nebula CA: name="{ca_name}", duration={ca_duration}, encrypt={ca_encrypt}' ) # Run the command if ca_encrypt and ca_passphrase: # For encrypted keys, use PTY to handle interactive passphrase entry log.info("Using PTY for encrypted CA key generation") returncode, output = _run_nebula_cert_with_pty(cmd_args, ca_passphrase) if returncode != 0: return { "success": False, "error": f"nebula-cert ca failed: {output}", } else: # Unencrypted - use simple subprocess with empty stdin result = subprocess.run( cmd_args, input="", capture_output=True, text=True, timeout=30, check=False, ) if result.returncode != 0: return { "success": False, "error": f"nebula-cert ca failed: {result.stderr or result.stdout}", } # Set proper permissions ca_key_path.chmod(0o600) ca_crt_path.chmod(0o644) log.info(f"Nebula CA initialized successfully at {ca_crt_path}") # Also copy CA cert to salt file server try: salt_cert_dir = Path(config["salt_cert_dir"]) salt_cert_dir.mkdir(parents=True, exist_ok=True) (salt_cert_dir / "ca.crt").write_text(ca_crt_path.read_text(encoding="utf-8")) (salt_cert_dir / "ca.crt").chmod(0o644) log.info(f"CA certificate copied to Salt file server at {salt_cert_dir}/ca.crt") except Exception as e: # pylint: disable=broad-exception-caught log.warning(f"Failed to copy CA cert to Salt file server: {e}") return { "success": True, "ca_crt": str(ca_crt_path), "ca_key": str(ca_key_path), "name": ca_name, "duration": ca_duration, "encrypted": ca_encrypt, } except subprocess.TimeoutExpired: return {"success": False, "error": "CA initialization timed out"} except Exception as e: # pylint: disable=broad-exception-caught log.error(f"CA initialization failed: {e}") return {"success": False, "error": f"CA initialization failed: {e}"}
[docs] def test_pillar_access(minion_id): """ Test function to debug pillar access for a minion. Useful for troubleshooting when certificate generation fails due to missing or incorrect pillar configuration. minion_id The minion ID to check pillar data for. CLI Example: .. code-block:: bash salt-run nebula.test_pillar_access minion_id=web01 Returns: dict: Dictionary containing: - success: Whether pillar data was retrieved - pillar_data: Complete pillar data for the minion - nebula_config: Just the nebula section of pillar - host_config: Just the host-specific nebula config - error: Error message (if failed) """ try: pillar_data = __salt__["pillar.show_pillar"](minion_id) return { "success": True, "pillar_data": pillar_data, "nebula_config": pillar_data.get("nebula", {}), "host_config": pillar_data.get("nebula", {}).get("hosts", {}).get(minion_id, {}), } except Exception as e: # pylint: disable=broad-exception-caught return {"success": False, "error": str(e)}