Source code for saltext.kubernetes.utils._kinds

"""
Kind registry for the saltext-kubernetes extension.

A single source of truth for per-Kubernetes-kind metadata used by the
wait subsystem and (future) the operations modules:

* which kubernetes-client API class hosts the kind's CRUD methods
  (e.g. ``CoreV1Api`` for Pod, ``AppsV1Api`` for Deployment)
* the name of the ``list_*`` and ``read_*`` methods on that class
* whether the kind is namespaced or cluster-scoped
* a readiness predicate evaluated against a live API object

Adding a new typed kind is one entry in :py:data:`_KIND_REGISTRY` plus
its public CRUD functions on :py:mod:`...kubernetesmod`. Before this
registry existed the kind→method mapping lived as a duplicated literal
dict in :py:func:`_wait_for_resource_status` (twice — once for the
"deleted" path's ``read_*`` lookup, once for the "created/ready"
path's ``list_*`` lookup).
"""

from collections.abc import Callable
from dataclasses import dataclass

from salt.exceptions import CommandExecutionError

# pylint: disable=import-error,no-name-in-module
try:
    import kubernetes.client.rest  # noqa: F401  pylint: disable=unused-import

    HAS_LIBS = True
except ImportError:
    HAS_LIBS = False
# pylint: enable=import-error,no-name-in-module


