"""
Module for handling kubernetes calls.
:optdepends: - kubernetes Python client >= v19.15.0
- PyYAML >= 5.3.1
:configuration: The k8s API settings are provided either in a pillar, in
the minion's config file, or in master's config file. The classic
kubeconfig-based setup looks like::
kubernetes.kubeconfig: '/path/to/kubeconfig'
kubernetes.kubeconfig-data: '<base64 encoded kubeconfig content>'
kubernetes.context: 'context'
For other auth modes — in-cluster ServiceAccount, bearer token, basic
auth, or explicit client certificates with optional proxy support — see
the dedicated :doc:`/topics/auth` guide. All settings can also be
supplied via ``K8S_AUTH_*`` environment variables (compatible with
Ansible's ``kubernetes.core`` collection) or as per-call kwargs that
take precedence over both env and config.
The data format for `kubernetes.kubeconfig-data` value is the content of
`kubeconfig` base64 encoded in one line.
These settings can be overridden by adding `context` and `kubeconfig` or
`kubeconfig_data` parameters when calling a function.
Only `kubeconfig` or `kubeconfig-data` should be provided. In case both are
provided `kubeconfig` entry is preferred.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.nodes
salt '*' kubernetes.nodes kubeconfig=/etc/salt/k8s/kubeconfig context=minikube
.. versionadded:: 2017.7.0
.. versionchanged:: 2019.2.0
.. versionchanged:: 2.1.0
Added in-cluster ServiceAccount, bearer token, basic auth, explicit
client-certificate, proxy, and ``K8S_AUTH_*`` environment-variable
auth modes. The legacy kubeconfig path is unchanged and remains the
default. See :doc:`/topics/auth`.
.. warning::
Configuration options changed in 2019.2.0. The following configuration options have been removed:
- kubernetes.user
- kubernetes.password
- kubernetes.api_url
- kubernetes.certificate-authority-data/file
- kubernetes.client-certificate-data/file
- kubernetes.client-key-data/file
These options were re-introduced under different names in 2.1.0 as
part of the rich-auth work — see the auth guide. The 2019.2.0
removal warning still stands for the *legacy* names; use the new
``kubernetes.host`` / ``kubernetes.api_key`` / ``kubernetes.username``
/ ``kubernetes.client_cert`` / etc. options instead.
"""
import base64
import logging
import sys
import time
import salt.utils.files
import salt.utils.platform
import salt.utils.templates
import salt.utils.yaml
from salt.exceptions import CommandExecutionError
# Re-exports kept on the module surface for backwards compatibility with any
# external code that imported these from ``kubernetesmod`` before the helpers
# were extracted to ``saltext.kubernetes.utils._connection``.
# pylint: disable=unused-import
from saltext.kubernetes.utils._connection import POLLING_TIME_LIMIT # noqa: F401
from saltext.kubernetes.utils._connection import _cleanup # noqa: F401
from saltext.kubernetes.utils._connection import _setup_conn as _setup_conn_impl # noqa: F401
# pylint: enable=unused-import
if not salt.utils.platform.is_windows():
# pylint: disable=unused-import
from saltext.kubernetes.utils._connection import _time_limit # noqa: F401
# pylint: disable=import-error,no-name-in-module
try:
import kubernetes # pylint: disable=import-self
import kubernetes.client
from kubernetes.client import ApiClient
from kubernetes.client import V1Deployment
from kubernetes.client import V1DeploymentSpec
from kubernetes.client.rest import ApiException
from kubernetes.watch import Watch
from urllib3.exceptions import HTTPError
HAS_LIBS = True
except ImportError:
HAS_LIBS = False
# pylint: enable=import-error,no-name-in-module
log = logging.getLogger(__name__)
__virtualname__ = "kubernetes"
[docs]
def __virtual__():
"""
Check dependencies
"""
if HAS_LIBS:
return __virtualname__
return False, "python kubernetes library not found"
def _setup_conn(**kwargs):
"""
Setup kubernetes API connection singleton.
Backwards-compatible shim around
:py:func:`saltext.kubernetes.utils._connection._setup_conn`. The
signature, kwargs handling, and return shape are preserved so that
existing call sites and ``mock.patch("...kubernetesmod._setup_conn")``
paths continue to work.
"""
return _setup_conn_impl(__salt__["config.option"], **kwargs)
[docs]
def ping(**kwargs):
"""
Checks connection with the kubernetes API server.
Returns True if the API is available.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.ping
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.get_api_resources()
return bool(api_response and hasattr(api_response, "resources") and api_response.resources)
except (ApiException, HTTPError):
log.error(
"Exception when calling CoreV1Api->get_api_resources",
exc_info_on_loglevel=logging.DEBUG,
)
return False
finally:
_cleanup(**cfg)
[docs]
def nodes(**kwargs):
"""
Return the names of the nodes composing the kubernetes cluster
CLI Example:
.. code-block:: bash
salt '*' kubernetes.nodes
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.list_node()
return [
k8s_node["metadata"]["name"]
for k8s_node in ApiClient().sanitize_for_serialization(api_response).get("items", [])
]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return []
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def node(name, **kwargs):
"""
Return the details of the node identified by the specified name
CLI Example:
.. code-block:: bash
salt '*' kubernetes.node name='minikube'
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.list_node()
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
for k8s_node in api_response.items:
if k8s_node.metadata.name == name:
return ApiClient().sanitize_for_serialization(k8s_node)
return None
[docs]
def node_labels(name, **kwargs):
"""
Return the labels of the node identified by the specified name
name
The name of the node
CLI Example:
.. code-block:: bash
salt '*' kubernetes.node_labels name="minikube"
"""
match = node(name, **kwargs)
if match is not None:
return match["metadata"]["labels"]
return {}
[docs]
def node_add_label(node_name, label_name, label_value, **kwargs):
"""
Set the value of the label identified by `label_name` to `label_value` on
the node identified by the name `node_name`.
Creates the label if not present.
node_name
The name of the node
label_name
The name of the label
label_value
The value of the label
CLI Example:
.. code-block:: bash
salt '*' kubernetes.node_add_label node_name="minikube" \
label_name="foo" label_value="bar"
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
# First verify the node exists
try:
api_instance.read_node(node_name)
except ApiException as exc:
if exc.status == 404:
raise CommandExecutionError(f"Node {node_name} not found") from exc
raise
body = {"metadata": {"labels": {label_name: label_value}}}
api_response = api_instance.patch_node(node_name, body)
return api_response
except (ApiException, HTTPError) as exc:
raise CommandExecutionError(str(exc)) from exc
finally:
_cleanup(**cfg)
[docs]
def node_remove_label(node_name, label_name, **kwargs):
"""
Removes the label identified by `label_name` from
the node identified by the name `node_name`.
node_name
The name of the node
label_name
The name of the label
CLI Example:
.. code-block:: bash
salt '*' kubernetes.node_remove_label node_name="minikube" \
label_name="foo"
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
body = {"metadata": {"labels": {label_name: None}}}
api_response = api_instance.patch_node(node_name, body)
return api_response
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"Node {node_name} not found") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def namespaces(**kwargs):
"""
Return the names of the available namespaces
CLI Example:
.. code-block:: bash
salt '*' kubernetes.namespaces
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.list_namespace()
return [
nms["metadata"]["name"]
for nms in ApiClient().sanitize_for_serialization(api_response).get("items", [])
]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return []
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def deployments(namespace="default", **kwargs):
"""
Return a list of kubernetes deployments defined in the namespace
namespace
The namespace to list deployments from. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.deployments
salt '*' kubernetes.deployments namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.list_namespaced_deployment(namespace)
serialized_response = ApiClient().sanitize_for_serialization(api_response)
items = serialized_response.get("items") or []
return [dep["metadata"]["name"] for dep in items]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return []
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def services(namespace="default", **kwargs):
"""
Return a list of kubernetes services defined in the namespace
namespace
The namespace to list services from. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.services
salt '*' kubernetes.services namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.list_namespaced_service(namespace)
return [
srv["metadata"]["name"]
for srv in ApiClient().sanitize_for_serialization(api_response).get("items", [])
]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return []
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def pods(namespace="default", **kwargs):
"""
Return a list of kubernetes pods defined in the namespace
namespace
The namespace to list pods from. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.pods
salt '*' kubernetes.pods namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.list_namespaced_pod(namespace)
return [
pod["metadata"]["name"]
for pod in ApiClient().sanitize_for_serialization(api_response).get("items", [])
]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return [] # Return empty list for nonexistent namespace
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def secrets(namespace="default", **kwargs):
"""
Return a list of kubernetes secrets defined in the namespace
namespace
The namespace to list secrets from. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.secrets
salt '*' kubernetes.secrets namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.list_namespaced_secret(namespace)
return [
secret["metadata"]["name"]
for secret in ApiClient().sanitize_for_serialization(api_response).get("items", [])
]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return []
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def configmaps(namespace="default", **kwargs):
"""
Return a list of kubernetes configmaps defined in the namespace
namespace
The namespace to list configmaps from. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.configmaps
salt '*' kubernetes.configmaps namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.list_namespaced_config_map(namespace)
return [
configmap["metadata"]["name"]
for configmap in ApiClient().sanitize_for_serialization(api_response).get("items", [])
]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return [] # Return empty list for nonexistent namespace
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def statefulsets(namespace="default", **kwargs):
"""
.. versionadded:: 2.1.0
Return a list of kubernetes statefulsets defined in the namespace
namespace
The namespace to list statefulsets from. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.statefulsets
salt '*' kubernetes.statefulsets namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.list_namespaced_stateful_set(namespace)
return [
statefulset["metadata"]["name"]
for statefulset in ApiClient().sanitize_for_serialization(api_response).get("items", [])
]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return [] # Return empty list for nonexistent namespace
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def replicasets(namespace="default", **kwargs):
"""
.. versionadded:: 2.1.0
Return a list of kubernetes replicasets defined in the namespace
namespace
The namespace to list replicasets from. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.replicasets
salt '*' kubernetes.replicasets namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.list_namespaced_replica_set(namespace)
return [
replicaset["metadata"]["name"]
for replicaset in ApiClient().sanitize_for_serialization(api_response).get("items", [])
]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return []
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def daemonsets(namespace="default", **kwargs):
"""
.. versionadded:: 2.1.0
Return a list of kubernetes daemonsets defined in the namespace
namespace
The namespace to list daemonsets from. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.daemonsets
salt '*' kubernetes.daemonsets namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.list_namespaced_daemon_set(namespace)
return [
daemonset["metadata"]["name"]
for daemonset in ApiClient().sanitize_for_serialization(api_response).get("items", [])
]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return []
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def storageclasses(**kwargs):
"""
.. versionadded:: 2.1.0
Return a list of kubernetes storageclasses.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.storageclasses
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.StorageV1Api()
api_response = api_instance.list_storage_class()
return [
storageclass["metadata"]["name"]
for storageclass in ApiClient()
.sanitize_for_serialization(api_response)
.get("items", [])
]
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return []
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def show_deployment(name, namespace="default", **kwargs):
"""
Return the kubernetes deployment defined by name and namespace
name
The name of the deployment
namespace
The namespace to look for the deployment. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.show_deployment my-nginx default
salt '*' kubernetes.show_deployment name=my-nginx namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.read_namespaced_deployment(name, namespace)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def show_service(name, namespace="default", **kwargs):
"""
Return the kubernetes service defined by name and namespace
name
The name of the service
namespace
The namespace to look for the service. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.show_service my-nginx default
salt '*' kubernetes.show_service name=my-nginx namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.read_namespaced_service(name, namespace)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def show_pod(name, namespace="default", **kwargs):
"""
Return POD information for a given pod name defined in the namespace
name
The name of the pod
namespace
The namespace to look for the pod. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.show_pod guestbook-708336848-fqr2x
salt '*' kubernetes.show_pod guestbook-708336848-fqr2x namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.read_namespaced_pod(name, namespace)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def show_namespace(name, **kwargs):
"""
Return information for a given namespace defined by the specified name
name
The name of the namespace to show
CLI Example:
.. code-block:: bash
salt '*' kubernetes.show_namespace kube-system
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.read_namespace(name)
return ApiClient().sanitize_for_serialization(api_response)
except ApiException as exc:
if exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
except HTTPError as exc:
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def show_secret(name, namespace="default", decode=False, **kwargs):
"""
Return the kubernetes secret defined by name and namespace.
The secrets can be decoded if specified by the user. Warning: this has
security implications.
name
The name of the secret
namespace
The namespace to look for the secret. Defaults to ``default``.
decode
Decode the secret values. Default is False
CLI Example:
.. code-block:: bash
salt '*' kubernetes.show_secret confidential default
salt '*' kubernetes.show_secret name=confidential namespace=default
salt '*' kubernetes.show_secret name=confidential decode=True
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.read_namespaced_secret(name, namespace)
response_dict = ApiClient().sanitize_for_serialization(api_response)
if response_dict.get("data") and decode:
decoded_data = {}
for key, value in response_dict["data"].items():
try:
decoded_data[key] = base64.b64decode(value).decode("utf-8")
except UnicodeDecodeError:
decoded_data[key] = base64.b64decode(value)
response_dict["data"] = decoded_data
return response_dict
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def show_configmap(name, namespace="default", **kwargs):
"""
Return the kubernetes configmap defined by name and namespace.
name
The name of the configmap
namespace
The namespace to look for the configmap. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.show_configmap game-config default
salt '*' kubernetes.show_configmap name=game-config namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.read_namespaced_config_map(name, namespace)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def show_statefulset(name, namespace="default", **kwargs):
"""
.. versionadded:: 2.1.0
Return the kubernetes statefulset defined by name and namespace.
name
The name of the statefulset
namespace
The namespace to look for the statefulset. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.show_statefulset my-statefulset default
salt '*' kubernetes.show_statefulset name=my-statefulset namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.read_namespaced_stateful_set(name, namespace)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def show_replicaset(name, namespace="default", **kwargs):
"""
.. versionadded:: 2.1.0
Return the kubernetes replicaset defined by name and namespace.
name
The name of the replicaset
namespace
The namespace to look for the replicaset. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.show_replicaset my-replicaset default
salt '*' kubernetes.show_replicaset name=my-replicaset namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.read_namespaced_replica_set(name, namespace)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def show_daemonset(name, namespace="default", **kwargs):
"""
.. versionadded:: 2.1.0
Return the kubernetes daemonset defined by name and namespace.
name
The name of the daemonset
namespace
The namespace to look for the daemonset. Defaults to ``default``.
CLI Example:
.. code-block:: bash
salt '*' kubernetes.show_daemonset my-daemonset default
salt '*' kubernetes.show_daemonset name=my-daemonset namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.read_namespaced_daemon_set(name, namespace)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def show_storageclass(name, **kwargs):
"""
.. versionadded:: 2.1.0
Return the kubernetes storageclass defined by name.
name
The name of the storageclass
CLI Example:
.. code-block:: bash
salt '*' kubernetes.show_storageclass my-storageclass
salt '*' kubernetes.show_storageclass name=my-storageclass
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.StorageV1Api()
api_response = api_instance.read_storage_class(name)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def delete_deployment(name, namespace="default", wait=False, timeout=60, **kwargs):
"""
Deletes the kubernetes deployment defined by name and namespace
name
The name of the deployment
namespace
The namespace to delete the deployment from. Defaults to ``default``.
wait
.. versionadded:: 2.0.0
Wait for deployment deletion to complete (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for deletion (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.delete_deployment my-nginx default wait=True
"""
cfg = _setup_conn(**kwargs)
body = kubernetes.client.V1DeleteOptions(orphan_dependents=True)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.delete_namespaced_deployment(
name=name, namespace=namespace, body=body
)
if wait:
if not _wait_for_resource_status(
api_instance, "deployment", name, namespace, "deleted", timeout
):
raise CommandExecutionError(f"Timeout waiting for deployment {name} to be deleted")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def delete_service(name, namespace="default", wait=False, timeout=60, **kwargs):
"""
Deletes the kubernetes service defined by name and namespace
name
The name of the service
namespace
The namespace to delete the service from. Defaults to ``default``.
wait
.. versionadded:: 2.0.0
Wait for service deletion to complete (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for deletion (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.delete_service my-nginx default
salt '*' kubernetes.delete_service name=my-nginx namespace=default
"""
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.delete_namespaced_service(name=name, namespace=namespace)
if wait:
if not _wait_for_resource_status(
api_instance, "service", name, namespace, "deleted", timeout
):
raise CommandExecutionError(f"Timeout waiting for service {name} to be deleted")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
else:
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def delete_pod(name, namespace="default", wait=False, timeout=60, **kwargs):
"""
Deletes the kubernetes pod defined by name and namespace
name
The name of the pod
namespace
The namespace to delete the pod from. Defaults to ``default``.
wait
.. versionadded:: 2.0.0
Wait for pod deletion to complete (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for deletion (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.delete_pod guestbook-708336848-5nl8c default
salt '*' kubernetes.delete_pod name=guestbook-708336848-5nl8c namespace=default
"""
cfg = _setup_conn(**kwargs)
body = kubernetes.client.V1DeleteOptions(orphan_dependents=True)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.delete_namespaced_pod(name=name, namespace=namespace, body=body)
if wait:
if not _wait_for_resource_status(
api_instance, "pod", name, namespace, "deleted", timeout
):
raise CommandExecutionError(f"Timeout waiting for pod {name} to be deleted")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
else:
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def delete_namespace(name, wait=False, timeout=60, **kwargs):
"""
Deletes the kubernetes namespace defined by name
name
The name of the namespace
wait
.. versionadded:: 2.0.0
Wait for namespace deletion to complete (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for deletion (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.delete_namespace salt
salt '*' kubernetes.delete_namespace name=salt
"""
cfg = _setup_conn(**kwargs)
body = kubernetes.client.V1DeleteOptions(orphan_dependents=True)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.delete_namespace(name=name, body=body)
if wait:
if not _wait_for_resource_status(
api_instance, "namespace", name, None, "deleted", timeout
):
raise CommandExecutionError(f"Timeout waiting for namespace {name} to be deleted")
return ApiClient().sanitize_for_serialization(api_response)
except ApiException as exc:
if exc.status == 404:
return None
if exc.status == 403:
raise CommandExecutionError(f"Cannot delete namespace {name}: {exc.reason}") from exc
raise CommandExecutionError(exc) from exc
except HTTPError as exc:
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def delete_secret(name, namespace="default", wait=False, timeout=60, **kwargs):
"""
Deletes the kubernetes secret defined by name and namespace
name
The name of the secret
namespace
The namespace to delete the secret from. Defaults to ``default``.
wait
.. versionadded:: 2.0.0
Wait for secret deletion to complete (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for deletion (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.delete_secret confidential default
salt '*' kubernetes.delete_secret name=confidential namespace=default
"""
cfg = _setup_conn(**kwargs)
body = kubernetes.client.V1DeleteOptions(orphan_dependents=True)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.delete_namespaced_secret(
name=name, namespace=namespace, body=body
)
if wait:
if not _wait_for_resource_status(
api_instance, "secret", name, namespace, "deleted", timeout
):
raise CommandExecutionError(f"Timeout waiting for secret {name} to be deleted")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def delete_configmap(name, namespace="default", wait=False, timeout=60, **kwargs):
"""
Deletes the kubernetes configmap defined by name and namespace
name
The name of the configmap
namespace
The namespace to delete the configmap from. Defaults to ``default``.
wait
.. versionadded:: 2.0.0
Wait for configmap deletion to complete (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for deletion (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.delete_configmap settings default
salt '*' kubernetes.delete_configmap name=settings namespace=default
"""
cfg = _setup_conn(**kwargs)
body = kubernetes.client.V1DeleteOptions(orphan_dependents=True)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.delete_namespaced_config_map(
name=name, namespace=namespace, body=body
)
if wait:
if not _wait_for_resource_status(
api_instance, "configmap", name, namespace, "deleted", timeout
):
raise CommandExecutionError(f"Timeout waiting for configmap {name} to be deleted")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
else:
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def delete_statefulset(name, namespace="default", wait=False, timeout=60, **kwargs):
"""
.. versionadded:: 2.1.0
Deletes the kubernetes statefulset defined by name and namespace
name
The name of the statefulset
namespace
The namespace to delete the statefulset from. Defaults to ``default``.
wait
Wait for statefulset deletion to complete (default: False)
timeout
Timeout in seconds to wait for deletion (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.delete_statefulset my-statefulset default
salt '*' kubernetes.delete_statefulset name=my-statefulset namespace=default
"""
cfg = _setup_conn(**kwargs)
body = kubernetes.client.V1DeleteOptions(orphan_dependents=True)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.delete_namespaced_stateful_set(
name=name, namespace=namespace, body=body
)
if wait:
if not _wait_for_resource_status(
api_instance, "statefulset", name, namespace, "deleted", timeout
):
raise CommandExecutionError(f"Timeout waiting for statefulset {name} to be deleted")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def delete_replicaset(name, namespace="default", wait=False, timeout=60, **kwargs):
"""
.. versionadded:: 2.1.0
Deletes the kubernetes replicaset defined by name and namespace
name
The name of the replicaset
namespace
The namespace to delete the replicaset from. Defaults to ``default``.
wait
Wait for replicaset deletion to complete (default: False)
timeout
Timeout in seconds to wait for deletion (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.delete_replicaset my-replicaset default
salt '*' kubernetes.delete_replicaset name=my-replicaset namespace=default
"""
cfg = _setup_conn(**kwargs)
body = kubernetes.client.V1DeleteOptions(orphan_dependents=True)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.delete_namespaced_replica_set(
name=name, namespace=namespace, body=body
)
if wait:
if not _wait_for_resource_status(
api_instance, "replicaset", name, namespace, "deleted", timeout
):
raise CommandExecutionError(f"Timeout waiting for replicaset {name} to be deleted")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def delete_daemonset(name, namespace="default", wait=False, timeout=60, **kwargs):
"""
.. versionadded:: 2.1.0
Deletes the kubernetes daemonset defined by name and namespace
name
The name of the daemonset
namespace
The namespace to delete the daemonset from. Defaults to ``default``.
wait
Wait for daemonset deletion to complete (default: False)
timeout
Timeout in seconds to wait for deletion (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.delete_daemonset my-daemonset default
salt '*' kubernetes.delete_daemonset name=my-daemonset namespace=default
"""
cfg = _setup_conn(**kwargs)
body = kubernetes.client.V1DeleteOptions(orphan_dependents=True)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.delete_namespaced_daemon_set(
name=name, namespace=namespace, body=body
)
if wait:
if not _wait_for_resource_status(
api_instance, "daemonset", name, namespace, "deleted", timeout
):
raise CommandExecutionError(f"Timeout waiting for daemonset {name} to be deleted")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def delete_storageclass(name, wait=False, timeout=60, **kwargs):
"""
.. versionadded:: 2.1.0
Deletes the kubernetes storageclass defined by name
name
The name of the storageclass
wait
Wait for storageclass deletion to complete (default: False)
timeout
Timeout in seconds to wait for deletion (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.delete_storageclass my-storageclass
salt '*' kubernetes.delete_storageclass name=my-storageclass
"""
cfg = _setup_conn(**kwargs)
body = kubernetes.client.V1DeleteOptions(orphan_dependents=True)
try:
api_instance = kubernetes.client.StorageV1Api()
api_response = api_instance.delete_storage_class(name=name, body=body)
if wait:
if not _wait_for_resource_status(
api_instance, "storageclass", name, None, "deleted", timeout
):
raise CommandExecutionError(
f"Timeout waiting for storageclass {name} to be deleted"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
return None
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def create_deployment(
name,
namespace,
metadata,
spec,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
Creates the kubernetes deployment as defined by the user.
name
The name of the deployment
namespace
The namespace to create the deployment in
metadata
Deployment metadata dict
spec
Deployment spec dict following kubernetes API conventions
source
File path to deployment definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
.. versionchanged:: 2.0.0
Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``.
template_context
.. versionadded:: 2.0.0
Variables to make available in templated files
dry_run
.. versionadded:: 2.0.0
If True, only simulates the creation of the deployment
wait
.. versionadded:: 2.0.0
Wait for deployment to become ready (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for deployment (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.create_deployment name=nginx namespace=default spec='{"replicas": 1}' wait=True
"""
body = __create_object_body(
kind="Deployment",
obj_class=V1Deployment,
spec_creator=__dict_to_deployment_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.create_namespaced_deployment(
namespace, body, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "deployment", name, namespace, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for deployment {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException):
if exc.status == 404:
raise CommandExecutionError(f"Deployment {namespace}/{name} not found") from exc
if exc.status == 409:
raise CommandExecutionError(f"Deployment {name} already exists") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def create_pod(
name,
namespace,
metadata,
spec,
source=None,
template=None,
saltenv=None,
template_context=None,
wait=False,
timeout=60,
**kwargs,
):
"""
Creates a kubernetes pod as defined by the user.
name
The name of the pod
namespace
The namespace to create the pod in
metadata
Pod metadata dict
spec
Pod spec dict following kubernetes API conventions
source
File path to pod definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
.. versionchanged:: 2.0.0
Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``.
template_context
.. versionadded:: 2.0.0
Variables to make available in templated files
wait
.. versionadded:: 2.0.0
Wait for pod to become ready (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for pod (default: 60)
Pod spec must follow kubernetes API conventions:
.. code-block:: yaml
- spec:
ports:
- containerPort: 8080
name: http
protocol: TCP
CLI Examples:
.. code-block:: bash
salt '*' kubernetes.create_pod name=nginx namespace=default spec='{"containers": [{"name": "nginx", "image": "nginx"}]}'
"""
body = __create_object_body(
kind="Pod",
obj_class=kubernetes.client.V1Pod,
spec_creator=__dict_to_pod_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.create_namespaced_pod(namespace, body)
if wait:
if not _wait_for_resource_status(
api_instance, "pod", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for pod {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException):
if exc.status == 404:
raise CommandExecutionError(f"Pod {namespace}/{name} not found") from exc
if exc.status == 409:
raise CommandExecutionError(f"Pod {name} already exists") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def create_service(
name,
namespace,
metadata,
spec,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
Creates the kubernetes service as defined by the user.
name
The name of the service
namespace
The namespace to create the service in
metadata
Service metadata dict
spec
Service spec dict that follows kubernetes API conventions
source
File path to service definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
.. versionchanged:: 2.0.0
Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``.
template_context
.. versionadded:: 2.0.0
Variables to make available in templated files
wait
.. versionadded:: 2.0.0
Wait for service to become ready (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for service (default: 60)
Service spec must follow kubernetes API conventions. Port specifications can be:
Simple integer for basic port definition: ``[80, 443]``
Dictionary for advanced configuration:
.. code-block:: yaml
- spec:
ports:
- port: 80
targetPort: 8080
name: http # Required if multiple ports are specified
- port: 443
targetPort: web-https # targetPort can reference container port names
name: https
nodePort: 30443 # nodePort must be between 30000-32767
CLI Examples:
.. code-block:: bash
salt '*' kubernetes.create_service name=nginx namespace=default spec='{"ports": [80]}'
salt '*' kubernetes.create_service name=nginx namespace=default spec='{
"ports": [{"port": 80, "targetPort": 8000, "name": "http"}],
"selector": {"app": "nginx"},
"type": "LoadBalancer"
}'
"""
body = __create_object_body(
kind="Service",
obj_class=kubernetes.client.V1Service,
spec_creator=__dict_to_service_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.create_namespaced_service(
namespace, body, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "service", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for service {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException):
if exc.status == 404:
raise CommandExecutionError(f"Service {namespace}/{name} not found") from exc
if exc.status == 409:
raise CommandExecutionError(f"Service {name} already exists") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def create_secret(
name,
namespace="default",
data=None,
source=None,
template=None,
saltenv=None,
template_context=None,
secret_type=None,
metadata=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
Creates the kubernetes secret as defined by the user.
Values that are already base64 encoded will not be re-encoded.
.. note::
Automatic encoding of secret values might cause issues if the values are not correctly identified as base64.
If you run into issues - encode the values before passing them to this function.
name
The name of the secret
namespace
The namespace to create the secret in. Defaults to ``default``.
data
A dictionary of key-value pairs to store in the secret
source
File path to secret definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
.. versionchanged:: 2.0.0
Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``.
template_context
.. versionadded:: 2.0.0
Variables to make available in templated files
secret_type
.. versionadded:: 2.0.0
The type of the secret
metadata
.. versionadded:: 2.0.0
Secret metadata dict
wait
.. versionadded:: 2.0.0
Wait for secret to become ready (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for secret (default: 60)
CLI Example:
.. code-block:: bash
# For regular secrets with plain text values
salt 'minion1' kubernetes.create_secret \
passwords default '{"db": "letmein"}'
# For secrets with pre-encoded values
salt 'minion2' kubernetes.create_secret \
name=passwords namespace=default data='{"db": "bGV0bWVpbg=="}'
# For docker registry secrets
salt 'minion3' kubernetes.create_secret \
name=docker-registry \
type=kubernetes.io/dockerconfigjson \
data='{".dockerconfigjson": "{\"auths\":{...}}"}'
# For TLS secrets
salt 'minion4' kubernetes.create_secret \
name=tls-secret \
type=kubernetes.io/tls \
data='{"tls.crt": "...", "tls.key": "..."}'
"""
cfg = _setup_conn(**kwargs)
if source:
src_obj = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(src_obj, dict):
raise CommandExecutionError("`source` did not render to a dictionary")
if "data" in src_obj:
data = src_obj["data"]
secret_type = src_obj.get("secret_type")
elif data is None:
data = {}
data = __enforce_only_strings_dict(data)
# Encode the secrets using base64 if not already encoded
encoded_data = {}
for key, value in data.items():
if __is_base64(value):
encoded_data[key] = value
else:
encoded_data[key] = base64.b64encode(str(value).encode("utf-8")).decode("utf-8")
body = kubernetes.client.V1Secret(
metadata=__dict_to_object_meta(name, namespace, metadata),
data=encoded_data,
type=secret_type,
)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.create_namespaced_secret(
namespace, body, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "secret", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for secret {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException):
if exc.status == 409:
raise CommandExecutionError(
f"Secret {name} already exists in namespace {namespace}. Use replace_secret to update it."
) from exc
if exc.status == 404:
raise CommandExecutionError(f"Secret {namespace}/{name} not found") from exc
raise CommandExecutionError(str(exc)) from exc
finally:
_cleanup(**cfg)
[docs]
def create_configmap(
name,
namespace,
data,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
Creates the kubernetes configmap as defined by the user.
name
The name of the configmap
namespace
The namespace to create the configmap in
data
A dictionary of key-value pairs to store in the configmap
source
File path to configmap definition
.. versionchanged:: 2.0.0
The configmap definition must be a proper spec with the configmap data in
the ``data`` key. In previous versions, the rendered output was used as the
data directly.
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
.. versionchanged:: 2.0.0
Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``.
template_context
.. versionadded:: 2.0.0
Variables to make available in templated files
wait
.. versionadded:: 2.0.0
Wait for configmap to become ready (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for configmap (default: 60)
CLI Example:
.. code-block:: bash
salt 'minion1' kubernetes.create_configmap \
settings default '{"example.conf": "# example file"}'
salt 'minion2' kubernetes.create_configmap \
name=settings namespace=default data='{"example.conf": "# example file"}'
"""
if source:
rendered = __read_and_render_yaml_file(source, template, saltenv, template_context)
try:
data = rendered["data"]
except KeyError as err:
raise CommandExecutionError(
f"The template for configmap '{name}' (at '{source}') did not render to a spec: Missing `data` key."
) from err
except TypeError as err:
raise CommandExecutionError(
f"The template for configmap '{name}' (at '{source}') did not render to a spec: Expected mapping, got '{type(rendered).__name__}'."
) from err
elif data is None:
data = {}
if not isinstance(data, dict):
raise CommandExecutionError("Data must be a dictionary")
data = __enforce_only_strings_dict(data)
body = kubernetes.client.V1ConfigMap(
metadata=__dict_to_object_meta(name, namespace, {}), data=data
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.create_namespaced_config_map(
namespace, body, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "configmap", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for configmap {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException):
if exc.status == 404:
raise CommandExecutionError(f"ConfigMap {namespace}/{name} not found") from exc
if exc.status == 409:
raise CommandExecutionError(f"ConfigMap {name} already exists") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def create_namespace(name, **kwargs):
"""
Creates a namespace with the specified name.
name
The name of the namespace to create
CLI Example:
.. code-block:: bash
salt '*' kubernetes.create_namespace salt
salt '*' kubernetes.create_namespace name=salt
"""
meta_obj = kubernetes.client.V1ObjectMeta(name=name)
body = kubernetes.client.V1Namespace(metadata=meta_obj)
body.metadata.name = name
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.create_namespace(body)
return ApiClient().sanitize_for_serialization(api_response)
except ApiException as exc:
if exc.status == 409:
raise CommandExecutionError(f"Namespace {name} already exists: {exc.reason}") from exc
if exc.status == 422:
raise CommandExecutionError(f"Invalid namespace name {name}: {exc.reason}") from exc
raise CommandExecutionError(exc) from exc
except HTTPError as exc:
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def create_statefulset(
name,
namespace="default",
metadata=None,
spec=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Creates a statefulset with the specified name, namespace, metadata, and spec.
name
The name of the statefulset
namespace
The namespace to create the statefulset in. Defaults to ``default``.
metadata
StatefulSet metadata dict
spec
StatefulSet spec dict following kubernetes API conventions
source
File path to statefulset definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the creation of the statefulset
wait
Wait for statefulset to become ready (default: False)
timeout
Timeout in seconds to wait for statefulset (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.create_statefulset name=my-statefulset namespace=default spec='{"replicas": 3}' wait=True
"""
body = __create_object_body(
kind="StatefulSet",
obj_class=kubernetes.client.V1StatefulSet,
spec_creator=__dict_to_statefulset_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.create_namespaced_stateful_set(
namespace, body, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "statefulset", name, namespace, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for statefulset {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException):
if exc.status == 404:
raise CommandExecutionError(f"StatefulSet {namespace}/{name} not found") from exc
if exc.status == 409:
raise CommandExecutionError(f"StatefulSet {name} already exists") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def create_replicaset(
name,
namespace="default",
metadata=None,
spec=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Creates a replicaset with the specified name, namespace, metadata, and spec.
name
The name of the replicaset
namespace
The namespace to create the replicaset in. Defaults to ``default``.
metadata
ReplicaSet metadata dict
spec
ReplicaSet spec dict following kubernetes API conventions
source
File path to replicaset definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the creation of the replicaset
wait
Wait for replicaset to become ready (default: False)
timeout
Timeout in seconds to wait for replicaset (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.create_replicaset name=my-rs namespace=default spec='{"replicas": 3}' wait=True
"""
body = __create_object_body(
kind="ReplicaSet",
obj_class=kubernetes.client.V1ReplicaSet,
spec_creator=__dict_to_replicaset_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.create_namespaced_replica_set(
namespace, body, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "replicaset", name, namespace, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for replicaset {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException):
if exc.status == 404:
raise CommandExecutionError(f"ReplicaSet {namespace}/{name} not found") from exc
if exc.status == 409:
raise CommandExecutionError(f"ReplicaSet {name} already exists") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def create_daemonset(
name,
namespace="default",
metadata=None,
spec=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Creates a daemonset with the specified name, namespace, metadata, and spec.
name
The name of the daemonset
namespace
The namespace to create the daemonset in. Defaults to ``default``.
metadata
DaemonSet metadata dict
spec
DaemonSet spec dict following kubernetes API conventions
source
File path to daemonset definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the creation of the daemonset
wait
Wait for daemonset to become ready (default: False)
timeout
Timeout in seconds to wait for daemonset (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.create_daemonset name=my-ds namespace=default wait=True
"""
body = __create_object_body(
kind="DaemonSet",
obj_class=kubernetes.client.V1DaemonSet,
spec_creator=__dict_to_daemonset_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.create_namespaced_daemon_set(
namespace, body, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "daemonset", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for daemonset {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException):
if exc.status == 404:
raise CommandExecutionError(f"DaemonSet {namespace}/{name} not found") from exc
if exc.status == 409:
raise CommandExecutionError(f"DaemonSet {name} already exists") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def create_storageclass(
name,
metadata=None,
spec=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Creates a storageclass with the specified name, metadata, and spec.
name
The name of the storageclass
metadata
StorageClass metadata dict
spec
StorageClass spec dict following kubernetes API conventions
source
File path to storageclass definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the creation of the storageclass
wait
Wait for storageclass to become ready (default: False)
timeout
Timeout in seconds to wait for storageclass (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.create_storageclass name=fast-sc spec='{"provisioner": "kubernetes.io/no-provisioner"}'
"""
if source:
src_obj = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(src_obj, dict) or src_obj.get("kind") != "StorageClass":
raise CommandExecutionError("The source file should define only a StorageClass object")
if "metadata" in src_obj:
metadata = src_obj["metadata"]
if "spec" in src_obj:
spec = src_obj["spec"]
elif spec is None:
spec = {
key: value
for key, value in src_obj.items()
if key not in ("apiVersion", "kind", "metadata")
}
if metadata is None:
metadata = {}
if spec is None:
spec = {}
created_spec = __dict_to_storageclass_spec(spec)
body = kubernetes.client.V1StorageClass(
metadata=__dict_to_object_meta(name, None, metadata),
**created_spec,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.StorageV1Api()
api_response = api_instance.create_storage_class(body, dry_run="All" if dry_run else None)
if wait:
if not _wait_for_resource_status(
api_instance, "storageclass", name, None, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for storageclass {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException):
if exc.status == 404:
raise CommandExecutionError(f"StorageClass {name} not found") from exc
if exc.status == 409:
raise CommandExecutionError(f"StorageClass {name} already exists") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def replace_deployment(
name,
metadata,
spec,
source=None,
template=None,
saltenv=None,
namespace="default",
template_context=None,
wait=False,
timeout=60,
**kwargs,
):
"""
Replaces an existing deployment with a new one defined by name and
namespace, having the specificed metadata and spec.
name
The name of the deployment
metadata
Deployment metadata dict
spec
Deployment spec dict following kubernetes API conventions
source
File path to deployment definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
.. versionchanged:: 2.0.0
Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``.
namespace
The namespace to replace the deployment in. Defaults to ``default``.
template_context
.. versionadded:: 2.0.0
Variables to make available in templated files
wait
.. versionadded:: 2.0.0
Wait for deployment to become ready (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for deployment (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.replace_deployment *args
"""
body = __create_object_body(
kind="Deployment",
obj_class=V1Deployment,
spec_creator=__dict_to_deployment_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.replace_namespaced_deployment(name, namespace, body)
if wait:
if not _wait_for_resource_status(
api_instance, "deployment", name, namespace, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for deployment {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"Deployment {namespace}/{name} not found") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def replace_service(
name,
old_service,
metadata,
spec,
source=None,
template=None,
saltenv=None,
namespace="default",
template_context=None,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionchanged:: 2.0.0
The `old_service` parameter was moved to the second position,
which pushes `metadata`, `spec`, `source` and `template` one position
further down the parameter list.
Replaces an existing service with a new one defined by name and namespace,
having the specified metadata and spec.
name
The name of the service
old_service
The existing service to replace
metadata
Service metadata dict
spec
Service spec dict following kubernetes API conventions
source
File path to service definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
.. versionchanged:: 2.0.0
Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``.
namespace
The namespace to replace the service in. Defaults to ``default``.
template_context
.. versionadded:: 2.0.0
Variables to make available in templated files
wait
.. versionadded:: 2.0.0
Wait for service to become ready (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for service (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.replace_service name=my-service \
old_service='{"metadata": {"resourceVersion": "12345"}, "spec": {"clusterIP": "10.0.0.1"}}' \
metadata='{"labels": {"app": "my-app"}}' \
spec='{"ports": [{"port": 80, "targetPort": 8080}], "selector": {"app": "my-app"}}' \
source=/path/to/service.yaml \
template=jinja \
saltenv=base \
namespace=default \
template_context='{"var1": "value1"}'
"""
body = __create_object_body(
kind="Service",
obj_class=kubernetes.client.V1Service,
spec_creator=__dict_to_service_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
# Some attributes have to be preserved
# otherwise exceptions will be thrown
body.spec.cluster_ip = old_service["spec"]["clusterIP"]
body.metadata.resource_version = old_service["metadata"]["resourceVersion"]
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.replace_namespaced_service(name, namespace, body)
if wait:
if not _wait_for_resource_status(
api_instance, "service", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for service {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"Service {namespace}/{name} not found") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def replace_secret(
name,
data,
source=None,
template=None,
saltenv=None,
namespace="default",
template_context=None,
secret_type=None,
metadata=None,
wait=False,
timeout=60,
**kwargs,
):
"""
Replaces an existing secret with a new one defined by name and namespace.
Values that are already base64 encoded will not be re-encoded.
If a source file is specified, the secret type will be read from the template.
.. note::
Automatic encoding of secret values might cause issues if the values are not correctly identified as base64.
If you run into issues - encode the values before passing them to this function.
name
The name of the secret
data
A dictionary of key-value pairs to store in the secret
source
File path to secret definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
.. versionchanged:: 2.0.0
Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``.
namespace
The namespace to replace the secret in. Defaults to ``default``.
template_context
.. versionadded:: 2.0.0
Variables to make available in templated files
secret_type
.. versionadded:: 2.0.0
The type of the secret
metadata
.. versionadded:: 2.0.0
Secret metadata dict
wait
.. versionadded:: 2.0.0
Wait for secret to become ready (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for secret (default: 60)
CLI Example:
.. code-block:: bash
# For regular secrets with plain text values
salt 'minion1' kubernetes.replace_secret \
name=passwords data='{"db": "letmein"}'
# For secrets with pre-encoded values
salt 'minion2' kubernetes.replace_secret \
name=passwords data='{"db": "bGV0bWVpbg=="}'
# For docker registry secrets
salt 'minion3' kubernetes.replace_secret \
name=docker-registry \
source=/path/to/docker-secret.yaml \
secret_type=kubernetes.io/dockerconfigjson
# For TLS secrets
salt 'minion4' kubernetes.replace_secret \
name=tls-secret \
source=/path/to/tls-secret.yaml \
secret_type=kubernetes.io/tls
"""
if source:
src_obj = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(src_obj, dict):
raise CommandExecutionError("`source` did not render to a dictionary")
if "data" in src_obj:
data = src_obj["data"]
secret_type = src_obj.get("secret_type")
elif data is None:
data = {}
data = __enforce_only_strings_dict(data)
# Encode the secrets using base64 if not already encoded
encoded_data = {}
for key, value in data.items():
if __is_base64(value):
encoded_data[key] = value
else:
encoded_data[key] = base64.b64encode(str(value).encode("utf-8")).decode("utf-8")
# Get existing secret type if not specified
if not type:
existing_secret = kubernetes.client.CoreV1Api().read_namespaced_secret(name, namespace)
secret_type = existing_secret.type
body = kubernetes.client.V1Secret(
metadata=__dict_to_object_meta(name, namespace, metadata),
data=encoded_data,
type=secret_type,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.replace_namespaced_secret(name, namespace, body)
if wait:
if not _wait_for_resource_status(
api_instance, "secret", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for secret {name} to be ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"Secret {namespace}/{name} not found") from exc
raise CommandExecutionError(str(exc)) from exc
finally:
_cleanup(**cfg)
[docs]
def replace_configmap(
name,
data,
source=None,
template=None,
saltenv=None,
namespace="default",
template_context=None,
wait=False,
timeout=60,
**kwargs,
):
"""
Replaces an existing configmap with a new one defined by name and
namespace with the specified data.
name
The name of the configmap
data
A dictionary of key-value pairs to store in the configmap
source
File path to configmap definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
.. versionchanged:: 2.0.0
Defaults to the value of the :conf_minion:`saltenv` minion option or ``base``.
namespace
The namespace to replace the configmap in. Defaults to ``default``.
template_context
.. versionadded:: 2.0.0
Variables to make available in templated files
wait
.. versionadded:: 2.0.0
Wait for configmap to become ready (default: False)
timeout
.. versionadded:: 2.0.0
Timeout in seconds to wait for configmap (default: 60)
CLI Example:
.. code-block:: bash
salt 'minion1' kubernetes.replace_configmap \
settings default '{"example.conf": "# example file"}'
salt 'minion2' kubernetes.replace_configmap \
name=settings namespace=default data='{"example.conf": "# example file"}'
"""
if source:
data = __read_and_render_yaml_file(source, template, saltenv, template_context)
data = __enforce_only_strings_dict(data)
body = kubernetes.client.V1ConfigMap(
metadata=__dict_to_object_meta(name, namespace, {}), data=data
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.replace_namespaced_config_map(name, namespace, body)
if wait:
if not _wait_for_resource_status(
api_instance, "configmap", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for configmap {name} to be ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"ConfigMap {namespace}/{name} not found") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def replace_statefulset(
name,
namespace,
spec,
metadata=None,
source=None,
template=None,
saltenv=None,
template_context=None,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Replaces an existing statefulset with a new one defined by name and
namespace with the specified spec.
name
The name of the statefulset
namespace
The namespace of the statefulset
spec
A dictionary representing the spec of the statefulset
metadata
A dictionary representing the metadata of the statefulset
source
File path to statefulset definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
wait
Wait for statefulset to become ready (default: False)
timeout
Timeout in seconds to wait for statefulset (default: 60)
CLI Example:
.. code-block:: bash
salt 'minion1' kubernetes.replace_statefulset \
name=my-statefulset namespace=default spec='{"replicas": 3}'
"""
body = __create_object_body(
kind="StatefulSet",
obj_class=kubernetes.client.V1StatefulSet,
spec_creator=__dict_to_statefulset_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.replace_namespaced_stateful_set(name, namespace, body)
if wait:
if not _wait_for_resource_status(
api_instance, "statefulset", name, namespace, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for statefulset {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"StatefulSet {namespace}/{name} not found") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def replace_replicaset(
name,
namespace,
spec,
metadata=None,
source=None,
template=None,
saltenv=None,
template_context=None,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Replaces an existing replicaset with a new one defined by name and
namespace with the specified spec.
name
The name of the replicaset
namespace
The namespace of the replicaset
spec
A dictionary representing the spec of the replicaset
metadata
A dictionary representing the metadata of the replicaset
source
File path to replicaset definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
wait
Wait for replicaset to become ready (default: False)
timeout
Timeout in seconds to wait for replicaset (default: 60)
CLI Example:
.. code-block:: bash
salt 'minion1' kubernetes.replace_replicaset \
name=my-replicaset namespace=default spec='{"replicas": 3}'
"""
body = __create_object_body(
kind="ReplicaSet",
obj_class=kubernetes.client.V1ReplicaSet,
spec_creator=__dict_to_replicaset_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.replace_namespaced_replica_set(name, namespace, body)
if wait:
if not _wait_for_resource_status(
api_instance, "replicaset", name, namespace, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for replicaset {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"ReplicaSet {namespace}/{name} not found") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def replace_daemonset(
name,
namespace,
spec,
metadata=None,
source=None,
template=None,
saltenv=None,
template_context=None,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Replaces an existing daemonset with a new one defined by name and
namespace with the specified spec.
name
The name of the daemonset
namespace
The namespace of the daemonset
spec
A dictionary representing the spec of the daemonset
metadata
A dictionary representing the metadata of the daemonset
source
File path to daemonset definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
wait
Wait for daemonset to become ready (default: False)
timeout
Timeout in seconds to wait for daemonset (default: 60)
CLI Example:
.. code-block:: bash
salt 'minion1' kubernetes.replace_daemonset \
name=my-daemonset namespace=default spec='{"replicas": 3}'
"""
body = __create_object_body(
kind="DaemonSet",
obj_class=kubernetes.client.V1DaemonSet,
spec_creator=__dict_to_daemonset_spec,
name=name,
namespace=namespace,
metadata=metadata,
spec=spec,
source=source,
template=template,
saltenv=saltenv,
template_context=template_context,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.replace_namespaced_daemon_set(name, namespace, body)
if wait:
if not _wait_for_resource_status(
api_instance, "daemonset", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for daemonset {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"DaemonSet {namespace}/{name} not found") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def replace_storageclass(
name,
spec,
metadata=None,
source=None,
template=None,
saltenv=None,
template_context=None,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Replaces an existing storageclass with a new one defined by name.
name
The name of the storageclass
spec
A dictionary representing the spec of the storageclass
metadata
A dictionary representing the metadata of the storageclass
source
File path to storageclass definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
wait
Wait for storageclass to become ready (default: False)
timeout
Timeout in seconds to wait for storageclass (default: 60)
CLI Example:
.. code-block:: bash
salt 'minion1' kubernetes.replace_storageclass \
name=my-storageclass spec='{"provisioner": "kubernetes.io/no-provisioner"}'
"""
if source:
src_obj = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(src_obj, dict) or src_obj.get("kind") != "StorageClass":
raise CommandExecutionError("The source file should define only a StorageClass object")
if "metadata" in src_obj:
metadata = src_obj["metadata"]
if "spec" in src_obj:
spec = src_obj["spec"]
elif spec is None:
spec = {
key: value
for key, value in src_obj.items()
if key not in ("apiVersion", "kind", "metadata")
}
if metadata is None:
metadata = {}
created_spec = __dict_to_storageclass_spec(spec)
body = kubernetes.client.V1StorageClass(
metadata=__dict_to_object_meta(name, None, metadata),
**created_spec,
)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.StorageV1Api()
current_storageclass = api_instance.read_storage_class(name)
body.metadata.resource_version = current_storageclass.metadata.resource_version
api_response = api_instance.replace_storage_class(name, body)
if wait:
if not _wait_for_resource_status(
api_instance, "storageclass", name, None, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for storageclass {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"StorageClass {name} not found") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def patch_service(
name,
namespace,
patch=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.0.0
Patches an existing service with the provided patch dictionary.
name
The name of the service
namespace
The namespace of the service
patch
A dictionary representing the patch to apply to the service
source
File path to patch definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the patch without applying it (default: False)
wait
Wait for service to become ready (default: False)
timeout
Timeout in seconds to wait for service (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.patch_service \\
name=my-service \\
namespace=default \\
patch='{"spec": {"type": "LoadBalancer"}}'
"""
if source:
rendered = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(rendered, dict):
raise CommandExecutionError("The source file did not render to a dictionary")
patch = rendered
if not isinstance(patch, dict):
raise CommandExecutionError("Patch must be a dictionary")
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.patch_namespaced_service(
name, namespace, patch, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "service", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for service {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"Service {namespace}/{name} not found") from exc
if isinstance(exc, ApiException) and exc.status == 409:
raise CommandExecutionError(f"Conflict when patching service {name}") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def patch_secret(
name,
namespace,
patch=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.0.0
Patches an existing secret with the provided patch dictionary.
name
The name of the secret
namespace
The namespace of the secret
patch
A dictionary representing the patch to apply to the secret
source
File path to patch definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the patch without applying it (default: False)
wait
Wait for secret to become ready (default: False)
timeout
Timeout in seconds to wait for secret (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.patch_secret \\
name=my-secret \\
namespace=default \\
patch='{"data": {"password": "bmV3cGFzcw=="}}'
"""
if source:
rendered = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(rendered, dict):
raise CommandExecutionError("The source file did not render to a dictionary")
patch = rendered
if not isinstance(patch, dict):
raise CommandExecutionError("Patch must be a dictionary")
# Encode secret data values to base64 if not already encoded
if "data" in patch and isinstance(patch["data"], dict):
encoded_data = {}
for key, value in patch["data"].items():
value = str(value)
if __is_base64(value):
encoded_data[key] = value
else:
encoded_data[key] = base64.b64encode(value.encode("utf-8")).decode("utf-8")
patch = {**patch, "data": encoded_data}
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.patch_namespaced_secret(
name, namespace, patch, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "secret", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for secret {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"Secret {namespace}/{name} not found") from exc
if isinstance(exc, ApiException) and exc.status == 409:
raise CommandExecutionError(f"Conflict when patching secret {name}") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def patch_configmap(
name,
namespace,
patch=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.0.0
Patches an existing configmap with the provided patch dictionary.
name
The name of the configmap
namespace
The namespace of the configmap
patch
A dictionary representing the patch to apply to the configmap
source
File path to patch definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the patch without applying it (default: False)
wait
Wait for configmap to become ready (default: False)
timeout
Timeout in seconds to wait for configmap (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.patch_configmap \\
name=my-config \\
namespace=default \\
patch='{"data": {"key": "new-value"}}'
"""
if source:
rendered = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(rendered, dict):
raise CommandExecutionError("The source file did not render to a dictionary")
patch = rendered
if not isinstance(patch, dict):
raise CommandExecutionError("Patch must be a dictionary")
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.CoreV1Api()
api_response = api_instance.patch_namespaced_config_map(
name, namespace, patch, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "configmap", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for configmap {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"ConfigMap {namespace}/{name} not found") from exc
if isinstance(exc, ApiException) and exc.status == 409:
raise CommandExecutionError(f"Conflict when patching configmap {name}") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def patch_deployment(
name,
namespace,
patch=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.0.0
Patches an existing deployment with the provided patch dictionary.
name
The name of the deployment
namespace
The namespace of the deployment
patch
A dictionary representing the patch to apply to the deployment
source
File path to patch definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the patch without applying it (default: False)
wait
Wait for deployment to become ready (default: False)
timeout
Timeout in seconds to wait for deployment (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.patch_deployment \
name=my-deployment \
namespace=default \
patch='{"spec": {"replicas": 5}}'
"""
if source:
rendered = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(rendered, dict):
raise CommandExecutionError("The source file did not render to a dictionary")
patch = rendered
if not isinstance(patch, dict):
raise CommandExecutionError("Patch must be a dictionary")
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.patch_namespaced_deployment(
name, namespace, patch, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "deployment", name, namespace, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for deployment {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"Deployment {namespace}/{name} not found") from exc
if isinstance(exc, ApiException) and exc.status == 409:
raise CommandExecutionError(f"Conflict when patching deployment {name}") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def patch_statefulset(
name,
namespace,
patch=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Patches an existing statefulset with the provided patch dictionary.
name
The name of the statefulset
namespace
The namespace of the statefulset
patch
A dictionary representing the patch to apply to the statefulset
source
File path to patch definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the patch without applying it (default: False)
wait
Wait for statefulset to become ready (default: False)
timeout
Timeout in seconds to wait for statefulset (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.patch_statefulset \
name=my-statefulset \
namespace=default \
patch='{"spec": {"replicas": 5}}'
"""
if source:
rendered = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(rendered, dict):
raise CommandExecutionError("The source file did not render to a dictionary")
patch = rendered
if not isinstance(patch, dict):
raise CommandExecutionError("Patch must be a dictionary")
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.patch_namespaced_stateful_set(
name, namespace, patch, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "statefulset", name, namespace, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for statefulset {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"StatefulSet {namespace}/{name} not found") from exc
if isinstance(exc, ApiException) and exc.status == 409:
raise CommandExecutionError(f"Conflict when patching statefulset {name}") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def patch_replicaset(
name,
namespace,
patch=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Patches an existing replicaset with the provided patch dictionary.
name
The name of the replicaset
namespace
The namespace of the replicaset
patch
A dictionary representing the patch to apply to the replicaset
source
File path to patch definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the patch without applying it (default: False)
wait
Wait for replicaset to become ready (default: False)
timeout
Timeout in seconds to wait for replicaset (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.patch_replicaset \
name=my-replicaset \
namespace=default \
patch='{"spec": {"replicas": 5}}'
"""
if source:
rendered = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(rendered, dict):
raise CommandExecutionError("The source file did not render to a dictionary")
patch = rendered
if not isinstance(patch, dict):
raise CommandExecutionError("Patch must be a dictionary")
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.patch_namespaced_replica_set(
name, namespace, patch, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "replicaset", name, namespace, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for replicaset {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"ReplicaSet {namespace}/{name} not found") from exc
if isinstance(exc, ApiException) and exc.status == 409:
raise CommandExecutionError(f"Conflict when patching replicaset {name}") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def patch_daemonset(
name,
namespace,
patch=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Patches an existing daemonset with the provided patch dictionary.
name
The name of the daemonset
namespace
The namespace of the daemonset
patch
A dictionary representing the patch to apply to the daemonset
source
File path to patch definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the patch without applying it (default: False)
wait
Wait for daemonset to become ready (default: False)
timeout
Timeout in seconds to wait for daemonset (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.patch_daemonset \
name=my-daemonset \
namespace=default \
patch='{"spec": {"replicas": 5}}'
"""
if source:
rendered = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(rendered, dict):
raise CommandExecutionError("The source file did not render to a dictionary")
patch = rendered
if not isinstance(patch, dict):
raise CommandExecutionError("Patch must be a dictionary")
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.AppsV1Api()
api_response = api_instance.patch_namespaced_daemon_set(
name, namespace, patch, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "daemonset", name, namespace, "ready", timeout
):
raise CommandExecutionError(f"Timeout waiting for daemonset {name} to become ready")
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"DaemonSet {namespace}/{name} not found") from exc
if isinstance(exc, ApiException) and exc.status == 409:
raise CommandExecutionError(f"Conflict when patching daemonset {name}") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
[docs]
def patch_storageclass(
name,
patch=None,
source=None,
template=None,
saltenv=None,
template_context=None,
dry_run=False,
wait=False,
timeout=60,
**kwargs,
):
"""
.. versionadded:: 2.1.0
Patches an existing storageclass with the provided patch dictionary.
name
The name of the storageclass
patch
A dictionary representing the patch to apply to the storageclass
source
File path to patch definition
template
Template engine to use to render the source file
saltenv
Salt environment to pull the source file from
template_context
Variables to make available in templated files
dry_run
If True, only simulates the patch without applying it (default: False)
wait
Wait for storageclass to become ready (default: False)
timeout
Timeout in seconds to wait for storageclass (default: 60)
CLI Example:
.. code-block:: bash
salt '*' kubernetes.patch_storageclass \
name=my-storageclass \
patch='{"reclaimPolicy": "Retain"}'
"""
if source:
rendered = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(rendered, dict):
raise CommandExecutionError("The source file did not render to a dictionary")
if rendered.get("kind") == "StorageClass":
metadata = rendered.get("metadata")
spec = rendered.get("spec")
if spec is None:
spec = {
key: value
for key, value in rendered.items()
if key not in ("apiVersion", "kind", "metadata")
}
patch = {}
if metadata:
metadata_patch = {
key: value
for key, value in metadata.items()
if key
not in (
"name",
"namespace",
"resourceVersion",
"uid",
"creationTimestamp",
"managedFields",
"generation",
"selfLink",
)
}
if metadata_patch:
patch["metadata"] = metadata_patch
if spec:
patch.update(spec)
else:
patch = rendered
if not isinstance(patch, dict):
raise CommandExecutionError("Patch must be a dictionary")
# Allow state-style payloads that wrap StorageClass fields under `spec`.
if "spec" in patch:
spec_patch = patch.get("spec")
if not isinstance(spec_patch, dict):
raise CommandExecutionError("StorageClass spec patch must be a dictionary")
patch = {key: value for key, value in patch.items() if key != "spec"}
patch.update(spec_patch)
cfg = _setup_conn(**kwargs)
try:
api_instance = kubernetes.client.StorageV1Api()
api_response = api_instance.patch_storage_class(
name, patch, dry_run="All" if dry_run else None
)
if wait:
if not _wait_for_resource_status(
api_instance, "storageclass", name, None, "ready", timeout
):
raise CommandExecutionError(
f"Timeout waiting for storageclass {name} to become ready"
)
return ApiClient().sanitize_for_serialization(api_response)
except (ApiException, HTTPError) as exc:
if isinstance(exc, ApiException) and exc.status == 404:
raise CommandExecutionError(f"StorageClass {name} not found") from exc
if isinstance(exc, ApiException) and exc.status == 409:
raise CommandExecutionError(f"Conflict when patching storageclass {name}") from exc
raise CommandExecutionError(exc) from exc
finally:
_cleanup(**cfg)
def __is_base64(value):
"""
Check if a string is base64 encoded by attempting to decode it.
Handles whitespace and validates against base64.
"""
if not isinstance(value, str):
return False
# Remove whitespace and newlines
value = "".join(value.split())
try:
# Try decoding with validation
base64.b64decode(value, validate=True).decode("utf-8")
return True
except ValueError:
return False
def __create_object_body(
kind,
obj_class,
spec_creator,
name,
namespace,
metadata,
spec,
source,
template,
saltenv,
template_context=None,
):
"""
Create a Kubernetes Object body instance.
"""
if source:
src_obj = __read_and_render_yaml_file(source, template, saltenv, template_context)
if not isinstance(src_obj, dict) or "kind" not in src_obj or src_obj["kind"] != kind:
raise CommandExecutionError(f"The source file should define only a {kind} object")
if "metadata" in src_obj:
metadata = src_obj["metadata"]
if "spec" in src_obj:
spec = src_obj["spec"]
if metadata is None:
metadata = {}
if spec is None:
spec = {}
try:
created_spec = spec_creator(spec)
except (ValueError, TypeError) as exc:
raise CommandExecutionError(f"Invalid {kind} spec: {exc}") from exc
return obj_class(
metadata=__dict_to_object_meta(name, namespace, metadata),
spec=created_spec,
)
def __read_and_render_yaml_file(source, template, saltenv, template_context=None):
"""
Read a yaml file and, if needed, renders that using the specified
templating. Returns the python objects defined inside of the file.
"""
saltenv = saltenv or __opts__["saltenv"] or "base"
sfn = __salt__["cp.cache_file"](source, saltenv)
if not sfn:
raise CommandExecutionError(f"Source file '{source}' not found")
with salt.utils.files.fopen(sfn, "r") as src:
contents = src.read()
if template:
if template not in salt.utils.templates.TEMPLATE_REGISTRY:
raise CommandExecutionError(f"Unknown template specified: {template}")
# Apply templating with template_context
if template_context is None:
template_context = {}
data = salt.utils.templates.TEMPLATE_REGISTRY[template](
contents,
from_str=True,
to_str=True,
saltenv=saltenv,
grains=__grains__,
pillar=__pillar__,
salt=__salt__,
opts=__opts__,
context=template_context,
)
if not data["result"]:
# Failed to render the template
raise CommandExecutionError(
f'Failed to render file path with error: {data["data"]}'
)
contents = data["data"].encode("utf-8")
return salt.utils.yaml.safe_load(contents)
def __dict_to_object_meta(name, namespace, metadata):
"""
Converts a dictionary into kubernetes ObjectMetaV1 instance.
"""
meta_obj = kubernetes.client.V1ObjectMeta()
meta_obj.namespace = namespace
if metadata is None:
metadata = {}
# Handle nested dictionaries in metadata
processed_metadata = {}
for key, value in metadata.items():
if isinstance(value, dict):
# Keep nested structure for fields like annotations and labels
processed_metadata[key] = value
else:
# Convert non-dict values to string
processed_metadata[key] = str(value)
# Replicate `kubectl [create|replace|apply] --record`
if "annotations" not in processed_metadata:
processed_metadata["annotations"] = {}
if "kubernetes.io/change-cause" not in processed_metadata["annotations"]:
processed_metadata["annotations"]["kubernetes.io/change-cause"] = " ".join(sys.argv)
for key, value in processed_metadata.items():
if hasattr(meta_obj, key):
setattr(meta_obj, key, value)
if meta_obj.name != name:
log.info(
"The object already has a name attribute, overwriting it with "
"the one defined inside of salt"
)
meta_obj.name = name
return meta_obj
def __dict_to_deployment_spec(spec):
"""
Converts a dictionary into kubernetes V1DeploymentSpec instance.
"""
if not isinstance(spec, dict):
raise CommandExecutionError(
f"Deployment spec must be a dictionary, not {type(spec).__name__}"
)
processed_spec = spec.copy()
# Validate required template field
if "template" not in processed_spec:
raise CommandExecutionError("Deployment spec must include template with pod specification")
template = processed_spec["template"]
template_metadata = template.get("metadata", {})
template_labels = template_metadata.get("labels", {})
# Handle selector
if "selector" not in processed_spec:
if not template_labels:
raise CommandExecutionError(
"Template must include labels when selector is not specified"
)
processed_spec["selector"] = {"match_labels": template_labels}
else:
selector = processed_spec["selector"]
if not selector or not selector.get("matchLabels"):
raise CommandExecutionError("Deployment selector must include matchLabels")
if not all(template_labels.get(k) == v for k, v in selector["matchLabels"].items()):
raise CommandExecutionError("selector.matchLabels must match template metadata.labels")
# Convert selector format
if "matchLabels" in processed_spec["selector"]:
processed_spec["selector"] = {"match_labels": processed_spec["selector"]["matchLabels"]}
# Create pod spec
try:
pod_spec = __dict_to_pod_spec(template["spec"])
except (CommandExecutionError, ValueError) as exc:
raise CommandExecutionError(f"Invalid pod spec in deployment template: {exc}") from exc
# Create pod template
pod_template = kubernetes.client.V1PodTemplateSpec(
metadata=kubernetes.client.V1ObjectMeta(**template_metadata), spec=pod_spec
)
processed_spec["template"] = pod_template
# Create selector object
processed_spec["selector"] = kubernetes.client.V1LabelSelector(**processed_spec["selector"])
# Handle replicas conversion
if "replicas" in processed_spec:
try:
processed_spec["replicas"] = int(processed_spec["replicas"])
except (TypeError, ValueError) as exc:
raise CommandExecutionError(f"replicas must be an integer: {exc}") from exc
# Create final spec
try:
return V1DeploymentSpec(**processed_spec)
except (TypeError, ValueError) as exc:
raise CommandExecutionError(f"Invalid deployment spec: {exc}") from exc
def __dict_to_pod_spec(spec):
"""
Converts a dictionary into kubernetes V1PodSpec instance.
"""
if spec is None:
raise CommandExecutionError("Pod spec cannot be None")
# Directly return if already a V1PodSpec
if isinstance(spec, kubernetes.client.V1PodSpec):
return spec
if not isinstance(spec, dict):
raise CommandExecutionError(f"Pod spec must be a dictionary, not {type(spec).__name__}")
processed_spec = spec.copy()
# Validate containers
if not processed_spec.get("containers"):
raise CommandExecutionError("Pod spec must include at least one container")
if not isinstance(processed_spec["containers"], list):
raise CommandExecutionError(
f"containers must be a list, not {type(processed_spec['containers']).__name__}"
)
# Convert container specs
containers = []
for i, container in enumerate(processed_spec["containers"]):
if not isinstance(container, dict):
raise CommandExecutionError(
f"Container {i} must be a dictionary, not {type(container).__name__}"
)
container_copy = container.copy()
if not container_copy.get("name"):
raise CommandExecutionError(f"Container {i} must specify 'name'")
if not container_copy.get("image"):
raise CommandExecutionError(f"Container {i} must specify 'image'")
# Handle ports
if "ports" in container_copy:
ports = container_copy["ports"]
if not isinstance(ports, list):
raise CommandExecutionError(
f"Container {container_copy['name']} ports must be a list"
)
processed_ports = []
for port in ports:
if not isinstance(port, dict):
raise CommandExecutionError(
f"Port in container {container_copy['name']} must be a dictionary"
)
port_copy = port.copy()
# Handle containerPort conversion
if "containerPort" in port_copy:
try:
port_copy["container_port"] = int(port_copy.pop("containerPort"))
except (TypeError, ValueError) as exc:
raise CommandExecutionError(
f"containerPort in container {container_copy['name']} must be an integer: {exc}"
) from exc
processed_ports.append(kubernetes.client.V1ContainerPort(**port_copy))
containers.append(kubernetes.client.V1Container(**container_copy))
processed_spec["containers"] = containers
# Handle imagePullSecrets field
if "imagePullSecrets" in processed_spec:
image_pull_secrets = processed_spec.pop("imagePullSecrets")
if not isinstance(image_pull_secrets, list):
raise CommandExecutionError("imagePullSecrets must be a list")
processed_secrets = []
for secret in image_pull_secrets:
if not isinstance(secret, dict):
raise CommandExecutionError(
f"Each imagePullSecret must be a dictionary, not {type(secret).__name__}"
)
processed_secrets.append(kubernetes.client.V1LocalObjectReference(**secret))
try:
return kubernetes.client.V1PodSpec(**processed_spec)
except (TypeError, ValueError) as exc:
raise CommandExecutionError(f"Invalid pod spec: {exc}") from exc
def __dict_to_service_spec(spec):
"""
Converts a dictionary into kubernetes V1ServiceSpec instance.
Args:
spec: Service specification dictionary following kubernetes API conventions
Returns:
kubernetes.client.V1ServiceSpec: The converted service spec
"""
if not isinstance(spec, dict):
raise CommandExecutionError(f"Service spec must be a dictionary, got {type(spec)}")
# Validate required fields
if "ports" not in spec:
raise CommandExecutionError("Service spec must include 'ports'")
if not isinstance(spec["ports"], list):
raise CommandExecutionError("Service ports must be a list")
# Validate service type if specified
valid_service_types = {"ClusterIP", "ExternalName", "LoadBalancer", "NodePort"}
if "type" in spec and spec["type"] not in valid_service_types:
raise CommandExecutionError(
f"Invalid service type: {spec['type']}. Must be one of: {', '.join(sorted(valid_service_types))}"
)
spec_obj = kubernetes.client.V1ServiceSpec()
for key, value in spec.items():
if key == "ports":
spec_obj.ports = []
# Validate port specifications
has_multiple_ports = len(value) > 1
for i, port in enumerate(value):
if not isinstance(port, dict):
try:
# Allow simple integer port definitions
kube_port = kubernetes.client.V1ServicePort(port=int(port))
except (TypeError, ValueError) as exc:
raise CommandExecutionError(
f"Invalid port specification at index {i}: {exc}"
) from exc
else:
# Verify required fields for port
if "port" not in port:
raise CommandExecutionError(
f"Service port at index {i} must specify 'port' value"
)
try:
port_num = int(port["port"])
except (TypeError, ValueError) as exc:
raise CommandExecutionError(
f"Invalid port number at index {i}: {exc}"
) from exc
# Create port object
kube_port = kubernetes.client.V1ServicePort(port=port_num)
# Validate name requirement for multi-port services
if has_multiple_ports and "name" not in port:
raise CommandExecutionError(
f"Port at index {i} must specify 'name' in multi-port service"
)
# Validate nodePort range if specified
if "nodePort" in port:
try:
node_port = int(port["nodePort"])
if not 30000 <= node_port <= 32767:
raise CommandExecutionError(
f"NodePort {node_port} at index {i} must be between 30000-32767"
)
except (TypeError, ValueError) as exc:
raise CommandExecutionError(
f"Invalid nodePort value at index {i}: {exc}"
) from exc
# Copy remaining port attributes
for port_key, port_value in port.items():
if port_key != "port":
if port_key in ["nodePort", "targetPort"]:
try:
if isinstance(port_value, str) and not port_value.isdigit():
# Allow string targetPort for named ports
if port_key != "targetPort":
port_value = int(port_value)
elif isinstance(port_value, (int, str)):
port_value = int(port_value)
except (TypeError, ValueError) as exc:
raise CommandExecutionError(
f"Invalid {port_key} value at index {i}: {exc}"
) from exc
if hasattr(kube_port, port_key):
setattr(kube_port, port_key, port_value)
spec_obj.ports.append(kube_port)
elif hasattr(spec_obj, key):
setattr(spec_obj, key, value)
return spec_obj
def __dict_to_statefulset_spec(spec):
"""
.. versionadded:: 2.1.0
Converts a dictionary into kubernetes V1StatefulSetSpec instance.
"""
if not isinstance(spec, dict):
raise CommandExecutionError(
f"StatefulSet spec must be a dictionary, not {type(spec).__name__}"
)
processed_spec = spec.copy()
# Validate required fields (accept both camelCase and snake_case input)
if "serviceName" not in processed_spec and "service_name" not in processed_spec:
raise CommandExecutionError("StatefulSet spec must include 'serviceName'")
# Validate required template field
if "template" not in processed_spec:
raise CommandExecutionError("StatefulSet spec must include template with pod specification")
template = processed_spec["template"]
if not isinstance(template, dict):
raise CommandExecutionError(f"Template must be a dictionary, not {type(template).__name__}")
template_metadata = template.get("metadata", {})
template_spec = template.get("spec", {})
# Validate template has pod spec
if not template_spec:
raise CommandExecutionError("Template must include pod specification")
# Create pod spec
try:
pod_spec = __dict_to_pod_spec(template_spec)
except (CommandExecutionError, ValueError) as exc:
raise CommandExecutionError(f"Invalid pod spec in statefulset template: {exc}") from exc
# Create pod template
pod_template = kubernetes.client.V1PodTemplateSpec(
metadata=kubernetes.client.V1ObjectMeta(**template_metadata), spec=pod_spec
)
processed_spec["template"] = pod_template
# Handle selector - optional for StatefulSet but validate if provided
if "selector" in processed_spec:
selector = processed_spec["selector"]
if not isinstance(selector, dict):
raise CommandExecutionError(
f"Selector must be a dictionary, not {type(selector).__name__}"
)
if "matchLabels" in selector:
processed_spec["selector"] = {"match_labels": selector["matchLabels"]}
processed_spec["selector"] = kubernetes.client.V1LabelSelector(**processed_spec["selector"])
# Handle replicas conversion
if "replicas" in processed_spec:
try:
processed_spec["replicas"] = int(processed_spec["replicas"])
except (TypeError, ValueError) as exc:
raise CommandExecutionError(f"replicas must be an integer: {exc}") from exc
# Convert serviceName (camelCase from YAML/user input) to service_name (Python client)
if "serviceName" in processed_spec:
processed_spec["service_name"] = processed_spec.pop("serviceName")
# Create final spec
try:
return kubernetes.client.V1StatefulSetSpec(**processed_spec)
except (TypeError, ValueError) as exc:
raise CommandExecutionError(f"Invalid statefulset spec: {exc}") from exc
def __dict_to_replicaset_spec(spec):
"""
.. versionadded:: 2.1.0
Converts a dictionary into kubernetes V1ReplicaSetSpec instance.
"""
if not isinstance(spec, dict):
raise CommandExecutionError(
f"ReplicaSet spec must be a dictionary, not {type(spec).__name__}"
)
processed_spec = spec.copy()
if "template" not in processed_spec:
raise CommandExecutionError("ReplicaSet spec must include template with pod specification")
template = processed_spec["template"]
template_metadata = template.get("metadata", {})
template_labels = template_metadata.get("labels", {})
if "selector" not in processed_spec:
if not template_labels:
raise CommandExecutionError(
"Template must include labels when selector is not specified"
)
processed_spec["selector"] = {"match_labels": template_labels}
else:
selector = processed_spec["selector"]
if not selector or not selector.get("matchLabels"):
raise CommandExecutionError("ReplicaSet selector must include matchLabels")
if not all(template_labels.get(k) == v for k, v in selector["matchLabels"].items()):
raise CommandExecutionError("selector.matchLabels must match template metadata.labels")
if "matchLabels" in processed_spec["selector"]:
processed_spec["selector"] = {"match_labels": processed_spec["selector"]["matchLabels"]}
try:
pod_spec = __dict_to_pod_spec(template["spec"])
except (CommandExecutionError, ValueError) as exc:
raise CommandExecutionError(f"Invalid pod spec in replicaset template: {exc}") from exc
pod_template = kubernetes.client.V1PodTemplateSpec(
metadata=kubernetes.client.V1ObjectMeta(**template_metadata), spec=pod_spec
)
processed_spec["template"] = pod_template
processed_spec["selector"] = kubernetes.client.V1LabelSelector(**processed_spec["selector"])
if "replicas" in processed_spec:
try:
processed_spec["replicas"] = int(processed_spec["replicas"])
except (TypeError, ValueError) as exc:
raise CommandExecutionError(f"replicas must be an integer: {exc}") from exc
try:
return kubernetes.client.V1ReplicaSetSpec(**processed_spec)
except (TypeError, ValueError) as exc:
raise CommandExecutionError(f"Invalid replicaset spec: {exc}") from exc
def __dict_to_daemonset_spec(spec):
"""
.. versionadded:: 2.1.0
Converts a dictionary into kubernetes V1DaemonSetSpec instance.
"""
if not isinstance(spec, dict):
raise CommandExecutionError(
f"DaemonSet spec must be a dictionary, not {type(spec).__name__}"
)
processed_spec = spec.copy()
if "template" not in processed_spec:
raise CommandExecutionError("DaemonSet spec must include template with pod specification")
template = processed_spec["template"]
template_metadata = template.get("metadata", {})
template_labels = template_metadata.get("labels", {})
if "selector" not in processed_spec:
if not template_labels:
raise CommandExecutionError(
"Template must include labels when selector is not specified"
)
processed_spec["selector"] = {"match_labels": template_labels}
else:
selector = processed_spec["selector"]
if not selector or not selector.get("matchLabels"):
raise CommandExecutionError("DaemonSet selector must include matchLabels")
if not all(template_labels.get(k) == v for k, v in selector["matchLabels"].items()):
raise CommandExecutionError("selector.matchLabels must match template metadata.labels")
if "matchLabels" in processed_spec["selector"]:
processed_spec["selector"] = {"match_labels": processed_spec["selector"]["matchLabels"]}
try:
pod_spec = __dict_to_pod_spec(template["spec"])
except (CommandExecutionError, ValueError) as exc:
raise CommandExecutionError(f"Invalid pod spec in daemonset template: {exc}") from exc
pod_template = kubernetes.client.V1PodTemplateSpec(
metadata=kubernetes.client.V1ObjectMeta(**template_metadata), spec=pod_spec
)
processed_spec["template"] = pod_template
processed_spec["selector"] = kubernetes.client.V1LabelSelector(**processed_spec["selector"])
try:
return kubernetes.client.V1DaemonSetSpec(**processed_spec)
except (TypeError, ValueError) as exc:
raise CommandExecutionError(f"Invalid daemonset spec: {exc}") from exc
def __dict_to_storageclass_spec(spec):
"""
.. versionadded:: 2.1.0
Validates and normalizes a dictionary into a V1StorageClass-compatible payload.
"""
if not isinstance(spec, dict):
raise CommandExecutionError(
f"StorageClass spec must be a dictionary, not {type(spec).__name__}"
)
processed_spec = spec.copy()
if not processed_spec.get("provisioner"):
raise CommandExecutionError("StorageClass spec must include provisioner")
if "reclaimPolicy" in processed_spec:
processed_spec["reclaim_policy"] = processed_spec.pop("reclaimPolicy")
if "allowVolumeExpansion" in processed_spec:
processed_spec["allow_volume_expansion"] = processed_spec.pop("allowVolumeExpansion")
if "volumeBindingMode" in processed_spec:
processed_spec["volume_binding_mode"] = processed_spec.pop("volumeBindingMode")
if "mountOptions" in processed_spec:
mount_options = processed_spec.pop("mountOptions")
if not isinstance(mount_options, list):
raise CommandExecutionError("StorageClass mountOptions must be a list")
processed_spec["mount_options"] = mount_options
if "parameters" in processed_spec:
parameters = processed_spec["parameters"]
if not isinstance(parameters, dict):
raise CommandExecutionError("StorageClass parameters must be a dictionary")
processed_spec["parameters"] = __enforce_only_strings_dict(parameters)
if "allowedTopologies" in processed_spec:
allowed_topologies = processed_spec.pop("allowedTopologies")
if not isinstance(allowed_topologies, list):
raise CommandExecutionError("StorageClass allowedTopologies must be a list")
processed_spec["allowed_topologies"] = [
kubernetes.client.V1TopologySelectorTerm(**term) for term in allowed_topologies
]
try:
kubernetes.client.V1StorageClass(**processed_spec)
except (TypeError, ValueError) as exc:
raise CommandExecutionError(f"Invalid storageclass spec: {exc}") from exc
return processed_spec
def __enforce_only_strings_dict(dictionary):
"""
Returns a dictionary that has string keys and values.
Only converts non-string values to strings.
"""
ret = {}
for key, value in dictionary.items():
ret[str(key)] = str(value)
return ret
def _wait_for_resource_status(
api_instance, resource_type, name, namespace, expected_status, timeout=60
):
"""
.. versionadded:: 2.0.0
Helper function to wait for a resource to reach an expected status.
api_instance
The kubernetes API instance to use
resource_type
Type of resource to wait for (e.g., 'deployment', 'pod', 'service')
name
Name of the resource
namespace
Namespace of the resource
expected_status
.. versionchanged:: 2.1.0
Expected status to wait for ('created', 'deleted', 'ready')
timeout
Timeout in seconds (default: 60)
Returns True if the resource reached the expected status, False otherwise.
"""
try:
w = Watch()
start_time = time.time()
if expected_status == "deleted":
method = None
if resource_type != "namespace":
# Compound resource names map differently in the k8s client.
method_name = {
"statefulset": "read_namespaced_stateful_set",
"replicaset": "read_namespaced_replica_set",
"daemonset": "read_namespaced_daemon_set",
"configmap": "read_namespaced_config_map",
"storageclass": "read_storage_class",
}.get(resource_type, f"read_namespaced_{resource_type}")
try:
method = getattr(api_instance, method_name)
except AttributeError as exc:
raise CommandExecutionError(
f"Unsupported resource type for wait operation: {resource_type}"
) from exc
# For deletion, periodically check if the resource still exists until timeout
while time.time() - start_time < timeout:
try:
if resource_type == "namespace":
api_instance.read_namespace(name)
elif resource_type == "storageclass":
method(name)
else:
method(name, namespace)
except ApiException as e:
if e.status == 404:
# Resource is gone, deletion successful
return True
# Resource still exists, wait before retrying
time.sleep(1)
# Timed out waiting for deletion
return False
# Compound resource names map differently in the k8s client.
method_name = {
"statefulset": "list_namespaced_stateful_set",
"replicaset": "list_namespaced_replica_set",
"daemonset": "list_namespaced_daemon_set",
"configmap": "list_namespaced_config_map",
"storageclass": "list_storage_class",
}.get(resource_type, f"list_namespaced_{resource_type}")
try:
list_method = getattr(api_instance, method_name)
except AttributeError as exc:
raise CommandExecutionError(
f"Unsupported resource type for wait operation: {resource_type}"
) from exc
if resource_type == "storageclass":
stream = w.stream(
func=list_method,
field_selector=f"metadata.name={name}",
timeout_seconds=timeout,
)
else:
stream = w.stream(
func=list_method,
namespace=namespace,
field_selector=f"metadata.name={name}",
timeout_seconds=timeout,
)
for event in stream:
if event["object"].metadata.name == name:
if expected_status == "created":
return True
elif expected_status == "ready":
if resource_type == "deployment":
if (
event["object"].status.available_replicas
and event["object"].status.available_replicas
== event["object"].spec.replicas
):
return True
elif resource_type == "pod":
# More detailed pod readiness check
if event["object"].status.phase == "Running":
if not event["object"].status.container_statuses:
continue
all_containers_ready = True
unready_containers = []
for container_status in event["object"].status.container_statuses:
if not container_status.ready:
all_containers_ready = False
unready_containers.append(container_status.name)
if all_containers_ready:
return True
elif resource_type == "service":
# Services are considered ready once they exist and have a clusterIP assigned
if event["object"].spec.cluster_ip:
return True
else:
return True # For other resources, assume ready when created
if time.time() - start_time >= timeout:
log.warning(
"Timeout reached while waiting for %s/%s to become %s",
resource_type,
name,
expected_status,
)
return False
log.warning(
"Watch stream ended before %s/%s reached %s status",
resource_type,
name,
expected_status,
)
return False
except (ApiException, HTTPError) as exc:
raise CommandExecutionError(exc) from exc
finally:
w.stop()