# Source code for saltext.boto3.modules.boto3_kms

"""
Connection module for Amazon KMS using boto3.
=============================================

    Renamed from ``boto_kms`` to ``boto3_kms`` and rewritten to use the
    boto3 ``kms`` client APIs directly via
    :py:mod:`saltext.boto3.utils.boto3mod`.  The legacy boto2 code path
    (object-style access, retry loops) has been removed.

:depends:
  - boto3 >= 1.28.0
  - botocore >= 1.31.0

:configuration: This module accepts explicit KMS credentials but can
    also utilize IAM roles assigned to the instance through Instance Profiles.
    Dynamic credentials are then automatically obtained from the AWS API and no
    further configuration is necessary. More information is available at:

    .. code-block:: text

        http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html

    If IAM roles are not used, you need to specify the credentials either in
    the minion's config file or as a profile. For example, to specify them in
    the minion's config file:

.. code-block:: yaml

    kms.keyid: GKTADJGHEIQSXMKKRBJ08H
    kms.key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs

A region may also be specified in the configuration:

.. code-block:: yaml

    kms.region: us-east-1

It's also possible to specify key, keyid and region via a profile, either
as a passed in dict, or as a string to pull from pillars or minion config:

.. code-block:: yaml

    myprofile:
        keyid: GKTADJGHEIQSXMKKRBJ08H
        key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
        region: us-east-1

.. versionadded:: 1.0.0
"""

import logging

import salt.serializers.json
from salt.utils import odict

from saltext.boto3.utils import boto3mod

log = logging.getLogger(__name__)


# Probe for the AWS SDK at import time.  Only botocore's ``ClientError`` is
# imported here; ``HAS_BOTO3`` records whether that import succeeded and is
# what ``__virtual__`` checks before letting the module load.
try:
    from botocore.exceptions import ClientError

    # Set the "boto3" logger's threshold to CRITICAL.
    logging.getLogger("boto3").setLevel(logging.CRITICAL)
    HAS_BOTO3 = True
except ImportError:
    HAS_BOTO3 = False


__virtualname__ = "boto3_kms"


def _get_conn(service, region=None, key=None, keyid=None, profile=None):
    """
    Build a connection for ``service`` through ``boto3mod.get_connection``.

    The loader-injected ``__opts__`` and ``__context__`` dunders are forwarded
    together with the caller's credential arguments.
    """
    credentials = {
        "region": region,
        "key": key,
        "keyid": keyid,
        "profile": profile,
    }
    return boto3mod.get_connection(
        service, opts=__opts__, context=__context__, **credentials
    )


def __virtual__():
    """
    Load this module only when the boto3 dependency check succeeded.

    Returns the virtual module name when ``HAS_BOTO3`` is true, otherwise a
    ``(False, reason)`` tuple.
    """
    if not HAS_BOTO3:
        return (False, "The boto3_kms module could not be loaded: boto3 is not available.")
    return __virtualname__
