Source code for saltext.vcf.clients.sddc_tasks

"""SDDC Manager async task polling.

Most VCF lifecycle operations (create domain, expand cluster, deploy
edge, etc.) return ``202 Accepted`` with a task identifier. This module
provides the read + poll loop those callers need.

Task statuses observed in VCF 9.x:
- ``IN_PROGRESS``
- ``Pending``
- ``Successful``
- ``Failed``
- ``Cancelled``
"""

import time

import requests

from saltext.vcf.utils import sddc

PATH = "/v1/tasks"
TERMINAL = {"Successful", "Failed", "Cancelled"}


def list_(opts, page=0, page_size=100, profile=None):
    return sddc.api_get(opts, PATH, params={"page": page, "pageSize": page_size}, profile=profile)


def get(opts, task_id, profile=None):
    return sddc.api_get(opts, f"{PATH}/{task_id}", profile=profile)


def get_or_none(opts, task_id, profile=None):
    try:
        return get(opts, task_id, profile=profile)
    except requests.HTTPError as exc:
        if exc.response is not None and exc.response.status_code == 404:
            return None
        raise


[docs] def retry(opts, task_id, profile=None): """Retry a failed task. Returns the task body.""" return sddc.api_patch(opts, f"{PATH}/{task_id}", body={}, profile=profile)
def cancel(opts, task_id, profile=None): return sddc.api_delete(opts, f"{PATH}/{task_id}", profile=profile)
[docs] def wait( opts, task_id, *, timeout=3600, poll_interval=10, profile=None, ): """Block until *task_id* reaches a terminal status. Returns the final task body. Raises :class:`TimeoutError` if the task does not finish within *timeout* seconds. Raises :class:`RuntimeError` on terminal ``Failed`` / ``Cancelled``. """ deadline = time.monotonic() + float(timeout) while True: task = get(opts, task_id, profile=profile) status = task.get("status") if status in TERMINAL: if status != "Successful": msg = _error_message(task) raise RuntimeError(f"task {task_id} ended with status {status!r}: {msg}") return task if time.monotonic() >= deadline: raise TimeoutError(f"task {task_id} still {status!r} after {timeout}s") time.sleep(float(poll_interval))
def _error_message(task): errors = task.get("errors") or [] if errors: msgs = [] for e in errors: if isinstance(e, dict): msgs.append(e.get("message") or e.get("errorCode") or repr(e)) else: msgs.append(str(e)) return "; ".join(msgs) return task.get("status_message") or task.get("description") or ""