"""
Kind registry for the saltext-kubernetes extension.

A single source of truth for per-Kubernetes-kind metadata used by the
wait subsystem and, in future, by the operations modules:

* which kubernetes-client API class hosts the kind's CRUD methods
  (e.g. ``CoreV1Api`` for Pod, ``AppsV1Api`` for Deployment)
* the names of the ``list_*`` and ``read_*`` methods on that class
* whether the kind is namespaced or cluster-scoped
* a readiness predicate evaluated against a live API object

Adding a new typed kind takes one entry in :py:data:`_KIND_REGISTRY` plus
its public CRUD functions on :py:mod:`...kubernetesmod`. Before this
registry existed, the kind→method mapping lived as a duplicated literal
dict in :py:func:`_wait_for_resource_status` (twice — once for the
"deleted" path's ``read_*`` lookup, once for the "created/ready"
path's ``list_*`` lookup).
"""
from collections.abc import Callable
from dataclasses import dataclass
from salt.exceptions import CommandExecutionError
# pylint: disable=import-error,no-name-in-module
try:
import kubernetes.client.rest # noqa: F401 pylint: disable=unused-import
HAS_LIBS = True
except ImportError:
HAS_LIBS = False
# pylint: enable=import-error,no-name-in-module
[docs]
@dataclass(frozen=True)
class KindOps:
    """
    Immutable per-kind record consumed by the wait subsystem.

    Each field is described by the ``#:`` comment directly above it.
    """

    #: Attribute name on ``kubernetes.client`` (e.g. ``"AppsV1Api"``).
    api_class_attr: str

    #: Method on the API class used by Watch.stream
    #: (e.g. ``"list_namespaced_deployment"``).
    list_method: str

    #: Method on the API class used for existence checks
    #: (e.g. ``"read_namespaced_deployment"``).
    read_method: str

    #: ``False`` for cluster-scoped kinds (Node, StorageClass, Namespace, ...).
    namespaced: bool

    #: Returns ``True`` when an API object is considered Ready.
    ready_predicate: Callable[[object], bool]
# ---------------------------------------------------------------------------
# Ready predicates. Behaviour is preserved exactly from the previous in-line
# logic in ``_wait_for_resource_status`` so that storageclass / replicaset /
# daemonset wait timing in the kind-cluster fixture does not flake.
# ---------------------------------------------------------------------------
def _always_ready(_obj):
    """
    Fallback readiness predicate for kinds without a readiness notion.

    The argument is ignored: merely existing is enough, so the result is
    always ``True``.
    """
    return True
def _deployment_ready(obj):
    """
    Report whether a Deployment has all of its desired replicas available.

    Ready means ``status.available_replicas`` is truthy (neither ``None``
    nor ``0``) and equal to ``spec.replicas``.

    An object whose ``status`` or ``spec`` is missing or ``None`` is
    reported as not ready instead of raising ``AttributeError``, in line
    with the guarded attribute access used by :py:func:`match_condition`.

    .. note::
        Because ``available_replicas`` must be truthy, a Deployment with
        zero available replicas (including one scaled to ``replicas: 0``)
        is never reported ready. That is the pre-existing behaviour and is
        intentionally left unchanged.

    :param obj: Deployment-like object exposing ``status`` and ``spec``.
    :return: ``True`` when the Deployment is considered ready.
    """
    status = getattr(obj, "status", None)
    spec = getattr(obj, "spec", None)
    if status is None or spec is None:
        return False
    available = getattr(status, "available_replicas", None)
    # Truthiness first: a None/0 count must never compare equal to an
    # unset or zero ``spec.replicas``.
    if not available:
        return False
    return available == getattr(spec, "replicas", None)
def _pod_ready(obj):
    """
    Report whether a Pod is running with every container ready.

    Ready requires all of:

    * ``status.phase`` equals ``"Running"``
    * ``status.container_statuses`` is a non-empty sequence
    * every entry in it has a truthy ``ready`` flag

    An object whose ``status`` is missing or ``None`` is reported as not
    ready instead of raising ``AttributeError``.

    :param obj: Pod-like object exposing ``status``.
    :return: ``True`` when the Pod is considered ready.
    """
    status = getattr(obj, "status", None)
    if status is None or getattr(status, "phase", None) != "Running":
        return False
    container_statuses = getattr(status, "container_statuses", None)
    # No reported containers means readiness cannot be confirmed; without
    # this guard ``all()`` over an empty list would vacuously return True.
    if not container_statuses:
        return False
    return all(cs.ready for cs in container_statuses)
