Source code for saltext.boto3.modules.boto3_asg

"""
Connection module for Amazon Autoscale Groups using boto3.
==========================================================

    Renamed from ``boto_asg`` to ``boto3_asg`` and rewritten to use the
    boto3 ``autoscaling`` (and ``ec2``) client APIs directly via
    :py:mod:`saltext.boto3.utils.boto3mod`.  The legacy boto2 code path
    (object-style access, retry loops) has been removed.

:depends:
  - boto3 >= 1.28.0
  - botocore >= 1.31.0

:configuration: This module accepts explicit autoscale credentials but can
    also utilize IAM roles assigned to the instance through Instance Profiles.
    Dynamic credentials are then automatically obtained from AWS API and no
    further configuration is necessary. More Information available at:

    .. code-block:: text

        http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html

    If IAM roles are not used you need to specify them either in the minion's
    config file or as a profile. For example, to specify them in the minion's
    config file:

.. code-block:: yaml

    asg.keyid: GKTADJGHEIQSXMKKRBJ08H
    asg.key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs

A region may also be specified in the configuration:

.. code-block:: yaml

    asg.region: us-east-1

It's also possible to specify key, keyid and region via a profile, either
as a passed in dict, or as a string to pull from pillars or minion config:

.. code-block:: yaml

    myprofile:
        keyid: GKTADJGHEIQSXMKKRBJ08H
        key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
        region: us-east-1

.. versionadded:: 1.0.0
"""

import datetime
import email.mime.multipart
import email.mime.text
import logging

import salt.utils.json
import salt.utils.yaml
from salt.utils import odict

from saltext.boto3.utils import boto3mod

try:
    from botocore.exceptions import ClientError

    HAS_BOTO3 = True
except ImportError:
    HAS_BOTO3 = False

log = logging.getLogger(__name__)

DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

__virtualname__ = "boto3_asg"


def __virtual__():
    """
    Decide whether this execution module may be loaded.

    Returns the module's virtual name when ``HAS_BOTO3`` is set, otherwise a
    ``(False, reason)`` tuple.  Minimum versions are enforced via the
    project's ``pyproject.toml`` dependency declaration, not here.
    """
    if not HAS_BOTO3:
        return (False, "The boto3_asg module could not be loaded: boto3 is not available.")
    return __virtualname__
def _get_conn(service, region=None, key=None, keyid=None, profile=None):
    """
    Build a boto3 client for ``service`` through ``boto3mod.get_connection``,
    handing it this module's ``__opts__`` and ``__context__`` dunders together
    with the caller-supplied credentials.
    """
    conn_args = {
        "opts": __opts__,
        "context": __context__,
        "region": region,
        "key": key,
        "keyid": keyid,
        "profile": profile,
    }
    return boto3mod.get_connection(service, **conn_args)


def _ec2_conn(region=None, key=None, keyid=None, profile=None):
    """
    Shortcut for ``_get_conn`` bound to the ``ec2`` service.
    """
    return _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)


def _paginate(function, result_key, **kwargs):
    """
    Generator over every item of a ``NextToken``-paginated boto3 response.

    ``function`` is the bound client method to call; ``result_key`` names the
    response entry holding the item list (e.g. ``AutoScalingGroups``,
    ``LaunchConfigurations``, ``Tags``).  ``kwargs`` are forwarded unchanged
    on every call; ``NextToken`` is added from the second page onwards.
    """
    token = None
    while True:
        params = dict(kwargs)
        if token:
            params["NextToken"] = token
        page = function(**params)
        yield from page.get(result_key, [])
        token = page.get("NextToken")
        # A missing or empty token marks the last page.
        if not token:
            break


def _maybe_json(value):
    """
    Decode ``value`` as JSON when it is a string; return anything else as-is.
    """
    return salt.utils.json.loads(value) if isinstance(value, str) else value
