"""
Manage the Vault PKI secret engine, request X.509 certificates.
.. versionadded:: 1.1.0
.. important::
This module requires the general :ref:`Vault setup <vault-setup>`.
"""
import logging
from salt.exceptions import CommandExecutionError
from salt.exceptions import SaltInvocationError
from saltext.vault.utils import vault
from saltext.vault.utils.vault.pki import dec2hex
log = logging.getLogger(__name__)
__virtualname__ = "vault_pki"
def __virtual__():
return __virtualname__
VALID_CSR_ARGS = (
"C",
"ST",
"L",
"STREET",
"O",
"OU",
"CN",
"MAIL",
"SN",
"GN",
"UID",
"authorityKeyIdentifier",
"basicConstraints",
"certificatePolicies",
"extendedKeyUsage",
"inhibitAnyPolicy",
"keyUsage",
"nameConstraints",
"noCheck",
"policyConstraints",
"subjectKeyIdentifier",
"tlsfeature",
)
[docs]
def list_roles(mount="pki"):
"""
List configured PKI roles.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#list-roles>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.list_roles
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/roles"
try:
return vault.query("LIST", endpoint, __opts__, __context__)["data"]["keys"]
except vault.VaultNotFoundError:
return []
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def read_role(name, mount="pki"):
"""
Get configuration of specific PKI role.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#read-role>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.read_role
name
The name of the role.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/roles/{name}"
try:
res = vault.query("GET", endpoint, __opts__, __context__)
return res["data"]
except vault.VaultNotFoundError:
return None
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def write_role(
name,
mount="pki",
issuer_ref=None,
ttl=None,
max_ttl=None,
allow_localhost=None,
allowed_domains=None,
server_flag=None,
client_flag=None,
key_usage=None,
no_store=None,
require_cn=None,
**kwargs,
):
"""
Create or update PKI role.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#create-update-role>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.write_role myrole
name
The name of the role.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
issuer_ref
Name or id of the issuer which will be used with this role. If not set, default issuer will be used.
ttl
Specifies the Time To Live value to be used for the validity period of the requested certificate,
provided as a string duration with time suffix. Hour is the largest suffix.
The value specified is strictly used for future validity.
If not set, uses the system default value or the value of ``max_ttl``, whichever is shorter.
max_ttl
Specifies the maximum Time To Live provided as a string duration with time suffix.
Hour is the largest suffix. If not set, defaults to the system maximum lease TTL.
allow_localhost
Specifies if clients can request certificates for ``localhost`` as one of the requested common names.
allowed_domains
Specifies the domains this role is allowed to issue certificates for.
This is used with the ``allow_bare_domains``, ``allow_subdomains``, and ``allow_glob_domains`` options to
determine the type of matching between these domains and the values of common name, DNS-typed SAN entries, and Email-typed SAN entries.
When ``allow_any_name`` is used, this attribute has no effect.
server_flag
Specifies if certificates are flagged for server authentication use.
See `RFC 5280 Section 4.2.1.12 <https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.12>`__
for information about the Extended Key Usage field.
If not set, defaults to true.
client_flag
Specifies if certificates are flagged for client authentication use.
See `RFC 5280 Section 4.2.1.12 <https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.12>`__
for information about the Extended Key Usage field.
If not set, defaults to true.
key_usage
Specifies the allowed key usage constraint on issued certificates.
If unset, defaults to ``["DigitalSignature", "KeyAgreement", "KeyEncipherment"]``
no_store
If set, certificates issued/signed against this role will not be stored in the storage backend.
require_cn
If set to false, makes the common_name field optional while generating a certificate. Defaults to true.
kwargs:
Any other params which can be understand by Vault API.
"""
endpoint = f"{mount}/roles/{name}"
method = "POST"
if read_role(name, mount=mount) is not None:
method = "PATCH"
payload = {k: v for k, v in kwargs.items() if not k.startswith("_")}
if issuer_ref is not None:
payload["issuer_ref"] = issuer_ref
if ttl is not None:
payload["ttl"] = ttl
if max_ttl is not None:
payload["max_ttl"] = max_ttl
if allow_localhost is not None:
payload["allow_localhost"] = allow_localhost
if allowed_domains is not None:
if not isinstance(allowed_domains, list):
allowed_domains = [allowed_domains]
payload["allowed_domains"] = allowed_domains
if server_flag is not None:
payload["server_flag"] = server_flag
if client_flag is not None:
payload["client_flag"] = client_flag
if key_usage is not None:
if not isinstance(key_usage, list):
key_usage = [key_usage]
payload["key_usage"] = key_usage
if no_store is not None:
payload["no_store"] = no_store
if require_cn is not None:
payload["require_cn"] = require_cn
try:
vault.query(method, endpoint, __opts__, __context__, payload=payload)
return True
except vault.VaultUnsupportedOperationError as err:
raise CommandExecutionError(
f"Vault version too old. Please upgrade to v1.11.0+: {err}"
) from err
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def delete_role(name, mount="pki"):
"""
Delete PKI role from Vault.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#delete-role>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.delete_role myrole
name
The name of the role.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/roles/{name}"
try:
vault.query("DELETE", endpoint, __opts__, __context__)
return True
except vault.VaultNotFoundError:
return False
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def list_issuers(mount="pki"):
"""
List issuers information
Returns ``{ "<issuer_id>" : { "is_default": False, "issuer_name": "...", "key_id": "...", "serial_number": "...."}}``
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#list-issuers>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.list_issuers
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/issuers"
try:
return vault.query("LIST", endpoint, __opts__, __context__, is_unauthd=True)["data"][
"key_info"
]
except vault.VaultNotFoundError:
return []
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def read_issuer(ref="default", mount="pki"):
"""
Read an issuer's information.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#read-issuer-certificate>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.read_issuer
ref
Reference of the issuer. Can be issuer id, issuer name or literal ``default``
which means default issuer. Defaults to ``default``.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/issuer/{ref}"
try:
return vault.query("GET", endpoint, __opts__, __context__, is_unauthd=True)["data"]
except vault.VaultNotFoundError:
return None
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def update_issuer(
ref="default",
mount="pki",
manual_chain=None,
usage=None,
aia_urls=None,
crl_endpoints=None,
ocsp_servers=None,
):
"""
Update issuer's information.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#update-issuer>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.update_issuer ref usage=["crl-signing"]
ref
Reference of the issuer. Can be issuer id, issuer name or literal ``default``
which means default issuer. Defaults to ``default``.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
manual_chain
Chain of issuer references to build this issuer's computed CAChain field from, when non-empty.
usage
Allowed usages for this issuer. Valid options are:
* ``read-only`` - to allow this issuer to be read; implict; always allowed;
* ``issuing-certificates`` - to allow this issuer to be used for issuing other certificates;
* ``crl-signing`` - to allow this issuer to be used for signing CRLs.
This is separate from the CRLSign KeyUsage on the x509 certificate, but this usage cannot be set
unless that KeyUsage is allowed on the x509 certificate;
* ``ocsp-signing`` - to allow this issuer to be used for signing OCSP responses.
aia_urls
Specifies the URL values for the Issuing Certificate field as an array.
crl_endpoints
Specifies the URL values for the CRL Distribution Points field as an array.
ocsp_servers
Specifies the URL values for the OCSP Servers field as an array.
"""
endpoint = f"{mount}/issuer/{ref}"
payload = {}
if manual_chain is not None:
payload["manual_chain"] = manual_chain
if usage:
payload["usage"] = usage
if aia_urls is not None:
payload["issuing_certificates"] = aia_urls
if crl_endpoints is not None:
payload["crl_distribution_points"] = crl_endpoints
if ocsp_servers is not None:
payload["ocsp_servers"] = ocsp_servers
try:
vault.query(
"PATCH",
endpoint,
__opts__,
__context__,
payload=payload,
)
return True
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def read_issuer_certificate(name="default", mount="pki", include_chain=False):
"""
Read an issuer's certificate.
Returns certificate(s) in PEM format
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#read-issuer-certificate>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.read_issuer_certificate
name
Name of the issuer. Can be issuer id, issuer name or literal ``default``
which means default issuer. Defaults to ``default``.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
include_chain
If set to true will append the CA chain to the certificate (in case of intermediate issuer)
"""
cert_data = read_issuer(name, mount)
if include_chain:
return "".join(cert_data["ca_chain"])
return cert_data["certificate"]
[docs]
def get_default_issuer(mount="pki"):
"""
Return the issuer ID of the default issuer.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#list-issuers>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.get_default_issuer
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
all_issuers = list_issuers(mount)
for k, v in all_issuers.items():
if v["is_default"]:
return k
# In case there is no default issuer
return None
[docs]
def set_default_issuer(name, mount="pki"):
"""
Set the default issuer.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#set-issuers-configuration>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.set_default_issuer myca
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/config/issuers"
payload = {"default": name}
try:
vault.query("POST", endpoint, __opts__, __context__, payload=payload)
return True
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def generate_root(
common_name,
mount="pki",
type="internal", # pylint: disable=redefined-builtin
issuer_name=None,
key_name=None,
ttl=None,
key_type="rsa",
key_bits=0,
max_path_length=-1,
**kwargs,
):
"""
Generate a new root issuer.
Returns ``{ "certificate" : "-----BEGIN CERTIFICATE...", "issuer_id": "...", "key_id": "...", }``
If type is ``exported`` it will also return the private key.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#generate-root>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.generate_root my-root
common_name
The common name to be used for the CA
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
type
Specifies the type of the root to create. If ``exported``, the private key will be returned in the response;
if ``internal``, the private key will not be returned and cannot be retrieved later. Defaults to ``internal``.
issuer_name
Provides a name to the specified issuer. The name must be unique across all issuers and not be the reserved value ``default``.
key_name
When a new key is created with this request, optionally specifies the name for this. The global ref ``default`` may not be used as a name.
ttl
Specifies the requested Time To Live (after which the certificate will be expired). This cannot be larger than the engine's max (or, if not set, the system max).
key_type
Specifies the desired key type; must be ``rsa``, ``ed25519`` or ``ec``. Defaults to ``rsa``.
key_bits
Specifies the number of bits to use for the generated keys.
Allowed values are 0 (universal default);
with ``key_type=rsa``, allowed values are: 2048 (default), 3072, 4096 or 8192;
with ``key_type=ec``, allowed values are: 224, 256 (default), 384, or 521;
ignored with ``key_type=ed25519``.
max_path_length
Specifies the maximum path length to encode in the generated certificate. ``-1`` means no limit,
unless the signing certificate has a maximum path length set, in which case the path length is set to one
less than that of the signing certificate. A limit of 0 means a literal path length of zero.
"""
if issuer_name == "default":
raise SaltInvocationError("issuer_name cannot be `default`. This is a reserved word.")
if key_name == "default":
raise SaltInvocationError("key_name cannot be `default`. This is a reserved word.")
endpoint = f"{mount}/root/generate/{type}"
payload = {k: v for k, v in kwargs.items() if not k.startswith("_")}
payload["common_name"] = common_name
payload["key_type"] = key_type
if issuer_name is not None:
payload["issuer_name"] = issuer_name
if key_name is not None:
payload["key_name"] = key_name
if ttl is not None:
payload["ttl"] = ttl
if key_bits > 0:
payload["key_bits"] = key_bits
if max_path_length > -1:
payload["max_path_length"] = max_path_length
try:
resp = vault.query("POST", endpoint, __opts__, __context__, payload=payload)["data"]
ret = {
"certificate": resp["certificate"],
"issuer_id": resp["issuer_id"],
"key_id": resp["key_id"],
}
if type == "exported":
ret["private_key"] = resp["private_key"]
return ret
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def delete_key(ref, mount="pki"):
"""
Delete private key from Vault.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#delete-key>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.delete_key ref
ref
Ref of the key. Could be name or key_id.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/key/{ref}"
try:
vault.query("DELETE", endpoint, __opts__, __context__)
return True
except vault.VaultNotFoundError:
return False
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def delete_issuer(ref, mount="pki", include_key=False):
"""
Delete issuer from Vault.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#delete-issuer>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.delete_issuer ref
ref
Ref of the issuer. Could be name or issuer_id.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
include_key
If set to true will also delete the private key if imported.
Defaults to false, so private key will be preserved.
"""
endpoint = f"{mount}/issuer/{ref}"
key_id = None
if include_key:
issuer_info = read_issuer(ref, mount=mount)
if issuer_info:
key_id = issuer_info["key_id"]
try:
vault.query("DELETE", endpoint, __opts__, __context__)
if key_id:
delete_key(key_id, mount=mount)
return True
except vault.VaultNotFoundError:
return False
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def read_issuer_crl(ref="default", mount="pki", delta=False):
"""
Get issuer CRL.
.. note::
If CA cannot sign CRLs will return None.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#read-issuer-crl>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.read_issuer_crl ref
ref
Ref of the issuer. Could be name or issuer_id. Defaults to default issuer.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
delta
If set to true, will return delta CRL instead of complete one.
"""
# Check if issuer can sign CRLs at all. If not,
# there is no point to check for CRL as this will throw error
issuer = None
try:
issuer = vault.query(
"GET", f"{mount}/issuer/{ref}", __opts__, __context__, is_unauthd=False
)["data"]
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
if issuer is None:
return None
if "crl-signing" not in issuer["usage"].split(","):
return None
endpoint = f"{mount}/issuer/{ref}/crl"
if delta:
endpoint = endpoint + "/delta"
try:
return vault.query("GET", endpoint, __opts__, __context__, is_unauthd=True)["data"]["crl"]
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def list_revoked_certificates(mount="pki"):
"""
List revoked certificates serial numbers
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#list-revoked-certificates>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.list_revoked_certificates
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/certs/revoked"
try:
return vault.query("LIST", endpoint, __opts__, __context__)["data"]["keys"]
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def list_certificates(mount="pki"):
"""
List issued certificates serial numbers
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#list-certificates>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.list_certificates
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/certs"
try:
return vault.query("LIST", endpoint, __opts__, __context__)["data"]["keys"]
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def read_certificate(serial, mount="pki"):
"""
Read issued certificate.
Returns certificate in PEM format
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#read-certificate>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.read_certificate 7e:85:c5:d1:85:94:9a:46:08:b5:1b:9c:22:cb:35:e5:ea:f3:56:3f
serial
Specifies the serial of the key to read. Valid values are:
* ``<serial>`` for the certificate with the given serial number, in hyphen-separated or colon-separated hexadecimal.
* ``ca`` for the default issuer's CA certificate
* ``crl`` for the default issuer's CRL
* ``ca_chain`` for the default issuer's CA trust chain.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/cert/{serial}"
try:
return vault.query("GET", endpoint, __opts__, __context__)["data"]["certificate"]
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def issue_certificate(
role_name,
common_name,
mount="pki",
issuer_ref=None,
alt_names=None,
ttl=None,
format="pem", # pylint: disable=redefined-builtin
exclude_cn_from_sans=False,
**kwargs,
):
"""
Generate and issue a new certificate with private key.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#generate-certificate-and-key>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.issue_certificate myrole common_name="www.example.com"
role_name
Name of the role to be used for issuing the certificate.
common_name
Common name to be set for the certificate.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
issuer_ref
Override role's issuer. Can be issuer_name or issuer_id.
alt_names
Any alternative names to be added to the certificate. Can be specified either as dict (``{ "<type>": "<value"}``)
or list of SANs (``["<type>:<value>"]``).
ttl
Specifies the requested Time To Live (after which the certificate will be expired).
This cannot be larger than the engine's max (or, if not set, the system max).
format
Can be either ``pem`` or ``der``. Defaults to ``pem``.
exclude_cn_from_sans
If set to true, Common name will not be part of the SANs.
kwargs
Any additional parameter accepted by Vault API.
"""
endpoint = f"{mount}/issue/{role_name}"
if issuer_ref is not None:
endpoint = f"{mount}/issuer/{issuer_ref}/issue/{role_name}"
payload = {k: v for k, v in kwargs.items() if not k.startswith("_")}
payload["common_name"] = common_name
if ttl is not None:
payload["ttl"] = ttl
payload["format"] = format
payload["exclude_cn_from_sans"] = exclude_cn_from_sans
if alt_names is not None:
dns_sans, ip_sans, uri_sans, other_sans = _split_sans(alt_names)
payload["alt_names"] = ",".join(dns_sans)
payload["ip_sans"] = ",".join(ip_sans)
payload["uri_sans"] = ",".join(uri_sans)
payload["other_sans"] = ",".join(other_sans)
try:
return vault.query("POST", endpoint, __opts__, __context__, payload=payload)["data"]
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def sign_certificate(
role_name,
common_name,
mount="pki",
csr=None,
private_key=None,
private_key_passphrase=None,
digest="sha256",
issuer_ref=None,
alt_names=None,
ttl=None,
sign_verbatim=False,
encoding="pem",
exclude_cn_from_sans=False,
**kwargs,
):
"""
Issue a new certificate from existing private key or CSR.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#sign-certificate>`__.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#sign-verbatim>`__
CLI Example:
.. code-block:: bash
salt '*' vault_pki.issue_certificate myrole common_name="www.example.com"
role_name
Name of the role to be used for issuing the certificate.
common_name
Common name to be set for the certificate.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
csr
Pass the CSR which will be used for issuing the certificate. Either ``csr`` or ``private_key`` parameter can be set, not both.
private_key
The private key for which certificate should be issued. Can be text or path.
Either ``csr`` or ``private_key`` parameter can be set, not both.
.. note::
This parameter requires the :py:mod:`x509_v2 execution module <salt.modules.x509_v2>` to be available.
private_key_passphrase
The passphrase for the ``private_key`` if encrypted. Not used in case of ``csr``.
digest
Digest to be used for generating the CSR. Not used in case of ``private_key``. Defaults to ``sha256``
issuer_ref
Override role's issuer. Can be issuer_name or issuer_id.
alt_names
Any alternative names to be added to the certificate. Can be specified either as dict (``{ "<type>": "<value"}``)
or list of SANs (``["<type>:<value>"]``).
ttl
Specifies the requested Time To Live (after which the certificate will be expired).
This cannot be larger than the engine's max (or, if not set, the system max).
sign_verbatim
If set to true, the resulting certificate follows the CSR exactly.
Otherwise, only ``CN`` can be set for the subject, any other subject parameter (like ``O``) is ignored.
.. warning::
This option is using a potentially dangerous endpoint. Be careful when using that option, as roles
are not restricting what can be issued anymore.
encoding
Can be either ``pem`` or ``der``. Defaults to ``pem``.
exclude_cn_from_sans
If set to true, Common name will not be part of the SANs.
kwargs
Any additional parameter accepted by Vault API or
`x509_v2 module <https://docs.saltproject.io/en/latest/ref/modules/all/salt.modules.x509_v2.html#salt.modules.x509_v2.create_csr>`__
"""
if csr is None and private_key is None:
raise SaltInvocationError("either csr or private_key must be passed.")
if csr is not None and private_key is not None:
raise SaltInvocationError("only one of csr or private_key must be passed, not both")
csr_args, extra_args = _split_csr_kwargs(kwargs)
sign = "sign-verbatim" if sign_verbatim else "sign"
endpoint = f"{mount}/{sign}/{role_name}"
if issuer_ref is not None:
endpoint = f"{mount}/issuer/{issuer_ref}/{sign}/{role_name}"
payload = {k: v for k, v in extra_args.items() if not k.startswith("_")}
payload["common_name"] = common_name
if ttl is not None:
payload["ttl"] = ttl
payload["format"] = encoding
payload["exclude_cn_from_sans"] = exclude_cn_from_sans
if alt_names is not None:
dns_sans, ip_sans, uri_sans, other_sans = _split_sans(alt_names)
payload["alt_names"] = ",".join(dns_sans)
payload["ip_sans"] = ",".join(ip_sans)
payload["uri_sans"] = ",".join(uri_sans)
payload["other_sans"] = ",".join(other_sans)
# In case private_key is passed we're going to build
# CSR in place.
if private_key is not None:
if isinstance(alt_names, dict):
alt_names = [f"{k}:{v}" for k, v in alt_names.items()]
if alt_names:
csr_args["subjectAltName"] = alt_names
csr_args["CN"] = common_name
csr = __salt__["x509.create_csr"](
private_key=private_key,
private_key_passphrase=private_key_passphrase,
digest=digest,
**csr_args,
)
payload["csr"] = csr
try:
return vault.query("POST", endpoint, __opts__, __context__, payload=payload)["data"]
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def revoke_certificate(serial=None, certificate=None, mount="pki"):
"""
Revoke issued certificate.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#revoke-certificate>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.revoke_certificate 7e:85:c5:d1:85:94:9a:46:08:b5:1b:9c:22:cb:35:e5:ea:f3:56:3f
serial
Specifies the serial of the certificate to revoke. Either ``serial`` or ``certificate`` must be specified.
certificate
Specifies the certificate (PEM or path) to revoke. Either ``serial`` or ``certificate`` must be specified.
.. note::
This parameter requires the :py:mod:`x509_v2 execution module <salt.modules.x509_v2>` to be available.
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/revoke/"
payload = {}
if serial is None and certificate is None:
raise SaltInvocationError("either serial or certificate must be passed.")
if serial is not None and certificate is not None:
raise SaltInvocationError("only one of serial or certificate must be passed, not both")
try:
if certificate is not None:
payload["certificate"] = __salt__["x509.encode_certificate"](
certificate, encoding="pem"
)
elif serial is not None:
if isinstance(serial, int):
serial = dec2hex(serial)
payload["serial_number"] = serial
vault.query("POST", endpoint, __opts__, __context__, payload=payload)
return True
except vault.VaultInvocationError:
return False
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
[docs]
def read_urls(mount="pki"):
"""
Fetch the URLs to be encoded in generated certificates.
No URL configuration will be returned until the configuration is set.
`API method docs <https://developer.hashicorp.com/vault/api-docs/secret/pki#read-urls>`__.
CLI Example:
.. code-block:: bash
salt '*' vault_pki.get_urls
mount
The mount path the PKI backend is mounted to. Defaults to ``pki``.
"""
endpoint = f"{mount}/config/urls"
try:
return vault.query("GET", endpoint, __opts__, __context__)["data"]
except vault.VaultException as err:
raise CommandExecutionError(f"{err.__class__}: {err}") from err
def _split_sans(sans) -> tuple[list, list, list, list]:
dns_sans = []
ip_sans = []
uri_sans = []
other_sans = []
try:
if isinstance(sans, list):
sans = dict(map(lambda x: x.split(":", 1), sans))
for k, v in sans.items():
if k.upper() == "DNS" or k.upper() == "EMAIL":
dns_sans.append(v)
elif k.upper() == "IP":
ip_sans.append(v)
elif k.upper() == "URI":
uri_sans.append(v)
else:
other_sans.append(f"{k};UTF8:{v}")
except ValueError as err:
raise CommandExecutionError(
f"SAN is not in correct format. Must be in format <type>:<value>: {err}"
) from err
return dns_sans, ip_sans, uri_sans, other_sans
def _split_csr_kwargs(kwargs):
csr_args = {}
extra_args = {}
for k, v in kwargs.items():
if k in VALID_CSR_ARGS:
csr_args[k] = v
else:
extra_args[k] = v
return csr_args, extra_args