"""
Internal dynamic-client wrapper for the saltext-kubernetes extension.
Wraps :py:class:`kubernetes.dynamic.DynamicClient` with the small
helpers the generic-apply / generic-patch / generic-read code paths
need:
* :py:func:`get_dynamic_client` — lazily-cached client per process,
rebuilt when the auth Configuration changes (which happens whenever
``_setup_conn`` runs against a different kubeconfig/host).
* :py:func:`get_resource` — resolves a ``(api_version, kind)`` pair to
a :py:class:`kubernetes.dynamic.Resource`, with a small in-process
cache keyed by ``(group/version, kind)`` to avoid re-running API
discovery on every call.
* :py:func:`apply_manifest` — performs a server-side apply against the
resolved resource, surfacing the field-manager and force-conflicts
knobs that ``kubectl apply --server-side`` exposes.
* :py:func:`patch_object` — generic kind-agnostic patch with selectable
patch type (strategic / RFC 7396 merge / RFC 6902 json-patch).
* :py:func:`get_object`, :py:func:`delete_object`,
:py:func:`list_resource` — generic read/delete/list-by-GVK
counterparts to the typed CRUD wrappers in
:py:mod:`saltext.kubernetes.modules.kubernetesmod`.
This module is **internal**. Public callers should never import from
here — every helper has a public counterpart in the ``kubernetes``
execution module (:py:mod:`saltext.kubernetes.modules.kubernetesmod`)
that adds the user-facing concerns these helpers deliberately omit:
* connection lifecycle (``_setup_conn`` / ``_cleanup`` around each call)
* kwarg marshalling from the Salt loader (kubeconfig, context, cluster
alias, env-var precedence, etc.)
* kind-name inference from the typed kind-registry so callers can pass
``kind="Deployment"`` without spelling out ``api_version="apps/v1"``
* source-file rendering, multi-doc YAML, diff/idempotency tracking,
``test=True`` plumbing on the apply path
In short: this module's functions are **pure GVK plumbing**, and they
assume an already-installed default Configuration on
``kubernetes.client.Configuration``. The public ``kubernetes.*``
execution-module functions are the thin wrappers that bring those
preconditions about.
Public ↔ internal counterparts:

================================ ========================================
``kubernetes.apply``             :py:func:`apply_manifest`
``kubernetes.patch_object``      :py:func:`patch_object`
``kubernetes.get_object``        :py:func:`get_object`
``kubernetes.delete_manifest``   :py:func:`delete_object`
``kubernetes.list_*``            :py:func:`list_resource`
================================ ========================================

.. versionadded:: 2.1.0
"""
from __future__ import annotations
import logging
from typing import Any
from salt.exceptions import CommandExecutionError
# pylint: disable=import-error,no-name-in-module
try:
import kubernetes # pylint: disable=import-self
import kubernetes.client
from kubernetes import dynamic as k8s_dynamic
from kubernetes.client import ApiClient
from kubernetes.client.rest import ApiException
from urllib3.exceptions import HTTPError
HAS_LIBS = True
except ImportError:
HAS_LIBS = False
# pylint: enable=import-error,no-name-in-module
log = logging.getLogger(__name__)
# Module-level, per-process caches (both are emptied by
# ``invalidate_caches``):
#
# * ``_DYN_CLIENT`` maps the integer returned by ``_active_config_id()``
#   to the :py:class:`DynamicClient` built for that default
#   ``Configuration``, so a different client is used when the user
#   switches kubeconfig/host between calls.
#
# * ``_RESOURCE_CACHE`` maps ``(id(client), api_version, kind)`` to the
#   resolved :py:class:`Resource`, so repeated ``apply`` calls against
#   the same kind don't pay the API-discovery cost twice.
_DYN_CLIENT: dict[int, k8s_dynamic.DynamicClient] = {}
_RESOURCE_CACHE: dict[tuple[int, str, str], k8s_dynamic.Resource] = {}
def _active_config_id() -> int:
    """
    Return a cache key for the currently-installed default Configuration.

    ``Configuration.get_default_copy()`` returns a new copy on each call,
    so ``id()`` of its result only identifies a short-lived temporary:
    the value can differ between calls for an unchanged configuration
    (defeating the client cache) or be reused for a different one
    (handing back a client bound to the wrong cluster). The key is
    therefore derived from the configuration's *contents* -- host, TLS
    material and credentials -- so it stays stable while those settings
    are unchanged and changes whenever any of them does.

    The result is only meaningful within the current process (string
    hashing is salted per interpreter), which is all the in-process
    caches need.
    """
    cfg = kubernetes.client.Configuration.get_default_copy()
    # Settings that determine which cluster we talk to and as whom.
    # ``getattr`` with a default keeps this tolerant of client releases
    # that lack one of the attributes.
    fields = (
        "host",
        "verify_ssl",
        "ssl_ca_cert",
        "cert_file",
        "key_file",
        "assert_hostname",
        "tls_server_name",
        "proxy",
        "username",
        "password",
        "api_key",
        "api_key_prefix",
    )

    def _freeze(value):
        # ``repr`` turns every value into a hashable string; dict items
        # are sorted first so insertion order cannot alter the key.
        if isinstance(value, dict):
            return tuple(sorted((repr(k), repr(v)) for k, v in value.items()))
        return repr(value)

    return hash(tuple((field, _freeze(getattr(cfg, field, None))) for field in fields))