def exists(name, region=None, key=None, keyid=None, profile=None):
    """
    Report whether an autoscale group called ``name`` exists.

    ``False`` is also returned (after logging the error) when the describe
    call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.exists myasg region=us-east-1
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        described = conn.describe_auto_scaling_groups(AutoScalingGroupNames=[name])
    except ClientError as e:
        log.error(e)
        return False
    groups = described.get("AutoScalingGroups")
    return bool(groups)
def _asg_to_config(asg, policies, actions):
    """
    Convert a boto3 ``AutoScalingGroup`` dict, plus its scaling policies and
    scheduled actions, into the legacy snake_case configuration mapping
    consumed by the state module.

    The result is an ordered dict; ``scheduled_actions`` is keyed by the
    scheduled action name.
    """

    def _iso(moment):
        # Values exposing ``isoformat`` are rendered as text; everything else
        # (including ``None``) is passed through untouched.
        if moment is not None and hasattr(moment, "isoformat"):
            return moment.isoformat()
        return moment

    subnets = asg.get("VPCZoneIdentifier")

    tag_entries = []
    for tag in asg.get("Tags", []):
        tag_entries.append(
            odict.OrderedDict(
                [
                    ("key", tag.get("Key")),
                    ("value", tag.get("Value")),
                    ("propagate_at_launch", tag.get("PropagateAtLaunch", False)),
                ]
            )
        )

    suspended = sorted(proc["ProcessName"] for proc in asg.get("SuspendedProcesses", []))

    policy_entries = []
    for policy in policies:
        policy_entries.append(
            {
                "name": policy.get("PolicyName"),
                "adjustment_type": policy.get("AdjustmentType"),
                "scaling_adjustment": policy.get("ScalingAdjustment"),
                "min_adjustment_step": policy.get("MinAdjustmentStep"),
                "cooldown": policy.get("Cooldown"),
            }
        )

    schedule = {}
    for action in actions:
        end_time = _iso(action.get("EndTime"))
        start_time = _iso(action.get("StartTime"))
        capacity = action.get("DesiredCapacity")
        schedule[action["ScheduledActionName"]] = {
            "min_size": action.get("MinSize"),
            "max_size": action.get("MaxSize"),
            "desired_capacity": None if capacity is None else int(action["DesiredCapacity"]),
            "start_time": start_time,
            "end_time": end_time,
            "recurrence": action.get("Recurrence"),
        }

    return odict.OrderedDict(
        [
            ("name", asg.get("AutoScalingGroupName")),
            ("availability_zones", asg.get("AvailabilityZones", [])),
            ("default_cooldown", asg.get("DefaultCooldown")),
            ("desired_capacity", asg.get("DesiredCapacity")),
            ("health_check_period", asg.get("HealthCheckGracePeriod")),
            ("health_check_type", asg.get("HealthCheckType")),
            ("launch_config_name", asg.get("LaunchConfigurationName")),
            ("load_balancers", asg.get("LoadBalancerNames", [])),
            ("max_size", asg.get("MaxSize")),
            ("min_size", asg.get("MinSize")),
            ("placement_group", asg.get("PlacementGroup")),
            # Boto2 always returned a list here; the comma-separated boto3
            # string is split to preserve that shape.
            ("vpc_zone_identifier", subnets.split(",") if subnets else []),
            ("tags", tag_entries),
            ("termination_policies", asg.get("TerminationPolicies", [])),
            ("suspended_processes", suspended),
            ("scaling_policies", policy_entries),
            ("scheduled_actions", schedule),
        ]
    )
def get_config(name, region=None, key=None, keyid=None, profile=None):
    """
    Get the configuration for an autoscale group.

    Returns the legacy-style configuration mapping built by
    ``_asg_to_config``, or an empty dict when the group is not found or any
    of the describe calls raises ``ClientError`` (which is logged).

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.get_config myasg region=us-east-1
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        described = conn.describe_auto_scaling_groups(AutoScalingGroupNames=[name])
    except ClientError as e:
        log.error(e)
        return {}

    groups = described.get("AutoScalingGroups") or []
    if not groups:
        return {}

    group_filter = {"AutoScalingGroupName": name}
    try:
        policies = list(_paginate(conn.describe_policies, "ScalingPolicies", **group_filter))
        actions = list(
            _paginate(
                conn.describe_scheduled_actions,
                "ScheduledUpdateGroupActions",
                **group_filter,
            )
        )
    except ClientError as e:
        log.error(e)
        return {}

    return _asg_to_config(groups[0], policies, actions)
def _tags_kwarg(tags, asg_name):
    """
    Turn state-style tag dicts (``key`` / ``value`` / ``propagate_at_launch``)
    into the boto3 ``Tags`` list for the group ``asg_name``.

    Returns ``None`` (after logging) as soon as a tag lacks ``key`` or
    ``value``; an empty or ``None`` input yields an empty list.
    """
    required = (("key", "Tag missing key."), ("value", "Tag missing value."))
    converted = []
    for entry in tags or []:
        for field, complaint in required:
            if field not in entry:
                log.error(complaint)
                return None
        converted.append(
            {
                "ResourceId": asg_name,
                "ResourceType": "auto-scaling-group",
                "Key": entry["key"],
                "Value": entry["value"],
                "PropagateAtLaunch": entry.get("propagate_at_launch", False),
            }
        )
    return converted


def _create_kwargs(
    name,
    launch_config_name,
    availability_zones,
    min_size,
    max_size,
    desired_capacity,
    load_balancers,
    default_cooldown,
    health_check_type,
    health_check_period,
    placement_group,
    vpc_zone_identifier,
    tags,
    termination_policies,
):
    """
    Translate the legacy create arguments into boto3
    ``CreateAutoScalingGroup`` keyword arguments.

    Scalar settings are dropped when ``None``; list-like settings are dropped
    when empty.  Returns ``None`` when the tags fail validation.
    """
    scalars = {
        "AutoScalingGroupName": name,
        "LaunchConfigurationName": launch_config_name,
        "MinSize": min_size,
        "MaxSize": max_size,
        "DesiredCapacity": desired_capacity,
        "DefaultCooldown": default_cooldown,
        "HealthCheckType": health_check_type,
        "HealthCheckGracePeriod": health_check_period,
        "PlacementGroup": placement_group,
    }
    kwargs = {field: setting for field, setting in scalars.items() if setting is not None}

    if availability_zones:
        kwargs["AvailabilityZones"] = availability_zones
    if load_balancers:
        kwargs["LoadBalancerNames"] = load_balancers
    if vpc_zone_identifier:
        # boto3 wants one comma-separated string of subnet ids.
        subnets = vpc_zone_identifier
        if isinstance(subnets, list):
            subnets = ",".join(subnets)
        kwargs["VPCZoneIdentifier"] = subnets
    if termination_policies:
        kwargs["TerminationPolicies"] = termination_policies
    if tags:
        converted = _tags_kwarg(tags, name)
        if converted is None:
            return None
        kwargs["Tags"] = converted
    return kwargs
