"""
Salt module to manage Kubernetes cluster
.. versionadded:: 2016.3.0
Roadmap:
* Add creation of K8S objects (pod, rc, service, ...)
* Add replace of K8S objects (pod, rc, service, ...)
* Add deletion of K8S objects (pod, rc, service, ...)
* Add rolling update
* Add (auto)scalling
"""
import base64
import logging
import os
import re
import urllib.parse
import salt.utils.files
import salt.utils.json
from salt.utils import http
__virtualname__ = "k8s"
# Setup the logger
log = logging.getLogger(__name__)
[docs]
def __virtual__():
"""Load load if python-requests is installed."""
return __virtualname__
def _guess_apiserver(apiserver_url=None):
"""Try to guees the kubemaster url from environ,
then from `/etc/kubernetes/config` file
"""
default_config = "/etc/kubernetes/config"
if apiserver_url is not None:
return apiserver_url
if "KUBERNETES_MASTER" in os.environ:
apiserver_url = os.environ.get("KUBERNETES_MASTER")
elif __salt__["config.get"]("k8s:master"):
apiserver_url = __salt__["config.get"]("k8s:master")
elif os.path.exists(default_config) or __salt__["config.get"]("k8s:config", ""):
config = __salt__["config.get"]("k8s:config", default_config)
kubeapi_regex = re.compile("""KUBE_MASTER=['"]--master=(.*)['"]""", re.MULTILINE)
with salt.utils.files.fopen(config) as fh_k8s:
for line in fh_k8s.readlines():
match_line = kubeapi_regex.match(line)
if match_line:
apiserver_url = match_line.group(1)
else:
# we failed to discover, lets use k8s default address
apiserver_url = "http://127.0.0.1:8080"
log.debug("Discoverd k8s API server address: %s", apiserver_url)
return apiserver_url
def _kpost(url, data):
"""create any object in kubernetes based on URL"""
# Prepare headers
headers = {"Content-Type": "application/json"}
# Make request
log.trace("url is: %s, data is: %s", url, data)
ret = http.query(url, method="POST", header_dict=headers, data=salt.utils.json.dumps(data))
# Check requests status
if ret.get("error"):
return ret
else:
return salt.utils.json.loads(ret.get("body"))
def _kput(url, data):
"""put any object in kubernetes based on URL"""
# Prepare headers
headers = {"Content-Type": "application/json"}
# Make request
ret = http.query(url, method="PUT", header_dict=headers, data=salt.utils.json.dumps(data))
# Check requests status
if ret.get("error"):
return ret
else:
return salt.utils.json.loads(ret.get("body"))
def _kpatch(url, data):
"""patch any object in kubernetes based on URL"""
# Prepare headers
headers = {"Content-Type": "application/json-patch+json"}
# Make request
ret = http.query(url, method="PATCH", header_dict=headers, data=salt.utils.json.dumps(data))
# Check requests status
if ret.get("error"):
log.error("Got an error: %s", ret.get("error"))
return ret
else:
return salt.utils.json.loads(ret.get("body"))
def _kname(obj):
"""Get name or names out of json result from API server"""
if isinstance(obj, dict):
return [obj.get("metadata", {}).get("name", "")]
elif isinstance(obj, (list, tuple)):
names = []
for i in obj:
names.append(i.get("metadata", {}).get("name", ""))
return names
else:
return "Unknown type"
def _is_dns_subdomain(name):
"""Check that name is DNS subdomain: One or more lowercase rfc1035/rfc1123
labels separated by '.' with a maximum length of 253 characters"""
dns_subdomain = re.compile(r"""^[a-z0-9\.-]{1,253}$""")
if dns_subdomain.match(name):
log.debug("Name: %s is valid DNS subdomain", name)
return True
else:
log.debug("Name: %s is not valid DNS subdomain", name)
return False
def _is_port_name(name):
"""Check that name is IANA service: An alphanumeric (a-z, and 0-9) string,
with a maximum length of 15 characters, with the '-' character allowed
anywhere except the first or the last character or adjacent to another '-'
character, it must contain at least a (a-z) character"""
port_name = re.compile("""^[a-z0-9]{1,15}$""")
return bool(port_name.match(name))
def _is_dns_label(name):
"""Check that name is DNS label: An alphanumeric (a-z, and 0-9) string,
with a maximum length of 63 characters, with the '-' character allowed
anywhere except the first or last character, suitable for use as a hostname
or segment in a domain name"""
dns_label = re.compile(r"""^[a-z0-9][a-z0-9\.-]{1,62}$""")
return bool(dns_label.match(name))
def _guess_node_id(node):
"""Try to guess kube node ID using salt minion ID"""
if node is None:
return __salt__["grains.get"]("id")
return node
def _get_labels(node, apiserver_url):
"""Get all labels from a kube node."""
# Prepare URL
url = f"{apiserver_url}/api/v1/nodes/{node}"
# Make request
ret = http.query(url)
# Check requests status
if "body" in ret:
ret = salt.utils.json.loads(ret.get("body"))
elif ret.get("status", 0) == 404:
return f"Node {node} doesn't exist"
else:
return ret
# Get and return labels
return ret.get("metadata", {}).get("labels", {})
def _set_labels(node, apiserver_url, labels):
"""Replace labels dict by a new one"""
# Prepare URL
url = f"{apiserver_url}/api/v1/nodes/{node}"
# Prepare data
data = [{"op": "replace", "path": "/metadata/labels", "value": labels}]
# Make request
ret = _kpatch(url, data)
if ret.get("status") == 404:
return f"Node {node} doesn't exist"
return ret
[docs]
def get_labels(node=None, apiserver_url=None):
"""
.. versionadded:: 2016.3.0
Get labels from the current node
CLI Example:
.. code-block:: bash
salt '*' k8s.get_labels
salt '*' k8s.get_labels kube-node.cluster.local http://kube-master.cluster.local
"""
# Get salt minion ID
node = _guess_node_id(node)
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
# Get data
ret = _get_labels(node, apiserver_url)
return {"labels": ret}
[docs]
def label_present(name, value, node=None, apiserver_url=None):
"""
.. versionadded:: 2016.3.0
Set label to the current node
CLI Example:
.. code-block:: bash
salt '*' k8s.label_present hw/disktype ssd
salt '*' k8s.label_present hw/disktype ssd kube-node.cluster.local http://kube-master.cluster.local
"""
ret = {"name": name, "result": True, "comment": "", "changes": {}}
# Get salt minion ID
node = _guess_node_id(node)
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
# Get all labels
labels = _get_labels(node, apiserver_url)
if name not in labels:
# This is a new label
ret["changes"] = {name: value}
labels[name] = str(value)
res = _set_labels(node, apiserver_url, labels)
if res.get("status") == 409:
# there is an update during operation, need to retry
log.debug("Got 409, will try later")
ret["changes"] = {}
ret["comment"] = f"Could not create label {name}, please retry"
else:
ret["comment"] = f"Label {name} created"
elif labels.get(name) != str(value):
# This is a old label and we are going to edit it
ret["changes"] = {name: str(value)}
labels[name] = str(value)
res = _set_labels(node, apiserver_url, labels)
if res.get("status") == 409:
# there is an update during operation, need to retry
log.debug("Got 409, will try later")
ret["changes"] = {}
ret["comment"] = f"Could not update label {name}, please retry"
else:
ret["comment"] = f"Label {name} updated"
else:
# This is a old label and it has already the wanted value
ret["comment"] = f"Label {name} already set"
return ret
[docs]
def label_absent(name, node=None, apiserver_url=None):
"""
.. versionadded:: 2016.3.0
Delete label to the current node
CLI Example:
.. code-block:: bash
salt '*' k8s.label_absent hw/disktype
salt '*' k8s.label_absent hw/disktype kube-node.cluster.local http://kube-master.cluster.local
"""
ret = {"name": name, "result": True, "comment": "", "changes": {}}
# Get salt minion ID
node = _guess_node_id(node)
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
# Get all labels
old_labels = _get_labels(node, apiserver_url)
# Prepare a temp labels dict
labels = {key: value for key, value in old_labels.items() if key != name}
# Compare old labels and what we want
if labels == old_labels:
# Label already absent
ret["comment"] = f"Label {name} already absent"
else:
# Label needs to be delete
res = _set_labels(node, apiserver_url, labels)
if res.get("status") == 409:
# there is an update during operation, need to retry
log.debug("Got 409, will try later")
ret["changes"] = {}
ret["comment"] = f"Could not delete label {name}, please retry"
else:
ret["changes"] = {"deleted": name}
ret["comment"] = f"Label {name} absent"
return ret
[docs]
def label_folder_absent(name, node=None, apiserver_url=None):
"""
.. versionadded:: 2016.3.0
Delete label folder to the current node
CLI Example:
.. code-block:: bash
salt '*' k8s.label_folder_absent hw
salt '*' k8s.label_folder_absent hw/ kube-node.cluster.local http://kube-master.cluster.local
"""
folder = name.strip("/") + "/"
ret = {"name": folder, "result": True, "comment": "", "changes": {}}
# Get salt minion ID
node = _guess_node_id(node)
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
# Get all labels
old_labels = _get_labels(node, apiserver_url)
# Prepare a temp labels dict
labels = {key: value for key, value in old_labels.items() if not key.startswith(folder)}
# Prepare a temp labels dict
if labels == old_labels:
# Label already absent
ret["comment"] = f"Label folder {folder} already absent"
else:
# Label needs to be delete
res = _set_labels(node, apiserver_url, labels)
if res.get("status") == 409:
log.debug("Got 409, will try later")
ret["changes"] = {}
ret["comment"] = f"Could not delete label folder {folder}, please retry"
else:
ret["changes"] = {"deleted": folder}
ret["comment"] = f"Label folder {folder} absent"
return ret
# Namespaces
def _get_namespaces(apiserver_url, name=""):
"""Get namespace is namespace is defined otherwise return all namespaces"""
# Prepare URL
url = f"{apiserver_url}/api/v1/namespaces/{name}"
# Make request
ret = http.query(url)
if ret.get("body"):
return salt.utils.json.loads(ret.get("body"))
else:
return None
def _create_namespace(namespace, apiserver_url):
"""create namespace on the defined k8s cluster"""
# Prepare URL
url = f"{apiserver_url}/api/v1/namespaces"
# Prepare data
data = {"kind": "Namespace", "apiVersion": "v1", "metadata": {"name": namespace}}
log.trace("namespace creation requests: %s", data)
# Make request
ret = _kpost(url, data)
log.trace("result is: %s", ret)
# Check requests status
return ret
[docs]
def create_namespace(name, apiserver_url=None):
"""
.. versionadded:: 2016.3.0
Create kubernetes namespace from the name, similar to the functionality added to kubectl since v.1.2.0:
.. code-block:: bash
kubectl create namespaces namespace-name
CLI Example:
.. code-block:: bash
salt '*' k8s.create_namespace namespace_name
salt '*' k8s.create_namespace namespace_name http://kube-master.cluster.local
"""
ret = {"name": name, "result": True, "comment": "", "changes": {}}
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
if not _get_namespaces(apiserver_url, name):
# This is a new namespace
_create_namespace(name, apiserver_url)
ret["changes"] = name
ret["comment"] = f"Namespace {name} created"
else:
ret["comment"] = f"Namespace {name} already present"
return ret
[docs]
def get_namespaces(namespace="", apiserver_url=None):
"""
.. versionadded:: 2016.3.0
Get one or all kubernetes namespaces.
If namespace parameter is omitted, all namespaces will be returned back to user, similar to following kubectl example:
.. code-block:: bash
kubectl get namespaces -o json
In case namespace is set by user, the output will be similar to the one from kubectl:
.. code-block:: bash
kubectl get namespaces namespace_name -o json
CLI Example:
.. code-block:: bash
salt '*' k8s.get_namespaces
salt '*' k8s.get_namespaces namespace_name http://kube-master.cluster.local
"""
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
# Get data
ret = _get_namespaces(apiserver_url, namespace)
return ret
# Secrets
def _get_secrets(namespace, name, apiserver_url):
"""Get secrets of the namespace."""
# Prepare URL
url = f"{apiserver_url}/api/v1/namespaces/{namespace}/secrets/{name}"
# Make request
ret = http.query(url)
if ret.get("body"):
return salt.utils.json.loads(ret.get("body"))
else:
return None
def _update_secret(namespace, name, data, apiserver_url):
"""Replace secrets data by a new one"""
# Prepare URL
url = f"{apiserver_url}/api/v1/namespaces/{namespace}/secrets/{name}"
# Prepare data
data = [{"op": "replace", "path": "/data", "value": data}]
# Make request
ret = _kpatch(url, data)
if ret.get("status") == 404:
return f"Node {url} doesn't exist"
return ret
def _create_secret(namespace, name, data, apiserver_url):
"""create namespace on the defined k8s cluster"""
# Prepare URL
url = f"{apiserver_url}/api/v1/namespaces/{namespace}/secrets"
# Prepare data
request = {
"apiVersion": "v1",
"kind": "Secret",
"metadata": {"name": name, "namespace": namespace},
"data": data,
}
# Make request
ret = _kpost(url, request)
return ret
def _is_valid_secret_file(filename):
if os.path.exists(filename) and os.path.isfile(filename):
log.debug("File: %s is valid secret file", filename)
return True
log.warning("File: %s does not exists or not file", filename)
return False
def _file_encode(filename):
log.trace("Encoding secret file: %s", filename)
with salt.utils.files.fopen(filename, "rb") as f:
data = f.read()
return base64.b64encode(data)
def _decode_secrets(secrets):
items = secrets.get("items", [])
if items:
for i, secret in enumerate(items):
log.trace(i, secret)
for k, v in secret.get("data", {}).items():
items[i]["data"][k] = base64.b64decode(v)
secrets["items"] = items
return secrets
else:
for k, v in secrets.get("data", {}).items():
secrets["data"][k] = base64.b64decode(v)
return secrets
# pylint: disable=unused-argument
[docs]
def get_secrets(namespace, name="", apiserver_url=None, decode=False, brief=False):
"""
Get k8s namespaces
CLI Example:
.. code-block:: bash
salt '*' k8s.get_secrets namespace_name
salt '*' k8s.get_secrets namespace_name secret_name http://kube-master.cluster.local
"""
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
# Get data
if not decode:
ret = _get_secrets(namespace, name, apiserver_url)
else:
ret = _decode_secrets(_get_secrets(namespace, name, apiserver_url))
return ret
def _source_encode(source, saltenv):
try:
source_url = urllib.parse.urlparse(source)
except TypeError:
return "", "Invalid format for source parameter"
protos = ("salt", "http", "https", "ftp", "swift", "s3", "file")
log.trace("parsed source looks like: %s", source_url)
if not source_url.scheme or source_url.scheme == "file":
# just a regular file
filename = os.path.abspath(source_url.path)
sname = os.path.basename(filename)
log.debug("Source is a regular local file: %s", source_url.path)
if _is_dns_subdomain(sname) and _is_valid_secret_file(filename):
return sname, _file_encode(filename)
else:
if source_url.scheme in protos:
# The source is a file on a server
filename = __salt__["cp.cache_file"](source, saltenv)
if not filename:
log.warning("Source file: %s can not be retrieved", source)
return "", ""
return os.path.basename(filename), _file_encode(filename)
return "", ""
[docs]
def update_secret(namespace, name, sources, apiserver_url=None, force=True, saltenv="base"):
"""
.. versionadded:: 2016.3.0
alias to k8s.create_secret with update=true
CLI Example:
.. code-block:: bash
salt '*' k8s.update_secret namespace_name secret_name sources [apiserver_url] [force=true] [update=false] [saltenv='base']
sources are either dictionary of {name: path, name1: path} pairs or array of strings defining paths.
Example of paths array:
.. code-block:: bash
['/full/path/filename', "file:///full/path/filename", "salt://secret/storage/file.txt", "http://user:password@securesite.com/secret-file.json"]
Example of dictionaries:
.. code-block:: bash
{"nameit": '/full/path/fiename', name2: "salt://secret/storage/file.txt"}
optional parameters accepted:
force=[true] default value is true
if the to False, secret will not be created in case one of the files is not
valid kubernetes secret. e.g. capital letters in secret name or _
in case force is set to True, wrong files will be skipped but secret will be created any way.
saltenv=['base'] default value is base
in case 'salt://' path is used, this parameter can change the visibility of files
"""
apiserver_url = _guess_apiserver(apiserver_url)
ret = create_secret(
namespace,
name,
sources,
apiserver_url=apiserver_url,
force=force,
update=True,
saltenv=saltenv,
)
return ret
[docs]
def create_secret(
namespace,
name,
sources,
apiserver_url=None,
force=False,
update=False,
saltenv="base",
):
"""
.. versionadded:: 2016.3.0
Create k8s secrets in the defined namespace from the list of files
CLI Example:
.. code-block:: bash
salt '*' k8s.create_secret namespace_name secret_name sources
salt '*' k8s.create_secret namespace_name secret_name sources
http://kube-master.cluster.local
sources are either dictionary of {name: path, name1: path} pairs or array of strings defining paths.
Example of paths array:
.. code-block:: bash
['/full/path/filename', "file:///full/path/filename", "salt://secret/storage/file.txt", "http://user:password@securesite.com/secret-file.json"]
Example of dictionaries:
.. code-block:: bash
{"nameit": '/full/path/fiename', name2: "salt://secret/storage/file.txt"}
optional parameters accepted:
update=[false] default value is false
if set to false, and secret is already present on the cluster - warning will be returned and no changes to the secret will be done.
In case it is set to "true" and secret is present but data is differ - secret will be updated.
force=[true] default value is true
if the to False, secret will not be created in case one of the files is not
valid kubernetes secret. e.g. capital letters in secret name or _
in case force is set to True, wrong files will be skipped but secret will be created any way.
saltenv=['base'] default value is base
in case 'salt://' path is used, this parameter can change the visibility of files
"""
ret = {"name": name, "result": True, "comment": "", "changes": {}}
if not sources:
return {
"name": name,
"result": False,
"comment": "No source available",
"changes": {},
}
apiserver_url = _guess_apiserver(apiserver_url)
# we need namespace to create secret in it
if not _get_namespaces(apiserver_url, namespace):
if force:
_create_namespace(namespace, apiserver_url)
else:
return {
"name": name,
"result": False,
"comment": "Namespace doesn't exists",
"changes": {},
}
secret = _get_secrets(namespace, name, apiserver_url)
if secret and not update:
log.info("Secret %s is already present on %s", name, namespace)
return {
"name": name,
"result": False,
"comment": f"Secret {name} is already present",
"changes": {},
}
data = {}
for source in sources:
log.debug("source is: %s", source)
if isinstance(source, dict):
# format is array of dictionaries:
# [{public_auth: salt://public_key}, {test: "/tmp/test"}]
log.trace("source is dictionary: %s", source)
for k, v in source.items():
sname, encoded = _source_encode(v, saltenv)
if sname == encoded == "":
ret["comment"] += f"Source file {v} is missing or name is incorrect\n"
if force:
continue
return ret
data[k] = encoded
elif isinstance(source, str):
# expected format is array of filenames
sname, encoded = _source_encode(source, saltenv)
if sname == encoded == "":
if force:
ret["comment"] += f"Source file {source} is missing or name is incorrect\n"
continue
return ret
data[sname] = encoded
log.trace("secret data is: %s", data)
if secret and update:
if not data:
ret["comment"] += "Could not find source files or your sources are empty"
ret["result"] = False
elif secret.get("data") and data != secret.get("data"):
_update_secret(namespace, name, data, apiserver_url)
ret["comment"] = "Updated secret"
ret["changes"] = "Updated secret"
else:
log.debug("Secret has not been changed on cluster, skipping it")
ret["comment"] = "Has not been changed on cluster, skipping it"
else:
_create_secret(namespace, name, data, apiserver_url)
return ret
# pylint: disable=unused-argument
[docs]
def delete_secret(namespace, name, apiserver_url=None, force=True):
"""
.. versionadded:: 2016.3.0
Delete kubernetes secret in the defined namespace. Namespace is the mandatory parameter as well as name.
CLI Example:
.. code-block:: bash
salt '*' k8s.delete_secret namespace_name secret_name
salt '*' k8s.delete_secret namespace_name secret_name http://kube-master.cluster.local
"""
ret = {"name": name, "result": True, "comment": "", "changes": {}}
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
# we need namespace to delete secret in it
if not _get_namespaces(apiserver_url, namespace):
return {
"name": name,
"result": False,
"comment": "Namespace doesn't exists, can't delete anything there",
"changes": {},
}
url = f"{apiserver_url}/api/v1/namespaces/{namespace}/secrets/{name}"
res = http.query(url, method="DELETE")
if res.get("body"):
ret["comment"] = f"Removed secret {name} in {namespace} namespace"
return ret