def _api_client_with_default_config() -> ApiClient:
    """
    Create a new :py:class:`ApiClient` from a copy of the current
    default Configuration.

    Nothing is cached here: the auth refactor in PR3 installs a new
    default Configuration each time ``_setup_conn`` runs, so the client
    is rebuilt on every call to pick that up.
    """
    default_cfg = kubernetes.client.Configuration.get_default_copy()
    return ApiClient(configuration=default_cfg)
[docs]
def get_dynamic_client() -> k8s_dynamic.DynamicClient:
    """
    Return the DynamicClient cached for the active default Configuration,
    building and caching one first if none exists yet.

    Constructing a client triggers API discovery, which is expensive, so
    one instance is kept per key returned by ``_active_config_id()``.

    Raises :py:class:`CommandExecutionError` when the kubernetes Python
    client could not be imported.
    """
    if not HAS_LIBS:
        raise CommandExecutionError("kubernetes Python client not installed")

    key = _active_config_id()
    if key in _DYN_CLIENT:
        return _DYN_CLIENT[key]

    # Cache miss: build a client against the current default config.
    _DYN_CLIENT[key] = k8s_dynamic.DynamicClient(_api_client_with_default_config())
    return _DYN_CLIENT[key]
[docs]
def get_resource(api_version: str, kind: str) -> k8s_dynamic.Resource:
    """
    Resolve ``(api_version, kind)`` to a Resource via API discovery.

    ``api_version`` may be either a bare core version (``"v1"``) or a
    group/version (``"apps/v1"``, ``"rbac.authorization.k8s.io/v1"``).

    Successful lookups are memoised in ``_RESOURCE_CACHE`` under
    ``(id(client), api_version, kind)``; failures are never cached.

    Raises :py:class:`CommandExecutionError` when:

    * the GVK isn't known to the cluster (e.g. CRD not installed,
      typo'd apiVersion, or the user's RBAC scope can't see the API);
    * discovery matches more than one resource for the GVK, so the
      request would be ambiguous;
    * the discovery request itself fails (API or transport error).
    """
    dyn = get_dynamic_client()
    cache_key = (id(dyn), api_version, kind)
    cached = _RESOURCE_CACHE.get(cache_key)
    if cached is not None:
        return cached
    try:
        res = dyn.resources.get(api_version=api_version, kind=kind)
    except k8s_dynamic.exceptions.ResourceNotFoundError as exc:
        raise CommandExecutionError(
            f"Kubernetes API has no resource for apiVersion={api_version!r}, "
            f"kind={kind!r}. If this is a CRD, ensure it is installed first."
        ) from exc
    except k8s_dynamic.exceptions.ResourceNotUniqueError as exc:
        # The discoverer raises this when several resources match; wrap it
        # so callers only ever have to handle CommandExecutionError.
        raise CommandExecutionError(
            f"Kubernetes API discovery found more than one resource for "
            f"apiVersion={api_version!r}, kind={kind!r}: {exc}"
        ) from exc
    except (ApiException, HTTPError) as exc:
        raise CommandExecutionError(exc) from exc
    _RESOURCE_CACHE[cache_key] = res
    return res