def _err(e): return boto3mod.get_error(e) def _resolve_alias(key_id, region=None, key=None, keyid=None, profile=None): """ If ``key_id`` is an alias, resolve it to a KeyId via describe_key. """ if key_id.startswith("alias/"): r = describe_key(key_id, region=region, key=key, keyid=keyid, profile=profile) if "key_metadata" in r: return r["key_metadata"]["KeyId"] return key_id
def create_alias(alias_name, target_key_id, region=None, key=None, keyid=None, profile=None):
    """
    Create a display name (alias) that points at a key.

    Returns ``{"result": True}`` when the call succeeds and
    ``{"result": False, "error": ...}`` when it raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.create_alias 'alias/mykey' key_id
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    try:
        client.create_alias(AliasName=alias_name, TargetKeyId=target_key_id)
    except ClientError as exc:
        return {"result": False, "error": _err(exc)}
    return {"result": True}
def create_grant(
    key_id,
    grantee_principal,
    retiring_principal=None,
    operations=None,
    constraints=None,
    grant_tokens=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Add a grant to a key.

    An ``alias/...`` ``key_id`` is resolved to a key ID first.  Optional
    arguments left at ``None`` are not sent to the API.

    Returns ``{"grant": <API response>}`` on success or ``{"error": ...}``
    when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.create_grant 'alias/mykey' 'arn:aws:iam::1:role/r' operations='["Encrypt","Decrypt"]'
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    resolved_id = _resolve_alias(key_id, region, key, keyid, profile)
    optional = {
        "RetiringPrincipal": retiring_principal,
        "Operations": operations,
        "Constraints": constraints,
        "GrantTokens": grant_tokens,
    }
    params = {"KeyId": resolved_id, "GranteePrincipal": grantee_principal}
    params.update({name: value for name, value in optional.items() if value is not None})
    try:
        grant = client.create_grant(**params)
    except ClientError as exc:
        return {"error": _err(exc)}
    return {"grant": grant}
def create_key(
    policy=None,
    description=None,
    key_usage=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Create a customer master key.

    policy
        Key policy, either as a data structure (serialized to JSON before it
        is sent) or as an already-serialized JSON string (sent verbatim).

    description
        Optional description for the key.

    key_usage
        Optional ``KeyUsage`` value passed through to the API.

    Returns ``{"key_metadata": ...}`` on success or ``{"error": ...}`` when
    the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.create_key '{"Statement":...}' "My master key"
    """
    conn = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    kwargs = {}
    if policy is not None:
        # A policy that is already a JSON document must go out unchanged:
        # serializing a string again would wrap it in quotes, turning the
        # policy into a JSON string literal instead of a policy object.
        if isinstance(policy, str):
            kwargs["Policy"] = policy
        else:
            kwargs["Policy"] = salt.serializers.json.serialize(policy)
    if description is not None:
        kwargs["Description"] = description
    if key_usage is not None:
        kwargs["KeyUsage"] = key_usage
    try:
        res = conn.create_key(**kwargs)
        return {"key_metadata": res["KeyMetadata"]}
    except ClientError as e:
        return {"error": _err(e)}