def _service_ready(obj):
    """
    Report whether a Service has been given a cluster IP.

    Ready means ``spec.cluster_ip`` is truthy, i.e. any non-empty value.
    An object whose ``spec`` is missing or ``None`` is reported as not
    ready instead of raising ``AttributeError``.

    :param obj: Service-like object exposing ``spec``.
    :return: ``True`` when a cluster IP is present.
    """
    spec = getattr(obj, "spec", None)
    # ``getattr`` on ``None`` falls through to the default, so a missing
    # spec yields ``False`` here.
    return bool(getattr(spec, "cluster_ip", None))
# ---------------------------------------------------------------------------
# User-driven wait predicates: condition= and jsonpath= matching against the
# live API object. Used by ``kubernetes.wait_for`` and the ``wait_for=`` block
# accepted by ``manifest_present`` / typed ``*_present`` states.
# ---------------------------------------------------------------------------
[docs]
def match_condition(obj, condition_type, expected_status="True"):
    """
    Match ``obj.status.conditions[?type == condition_type].status``.

    Mirrors ``kubectl wait --for=condition=Ready=true`` semantics: both the
    condition ``type`` and its ``status`` are compared case-insensitively,
    so ``condition_type="available"`` matches a condition of type
    ``Available`` and ``expected_status`` may be ``"true"``, ``"True"`` or
    the boolean ``True``.

    ``obj`` (and each condition entry) may be either an attribute-style
    model object or a plain ``dict``.

    :param obj: Live API object to inspect.
    :param condition_type: Condition type to look for (e.g. ``"Ready"``).
    :param expected_status: Status the condition must report; stringified
        before comparison.
    :return: ``True`` when the first condition of the requested type has
        the expected status; ``False`` when it does not, or when the object
        has no status, no conditions, or no condition of that type.
    """

    def _field(container, name, default=None):
        # Typed client models expose attributes; raw objects are dicts.
        if isinstance(container, dict):
            return container.get(name, default)
        return getattr(container, name, default)

    conditions = _field(_field(obj, "status"), "conditions")
    if not conditions:
        return False

    want_type = str(condition_type).strip().lower()
    want_status = str(expected_status).strip().lower()
    for cond in conditions:
        cond_type = _field(cond, "type")
        if cond_type is None or str(cond_type).strip().lower() != want_type:
            continue
        # Only the first condition of the requested type is consulted.
        return str(_field(cond, "status", "")).strip().lower() == want_status
    return False
def _resolve_jsonpath(obj, path):
    """
    Walk a kubectl-style jsonpath through ``obj`` and return what it finds.

    Supported subset (the most common ``kubectl get -o jsonpath`` forms):

    * ``.foo.bar``      — attribute / key access
    * ``.foo.bar[0]``   — list index
    * ``.foo.bar[*]``   — list, yields the last element
    * ``{.foo.bar}``    — surrounding braces are stripped

    Every segment name is looked up first exactly as spelled in the path,
    then in two snake_case spellings derived from it. On a ``dict`` the
    first spelling present as a key wins; on any other object the first
    spelling whose attribute is not ``None`` wins.

    Returns ``None`` when the path does not start with ``.``, when a
    segment cannot be resolved, or when an index is applied to something
    that is not a non-empty list/tuple, is out of range, or is not an
    integer.
    """
    import re as _re  # pylint: disable=import-outside-toplevel

    def _spellings(name):
        # The kubernetes-client's OpenAPI generator is inconsistent about
        # acronyms: ``clusterIP`` becomes ``cluster_ip`` (smart split) but
        # ``clusterIPs`` becomes ``cluster_i_ps`` (naive split). Offering
        # both lets callers use the natural kubectl camelCase spelling.
        smart = _re.sub(r"(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", "_", name).lower()
        naive = _re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
        # dict.fromkeys drops duplicates while keeping first-seen order.
        return list(dict.fromkeys((name, smart, naive)))

    def _lookup(container, name):
        spellings = _spellings(name)
        if isinstance(container, dict):
            for spelling in spellings:
                if spelling in container:
                    return container[spelling]
            return None
        for spelling in spellings:
            value = getattr(container, spelling, None)
            if value is not None:
                return value
        return None

    if path.startswith("{") and path.endswith("}"):
        path = path[1:-1]
    if not path.startswith("."):
        return None

    node = obj
    # Empty pieces (from ``..`` or a trailing dot) are skipped.
    for piece in filter(None, path[1:].split(".")):
        name, opener, remainder = piece.partition("[")
        # ``remainder`` is everything after the first ``[``; dropping its
        # final character removes the closing ``]``.
        selector = remainder[:-1] if opener else None

        node = _lookup(node, name)
        if node is None:
            return None
        if selector is None:
            continue

        if not isinstance(node, (list, tuple)) or not node:
            return None
        try:
            node = node[-1] if selector == "*" else node[int(selector)]
        except (ValueError, IndexError):
            return None
    return node