[docs]
def invalidate_caches() -> None:
    """
    Empty both module-level caches (dynamic clients and resolved
    resources).

    Intended to be called after creating a CustomResourceDefinition in
    the same Salt run as the CR that uses it; otherwise the discoverer's
    snapshot won't include the new GVK.
    """
    for cache in (_DYN_CLIENT, _RESOURCE_CACHE):
        cache.clear()
def _resolve_gvk_from_manifest(manifest: dict) -> tuple[str, str]:
    """
    Extract ``(apiVersion, kind)`` from a manifest dict.

    Raises :py:class:`CommandExecutionError` if *manifest* is not a dict
    or if either field is missing or empty; ``apiVersion`` is checked
    before ``kind``.
    """
    if not isinstance(manifest, dict):
        raise CommandExecutionError(f"Manifest must be a dictionary, not {type(manifest).__name__}")

    # (manifest key, error raised when that key is absent/empty), in the
    # order the checks are reported.
    required = (
        ("apiVersion", "Manifest is missing 'apiVersion'"),
        ("kind", "Manifest is missing 'kind'"),
    )
    found = []
    for key, problem in required:
        value = manifest.get(key)
        if not value:
            raise CommandExecutionError(problem)
        found.append(value)
    api_version, kind = found
    return api_version, kind
[docs]
def apply_manifest(
    manifest: dict,
    field_manager: str = "salt",
    force_conflicts: bool = False,
    dry_run: bool = False,
) -> dict:
    """
    Server-side apply *manifest* and return the applied object.

    *manifest* must include ``apiVersion``, ``kind`` and ``metadata.name``.
    Namespaced resources must include ``metadata.namespace`` (the typed
    CRUD paths default to ``"default"``; we deliberately do not, because
    silently scoping a manifest to ``default`` is a footgun).

    The HTTP request is a PATCH with
    ``Content-Type: application/apply-patch+yaml``, ``fieldManager``
    set to *field_manager*, and ``force=true`` when *force_conflicts*
    is set. ``dry_run=True`` adds ``dryRun=All`` so the API server
    validates the manifest and reports the resulting object without
    persisting changes.

    Raises :py:class:`CommandExecutionError` for both API-side errors
    (404, conflicts, validation rejections) and client-side issues
    (missing apiVersion/kind/name, ``metadata`` that is not a mapping,
    bad GVK, missing namespace for a namespaced kind).
    """
    api_version, kind = _resolve_gvk_from_manifest(manifest)

    # Read ``metadata`` once and make sure it is a mapping, so a
    # malformed manifest (e.g. ``metadata: "oops"``) is reported as a
    # CommandExecutionError instead of leaking an AttributeError.
    metadata = manifest.get("metadata") or {}
    if not isinstance(metadata, dict):
        raise CommandExecutionError(
            f"Manifest 'metadata' must be a dictionary, not {type(metadata).__name__}"
        )
    name = metadata.get("name")
    if not name:
        raise CommandExecutionError("Manifest is missing 'metadata.name'")
    namespace = metadata.get("namespace")

    resource = get_resource(api_version, kind)
    if resource.namespaced and not namespace:
        raise CommandExecutionError(
            f"Namespaced kind {kind} requires 'metadata.namespace'; "
            "the apply path does not silently default to 'default'."
        )

    apply_kwargs: dict[str, Any] = {
        "body": manifest,
        "name": name,
        "field_manager": field_manager,
    }
    if namespace:
        apply_kwargs["namespace"] = namespace
    if force_conflicts:
        apply_kwargs["force_conflicts"] = True
    if dry_run:
        # The dynamic client's server_side_apply forwards **kwargs as
        # query params; ``dryRun=All`` is the documented value.
        apply_kwargs["dry_run"] = "All"

    try:
        result = resource.server_side_apply(**apply_kwargs)
    except (ApiException, HTTPError) as exc:
        raise CommandExecutionError(exc) from exc

    # ``server_side_apply`` returns a ResourceInstance whose ``.to_dict()``
    # produces the same shape ``ApiClient().sanitize_for_serialization``
    # produces for the typed paths.
    if hasattr(result, "to_dict"):
        return result.to_dict()
    return ApiClient().sanitize_for_serialization(result)
