Source code for saltext.vault.states.vault_secret

"""
Manage Vault (or OpenBao) KV v1/v2 secrets statefully.

.. versionadded:: 1.2.0

.. important::
    This module requires the general :ref:`Vault setup <vault-setup>`.
"""

import copy
import logging

from salt.exceptions import CommandExecutionError
from salt.exceptions import SaltException
from salt.exceptions import SaltInvocationError

log = logging.getLogger(__name__)


[docs] def present(name, values, sync=False): """ Ensure a secret is present as specified. Does not report a diff. name Path of the secret. values A mapping of values the secret should expose. sync Ensure the secret only exposes ``values`` and delete unspecified ones. Defaults to false, which results in patching (merging over) existing data and deleting keys that are set to ``None``/``null``. For details, see https://datatracker.ietf.org/doc/html/draft-ietf-appsawg-json-merge-patch-07 """ # TODO: manage KV v2 metadata? ret = { "name": name, "result": True, "comment": "The secret is already present as specified", "changes": {}, } try: try: current = __salt__["vault.read_secret"](name) except CommandExecutionError as err: # VaultNotFoundError should be subclassed to # CommandExecutionError and not re-raised by the # execution module @FIXME? if "VaultNotFoundError" not in str(err): raise current = None else: if sync: if current == values: return ret else: def apply_json_merge_patch(data, patch): if not patch: return data if not isinstance(data, dict) or not isinstance(patch, dict): raise ValueError("Data and patch must be dictionaries.") for key, value in patch.items(): if value is None: data.pop(key, None) elif isinstance(value, dict): data[key] = apply_json_merge_patch(data.get(key, {}), value) else: data[key] = value return data new = apply_json_merge_patch(copy.deepcopy(current), values) if new == current: return ret verb = "patch" if current is not None and not sync else "write" pp = "patched" if verb == "patch" else "written" ret["changes"][pp] = name if __opts__["test"]: ret["result"] = None ret["comment"] = f"Would have {pp} the secret" return ret if not __salt__[f"vault.{verb}_secret"](name, **values): # Only read_secret raises exceptions sadly FIXME? raise CommandExecutionError(f"Failed to {verb} secret, see logs for details") ret["comment"] = f"The secret was {pp}" except SaltException as err: ret["result"] = False ret["comment"] = str(err) ret["changes"] = {} return ret
[docs] def absent(name, operation="delete"): """ Ensure a secret is absent. This operates only on the most recent version for delete/destroy. Currently does not destroy/wipe a secret that has been made unreadable in some other way. name Path of the secret. operation Operation to perform to remove the secret. Only relevant for KV v2. Options are: ``delete`` (meaning: soft-delete), ``destroy`` (meaning delete unrecoverably) and ``wipe`` (forget about the secret completely). Defaults to ``delete``. KV v1 secrets are always wiped since the backend does not support versioning. """ valid_ops = ("delete", "destroy", "wipe") if operation not in valid_ops: raise SaltInvocationError(f"Invalid operation '{operation}'. Valid: {', '.join(valid_ops)}") ret = { "name": name, "result": True, "comment": "The secret is already absent", "changes": {}, } pp = "destroyed" if operation == "destroy" else operation + "d" try: try: __salt__["vault.read_secret"](name) except CommandExecutionError as err: if "VaultNotFoundError" not in str(err): raise return ret ret["changes"][pp] = name if __opts__["test"]: ret["result"] = None ret["comment"] = f"Would have {pp} the secret" return ret if not __salt__[f"vault.{operation}_secret"](name): # Only read_secret raises exceptions sadly FIXME? raise CommandExecutionError(f"Failed to {operation} secret, see logs for details") ret["comment"] = f"The secret has been {pp}" except SaltException as err: ret["result"] = False ret["comment"] = str(err) ret["changes"] = {} return ret