# Source code for saltext.boto3.utils.boto3mod

"""
Boto3 Common Utils
==================

Common helpers for boto3-based execution and state modules.

Execution/state modules call these helpers directly, passing their own
``__opts__`` and ``__context__``:

.. code-block:: python

    from saltext.boto3.utils import boto3mod


    def __virtual__():
        return "my_service"


    def describe():
        conn = boto3mod.get_connection(
            "ec2", opts=__opts__, context=__context__, profile="myprofile"
        )
        instance_id = boto3mod.cache_id(
            "ec2", "myinstance", opts=__opts__, context=__context__
        )

.. versionadded:: 1.0.0
"""

import hashlib
import logging

import salt.utils.stringutils
from salt.exceptions import SaltInvocationError

# boto3/botocore are imported defensively: a missing package is recorded in
# HAS_BOTO (checked by __virtual__) instead of failing the module import.
try:
    import boto3
    import boto3.session
    import botocore
    import botocore.exceptions

    # Only CRITICAL records from the "boto3" logger are let through.
    logging.getLogger("boto3").setLevel(logging.CRITICAL)
    HAS_BOTO = True
except ImportError:
    HAS_BOTO = False


# Module-level logger for this utility module.
log = logging.getLogger(__name__)

# Name returned by __virtual__ when boto3 is importable.
__virtualname__ = "boto3"


def __virtual__():
    """
    Decide whether this utility module may be loaded.

    Returns the virtual name when boto3 was imported successfully, otherwise
    a ``(False, reason)`` tuple. No version check happens here; the minimum
    version is enforced via the project's ``pyproject.toml`` dependency
    declaration.
    """
    if not HAS_BOTO:
        return (False, "The boto3mod utility could not be loaded: boto3 is not available.")
    return __virtualname__