[docs]
def patch_object(
    api_version: str,
    kind: str,
    name: str,
    patch,
    namespace: str | None = None,
    patch_type: str = "strategic",
    field_manager: str | None = None,
    dry_run: bool = False,
) -> dict:
    """
    Patch the object identified by GVK + name using the requested patch
    flavour, and return the patched object as a dict.

    This is the internal plumbing behind the public
    :py:func:`saltext.kubernetes.modules.kubernetesmod.patch_object`,
    which handles connection setup, the Salt-loader kwarg conventions
    and ``api_version`` inference. Here ``api_version`` and ``kind``
    must already be resolved and a default
    :py:class:`kubernetes.client.Configuration` must already be
    installed.

    ``patch_type`` picks the HTTP ``Content-Type`` sent with the request:

    * ``"strategic"`` -- ``application/strategic-merge-patch+json``
      (kubectl's default; works only on built-in kinds with registered
      strategic-merge directives).
    * ``"merge"`` / ``"json-merge"`` -- ``application/merge-patch+json``
      (RFC 7396); works on CRDs and any kind.
    * ``"json"`` / ``"json-patch"`` -- ``application/json-patch+json``
      (RFC 6902); ``patch`` must be a list of operation dicts such as
      ``[{"op": "replace", "path": "/spec/replicas", "value": 5}]``.

    ``field_manager`` and ``dry_run`` are forwarded only when set;
    ``dry_run=True`` is sent as ``"All"``.

    Raises :py:class:`CommandExecutionError` for an unknown
    ``patch_type``, a non-list json-patch body, a namespaced kind
    without ``namespace``, and any API or transport failure.
    """
    content_types = {
        "strategic": "application/strategic-merge-patch+json",
        "merge": "application/merge-patch+json",
        "json-merge": "application/merge-patch+json",
        "json": "application/json-patch+json",
        "json-patch": "application/json-patch+json",
    }
    content_type = content_types.get(patch_type)
    if content_type is None:
        accepted = sorted(set(content_types))
        raise CommandExecutionError(f"Unknown patch_type {patch_type!r}. Accepted: {accepted}")

    wants_json_patch = patch_type in ("json", "json-patch")
    if wants_json_patch and not isinstance(patch, list):
        raise CommandExecutionError(
            "json-patch requires a list of operation dicts "
            "(e.g. [{'op': 'replace', 'path': '/spec/replicas', 'value': 5}])"
        )

    target = get_resource(api_version, kind)
    if target.namespaced and not namespace:
        raise CommandExecutionError(f"Namespaced kind {kind} requires 'namespace'.")

    request: dict[str, Any] = {"name": name, "body": patch, "content_type": content_type}
    # Optional arguments are only forwarded when they carry a truthy value.
    extras = {
        "namespace": namespace,
        "field_manager": field_manager,
        "dry_run": "All" if dry_run else None,
    }
    request.update({option: value for option, value in extras.items() if value})

    try:
        outcome = target.patch(**request)
    except (ApiException, HTTPError) as exc:
        raise CommandExecutionError(exc) from exc

    if not hasattr(outcome, "to_dict"):
        return ApiClient().sanitize_for_serialization(outcome)
    return outcome.to_dict()
