Source code for saltext.vault.states.vault_secret
"""
Manage Vault (or OpenBao) KV v1/v2 secrets statefully.
.. versionadded:: 1.2.0
.. important::
This module requires the general :ref:`Vault setup <vault-setup>`.
"""
import copy
import logging
from salt.exceptions import CommandExecutionError
from salt.exceptions import SaltException
from salt.exceptions import SaltInvocationError
log = logging.getLogger(__name__)
[docs]
def _apply_json_merge_patch(data, patch):
    """
    Apply a JSON merge patch (RFC 7396 semantics) and return the result.

    data
        The target to patch. Mutated in place when it is a dictionary.
        Any other type is discarded and replaced by an empty dictionary
        when ``patch`` is a dictionary.

    patch
        The patch to apply. Keys set to ``None`` are removed from the
        target, nested dictionaries are merged recursively and all other
        values overwrite the target's value. A patch that is not a
        dictionary replaces the target wholesale.
    """
    if not isinstance(patch, dict):
        return patch
    if not isinstance(data, dict):
        # Merging an object into a scalar/list/missing value starts from
        # an empty object instead of failing.
        data = {}
    for key, value in patch.items():
        if value is None:
            data.pop(key, None)
        elif isinstance(value, dict):
            data[key] = _apply_json_merge_patch(data.get(key), value)
        else:
            data[key] = value
    return data


def present(name, values, sync=False):
    """
    Ensure a secret is present as specified.
    Does not report a diff.

    name
        Path of the secret.

    values
        A mapping of values the secret should expose.

    sync
        Ensure the secret only exposes ``values`` and delete unspecified ones.
        Defaults to false, which results in patching (merging over) existing data
        and deleting keys that are set to ``None``/``null``. For details, see
        https://datatracker.ietf.org/doc/html/draft-ietf-appsawg-json-merge-patch-07

    Returns the usual state return dictionary. ``changes`` contains either
    ``patched`` or ``written`` mapped to the secret path when a change was
    (or, in test mode, would have been) made. Errors derived from
    ``SaltException`` are reported as a failed state instead of being raised.
    """
    # TODO: manage KV v2 metadata?
    ret = {
        "name": name,
        "result": True,
        "comment": "The secret is already present as specified",
        "changes": {},
    }
    try:
        try:
            current = __salt__["vault.read_secret"](name)
        except CommandExecutionError as err:
            # VaultNotFoundError should be subclassed to
            # CommandExecutionError and not re-raised by the
            # execution module @FIXME?
            if "VaultNotFoundError" not in str(err):
                raise
            # The secret does not exist yet, it always needs to be written.
            current = None
        else:
            if sync:
                if current == values:
                    return ret
            elif not values:
                # An empty patch cannot change an existing secret.
                return ret
            else:
                if not isinstance(values, dict):
                    # Reported as a failed state by the handler below.
                    raise SaltInvocationError("values must be a mapping")
                # Simulate the patch on a copy to find out whether
                # applying it would change anything.
                new = _apply_json_merge_patch(copy.deepcopy(current), values)
                if new == current:
                    return ret
        # Patching is only possible (and only requested) for an existing
        # secret when sync is disabled, otherwise the data is overwritten.
        verb = "patch" if current is not None and not sync else "write"
        pp = "patched" if verb == "patch" else "written"
        ret["changes"][pp] = name
        if __opts__["test"]:
            ret["result"] = None
            ret["comment"] = f"Would have {pp} the secret"
            return ret
        if not __salt__[f"vault.{verb}_secret"](name, **values):
            # Only read_secret raises exceptions sadly FIXME?
            raise CommandExecutionError(f"Failed to {verb} secret, see logs for details")
        ret["comment"] = f"The secret was {pp}"
    except SaltException as err:
        ret["result"] = False
        ret["comment"] = str(err)
        ret["changes"] = {}
    return ret
[docs]
def absent(name, operation="delete"):
    """
    Ensure a secret is absent. This operates only on the most recent version
    for delete/destroy. Currently does not destroy/wipe a secret that has
    been made unreadable in some other way.

    name
        Path of the secret.

    operation
        Operation to perform to remove the secret. Only relevant for KV v2.
        Options are: ``delete`` (meaning: soft-delete), ``destroy`` (meaning delete unrecoverably)
        and ``wipe`` (forget about the secret completely). Defaults to ``delete``.
        KV v1 secrets are always wiped since the backend does not support versioning.

    Raises ``SaltInvocationError`` for an unknown ``operation``. Other errors
    derived from ``SaltException`` are reported as a failed state.
    """
    valid_ops = ("delete", "destroy", "wipe")
    if operation not in valid_ops:
        raise SaltInvocationError(f"Invalid operation '{operation}'. Valid: {', '.join(valid_ops)}")

    # Past participle of the operation, used as the changes key and in comments.
    if operation == "destroy":
        done = "destroyed"
    else:
        done = operation + "d"

    ret = {
        "name": name,
        "result": True,
        "comment": "The secret is already absent",
        "changes": {},
    }
    try:
        try:
            # Only used as an existence check, the data itself is irrelevant.
            __salt__["vault.read_secret"](name)
        except CommandExecutionError as err:
            if "VaultNotFoundError" in str(err):
                # Nothing to remove.
                return ret
            raise

        ret["changes"][done] = name
        if __opts__["test"]:
            ret.update(result=None, comment=f"Would have {done} the secret")
            return ret

        removed = __salt__[f"vault.{operation}_secret"](name)
        if not removed:
            # Only read_secret raises exceptions sadly FIXME?
            raise CommandExecutionError(f"Failed to {operation} secret, see logs for details")
        ret["comment"] = f"The secret has been {done}"
    except SaltException as err:
        ret.update(result=False, comment=str(err), changes={})
    return ret