[docs] @dataclass(frozen=True) class KindOps: """Per-kind metadata for the wait subsystem.""" api_class_attr: str """Attribute name on ``kubernetes.client`` (e.g. ``"AppsV1Api"``).""" list_method: str """Method on the API class used by Watch.stream (e.g. ``"list_namespaced_deployment"``).""" read_method: str """Method on the API class used for existence checks (e.g. ``"read_namespaced_deployment"``).""" namespaced: bool """``False`` for cluster-scoped kinds (Node, StorageClass, Namespace, ...).""" ready_predicate: Callable[[object], bool] """Returns ``True`` when an API object is considered Ready."""
# --------------------------------------------------------------------------- # Ready predicates. Behaviour is preserved exactly from the previous in-line # logic in ``_wait_for_resource_status`` so that storageclass / replicaset / # daemonset wait timing in the kind-cluster fixture does not flake. # --------------------------------------------------------------------------- def _always_ready(_obj): """Default predicate: any object that exists is considered ready.""" return True def _deployment_ready(obj): """A Deployment is ready when ``available_replicas == spec.replicas``.""" avail = obj.status.available_replicas spec_replicas = obj.spec.replicas return bool(avail) and avail == spec_replicas def _pod_ready(obj): """A Pod is ready when phase is Running and every container is ready.""" if obj.status.phase != "Running": return False container_statuses = obj.status.container_statuses if not container_statuses: return False return all(cs.ready for cs in container_statuses) def _service_ready(obj): """A Service is ready once the API server has assigned a clusterIP.""" return bool(obj.spec.cluster_ip) # --------------------------------------------------------------------------- # User-driven wait predicates: condition= and jsonpath= matching against the # live API object. Used by ``kubernetes.wait_for`` and the ``wait_for=`` block # accepted by ``manifest_present`` / typed ``*_present`` states. # ---------------------------------------------------------------------------
[docs] def match_condition(obj, condition_type, expected_status="True"): """ Match ``obj.status.conditions[?type == condition_type].status``. Mirrors ``kubectl wait --for=condition=Ready=true`` semantics. Returns ``True`` when a condition with the given ``type`` is present and its ``status`` matches ``expected_status`` (case-insensitive). """ status = getattr(obj, "status", None) conditions = getattr(status, "conditions", None) if status is not None else None if not conditions: return False want = str(expected_status).strip().lower() for cond in conditions: if getattr(cond, "type", None) == condition_type: return str(getattr(cond, "status", "")).strip().lower() == want return False
def _resolve_jsonpath(obj, path): """ Resolve a kubectl-style jsonpath against ``obj``. Subset accepted (matches the most common ``kubectl get -o jsonpath`` forms): * ``.foo.bar`` — attribute access * ``.foo.bar[0]`` — list index * ``.foo.bar[*]`` — list, returns last element (kubectl semantics for scalar coercion) * ``{.foo.bar}`` — surrounding braces are stripped Tries snake_case (Python model attr) then the camelCase form spelled in the path. Returns ``None`` if any segment is missing. """ import re as _re # pylint: disable=import-outside-toplevel if path.startswith("{") and path.endswith("}"): path = path[1:-1] if not path.startswith("."): return None current = obj segments = [] for piece in path[1:].split("."): if not piece: continue bracket = piece.find("[") if bracket == -1: segments.append((piece, None)) else: key = piece[:bracket] index = piece[bracket + 1 : -1] segments.append((key, index)) for key, index in segments: # Try multiple snake-case conversions. The kubernetes-client's # OpenAPI generator uses an inconsistent convention for acronyms: # ``clusterIP`` becomes ``cluster_ip`` (smart split) but # ``clusterIPs`` becomes ``cluster_i_ps`` (naive split). We try # both so callers can use the natural kubectl-style camelCase # spelling without knowing which form happens to match. smart = _re.sub(r"(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", "_", key).lower() naive = _re.sub(r"(?<!^)(?=[A-Z])", "_", key).lower() candidates = [] for cand in (key, smart, naive): if cand not in candidates: candidates.append(cand) resolved = None for cand in candidates: if isinstance(current, dict): if cand in current: resolved = current[cand] break else: got = getattr(current, cand, None) if got is not None: resolved = got break current = resolved if current is None: return None if index is not None: try: if not isinstance(current, (list, tuple)): return None if not current: return None if index == "*": current = current[-1] else: current = current[int(index)] except (ValueError, IndexError): return None return current
[docs] def match_jsonpath(obj, path, value=None, regex=None): """ Match ``obj`` against a kubectl-style jsonpath. Returns ``True`` when: * ``value`` is given and the resolved value equals ``value``, OR * ``regex`` is given and the stringified value matches ``re.search``, OR * neither is given and the resolved value is truthy (existence test). Returns ``False`` if the path does not resolve. """ import re as _re # pylint: disable=import-outside-toplevel resolved = _resolve_jsonpath(obj, path) if resolved is None: return False if value is not None: return resolved == value if regex is not None: return bool(_re.search(regex, str(resolved))) return bool(resolved)
[docs] def build_predicate(condition=None, status="True", jsonpath=None, value=None, regex=None): """ Build a predicate callable from user-supplied wait criteria. Exactly one of ``condition`` or ``jsonpath`` must be set. The returned callable accepts a live API object and returns a ``bool``. """ if condition and jsonpath: raise CommandExecutionError("wait_for accepts either 'condition' or 'jsonpath', not both") if condition: return lambda obj: match_condition(obj, condition, status) if jsonpath: return lambda obj: match_jsonpath(obj, jsonpath, value=value, regex=regex) raise CommandExecutionError("wait_for requires one of 'condition' or 'jsonpath'")
# --------------------------------------------------------------------------- # The registry. New kinds get an entry here and (in the same PR) their CRUD # functions on kubernetesmod. The wait subsystem then supports them for free. # --------------------------------------------------------------------------- _KIND_REGISTRY: dict[str, KindOps] = { # Workloads "deployment": KindOps( api_class_attr="AppsV1Api", list_method="list_namespaced_deployment", read_method="read_namespaced_deployment", namespaced=True, ready_predicate=_deployment_ready, ), "statefulset": KindOps( api_class_attr="AppsV1Api", list_method="list_namespaced_stateful_set", read_method="read_namespaced_stateful_set", namespaced=True, ready_predicate=_always_ready, ), "replicaset": KindOps( api_class_attr="AppsV1Api", list_method="list_namespaced_replica_set", read_method="read_namespaced_replica_set", namespaced=True, ready_predicate=_always_ready, ), "daemonset": KindOps( api_class_attr="AppsV1Api", list_method="list_namespaced_daemon_set", read_method="read_namespaced_daemon_set", namespaced=True, ready_predicate=_always_ready, ), "pod": KindOps( api_class_attr="CoreV1Api", list_method="list_namespaced_pod", read_method="read_namespaced_pod", namespaced=True, ready_predicate=_pod_ready, ), # Services & Config "service": KindOps( api_class_attr="CoreV1Api", list_method="list_namespaced_service", read_method="read_namespaced_service", namespaced=True, ready_predicate=_service_ready, ), "secret": KindOps( api_class_attr="CoreV1Api", list_method="list_namespaced_secret", read_method="read_namespaced_secret", namespaced=True, ready_predicate=_always_ready, ), "configmap": KindOps( api_class_attr="CoreV1Api", list_method="list_namespaced_config_map", read_method="read_namespaced_config_map", namespaced=True, ready_predicate=_always_ready, ), # Cluster-scoped "namespace": KindOps( api_class_attr="CoreV1Api", list_method="list_namespace", read_method="read_namespace", namespaced=False, ready_predicate=_always_ready, ), "storageclass": KindOps( api_class_attr="StorageV1Api", list_method="list_storage_class", read_method="read_storage_class", namespaced=False, ready_predicate=_always_ready, ), # RBAC "role": KindOps( api_class_attr="RbacAuthorizationV1Api", list_method="list_namespaced_role", read_method="read_namespaced_role", namespaced=True, ready_predicate=_always_ready, ), "role_binding": KindOps( api_class_attr="RbacAuthorizationV1Api", list_method="list_namespaced_role_binding", read_method="read_namespaced_role_binding", namespaced=True, ready_predicate=_always_ready, ), "cluster_role": KindOps( api_class_attr="RbacAuthorizationV1Api", list_method="list_cluster_role", read_method="read_cluster_role", namespaced=False, ready_predicate=_always_ready, ), "cluster_role_binding": KindOps( api_class_attr="RbacAuthorizationV1Api", list_method="list_cluster_role_binding", read_method="read_cluster_role_binding", namespaced=False, ready_predicate=_always_ready, ), "service_account": KindOps( api_class_attr="CoreV1Api", list_method="list_namespaced_service_account", read_method="read_namespaced_service_account", namespaced=True, ready_predicate=_always_ready, ), # Batch "job": KindOps( api_class_attr="BatchV1Api", list_method="list_namespaced_job", read_method="read_namespaced_job", namespaced=True, # A Job is "ready" once it exists; completion is a separate # state checked via _wait_for_job_completion. ready_predicate=_always_ready, ), "cron_job": KindOps( api_class_attr="BatchV1Api", list_method="list_namespaced_cron_job", read_method="read_namespaced_cron_job", namespaced=True, ready_predicate=_always_ready, ), # Networking / Autoscaling / Policy "ingress": KindOps( api_class_attr="NetworkingV1Api", list_method="list_namespaced_ingress", read_method="read_namespaced_ingress", namespaced=True, ready_predicate=_always_ready, ), "horizontal_pod_autoscaler": KindOps( api_class_attr="AutoscalingV2Api", list_method="list_namespaced_horizontal_pod_autoscaler", read_method="read_namespaced_horizontal_pod_autoscaler", namespaced=True, ready_predicate=_always_ready, ), "pod_disruption_budget": KindOps( api_class_attr="PolicyV1Api", list_method="list_namespaced_pod_disruption_budget", read_method="read_namespaced_pod_disruption_budget", namespaced=True, ready_predicate=_always_ready, ), # Persistent volumes "persistent_volume": KindOps( api_class_attr="CoreV1Api", list_method="list_persistent_volume", read_method="read_persistent_volume", namespaced=False, ready_predicate=_always_ready, ), "persistent_volume_claim": KindOps( api_class_attr="CoreV1Api", list_method="list_namespaced_persistent_volume_claim", read_method="read_namespaced_persistent_volume_claim", namespaced=True, ready_predicate=_always_ready, ), # Apply-only kinds: registered so the wait subsystem works for objects # created via kubernetes.apply / manifest_present, but no typed CRUD # wrappers — the canonical path for these is the generic apply. "network_policy": KindOps( api_class_attr="NetworkingV1Api", list_method="list_namespaced_network_policy", read_method="read_namespaced_network_policy", namespaced=True, ready_predicate=_always_ready, ), "resource_quota": KindOps( api_class_attr="CoreV1Api", list_method="list_namespaced_resource_quota", read_method="read_namespaced_resource_quota", namespaced=True, ready_predicate=_always_ready, ), "limit_range": KindOps( api_class_attr="CoreV1Api", list_method="list_namespaced_limit_range", read_method="read_namespaced_limit_range", namespaced=True, ready_predicate=_always_ready, ), "priority_class": KindOps( api_class_attr="SchedulingV1Api", list_method="list_priority_class", read_method="read_priority_class", namespaced=False, ready_predicate=_always_ready, ), # Cluster-scoped kinds the resources subsystem needs to enumerate # but that don't have a typed CRUD wrapper (and don't need one — # ``Node`` is read/labelled/tainted via dedicated execution-module # surface; CustomResourceDefinitions are created via the typed # ``create_custom_resource_definition`` wrapper added in PR #36). "node": KindOps( api_class_attr="CoreV1Api", list_method="list_node", read_method="read_node", namespaced=False, ready_predicate=_always_ready, ), "custom_resource_definition": KindOps( api_class_attr="ApiextensionsV1Api", list_method="list_custom_resource_definition", read_method="read_custom_resource_definition", namespaced=False, ready_predicate=_always_ready, ), }
[docs] def get_kind(resource_type: str) -> KindOps: """ Return the :py:class:`KindOps` for ``resource_type``. Raised exception type matches the legacy ``CommandExecutionError("Unsupported resource type for wait operation: ...")`` behaviour so existing callers and tests are not surprised. """ try: return _KIND_REGISTRY[resource_type] except KeyError as exc: raise CommandExecutionError( f"Unsupported resource type for wait operation: {resource_type}" ) from exc