Source code for saltext.vcf.clients.vcfops_fleet_passwords

"""VCF Operations — fleet password management (VCF 9.x).

The VCF Operations 9.1 ``suite-api`` exposes a fleet-wide password manager
at ``/suite-api/api/fleet-management/password-management``. Each managed
**password account** (vCenter root, NSX admin, ESXi root, SSO admin, …)
is identified by an opaque ``passwordAccountKey``; query results include
expiration metadata (``expiryDate`` as a unix-millisecond timestamp,
``status``: ``ACTIVE`` / ``EXPIRING`` / ``EXPIRED`` / ``UNKNOWN``).

Endpoints used here (all on the VCF Operations host):

* ``POST   /suite-api/api/fleet-management/password-management/accounts/query``
  — paginated list, with optional ``appliance`` / ``applianceFqdn`` /
  ``status`` / ``username`` / ``vcfDomainId`` filters; query params
  ``page``, ``pageSize``, ``sortBy``, ``sortOrder``.
* ``PUT    /suite-api/api/fleet-management/password-management/accounts/{passwordAccountKey}/password``
  — set a new password; returns a ``WorkflowRequest`` describing the
  async credential-rotation job VCF Operations kicked off.

Auth is the same VCF Operations bearer-token surface used by every other
``vcfops_*`` client (see :mod:`saltext.vcf.utils.vcfops`).

This module is the recommended way to administer fleet passwords on VCF
9.x; :mod:`saltext.vcf.clients.fleet_password` (SDDC Manager-backed) is
retained for older deployments but the SDDC password surface is being
deprecated.
"""

from datetime import datetime
from datetime import timezone

from saltext.vcf.utils import vcfops

_BASE = "/suite-api/api/fleet-management/password-management"

DEFAULT_EXPIRY_THRESHOLD_DAYS = 90


def _iso(ms):
    """Convert a unix-ms timestamp to an ISO-8601 ``Z`` string, or ``None``.

    VCF Operations reports ``0`` for accounts that never expire (most
    admin / service accounts); preserve that as ``None`` in enriched
    records so callers can distinguish "no expiry" from "expires now".
    """
    if not ms:
        return None
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _enrich(account):
    """Return a copy of *account* with ``expiryDateIso`` added."""
    out = dict(account)
    out["expiryDateIso"] = _iso(account.get("expiryDate"))
    return out


[docs] def query_accounts( opts, *, appliance=None, appliance_fqdn=None, status=None, username=None, vcf_domain_id=None, page=0, page_size=10, sort_by=None, sort_order=None, profile=None, ): """Raw paginated search; returns the unmodified ``VcfPasswordAccountsResponse``. Most callers want :func:`list_` (walks pagination, enriches with ``expiryDateIso``); this is the low-level handle for clients that need fine-grained control over paging. """ body = {} if appliance is not None: body["appliance"] = appliance if appliance_fqdn is not None: body["applianceFqdn"] = appliance_fqdn if status is not None: body["status"] = status if username is not None: body["username"] = username if vcf_domain_id is not None: body["vcfDomainId"] = vcf_domain_id params = {"page": page, "pageSize": page_size} if sort_by is not None: params["sortBy"] = sort_by if sort_order is not None: params["sortOrder"] = sort_order return vcfops.api_post( opts, f"{_BASE}/accounts/query", body=body, params=params, profile=profile )
[docs] def list_(opts, *, profile=None, page_size=100, **filters): """List every managed password account. Walks pagination and returns ``{"accounts": [...], "totalCount": N}``. Each account dict is enriched with ``expiryDateIso`` (the ISO-8601 rendering of ``expiryDate`` in UTC, or ``None`` when the account never expires). *filters* accepts the same keyword filters as :func:`query_accounts` (``appliance``, ``appliance_fqdn``, ``status``, ``username``, ``vcf_domain_id``). """ accounts = [] total = 0 page = 0 while True: resp = query_accounts(opts, page=page, page_size=page_size, profile=profile, **filters) chunk = resp.get("vcfPasswordAccounts", []) or [] accounts.extend(_enrich(a) for a in chunk) page_info = resp.get("pageInfo", {}) or {} total = page_info.get("totalCount", len(accounts)) if not chunk or len(accounts) >= total: break page += 1 return {"accounts": accounts, "totalCount": total}
[docs] def get_account(opts, password_account_key, profile=None): """Return the single account record matching *password_account_key*, or ``None`` if no account with that key is currently registered. """ for acct in list_(opts, profile=profile)["accounts"]: if acct.get("passwordAccountKey") == password_account_key: return acct return None
[docs] def check_expiry(opts, *, threshold_days=DEFAULT_EXPIRY_THRESHOLD_DAYS, profile=None, **filters): """Categorize accounts into ``ok`` / ``expiring`` / ``noExpiry`` buckets. *threshold_days* — accounts whose ``expiryDate`` is within this many days of "now" land in ``expiring`` (including already-expired accounts, which have a negative ``daysUntilExpiry``). Default 90. Returns:: { "ok": [...], # daysUntilExpiry > threshold_days "expiring": [...], # 0 ... threshold_days (and <0 if expired) "noExpiry": [...], # expiryDate == 0 (e.g. admin accounts) "okCount": int, "expiringCount": int, "noExpiryCount": int, "totalCount": int, "expiryThresholdDays": threshold_days, } Each ``ok`` / ``expiring`` account is augmented with ``daysUntilExpiry`` (float, rounded to 1 decimal). ``noExpiry`` entries are returned as-is. *filters* accepts the same keyword filters as :func:`list_`, so callers can scope the check to a single appliance / fqdn / domain. """ listing = list_(opts, profile=profile, **filters) now_ms = datetime.now(tz=timezone.utc).timestamp() * 1000 threshold_ms = threshold_days * 86400 * 1000 ok = [] expiring = [] no_expiry = [] for acct in listing["accounts"]: exp = acct.get("expiryDate") or 0 if exp == 0: no_expiry.append(acct) continue delta = exp - now_ms bucketed = dict(acct, daysUntilExpiry=round(delta / 86400000, 1)) if delta <= threshold_ms: expiring.append(bucketed) else: ok.append(bucketed) return { "ok": ok, "expiring": expiring, "noExpiry": no_expiry, "okCount": len(ok), "expiringCount": len(expiring), "noExpiryCount": len(no_expiry), "totalCount": listing["totalCount"], "expiryThresholdDays": threshold_days, }
[docs] def update( opts, password_account_key, current_password, new_password, username=None, profile=None, ): """Update the password for *password_account_key*. Returns the ``WorkflowRequest`` dict describing the async rotation job VCF Operations enqueued (``requestId``, ``state``, ``duration``, ``errorCause``, …). The actual rotation may take minutes; poll the ``requestId`` via the workflow API if you need to wait for completion. """ body = {"currentPassword": current_password, "newPassword": new_password} if username is not None: body["userName"] = username return vcfops.api_put( opts, f"{_BASE}/accounts/{password_account_key}/password", body=body, profile=profile )