[docs]
def list_resource(
    api_version: str,
    kind: str,
    namespace: str | None = None,
    label_selector: str | None = None,
    field_selector: str | None = None,
) -> list[dict]:
    """
    Generic list-by-GVK. Returns a list of object dicts.

    ``namespace`` is only forwarded for namespaced kinds; when it is
    omitted no namespace restriction is sent with the request.
    ``label_selector`` and ``field_selector`` are forwarded when given.

    Always returns a list: an empty one is returned when the response
    is not a mapping, has no ``items`` key, or carries ``items`` as
    ``None`` (e.g. JSON ``null``).

    Raises :py:class:`CommandExecutionError` on API or transport errors.

    Used by the ``kubernetes.list_`` execution-module function (PR10).
    """
    resource = get_resource(api_version, kind)
    list_kwargs: dict[str, Any] = {}
    if namespace and resource.namespaced:
        list_kwargs["namespace"] = namespace
    if label_selector:
        list_kwargs["label_selector"] = label_selector
    if field_selector:
        list_kwargs["field_selector"] = field_selector
    try:
        result = resource.get(**list_kwargs)
    except (ApiException, HTTPError) as exc:
        raise CommandExecutionError(exc) from exc
    payload = result.to_dict() if hasattr(result, "to_dict") else result
    if not isinstance(payload, dict):
        return []
    # ``.get("items", [])`` would hand back None when the key exists but
    # is null; ``or []`` keeps the documented list return type.
    return payload.get("items") or []
[docs]
def get_object(
    api_version: str,
    kind: str,
    name: str,
    namespace: str | None = None,
) -> dict | None:
    """
    Generic read-by-GVK: fetch a single object by name.

    Returns the object as a dict, or ``None`` when the API answers 404
    (matching the existing typed ``show_*`` functions' behaviour).

    Raises :py:class:`CommandExecutionError` when a namespaced kind is
    requested without ``namespace`` and for any other API or transport
    error.
    """
    target = get_resource(api_version, kind)
    if target.namespaced and not namespace:
        raise CommandExecutionError(f"Namespaced kind {kind} requires 'namespace'.")

    try:
        found = target.get(name=name, namespace=namespace)
    except ApiException as exc:
        # A missing object is an expected outcome, not an error.
        if exc.status == 404:
            return None
        raise CommandExecutionError(exc) from exc
    except HTTPError as exc:
        raise CommandExecutionError(exc) from exc

    if hasattr(found, "to_dict"):
        return found.to_dict()
    return found
[docs]
def delete_object(
    api_version: str,
    kind: str,
    name: str,
    namespace: str | None = None,
    propagation_policy: str | None = None,
    grace_period_seconds: int | None = None,
) -> dict | None:
    """
    Generic delete-by-GVK.

    Returns ``None`` if the object was already absent (a 404 is
    swallowed); otherwise returns the API server's response body.
    ``propagation_policy`` and ``grace_period_seconds`` are forwarded
    only when they are not ``None``.

    Raises :py:class:`CommandExecutionError` when a namespaced kind is
    requested without ``namespace`` and for any other API or transport
    error.
    """
    target = get_resource(api_version, kind)
    if target.namespaced and not namespace:
        raise CommandExecutionError(f"Namespaced kind {kind} requires 'namespace'.")

    request: dict[str, Any] = {"name": name}
    if namespace:
        request["namespace"] = namespace
    # ``None`` means "not specified"; other values (including 0) are sent.
    tunables = {
        "propagation_policy": propagation_policy,
        "grace_period_seconds": grace_period_seconds,
    }
    request.update({option: value for option, value in tunables.items() if value is not None})

    try:
        outcome = target.delete(**request)
    except ApiException as exc:
        # Deleting something that is already gone counts as success.
        if exc.status == 404:
            return None
        raise CommandExecutionError(exc) from exc
    except HTTPError as exc:
        raise CommandExecutionError(exc) from exc

    if hasattr(outcome, "to_dict"):
        return outcome.to_dict()
    return outcome