def _option(value, opts): """ Look up an option value in ``opts``. """ if opts and value in opts: return opts[value] return None def _get_profile(service, region, key, keyid, profile, opts): if profile: if isinstance(profile, str): _profile = _option(profile, opts) or {} elif isinstance(profile, dict): _profile = profile else: _profile = {} key = _profile.get("key", None) keyid = _profile.get("keyid", None) region = _profile.get("region", None) if not region: region = _option(service + ".region", opts) if not region: region = "us-east-1" log.info("Assuming default region %s", region) if not key: key = _option(service + ".key", opts) if not keyid: keyid = _option(service + ".keyid", opts) label = f"boto3_{service}:" if keyid: hash_string = region + keyid + (key or "") hash_string = salt.utils.stringutils.to_bytes(hash_string) cxkey = label + hashlib.md5(hash_string, usedforsecurity=False).hexdigest() else: cxkey = label + region return (cxkey, region, key, keyid)
def cache_id(
    service,
    name,
    *,
    opts,
    context,
    sub_resource=None,
    resource_id=None,
    invalidate=False,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Store, drop, or look up an AWS resource id cached under ``name``.

    ``opts`` and ``context`` are required; pass ``__opts__`` and
    ``__context__`` from the calling module.

    Behaviour by arguments:

    - ``invalidate=True``: remove the entry for ``name`` if present;
      otherwise remove every entry whose value equals ``resource_id``.
      Returns ``True`` if anything was removed, else ``False``.
    - truthy ``resource_id``: store it under ``name`` and return ``True``.
    - otherwise: return the cached id, or ``None`` if there is none.

    .. code-block:: python

        boto3mod.cache_id(
            "ec2",
            "myinstance",
            opts=__opts__,
            context=__context__,
            resource_id="i-a1b2c3",
            profile="custom_profile",
        )
    """
    prefix, _, _, _ = _get_profile(service, region, key, keyid, profile, opts)
    cxkey = f"{prefix}:{sub_resource}:{name}:id" if sub_resource else f"{prefix}:{name}:id"

    if not invalidate:
        if resource_id:
            context[cxkey] = resource_id
            return True
        return context.get(cxkey)

    if cxkey in context:
        del context[cxkey]
        return True

    if resource_id is None or resource_id not in context.values():
        return False

    # Collect the keys first so the dict is not mutated while iterating.
    for stale_key in [ckey for ckey, cached in context.items() if cached == resource_id]:
        del context[stale_key]
    return True
def get_connection(
    service,
    *,
    opts,
    context,
    module=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Return a boto3 client for the given service, caching it in ``context``.

    ``opts`` and ``context`` are required; pass ``__opts__`` and
    ``__context__`` from the calling module.

    service
        Name used for option lookups (``<service>.region``, ``<service>.key``,
        ``<service>.keyid``) and as the prefix of the cache key.

    module
        Client name passed to ``session.client``. Defaults to ``service``.

    region, key, keyid, profile
        Connection settings, resolved by ``_get_profile``.

    Returns the cached client when one exists for the resolved settings and
    ``module``; otherwise creates one, stores it in ``context`` and returns it.

    Raises ``SaltInvocationError`` when botocore reports
    ``NoCredentialsError`` while the client is being created.

    .. code-block:: python

        conn = boto3mod.get_connection(
            "ec2",
            opts=__opts__,
            context=__context__,
            profile="custom_profile",
        )
    """
    module = module or service

    cxkey, region, key, keyid = _get_profile(service, region, key, keyid, profile, opts)
    # The cache key must distinguish clients created for different boto3
    # client modules under the same service/credentials; otherwise a client
    # for the wrong API would be returned. The key is unchanged when
    # ``module`` is the default (equal to ``service``).
    if module == service:
        cxkey = cxkey + ":conn3"
    else:
        cxkey = f"{cxkey}:{module}:conn3"

    if cxkey in context:
        return context[cxkey]

    try:
        session = boto3.session.Session(
            aws_access_key_id=keyid,
            aws_secret_access_key=key,
            region_name=region,
        )
        # Defensive None checks retained from the original implementation.
        if session is None:
            raise SaltInvocationError(f'Region "{region}" is not valid.')
        conn = session.client(module)
        if conn is None:
            raise SaltInvocationError(f'Region "{region}" is not valid.')
    except botocore.exceptions.NoCredentialsError as exc:
        raise SaltInvocationError(
            "No authentication credentials found when "
            f"attempting to make boto {service} connection to "
            f'region "{region}".'
        ) from exc

    context[cxkey] = conn
    return conn
def get_region(service, region, profile, *, opts):
    """
    Resolve the region to use for ``service``.

    The supplied ``region`` and ``profile`` are combined with the calling
    module's ``opts`` by ``_get_profile``; only the region element of its
    result is returned.
    """
    resolved = _get_profile(service, region, None, None, profile, opts)
    return resolved[1]
def get_error(e):
    """
    Build a ``{"message": ...}`` dict from an exception.

    The message is the first element of ``e.args``, or an empty string when
    the exception carries no arguments.
    """
    return {"message": e.args[0] if e.args else ""}
def exactly_n(l, n=1):
    """
    Tell whether exactly ``n`` of the items in ``l`` are truthy.

    ``l`` is consumed through a single iterator: each ``any`` call advances
    it past the next truthy item, and iteration stops as soon as the answer
    is known.
    """
    remaining = iter(l)
    for _ in range(n):
        if not any(remaining):
            # Fewer than n truthy items.
            return False
    # True only if no further truthy item is left.
    return not any(remaining)
def exactly_one(l):
    """
    Tell whether exactly one item in ``l`` is truthy.
    """
    return exactly_n(l, n=1)
def paged_call(function, *args, **kwargs):
    """
    Generate every page returned by a paginated boto3 API call.

    ``function`` is called with ``*args`` and ``**kwargs``. After each call
    the response's ``NextMarker`` value, if truthy, is passed back as the
    ``Marker`` keyword argument of the next call. The response key and the
    argument name can be overridden with the ``marker_flag`` and
    ``marker_arg`` keyword arguments, which are not forwarded to
    ``function``.
    """
    response_key = kwargs.pop("marker_flag", "NextMarker")
    request_arg = kwargs.pop("marker_arg", "Marker")

    page = function(*args, **kwargs)
    # Read the marker before yielding, so it reflects the response as it
    # was returned.
    token = page.get(response_key)
    yield page

    while token:
        kwargs[request_arg] = token
        page = function(*args, **kwargs)
        token = page.get(response_key)
        yield page
def _canonical_sort_key(obj):
    """
    Build a string sort key for an already-``ordered`` value.

    Dict entries are rendered in sorted order so that insertion order does
    not influence the key; lists are rendered element by element; anything
    else falls back to ``repr``.
    """
    if isinstance(obj, dict):
        pairs = sorted(
            (_canonical_sort_key(k), _canonical_sort_key(v)) for k, v in obj.items()
        )
        return "{" + ",".join(f"{k}:{v}" for k, v in pairs) + "}"
    if isinstance(obj, (list, tuple)):
        return "[" + ",".join(_canonical_sort_key(x) for x in obj) + "]"
    return repr(obj)


def ordered(obj):
    """
    Return a copy of ``obj`` with every list/tuple replaced by a sorted list.

    Dicts are rebuilt with their values normalised recursively, and string
    keys/values are converted to plain ``str``. Other values are returned
    unchanged.

    Sequences are sorted by natural ordering when their items support it.
    Items that cannot be compared with ``<`` (dicts, ``None``, mixed types)
    would make ``sorted`` raise ``TypeError``; in that case the items are
    sorted by a canonical string key instead, so two sequences holding the
    same items in a different order still normalise to the same list.
    """
    if isinstance(obj, (list, tuple)):
        items = [ordered(x) for x in obj]
        try:
            return sorted(items)
        except TypeError:
            # Unorderable items, e.g. a list of dicts.
            return sorted(items, key=_canonical_sort_key)
    if isinstance(obj, dict):
        return {str(k) if isinstance(k, str) else k: ordered(v) for k, v in obj.items()}
    if isinstance(obj, str):
        return str(obj)
    return obj
def json_objs_equal(left, right):
    """
    Tell whether two parsed JSON objects are equal once both have been
    normalised with ``ordered``, i.e. ignoring ordering inside containers.
    """
    normalised_left = ordered(left)
    normalised_right = ordered(right)
    return normalised_left == normalised_right