Source code for saltext.vcf.utils.sddc

"""
SDDC Manager REST API connection helpers.

Authentication uses Bearer JWT tokens from ``POST /v1/tokens``. The token is
cached per (host, username) pair for the lifetime of the Salt loader session.

Config is read from Salt opts/pillar under ``saltext.vcf.sddc_manager``:

.. code-block:: yaml

    saltext.vcf:
      sddc_manager:
        host: sddc-manager.vcf.nimbus.internal
        username: administrator@vsphere.local
        password: VMware123!VMware123!
        verify_ssl: false
"""

import logging

import requests
import urllib3

log = logging.getLogger(__name__)

_TOKEN_CACHE: dict[str, str] = {}


[docs] def get_config(opts, profile=None): """ Extract SDDC Manager connection config from Salt opts/pillar. Returns a dict with keys: host, username, password, verify_ssl. """ pillar = opts.get("pillar", {}) root = pillar.get("saltext.vcf", {}) or opts.get("saltext.vcf", {}) cfg = root.get("sddc_manager", {}) if profile: cfg = root.get("profiles", {}).get(profile, {}).get("sddc_manager", cfg) return { "host": cfg.get("host") or cfg.get("hostname"), "username": cfg.get("username") or cfg.get("user"), "password": cfg.get("password"), "verify_ssl": cfg.get("verify_ssl", True), }
[docs] def get_token(opts, profile=None): """ Acquire and cache a Bearer JWT from SDDC Manager. Returns the raw token string (without the ``Bearer`` prefix). """ cfg = get_config(opts, profile=profile) host = cfg["host"] username = cfg["username"] password = cfg["password"] verify = cfg["verify_ssl"] if not verify: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) cache_key = f"{host}:{username}" if cache_key in _TOKEN_CACHE: return _TOKEN_CACHE[cache_key] url = f"https://{host}/v1/tokens" resp = requests.post( url, json={"username": username, "password": password}, verify=verify, timeout=30, ) resp.raise_for_status() token = resp.json()["accessToken"] _TOKEN_CACHE[cache_key] = token return token
[docs] def invalidate_token(opts, profile=None): """Remove the cached token, forcing re-authentication on next call.""" cfg = get_config(opts, profile=profile) cache_key = f"{cfg['host']}:{cfg['username']}" _TOKEN_CACHE.pop(cache_key, None)
def _session(opts, profile=None): cfg = get_config(opts, profile=profile) verify = cfg["verify_ssl"] if not verify: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) token = get_token(opts, profile=profile) session = requests.Session() session.verify = verify session.headers.update({"Authorization": f"Bearer {token}"}) return session, cfg["host"]
[docs] def api_get(opts, path, params=None, profile=None): """GET ``/v1/<path>`` from SDDC Manager and return parsed JSON.""" session, host = _session(opts, profile=profile) url = f"https://{host}{path}" resp = session.get(url, params=params, timeout=30) resp.raise_for_status() if resp.content: return resp.json() return {}
[docs] def api_post(opts, path, body=None, profile=None): """POST JSON *body* to SDDC Manager and return parsed JSON.""" session, host = _session(opts, profile=profile) url = f"https://{host}{path}" resp = session.post(url, json=body, timeout=30) resp.raise_for_status() if resp.content: return resp.json() return {}
[docs] def api_patch(opts, path, body=None, profile=None): """PATCH JSON *body* to SDDC Manager and return parsed JSON.""" session, host = _session(opts, profile=profile) url = f"https://{host}{path}" resp = session.patch(url, json=body, timeout=30) resp.raise_for_status() if resp.content: return resp.json() return {}
[docs] def api_put(opts, path, body=None, profile=None): """PUT JSON *body* to SDDC Manager and return parsed JSON.""" session, host = _session(opts, profile=profile) url = f"https://{host}{path}" resp = session.put(url, json=body, timeout=30) resp.raise_for_status() if resp.content: return resp.json() return {}
[docs] def api_delete(opts, path, profile=None): """DELETE a resource from SDDC Manager.""" session, host = _session(opts, profile=profile) url = f"https://{host}{path}" resp = session.delete(url, timeout=30) resp.raise_for_status() return {}