def decrypt(
    ciphertext_blob,
    encryption_context=None,
    grant_tokens=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Decrypt ciphertext.

    Optional arguments left at ``None`` are not sent to the API.

    Returns ``{"plaintext": ...}`` on success or ``{"error": ...}`` when the
    call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.decrypt encrypted_ciphertext
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    optional = {"EncryptionContext": encryption_context, "GrantTokens": grant_tokens}
    params = {"CiphertextBlob": ciphertext_blob}
    params.update({name: value for name, value in optional.items() if value is not None})
    try:
        response = client.decrypt(**params)
    except ClientError as exc:
        return {"error": _err(exc)}
    return {"plaintext": response["Plaintext"]}
def key_exists(key_id, region=None, key=None, keyid=None, profile=None):
    """
    Check whether a KMS key exists.

    Returns ``{"result": True}`` when ``describe_key`` succeeds,
    ``{"result": False}`` when the error code is ``NotFoundException`` or
    ``NotFound``, and ``{"error": ...}`` for any other ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.key_exists 'alias/mykey'
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    try:
        client.describe_key(KeyId=key_id)
    except ClientError as exc:
        error_code = exc.response.get("Error", {}).get("Code")
        if error_code not in ("NotFoundException", "NotFound"):
            return {"error": _err(exc)}
        return {"result": False}
    return {"result": True}
def describe_key(key_id, region=None, key=None, keyid=None, profile=None):
    """
    Get detailed information about a key.

    Returns ``{"key_metadata": ...}`` on success or ``{"error": ...}`` when
    the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.describe_key 'alias/mykey'
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    try:
        response = client.describe_key(KeyId=key_id)
    except ClientError as exc:
        return {"error": _err(exc)}
    return {"key_metadata": response["KeyMetadata"]}
def disable_key(key_id, region=None, key=None, keyid=None, profile=None):
    """
    Mark a key as disabled.

    key_id
        Key ID, key ARN, or an ``alias/...`` name.  The KMS ``DisableKey``
        API does not accept aliases, so an alias is first resolved to its
        key ID via ``describe_key``.

    Returns ``{"result": True}`` on success or
    ``{"result": False, "error": ...}`` when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.disable_key 'alias/mykey'
    """
    conn = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    # DisableKey only takes a key ID or ARN; translate aliases up front, the
    # same way create_grant / list_grants / revoke_grant do.
    key_id = _resolve_alias(key_id, region, key, keyid, profile)
    try:
        conn.disable_key(KeyId=key_id)
    except ClientError as e:
        return {"result": False, "error": _err(e)}
    return {"result": True}
def disable_key_rotation(key_id, region=None, key=None, keyid=None, profile=None):
    """
    Disable key rotation for a key.

    key_id
        Key ID, key ARN, or an ``alias/...`` name.  The KMS
        ``DisableKeyRotation`` API does not accept aliases, so an alias is
        first resolved to its key ID via ``describe_key``.

    Returns ``{"result": True}`` on success or
    ``{"result": False, "error": ...}`` when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.disable_key_rotation 'alias/mykey'
    """
    conn = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    # DisableKeyRotation only takes a key ID or ARN; translate aliases first.
    key_id = _resolve_alias(key_id, region, key, keyid, profile)
    try:
        conn.disable_key_rotation(KeyId=key_id)
    except ClientError as e:
        return {"result": False, "error": _err(e)}
    return {"result": True}
def enable_key(key_id, region=None, key=None, keyid=None, profile=None):
    """
    Mark a key as enabled.

    key_id
        Key ID, key ARN, or an ``alias/...`` name.  The KMS ``EnableKey``
        API does not accept aliases, so an alias is first resolved to its
        key ID via ``describe_key``.

    Returns ``{"result": True}`` on success or
    ``{"result": False, "error": ...}`` when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.enable_key 'alias/mykey'
    """
    conn = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    # EnableKey only takes a key ID or ARN; translate aliases first.
    key_id = _resolve_alias(key_id, region, key, keyid, profile)
    try:
        conn.enable_key(KeyId=key_id)
    except ClientError as e:
        return {"result": False, "error": _err(e)}
    return {"result": True}
def enable_key_rotation(key_id, region=None, key=None, keyid=None, profile=None):
    """
    Enable key rotation for a key.

    key_id
        Key ID, key ARN, or an ``alias/...`` name.  The KMS
        ``EnableKeyRotation`` API does not accept aliases, so an alias is
        first resolved to its key ID via ``describe_key``.

    Returns ``{"result": True}`` on success or
    ``{"result": False, "error": ...}`` when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.enable_key_rotation 'alias/mykey'
    """
    conn = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    # EnableKeyRotation only takes a key ID or ARN; translate aliases first.
    key_id = _resolve_alias(key_id, region, key, keyid, profile)
    try:
        conn.enable_key_rotation(KeyId=key_id)
    except ClientError as e:
        return {"result": False, "error": _err(e)}
    return {"result": True}
def encrypt(
    key_id,
    plaintext,
    encryption_context=None,
    grant_tokens=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Encrypt plaintext using a KMS key.

    Optional arguments left at ``None`` are not sent to the API.

    Returns ``{"ciphertext": ...}`` on success or ``{"error": ...}`` when the
    call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.encrypt 'alias/mykey' 'myplaindata'
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    optional = {"EncryptionContext": encryption_context, "GrantTokens": grant_tokens}
    params = {"KeyId": key_id, "Plaintext": plaintext}
    params.update({name: value for name, value in optional.items() if value is not None})
    try:
        response = client.encrypt(**params)
    except ClientError as exc:
        return {"error": _err(exc)}
    return {"ciphertext": response["CiphertextBlob"]}
def generate_data_key(
    key_id,
    encryption_context=None,
    number_of_bytes=None,
    key_spec=None,
    grant_tokens=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Generate a secure data key.

    Optional arguments left at ``None`` are not sent to the API.  KMS expects
    either ``number_of_bytes`` or ``key_spec``, not both.

    Returns ``{"data_key": <API response>}`` on success or ``{"error": ...}``
    when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.generate_data_key 'alias/mykey' key_spec=AES_256
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    optional = {
        "EncryptionContext": encryption_context,
        "NumberOfBytes": number_of_bytes,
        "KeySpec": key_spec,
        "GrantTokens": grant_tokens,
    }
    params = {"KeyId": key_id}
    params.update({name: value for name, value in optional.items() if value is not None})
    try:
        data_key = client.generate_data_key(**params)
    except ClientError as exc:
        return {"error": _err(exc)}
    return {"data_key": data_key}
def generate_data_key_without_plaintext(
    key_id,
    encryption_context=None,
    number_of_bytes=None,
    key_spec=None,
    grant_tokens=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Generate a secure data key without a plaintext copy.

    Optional arguments left at ``None`` are not sent to the API.

    Returns ``{"data_key": <API response>}`` on success or ``{"error": ...}``
    when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.generate_data_key_without_plaintext 'alias/mykey' number_of_bytes=1024
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    optional = {
        "EncryptionContext": encryption_context,
        "NumberOfBytes": number_of_bytes,
        "KeySpec": key_spec,
        "GrantTokens": grant_tokens,
    }
    params = {"KeyId": key_id}
    params.update({name: value for name, value in optional.items() if value is not None})
    try:
        data_key = client.generate_data_key_without_plaintext(**params)
    except ClientError as exc:
        return {"error": _err(exc)}
    return {"data_key": data_key}
def generate_random(number_of_bytes=None, region=None, key=None, keyid=None, profile=None):
    """
    Generate cryptographically secure random bytes.

    ``number_of_bytes`` is only sent to the API when it is not ``None``.

    Returns ``{"random": ...}`` on success or ``{"error": ...}`` when the
    call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.generate_random number_of_bytes=1024
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    params = {} if number_of_bytes is None else {"NumberOfBytes": number_of_bytes}
    try:
        response = client.generate_random(**params)
    except ClientError as exc:
        return {"error": _err(exc)}
    return {"random": response["Plaintext"]}
def get_key_policy(key_id, policy_name, region=None, key=None, keyid=None, profile=None):
    """
    Get the policy for the specified key.

    key_id
        Key ID, key ARN, or an ``alias/...`` name.  The KMS ``GetKeyPolicy``
        API does not accept aliases, so an alias is first resolved to its
        key ID via ``describe_key``.

    policy_name
        Name of the policy to fetch.

    Returns ``{"key_policy": ...}`` with the policy document deserialized
    from JSON into ordered dicts, or ``{"error": ...}`` when the call raises
    ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.get_key_policy 'alias/mykey' default
    """
    conn = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    # GetKeyPolicy only takes a key ID or ARN; translate aliases first.
    key_id = _resolve_alias(key_id, region, key, keyid, profile)
    try:
        res = conn.get_key_policy(KeyId=key_id, PolicyName=policy_name)
        return {
            "key_policy": salt.serializers.json.deserialize(
                res["Policy"], object_pairs_hook=odict.OrderedDict
            )
        }
    except ClientError as e:
        return {"error": _err(e)}
def get_key_rotation_status(key_id, region=None, key=None, keyid=None, profile=None):
    """
    Return whether key rotation is enabled for the specified key.

    key_id
        Key ID, key ARN, or an ``alias/...`` name.  The KMS
        ``GetKeyRotationStatus`` API does not accept aliases, so an alias is
        first resolved to its key ID via ``describe_key``.

    Returns ``{"result": <KeyRotationEnabled>}`` on success or
    ``{"error": ...}`` when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.get_key_rotation_status 'alias/mykey'
    """
    conn = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    # GetKeyRotationStatus only takes a key ID or ARN; translate aliases first.
    key_id = _resolve_alias(key_id, region, key, keyid, profile)
    try:
        res = conn.get_key_rotation_status(KeyId=key_id)
        return {"result": res["KeyRotationEnabled"]}
    except ClientError as e:
        return {"error": _err(e)}
def list_grants(key_id, limit=None, marker=None, region=None, key=None, keyid=None, profile=None):
    """
    List grants for the specified key.

    An ``alias/...`` ``key_id`` is resolved to a key ID first.  Pages are
    fetched repeatedly, following ``NextMarker`` for as long as the response
    is flagged ``Truncated``; ``limit`` is sent with every page request and
    ``marker`` seeds the first one.

    Returns ``{"grants": [...]}`` with the grants of all fetched pages, or
    ``{"error": ...}`` when a call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.list_grants 'alias/mykey'
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    key_id = _resolve_alias(key_id, region, key, keyid, profile)
    collected = []
    page_marker = marker
    try:
        more_pages = True
        while more_pages:
            params = {"KeyId": key_id}
            if limit is not None:
                params["Limit"] = limit
            if page_marker is not None:
                params["Marker"] = page_marker
            page = client.list_grants(**params)
            collected.extend(page.get("Grants", []))
            more_pages = "NextMarker" in page and bool(page.get("Truncated"))
            if more_pages:
                page_marker = page["NextMarker"]
    except ClientError as exc:
        return {"error": _err(exc)}
    return {"grants": collected}
def list_key_policies(
    key_id, limit=None, marker=None, region=None, key=None, keyid=None, profile=None
):
    """
    List key policies for the specified key.

    An ``alias/...`` ``key_id`` is resolved to a key ID first.  ``limit`` and
    ``marker`` are only sent to the API when they are not ``None``.

    Returns ``{"key_policies": ...}`` with the ``PolicyNames`` of the
    response, or ``{"error": ...}`` when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.list_key_policies 'alias/mykey'
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    resolved_id = _resolve_alias(key_id, region, key, keyid, profile)
    optional = {"Limit": limit, "Marker": marker}
    params = {"KeyId": resolved_id}
    params.update({name: value for name, value in optional.items() if value is not None})
    try:
        response = client.list_key_policies(**params)
    except ClientError as exc:
        return {"error": _err(exc)}
    return {"key_policies": response["PolicyNames"]}
def put_key_policy(key_id, policy_name, policy, region=None, key=None, keyid=None, profile=None):
    """
    Attach a key policy to the specified key.

    key_id
        Key ID, key ARN, or an ``alias/...`` name.  The KMS ``PutKeyPolicy``
        API does not accept aliases, so an alias is first resolved to its
        key ID via ``describe_key``.

    policy_name
        Name of the policy to set.

    policy
        Key policy, either as a data structure (serialized to JSON before it
        is sent) or as an already-serialized JSON string (sent verbatim).

    Returns ``{"result": True}`` on success or
    ``{"result": False, "error": ...}`` when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.put_key_policy 'alias/mykey' default '{"Statement":...}'
    """
    conn = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    # PutKeyPolicy only takes a key ID or ARN; translate aliases first.
    key_id = _resolve_alias(key_id, region, key, keyid, profile)
    # A policy that is already a JSON document must go out unchanged:
    # serializing a string again would wrap it in quotes, turning the policy
    # into a JSON string literal instead of a policy object.
    if isinstance(policy, str):
        policy_document = policy
    else:
        policy_document = salt.serializers.json.serialize(policy)
    try:
        conn.put_key_policy(
            KeyId=key_id,
            PolicyName=policy_name,
            Policy=policy_document,
        )
    except ClientError as e:
        return {"result": False, "error": _err(e)}
    return {"result": True}
def re_encrypt(
    ciphertext_blob,
    destination_key_id,
    source_encryption_context=None,
    destination_encryption_context=None,
    grant_tokens=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Re-encrypt ciphertext with a new master key.

    Optional arguments left at ``None`` are not sent to the API.

    Returns ``{"ciphertext": <API response>}`` on success (the whole response
    of the ``re_encrypt`` call, not just the blob) or ``{"error": ...}`` when
    the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.re_encrypt 'encrypted_data' 'alias/mynewkey'
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    optional = {
        "SourceEncryptionContext": source_encryption_context,
        "DestinationEncryptionContext": destination_encryption_context,
        "GrantTokens": grant_tokens,
    }
    params = {
        "CiphertextBlob": ciphertext_blob,
        "DestinationKeyId": destination_key_id,
    }
    params.update({name: value for name, value in optional.items() if value is not None})
    try:
        response = client.re_encrypt(**params)
    except ClientError as exc:
        return {"error": _err(exc)}
    return {"ciphertext": response}
def revoke_grant(key_id, grant_id, region=None, key=None, keyid=None, profile=None):
    """
    Revoke a grant from a key.

    An ``alias/...`` ``key_id`` is resolved to a key ID first.

    Returns ``{"result": True}`` on success or
    ``{"result": False, "error": ...}`` when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.revoke_grant 'alias/mykey' 8u89hf-j09j...
    """
    client = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    resolved_id = _resolve_alias(key_id, region, key, keyid, profile)
    try:
        client.revoke_grant(KeyId=resolved_id, GrantId=grant_id)
    except ClientError as exc:
        return {"result": False, "error": _err(exc)}
    return {"result": True}
def update_key_description(key_id, description, region=None, key=None, keyid=None, profile=None):
    """
    Update a key's description.

    key_id
        Key ID, key ARN, or an ``alias/...`` name.  The KMS
        ``UpdateKeyDescription`` API does not accept aliases, so an alias is
        first resolved to its key ID via ``describe_key``.

    description
        New description for the key.

    Returns ``{"result": True}`` on success or
    ``{"result": False, "error": ...}`` when the call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_kms.update_key_description 'alias/mykey' 'My key'
    """
    conn = _get_conn("kms", region=region, key=key, keyid=keyid, profile=profile)
    # UpdateKeyDescription only takes a key ID or ARN; translate aliases first.
    key_id = _resolve_alias(key_id, region, key, keyid, profile)
    try:
        conn.update_key_description(KeyId=key_id, Description=description)
    except ClientError as e:
        return {"result": False, "error": _err(e)}
    return {"result": True}