Source code for saltext.kubernetes.modules.kubernetesmod

"""
Module for handling kubernetes calls.

:optdepends:    - kubernetes Python client >= v19.15.0
                - PyYAML >= 5.3.1
:configuration: The k8s API settings are provided either in a pillar, in
    the minion's config file, or in master's config file. The classic
    kubeconfig-based setup looks like::

        kubernetes.kubeconfig: '/path/to/kubeconfig'
        kubernetes.kubeconfig-data: '<base64 encoded kubeconfig content>'
        kubernetes.context: 'context'

    For other auth modes — in-cluster ServiceAccount, bearer token, basic
    auth, or explicit client certificates with optional proxy support — see
    the dedicated :doc:`/topics/auth` guide. All settings can also be
    supplied via ``K8S_AUTH_*`` environment variables (compatible with
    Ansible's ``kubernetes.core`` collection) or as per-call kwargs that
    take precedence over both env and config.

The data format for `kubernetes.kubeconfig-data` value is the content of
`kubeconfig` base64 encoded in one line.

These settings can be overridden by adding `context` and `kubeconfig` or
`kubeconfig_data` parameters when calling a function.

Only `kubeconfig` or `kubeconfig-data` should be provided. In case both are
provided `kubeconfig` entry is preferred.

CLI Example:

.. code-block:: bash

    salt '*' kubernetes.nodes
    salt '*' kubernetes.nodes kubeconfig=/etc/salt/k8s/kubeconfig context=minikube

.. versionadded:: 2017.7.0
.. versionchanged:: 2019.2.0
.. versionchanged:: 2.1.0

    Added in-cluster ServiceAccount, bearer token, basic auth, explicit
    client-certificate, proxy, and ``K8S_AUTH_*`` environment-variable
    auth modes. The legacy kubeconfig path is unchanged and remains the
    default. See :doc:`/topics/auth`.

.. warning::

    Configuration options changed in 2019.2.0. The following configuration options have been removed:

    - kubernetes.user
    - kubernetes.password
    - kubernetes.api_url
    - kubernetes.certificate-authority-data/file
    - kubernetes.client-certificate-data/file
    - kubernetes.client-key-data/file

    These options were re-introduced under different names in 2.1.0 as
    part of the rich-auth work — see the auth guide. The 2019.2.0
    removal warning still stands for the *legacy* names; use the new
    ``kubernetes.host`` / ``kubernetes.api_key`` / ``kubernetes.username``
    / ``kubernetes.client_cert`` / etc. options instead.

"""

import base64
import logging
import sys
import time

import salt.utils.files
import salt.utils.platform
import salt.utils.templates
import salt.utils.yaml
from salt.exceptions import CommandExecutionError

# Re-exports kept on the module surface for backwards compatibility with any
# external code that imported these from ``kubernetesmod`` before the helpers
# were extracted to ``saltext.kubernetes.utils._connection``.
# pylint: disable=unused-import
from saltext.kubernetes.utils._connection import POLLING_TIME_LIMIT  # noqa: F401
from saltext.kubernetes.utils._connection import _cleanup  # noqa: F401
from saltext.kubernetes.utils._connection import _setup_conn as _setup_conn_impl  # noqa: F401

# pylint: enable=unused-import

if not salt.utils.platform.is_windows():
    # pylint: disable=unused-import
    from saltext.kubernetes.utils._connection import _time_limit  # noqa: F401

# pylint: disable=import-error,no-name-in-module
try:
    import kubernetes  # pylint: disable=import-self
    import kubernetes.client
    from kubernetes.client import ApiClient
    from kubernetes.client import V1Deployment
    from kubernetes.client import V1DeploymentSpec
    from kubernetes.client.rest import ApiException
    from kubernetes.watch import Watch
    from urllib3.exceptions import HTTPError

    HAS_LIBS = True
except ImportError:
    HAS_LIBS = False
# pylint: enable=import-error,no-name-in-module

log = logging.getLogger(__name__)

__virtualname__ = "kubernetes"