def create(
    name,
    launch_config_name,
    availability_zones,
    min_size,
    max_size,
    desired_capacity=None,
    load_balancers=None,
    default_cooldown=None,
    health_check_type=None,
    health_check_period=None,
    placement_group=None,
    vpc_zone_identifier=None,
    tags=None,
    termination_policies=None,
    suspended_processes=None,
    scaling_policies=None,
    scheduled_actions=None,
    region=None,
    notification_arn=None,
    notification_types=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Create an autoscale group.

    After the group itself is created, its scaling policies, scheduled
    actions, suspended processes and notification configuration are applied
    in that order.  Returns ``True`` on success and ``False`` when the tags
    are invalid or any AWS call raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.create myasg mylc '["us-east-1a", "us-east-1e"]' 1 10
    """
    # List/dict arguments may arrive as JSON strings; decode them up front.
    (
        availability_zones,
        load_balancers,
        vpc_zone_identifier,
        tags,
        termination_policies,
        suspended_processes,
        scheduled_actions,
    ) = (
        _maybe_json(argument)
        for argument in (
            availability_zones,
            load_balancers,
            vpc_zone_identifier,
            tags,
            termination_policies,
            suspended_processes,
            scheduled_actions,
        )
    )

    kwargs = _create_kwargs(
        name=name,
        launch_config_name=launch_config_name,
        availability_zones=availability_zones,
        min_size=min_size,
        max_size=max_size,
        desired_capacity=desired_capacity,
        load_balancers=load_balancers,
        default_cooldown=default_cooldown,
        health_check_type=health_check_type,
        health_check_period=health_check_period,
        placement_group=placement_group,
        vpc_zone_identifier=vpc_zone_identifier,
        tags=tags,
        termination_policies=termination_policies,
    )
    if kwargs is None:
        return False

    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        conn.create_auto_scaling_group(**kwargs)
        _create_scaling_policies(conn, name, scaling_policies)
        _create_scheduled_actions(conn, name, scheduled_actions)
        if suspended_processes:
            conn.suspend_processes(
                AutoScalingGroupName=name,
                ScalingProcesses=list(suspended_processes),
            )
        if notification_arn and notification_types:
            conn.put_notification_configuration(
                AutoScalingGroupName=name,
                TopicARN=notification_arn,
                NotificationTypes=list(notification_types),
            )
    except ClientError as e:
        log.error("Failed to create ASG %s: %s", name, e)
        return False

    log.info("Created ASG %s", name)
    return True
def update(
    name,
    launch_config_name,
    availability_zones,
    min_size,
    max_size,
    desired_capacity=None,
    load_balancers=None,
    default_cooldown=None,
    health_check_type=None,
    health_check_period=None,
    placement_group=None,
    vpc_zone_identifier=None,
    tags=None,
    termination_policies=None,
    suspended_processes=None,
    scaling_policies=None,
    scheduled_actions=None,
    notification_arn=None,
    notification_types=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Update an autoscale group.

    The group settings are updated first, then load balancers, notification
    configuration, tags, suspended processes, scaling policies and scheduled
    actions are reconciled in that order.  Tags, scaling policies and
    scheduled actions are treated as the complete desired set: anything
    currently on the group that is not passed in is removed.

    Returns a ``(success, message)`` tuple; ``message`` is empty on success
    and carries the error text otherwise.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.update myasg mylc '["us-east-1a", "us-east-1e"]' 1 10
    """
    # List/dict arguments may arrive as JSON strings; decode them up front.
    availability_zones = _maybe_json(availability_zones)
    load_balancers = _maybe_json(load_balancers)
    vpc_zone_identifier = _maybe_json(vpc_zone_identifier)
    tags = _maybe_json(tags)
    termination_policies = _maybe_json(termination_policies)
    suspended_processes = _maybe_json(suspended_processes)
    scheduled_actions = _maybe_json(scheduled_actions)

    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    if not conn:
        return False, "failed to connect to AWS"

    # Build the tag add/delete deltas against the current tag set.
    try:
        current_tags_resp = conn.describe_tags(
            Filters=[{"Name": "auto-scaling-group", "Values": [name]}]
        )
    except ClientError as e:
        return False, str(e)
    # Normalise the AWS tags into the same dict shape as the desired tags so
    # the two sets can be compared with plain equality.
    current_tags = [
        {
            "key": t["Key"],
            "value": t["Value"],
            "resource_id": t["ResourceId"],
            "propagate_at_launch": t.get("PropagateAtLaunch", False),
        }
        for t in current_tags_resp.get("Tags", [])
    ]
    add_tags = []
    desired_tags = []
    if tags:
        tags = boto3mod.ordered(tags)
        for tag in tags:
            if "key" not in tag:
                log.error("Tag missing key.")
                return False, f"Tag {tag} missing key"
            if "value" not in tag:
                log.error("Tag missing value.")
                return False, f"Tag {tag} missing value"
            _tag = {
                "key": tag["key"],
                "value": tag["value"],
                "resource_id": name,
                "propagate_at_launch": tag.get("propagate_at_launch", False),
            }
            # Only tags that differ from what is already on the group need
            # to be (re)written.
            if _tag not in current_tags:
                add_tags.append(_tag)
            desired_tags.append(_tag)
    # Anything on the group that is not in the desired set gets removed.
    delete_tags = [t for t in current_tags if t not in desired_tags]

    try:
        update_kwargs = _update_kwargs(
            name,
            launch_config_name,
            availability_zones,
            min_size,
            max_size,
            desired_capacity,
            default_cooldown,
            health_check_type,
            health_check_period,
            placement_group,
            vpc_zone_identifier,
            termination_policies,
        )
        conn.update_auto_scaling_group(**update_kwargs)

        # ``None`` means "leave load balancers alone"; an empty list detaches
        # everything currently attached.
        if load_balancers is not None:
            _reconcile_load_balancers(conn, name, load_balancers)

        if notification_arn and notification_types:
            conn.put_notification_configuration(
                AutoScalingGroupName=name,
                TopicARN=notification_arn,
                NotificationTypes=list(notification_types),
            )

        if add_tags:
            log.debug("Adding/updating tags for ASG: %s", add_tags)
            conn.create_or_update_tags(Tags=_convert_tag_list(add_tags))
        if delete_tags:
            log.debug("Deleting tags from ASG: %s", delete_tags)
            conn.delete_tags(Tags=_convert_tag_list(delete_tags))

        # Resume all processes, then suspend any explicitly specified.
        conn.resume_processes(AutoScalingGroupName=name)
        if suspended_processes:
            conn.suspend_processes(
                AutoScalingGroupName=name, ScalingProcesses=list(suspended_processes)
            )

        # Scaling policies: delete-all + recreate
        for policy in _paginate(
            conn.describe_policies, "ScalingPolicies", AutoScalingGroupName=name
        ):
            conn.delete_policy(AutoScalingGroupName=name, PolicyName=policy["PolicyName"])
        _create_scaling_policies(conn, name, scaling_policies)

        # Scheduled actions: delete-all + recreate
        for action in _paginate(
            conn.describe_scheduled_actions,
            "ScheduledUpdateGroupActions",
            AutoScalingGroupName=name,
        ):
            conn.delete_scheduled_action(
                AutoScalingGroupName=name,
                ScheduledActionName=action["ScheduledActionName"],
            )
        _create_scheduled_actions(conn, name, scheduled_actions)
        return True, ""
    except ClientError as e:
        log.error("Failed to update ASG %s: %s", name, e)
        return False, str(e)
def _update_kwargs(
    name,
    launch_config_name,
    availability_zones,
    min_size,
    max_size,
    desired_capacity,
    default_cooldown,
    health_check_type,
    health_check_period,
    placement_group,
    vpc_zone_identifier,
    termination_policies,
):
    """
    Map the legacy update arguments to boto3 ``UpdateAutoScalingGroup``
    keyword arguments.

    Scalar settings are omitted when ``None``; list-like settings are omitted
    when empty.  A list of subnet ids is joined into the comma-separated
    string boto3 expects.
    """
    mapping = [
        ("AutoScalingGroupName", name),
        ("LaunchConfigurationName", launch_config_name),
        ("MinSize", min_size),
        ("MaxSize", max_size),
        ("DesiredCapacity", desired_capacity),
        ("DefaultCooldown", default_cooldown),
        ("HealthCheckType", health_check_type),
        ("HealthCheckGracePeriod", health_check_period),
        ("PlacementGroup", placement_group),
    ]
    kwargs = {k: v for k, v in mapping if v is not None}
    if availability_zones:
        kwargs["AvailabilityZones"] = availability_zones
    if vpc_zone_identifier:
        kwargs["VPCZoneIdentifier"] = (
            ",".join(vpc_zone_identifier)
            if isinstance(vpc_zone_identifier, list)
            else vpc_zone_identifier
        )
    if termination_policies:
        kwargs["TerminationPolicies"] = termination_policies
    return kwargs


def _convert_tag_list(tags):
    """
    Convert the internal snake_case tag dicts (``key``, ``value``,
    ``resource_id``, ``propagate_at_launch``) into boto3 tag dicts.
    """
    return [
        {
            "ResourceId": t["resource_id"],
            "ResourceType": "auto-scaling-group",
            "Key": t["key"],
            "Value": t["value"],
            "PropagateAtLaunch": t.get("propagate_at_launch", False),
        }
        for t in tags
    ]


def _reconcile_load_balancers(conn, name, load_balancers):
    """
    Attach and detach classic load balancers so that the group ``name`` ends
    up with exactly ``load_balancers``.

    If the current attachments cannot be described, the group is treated as
    having none, so every desired load balancer is attached (best effort).
    """
    try:
        resp = conn.describe_load_balancers(AutoScalingGroupName=name)
        current_names = {lb["LoadBalancerName"] for lb in resp.get("LoadBalancers", [])}
    except ClientError as e:
        # Deliberately non-fatal; leave a trace instead of failing silently.
        log.debug("Could not describe load balancers for ASG %s: %s", name, e)
        current_names = set()
    desired = set(load_balancers or [])
    # Sorted so the API calls are deterministic regardless of set ordering.
    to_attach = sorted(desired - current_names)
    to_detach = sorted(current_names - desired)
    if to_attach:
        conn.attach_load_balancers(AutoScalingGroupName=name, LoadBalancerNames=to_attach)
    if to_detach:
        conn.detach_load_balancers(AutoScalingGroupName=name, LoadBalancerNames=to_detach)


def _create_scaling_policies(conn, as_name, scaling_policies):
    """
    Create each of ``scaling_policies`` on the group ``as_name``.

    Every policy dict must carry ``name``, ``adjustment_type`` and
    ``scaling_adjustment``; ``min_adjustment_step`` and ``cooldown`` are sent
    only when set.  Does nothing for an empty or ``None`` input.
    """
    if not scaling_policies:
        return
    for policy in scaling_policies:
        kwargs = {
            "AutoScalingGroupName": as_name,
            "PolicyName": policy["name"],
            "AdjustmentType": policy["adjustment_type"],
            "ScalingAdjustment": policy["scaling_adjustment"],
        }
        if policy.get("min_adjustment_step") is not None:
            kwargs["MinAdjustmentStep"] = policy["min_adjustment_step"]
        if policy.get("cooldown") is not None:
            kwargs["Cooldown"] = policy["cooldown"]
        conn.put_scaling_policy(**kwargs)


def _parse_schedule_time(value):
    """
    Turn a scheduled-action time into a ``datetime``.

    Non-string values are returned unchanged.  Strings are parsed with
    ``DATE_FORMAT`` first; if that fails, ISO-8601 text as produced by
    ``datetime.isoformat()`` is accepted too.  ``ValueError`` is raised when
    neither form matches.
    """
    if not isinstance(value, str):
        return value
    try:
        return datetime.datetime.strptime(value, DATE_FORMAT)
    except ValueError:
        # ``_asg_to_config`` renders times with ``isoformat()`` (e.g. a
        # ``+00:00`` offset), which ``DATE_FORMAT`` does not match; accept
        # that form so get_config output can be fed back in.
        return datetime.datetime.fromisoformat(value)


def _create_scheduled_actions(conn, as_name, scheduled_actions):
    """
    Create each of ``scheduled_actions`` (a mapping of action name to
    settings) on the group ``as_name``.

    ``desired_capacity``, ``min_size``, ``max_size`` and ``recurrence`` are
    sent when not ``None``.  ``start_time`` and ``end_time`` may be datetimes
    or strings (see ``_parse_schedule_time``).  Does nothing for an empty or
    ``None`` input.
    """
    if not scheduled_actions:
        return
    for action_name, action in scheduled_actions.items():
        kwargs = {
            "AutoScalingGroupName": as_name,
            "ScheduledActionName": action_name,
        }
        for src, dst in (
            ("desired_capacity", "DesiredCapacity"),
            ("min_size", "MinSize"),
            ("max_size", "MaxSize"),
            ("recurrence", "Recurrence"),
        ):
            if action.get(src) is not None:
                kwargs[dst] = action[src]
        for src, dst in (("start_time", "StartTime"), ("end_time", "EndTime")):
            value = action.get(src)
            if value is None:
                continue
            kwargs[dst] = _parse_schedule_time(value)
        conn.put_scheduled_update_group_action(**kwargs)
def delete(name, force=False, region=None, key=None, keyid=None, profile=None):
    """
    Delete an autoscale group.

    ``force`` is coerced to a boolean and sent as ``ForceDelete``.  Returns
    ``True`` on success and ``False`` when AWS raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.delete myasg region=us-east-1
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    request = {"AutoScalingGroupName": name, "ForceDelete": bool(force)}
    try:
        conn.delete_auto_scaling_group(**request)
    except ClientError as e:
        log.error("Failed to delete autoscale group %s: %s", name, e)
        return False
    log.info("Deleted autoscale group %s.", name)
    return True
def get_cloud_init_mime(cloud_init):
    """
    Get a mime multipart encoded string from a cloud-init dict. Currently
    supports boothooks, scripts and cloud-config.

    ``cloud_init`` may also be a JSON string, which is decoded first.  Parts
    are attached in the order boothooks, scripts, cloud-config; the
    cloud-config section is rendered as block-style YAML.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.get_cloud_init_mime <cloud init>
    """
    if isinstance(cloud_init, str):
        cloud_init = salt.utils.json.loads(cloud_init)

    message = email.mime.multipart.MIMEMultipart()

    # Script-like sections map straight onto a MIME text subtype.
    script_sections = (
        ("boothooks", "cloud-boothook"),
        ("scripts", "x-shellscript"),
    )
    for section, subtype in script_sections:
        if section not in cloud_init:
            continue
        for body in cloud_init[section].values():
            message.attach(email.mime.text.MIMEText(body, subtype))

    if "cloud-config" in cloud_init:
        rendered = salt.utils.yaml.safe_dump(
            cloud_init["cloud-config"], default_flow_style=False
        )
        message.attach(email.mime.text.MIMEText(rendered, "cloud-config"))

    return message.as_string()
def launch_configuration_exists(name, region=None, key=None, keyid=None, profile=None):
    """
    Check for a launch configuration's existence.

    Returns ``False`` as well (after logging) when the describe call raises
    ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.launch_configuration_exists mylc
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        described = conn.describe_launch_configurations(LaunchConfigurationNames=[name])
    except ClientError as e:
        log.error(e)
        return False
    found = described.get("LaunchConfigurations")
    return bool(found)
def get_all_launch_configurations(region=None, key=None, keyid=None, profile=None):
    """
    Fetch and return all Launch Configurations with details.

    All result pages are collected into one list of boto3 response dicts.  An
    empty list is returned (after logging) when AWS raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.get_all_launch_configurations
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        configurations = list(
            _paginate(conn.describe_launch_configurations, "LaunchConfigurations")
        )
    except ClientError as e:
        log.error(e)
        return []
    return configurations
def list_launch_configurations(region=None, key=None, keyid=None, profile=None):
    """
    List all Launch Configuration names.

    Built on ``get_all_launch_configurations``, so an empty list is returned
    when that call fails.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.list_launch_configurations
    """
    configurations = get_all_launch_configurations(
        region=region, key=key, keyid=keyid, profile=profile
    )
    names = []
    for configuration in configurations:
        names.append(configuration["LaunchConfigurationName"])
    return names
def describe_launch_configuration(name, region=None, key=None, keyid=None, profile=None):
    """
    Dump details of a given launch configuration.

    Returns the first matching boto3 response dict, or ``None`` when nothing
    matches or AWS raises ``ClientError`` (which is logged).

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.describe_launch_configuration mylc
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        described = conn.describe_launch_configurations(LaunchConfigurationNames=[name])
    except ClientError as e:
        log.error(e)
        return None
    matches = described.get("LaunchConfigurations") or []
    if not matches:
        return None
    return matches[0]
def create_launch_configuration(
    name,
    image_id,
    key_name=None,
    vpc_id=None,
    vpc_name=None,
    security_groups=None,
    user_data=None,
    instance_type="m1.small",
    kernel_id=None,
    ramdisk_id=None,
    block_device_mappings=None,
    instance_monitoring=False,
    spot_price=None,
    instance_profile_name=None,
    ebs_optimized=False,
    associate_public_ip_address=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Create a launch configuration.

    ``security_groups`` and ``block_device_mappings`` may be given as JSON
    strings.  When a VPC is named, security groups are resolved to ids via
    ``boto3_secgroup.convert_to_group_ids``.  Returns ``True`` on success and
    ``False`` when AWS raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.create_launch_configuration mylc image_id=ami-0b9c9f62
    """
    security_groups = _maybe_json(security_groups)
    block_device_mappings = _maybe_json(block_device_mappings)

    # If a VPC is specified, determine the secgroup ids within that VPC.
    if security_groups and (vpc_id or vpc_name):
        security_groups = __salt__["boto3_secgroup.convert_to_group_ids"](
            security_groups,
            vpc_id=vpc_id,
            vpc_name=vpc_name,
            region=region,
            key=key,
            keyid=keyid,
            profile=profile,
        )

    # Legacy attribute names and where they land in a boto3 mapping entry:
    # directly on the entry, or inside its ``Ebs`` sub-dict.  Unknown
    # attributes are ignored.
    device_fields = {"virtual_name": "VirtualName", "no_device": "NoDevice"}
    ebs_fields = {
        "volume_type": "VolumeType",
        "size": "VolumeSize",
        "iops": "Iops",
        "snapshot_id": "SnapshotId",
        "delete_on_termination": "DeleteOnTermination",
        "encrypted": "Encrypted",
    }

    # Translate the old {name: attrs} dict list into boto3 BlockDeviceMappings
    device_mappings = []
    for legacy_mapping in block_device_mappings or []:
        for device_name, attributes in legacy_mapping.items():
            device = {"DeviceName": device_name}
            volume = {}
            for attr, setting in attributes.items():
                if attr in device_fields:
                    device[device_fields[attr]] = setting
                elif attr in ebs_fields:
                    volume[ebs_fields[attr]] = setting
            if volume:
                device["Ebs"] = volume
            device_mappings.append(device)

    kwargs = {
        "LaunchConfigurationName": name,
        "ImageId": image_id,
        "InstanceType": instance_type,
        "InstanceMonitoring": {"Enabled": bool(instance_monitoring)},
        "EbsOptimized": bool(ebs_optimized),
    }
    if key_name is not None:
        kwargs["KeyName"] = key_name
    if security_groups:
        kwargs["SecurityGroups"] = list(security_groups)
    # Optional settings forwarded verbatim when provided.
    for field, setting in (
        ("UserData", user_data),
        ("KernelId", kernel_id),
        ("RamdiskId", ramdisk_id),
    ):
        if setting is not None:
            kwargs[field] = setting
    if spot_price is not None:
        kwargs["SpotPrice"] = str(spot_price)
    if instance_profile_name is not None:
        kwargs["IamInstanceProfile"] = instance_profile_name
    if associate_public_ip_address is not None:
        kwargs["AssociatePublicIpAddress"] = bool(associate_public_ip_address)
    if device_mappings:
        kwargs["BlockDeviceMappings"] = device_mappings

    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        conn.create_launch_configuration(**kwargs)
    except ClientError as e:
        log.error("Failed to create LC %s: %s", name, e)
        return False
    log.info("Created LC %s", name)
    return True
def delete_launch_configuration(name, region=None, key=None, keyid=None, profile=None):
    """
    Delete a launch configuration.

    Returns ``True`` on success and ``False`` when AWS raises
    ``ClientError`` (which is logged).

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_asg.delete_launch_configuration mylc
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        conn.delete_launch_configuration(LaunchConfigurationName=name)
    except ClientError as e:
        log.error("Failed to delete LC %s: %s", name, e)
        return False
    log.info("Deleted LC %s", name)
    return True
def get_scaling_policy_arn(
    as_group, scaling_policy_name, region=None, key=None, keyid=None, profile=None
):
    """
    Return the arn for a scaling policy in a specific autoscale group or
    ``None`` if not found. Mainly used as a helper method for
    boto_cloudwatch_alarm, for linking alarms to scaling policies.

    ``None`` is also returned (after logging) when AWS raises
    ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt '*' boto3_asg.get_scaling_policy_arn mygroup mypolicy
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        candidates = _paginate(
            conn.describe_policies,
            "ScalingPolicies",
            AutoScalingGroupName=as_group,
        )
        # Pages are fetched lazily, so iteration stops at the first match.
        match = next(
            (policy for policy in candidates if policy["PolicyName"] == scaling_policy_name),
            None,
        )
    except ClientError as e:
        log.error(e)
        return None
    if match is None:
        log.error("Could not find scaling policy %s for %s", scaling_policy_name, as_group)
        return None
    return match["PolicyARN"]
def get_all_groups(region=None, key=None, keyid=None, profile=None):
    """
    Return all AutoScale Groups visible in the account as a list of boto3
    describe-response dicts.

    All result pages are collected.  An empty list is returned (after
    logging) when AWS raises ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt-call boto3_asg.get_all_groups region=us-east-1 --output yaml
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        groups = list(_paginate(conn.describe_auto_scaling_groups, "AutoScalingGroups"))
    except ClientError as e:
        log.error(e)
        return []
    return groups
def list_groups(region=None, key=None, keyid=None, profile=None):
    """
    Return all AutoScale Group names visible in the account.

    Built on ``get_all_groups``, so an empty list is returned when that call
    fails.

    CLI Example:

    .. code-block:: bash

        salt-call boto3_asg.list_groups region=us-east-1
    """
    groups = get_all_groups(region=region, key=key, keyid=keyid, profile=profile)
    names = []
    for group in groups:
        names.append(group["AutoScalingGroupName"])
    return names
def get_instances(
    name,
    lifecycle_state="InService",
    health_status="Healthy",
    attribute="private_ip_address",
    attributes=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Return an attribute of all instances in the named autoscale group.

    name
        Name of the autoscale group.

    lifecycle_state
        Only instances in this lifecycle state are considered; ``None``
        disables the filter.

    health_status
        Only instances with this health status are considered; ``None``
        disables the filter.

    attribute
        Legacy (boto2-style) attribute name to return per instance.
        Instances for which the attribute is ``None`` are left out.

    attributes
        Optional list of attribute names.  When given, a list of value lists
        (one per instance, ``None`` values kept) is returned instead and
        ``attribute`` is ignored.

    Returns ``False`` when an AWS call fails or the name does not resolve to
    exactly one group, and an empty list when no instance passes the
    filters.

    CLI Example:

    .. code-block:: bash

        salt-call boto3_asg.get_instances my_autoscale_group_name
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        resp = conn.describe_auto_scaling_groups(AutoScalingGroupNames=[name])
    except ClientError as e:
        log.error(e)
        return False

    asgs = resp.get("AutoScalingGroups") or []
    if not asgs:
        # Report "not found" as such instead of claiming multiple matches.
        log.debug("name '%s' returns no ASGs", name)
        return False
    if len(asgs) != 1:
        log.debug(
            "name '%s' returns multiple ASGs: %s",
            name,
            [a["AutoScalingGroupName"] for a in asgs],
        )
        return False
    asg = asgs[0]

    instance_ids = [
        inst["InstanceId"]
        for inst in asg.get("Instances", [])
        if (lifecycle_state is None or inst.get("LifecycleState") == lifecycle_state)
        and (health_status is None or inst.get("HealthStatus") == health_status)
    ]
    if not instance_ids:
        return []

    ec2 = _ec2_conn(region=region, key=key, keyid=keyid, profile=profile)
    try:
        reservations = ec2.describe_instances(InstanceIds=instance_ids).get("Reservations", [])
    except ClientError as e:
        log.error(e)
        return False
    instances = [i for r in reservations for i in r.get("Instances", [])]

    if attributes:
        return [[_ec2_instance_attribute(inst, a) for a in attributes] for inst in instances]

    # Look each value up once, then drop the instances that lack it.
    values = (_ec2_instance_attribute(inst, attribute) for inst in instances)
    return [value for value in values if value is not None]
# Mapping from the legacy boto2 attribute names to boto3 describe_instances
# response fields.  Only the fields that callers actually use are mapped; any
# unknown attribute falls through to a CamelCase conversion.
_BOTO2_ATTR_MAP = dict(
    id="InstanceId",
    instance_id="InstanceId",
    private_ip_address="PrivateIpAddress",
    public_ip_address="PublicIpAddress",
    private_dns_name="PrivateDnsName",
    public_dns_name="PublicDnsName",
    ip_address="PublicIpAddress",
    image_id="ImageId",
    instance_type="InstanceType",
    key_name="KeyName",
    state="State",
    subnet_id="SubnetId",
    vpc_id="VpcId",
    tags="Tags",
)


def _ec2_instance_attribute(instance, attribute):
    """
    Read the legacy-named ``attribute`` from a boto3 instance dict.

    ``tags`` is flattened into a ``{key: value}`` dict.  Other names are
    translated through ``_BOTO2_ATTR_MAP``, falling back to a snake_case to
    CamelCase conversion; ``None`` is returned for absent fields.
    """
    if attribute == "tags":
        flattened = {}
        for tag in instance.get("Tags", []):
            flattened[tag["Key"]] = tag["Value"]
        return flattened

    field = _BOTO2_ATTR_MAP.get(attribute)
    if field is None:
        # best-effort CamelCase of snake_case
        field = "".join(map(str.title, attribute.split("_")))
    return instance.get(field)
def enter_standby(
    name,
    instance_ids,
    should_decrement_desired_capacity=False,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Switch desired instances to StandBy mode

    Returns ``True`` when none of the resulting activities has status
    ``Failed``.  On ``ClientError`` a dict is returned instead:
    ``{"exists": False}`` for ``ResourceNotFoundException``, otherwise
    ``{"error": ...}``.

    CLI Example:

    .. code-block:: bash

        salt-call boto3_asg.enter_standby my_autoscale_group_name '["i-xxxxxx"]'
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    request = {
        "InstanceIds": instance_ids,
        "AutoScalingGroupName": name,
        "ShouldDecrementDesiredCapacity": should_decrement_desired_capacity,
    }
    try:
        response = conn.enter_standby(**request)
    except ClientError as e:
        err = boto3mod.get_error(e)
        error_code = e.response.get("Error", {}).get("Code")
        if error_code == "ResourceNotFoundException":
            return {"exists": False}
        return {"error": err}
    return not any(activity["StatusCode"] == "Failed" for activity in response["Activities"])
def exit_standby(
    name,
    instance_ids,
    should_decrement_desired_capacity=False,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Exit desired instances from StandBy mode

    name
        Name of the autoscale group.

    instance_ids
        List of instance ids to move out of standby.

    should_decrement_desired_capacity
        Accepted for signature compatibility with ``enter_standby`` and
        ignored: the ``ExitStandby`` API has no such parameter.

    Returns ``True`` when none of the resulting activities has status
    ``Failed``.  On ``ClientError`` a dict is returned instead:
    ``{"exists": False}`` for ``ResourceNotFoundException``, otherwise
    ``{"error": ...}``.

    CLI Example:

    .. code-block:: bash

        salt-call boto3_asg.exit_standby my_autoscale_group_name '["i-xxxxxx"]'
    """
    conn = _get_conn("autoscaling", region=region, key=key, keyid=keyid, profile=profile)
    try:
        # Only these two parameters exist on ExitStandby; sending
        # ShouldDecrementDesiredCapacity makes botocore reject the request
        # with ParamValidationError before it ever reaches AWS.
        response = conn.exit_standby(
            InstanceIds=instance_ids,
            AutoScalingGroupName=name,
        )
    except ClientError as e:
        err = boto3mod.get_error(e)
        if e.response.get("Error", {}).get("Code") == "ResourceNotFoundException":
            return {"exists": False}
        return {"error": err}
    return all(activity["StatusCode"] != "Failed" for activity in response["Activities"])