[docs]
def match_jsonpath(obj, path, value=None, regex=None):
    """
    Test ``obj`` against a kubectl-style jsonpath expression.

    The path is resolved with :py:func:`_resolve_jsonpath`; the outcome is
    then decided by the first rule that applies:

    1. the path resolves to ``None`` → ``False``
    2. ``value`` is given → result of ``resolved == value``
    3. ``regex`` is given → whether ``re.search`` finds it in
       ``str(resolved)``
    4. otherwise → truthiness of the resolved value (existence test)

    ``value`` takes precedence when both ``value`` and ``regex`` are set.
    """
    import re as _re  # pylint: disable=import-outside-toplevel

    found = _resolve_jsonpath(obj, path)
    if found is None:
        return False
    if value is not None:
        return found == value
    if regex is None:
        return bool(found)
    return _re.search(regex, str(found)) is not None
[docs]
def build_predicate(condition=None, status="True", jsonpath=None, value=None, regex=None):
    """
    Turn user-supplied wait criteria into a predicate callable.

    Exactly one of ``condition`` or ``jsonpath`` must be truthy:

    * ``condition`` → the predicate calls :py:func:`match_condition` with
      ``condition`` and ``status``
    * ``jsonpath`` → the predicate calls :py:func:`match_jsonpath` with
      ``jsonpath``, ``value`` and ``regex``

    The returned callable takes a live API object and returns the result
    of the chosen matcher.

    :raises CommandExecutionError: if both or neither of ``condition`` and
        ``jsonpath`` are supplied.
    """
    if condition and jsonpath:
        raise CommandExecutionError("wait_for accepts either 'condition' or 'jsonpath', not both")
    if not (condition or jsonpath):
        raise CommandExecutionError("wait_for requires one of 'condition' or 'jsonpath'")

    if condition:

        def _by_condition(obj):
            return match_condition(obj, condition, status)

        return _by_condition

    def _by_jsonpath(obj):
        return match_jsonpath(obj, jsonpath, value=value, regex=regex)

    return _by_jsonpath