[docs] def __virtual__(): """ Check dependencies """ if HAS_LIBS: return __virtualname__ return False, "python kubernetes library not found"
def _setup_conn(**kwargs): """ Setup kubernetes API connection singleton. Backwards-compatible shim around :py:func:`saltext.kubernetes.utils._connection._setup_conn`. The signature, kwargs handling, and return shape are preserved so that existing call sites and ``mock.patch("...kubernetesmod._setup_conn")`` paths continue to work. """ return _setup_conn_impl(__salt__["config.option"], **kwargs)
[docs] def ping(**kwargs): """ Checks connection with the kubernetes API server. Returns True if the API is available. CLI Example: .. code-block:: bash salt '*' kubernetes.ping """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.get_api_resources() return bool(api_response and hasattr(api_response, "resources") and api_response.resources) except (ApiException, HTTPError): log.error( "Exception when calling CoreV1Api->get_api_resources", exc_info_on_loglevel=logging.DEBUG, ) return False finally: _cleanup(**cfg)
[docs] def nodes(**kwargs): """ Return the names of the nodes composing the kubernetes cluster CLI Example: .. code-block:: bash salt '*' kubernetes.nodes """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.list_node() return [ k8s_node["metadata"]["name"] for k8s_node in ApiClient().sanitize_for_serialization(api_response).get("items", []) ] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def node(name, **kwargs): """ Return the details of the node identified by the specified name CLI Example: .. code-block:: bash salt '*' kubernetes.node name='minikube' """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.list_node() except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg) for k8s_node in api_response.items: if k8s_node.metadata.name == name: return ApiClient().sanitize_for_serialization(k8s_node) return None
[docs] def node_labels(name, **kwargs): """ Return the labels of the node identified by the specified name name The name of the node CLI Example: .. code-block:: bash salt '*' kubernetes.node_labels name="minikube" """ match = node(name, **kwargs) if match is not None: return match["metadata"]["labels"] return {}
[docs] def node_add_label(node_name, label_name, label_value, **kwargs): """ Set the value of the label identified by `label_name` to `label_value` on the node identified by the name `node_name`. Creates the label if not present. node_name The name of the node label_name The name of the label label_value The value of the label CLI Example: .. code-block:: bash salt '*' kubernetes.node_add_label node_name="minikube" \ label_name="foo" label_value="bar" """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() # First verify the node exists try: api_instance.read_node(node_name) except ApiException as exc: if exc.status == 404: raise CommandExecutionError(f"Node {node_name} not found") from exc raise body = {"metadata": {"labels": {label_name: label_value}}} api_response = api_instance.patch_node(node_name, body) return api_response except (ApiException, HTTPError) as exc: raise CommandExecutionError(str(exc)) from exc finally: _cleanup(**cfg)
[docs] def node_remove_label(node_name, label_name, **kwargs): """ Removes the label identified by `label_name` from the node identified by the name `node_name`. node_name The name of the node label_name The name of the label CLI Example: .. code-block:: bash salt '*' kubernetes.node_remove_label node_name="minikube" \ label_name="foo" """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() body = {"metadata": {"labels": {label_name: None}}} api_response = api_instance.patch_node(node_name, body) return api_response except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"Node {node_name} not found") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def namespaces(**kwargs): """ Return the names of the available namespaces CLI Example: .. code-block:: bash salt '*' kubernetes.namespaces """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.list_namespace() return [ nms["metadata"]["name"] for nms in ApiClient().sanitize_for_serialization(api_response).get("items", []) ] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def deployments(namespace="default", **kwargs): """ Return a list of kubernetes deployments defined in the namespace namespace The namespace to list deployments from. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.deployments salt '*' kubernetes.deployments namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.list_namespaced_deployment(namespace) serialized_response = ApiClient().sanitize_for_serialization(api_response) items = serialized_response.get("items") or [] return [dep["metadata"]["name"] for dep in items] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def services(namespace="default", **kwargs): """ Return a list of kubernetes services defined in the namespace namespace The namespace to list services from. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.services salt '*' kubernetes.services namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.list_namespaced_service(namespace) return [ srv["metadata"]["name"] for srv in ApiClient().sanitize_for_serialization(api_response).get("items", []) ] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def pods(namespace="default", **kwargs): """ Return a list of kubernetes pods defined in the namespace namespace The namespace to list pods from. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.pods salt '*' kubernetes.pods namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.list_namespaced_pod(namespace) return [ pod["metadata"]["name"] for pod in ApiClient().sanitize_for_serialization(api_response).get("items", []) ] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] # Return empty list for nonexistent namespace raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def secrets(namespace="default", **kwargs): """ Return a list of kubernetes secrets defined in the namespace namespace The namespace to list secrets from. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.secrets salt '*' kubernetes.secrets namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.list_namespaced_secret(namespace) return [ secret["metadata"]["name"] for secret in ApiClient().sanitize_for_serialization(api_response).get("items", []) ] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def configmaps(namespace="default", **kwargs): """ Return a list of kubernetes configmaps defined in the namespace namespace The namespace to list configmaps from. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.configmaps salt '*' kubernetes.configmaps namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.list_namespaced_config_map(namespace) return [ configmap["metadata"]["name"] for configmap in ApiClient().sanitize_for_serialization(api_response).get("items", []) ] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] # Return empty list for nonexistent namespace raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def statefulsets(namespace="default", **kwargs): """ .. versionadded:: 2.1.0 Return a list of kubernetes statefulsets defined in the namespace namespace The namespace to list statefulsets from. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.statefulsets salt '*' kubernetes.statefulsets namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.list_namespaced_stateful_set(namespace) return [ statefulset["metadata"]["name"] for statefulset in ApiClient().sanitize_for_serialization(api_response).get("items", []) ] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] # Return empty list for nonexistent namespace raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def replicasets(namespace="default", **kwargs): """ .. versionadded:: 2.1.0 Return a list of kubernetes replicasets defined in the namespace namespace The namespace to list replicasets from. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.replicasets salt '*' kubernetes.replicasets namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.list_namespaced_replica_set(namespace) return [ replicaset["metadata"]["name"] for replicaset in ApiClient().sanitize_for_serialization(api_response).get("items", []) ] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def daemonsets(namespace="default", **kwargs): """ .. versionadded:: 2.1.0 Return a list of kubernetes daemonsets defined in the namespace namespace The namespace to list daemonsets from. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.daemonsets salt '*' kubernetes.daemonsets namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.list_namespaced_daemon_set(namespace) return [ daemonset["metadata"]["name"] for daemonset in ApiClient().sanitize_for_serialization(api_response).get("items", []) ] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def storageclasses(**kwargs): """ .. versionadded:: 2.1.0 Return a list of kubernetes storageclasses. CLI Example: .. code-block:: bash salt '*' kubernetes.storageclasses """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.StorageV1Api() api_response = api_instance.list_storage_class() return [ storageclass["metadata"]["name"] for storageclass in ApiClient() .sanitize_for_serialization(api_response) .get("items", []) ] except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return [] raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def show_deployment(name, namespace="default", **kwargs): """ Return the kubernetes deployment defined by name and namespace name The name of the deployment namespace The namespace to look for the deployment. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.show_deployment my-nginx default salt '*' kubernetes.show_deployment name=my-nginx namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.read_namespaced_deployment(name, namespace) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def show_service(name, namespace="default", **kwargs): """ Return the kubernetes service defined by name and namespace name The name of the service namespace The namespace to look for the service. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.show_service my-nginx default salt '*' kubernetes.show_service name=my-nginx namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.read_namespaced_service(name, namespace) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def show_pod(name, namespace="default", **kwargs): """ Return POD information for a given pod name defined in the namespace name The name of the pod namespace The namespace to look for the pod. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.show_pod guestbook-708336848-fqr2x salt '*' kubernetes.show_pod guestbook-708336848-fqr2x namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.read_namespaced_pod(name, namespace) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def show_namespace(name, **kwargs): """ Return information for a given namespace defined by the specified name name The name of the namespace to show CLI Example: .. code-block:: bash salt '*' kubernetes.show_namespace kube-system """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.read_namespace(name) return ApiClient().sanitize_for_serialization(api_response) except ApiException as exc: if exc.status == 404: return None raise CommandExecutionError(exc) from exc except HTTPError as exc: raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def show_secret(name, namespace="default", decode=False, **kwargs): """ Return the kubernetes secret defined by name and namespace. The secrets can be decoded if specified by the user. Warning: this has security implications. name The name of the secret namespace The namespace to look for the secret. Defaults to ``default``. decode Decode the secret values. Default is False CLI Example: .. code-block:: bash salt '*' kubernetes.show_secret confidential default salt '*' kubernetes.show_secret name=confidential namespace=default salt '*' kubernetes.show_secret name=confidential decode=True """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.read_namespaced_secret(name, namespace) response_dict = ApiClient().sanitize_for_serialization(api_response) if response_dict.get("data") and decode: decoded_data = {} for key, value in response_dict["data"].items(): try: decoded_data[key] = base64.b64decode(value).decode("utf-8") except UnicodeDecodeError: decoded_data[key] = base64.b64decode(value) response_dict["data"] = decoded_data return response_dict except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def show_configmap(name, namespace="default", **kwargs): """ Return the kubernetes configmap defined by name and namespace. name The name of the configmap namespace The namespace to look for the configmap. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.show_configmap game-config default salt '*' kubernetes.show_configmap name=game-config namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.read_namespaced_config_map(name, namespace) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def show_statefulset(name, namespace="default", **kwargs): """ .. versionadded:: 2.1.0 Return the kubernetes statefulset defined by name and namespace. name The name of the statefulset namespace The namespace to look for the statefulset. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.show_statefulset my-statefulset default salt '*' kubernetes.show_statefulset name=my-statefulset namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.read_namespaced_stateful_set(name, namespace) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def show_replicaset(name, namespace="default", **kwargs): """ .. versionadded:: 2.1.0 Return the kubernetes replicaset defined by name and namespace. name The name of the replicaset namespace The namespace to look for the replicaset. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.show_replicaset my-replicaset default salt '*' kubernetes.show_replicaset name=my-replicaset namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.read_namespaced_replica_set(name, namespace) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def show_daemonset(name, namespace="default", **kwargs): """ .. versionadded:: 2.1.0 Return the kubernetes daemonset defined by name and namespace. name The name of the daemonset namespace The namespace to look for the daemonset. Defaults to ``default``. CLI Example: .. code-block:: bash salt '*' kubernetes.show_daemonset my-daemonset default salt '*' kubernetes.show_daemonset name=my-daemonset namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.read_namespaced_daemon_set(name, namespace) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def show_storageclass(name, **kwargs): """ .. versionadded:: 2.1.0 Return the kubernetes storageclass defined by name. name The name of the storageclass CLI Example: .. code-block:: bash salt '*' kubernetes.show_storageclass my-storageclass salt '*' kubernetes.show_storageclass name=my-storageclass """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.StorageV1Api() api_response = api_instance.read_storage_class(name) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def delete_deployment(name, namespace="default", wait=False, timeout=60, **kwargs): """ Deletes the kubernetes deployment defined by name and namespace name The name of the deployment namespace The namespace to delete the deployment from. Defaults to ``default``. wait .. versionadded:: 2.0.0 Wait for deployment deletion to complete (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for deletion (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.delete_deployment my-nginx default wait=True """ cfg = _setup_conn(**kwargs) body = kubernetes.client.V1DeleteOptions(orphan_dependents=True) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.delete_namespaced_deployment( name=name, namespace=namespace, body=body ) if wait: if not _wait_for_resource_status( api_instance, "deployment", name, namespace, "deleted", timeout ): raise CommandExecutionError(f"Timeout waiting for deployment {name} to be deleted") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def delete_service(name, namespace="default", wait=False, timeout=60, **kwargs): """ Deletes the kubernetes service defined by name and namespace name The name of the service namespace The namespace to delete the service from. Defaults to ``default``. wait .. versionadded:: 2.0.0 Wait for service deletion to complete (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for deletion (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.delete_service my-nginx default salt '*' kubernetes.delete_service name=my-nginx namespace=default """ cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.delete_namespaced_service(name=name, namespace=namespace) if wait: if not _wait_for_resource_status( api_instance, "service", name, namespace, "deleted", timeout ): raise CommandExecutionError(f"Timeout waiting for service {name} to be deleted") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None else: raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def delete_pod(name, namespace="default", wait=False, timeout=60, **kwargs): """ Deletes the kubernetes pod defined by name and namespace name The name of the pod namespace The namespace to delete the pod from. Defaults to ``default``. wait .. versionadded:: 2.0.0 Wait for pod deletion to complete (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for deletion (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.delete_pod guestbook-708336848-5nl8c default salt '*' kubernetes.delete_pod name=guestbook-708336848-5nl8c namespace=default """ cfg = _setup_conn(**kwargs) body = kubernetes.client.V1DeleteOptions(orphan_dependents=True) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.delete_namespaced_pod(name=name, namespace=namespace, body=body) if wait: if not _wait_for_resource_status( api_instance, "pod", name, namespace, "deleted", timeout ): raise CommandExecutionError(f"Timeout waiting for pod {name} to be deleted") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None else: raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def delete_namespace(name, wait=False, timeout=60, **kwargs): """ Deletes the kubernetes namespace defined by name name The name of the namespace wait .. versionadded:: 2.0.0 Wait for namespace deletion to complete (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for deletion (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.delete_namespace salt salt '*' kubernetes.delete_namespace name=salt """ cfg = _setup_conn(**kwargs) body = kubernetes.client.V1DeleteOptions(orphan_dependents=True) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.delete_namespace(name=name, body=body) if wait: if not _wait_for_resource_status( api_instance, "namespace", name, None, "deleted", timeout ): raise CommandExecutionError(f"Timeout waiting for namespace {name} to be deleted") return ApiClient().sanitize_for_serialization(api_response) except ApiException as exc: if exc.status == 404: return None if exc.status == 403: raise CommandExecutionError(f"Cannot delete namespace {name}: {exc.reason}") from exc raise CommandExecutionError(exc) from exc except HTTPError as exc: raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def delete_secret(name, namespace="default", wait=False, timeout=60, **kwargs): """ Deletes the kubernetes secret defined by name and namespace name The name of the secret namespace The namespace to delete the secret from. Defaults to ``default``. wait .. versionadded:: 2.0.0 Wait for secret deletion to complete (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for deletion (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.delete_secret confidential default salt '*' kubernetes.delete_secret name=confidential namespace=default """ cfg = _setup_conn(**kwargs) body = kubernetes.client.V1DeleteOptions(orphan_dependents=True) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.delete_namespaced_secret( name=name, namespace=namespace, body=body ) if wait: if not _wait_for_resource_status( api_instance, "secret", name, namespace, "deleted", timeout ): raise CommandExecutionError(f"Timeout waiting for secret {name} to be deleted") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def delete_configmap(name, namespace="default", wait=False, timeout=60, **kwargs): """ Deletes the kubernetes configmap defined by name and namespace name The name of the configmap namespace The namespace to delete the configmap from. Defaults to ``default``. wait .. versionadded:: 2.0.0 Wait for configmap deletion to complete (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for deletion (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.delete_configmap settings default salt '*' kubernetes.delete_configmap name=settings namespace=default """ cfg = _setup_conn(**kwargs) body = kubernetes.client.V1DeleteOptions(orphan_dependents=True) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.delete_namespaced_config_map( name=name, namespace=namespace, body=body ) if wait: if not _wait_for_resource_status( api_instance, "configmap", name, namespace, "deleted", timeout ): raise CommandExecutionError(f"Timeout waiting for configmap {name} to be deleted") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None else: raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def delete_statefulset(name, namespace="default", wait=False, timeout=60, **kwargs): """ .. versionadded:: 2.1.0 Deletes the kubernetes statefulset defined by name and namespace name The name of the statefulset namespace The namespace to delete the statefulset from. Defaults to ``default``. wait Wait for statefulset deletion to complete (default: False) timeout Timeout in seconds to wait for deletion (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.delete_statefulset my-statefulset default salt '*' kubernetes.delete_statefulset name=my-statefulset namespace=default """ cfg = _setup_conn(**kwargs) body = kubernetes.client.V1DeleteOptions(orphan_dependents=True) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.delete_namespaced_stateful_set( name=name, namespace=namespace, body=body ) if wait: if not _wait_for_resource_status( api_instance, "statefulset", name, namespace, "deleted", timeout ): raise CommandExecutionError(f"Timeout waiting for statefulset {name} to be deleted") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def delete_replicaset(name, namespace="default", wait=False, timeout=60, **kwargs): """ .. versionadded:: 2.1.0 Deletes the kubernetes replicaset defined by name and namespace name The name of the replicaset namespace The namespace to delete the replicaset from. Defaults to ``default``. wait Wait for replicaset deletion to complete (default: False) timeout Timeout in seconds to wait for deletion (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.delete_replicaset my-replicaset default salt '*' kubernetes.delete_replicaset name=my-replicaset namespace=default """ cfg = _setup_conn(**kwargs) body = kubernetes.client.V1DeleteOptions(orphan_dependents=True) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.delete_namespaced_replica_set( name=name, namespace=namespace, body=body ) if wait: if not _wait_for_resource_status( api_instance, "replicaset", name, namespace, "deleted", timeout ): raise CommandExecutionError(f"Timeout waiting for replicaset {name} to be deleted") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def delete_daemonset(name, namespace="default", wait=False, timeout=60, **kwargs): """ .. versionadded:: 2.1.0 Deletes the kubernetes daemonset defined by name and namespace name The name of the daemonset namespace The namespace to delete the daemonset from. Defaults to ``default``. wait Wait for daemonset deletion to complete (default: False) timeout Timeout in seconds to wait for deletion (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.delete_daemonset my-daemonset default salt '*' kubernetes.delete_daemonset name=my-daemonset namespace=default """ cfg = _setup_conn(**kwargs) body = kubernetes.client.V1DeleteOptions(orphan_dependents=True) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.delete_namespaced_daemon_set( name=name, namespace=namespace, body=body ) if wait: if not _wait_for_resource_status( api_instance, "daemonset", name, namespace, "deleted", timeout ): raise CommandExecutionError(f"Timeout waiting for daemonset {name} to be deleted") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def delete_storageclass(name, wait=False, timeout=60, **kwargs): """ .. versionadded:: 2.1.0 Deletes the kubernetes storageclass defined by name name The name of the storageclass wait Wait for storageclass deletion to complete (default: False) timeout Timeout in seconds to wait for deletion (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.delete_storageclass my-storageclass salt '*' kubernetes.delete_storageclass name=my-storageclass """ cfg = _setup_conn(**kwargs) body = kubernetes.client.V1DeleteOptions(orphan_dependents=True) try: api_instance = kubernetes.client.StorageV1Api() api_response = api_instance.delete_storage_class(name=name, body=body) if wait: if not _wait_for_resource_status( api_instance, "storageclass", name, None, "deleted", timeout ): raise CommandExecutionError( f"Timeout waiting for storageclass {name} to be deleted" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: return None raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def create_deployment( name, namespace, metadata, spec, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ Creates the kubernetes deployment as defined by the user. name The name of the deployment namespace The namespace to create the deployment in metadata Deployment metadata dict spec Deployment spec dict following kubernetes API conventions source File path to deployment definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from .. versionchanged:: 2.0.0 Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``. template_context .. versionadded:: 2.0.0 Variables to make available in templated files dry_run .. versionadded:: 2.0.0 If True, only simulates the creation of the deployment wait .. versionadded:: 2.0.0 Wait for deployment to become ready (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for deployment (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.create_deployment name=nginx namespace=default spec='{"replicas": 1}' wait=True """ body = __create_object_body( kind="Deployment", obj_class=V1Deployment, spec_creator=__dict_to_deployment_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.create_namespaced_deployment( namespace, body, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "deployment", name, namespace, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for deployment {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException): if exc.status == 404: raise CommandExecutionError(f"Deployment {namespace}/{name} not found") from exc if exc.status == 409: raise CommandExecutionError(f"Deployment {name} already exists") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def create_pod( name, namespace, metadata, spec, source=None, template=None, saltenv=None, template_context=None, wait=False, timeout=60, **kwargs, ): """ Creates a kubernetes pod as defined by the user. name The name of the pod namespace The namespace to create the pod in metadata Pod metadata dict spec Pod spec dict following kubernetes API conventions source File path to pod definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from .. versionchanged:: 2.0.0 Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``. template_context .. versionadded:: 2.0.0 Variables to make available in templated files wait .. versionadded:: 2.0.0 Wait for pod to become ready (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for pod (default: 60) Pod spec must follow kubernetes API conventions: .. code-block:: yaml - spec: ports: - containerPort: 8080 name: http protocol: TCP CLI Examples: .. code-block:: bash salt '*' kubernetes.create_pod name=nginx namespace=default spec='{"containers": [{"name": "nginx", "image": "nginx"}]}' """ body = __create_object_body( kind="Pod", obj_class=kubernetes.client.V1Pod, spec_creator=__dict_to_pod_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.create_namespaced_pod(namespace, body) if wait: if not _wait_for_resource_status( api_instance, "pod", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for pod {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException): if exc.status == 404: raise CommandExecutionError(f"Pod {namespace}/{name} not found") from exc if exc.status == 409: raise CommandExecutionError(f"Pod {name} already exists") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def create_service( name, namespace, metadata, spec, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ Creates the kubernetes service as defined by the user. name The name of the service namespace The namespace to create the service in metadata Service metadata dict spec Service spec dict that follows kubernetes API conventions source File path to service definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from .. versionchanged:: 2.0.0 Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``. template_context .. versionadded:: 2.0.0 Variables to make available in templated files wait .. versionadded:: 2.0.0 Wait for service to become ready (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for service (default: 60) Service spec must follow kubernetes API conventions. Port specifications can be: Simple integer for basic port definition: ``[80, 443]`` Dictionary for advanced configuration: .. code-block:: yaml - spec: ports: - port: 80 targetPort: 8080 name: http # Required if multiple ports are specified - port: 443 targetPort: web-https # targetPort can reference container port names name: https nodePort: 30443 # nodePort must be between 30000-32767 CLI Examples: .. code-block:: bash salt '*' kubernetes.create_service name=nginx namespace=default spec='{"ports": [80]}' salt '*' kubernetes.create_service name=nginx namespace=default spec='{ "ports": [{"port": 80, "targetPort": 8000, "name": "http"}], "selector": {"app": "nginx"}, "type": "LoadBalancer" }' """ body = __create_object_body( kind="Service", obj_class=kubernetes.client.V1Service, spec_creator=__dict_to_service_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.create_namespaced_service( namespace, body, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "service", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for service {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException): if exc.status == 404: raise CommandExecutionError(f"Service {namespace}/{name} not found") from exc if exc.status == 409: raise CommandExecutionError(f"Service {name} already exists") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def create_secret( name, namespace="default", data=None, source=None, template=None, saltenv=None, template_context=None, secret_type=None, metadata=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ Creates the kubernetes secret as defined by the user. Values that are already base64 encoded will not be re-encoded. .. note:: Automatic encoding of secret values might cause issues if the values are not correctly identified as base64. If you run into issues - encode the values before passing them to this function. name The name of the secret namespace The namespace to create the secret in. Defaults to ``default``. data A dictionary of key-value pairs to store in the secret source File path to secret definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from .. versionchanged:: 2.0.0 Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``. template_context .. versionadded:: 2.0.0 Variables to make available in templated files secret_type .. versionadded:: 2.0.0 The type of the secret metadata .. versionadded:: 2.0.0 Secret metadata dict wait .. versionadded:: 2.0.0 Wait for secret to become ready (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for secret (default: 60) CLI Example: .. code-block:: bash # For regular secrets with plain text values salt 'minion1' kubernetes.create_secret \ passwords default '{"db": "letmein"}' # For secrets with pre-encoded values salt 'minion2' kubernetes.create_secret \ name=passwords namespace=default data='{"db": "bGV0bWVpbg=="}' # For docker registry secrets salt 'minion3' kubernetes.create_secret \ name=docker-registry \ type=kubernetes.io/dockerconfigjson \ data='{".dockerconfigjson": "{\"auths\":{...}}"}' # For TLS secrets salt 'minion4' kubernetes.create_secret \ name=tls-secret \ type=kubernetes.io/tls \ data='{"tls.crt": "...", "tls.key": "..."}' """ cfg = _setup_conn(**kwargs) if source: src_obj = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(src_obj, dict): raise CommandExecutionError("`source` did not render to a dictionary") if "data" in src_obj: data = src_obj["data"] secret_type = src_obj.get("secret_type") elif data is None: data = {} data = __enforce_only_strings_dict(data) # Encode the secrets using base64 if not already encoded encoded_data = {} for key, value in data.items(): if __is_base64(value): encoded_data[key] = value else: encoded_data[key] = base64.b64encode(str(value).encode("utf-8")).decode("utf-8") body = kubernetes.client.V1Secret( metadata=__dict_to_object_meta(name, namespace, metadata), data=encoded_data, type=secret_type, ) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.create_namespaced_secret( namespace, body, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "secret", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for secret {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException): if exc.status == 409: raise CommandExecutionError( f"Secret {name} already exists in namespace {namespace}. Use replace_secret to update it." ) from exc if exc.status == 404: raise CommandExecutionError(f"Secret {namespace}/{name} not found") from exc raise CommandExecutionError(str(exc)) from exc finally: _cleanup(**cfg)
[docs] def create_configmap( name, namespace, data, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ Creates the kubernetes configmap as defined by the user. name The name of the configmap namespace The namespace to create the configmap in data A dictionary of key-value pairs to store in the configmap source File path to configmap definition .. versionchanged:: 2.0.0 The configmap definition must be a proper spec with the configmap data in the ``data`` key. In previous versions, the rendered output was used as the data directly. template Template engine to use to render the source file saltenv Salt environment to pull the source file from .. versionchanged:: 2.0.0 Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``. template_context .. versionadded:: 2.0.0 Variables to make available in templated files wait .. versionadded:: 2.0.0 Wait for configmap to become ready (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for configmap (default: 60) CLI Example: .. code-block:: bash salt 'minion1' kubernetes.create_configmap \ settings default '{"example.conf": "# example file"}' salt 'minion2' kubernetes.create_configmap \ name=settings namespace=default data='{"example.conf": "# example file"}' """ if source: rendered = __read_and_render_yaml_file(source, template, saltenv, template_context) try: data = rendered["data"] except KeyError as err: raise CommandExecutionError( f"The template for configmap '{name}' (at '{source}') did not render to a spec: Missing `data` key." ) from err except TypeError as err: raise CommandExecutionError( f"The template for configmap '{name}' (at '{source}') did not render to a spec: Expected mapping, got '{type(rendered).__name__}'." ) from err elif data is None: data = {} if not isinstance(data, dict): raise CommandExecutionError("Data must be a dictionary") data = __enforce_only_strings_dict(data) body = kubernetes.client.V1ConfigMap( metadata=__dict_to_object_meta(name, namespace, {}), data=data ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.create_namespaced_config_map( namespace, body, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "configmap", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for configmap {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException): if exc.status == 404: raise CommandExecutionError(f"ConfigMap {namespace}/{name} not found") from exc if exc.status == 409: raise CommandExecutionError(f"ConfigMap {name} already exists") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def create_namespace(name, **kwargs): """ Creates a namespace with the specified name. name The name of the namespace to create CLI Example: .. code-block:: bash salt '*' kubernetes.create_namespace salt salt '*' kubernetes.create_namespace name=salt """ meta_obj = kubernetes.client.V1ObjectMeta(name=name) body = kubernetes.client.V1Namespace(metadata=meta_obj) body.metadata.name = name cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.create_namespace(body) return ApiClient().sanitize_for_serialization(api_response) except ApiException as exc: if exc.status == 409: raise CommandExecutionError(f"Namespace {name} already exists: {exc.reason}") from exc if exc.status == 422: raise CommandExecutionError(f"Invalid namespace name {name}: {exc.reason}") from exc raise CommandExecutionError(exc) from exc except HTTPError as exc: raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def create_statefulset( name, namespace="default", metadata=None, spec=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Creates a statefulset with the specified name, namespace, metadata, and spec. name The name of the statefulset namespace The namespace to create the statefulset in. Defaults to ``default``. metadata StatefulSet metadata dict spec StatefulSet spec dict following kubernetes API conventions source File path to statefulset definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the creation of the statefulset wait Wait for statefulset to become ready (default: False) timeout Timeout in seconds to wait for statefulset (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.create_statefulset name=my-statefulset namespace=default spec='{"replicas": 3}' wait=True """ body = __create_object_body( kind="StatefulSet", obj_class=kubernetes.client.V1StatefulSet, spec_creator=__dict_to_statefulset_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.create_namespaced_stateful_set( namespace, body, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "statefulset", name, namespace, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for statefulset {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException): if exc.status == 404: raise CommandExecutionError(f"StatefulSet {namespace}/{name} not found") from exc if exc.status == 409: raise CommandExecutionError(f"StatefulSet {name} already exists") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def create_replicaset( name, namespace="default", metadata=None, spec=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Creates a replicaset with the specified name, namespace, metadata, and spec. name The name of the replicaset namespace The namespace to create the replicaset in. Defaults to ``default``. metadata ReplicaSet metadata dict spec ReplicaSet spec dict following kubernetes API conventions source File path to replicaset definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the creation of the replicaset wait Wait for replicaset to become ready (default: False) timeout Timeout in seconds to wait for replicaset (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.create_replicaset name=my-rs namespace=default spec='{"replicas": 3}' wait=True """ body = __create_object_body( kind="ReplicaSet", obj_class=kubernetes.client.V1ReplicaSet, spec_creator=__dict_to_replicaset_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.create_namespaced_replica_set( namespace, body, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "replicaset", name, namespace, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for replicaset {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException): if exc.status == 404: raise CommandExecutionError(f"ReplicaSet {namespace}/{name} not found") from exc if exc.status == 409: raise CommandExecutionError(f"ReplicaSet {name} already exists") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def create_daemonset( name, namespace="default", metadata=None, spec=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Creates a daemonset with the specified name, namespace, metadata, and spec. name The name of the daemonset namespace The namespace to create the daemonset in. Defaults to ``default``. metadata DaemonSet metadata dict spec DaemonSet spec dict following kubernetes API conventions source File path to daemonset definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the creation of the daemonset wait Wait for daemonset to become ready (default: False) timeout Timeout in seconds to wait for daemonset (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.create_daemonset name=my-ds namespace=default wait=True """ body = __create_object_body( kind="DaemonSet", obj_class=kubernetes.client.V1DaemonSet, spec_creator=__dict_to_daemonset_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.create_namespaced_daemon_set( namespace, body, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "daemonset", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for daemonset {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException): if exc.status == 404: raise CommandExecutionError(f"DaemonSet {namespace}/{name} not found") from exc if exc.status == 409: raise CommandExecutionError(f"DaemonSet {name} already exists") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def create_storageclass( name, metadata=None, spec=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Creates a storageclass with the specified name, metadata, and spec. name The name of the storageclass metadata StorageClass metadata dict spec StorageClass spec dict following kubernetes API conventions source File path to storageclass definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the creation of the storageclass wait Wait for storageclass to become ready (default: False) timeout Timeout in seconds to wait for storageclass (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.create_storageclass name=fast-sc spec='{"provisioner": "kubernetes.io/no-provisioner"}' """ if source: src_obj = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(src_obj, dict) or src_obj.get("kind") != "StorageClass": raise CommandExecutionError("The source file should define only a StorageClass object") if "metadata" in src_obj: metadata = src_obj["metadata"] if "spec" in src_obj: spec = src_obj["spec"] elif spec is None: spec = { key: value for key, value in src_obj.items() if key not in ("apiVersion", "kind", "metadata") } if metadata is None: metadata = {} if spec is None: spec = {} created_spec = __dict_to_storageclass_spec(spec) body = kubernetes.client.V1StorageClass( metadata=__dict_to_object_meta(name, None, metadata), **created_spec, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.StorageV1Api() api_response = api_instance.create_storage_class(body, dry_run="All" if dry_run else None) if wait: if not _wait_for_resource_status( api_instance, "storageclass", name, None, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for storageclass {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException): if exc.status == 404: raise CommandExecutionError(f"StorageClass {name} not found") from exc if exc.status == 409: raise CommandExecutionError(f"StorageClass {name} already exists") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def replace_deployment( name, metadata, spec, source=None, template=None, saltenv=None, namespace="default", template_context=None, wait=False, timeout=60, **kwargs, ): """ Replaces an existing deployment with a new one defined by name and namespace, having the specificed metadata and spec. name The name of the deployment metadata Deployment metadata dict spec Deployment spec dict following kubernetes API conventions source File path to deployment definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from .. versionchanged:: 2.0.0 Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``. namespace The namespace to replace the deployment in. Defaults to ``default``. template_context .. versionadded:: 2.0.0 Variables to make available in templated files wait .. versionadded:: 2.0.0 Wait for deployment to become ready (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for deployment (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.replace_deployment *args """ body = __create_object_body( kind="Deployment", obj_class=V1Deployment, spec_creator=__dict_to_deployment_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.replace_namespaced_deployment(name, namespace, body) if wait: if not _wait_for_resource_status( api_instance, "deployment", name, namespace, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for deployment {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"Deployment {namespace}/{name} not found") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def replace_service( name, old_service, metadata, spec, source=None, template=None, saltenv=None, namespace="default", template_context=None, wait=False, timeout=60, **kwargs, ): """ .. versionchanged:: 2.0.0 The `old_service` parameter was moved to the second position, which pushes `metadata`, `spec`, `source` and `template` one position further down the parameter list. Replaces an existing service with a new one defined by name and namespace, having the specified metadata and spec. name The name of the service old_service The existing service to replace metadata Service metadata dict spec Service spec dict following kubernetes API conventions source File path to service definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from .. versionchanged:: 2.0.0 Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``. namespace The namespace to replace the service in. Defaults to ``default``. template_context .. versionadded:: 2.0.0 Variables to make available in templated files wait .. versionadded:: 2.0.0 Wait for service to become ready (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for service (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.replace_service name=my-service \ old_service='{"metadata": {"resourceVersion": "12345"}, "spec": {"clusterIP": "10.0.0.1"}}' \ metadata='{"labels": {"app": "my-app"}}' \ spec='{"ports": [{"port": 80, "targetPort": 8080}], "selector": {"app": "my-app"}}' \ source=/path/to/service.yaml \ template=jinja \ saltenv=base \ namespace=default \ template_context='{"var1": "value1"}' """ body = __create_object_body( kind="Service", obj_class=kubernetes.client.V1Service, spec_creator=__dict_to_service_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) # Some attributes have to be preserved # otherwise exceptions will be thrown body.spec.cluster_ip = old_service["spec"]["clusterIP"] body.metadata.resource_version = old_service["metadata"]["resourceVersion"] cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.replace_namespaced_service(name, namespace, body) if wait: if not _wait_for_resource_status( api_instance, "service", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for service {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"Service {namespace}/{name} not found") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def replace_secret( name, data, source=None, template=None, saltenv=None, namespace="default", template_context=None, secret_type=None, metadata=None, wait=False, timeout=60, **kwargs, ): """ Replaces an existing secret with a new one defined by name and namespace. Values that are already base64 encoded will not be re-encoded. If a source file is specified, the secret type will be read from the template. .. note:: Automatic encoding of secret values might cause issues if the values are not correctly identified as base64. If you run into issues - encode the values before passing them to this function. name The name of the secret data A dictionary of key-value pairs to store in the secret source File path to secret definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from .. versionchanged:: 2.0.0 Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``. namespace The namespace to replace the secret in. Defaults to ``default``. template_context .. versionadded:: 2.0.0 Variables to make available in templated files secret_type .. versionadded:: 2.0.0 The type of the secret metadata .. versionadded:: 2.0.0 Secret metadata dict wait .. versionadded:: 2.0.0 Wait for secret to become ready (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for secret (default: 60) CLI Example: .. code-block:: bash # For regular secrets with plain text values salt 'minion1' kubernetes.replace_secret \ name=passwords data='{"db": "letmein"}' # For secrets with pre-encoded values salt 'minion2' kubernetes.replace_secret \ name=passwords data='{"db": "bGV0bWVpbg=="}' # For docker registry secrets salt 'minion3' kubernetes.replace_secret \ name=docker-registry \ source=/path/to/docker-secret.yaml \ secret_type=kubernetes.io/dockerconfigjson # For TLS secrets salt 'minion4' kubernetes.replace_secret \ name=tls-secret \ source=/path/to/tls-secret.yaml \ secret_type=kubernetes.io/tls """ if source: src_obj = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(src_obj, dict): raise CommandExecutionError("`source` did not render to a dictionary") if "data" in src_obj: data = src_obj["data"] secret_type = src_obj.get("secret_type") elif data is None: data = {} data = __enforce_only_strings_dict(data) # Encode the secrets using base64 if not already encoded encoded_data = {} for key, value in data.items(): if __is_base64(value): encoded_data[key] = value else: encoded_data[key] = base64.b64encode(str(value).encode("utf-8")).decode("utf-8") # Get existing secret type if not specified if not type: existing_secret = kubernetes.client.CoreV1Api().read_namespaced_secret(name, namespace) secret_type = existing_secret.type body = kubernetes.client.V1Secret( metadata=__dict_to_object_meta(name, namespace, metadata), data=encoded_data, type=secret_type, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.replace_namespaced_secret(name, namespace, body) if wait: if not _wait_for_resource_status( api_instance, "secret", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for secret {name} to be ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"Secret {namespace}/{name} not found") from exc raise CommandExecutionError(str(exc)) from exc finally: _cleanup(**cfg)
[docs] def replace_configmap( name, data, source=None, template=None, saltenv=None, namespace="default", template_context=None, wait=False, timeout=60, **kwargs, ): """ Replaces an existing configmap with a new one defined by name and namespace with the specified data. name The name of the configmap data A dictionary of key-value pairs to store in the configmap source File path to configmap definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from .. versionchanged:: 2.0.0 Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``. namespace The namespace to replace the configmap in. Defaults to ``default``. template_context .. versionadded:: 2.0.0 Variables to make available in templated files wait .. versionadded:: 2.0.0 Wait for configmap to become ready (default: False) timeout .. versionadded:: 2.0.0 Timeout in seconds to wait for configmap (default: 60) CLI Example: .. code-block:: bash salt 'minion1' kubernetes.replace_configmap \ settings default '{"example.conf": "# example file"}' salt 'minion2' kubernetes.replace_configmap \ name=settings namespace=default data='{"example.conf": "# example file"}' """ if source: data = __read_and_render_yaml_file(source, template, saltenv, template_context) data = __enforce_only_strings_dict(data) body = kubernetes.client.V1ConfigMap( metadata=__dict_to_object_meta(name, namespace, {}), data=data ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.replace_namespaced_config_map(name, namespace, body) if wait: if not _wait_for_resource_status( api_instance, "configmap", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for configmap {name} to be ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"ConfigMap {namespace}/{name} not found") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def replace_statefulset( name, namespace, spec, metadata=None, source=None, template=None, saltenv=None, template_context=None, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Replaces an existing statefulset with a new one defined by name and namespace with the specified spec. name The name of the statefulset namespace The namespace of the statefulset spec A dictionary representing the spec of the statefulset metadata A dictionary representing the metadata of the statefulset source File path to statefulset definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files wait Wait for statefulset to become ready (default: False) timeout Timeout in seconds to wait for statefulset (default: 60) CLI Example: .. code-block:: bash salt 'minion1' kubernetes.replace_statefulset \ name=my-statefulset namespace=default spec='{"replicas": 3}' """ body = __create_object_body( kind="StatefulSet", obj_class=kubernetes.client.V1StatefulSet, spec_creator=__dict_to_statefulset_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.replace_namespaced_stateful_set(name, namespace, body) if wait: if not _wait_for_resource_status( api_instance, "statefulset", name, namespace, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for statefulset {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"StatefulSet {namespace}/{name} not found") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def replace_replicaset( name, namespace, spec, metadata=None, source=None, template=None, saltenv=None, template_context=None, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Replaces an existing replicaset with a new one defined by name and namespace with the specified spec. name The name of the replicaset namespace The namespace of the replicaset spec A dictionary representing the spec of the replicaset metadata A dictionary representing the metadata of the replicaset source File path to replicaset definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files wait Wait for replicaset to become ready (default: False) timeout Timeout in seconds to wait for replicaset (default: 60) CLI Example: .. code-block:: bash salt 'minion1' kubernetes.replace_replicaset \ name=my-replicaset namespace=default spec='{"replicas": 3}' """ body = __create_object_body( kind="ReplicaSet", obj_class=kubernetes.client.V1ReplicaSet, spec_creator=__dict_to_replicaset_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.replace_namespaced_replica_set(name, namespace, body) if wait: if not _wait_for_resource_status( api_instance, "replicaset", name, namespace, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for replicaset {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"ReplicaSet {namespace}/{name} not found") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def replace_daemonset( name, namespace, spec, metadata=None, source=None, template=None, saltenv=None, template_context=None, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Replaces an existing daemonset with a new one defined by name and namespace with the specified spec. name The name of the daemonset namespace The namespace of the daemonset spec A dictionary representing the spec of the daemonset metadata A dictionary representing the metadata of the daemonset source File path to daemonset definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files wait Wait for daemonset to become ready (default: False) timeout Timeout in seconds to wait for daemonset (default: 60) CLI Example: .. code-block:: bash salt 'minion1' kubernetes.replace_daemonset \ name=my-daemonset namespace=default spec='{"replicas": 3}' """ body = __create_object_body( kind="DaemonSet", obj_class=kubernetes.client.V1DaemonSet, spec_creator=__dict_to_daemonset_spec, name=name, namespace=namespace, metadata=metadata, spec=spec, source=source, template=template, saltenv=saltenv, template_context=template_context, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.replace_namespaced_daemon_set(name, namespace, body) if wait: if not _wait_for_resource_status( api_instance, "daemonset", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for daemonset {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"DaemonSet {namespace}/{name} not found") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def replace_storageclass( name, spec, metadata=None, source=None, template=None, saltenv=None, template_context=None, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Replaces an existing storageclass with a new one defined by name. name The name of the storageclass spec A dictionary representing the spec of the storageclass metadata A dictionary representing the metadata of the storageclass source File path to storageclass definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files wait Wait for storageclass to become ready (default: False) timeout Timeout in seconds to wait for storageclass (default: 60) CLI Example: .. code-block:: bash salt 'minion1' kubernetes.replace_storageclass \ name=my-storageclass spec='{"provisioner": "kubernetes.io/no-provisioner"}' """ if source: src_obj = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(src_obj, dict) or src_obj.get("kind") != "StorageClass": raise CommandExecutionError("The source file should define only a StorageClass object") if "metadata" in src_obj: metadata = src_obj["metadata"] if "spec" in src_obj: spec = src_obj["spec"] elif spec is None: spec = { key: value for key, value in src_obj.items() if key not in ("apiVersion", "kind", "metadata") } if metadata is None: metadata = {} created_spec = __dict_to_storageclass_spec(spec) body = kubernetes.client.V1StorageClass( metadata=__dict_to_object_meta(name, None, metadata), **created_spec, ) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.StorageV1Api() current_storageclass = api_instance.read_storage_class(name) body.metadata.resource_version = current_storageclass.metadata.resource_version api_response = api_instance.replace_storage_class(name, body) if wait: if not _wait_for_resource_status( api_instance, "storageclass", name, None, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for storageclass {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"StorageClass {name} not found") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def patch_service( name, namespace, patch=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.0.0 Patches an existing service with the provided patch dictionary. name The name of the service namespace The namespace of the service patch A dictionary representing the patch to apply to the service source File path to patch definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the patch without applying it (default: False) wait Wait for service to become ready (default: False) timeout Timeout in seconds to wait for service (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.patch_service \\ name=my-service \\ namespace=default \\ patch='{"spec": {"type": "LoadBalancer"}}' """ if source: rendered = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(rendered, dict): raise CommandExecutionError("The source file did not render to a dictionary") patch = rendered if not isinstance(patch, dict): raise CommandExecutionError("Patch must be a dictionary") cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.patch_namespaced_service( name, namespace, patch, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "service", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for service {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"Service {namespace}/{name} not found") from exc if isinstance(exc, ApiException) and exc.status == 409: raise CommandExecutionError(f"Conflict when patching service {name}") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def patch_secret( name, namespace, patch=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.0.0 Patches an existing secret with the provided patch dictionary. name The name of the secret namespace The namespace of the secret patch A dictionary representing the patch to apply to the secret source File path to patch definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the patch without applying it (default: False) wait Wait for secret to become ready (default: False) timeout Timeout in seconds to wait for secret (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.patch_secret \\ name=my-secret \\ namespace=default \\ patch='{"data": {"password": "bmV3cGFzcw=="}}' """ if source: rendered = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(rendered, dict): raise CommandExecutionError("The source file did not render to a dictionary") patch = rendered if not isinstance(patch, dict): raise CommandExecutionError("Patch must be a dictionary") # Encode secret data values to base64 if not already encoded if "data" in patch and isinstance(patch["data"], dict): encoded_data = {} for key, value in patch["data"].items(): value = str(value) if __is_base64(value): encoded_data[key] = value else: encoded_data[key] = base64.b64encode(value.encode("utf-8")).decode("utf-8") patch = {**patch, "data": encoded_data} cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.patch_namespaced_secret( name, namespace, patch, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "secret", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for secret {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"Secret {namespace}/{name} not found") from exc if isinstance(exc, ApiException) and exc.status == 409: raise CommandExecutionError(f"Conflict when patching secret {name}") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def patch_configmap( name, namespace, patch=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.0.0 Patches an existing configmap with the provided patch dictionary. name The name of the configmap namespace The namespace of the configmap patch A dictionary representing the patch to apply to the configmap source File path to patch definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the patch without applying it (default: False) wait Wait for configmap to become ready (default: False) timeout Timeout in seconds to wait for configmap (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.patch_configmap \\ name=my-config \\ namespace=default \\ patch='{"data": {"key": "new-value"}}' """ if source: rendered = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(rendered, dict): raise CommandExecutionError("The source file did not render to a dictionary") patch = rendered if not isinstance(patch, dict): raise CommandExecutionError("Patch must be a dictionary") cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.CoreV1Api() api_response = api_instance.patch_namespaced_config_map( name, namespace, patch, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "configmap", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for configmap {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"ConfigMap {namespace}/{name} not found") from exc if isinstance(exc, ApiException) and exc.status == 409: raise CommandExecutionError(f"Conflict when patching configmap {name}") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def patch_deployment( name, namespace, patch=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.0.0 Patches an existing deployment with the provided patch dictionary. name The name of the deployment namespace The namespace of the deployment patch A dictionary representing the patch to apply to the deployment source File path to patch definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the patch without applying it (default: False) wait Wait for deployment to become ready (default: False) timeout Timeout in seconds to wait for deployment (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.patch_deployment \ name=my-deployment \ namespace=default \ patch='{"spec": {"replicas": 5}}' """ if source: rendered = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(rendered, dict): raise CommandExecutionError("The source file did not render to a dictionary") patch = rendered if not isinstance(patch, dict): raise CommandExecutionError("Patch must be a dictionary") cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.patch_namespaced_deployment( name, namespace, patch, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "deployment", name, namespace, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for deployment {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"Deployment {namespace}/{name} not found") from exc if isinstance(exc, ApiException) and exc.status == 409: raise CommandExecutionError(f"Conflict when patching deployment {name}") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def patch_statefulset( name, namespace, patch=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Patches an existing statefulset with the provided patch dictionary. name The name of the statefulset namespace The namespace of the statefulset patch A dictionary representing the patch to apply to the statefulset source File path to patch definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the patch without applying it (default: False) wait Wait for statefulset to become ready (default: False) timeout Timeout in seconds to wait for statefulset (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.patch_statefulset \ name=my-statefulset \ namespace=default \ patch='{"spec": {"replicas": 5}}' """ if source: rendered = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(rendered, dict): raise CommandExecutionError("The source file did not render to a dictionary") patch = rendered if not isinstance(patch, dict): raise CommandExecutionError("Patch must be a dictionary") cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.patch_namespaced_stateful_set( name, namespace, patch, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "statefulset", name, namespace, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for statefulset {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"StatefulSet {namespace}/{name} not found") from exc if isinstance(exc, ApiException) and exc.status == 409: raise CommandExecutionError(f"Conflict when patching statefulset {name}") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def patch_replicaset( name, namespace, patch=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Patches an existing replicaset with the provided patch dictionary. name The name of the replicaset namespace The namespace of the replicaset patch A dictionary representing the patch to apply to the replicaset source File path to patch definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the patch without applying it (default: False) wait Wait for replicaset to become ready (default: False) timeout Timeout in seconds to wait for replicaset (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.patch_replicaset \ name=my-replicaset \ namespace=default \ patch='{"spec": {"replicas": 5}}' """ if source: rendered = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(rendered, dict): raise CommandExecutionError("The source file did not render to a dictionary") patch = rendered if not isinstance(patch, dict): raise CommandExecutionError("Patch must be a dictionary") cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.patch_namespaced_replica_set( name, namespace, patch, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "replicaset", name, namespace, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for replicaset {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"ReplicaSet {namespace}/{name} not found") from exc if isinstance(exc, ApiException) and exc.status == 409: raise CommandExecutionError(f"Conflict when patching replicaset {name}") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def patch_daemonset( name, namespace, patch=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Patches an existing daemonset with the provided patch dictionary. name The name of the daemonset namespace The namespace of the daemonset patch A dictionary representing the patch to apply to the daemonset source File path to patch definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the patch without applying it (default: False) wait Wait for daemonset to become ready (default: False) timeout Timeout in seconds to wait for daemonset (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.patch_daemonset \ name=my-daemonset \ namespace=default \ patch='{"spec": {"replicas": 5}}' """ if source: rendered = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(rendered, dict): raise CommandExecutionError("The source file did not render to a dictionary") patch = rendered if not isinstance(patch, dict): raise CommandExecutionError("Patch must be a dictionary") cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.AppsV1Api() api_response = api_instance.patch_namespaced_daemon_set( name, namespace, patch, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "daemonset", name, namespace, "ready", timeout ): raise CommandExecutionError(f"Timeout waiting for daemonset {name} to become ready") return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"DaemonSet {namespace}/{name} not found") from exc if isinstance(exc, ApiException) and exc.status == 409: raise CommandExecutionError(f"Conflict when patching daemonset {name}") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
[docs] def patch_storageclass( name, patch=None, source=None, template=None, saltenv=None, template_context=None, dry_run=False, wait=False, timeout=60, **kwargs, ): """ .. versionadded:: 2.1.0 Patches an existing storageclass with the provided patch dictionary. name The name of the storageclass patch A dictionary representing the patch to apply to the storageclass source File path to patch definition template Template engine to use to render the source file saltenv Salt environment to pull the source file from template_context Variables to make available in templated files dry_run If True, only simulates the patch without applying it (default: False) wait Wait for storageclass to become ready (default: False) timeout Timeout in seconds to wait for storageclass (default: 60) CLI Example: .. code-block:: bash salt '*' kubernetes.patch_storageclass \ name=my-storageclass \ patch='{"reclaimPolicy": "Retain"}' """ if source: rendered = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(rendered, dict): raise CommandExecutionError("The source file did not render to a dictionary") if rendered.get("kind") == "StorageClass": metadata = rendered.get("metadata") spec = rendered.get("spec") if spec is None: spec = { key: value for key, value in rendered.items() if key not in ("apiVersion", "kind", "metadata") } patch = {} if metadata: metadata_patch = { key: value for key, value in metadata.items() if key not in ( "name", "namespace", "resourceVersion", "uid", "creationTimestamp", "managedFields", "generation", "selfLink", ) } if metadata_patch: patch["metadata"] = metadata_patch if spec: patch.update(spec) else: patch = rendered if not isinstance(patch, dict): raise CommandExecutionError("Patch must be a dictionary") # Allow state-style payloads that wrap StorageClass fields under `spec`. if "spec" in patch: spec_patch = patch.get("spec") if not isinstance(spec_patch, dict): raise CommandExecutionError("StorageClass spec patch must be a dictionary") patch = {key: value for key, value in patch.items() if key != "spec"} patch.update(spec_patch) cfg = _setup_conn(**kwargs) try: api_instance = kubernetes.client.StorageV1Api() api_response = api_instance.patch_storage_class( name, patch, dry_run="All" if dry_run else None ) if wait: if not _wait_for_resource_status( api_instance, "storageclass", name, None, "ready", timeout ): raise CommandExecutionError( f"Timeout waiting for storageclass {name} to become ready" ) return ApiClient().sanitize_for_serialization(api_response) except (ApiException, HTTPError) as exc: if isinstance(exc, ApiException) and exc.status == 404: raise CommandExecutionError(f"StorageClass {name} not found") from exc if isinstance(exc, ApiException) and exc.status == 409: raise CommandExecutionError(f"Conflict when patching storageclass {name}") from exc raise CommandExecutionError(exc) from exc finally: _cleanup(**cfg)
def __is_base64(value): """ Check if a string is base64 encoded by attempting to decode it. Handles whitespace and validates against base64. """ if not isinstance(value, str): return False # Remove whitespace and newlines value = "".join(value.split()) try: # Try decoding with validation base64.b64decode(value, validate=True).decode("utf-8") return True except ValueError: return False def __create_object_body( kind, obj_class, spec_creator, name, namespace, metadata, spec, source, template, saltenv, template_context=None, ): """ Create a Kubernetes Object body instance. """ if source: src_obj = __read_and_render_yaml_file(source, template, saltenv, template_context) if not isinstance(src_obj, dict) or "kind" not in src_obj or src_obj["kind"] != kind: raise CommandExecutionError(f"The source file should define only a {kind} object") if "metadata" in src_obj: metadata = src_obj["metadata"] if "spec" in src_obj: spec = src_obj["spec"] if metadata is None: metadata = {} if spec is None: spec = {} try: created_spec = spec_creator(spec) except (ValueError, TypeError) as exc: raise CommandExecutionError(f"Invalid {kind} spec: {exc}") from exc return obj_class( metadata=__dict_to_object_meta(name, namespace, metadata), spec=created_spec, ) def __read_and_render_yaml_file(source, template, saltenv, template_context=None): """ Read a yaml file and, if needed, renders that using the specified templating. Returns the python objects defined inside of the file. """ saltenv = saltenv or __opts__["saltenv"] or "base" sfn = __salt__["cp.cache_file"](source, saltenv) if not sfn: raise CommandExecutionError(f"Source file '{source}' not found") with salt.utils.files.fopen(sfn, "r") as src: contents = src.read() if template: if template not in salt.utils.templates.TEMPLATE_REGISTRY: raise CommandExecutionError(f"Unknown template specified: {template}") # Apply templating with template_context if template_context is None: template_context = {} data = salt.utils.templates.TEMPLATE_REGISTRY[template]( contents, from_str=True, to_str=True, saltenv=saltenv, grains=__grains__, pillar=__pillar__, salt=__salt__, opts=__opts__, context=template_context, ) if not data["result"]: # Failed to render the template raise CommandExecutionError( f'Failed to render file path with error: {data["data"]}' ) contents = data["data"].encode("utf-8") return salt.utils.yaml.safe_load(contents) def __dict_to_object_meta(name, namespace, metadata): """ Converts a dictionary into kubernetes ObjectMetaV1 instance. """ meta_obj = kubernetes.client.V1ObjectMeta() meta_obj.namespace = namespace if metadata is None: metadata = {} # Handle nested dictionaries in metadata processed_metadata = {} for key, value in metadata.items(): if isinstance(value, dict): # Keep nested structure for fields like annotations and labels processed_metadata[key] = value else: # Convert non-dict values to string processed_metadata[key] = str(value) # Replicate `kubectl [create|replace|apply] --record` if "annotations" not in processed_metadata: processed_metadata["annotations"] = {} if "kubernetes.io/change-cause" not in processed_metadata["annotations"]: processed_metadata["annotations"]["kubernetes.io/change-cause"] = " ".join(sys.argv) for key, value in processed_metadata.items(): if hasattr(meta_obj, key): setattr(meta_obj, key, value) if meta_obj.name != name: log.info( "The object already has a name attribute, overwriting it with " "the one defined inside of salt" ) meta_obj.name = name return meta_obj def __dict_to_deployment_spec(spec): """ Converts a dictionary into kubernetes V1DeploymentSpec instance. """ if not isinstance(spec, dict): raise CommandExecutionError( f"Deployment spec must be a dictionary, not {type(spec).__name__}" ) processed_spec = spec.copy() # Validate required template field if "template" not in processed_spec: raise CommandExecutionError("Deployment spec must include template with pod specification") template = processed_spec["template"] template_metadata = template.get("metadata", {}) template_labels = template_metadata.get("labels", {}) # Handle selector if "selector" not in processed_spec: if not template_labels: raise CommandExecutionError( "Template must include labels when selector is not specified" ) processed_spec["selector"] = {"match_labels": template_labels} else: selector = processed_spec["selector"] if not selector or not selector.get("matchLabels"): raise CommandExecutionError("Deployment selector must include matchLabels") if not all(template_labels.get(k) == v for k, v in selector["matchLabels"].items()): raise CommandExecutionError("selector.matchLabels must match template metadata.labels") # Convert selector format if "matchLabels" in processed_spec["selector"]: processed_spec["selector"] = {"match_labels": processed_spec["selector"]["matchLabels"]} # Create pod spec try: pod_spec = __dict_to_pod_spec(template["spec"]) except (CommandExecutionError, ValueError) as exc: raise CommandExecutionError(f"Invalid pod spec in deployment template: {exc}") from exc # Create pod template pod_template = kubernetes.client.V1PodTemplateSpec( metadata=kubernetes.client.V1ObjectMeta(**template_metadata), spec=pod_spec ) processed_spec["template"] = pod_template # Create selector object processed_spec["selector"] = kubernetes.client.V1LabelSelector(**processed_spec["selector"]) # Handle replicas conversion if "replicas" in processed_spec: try: processed_spec["replicas"] = int(processed_spec["replicas"]) except (TypeError, ValueError) as exc: raise CommandExecutionError(f"replicas must be an integer: {exc}") from exc # Create final spec try: return V1DeploymentSpec(**processed_spec) except (TypeError, ValueError) as exc: raise CommandExecutionError(f"Invalid deployment spec: {exc}") from exc def __dict_to_pod_spec(spec): """ Converts a dictionary into kubernetes V1PodSpec instance. """ if spec is None: raise CommandExecutionError("Pod spec cannot be None") # Directly return if already a V1PodSpec if isinstance(spec, kubernetes.client.V1PodSpec): return spec if not isinstance(spec, dict): raise CommandExecutionError(f"Pod spec must be a dictionary, not {type(spec).__name__}") processed_spec = spec.copy() # Validate containers if not processed_spec.get("containers"): raise CommandExecutionError("Pod spec must include at least one container") if not isinstance(processed_spec["containers"], list): raise CommandExecutionError( f"containers must be a list, not {type(processed_spec['containers']).__name__}" ) # Convert container specs containers = [] for i, container in enumerate(processed_spec["containers"]): if not isinstance(container, dict): raise CommandExecutionError( f"Container {i} must be a dictionary, not {type(container).__name__}" ) container_copy = container.copy() if not container_copy.get("name"): raise CommandExecutionError(f"Container {i} must specify 'name'") if not container_copy.get("image"): raise CommandExecutionError(f"Container {i} must specify 'image'") # Handle ports if "ports" in container_copy: ports = container_copy["ports"] if not isinstance(ports, list): raise CommandExecutionError( f"Container {container_copy['name']} ports must be a list" ) processed_ports = [] for port in ports: if not isinstance(port, dict): raise CommandExecutionError( f"Port in container {container_copy['name']} must be a dictionary" ) port_copy = port.copy() # Handle containerPort conversion if "containerPort" in port_copy: try: port_copy["container_port"] = int(port_copy.pop("containerPort")) except (TypeError, ValueError) as exc: raise CommandExecutionError( f"containerPort in container {container_copy['name']} must be an integer: {exc}" ) from exc processed_ports.append(kubernetes.client.V1ContainerPort(**port_copy)) containers.append(kubernetes.client.V1Container(**container_copy)) processed_spec["containers"] = containers # Handle imagePullSecrets field if "imagePullSecrets" in processed_spec: image_pull_secrets = processed_spec.pop("imagePullSecrets") if not isinstance(image_pull_secrets, list): raise CommandExecutionError("imagePullSecrets must be a list") processed_secrets = [] for secret in image_pull_secrets: if not isinstance(secret, dict): raise CommandExecutionError( f"Each imagePullSecret must be a dictionary, not {type(secret).__name__}" ) processed_secrets.append(kubernetes.client.V1LocalObjectReference(**secret)) try: return kubernetes.client.V1PodSpec(**processed_spec) except (TypeError, ValueError) as exc: raise CommandExecutionError(f"Invalid pod spec: {exc}") from exc def __dict_to_service_spec(spec): """ Converts a dictionary into kubernetes V1ServiceSpec instance. Args: spec: Service specification dictionary following kubernetes API conventions Returns: kubernetes.client.V1ServiceSpec: The converted service spec """ if not isinstance(spec, dict): raise CommandExecutionError(f"Service spec must be a dictionary, got {type(spec)}") # Validate required fields if "ports" not in spec: raise CommandExecutionError("Service spec must include 'ports'") if not isinstance(spec["ports"], list): raise CommandExecutionError("Service ports must be a list") # Validate service type if specified valid_service_types = {"ClusterIP", "ExternalName", "LoadBalancer", "NodePort"} if "type" in spec and spec["type"] not in valid_service_types: raise CommandExecutionError( f"Invalid service type: {spec['type']}. Must be one of: {', '.join(sorted(valid_service_types))}" ) spec_obj = kubernetes.client.V1ServiceSpec() for key, value in spec.items(): if key == "ports": spec_obj.ports = [] # Validate port specifications has_multiple_ports = len(value) > 1 for i, port in enumerate(value): if not isinstance(port, dict): try: # Allow simple integer port definitions kube_port = kubernetes.client.V1ServicePort(port=int(port)) except (TypeError, ValueError) as exc: raise CommandExecutionError( f"Invalid port specification at index {i}: {exc}" ) from exc else: # Verify required fields for port if "port" not in port: raise CommandExecutionError( f"Service port at index {i} must specify 'port' value" ) try: port_num = int(port["port"]) except (TypeError, ValueError) as exc: raise CommandExecutionError( f"Invalid port number at index {i}: {exc}" ) from exc # Create port object kube_port = kubernetes.client.V1ServicePort(port=port_num) # Validate name requirement for multi-port services if has_multiple_ports and "name" not in port: raise CommandExecutionError( f"Port at index {i} must specify 'name' in multi-port service" ) # Validate nodePort range if specified if "nodePort" in port: try: node_port = int(port["nodePort"]) if not 30000 <= node_port <= 32767: raise CommandExecutionError( f"NodePort {node_port} at index {i} must be between 30000-32767" ) except (TypeError, ValueError) as exc: raise CommandExecutionError( f"Invalid nodePort value at index {i}: {exc}" ) from exc # Copy remaining port attributes for port_key, port_value in port.items(): if port_key != "port": if port_key in ["nodePort", "targetPort"]: try: if isinstance(port_value, str) and not port_value.isdigit(): # Allow string targetPort for named ports if port_key != "targetPort": port_value = int(port_value) elif isinstance(port_value, (int, str)): port_value = int(port_value) except (TypeError, ValueError) as exc: raise CommandExecutionError( f"Invalid {port_key} value at index {i}: {exc}" ) from exc if hasattr(kube_port, port_key): setattr(kube_port, port_key, port_value) spec_obj.ports.append(kube_port) elif hasattr(spec_obj, key): setattr(spec_obj, key, value) return spec_obj def __dict_to_statefulset_spec(spec): """ .. versionadded:: 2.1.0 Converts a dictionary into kubernetes V1StatefulSetSpec instance. """ if not isinstance(spec, dict): raise CommandExecutionError( f"StatefulSet spec must be a dictionary, not {type(spec).__name__}" ) processed_spec = spec.copy() # Validate required fields (accept both camelCase and snake_case input) if "serviceName" not in processed_spec and "service_name" not in processed_spec: raise CommandExecutionError("StatefulSet spec must include 'serviceName'") # Validate required template field if "template" not in processed_spec: raise CommandExecutionError("StatefulSet spec must include template with pod specification") template = processed_spec["template"] if not isinstance(template, dict): raise CommandExecutionError(f"Template must be a dictionary, not {type(template).__name__}") template_metadata = template.get("metadata", {}) template_spec = template.get("spec", {}) # Validate template has pod spec if not template_spec: raise CommandExecutionError("Template must include pod specification") # Create pod spec try: pod_spec = __dict_to_pod_spec(template_spec) except (CommandExecutionError, ValueError) as exc: raise CommandExecutionError(f"Invalid pod spec in statefulset template: {exc}") from exc # Create pod template pod_template = kubernetes.client.V1PodTemplateSpec( metadata=kubernetes.client.V1ObjectMeta(**template_metadata), spec=pod_spec ) processed_spec["template"] = pod_template # Handle selector - optional for StatefulSet but validate if provided if "selector" in processed_spec: selector = processed_spec["selector"] if not isinstance(selector, dict): raise CommandExecutionError( f"Selector must be a dictionary, not {type(selector).__name__}" ) if "matchLabels" in selector: processed_spec["selector"] = {"match_labels": selector["matchLabels"]} processed_spec["selector"] = kubernetes.client.V1LabelSelector(**processed_spec["selector"]) # Handle replicas conversion if "replicas" in processed_spec: try: processed_spec["replicas"] = int(processed_spec["replicas"]) except (TypeError, ValueError) as exc: raise CommandExecutionError(f"replicas must be an integer: {exc}") from exc # Convert serviceName (camelCase from YAML/user input) to service_name (Python client) if "serviceName" in processed_spec: processed_spec["service_name"] = processed_spec.pop("serviceName") # Create final spec try: return kubernetes.client.V1StatefulSetSpec(**processed_spec) except (TypeError, ValueError) as exc: raise CommandExecutionError(f"Invalid statefulset spec: {exc}") from exc def __dict_to_replicaset_spec(spec): """ .. versionadded:: 2.1.0 Converts a dictionary into kubernetes V1ReplicaSetSpec instance. """ if not isinstance(spec, dict): raise CommandExecutionError( f"ReplicaSet spec must be a dictionary, not {type(spec).__name__}" ) processed_spec = spec.copy() if "template" not in processed_spec: raise CommandExecutionError("ReplicaSet spec must include template with pod specification") template = processed_spec["template"] template_metadata = template.get("metadata", {}) template_labels = template_metadata.get("labels", {}) if "selector" not in processed_spec: if not template_labels: raise CommandExecutionError( "Template must include labels when selector is not specified" ) processed_spec["selector"] = {"match_labels": template_labels} else: selector = processed_spec["selector"] if not selector or not selector.get("matchLabels"): raise CommandExecutionError("ReplicaSet selector must include matchLabels") if not all(template_labels.get(k) == v for k, v in selector["matchLabels"].items()): raise CommandExecutionError("selector.matchLabels must match template metadata.labels") if "matchLabels" in processed_spec["selector"]: processed_spec["selector"] = {"match_labels": processed_spec["selector"]["matchLabels"]} try: pod_spec = __dict_to_pod_spec(template["spec"]) except (CommandExecutionError, ValueError) as exc: raise CommandExecutionError(f"Invalid pod spec in replicaset template: {exc}") from exc pod_template = kubernetes.client.V1PodTemplateSpec( metadata=kubernetes.client.V1ObjectMeta(**template_metadata), spec=pod_spec ) processed_spec["template"] = pod_template processed_spec["selector"] = kubernetes.client.V1LabelSelector(**processed_spec["selector"]) if "replicas" in processed_spec: try: processed_spec["replicas"] = int(processed_spec["replicas"]) except (TypeError, ValueError) as exc: raise CommandExecutionError(f"replicas must be an integer: {exc}") from exc try: return kubernetes.client.V1ReplicaSetSpec(**processed_spec) except (TypeError, ValueError) as exc: raise CommandExecutionError(f"Invalid replicaset spec: {exc}") from exc def __dict_to_daemonset_spec(spec): """ .. versionadded:: 2.1.0 Converts a dictionary into kubernetes V1DaemonSetSpec instance. """ if not isinstance(spec, dict): raise CommandExecutionError( f"DaemonSet spec must be a dictionary, not {type(spec).__name__}" ) processed_spec = spec.copy() if "template" not in processed_spec: raise CommandExecutionError("DaemonSet spec must include template with pod specification") template = processed_spec["template"] template_metadata = template.get("metadata", {}) template_labels = template_metadata.get("labels", {}) if "selector" not in processed_spec: if not template_labels: raise CommandExecutionError( "Template must include labels when selector is not specified" ) processed_spec["selector"] = {"match_labels": template_labels} else: selector = processed_spec["selector"] if not selector or not selector.get("matchLabels"): raise CommandExecutionError("DaemonSet selector must include matchLabels") if not all(template_labels.get(k) == v for k, v in selector["matchLabels"].items()): raise CommandExecutionError("selector.matchLabels must match template metadata.labels") if "matchLabels" in processed_spec["selector"]: processed_spec["selector"] = {"match_labels": processed_spec["selector"]["matchLabels"]} try: pod_spec = __dict_to_pod_spec(template["spec"]) except (CommandExecutionError, ValueError) as exc: raise CommandExecutionError(f"Invalid pod spec in daemonset template: {exc}") from exc pod_template = kubernetes.client.V1PodTemplateSpec( metadata=kubernetes.client.V1ObjectMeta(**template_metadata), spec=pod_spec ) processed_spec["template"] = pod_template processed_spec["selector"] = kubernetes.client.V1LabelSelector(**processed_spec["selector"]) try: return kubernetes.client.V1DaemonSetSpec(**processed_spec) except (TypeError, ValueError) as exc: raise CommandExecutionError(f"Invalid daemonset spec: {exc}") from exc def __dict_to_storageclass_spec(spec): """ .. versionadded:: 2.1.0 Validates and normalizes a dictionary into a V1StorageClass-compatible payload. """ if not isinstance(spec, dict): raise CommandExecutionError( f"StorageClass spec must be a dictionary, not {type(spec).__name__}" ) processed_spec = spec.copy() if not processed_spec.get("provisioner"): raise CommandExecutionError("StorageClass spec must include provisioner") if "reclaimPolicy" in processed_spec: processed_spec["reclaim_policy"] = processed_spec.pop("reclaimPolicy") if "allowVolumeExpansion" in processed_spec: processed_spec["allow_volume_expansion"] = processed_spec.pop("allowVolumeExpansion") if "volumeBindingMode" in processed_spec: processed_spec["volume_binding_mode"] = processed_spec.pop("volumeBindingMode") if "mountOptions" in processed_spec: mount_options = processed_spec.pop("mountOptions") if not isinstance(mount_options, list): raise CommandExecutionError("StorageClass mountOptions must be a list") processed_spec["mount_options"] = mount_options if "parameters" in processed_spec: parameters = processed_spec["parameters"] if not isinstance(parameters, dict): raise CommandExecutionError("StorageClass parameters must be a dictionary") processed_spec["parameters"] = __enforce_only_strings_dict(parameters) if "allowedTopologies" in processed_spec: allowed_topologies = processed_spec.pop("allowedTopologies") if not isinstance(allowed_topologies, list): raise CommandExecutionError("StorageClass allowedTopologies must be a list") processed_spec["allowed_topologies"] = [ kubernetes.client.V1TopologySelectorTerm(**term) for term in allowed_topologies ] try: kubernetes.client.V1StorageClass(**processed_spec) except (TypeError, ValueError) as exc: raise CommandExecutionError(f"Invalid storageclass spec: {exc}") from exc return processed_spec def __enforce_only_strings_dict(dictionary): """ Returns a dictionary that has string keys and values. Only converts non-string values to strings. """ ret = {} for key, value in dictionary.items(): ret[str(key)] = str(value) return ret def _wait_for_resource_status( api_instance, resource_type, name, namespace, expected_status, timeout=60 ): """ .. versionadded:: 2.0.0 Helper function to wait for a resource to reach an expected status. api_instance The kubernetes API instance to use resource_type Type of resource to wait for (e.g., 'deployment', 'pod', 'service') name Name of the resource namespace Namespace of the resource expected_status .. versionchanged:: 2.1.0 Expected status to wait for ('created', 'deleted', 'ready') timeout Timeout in seconds (default: 60) Returns True if the resource reached the expected status, False otherwise. """ try: w = Watch() start_time = time.time() if expected_status == "deleted": method = None if resource_type != "namespace": # Compound resource names map differently in the k8s client. method_name = { "statefulset": "read_namespaced_stateful_set", "replicaset": "read_namespaced_replica_set", "daemonset": "read_namespaced_daemon_set", "configmap": "read_namespaced_config_map", "storageclass": "read_storage_class", }.get(resource_type, f"read_namespaced_{resource_type}") try: method = getattr(api_instance, method_name) except AttributeError as exc: raise CommandExecutionError( f"Unsupported resource type for wait operation: {resource_type}" ) from exc # For deletion, periodically check if the resource still exists until timeout while time.time() - start_time < timeout: try: if resource_type == "namespace": api_instance.read_namespace(name) elif resource_type == "storageclass": method(name) else: method(name, namespace) except ApiException as e: if e.status == 404: # Resource is gone, deletion successful return True # Resource still exists, wait before retrying time.sleep(1) # Timed out waiting for deletion return False # Compound resource names map differently in the k8s client. method_name = { "statefulset": "list_namespaced_stateful_set", "replicaset": "list_namespaced_replica_set", "daemonset": "list_namespaced_daemon_set", "configmap": "list_namespaced_config_map", "storageclass": "list_storage_class", }.get(resource_type, f"list_namespaced_{resource_type}") try: list_method = getattr(api_instance, method_name) except AttributeError as exc: raise CommandExecutionError( f"Unsupported resource type for wait operation: {resource_type}" ) from exc if resource_type == "storageclass": stream = w.stream( func=list_method, field_selector=f"metadata.name={name}", timeout_seconds=timeout, ) else: stream = w.stream( func=list_method, namespace=namespace, field_selector=f"metadata.name={name}", timeout_seconds=timeout, ) for event in stream: if event["object"].metadata.name == name: if expected_status == "created": return True elif expected_status == "ready": if resource_type == "deployment": if ( event["object"].status.available_replicas and event["object"].status.available_replicas == event["object"].spec.replicas ): return True elif resource_type == "pod": # More detailed pod readiness check if event["object"].status.phase == "Running": if not event["object"].status.container_statuses: continue all_containers_ready = True unready_containers = [] for container_status in event["object"].status.container_statuses: if not container_status.ready: all_containers_ready = False unready_containers.append(container_status.name) if all_containers_ready: return True elif resource_type == "service": # Services are considered ready once they exist and have a clusterIP assigned if event["object"].spec.cluster_ip: return True else: return True # For other resources, assume ready when created if time.time() - start_time >= timeout: log.warning( "Timeout reached while waiting for %s/%s to become %s", resource_type, name, expected_status, ) return False log.warning( "Watch stream ended before %s/%s reached %s status", resource_type, name, expected_status, ) return False except (ApiException, HTTPError) as exc: raise CommandExecutionError(exc) from exc finally: w.stop()