# ---------------------------------------------------------------------------
# The registry. New kinds get an entry here and (in the same PR) their CRUD
# functions on kubernetesmod. The wait subsystem then supports them for free.
# ---------------------------------------------------------------------------
_KIND_REGISTRY: dict[str, KindOps] = {
    kind: KindOps(
        api_class_attr=api_class,
        list_method=list_name,
        read_method=read_name,
        namespaced=is_namespaced,
        ready_predicate=predicate,
    )
    # One row per kind, in registry order:
    #   (kind, API class, namespaced?, ready predicate,
    #    list method, read method)
    for kind, api_class, is_namespaced, predicate, list_name, read_name in (
        # Workloads
        ("deployment", "AppsV1Api", True, _deployment_ready,
         "list_namespaced_deployment", "read_namespaced_deployment"),
        ("statefulset", "AppsV1Api", True, _always_ready,
         "list_namespaced_stateful_set", "read_namespaced_stateful_set"),
        ("replicaset", "AppsV1Api", True, _always_ready,
         "list_namespaced_replica_set", "read_namespaced_replica_set"),
        ("daemonset", "AppsV1Api", True, _always_ready,
         "list_namespaced_daemon_set", "read_namespaced_daemon_set"),
        ("pod", "CoreV1Api", True, _pod_ready,
         "list_namespaced_pod", "read_namespaced_pod"),
        # Services & Config
        ("service", "CoreV1Api", True, _service_ready,
         "list_namespaced_service", "read_namespaced_service"),
        ("secret", "CoreV1Api", True, _always_ready,
         "list_namespaced_secret", "read_namespaced_secret"),
        ("configmap", "CoreV1Api", True, _always_ready,
         "list_namespaced_config_map", "read_namespaced_config_map"),
        # Cluster-scoped
        ("namespace", "CoreV1Api", False, _always_ready,
         "list_namespace", "read_namespace"),
        ("storageclass", "StorageV1Api", False, _always_ready,
         "list_storage_class", "read_storage_class"),
        # RBAC
        ("role", "RbacAuthorizationV1Api", True, _always_ready,
         "list_namespaced_role", "read_namespaced_role"),
        ("role_binding", "RbacAuthorizationV1Api", True, _always_ready,
         "list_namespaced_role_binding", "read_namespaced_role_binding"),
        ("cluster_role", "RbacAuthorizationV1Api", False, _always_ready,
         "list_cluster_role", "read_cluster_role"),
        ("cluster_role_binding", "RbacAuthorizationV1Api", False, _always_ready,
         "list_cluster_role_binding", "read_cluster_role_binding"),
        ("service_account", "CoreV1Api", True, _always_ready,
         "list_namespaced_service_account", "read_namespaced_service_account"),
        # Batch
        # A Job is "ready" once it exists; completion is a separate
        # state checked via _wait_for_job_completion.
        ("job", "BatchV1Api", True, _always_ready,
         "list_namespaced_job", "read_namespaced_job"),
        ("cron_job", "BatchV1Api", True, _always_ready,
         "list_namespaced_cron_job", "read_namespaced_cron_job"),
        # Networking / Autoscaling / Policy
        ("ingress", "NetworkingV1Api", True, _always_ready,
         "list_namespaced_ingress", "read_namespaced_ingress"),
        ("horizontal_pod_autoscaler", "AutoscalingV2Api", True, _always_ready,
         "list_namespaced_horizontal_pod_autoscaler", "read_namespaced_horizontal_pod_autoscaler"),
        ("pod_disruption_budget", "PolicyV1Api", True, _always_ready,
         "list_namespaced_pod_disruption_budget", "read_namespaced_pod_disruption_budget"),
        # Persistent volumes
        ("persistent_volume", "CoreV1Api", False, _always_ready,
         "list_persistent_volume", "read_persistent_volume"),
        ("persistent_volume_claim", "CoreV1Api", True, _always_ready,
         "list_namespaced_persistent_volume_claim", "read_namespaced_persistent_volume_claim"),
        # Apply-only kinds: registered so the wait subsystem works for
        # objects created via kubernetes.apply / manifest_present, but with
        # no typed CRUD wrappers — the canonical path for these is the
        # generic apply.
        ("network_policy", "NetworkingV1Api", True, _always_ready,
         "list_namespaced_network_policy", "read_namespaced_network_policy"),
        ("resource_quota", "CoreV1Api", True, _always_ready,
         "list_namespaced_resource_quota", "read_namespaced_resource_quota"),
        ("limit_range", "CoreV1Api", True, _always_ready,
         "list_namespaced_limit_range", "read_namespaced_limit_range"),
        ("priority_class", "SchedulingV1Api", False, _always_ready,
         "list_priority_class", "read_priority_class"),
        # Cluster-scoped kinds the resources subsystem needs to enumerate
        # but that don't have a typed CRUD wrapper (and don't need one —
        # ``Node`` is read/labelled/tainted via dedicated execution-module
        # surface; CustomResourceDefinitions are created via the typed
        # ``create_custom_resource_definition`` wrapper added in PR #36).
        ("node", "CoreV1Api", False, _always_ready,
         "list_node", "read_node"),
        ("custom_resource_definition", "ApiextensionsV1Api", False, _always_ready,
         "list_custom_resource_definition", "read_custom_resource_definition"),
    )
}
[docs]
def get_kind(resource_type: str) -> KindOps:
    """
    Look up the :py:class:`KindOps` registered for ``resource_type``.

    :param resource_type: Registry key, e.g. ``"deployment"``.
    :return: The matching entry of :py:data:`_KIND_REGISTRY`.
    :raises CommandExecutionError: when the key is not registered. The
        message keeps the legacy wording
        ``"Unsupported resource type for wait operation: ..."`` so existing
        callers and tests are not surprised; the original ``KeyError`` is
        chained as the cause.
    """
    try:
        ops = _KIND_REGISTRY[resource_type]
    except KeyError as missing:
        message = f"Unsupported resource type for wait operation: {resource_type}"
        raise CommandExecutionError(message) from missing
    return ops