"""
Connection module for Amazon VPC using boto3.
=============================================
Renamed from ``boto_vpc`` to ``boto3_vpc`` and rewritten to use the
boto3 EC2 client API directly via :py:mod:`saltext.boto3.utils.boto3mod`.
The legacy boto2 code path has been removed.
:depends:
- boto3 >= 1.28.0
- botocore >= 1.31.0
:configuration: This module accepts explicit VPC credentials but can also
utilize IAM roles assigned to the instance through Instance Profiles.
Dynamic credentials are then automatically obtained from AWS API and no
further configuration is necessary. More information available at:
.. code-block:: text
http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
If IAM roles are not used you need to specify them either in the minion's
config file or as a profile. For example, to specify them in the minion's
config file:
.. code-block:: yaml
vpc.keyid: GKTADJGHEIQSXMKKRBJ08H
vpc.key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
A region may also be specified in the configuration:
.. code-block:: yaml
vpc.region: us-east-1
It's also possible to specify key, keyid and region via a profile, either
as a passed in dict, or as a string to pull from pillars or minion config:
.. code-block:: yaml
myprofile:
keyid: GKTADJGHEIQSXMKKRBJ08H
key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
region: us-east-1
.. versionadded:: 1.0.0
"""
import logging
import random
import socket
import time
from salt.exceptions import CommandExecutionError
from salt.exceptions import SaltInvocationError
from salt.utils.data import exactly_one
from saltext.boto3.utils import boto3mod
try:
import botocore
HAS_BOTO3 = True
except ImportError:
HAS_BOTO3 = False
log = logging.getLogger(__name__)
__virtualname__ = "boto3_vpc"
def _get_conn(service, region=None, key=None, keyid=None, profile=None):
    """
    Build a boto3 client for ``service``.

    The client comes from ``boto3mod.get_connection``, which receives this
    module's ``__opts__`` and ``__context__`` dunders together with the
    credential arguments supplied by the caller.
    """
    credentials = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    return boto3mod.get_connection(
        service, opts=__opts__, context=__context__, **credentials
    )
[docs]
def __virtual__():
    """
    Salt loader hook: expose this module only when ``botocore`` imported.

    Returns the virtual name on success, or a ``(False, reason)`` tuple when
    the import guard at the top of the module failed. The minimum version is
    enforced via the project's ``pyproject.toml`` dependency declaration, not
    here.
    """
    if not HAS_BOTO3:
        return (False, "The boto3_vpc module could not be loaded: boto3 is not available.")
    return __virtualname__
def _tags_dict(tags):
"""Convert AWS tag list to a flat ``{key: value}`` dict."""
return {t["Key"]: t["Value"] for t in (tags or [])}
def _tag_specifications(resource_type, name=None, tags=None):
"""Build ``TagSpecifications`` for a create_* call."""
items = []
if name:
items.append({"Key": "Name", "Value": name})
if tags:
for key, value in tags.items():
if key == "Name" and name:
continue
items.append({"Key": key, "Value": value})
if not items:
return None
return [{"ResourceType": resource_type, "Tags": items}]
def _client_error_code(exc):
    """
    Return the AWS error code carried by a botocore ``ClientError``.

    Yields ``None`` for any other exception type, or when the response has
    no ``Error``/``Code`` entry.
    """
    if not isinstance(exc, botocore.exceptions.ClientError):
        return None
    error_info = exc.response.get("Error", {})
    return error_info.get("Code")
def _find_vpcs(
    *,
    vpc_id=None,
    vpc_name=None,
    cidr=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Return the ids of all VPCs matching the supplied criteria.

    ``vpc_id`` is sent as ``VpcIds``; ``cidr``, ``vpc_name`` (the ``Name``
    tag) and every entry of ``tags`` are sent as ``Filters``. When no
    criterion at all is given, only the default VPC (if any) is returned.

    Raises ``SaltInvocationError`` when both ``vpc_id`` and ``vpc_name`` are
    supplied. ``ClientError`` from the API call is not caught here.
    """
    if vpc_id and vpc_name:
        raise SaltInvocationError("Only one of vpc_name or vpc_id may be provided.")

    conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)

    filters = []
    if cidr:
        filters.append({"Name": "cidr", "Values": [cidr]})
    if vpc_name:
        filters.append({"Name": "tag:Name", "Values": [vpc_name]})
    filters.extend(
        {"Name": f"tag:{tag_name}", "Values": [tag_value]}
        for tag_name, tag_value in (tags or {}).items()
    )

    request = {}
    if vpc_id:
        request["VpcIds"] = [vpc_id]
    if filters:
        request["Filters"] = filters

    found = conn.describe_vpcs(**request).get("Vpcs", [])
    log.debug("describe_vpcs(%s) matched: %s", request, found)

    # With no criteria the call lists every VPC; narrow that to the default one.
    if not (vpc_id or vpc_name or cidr or tags):
        found = [vpc for vpc in found if vpc.get("IsDefault")]
    return [vpc["VpcId"] for vpc in found]
def _get_id(
    vpc_name=None,
    cidr=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Resolve the criteria to exactly one VPC id.

    A lookup by name alone first consults ``boto3mod.cache_id``; a successful
    lookup that involved a name is passed back to ``boto3mod.cache_id`` with
    ``resource_id`` set.

    Returns the VPC id, or ``None`` when nothing matches.

    Raises ``SaltInvocationError`` when no criterion is given and
    ``CommandExecutionError`` when more than one VPC matches.
    """
    if not (vpc_name or tags or cidr):
        raise SaltInvocationError(
            "At least one of the following must be provided: vpc_name, cidr or tags."
        )

    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}

    # The cache is keyed by name, so it is only consulted when the name is
    # the sole criterion.
    if vpc_name and not (cidr or tags):
        cached_id = boto3mod.cache_id(
            "ec2", vpc_name, opts=__opts__, context=__context__, **auth
        )
        if cached_id:
            return cached_id

    matches = _find_vpcs(vpc_name=vpc_name, cidr=cidr, tags=tags, **auth)
    if not matches:
        return None
    if len(matches) > 1:
        raise CommandExecutionError("Found more than one VPC matching the criteria.")

    (found_id,) = matches
    if vpc_name:
        boto3mod.cache_id(
            "ec2",
            vpc_name,
            opts=__opts__,
            context=__context__,
            resource_id=found_id,
            **auth,
        )
    return found_id
[docs]
def get_id(
    name=None,
    cidr=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Look up a single VPC by name, CIDR and/or tags and return its id.

    Returns ``{"id": <vpc id or None>}``, or ``{"error": ...}`` when the AWS
    call fails with a ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.get_id myvpc
    """
    try:
        found_id = _get_id(
            vpc_name=name,
            cidr=cidr,
            tags=tags,
            region=region,
            key=key,
            keyid=keyid,
            profile=profile,
        )
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}
    return {"id": found_id}
[docs]
def exists(
    vpc_id=None,
    name=None,
    cidr=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Report whether any VPC matches the supplied id, name, CIDR or tags.

    Returns ``{"exists": bool}``. An ``InvalidVpcID.NotFound`` error counts
    as "does not exist"; any other ``ClientError`` yields ``{"error": ...}``.

    Raises ``SaltInvocationError`` when no criterion is supplied.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.exists myvpc
    """
    if not (vpc_id or name or tags or cidr):
        raise SaltInvocationError(
            "At least one of the following must be provided: vpc_id, name, cidr or tags."
        )
    try:
        matches = _find_vpcs(
            vpc_id=vpc_id,
            vpc_name=name,
            cidr=cidr,
            tags=tags,
            region=region,
            key=key,
            keyid=keyid,
            profile=profile,
        )
    except botocore.exceptions.ClientError as exc:
        if _client_error_code(exc) != "InvalidVpcID.NotFound":
            return {"error": boto3mod.get_error(exc)}
        return {"exists": False}
    return {"exists": bool(matches)}
[docs]
def check_vpc(
    vpc_id=None,
    vpc_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Confirm that a VPC exists and return its id.

    Exactly one of ``vpc_id`` or ``vpc_name`` must be given. Returns the VPC
    id when found and ``None`` otherwise; a ``ClientError`` during lookup is
    logged and also reported as ``None``.

    Raises ``SaltInvocationError`` unless exactly one identifier is supplied.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.check_vpc vpc_name=myvpc
    """
    if not exactly_one((vpc_name, vpc_id)):
        raise SaltInvocationError("One (but not both) of vpc_id or vpc_name must be provided.")

    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        if vpc_name:
            return _get_id(vpc_name=vpc_name, **auth)
        if _find_vpcs(vpc_id=vpc_id, **auth):
            return vpc_id
        log.info("VPC %s does not exist.", vpc_id)
        return None
    except botocore.exceptions.ClientError as exc:
        log.error("Failed to look up VPC: %s", exc)
        return None
[docs]
def create(
    cidr_block,
    instance_tenancy=None,
    vpc_name=None,
    enable_dns_support=None,
    enable_dns_hostnames=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Create a VPC covering ``cidr_block``.

    ``vpc_name`` and ``tags`` are applied at creation time via
    ``TagSpecifications``. The DNS attributes are only modified when the
    corresponding argument is not ``None``.

    Returns ``{"created": True, "id": <vpc id>}`` on success, otherwise
    ``{"created": False}`` with an ``error`` entry when a ``ClientError``
    occurred.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.create '10.0.0.0/24'
    """
    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        conn = _get_conn("ec2", **auth)

        request = {"CidrBlock": cidr_block}
        if instance_tenancy is not None:
            request["InstanceTenancy"] = instance_tenancy
        tag_spec = _tag_specifications("vpc", name=vpc_name, tags=tags)
        if tag_spec:
            request["TagSpecifications"] = tag_spec

        created = conn.create_vpc(**request).get("Vpc") or {}
        vpc_id = created.get("VpcId")
        if not vpc_id:
            log.warning("VPC was not created")
            return {"created": False}
        log.info("Created VPC %s", vpc_id)

        # Each attribute is sent in its own modify_vpc_attribute call.
        dns_attributes = (
            ("EnableDnsSupport", enable_dns_support),
            ("EnableDnsHostnames", enable_dns_hostnames),
        )
        for attribute, flag in dns_attributes:
            if flag is not None:
                conn.modify_vpc_attribute(VpcId=vpc_id, **{attribute: {"Value": bool(flag)}})

        if vpc_name:
            boto3mod.cache_id(
                "ec2",
                vpc_name,
                opts=__opts__,
                context=__context__,
                resource_id=vpc_id,
                **auth,
            )
        return {"created": True, "id": vpc_id}
    except botocore.exceptions.ClientError as exc:
        return {"created": False, "error": boto3mod.get_error(exc)}
[docs]
def delete(
    vpc_id=None,
    name=None,
    vpc_name=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Remove a VPC identified by id or by name.

    ``name`` is a deprecated alias for ``vpc_name``; when given it replaces
    ``vpc_name`` and a warning is logged. ``tags`` narrows a lookup by name.

    Returns ``{"deleted": True}`` on success, otherwise ``{"deleted": False,
    "error": ...}``.

    Raises ``SaltInvocationError`` unless exactly one of name or id is given.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete vpc_id='vpc-6b1fe402'
        salt myminion boto3_vpc.delete vpc_name='myvpc'
    """
    if name:
        log.warning("boto3_vpc.delete: name parameter is deprecated; use vpc_name instead.")
        vpc_name = name
    if not exactly_one((vpc_name, vpc_id)):
        raise SaltInvocationError("One (but not both) of vpc_name or vpc_id must be provided.")

    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        conn = _get_conn("ec2", **auth)
        # Only resolve the name when no id was handed in.
        target_id = vpc_id or _get_id(vpc_name=vpc_name, tags=tags, **auth)
        if not target_id:
            return {
                "deleted": False,
                "error": {"message": f"VPC {vpc_name} not found"},
            }
        conn.delete_vpc(VpcId=target_id)
        log.info("Deleted VPC %s", target_id)
        if vpc_name:
            boto3mod.cache_id(
                "ec2",
                vpc_name,
                opts=__opts__,
                context=__context__,
                resource_id=target_id,
                invalidate=True,
                **auth,
            )
        return {"deleted": True}
    except botocore.exceptions.ClientError as exc:
        return {"deleted": False, "error": boto3mod.get_error(exc)}
def _vpc_payload(vpc):
    """Project a ``describe_vpcs`` item onto the module's snake_case result dict."""
    leading_fields = (
        ("id", "VpcId"),
        ("cidr_block", "CidrBlock"),
        ("is_default", "IsDefault"),
        ("state", "State"),
    )
    payload = {out_key: vpc.get(api_key) for out_key, api_key in leading_fields}
    payload["tags"] = _tags_dict(vpc.get("Tags"))
    payload["dhcp_options_id"] = vpc.get("DhcpOptionsId")
    payload["instance_tenancy"] = vpc.get("InstanceTenancy")
    return payload
[docs]
def describe(
    vpc_id=None,
    vpc_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Return the properties of one VPC.

    With neither ``vpc_id`` nor ``vpc_name`` the default VPC (if any) is
    described. When several VPCs match, the first one returned by the API is
    reported.

    Returns ``{"vpc": {...}}``, ``{"vpc": None}`` when nothing matches (this
    includes an ``InvalidVpcID.NotFound`` error), or ``{"error": ...}`` for
    any other ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.describe vpc_id=vpc-123456
    """
    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        matches = _find_vpcs(vpc_id=vpc_id, vpc_name=vpc_name, **auth)
        if not matches:
            return {"vpc": None}
        conn = _get_conn("ec2", **auth)
        described = conn.describe_vpcs(VpcIds=matches).get("Vpcs", [])
    except botocore.exceptions.ClientError as exc:
        if _client_error_code(exc) == "InvalidVpcID.NotFound":
            return {"vpc": None}
        return {"error": boto3mod.get_error(exc)}
    return {"vpc": _vpc_payload(described[0]) if described else None}
[docs]
def describe_vpcs(
    vpc_id=None,
    name=None,
    cidr=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    List every VPC matching the supplied id, name, CIDR and/or tags.

    With no criteria all VPCs returned by the API call are listed.

    Returns ``{"vpcs": [...]}`` or ``{"error": ...}`` on a ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.describe_vpcs
    """
    try:
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)

        filters = []
        if cidr:
            filters.append({"Name": "cidr", "Values": [cidr]})
        if name:
            filters.append({"Name": "tag:Name", "Values": [name]})
        filters.extend(
            {"Name": f"tag:{tag_name}", "Values": [tag_value]}
            for tag_name, tag_value in (tags or {}).items()
        )

        request = {}
        if vpc_id:
            request["VpcIds"] = [vpc_id]
        if filters:
            request["Filters"] = filters

        found = conn.describe_vpcs(**request).get("Vpcs", [])
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}
    return {"vpcs": [_vpc_payload(vpc) for vpc in found]}
# Lookup table for the generic resource helpers. Each resource name maps to a
# 4-tuple, unpacked by ``_find_resources`` / ``_get_resource_id`` as:
#   (name of the EC2 client describe method,
#    keyword argument that takes the list of ids,
#    response key that holds the returned items,
#    item key that holds the resource id)
_SUPPORTED_RESOURCES = {
    "vpc": ("describe_vpcs", "VpcIds", "Vpcs", "VpcId"),
    "subnet": ("describe_subnets", "SubnetIds", "Subnets", "SubnetId"),
    "dhcp_options": (
        "describe_dhcp_options",
        "DhcpOptionsIds",
        "DhcpOptions",
        "DhcpOptionsId",
    ),
    "internet_gateway": (
        "describe_internet_gateways",
        "InternetGatewayIds",
        "InternetGateways",
        "InternetGatewayId",
    ),
    "customer_gateway": (
        "describe_customer_gateways",
        "CustomerGatewayIds",
        "CustomerGateways",
        "CustomerGatewayId",
    ),
    "network_acl": (
        "describe_network_acls",
        "NetworkAclIds",
        "NetworkAcls",
        "NetworkAclId",
    ),
    "route_table": (
        "describe_route_tables",
        "RouteTableIds",
        "RouteTables",
        "RouteTableId",
    ),
    "vpc_peering_connection": (
        "describe_vpc_peering_connections",
        "VpcPeeringConnectionIds",
        "VpcPeeringConnections",
        "VpcPeeringConnectionId",
    ),
}
def _find_resources(
    resource,
    name=None,
    resource_id=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Look up EC2 resources of type ``resource`` by id, ``Name`` tag and/or tags.

    Returns the raw item list from the matching ``describe_*`` call. A
    ``ClientError`` whose code ends in ``.NotFound`` is treated as "no match"
    and yields an empty list; any other ``ClientError`` propagates.

    Raises ``SaltInvocationError`` for an unsupported resource type, when
    both ``name`` and ``resource_id`` are given, or when no criterion is
    given at all.
    """
    spec = _SUPPORTED_RESOURCES.get(resource)
    if spec is None:
        raise SaltInvocationError(f"Resource type {resource!r} is not supported by boto3_vpc.")
    if resource_id and name:
        raise SaltInvocationError("Only one of name or id may be provided.")
    if not (resource_id or name or tags):
        raise SaltInvocationError(
            "At least one of the following must be provided: id, name, or tags."
        )

    describe_method, ids_kw, items_key, _ = spec
    conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)

    filters = [{"Name": "tag:Name", "Values": [name]}] if name else []
    filters.extend(
        {"Name": f"tag:{tag_name}", "Values": [tag_value]}
        for tag_name, tag_value in (tags or {}).items()
    )

    request = {}
    if resource_id:
        request[ids_kw] = [resource_id]
    if filters:
        request["Filters"] = filters

    try:
        response = getattr(conn, describe_method)(**request)
        return response.get(items_key, [])
    except botocore.exceptions.ClientError as exc:
        error_code = _client_error_code(exc) or ""
        if error_code.endswith(".NotFound"):
            return []
        raise
def _get_resource_id(resource, name, region=None, key=None, keyid=None, profile=None):
    """
    Resolve a resource ``name`` of type ``resource`` to its id.

    ``boto3mod.cache_id`` is consulted first; on a miss the resource is
    looked up by its ``Name`` tag and a found id is handed back to
    ``boto3mod.cache_id`` with ``resource_id`` set.

    Returns the id, or ``None`` when no resource carries that name.

    Raises ``CommandExecutionError`` when the name is ambiguous.
    """
    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}

    cached_id = boto3mod.cache_id(
        "ec2", name, opts=__opts__, context=__context__, sub_resource=resource, **auth
    )
    if cached_id:
        return cached_id

    matches = _find_resources(resource, name=name, **auth)
    if not matches:
        return None
    if len(matches) > 1:
        raise CommandExecutionError(f'Found more than one {resource} named "{name}"')

    id_key = _SUPPORTED_RESOURCES[resource][3]
    found_id = matches[0].get(id_key)
    if found_id:
        boto3mod.cache_id(
            "ec2",
            name,
            opts=__opts__,
            context=__context__,
            sub_resource=resource,
            resource_id=found_id,
            **auth,
        )
    return found_id
[docs]
def get_resource_id(
    resource,
    name=None,
    resource_id=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Return ``{"id": "..."}`` for a VPC resource looked up by name or id.

    Supported ``resource`` values are the keys of ``_SUPPORTED_RESOURCES``:
    ``vpc``, ``subnet``, ``dhcp_options``, ``internet_gateway``,
    ``customer_gateway``, ``network_acl``, ``route_table`` and
    ``vpc_peering_connection``.

    When only ``resource_id`` is supplied it is echoed back without any
    lookup. When ``name`` is supplied it is resolved to an id (``None`` if no
    such resource exists) and ``resource_id`` is ignored.

    Returns ``{"id": ...}``, or ``{"error": ...}`` on a ``ClientError``.

    Raises ``SaltInvocationError`` when neither ``name`` nor ``resource_id``
    is given.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.get_resource_id subnet mysubnet
    """
    # Fail fast rather than handing a ``None`` name to the id cache / lookup.
    if not (name or resource_id):
        raise SaltInvocationError(
            "At least one of the following must be provided: name or resource_id."
        )
    try:
        if resource_id and not name:
            return {"id": resource_id}
        return {
            "id": _get_resource_id(
                resource, name, region=region, key=key, keyid=keyid, profile=profile
            )
        }
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}
[docs]
def resource_exists(
    resource,
    name=None,
    resource_id=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Report whether a resource of type ``resource`` matches the criteria.

    Returns ``{"exists": bool}``, or ``{"error": ...}`` on a ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.resource_exists subnet name=mysubnet
    """
    try:
        matches = _find_resources(
            resource,
            name=name,
            resource_id=resource_id,
            tags=tags,
            region=region,
            key=key,
            keyid=keyid,
            profile=profile,
        )
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}
    return {"exists": bool(matches)}
[docs]
def create_subnet(
    vpc_id=None,
    cidr_block=None,
    vpc_name=None,
    availability_zone=None,
    subnet_name=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
    auto_assign_public_ipv4=False,
):
    """
    Create a subnet inside an existing VPC.

    The VPC is identified by exactly one of ``vpc_id`` or ``vpc_name``.
    Creation is refused when ``subnet_name`` already resolves to a subnet.
    With ``auto_assign_public_ipv4`` set, ``MapPublicIpOnLaunch`` is enabled
    on the new subnet.

    Returns ``{"created": True, "id": <subnet id>}`` on success, otherwise
    ``{"created": False}`` with an ``error`` entry where one is available.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.create_subnet vpc_name=myvpc \\
                subnet_name=mysubnet cidr_block=10.0.0.0/25
    """
    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        # Keep the caller's identifiers intact so a failed lookup by id can
        # still name the requested VPC in the error message.
        resolved_vpc_id = check_vpc(vpc_id, vpc_name, region, key, keyid, profile)
        if not resolved_vpc_id:
            return {
                "created": False,
                "error": {"message": f"VPC {vpc_name or vpc_id} does not exist."},
            }

        conn = _get_conn("ec2", **auth)
        if subnet_name and _get_resource_id("subnet", subnet_name, **auth):
            return {
                "created": False,
                "error": {"message": f"A subnet named {subnet_name} already exists."},
            }

        request = {"VpcId": resolved_vpc_id, "CidrBlock": cidr_block}
        if availability_zone:
            request["AvailabilityZone"] = availability_zone
        tag_spec = _tag_specifications("subnet", name=subnet_name, tags=tags)
        if tag_spec:
            request["TagSpecifications"] = tag_spec

        subnet = conn.create_subnet(**request).get("Subnet") or {}
        subnet_id = subnet.get("SubnetId")
        if not subnet_id:
            return {"created": False}

        if auto_assign_public_ipv4:
            conn.modify_subnet_attribute(SubnetId=subnet_id, MapPublicIpOnLaunch={"Value": True})
        if subnet_name:
            boto3mod.cache_id(
                "ec2",
                subnet_name,
                opts=__opts__,
                context=__context__,
                sub_resource="subnet",
                resource_id=subnet_id,
                **auth,
            )
        return {"created": True, "id": subnet_id}
    except botocore.exceptions.ClientError as exc:
        return {"created": False, "error": boto3mod.get_error(exc)}
[docs]
def delete_subnet(
    subnet_id=None, subnet_name=None, region=None, key=None, keyid=None, profile=None
):
    """
    Remove a subnet identified by id or by name.

    Returns ``{"deleted": True}`` on success, otherwise ``{"deleted": False,
    "error": ...}``.

    Raises ``SaltInvocationError`` unless exactly one of name or id is given.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete_subnet 'subnet-6a1fe403'
    """
    if not exactly_one((subnet_name, subnet_id)):
        raise SaltInvocationError(
            "One (but not both) of subnet_name or subnet_id must be provided."
        )

    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        conn = _get_conn("ec2", **auth)
        # Only resolve the name when no id was handed in.
        target_id = subnet_id or _get_resource_id("subnet", subnet_name, **auth)
        if not target_id:
            return {
                "deleted": False,
                "error": {"message": f"subnet {subnet_name} does not exist."},
            }
        conn.delete_subnet(SubnetId=target_id)
        if subnet_name:
            boto3mod.cache_id(
                "ec2",
                subnet_name,
                opts=__opts__,
                context=__context__,
                sub_resource="subnet",
                resource_id=target_id,
                invalidate=True,
                **auth,
            )
        return {"deleted": True}
    except botocore.exceptions.ClientError as exc:
        return {"deleted": False, "error": boto3mod.get_error(exc)}
[docs]
def subnet_exists(
    subnet_id=None,
    name=None,
    subnet_name=None,
    cidr=None,
    tags=None,
    zones=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Report whether any subnet matches the supplied criteria.

    ``name`` is a deprecated alias for ``subnet_name``. ``zones`` may be a
    single availability-zone string or an iterable of them.

    Returns ``{"exists": bool}``. An ``InvalidSubnetID.NotFound`` error
    counts as "does not exist"; other ``ClientError`` failures yield
    ``{"error": ...}``.

    Raises ``SaltInvocationError`` when no criterion is supplied.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.subnet_exists subnet_id='subnet-6a1fe403'
    """
    if name:
        log.warning(
            "boto3_vpc.subnet_exists: name parameter is deprecated; use subnet_name instead."
        )
        subnet_name = name
    if not (subnet_id or subnet_name or cidr or tags or zones):
        raise SaltInvocationError(
            "At least one of the following must be specified: "
            "subnet_id, subnet_name, cidr, tags or zones."
        )

    try:
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)

        filters = []
        if subnet_name:
            filters.append({"Name": "tag:Name", "Values": [subnet_name]})
        if cidr:
            filters.append({"Name": "cidr-block", "Values": [cidr]})
        filters.extend(
            {"Name": f"tag:{tag_name}", "Values": [tag_value]}
            for tag_name, tag_value in (tags or {}).items()
        )
        if zones:
            zone_values = [zones] if isinstance(zones, str) else list(zones)
            filters.append({"Name": "availability-zone", "Values": zone_values})

        request = {}
        if subnet_id:
            request["SubnetIds"] = [subnet_id]
        if filters:
            request["Filters"] = filters

        found = conn.describe_subnets(**request).get("Subnets", [])
    except botocore.exceptions.ClientError as exc:
        if _client_error_code(exc) != "InvalidSubnetID.NotFound":
            return {"error": boto3mod.get_error(exc)}
        return {"exists": False}
    return {"exists": bool(found)}
[docs]
def get_subnet_association(subnets, region=None, key=None, keyid=None, profile=None):
    """
    Return the VPC id (or list of VPC ids) associated with the given subnet
    id or list of subnet ids.

    Returns ``{"vpc_id": <id>}`` when all subnets share one VPC,
    ``{"vpc_ids": [...]}`` (sorted) when they span several, ``{"vpc_id":
    None}`` when nothing is found or no subnet id was supplied, and
    ``{"error": ...}`` on a ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.get_subnet_association subnet-61b47516
    """
    # An empty ``SubnetIds`` list would make describe_subnets return every
    # subnet in the region, so an empty/None request is answered directly.
    if not subnets:
        return {"vpc_id": None}
    ids = [subnets] if isinstance(subnets, str) else list(subnets)
    if not ids:
        return {"vpc_id": None}

    try:
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)
        described = conn.describe_subnets(SubnetIds=ids).get("Subnets", [])
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}

    vpc_ids = {s["VpcId"] for s in described if s.get("VpcId")}
    if not vpc_ids:
        return {"vpc_id": None}
    if len(vpc_ids) == 1:
        return {"vpc_id": vpc_ids.pop()}
    # Sorted so the result does not depend on set iteration order.
    return {"vpc_ids": sorted(vpc_ids)}
def _subnet_payload(subnet):
    """Project a ``describe_subnets`` item onto the module's snake_case result dict."""
    payload = {
        out_key: subnet.get(api_key)
        for out_key, api_key in (
            ("id", "SubnetId"),
            ("cidr_block", "CidrBlock"),
            ("availability_zone", "AvailabilityZone"),
        )
    }
    payload["tags"] = _tags_dict(subnet.get("Tags"))
    payload["vpc_id"] = subnet.get("VpcId")
    return payload
[docs]
def describe_subnet(
    subnet_id=None, subnet_name=None, region=None, key=None, keyid=None, profile=None
):
    """
    Return the properties of one subnet, identified by id or by name.

    Returns ``{"subnet": {...}}``, ``{"subnet": None}`` when nothing matches,
    or ``{"error": ...}`` on a ``ClientError``.

    Raises ``SaltInvocationError`` unless exactly one of name or id is given,
    and ``CommandExecutionError`` when more than one subnet matches.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.describe_subnet subnet_id=subnet-123456
    """
    if not exactly_one((subnet_name, subnet_id)):
        raise SaltInvocationError(
            "One (but not both) of subnet_name or subnet_id must be provided."
        )
    try:
        matches = _find_resources(
            "subnet",
            name=subnet_name,
            resource_id=subnet_id,
            region=region,
            key=key,
            keyid=keyid,
            profile=profile,
        )
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}

    if len(matches) > 1:
        raise CommandExecutionError(f'Found more than one subnet named "{subnet_name}"')
    if matches:
        return {"subnet": _subnet_payload(matches[0])}
    return {"subnet": None}
[docs]
def describe_subnets(
    subnet_ids=None,
    subnet_names=None,
    vpc_id=None,
    cidr=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    List the subnets matching the supplied ids, names, VPC and/or CIDR.

    ``subnet_ids`` and ``subnet_names`` each accept a single string or an
    iterable of strings.

    Returns ``{"subnets": [...]}``, ``{"subnets": None}`` when nothing
    matches, or ``{"error": ...}`` on a ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.describe_subnets vpc_id=vpc-123456
    """

    def _as_list(value):
        # A bare string is one entry, not an iterable of characters.
        return [value] if isinstance(value, str) else list(value)

    try:
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)

        filters = []
        if vpc_id:
            filters.append({"Name": "vpc-id", "Values": [vpc_id]})
        if cidr:
            filters.append({"Name": "cidr-block", "Values": [cidr]})
        if subnet_names:
            filters.append({"Name": "tag:Name", "Values": _as_list(subnet_names)})

        request = {}
        if subnet_ids:
            request["SubnetIds"] = _as_list(subnet_ids)
        if filters:
            request["Filters"] = filters

        found = conn.describe_subnets(**request).get("Subnets", [])
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}

    if found:
        return {"subnets": [_subnet_payload(subnet) for subnet in found]}
    return {"subnets": None}
# Maps the snake_case option names used in this module's results to the
# hyphenated ``Key`` values found in a DHCP options set's
# ``DhcpConfigurations`` (read by ``get_dhcp_options``).
_DHCP_KEY_MAP = {
    "domain_name": "domain-name",
    "domain_name_servers": "domain-name-servers",
    "ntp_servers": "ntp-servers",
    "netbios_name_servers": "netbios-name-servers",
    "netbios_node_type": "netbios-node-type",
}
def _dhcp_configurations(
domain_name=None,
domain_name_servers=None,
ntp_servers=None,
netbios_name_servers=None,
netbios_node_type=None,
):
config = []
def _add(api_key, value):
if value is None:
return
if isinstance(value, (list, tuple)):
values = [str(v) for v in value]
else:
values = [str(value)]
config.append({"Key": api_key, "Values": values})
_add("domain-name", domain_name)
_add("domain-name-servers", domain_name_servers)
_add("ntp-servers", ntp_servers)
_add("netbios-name-servers", netbios_name_servers)
_add("netbios-node-type", netbios_node_type)
return config
[docs]
def create_dhcp_options(
    domain_name=None,
    domain_name_servers=None,
    ntp_servers=None,
    netbios_name_servers=None,
    netbios_node_type=None,
    dhcp_options_name=None,
    tags=None,
    vpc_id=None,
    vpc_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Create a DHCP options set, optionally associating it with an existing VPC.

    When ``vpc_id`` or ``vpc_name`` is given the VPC must exist; the new
    options set is associated with it after creation. Creation is refused
    when ``dhcp_options_name`` already resolves to an options set.

    Returns ``{"created": True, "id": <dhcp options id>}`` on success,
    otherwise ``{"created": False}`` with an ``error`` entry where one is
    available.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.create_dhcp_options domain_name='example.com' \\
                domain_name_servers='[1.2.3.4]' vpc_name='myvpc'
    """
    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        # Keep the caller's identifiers intact so a failed lookup by id can
        # still name the requested VPC in the error message.
        resolved_vpc_id = None
        if vpc_id or vpc_name:
            resolved_vpc_id = check_vpc(vpc_id, vpc_name, region, key, keyid, profile)
            if not resolved_vpc_id:
                return {
                    "created": False,
                    "error": {"message": f"VPC {vpc_name or vpc_id} does not exist."},
                }

        conn = _get_conn("ec2", **auth)
        if dhcp_options_name and _get_resource_id("dhcp_options", dhcp_options_name, **auth):
            return {
                "created": False,
                "error": {"message": f"A dhcp_options named {dhcp_options_name} already exists."},
            }

        request = {
            "DhcpConfigurations": _dhcp_configurations(
                domain_name=domain_name,
                domain_name_servers=domain_name_servers,
                ntp_servers=ntp_servers,
                netbios_name_servers=netbios_name_servers,
                netbios_node_type=netbios_node_type,
            )
        }
        tag_spec = _tag_specifications("dhcp-options", name=dhcp_options_name, tags=tags)
        if tag_spec:
            request["TagSpecifications"] = tag_spec

        dhcp = conn.create_dhcp_options(**request).get("DhcpOptions") or {}
        dhcp_id = dhcp.get("DhcpOptionsId")
        if not dhcp_id:
            return {"created": False}

        if resolved_vpc_id:
            conn.associate_dhcp_options(DhcpOptionsId=dhcp_id, VpcId=resolved_vpc_id)
            log.info("Associated DHCP options %s with VPC %s", dhcp_id, resolved_vpc_id)
        if dhcp_options_name:
            boto3mod.cache_id(
                "ec2",
                dhcp_options_name,
                opts=__opts__,
                context=__context__,
                sub_resource="dhcp_options",
                resource_id=dhcp_id,
                **auth,
            )
        return {"created": True, "id": dhcp_id}
    except botocore.exceptions.ClientError as exc:
        return {"created": False, "error": boto3mod.get_error(exc)}
[docs]
def get_dhcp_options(
    dhcp_options_name=None,
    dhcp_options_id=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Return the configured options of a DHCP options set.

    The set is identified by ``dhcp_options_id`` or, failing that, by
    ``dhcp_options_name``.

    Returns ``{"dhcp_options": {...}}`` keyed by the names in
    ``_DHCP_KEY_MAP`` (``domain_name`` and ``netbios_node_type`` as single
    values, the rest as lists, ``None`` for options that are not set);
    ``{"dhcp_options": {}}`` when the name does not resolve;
    ``{"dhcp_options": None}`` when the describe call returns nothing; or
    ``{"error": ...}`` on a ``ClientError`` from either the name lookup or
    the describe call.

    Raises ``SaltInvocationError`` when neither identifier is supplied.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.get_dhcp_options 'myfunnydhcpoptionsname'
    """
    if not any((dhcp_options_name, dhcp_options_id)):
        raise SaltInvocationError(
            "At least one of the following must be specified: "
            "dhcp_options_name or dhcp_options_id."
        )

    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        # The name lookup talks to AWS too, so it sits inside the same
        # handler as the describe call.
        if not dhcp_options_id:
            dhcp_options_id = _get_resource_id("dhcp_options", dhcp_options_name, **auth)
            if not dhcp_options_id:
                return {"dhcp_options": {}}
        conn = _get_conn("ec2", **auth)
        items = conn.describe_dhcp_options(DhcpOptionsIds=[dhcp_options_id]).get(
            "DhcpOptions", []
        )
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}

    if not items:
        return {"dhcp_options": None}

    by_api_key = {
        cfg["Key"]: [v["Value"] for v in cfg.get("Values", [])]
        for cfg in items[0].get("DhcpConfigurations", [])
    }
    options = {}
    for legacy_key, api_key in _DHCP_KEY_MAP.items():
        values = by_api_key.get(api_key)
        if values is None:
            options[legacy_key] = None
        elif legacy_key in ("domain_name", "netbios_node_type"):
            # These two are reported as a single value, not a list.
            options[legacy_key] = values[0] if values else None
        else:
            options[legacy_key] = values
    return {"dhcp_options": options}
[docs]
def delete_dhcp_options(
    dhcp_options_id=None,
    dhcp_options_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Remove a DHCP options set identified by id or by name.

    Returns ``{"deleted": True}`` on success, otherwise ``{"deleted": False,
    "error": ...}``.

    Raises ``SaltInvocationError`` unless exactly one of name or id is given.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete_dhcp_options 'dopt-b6a247df'
    """
    if not exactly_one((dhcp_options_name, dhcp_options_id)):
        raise SaltInvocationError(
            "One (but not both) of dhcp_options_name or dhcp_options_id must be provided."
        )

    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        conn = _get_conn("ec2", **auth)
        # Only resolve the name when no id was handed in.
        target_id = dhcp_options_id or _get_resource_id(
            "dhcp_options", dhcp_options_name, **auth
        )
        if not target_id:
            return {
                "deleted": False,
                "error": {"message": f"dhcp_options {dhcp_options_name} does not exist."},
            }
        conn.delete_dhcp_options(DhcpOptionsId=target_id)
        if dhcp_options_name:
            boto3mod.cache_id(
                "ec2",
                dhcp_options_name,
                opts=__opts__,
                context=__context__,
                sub_resource="dhcp_options",
                resource_id=target_id,
                invalidate=True,
                **auth,
            )
        return {"deleted": True}
    except botocore.exceptions.ClientError as exc:
        return {"deleted": False, "error": boto3mod.get_error(exc)}
[docs]
def associate_dhcp_options_to_vpc(
    dhcp_options_id,
    vpc_id=None,
    vpc_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Associate a DHCP options set with a VPC.

    The VPC is identified by exactly one of ``vpc_id`` or ``vpc_name`` and
    must exist.

    Returns ``{"associated": True}`` on success, otherwise ``{"associated":
    False, "error": ...}``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.associate_dhcp_options_to_vpc 'dopt-a0bl34pp' 'vpc-6b1fe402'
    """
    try:
        # Keep the caller's identifiers intact so a failed lookup by id can
        # still name the requested VPC in the error message.
        resolved_vpc_id = check_vpc(vpc_id, vpc_name, region, key, keyid, profile)
        if not resolved_vpc_id:
            return {
                "associated": False,
                "error": {"message": f"VPC {vpc_name or vpc_id} does not exist."},
            }
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)
        conn.associate_dhcp_options(DhcpOptionsId=dhcp_options_id, VpcId=resolved_vpc_id)
        log.info("Associated DHCP options %s with VPC %s", dhcp_options_id, resolved_vpc_id)
        return {"associated": True}
    except botocore.exceptions.ClientError as exc:
        return {"associated": False, "error": boto3mod.get_error(exc)}
[docs]
def dhcp_options_exists(
    dhcp_options_id=None,
    name=None,
    dhcp_options_name=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Report whether a DHCP options set matches the supplied id, name or tags.

    ``name`` is a deprecated alias for ``dhcp_options_name``. The check is
    delegated to ``resource_exists`` and its result is returned unchanged.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.dhcp_options_exists dhcp_options_id='dopt-a0bl34pp'
    """
    if name:
        log.warning(
            "boto3_vpc.dhcp_options_exists: name parameter is deprecated; "
            "use dhcp_options_name instead."
        )
        dhcp_options_name = name

    lookup = {
        "name": dhcp_options_name,
        "resource_id": dhcp_options_id,
        "tags": tags,
        "region": region,
        "key": key,
        "keyid": keyid,
        "profile": profile,
    }
    return resource_exists("dhcp_options", **lookup)
[docs]
def create_internet_gateway(
    internet_gateway_name=None,
    vpc_id=None,
    vpc_name=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Create an internet gateway, optionally attaching it to an existing VPC.

    When ``vpc_id`` or ``vpc_name`` is given the VPC must exist; the new
    gateway is attached to it after creation.

    Returns ``{"created": True, "id": <gateway id>}`` on success, otherwise
    ``{"created": False}`` with an ``error`` entry where one is available.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.create_internet_gateway \\
                internet_gateway_name=myigw vpc_name=myvpc
    """
    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        # Keep the caller's identifiers intact so a failed lookup by id can
        # still name the requested VPC in the error message.
        resolved_vpc_id = None
        if vpc_id or vpc_name:
            resolved_vpc_id = check_vpc(vpc_id, vpc_name, region, key, keyid, profile)
            if not resolved_vpc_id:
                return {
                    "created": False,
                    "error": {"message": f"VPC {vpc_name or vpc_id} does not exist."},
                }

        conn = _get_conn("ec2", **auth)
        request = {}
        tag_spec = _tag_specifications("internet-gateway", name=internet_gateway_name, tags=tags)
        if tag_spec:
            request["TagSpecifications"] = tag_spec

        gateway = conn.create_internet_gateway(**request).get("InternetGateway") or {}
        gateway_id = gateway.get("InternetGatewayId")
        if not gateway_id:
            return {"created": False}

        if resolved_vpc_id:
            conn.attach_internet_gateway(InternetGatewayId=gateway_id, VpcId=resolved_vpc_id)
            log.info("Attached internet gateway %s to VPC %s", gateway_id, resolved_vpc_id)
        if internet_gateway_name:
            boto3mod.cache_id(
                "ec2",
                internet_gateway_name,
                opts=__opts__,
                context=__context__,
                sub_resource="internet_gateway",
                resource_id=gateway_id,
                **auth,
            )
        return {"created": True, "id": gateway_id}
    except botocore.exceptions.ClientError as exc:
        return {"created": False, "error": boto3mod.get_error(exc)}
[docs]
def delete_internet_gateway(
    internet_gateway_id=None,
    internet_gateway_name=None,
    detach=False,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Delete an internet gateway by id or name. If ``detach`` is ``True``,
    any VPC attachment is detached first.

    When both an id and a name are given, the id is used for the deletion.

    Returns ``{"deleted": True}`` on success, otherwise ``{"deleted": False,
    "error": ...}``.

    Raises ``SaltInvocationError`` when neither ``internet_gateway_id`` nor
    ``internet_gateway_name`` is supplied.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete_internet_gateway internet_gateway_id=igw-1a2b3c
    """
    # Without an identifier the delete call would receive ``None``; boto3
    # rejects that with a parameter-validation error, which is not a
    # ``ClientError`` and so would escape the handler below.
    if not (internet_gateway_id or internet_gateway_name):
        raise SaltInvocationError(
            "At least one of the following must be provided: "
            "internet_gateway_id or internet_gateway_name."
        )

    auth = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        conn = _get_conn("ec2", **auth)
        if internet_gateway_name and not internet_gateway_id:
            internet_gateway_id = _get_resource_id(
                "internet_gateway", internet_gateway_name, **auth
            )
        if not internet_gateway_id:
            return {
                "deleted": False,
                "error": {"message": f"internet gateway {internet_gateway_name} does not exist."},
            }

        if detach:
            gws = conn.describe_internet_gateways(InternetGatewayIds=[internet_gateway_id]).get(
                "InternetGateways", []
            )
            if not gws:
                return {
                    "deleted": False,
                    "error": {"message": f"internet gateway {internet_gateway_id} does not exist."},
                }
            for att in gws[0].get("Attachments", []):
                conn.detach_internet_gateway(
                    InternetGatewayId=internet_gateway_id, VpcId=att["VpcId"]
                )

        conn.delete_internet_gateway(InternetGatewayId=internet_gateway_id)
        if internet_gateway_name:
            boto3mod.cache_id(
                "ec2",
                internet_gateway_name,
                opts=__opts__,
                context=__context__,
                sub_resource="internet_gateway",
                resource_id=internet_gateway_id,
                invalidate=True,
                **auth,
            )
        return {"deleted": True}
    except botocore.exceptions.ClientError as exc:
        return {"deleted": False, "error": boto3mod.get_error(exc)}
def _find_nat_gateways(
    nat_gateway_id=None,
    subnet_id=None,
    subnet_name=None,
    vpc_id=None,
    vpc_name=None,
    states=("pending", "available"),
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Collect the NAT gateway descriptions that match the given selectors.

    ``subnet_name`` / ``vpc_name`` are resolved to ids first; if either name
    cannot be resolved an empty list is returned without querying EC2.
    Every page of ``describe_nat_gateways`` is walked via ``NextToken`` and
    only gateways whose ``State`` is in ``states`` are kept.

    Raises ``SaltInvocationError`` when no selector at all is supplied.
    ``botocore`` client errors are not handled here.
    """
    selectors = (nat_gateway_id, subnet_id, subnet_name, vpc_id, vpc_name)
    if not any(selectors):
        raise SaltInvocationError(
            "At least one of the following must be provided: "
            "nat_gateway_id, subnet_id, subnet_name, vpc_id, or vpc_name."
        )

    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}

    if subnet_name:
        subnet_id = _get_resource_id("subnet", subnet_name, **creds)
        if not subnet_id:
            return []
    if vpc_name:
        vpc_id = _get_resource_id("vpc", vpc_name, **creds)
        if not vpc_id:
            return []

    client = _get_conn("ec2", **creds)

    # describe_nat_gateways takes its filters under the singular "Filter" key.
    request = {
        "Filter": [
            {"Name": filter_name, "Values": [filter_value]}
            for filter_name, filter_value in (("subnet-id", subnet_id), ("vpc-id", vpc_id))
            if filter_value
        ]
    }
    if nat_gateway_id:
        request["NatGatewayIds"] = [nat_gateway_id]

    found = []
    token = None
    while True:
        page_request = dict(request)
        if token:
            page_request["NextToken"] = token
        page = client.describe_nat_gateways(**page_request)
        found.extend(gw for gw in page.get("NatGateways", []) if gw.get("State") in states)
        token = page.get("NextToken")
        if not token:
            return found
[docs]
def nat_gateway_exists(
    nat_gateway_id=None,
    subnet_id=None,
    subnet_name=None,
    vpc_id=None,
    vpc_name=None,
    states=("pending", "available"),
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Report whether at least one NAT gateway matches the filter criteria.

    Returns a plain ``bool``. A ``botocore`` client error during the lookup
    is logged and reported as ``False``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.nat_gateway_exists nat_gateway_id='nat-03b02643b43216fe7'
    """
    try:
        matches = _find_nat_gateways(
            nat_gateway_id=nat_gateway_id,
            subnet_id=subnet_id,
            subnet_name=subnet_name,
            vpc_id=vpc_id,
            vpc_name=vpc_name,
            states=states,
            region=region,
            key=key,
            keyid=keyid,
            profile=profile,
        )
    except botocore.exceptions.ClientError as exc:
        log.error("Failed to look up NAT gateway: %s", exc)
        return False
    return bool(matches)
[docs]
def describe_nat_gateways(
    nat_gateway_id=None,
    subnet_id=None,
    subnet_name=None,
    vpc_id=None,
    vpc_name=None,
    states=("pending", "available"),
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    List the NAT gateway descriptions that match the selection criteria.

    The descriptions are returned as produced by ``_find_nat_gateways``. A
    ``botocore`` client error is logged and yields an empty list.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.describe_nat_gateways subnet_id='subnet-5b05942d'
    """
    selection = {
        "nat_gateway_id": nat_gateway_id,
        "subnet_id": subnet_id,
        "subnet_name": subnet_name,
        "vpc_id": vpc_id,
        "vpc_name": vpc_name,
        "states": states,
        "region": region,
        "key": key,
        "keyid": keyid,
        "profile": profile,
    }
    try:
        gateways = _find_nat_gateways(**selection)
    except botocore.exceptions.ClientError as exc:
        log.error("Failed to describe NAT gateways: %s", exc)
        return []
    return gateways
[docs]
def create_nat_gateway(
    subnet_id=None,
    subnet_name=None,
    allocation_id=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Create a NAT gateway inside an existing subnet. If ``allocation_id`` is
    not supplied a new Elastic IP is allocated and used; should the gateway
    creation then fail, that freshly allocated address is released again so
    it is not left orphaned.

    Exactly one of ``subnet_id`` / ``subnet_name`` must be given, otherwise
    ``SaltInvocationError`` is raised.

    Returns ``{"created": True, "id": <nat gateway id>}`` on success and
    ``{"created": False, "error": {...}}`` on failure.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.create_nat_gateway subnet_name=mysubnet
    """
    if all((subnet_id, subnet_name)):
        raise SaltInvocationError("Only one of subnet_name or subnet_id may be provided.")
    if not any((subnet_id, subnet_name)):
        # Without this guard SubnetId=None reaches boto3 and surfaces as an
        # unhandled parameter-validation error instead of a usage error.
        raise SaltInvocationError("One of subnet_name or subnet_id must be provided.")
    try:
        if subnet_name:
            subnet_id = _get_resource_id(
                "subnet", subnet_name, region=region, key=key, keyid=keyid, profile=profile
            )
            if not subnet_id:
                return {
                    "created": False,
                    "error": {"message": f"Subnet {subnet_name} does not exist."},
                }
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)

        allocated_here = False
        if not allocation_id:
            address = conn.allocate_address(Domain="vpc")
            allocation_id = address.get("AllocationId")
            allocated_here = True

        try:
            r = conn.create_nat_gateway(SubnetId=subnet_id, AllocationId=allocation_id)
        except botocore.exceptions.ClientError:
            # Only clean up an address this call allocated itself; never touch
            # a caller-supplied allocation.
            if allocated_here and allocation_id:
                try:
                    conn.release_address(AllocationId=allocation_id)
                except botocore.exceptions.ClientError:
                    log.exception(
                        "Failed to release Elastic IP %s after NAT gateway creation failed",
                        allocation_id,
                    )
            raise
        return {"created": True, "id": r.get("NatGateway", {}).get("NatGatewayId")}
    except botocore.exceptions.ClientError as exc:
        return {"created": False, "error": boto3mod.get_error(exc)}
[docs]
def delete_nat_gateway(
    nat_gateway_id,
    release_eips=False,
    region=None,
    key=None,
    keyid=None,
    profile=None,
    wait_for_delete=False,
    wait_for_delete_retries=5,
):
    """
    Delete a NAT gateway by id, optionally releasing any associated EIPs.

    nat_gateway_id
        Id of the NAT gateway to delete.

    release_eips
        When true, every ``AllocationId`` listed in the gateway's
        ``NatGatewayAddresses`` is released after the delete request.

    wait_for_delete
        When true, poll the gateway until its state is ``deleted`` or
        ``failed``, sleeping with exponential backoff plus jitter between
        polls.

    wait_for_delete_retries
        Maximum number of polls performed when ``wait_for_delete`` is set.

    Returns ``{"deleted": True}`` on success, otherwise
    ``{"deleted": False, "error": {...}}``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete_nat_gateway nat_gateway_id=nat-1a2b3c
    """
    try:
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)

        def _describe():
            # A missing key *and* an empty list both mean "no description";
            # indexing an empty list here used to raise IndexError.
            gateways = conn.describe_nat_gateways(NatGatewayIds=[nat_gateway_id]).get(
                "NatGateways"
            )
            return gateways[0] if gateways else None

        # Captured before deletion so the address list is available for release_eips.
        info = _describe()
        conn.delete_nat_gateway(NatGatewayId=nat_gateway_id)

        if wait_for_delete:
            for retry in range(wait_for_delete_retries, 0, -1):
                if not info or info.get("State") in ("deleted", "failed"):
                    break
                # Backoff of 1s, 2s, 4s, ... plus up to one second of jitter.
                time.sleep(
                    (2 ** (wait_for_delete_retries - retry))
                    + (random.randint(0, 1000) / 1000.0)
                )
                info = _describe()

        if release_eips and info:
            for addr in info.get("NatGatewayAddresses") or []:
                alloc = addr.get("AllocationId")
                if alloc:
                    conn.release_address(AllocationId=alloc)
        return {"deleted": True}
    except botocore.exceptions.ClientError as exc:
        return {"deleted": False, "error": boto3mod.get_error(exc)}
[docs]
def create_customer_gateway(
    vpn_connection_type,
    ip_address,
    bgp_asn,
    customer_gateway_name=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Register a new customer gateway.

    ``bgp_asn`` is converted with ``int()`` before the request is built.
    ``customer_gateway_name`` and ``tags`` are handed to
    ``_tag_specifications``; when a name is given the new id is also stored
    in the id cache under that name.

    Returns ``{"created": True, "id": <id>}`` on success,
    ``{"created": False}`` when the response carries no gateway id, and
    ``{"created": False, "error": {...}}`` on a client error.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.create_customer_gateway 'ipsec.1' '12.1.2.3' 65534
    """
    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        client = _get_conn("ec2", **creds)
        request = {
            "Type": vpn_connection_type,
            "PublicIp": ip_address,
            "BgpAsn": int(bgp_asn),
        }
        tag_spec = _tag_specifications("customer-gateway", name=customer_gateway_name, tags=tags)
        if tag_spec:
            request["TagSpecifications"] = tag_spec

        response = client.create_customer_gateway(**request)
        gateway = response.get("CustomerGateway") or {}
        gateway_id = gateway.get("CustomerGatewayId")
        if not gateway_id:
            return {"created": False}

        if customer_gateway_name:
            boto3mod.cache_id(
                "ec2",
                customer_gateway_name,
                opts=__opts__,
                context=__context__,
                sub_resource="customer_gateway",
                resource_id=gateway_id,
                **creds,
            )
        return {"created": True, "id": gateway_id}
    except botocore.exceptions.ClientError as exc:
        return {"created": False, "error": boto3mod.get_error(exc)}
[docs]
def delete_customer_gateway(
    customer_gateway_id=None,
    customer_gateway_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Remove a customer gateway, addressed either by id or by name.

    The name is only resolved to an id when no id was passed. After a
    successful delete the cached name mapping (if a name was given) is
    invalidated.

    Returns ``{"deleted": True}`` on success, otherwise
    ``{"deleted": False, "error": {...}}``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete_customer_gateway 'cgw-b6a247df'
    """
    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        if customer_gateway_name and not customer_gateway_id:
            customer_gateway_id = _get_resource_id(
                "customer_gateway", customer_gateway_name, **creds
            )
        if not customer_gateway_id:
            return {
                "deleted": False,
                "error": {"message": f"customer gateway {customer_gateway_name} does not exist."},
            }

        client = _get_conn("ec2", **creds)
        client.delete_customer_gateway(CustomerGatewayId=customer_gateway_id)

        if customer_gateway_name:
            boto3mod.cache_id(
                "ec2",
                customer_gateway_name,
                opts=__opts__,
                context=__context__,
                sub_resource="customer_gateway",
                resource_id=customer_gateway_id,
                invalidate=True,
                **creds,
            )
        return {"deleted": True}
    except botocore.exceptions.ClientError as exc:
        return {"deleted": False, "error": boto3mod.get_error(exc)}
[docs]
def customer_gateway_exists(
    customer_gateway_id=None,
    customer_gateway_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Check for a customer gateway by id or name.

    Delegates to ``resource_exists`` for the ``customer_gateway`` resource
    type and returns its result unchanged.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.customer_gateway_exists cgw-b6a247df
    """
    lookup = {
        "name": customer_gateway_name,
        "resource_id": customer_gateway_id,
        "region": region,
        "key": key,
        "keyid": keyid,
        "profile": profile,
    }
    return resource_exists("customer_gateway", **lookup)
[docs]
def create_network_acl(
    vpc_id=None,
    vpc_name=None,
    network_acl_name=None,
    subnet_id=None,
    subnet_name=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Create a network ACL in a VPC and, when a subnet is given, move that
    subnet's existing ACL association over to the new ACL.

    The VPC is validated through ``check_vpc`` first. If a subnet is given
    but no current association for it is found, the ACL is still created and
    no association is made.

    Returns ``{"created": True, "id": <acl id>}`` (plus ``association_id``
    when an association was replaced), ``{"created": False}`` when the
    response carries no ACL id, or ``{"created": False, "error": {...}}``.

    Raises ``SaltInvocationError`` when both ``subnet_id`` and
    ``subnet_name`` are passed.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.create_network_acl 'vpc-6b1fe402'
    """
    # Remember what the caller asked for; vpc_id is overwritten below.
    requested_vpc = vpc_name or vpc_id
    try:
        vpc_id = check_vpc(vpc_id, vpc_name, region, key, keyid, profile)
    except botocore.exceptions.ClientError as exc:
        return {"created": False, "error": boto3mod.get_error(exc)}
    if not vpc_id:
        return {"created": False, "error": {"message": f"VPC {requested_vpc} does not exist."}}

    if all((subnet_id, subnet_name)):
        raise SaltInvocationError("Only one of subnet_name or subnet_id may be provided.")

    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        if subnet_name:
            subnet_id = _get_resource_id("subnet", subnet_name, **creds)
            if not subnet_id:
                return {
                    "created": False,
                    "error": {"message": f"Subnet {subnet_name} does not exist."},
                }

        client = _get_conn("ec2", **creds)
        request = {"VpcId": vpc_id}
        tag_spec = _tag_specifications("network-acl", name=network_acl_name, tags=tags)
        if tag_spec:
            request["TagSpecifications"] = tag_spec

        new_acl = client.create_network_acl(**request).get("NetworkAcl") or {}
        new_acl_id = new_acl.get("NetworkAclId")
        if not new_acl_id:
            return {"created": False}
        outcome = {"created": True, "id": new_acl_id}

        if network_acl_name:
            boto3mod.cache_id(
                "ec2",
                network_acl_name,
                opts=__opts__,
                context=__context__,
                sub_resource="network_acl",
                resource_id=new_acl_id,
                **creds,
            )

        if subnet_id:
            # Locate the subnet's current association so it can be replaced.
            candidates = client.describe_network_acls(
                Filters=[{"Name": "association.subnet-id", "Values": [subnet_id]}]
            ).get("NetworkAcls", [])
            current_assoc = None
            for candidate in candidates:
                hit = next(
                    (
                        link
                        for link in candidate.get("Associations", [])
                        if link.get("SubnetId") == subnet_id
                    ),
                    None,
                )
                if hit is not None:
                    current_assoc = hit.get("NetworkAclAssociationId")
                if current_assoc:
                    break
            if current_assoc:
                replaced = client.replace_network_acl_association(
                    AssociationId=current_assoc, NetworkAclId=new_acl_id
                )
                outcome["association_id"] = replaced.get("NewAssociationId")
        return outcome
    except botocore.exceptions.ClientError as exc:
        return {"created": False, "error": boto3mod.get_error(exc)}
[docs]
def delete_network_acl(
    network_acl_id=None,
    network_acl_name=None,
    disassociate=False,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Remove a network ACL, addressed either by id or by name.

    With ``disassociate=True`` each subnet association of the ACL is first
    pointed at the default ACL of the same VPC. A failure to move an
    individual association is logged and does not stop the deletion attempt.

    Returns ``{"deleted": True}`` on success, otherwise
    ``{"deleted": False, "error": {...}}``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete_network_acl network_acl_id='acl-5fb85d36'
    """
    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        client = _get_conn("ec2", **creds)

        if network_acl_name and not network_acl_id:
            network_acl_id = _get_resource_id("network_acl", network_acl_name, **creds)
        if not network_acl_id:
            return {
                "deleted": False,
                "error": {"message": f"network acl {network_acl_name} does not exist."},
            }

        if disassociate:
            described = client.describe_network_acls(NetworkAclIds=[network_acl_id])
            targets = described.get("NetworkAcls", [])
            if targets:
                target = targets[0]
                owning_vpc = target.get("VpcId")

                fallback_acl = None
                if owning_vpc:
                    default_acls = client.describe_network_acls(
                        Filters=[
                            {"Name": "vpc-id", "Values": [owning_vpc]},
                            {"Name": "default", "Values": ["true"]},
                        ]
                    ).get("NetworkAcls", [])
                    if default_acls:
                        fallback_acl = default_acls[0].get("NetworkAclId")

                for link in target.get("Associations", []):
                    link_id = link.get("NetworkAclAssociationId")
                    if not (fallback_acl and link_id):
                        continue
                    try:
                        client.replace_network_acl_association(
                            AssociationId=link_id, NetworkAclId=fallback_acl
                        )
                    except botocore.exceptions.ClientError:
                        # Best effort: keep going and let the delete call report
                        # any remaining dependency.
                        log.exception("Failed to disassociate network acl %s", network_acl_id)

        client.delete_network_acl(NetworkAclId=network_acl_id)

        if network_acl_name:
            boto3mod.cache_id(
                "ec2",
                network_acl_name,
                opts=__opts__,
                context=__context__,
                sub_resource="network_acl",
                resource_id=network_acl_id,
                invalidate=True,
                **creds,
            )
        return {"deleted": True}
    except botocore.exceptions.ClientError as exc:
        return {"deleted": False, "error": boto3mod.get_error(exc)}
[docs]
def network_acl_exists(
    network_acl_id=None,
    name=None,
    network_acl_name=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Check for a network ACL by id, name and/or tags.

    ``name`` is a deprecated alias: when set it logs a warning and takes the
    place of ``network_acl_name``. The lookup itself is delegated to
    ``resource_exists`` and its result is returned unchanged.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.network_acl_exists network_acl_id='acl-5fb85d36'
    """
    effective_name = network_acl_name
    if name:
        log.warning(
            "boto3_vpc.network_acl_exists: name parameter is deprecated; "
            "use network_acl_name instead."
        )
        effective_name = name

    lookup = {
        "name": effective_name,
        "resource_id": network_acl_id,
        "tags": tags,
        "region": region,
        "key": key,
        "keyid": keyid,
        "profile": profile,
    }
    return resource_exists("network_acl", **lookup)
[docs]
def associate_network_acl_to_subnet(
    network_acl_id=None,
    subnet_id=None,
    network_acl_name=None,
    subnet_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Associate a network ACL with a subnet by replacing the subnet's current
    network ACL association.

    The ACL and the subnet may each be given by id or by name; a name, when
    supplied, is resolved and takes precedence over the id. At least one
    identifier for each must be provided, otherwise ``SaltInvocationError``
    is raised.

    Returns ``{"associated": True, "id": <new association id>}`` on success
    and ``{"associated": False, "error": {...}}`` otherwise, including when
    the subnet has no existing association to replace.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.associate_network_acl_to_subnet \\
                network_acl_id='acl-5fb85d36' subnet_id='subnet-6a1fe403'
    """
    # Reject incomplete calls up front; otherwise None would be sent to boto3
    # and fail with a parameter-validation error that is not a ClientError.
    if not any((network_acl_id, network_acl_name)):
        raise SaltInvocationError("One of network_acl_id or network_acl_name must be provided.")
    if not any((subnet_id, subnet_name)):
        raise SaltInvocationError("One of subnet_id or subnet_name must be provided.")
    try:
        if network_acl_name:
            network_acl_id = _get_resource_id(
                "network_acl",
                network_acl_name,
                region=region,
                key=key,
                keyid=keyid,
                profile=profile,
            )
            if not network_acl_id:
                return {
                    "associated": False,
                    "error": {"message": f"Network ACL {network_acl_name} does not exist."},
                }
        if subnet_name:
            subnet_id = _get_resource_id(
                "subnet", subnet_name, region=region, key=key, keyid=keyid, profile=profile
            )
            if not subnet_id:
                return {
                    "associated": False,
                    "error": {"message": f"Subnet {subnet_name} does not exist."},
                }
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)

        # Find the association currently binding this subnet to an ACL.
        existing = conn.describe_network_acls(
            Filters=[{"Name": "association.subnet-id", "Values": [subnet_id]}]
        ).get("NetworkAcls", [])
        assoc_id = None
        for cur in existing:
            for assoc in cur.get("Associations", []):
                if assoc.get("SubnetId") == subnet_id:
                    assoc_id = assoc.get("NetworkAclAssociationId")
                    break
            if assoc_id:
                break
        if not assoc_id:
            return {
                "associated": False,
                "error": {
                    "message": f"No existing network acl association found for subnet {subnet_id}."
                },
            }
        resp = conn.replace_network_acl_association(
            AssociationId=assoc_id, NetworkAclId=network_acl_id
        )
        return {"associated": True, "id": resp.get("NewAssociationId")}
    except botocore.exceptions.ClientError as exc:
        return {"associated": False, "error": boto3mod.get_error(exc)}
[docs]
def disassociate_network_acl(
    subnet_id=None,
    vpc_id=None,
    subnet_name=None,
    vpc_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Point a subnet back at its VPC's default network ACL.

    The subnet's current association is looked up and replaced with one to
    the default ACL. The VPC used for the default-ACL lookup is the one
    resolved from ``vpc_id`` / ``vpc_name`` when given, falling back to the
    VPC of the ACL that currently holds the association.

    Raises ``SaltInvocationError`` unless exactly one of ``subnet_id`` /
    ``subnet_name`` is given, or when both ``vpc_id`` and ``vpc_name`` are.

    Returns ``{"disassociated": True, "association_id": <new id>}`` on
    success, otherwise ``{"disassociated": False, "error": {...}}``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.disassociate_network_acl 'subnet-6a1fe403'
    """
    if not exactly_one((subnet_name, subnet_id)):
        raise SaltInvocationError(
            "One (but not both) of subnet_id or subnet_name must be provided."
        )
    if all((vpc_name, vpc_id)):
        raise SaltInvocationError("Only one of vpc_id or vpc_name may be provided.")

    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        if subnet_name:
            subnet_id = _get_resource_id("subnet", subnet_name, **creds)
            if not subnet_id:
                return {
                    "disassociated": False,
                    "error": {"message": f"Subnet {subnet_name} does not exist."},
                }
        if vpc_name or vpc_id:
            vpc_id = check_vpc(vpc_id, vpc_name, region, key, keyid, profile)

        client = _get_conn("ec2", **creds)

        # Which ACL (and association) currently covers this subnet?
        holders = client.describe_network_acls(
            Filters=[{"Name": "association.subnet-id", "Values": [subnet_id]}]
        ).get("NetworkAcls", [])
        current_assoc = None
        holder_vpc = None
        for holder in holders:
            hit = next(
                (
                    link
                    for link in holder.get("Associations", [])
                    if link.get("SubnetId") == subnet_id
                ),
                None,
            )
            if hit is not None:
                current_assoc = hit.get("NetworkAclAssociationId")
                holder_vpc = holder.get("VpcId")
            if current_assoc:
                break
        if not current_assoc:
            return {
                "disassociated": False,
                "error": {
                    "message": f"No existing network acl association for subnet {subnet_id}."
                },
            }

        default_acls = client.describe_network_acls(
            Filters=[
                {"Name": "vpc-id", "Values": [vpc_id or holder_vpc]},
                {"Name": "default", "Values": ["true"]},
            ]
        ).get("NetworkAcls", [])
        if not default_acls:
            return {
                "disassociated": False,
                "error": {"message": "Could not find default network acl for VPC."},
            }

        replaced = client.replace_network_acl_association(
            AssociationId=current_assoc,
            NetworkAclId=default_acls[0].get("NetworkAclId"),
        )
        return {"disassociated": True, "association_id": replaced.get("NewAssociationId")}
    except botocore.exceptions.ClientError as exc:
        return {"disassociated": False, "error": boto3mod.get_error(exc)}
def _network_acl_protocol(protocol):
if protocol is None:
return None
if isinstance(protocol, int):
return str(protocol)
if isinstance(protocol, str):
if protocol.isdigit():
return protocol
if protocol == "all":
return "-1"
try:
return str(socket.getprotobyname(protocol))
except OSError as exc:
raise SaltInvocationError(str(exc)) from exc
raise SaltInvocationError(f"Invalid protocol {protocol!r}")
def _do_network_acl_entry(replace, **kwargs):
    """
    Shared implementation for creating or replacing a network ACL entry.

    ``replace`` selects ``replace_network_acl_entry`` (true) or
    ``create_network_acl_entry`` (false) on the EC2 client, and also the
    result key (``"replaced"`` / ``"created"``). All other settings arrive
    as keyword arguments.

    Raises ``SaltInvocationError`` unless exactly one of ``network_acl_id``
    / ``network_acl_name`` is given, or when any of ``rule_number``,
    ``protocol``, ``rule_action`` or ``cidr_block`` is ``None``.

    Returns ``{<result key>: True}`` on success, otherwise
    ``{<result key>: False, "error": {...}}``.
    """
    result_key = "replaced" if replace else "created"
    acl_id = kwargs.get("network_acl_id")
    acl_name = kwargs.get("network_acl_name")
    creds = {name: kwargs.get(name) for name in ("region", "key", "keyid", "profile")}

    if not exactly_one((acl_name, acl_id)):
        raise SaltInvocationError(
            "One (but not both) of network_acl_id or network_acl_name must be provided."
        )
    for required in ("rule_number", "protocol", "rule_action", "cidr_block"):
        if kwargs.get(required) is None:
            raise SaltInvocationError(f"{required} is required.")

    if acl_name:
        acl_id = _get_resource_id("network_acl", acl_name, **creds)
    if not acl_id:
        return {
            result_key: False,
            "error": {"message": "Network ACL {} does not exist.".format(acl_name or acl_id)},
        }

    try:
        client = _get_conn("ec2", **creds)
        entry = {
            "NetworkAclId": acl_id,
            "RuleNumber": int(kwargs["rule_number"]),
            "Protocol": _network_acl_protocol(kwargs["protocol"]),
            "RuleAction": kwargs["rule_action"],
            "Egress": bool(kwargs.get("egress")),
            "CidrBlock": kwargs["cidr_block"],
        }

        # A half-specified range/pair is completed with 0 for the missing side.
        port_from = kwargs.get("port_range_from")
        port_to = kwargs.get("port_range_to")
        if not (port_from is None and port_to is None):
            entry["PortRange"] = {"From": int(port_from or 0), "To": int(port_to or 0)}

        icmp_type = kwargs.get("icmp_type")
        icmp_code = kwargs.get("icmp_code")
        if not (icmp_type is None and icmp_code is None):
            entry["IcmpTypeCode"] = {"Type": int(icmp_type or 0), "Code": int(icmp_code or 0)}

        submit = client.replace_network_acl_entry if replace else client.create_network_acl_entry
        submit(**entry)
        return {result_key: True}
    except botocore.exceptions.ClientError as exc:
        return {result_key: False, "error": boto3mod.get_error(exc)}
[docs]
def create_network_acl_entry(
    network_acl_id=None,
    rule_number=None,
    protocol=None,
    rule_action=None,
    cidr_block=None,
    egress=None,
    network_acl_name=None,
    icmp_code=None,
    icmp_type=None,
    port_range_from=None,
    port_range_to=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Add a new entry (rule) to a network ACL.

    All arguments are forwarded to ``_do_network_acl_entry`` in "create"
    mode and its result is returned unchanged.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.create_network_acl_entry 'acl-5fb85d36' 32767 \\
                'all' 'deny' '0.0.0.0/0' egress=True
    """
    entry_args = {
        "network_acl_id": network_acl_id,
        "rule_number": rule_number,
        "protocol": protocol,
        "rule_action": rule_action,
        "cidr_block": cidr_block,
        "egress": egress,
        "network_acl_name": network_acl_name,
        "icmp_code": icmp_code,
        "icmp_type": icmp_type,
        "port_range_from": port_range_from,
        "port_range_to": port_range_to,
        "region": region,
        "key": key,
        "keyid": keyid,
        "profile": profile,
    }
    return _do_network_acl_entry(False, **entry_args)
[docs]
def replace_network_acl_entry(
    network_acl_id=None,
    rule_number=None,
    protocol=None,
    rule_action=None,
    cidr_block=None,
    egress=None,
    network_acl_name=None,
    icmp_code=None,
    icmp_type=None,
    port_range_from=None,
    port_range_to=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Overwrite an existing entry (rule) of a network ACL.

    All arguments are forwarded to ``_do_network_acl_entry`` in "replace"
    mode and its result is returned unchanged.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.replace_network_acl_entry 'acl-5fb85d36' 32767 \\
                'all' 'deny' '0.0.0.0/0' egress=True
    """
    entry_args = {
        "network_acl_id": network_acl_id,
        "rule_number": rule_number,
        "protocol": protocol,
        "rule_action": rule_action,
        "cidr_block": cidr_block,
        "egress": egress,
        "network_acl_name": network_acl_name,
        "icmp_code": icmp_code,
        "icmp_type": icmp_type,
        "port_range_from": port_range_from,
        "port_range_to": port_range_to,
        "region": region,
        "key": key,
        "keyid": keyid,
        "profile": profile,
    }
    return _do_network_acl_entry(True, **entry_args)
[docs]
def delete_network_acl_entry(
    network_acl_id=None,
    rule_number=None,
    egress=None,
    network_acl_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Delete a network ACL entry.

    Exactly one of ``network_acl_id`` / ``network_acl_name`` must be given,
    and both ``rule_number`` and ``egress`` are required (``egress`` must be
    passed explicitly, even when it is ``False``); otherwise
    ``SaltInvocationError`` is raised.

    Returns ``{"deleted": True}`` on success, otherwise
    ``{"deleted": False, "error": {...}}``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete_network_acl_entry 'acl-5fb85d36' 32767 egress=False
    """
    if not exactly_one((network_acl_name, network_acl_id)):
        raise SaltInvocationError(
            "One (but not both) of network_acl_id or network_acl_name must be provided."
        )
    # Explicit mapping instead of poking at locals(); order decides which
    # missing argument is reported first.
    required = {"rule_number": rule_number, "egress": egress}
    for arg_name, arg_value in required.items():
        if arg_value is None:
            raise SaltInvocationError(f"{arg_name} is required.")
    if network_acl_name:
        network_acl_id = _get_resource_id(
            "network_acl",
            network_acl_name,
            region=region,
            key=key,
            keyid=keyid,
            profile=profile,
        )
    if not network_acl_id:
        return {
            "deleted": False,
            "error": {
                "message": "Network ACL {} does not exist.".format(
                    network_acl_name or network_acl_id
                )
            },
        }
    try:
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)
        conn.delete_network_acl_entry(
            NetworkAclId=network_acl_id, RuleNumber=int(rule_number), Egress=bool(egress)
        )
        return {"deleted": True}
    except botocore.exceptions.ClientError as exc:
        return {"deleted": False, "error": boto3mod.get_error(exc)}
[docs]
def create_route_table(
    vpc_id=None,
    vpc_name=None,
    route_table_name=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Create a route table in the specified VPC.

    The VPC is validated through ``check_vpc``. ``route_table_name`` and
    ``tags`` are handed to ``_tag_specifications``; when a name is given the
    new id is also stored in the id cache under that name.

    Returns ``{"created": True, "id": <route table id>}`` on success,
    ``{"created": False}`` when the response carries no route table id, and
    ``{"created": False, "error": {...}}`` otherwise.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.create_route_table vpc_name='myvpc' \\
                route_table_name='myroutetable'
    """
    # Capture the caller's identifier before vpc_id is overwritten by the
    # check_vpc() result; otherwise a missing VPC given by id is reported as
    # "VPC None does not exist."
    requested_vpc = vpc_name or vpc_id
    try:
        vpc_id = check_vpc(vpc_id, vpc_name, region, key, keyid, profile)
        if not vpc_id:
            return {
                "created": False,
                "error": {"message": f"VPC {requested_vpc} does not exist."},
            }
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)
        kwargs = {"VpcId": vpc_id}
        tag_spec = _tag_specifications("route-table", name=route_table_name, tags=tags)
        if tag_spec:
            kwargs["TagSpecifications"] = tag_spec
        rt = conn.create_route_table(**kwargs).get("RouteTable") or {}
        rt_id = rt.get("RouteTableId")
        if not rt_id:
            return {"created": False}
        if route_table_name:
            boto3mod.cache_id(
                "ec2",
                route_table_name,
                opts=__opts__,
                context=__context__,
                sub_resource="route_table",
                resource_id=rt_id,
                region=region,
                key=key,
                keyid=keyid,
                profile=profile,
            )
        return {"created": True, "id": rt_id}
    except botocore.exceptions.ClientError as exc:
        return {"created": False, "error": boto3mod.get_error(exc)}
[docs]
def delete_route_table(
    route_table_id=None,
    route_table_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Remove a route table, addressed either by id or by name.

    The name is only resolved to an id when no id was passed. After a
    successful delete the cached name mapping (if a name was given) is
    invalidated.

    Returns ``{"deleted": True}`` on success, otherwise
    ``{"deleted": False, "error": {...}}``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete_route_table route_table_id='rtb-1f382e7d'
    """
    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        if route_table_name and not route_table_id:
            route_table_id = _get_resource_id("route_table", route_table_name, **creds)
        if not route_table_id:
            return {
                "deleted": False,
                "error": {"message": f"route table {route_table_name} does not exist."},
            }

        client = _get_conn("ec2", **creds)
        client.delete_route_table(RouteTableId=route_table_id)

        if route_table_name:
            boto3mod.cache_id(
                "ec2",
                route_table_name,
                opts=__opts__,
                context=__context__,
                sub_resource="route_table",
                resource_id=route_table_id,
                invalidate=True,
                **creds,
            )
        return {"deleted": True}
    except botocore.exceptions.ClientError as exc:
        return {"deleted": False, "error": boto3mod.get_error(exc)}
[docs]
def route_table_exists(
    route_table_id=None,
    name=None,
    route_table_name=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Check for a route table by id, name and/or tags.

    ``name`` is a deprecated alias: when set it logs a warning and takes the
    place of ``route_table_name``. The lookup itself is delegated to
    ``resource_exists`` and its result is returned unchanged.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.route_table_exists route_table_id='rtb-1f382e7d'
    """
    effective_name = route_table_name
    if name:
        log.warning(
            "boto3_vpc.route_table_exists: name parameter is deprecated; "
            "use route_table_name instead."
        )
        effective_name = name

    lookup = {
        "name": effective_name,
        "resource_id": route_table_id,
        "tags": tags,
        "region": region,
        "key": key,
        "keyid": keyid,
        "profile": profile,
    }
    return resource_exists("route_table", **lookup)
# Maps the snake_case keys exposed in this module's route payloads to the
# field names read from each entry of a route table's "Routes" list.
_ROUTE_KEYS = {
    "destination_cidr_block": "DestinationCidrBlock",
    "gateway_id": "GatewayId",
    "instance_id": "InstanceId",
    "interface_id": "NetworkInterfaceId",
    "nat_gateway_id": "NatGatewayId",
    "vpc_peering_connection_id": "VpcPeeringConnectionId",
}
# Same idea for each entry of a route table's "Associations" list.
_ASSOC_KEYS = {
    "id": "RouteTableAssociationId",
    "main": "Main",
    "route_table_id": "RouteTableId",
    "subnet_id": "SubnetId",
}
def _route_payload(rt):
    """
    Convert one route table description into this module's flat dict form.

    Routes and associations are re-keyed through ``_ROUTE_KEYS`` and
    ``_ASSOC_KEYS``; a field is only emitted when it is present in the
    source record. Tags are converted with ``_tags_dict``.
    """

    def _rekey(records, key_map):
        # ``records`` may be None when the key exists with a null value.
        return [
            {ours: record.get(theirs) for ours, theirs in key_map.items() if theirs in record}
            for record in records or []
        ]

    route_list = _rekey(rt.get("Routes", []), _ROUTE_KEYS)
    association_list = _rekey(rt.get("Associations", []), _ASSOC_KEYS)
    return {
        "id": rt.get("RouteTableId"),
        "vpc_id": rt.get("VpcId"),
        "tags": _tags_dict(rt.get("Tags")),
        "routes": route_list,
        "associations": association_list,
    }
[docs]
def describe_route_tables(
    route_table_id=None,
    route_table_name=None,
    vpc_id=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Return a list of route tables matching the filter criteria.

    ``route_table_name`` is matched against the ``Name`` tag and every item
    of ``tags`` becomes an additional tag filter. All result pages are
    collected by following ``NextToken``, so large result sets are not
    silently truncated.

    Returns a list of dicts as built by ``_route_payload``, or
    ``{"error": {...}}`` when the EC2 call fails.

    Raises ``SaltInvocationError`` when no selection criterion is supplied.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.describe_route_tables vpc_id='vpc-a6a9efc3'
    """
    if not any((route_table_id, route_table_name, tags, vpc_id)):
        raise SaltInvocationError(
            "At least one of the following must be specified: "
            "route table id, route table name, vpc_id, or tags."
        )
    try:
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)

        filters = []
        if vpc_id:
            filters.append({"Name": "vpc-id", "Values": [vpc_id]})
        if route_table_name:
            filters.append({"Name": "tag:Name", "Values": [route_table_name]})
        for tag_name, tag_value in (tags or {}).items():
            filters.append({"Name": f"tag:{tag_name}", "Values": [tag_value]})

        kwargs = {}
        if route_table_id:
            kwargs["RouteTableIds"] = [route_table_id]
        if filters:
            # Only send "Filters" when there is something to filter on.
            kwargs["Filters"] = filters

        tables = []
        next_token = None
        while True:
            call_kwargs = dict(kwargs)
            if next_token:
                call_kwargs["NextToken"] = next_token
            resp = conn.describe_route_tables(**call_kwargs)
            tables.extend(resp.get("RouteTables", []))
            next_token = resp.get("NextToken")
            if not next_token:
                break
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}
    return [_route_payload(rt) for rt in tables]
[docs]
def route_exists(
    destination_cidr_block,
    route_table_name=None,
    route_table_id=None,
    gateway_id=None,
    instance_id=None,
    interface_id=None,
    tags=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
    vpc_peering_connection_id=None,
    nat_gateway_id=None,
):
    """
    Check whether a route table contains a route with the given destination
    and target.

    The route table is looked up through ``describe_route_tables``. A route
    matches only when all six compared fields are equal; target fields the
    caller left unset must be absent (``None``) on the route as well.

    Returns ``{"exists": True}`` / ``{"exists": False}``, or the
    ``{"error": ...}`` dict from ``describe_route_tables`` when the lookup
    failed.

    Raises ``SaltInvocationError`` when neither a route table nor any target
    is specified, and ``CommandExecutionError`` when the lookup matches more
    than one route table.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.route_exists destination_cidr_block='10.0.0.0/20' \\
                gateway_id='local' route_table_name='test'
    """
    if not (route_table_name or route_table_id):
        raise SaltInvocationError(
            "At least one of the following must be specified: "
            "route_table_name or route_table_id."
        )
    targets = (gateway_id, instance_id, interface_id, vpc_peering_connection_id, nat_gateway_id)
    if not any(targets):
        raise SaltInvocationError(
            "At least one of the following must be specified: gateway_id, instance_id, "
            "interface_id, nat_gateway_id or vpc_peering_connection_id."
        )

    found = describe_route_tables(
        route_table_id=route_table_id,
        route_table_name=route_table_name,
        tags=tags,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if isinstance(found, dict) and "error" in found:
        return found
    if not found:
        return {"exists": False}
    if len(found) != 1:
        raise CommandExecutionError("Found more than one route table.")

    expected = {
        "destination_cidr_block": destination_cidr_block,
        "gateway_id": gateway_id,
        "instance_id": instance_id,
        "interface_id": interface_id,
        "vpc_peering_connection_id": vpc_peering_connection_id,
        "nat_gateway_id": nat_gateway_id,
    }
    matched = any(
        all(route.get(field) == value for field, value in expected.items())
        for route in found[0]["routes"]
    )
    return {"exists": matched}
[docs]
def associate_route_table(
    route_table_id=None,
    subnet_id=None,
    route_table_name=None,
    subnet_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Associate a route table with a subnet.

    The route table and the subnet must each be identified by exactly one of
    id or name; ``SaltInvocationError`` is raised otherwise.

    Returns ``{"associated": True, "association_id": <id>}`` on success and
    ``{"associated": False, "error": {...}}`` on failure, so callers can
    test the ``associated`` key in both cases.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.associate_route_table 'rtb-1f382e7d' 'subnet-6a1fe403'
    """
    if all((subnet_id, subnet_name)):
        raise SaltInvocationError("Only one of subnet_name or subnet_id may be provided.")
    if all((route_table_id, route_table_name)):
        raise SaltInvocationError("Only one of route_table_name or route_table_id may be provided.")
    # Reject incomplete calls here; otherwise None is sent to boto3 and fails
    # with a parameter-validation error that is not a ClientError.
    if not any((subnet_id, subnet_name)):
        raise SaltInvocationError("One of subnet_name or subnet_id must be provided.")
    if not any((route_table_id, route_table_name)):
        raise SaltInvocationError("One of route_table_name or route_table_id must be provided.")
    try:
        if subnet_name:
            subnet_id = _get_resource_id(
                "subnet", subnet_name, region=region, key=key, keyid=keyid, profile=profile
            )
            if not subnet_id:
                return {
                    "associated": False,
                    "error": {"message": f"Subnet {subnet_name} does not exist."},
                }
        if route_table_name:
            route_table_id = _get_resource_id(
                "route_table",
                route_table_name,
                region=region,
                key=key,
                keyid=keyid,
                profile=profile,
            )
            if not route_table_id:
                return {
                    "associated": False,
                    "error": {"message": f"Route table {route_table_name} does not exist."},
                }
        conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)
        resp = conn.associate_route_table(RouteTableId=route_table_id, SubnetId=subnet_id)
        # "association_id" is kept for existing callers; "associated" mirrors
        # the key already present in every failure result.
        return {"associated": True, "association_id": resp.get("AssociationId")}
    except botocore.exceptions.ClientError as exc:
        return {"associated": False, "error": boto3mod.get_error(exc)}
[docs]
def disassociate_route_table(association_id, region=None, key=None, keyid=None, profile=None):
    """
    Remove a route table association, identified by its association id.

    Returns ``{"disassociated": True}`` on success, otherwise
    ``{"disassociated": False, "error": {...}}``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.disassociate_route_table 'rtbassoc-d8ccddba'
    """
    try:
        client = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)
        client.disassociate_route_table(AssociationId=association_id)
    except botocore.exceptions.ClientError as exc:
        return {"disassociated": False, "error": boto3mod.get_error(exc)}
    return {"disassociated": True}
[docs]
def replace_route_table_association(
    association_id, route_table_id, region=None, key=None, keyid=None, profile=None
):
    """
    Re-point an existing route table association at another route table.

    Returns ``{"replaced": True, "association_id": <new association id>}``
    on success, otherwise ``{"replaced": False, "error": {...}}``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.replace_route_table_association 'rtbassoc-d8ccddba' 'rtb-1f382e7d'
    """
    try:
        client = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)
        response = client.replace_route_table_association(
            AssociationId=association_id, RouteTableId=route_table_id
        )
        new_association = response.get("NewAssociationId")
    except botocore.exceptions.ClientError as exc:
        return {"replaced": False, "error": boto3mod.get_error(exc)}
    return {"replaced": True, "association_id": new_association}
[docs]
def create_route(
    route_table_id=None,
    destination_cidr_block=None,
    route_table_name=None,
    gateway_id=None,
    internet_gateway_name=None,
    instance_id=None,
    interface_id=None,
    vpc_peering_connection_id=None,
    vpc_peering_connection_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
    nat_gateway_id=None,
    nat_gateway_subnet_name=None,
    nat_gateway_subnet_id=None,
):
    """
    Add a route to a route table.

    The route table is given either by ``route_table_id`` or by
    ``route_table_name`` (exactly one). The route target is given by exactly
    one of ``gateway_id``, ``internet_gateway_name``, ``instance_id``,
    ``interface_id``, ``vpc_peering_connection_id``,
    ``vpc_peering_connection_name``, ``nat_gateway_id``,
    ``nat_gateway_subnet_id`` or ``nat_gateway_subnet_name``. Name based
    arguments are resolved to IDs before the route is created; for the NAT
    gateway subnet arguments the first gateway returned by
    ``describe_nat_gateways`` is used.

    destination_cidr_block
        CIDR block the route matches. Required.

    Returns ``{"created": True}`` on success, or
    ``{"created": False, "error": ...}`` when a name cannot be resolved or
    the EC2 API raises a ``ClientError``.

    Raises ``SaltInvocationError`` when the argument combination is invalid.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.create_route 'rtb-1f382e7d' '10.0.0.0/16' gateway_id='vgw-a1b2c3'
    """
    if not exactly_one((route_table_name, route_table_id)):
        raise SaltInvocationError(
            "One (but not both) of route_table_id or route_table_name must be provided."
        )

    target_candidates = (
        gateway_id,
        internet_gateway_name,
        instance_id,
        interface_id,
        vpc_peering_connection_id,
        nat_gateway_id,
        nat_gateway_subnet_id,
        nat_gateway_subnet_name,
        vpc_peering_connection_name,
    )
    if not exactly_one(target_candidates):
        raise SaltInvocationError(
            "Exactly one of gateway_id, internet_gateway_name, instance_id, interface_id, "
            "vpc_peering_connection_id, nat_gateway_id, nat_gateway_subnet_id, "
            "nat_gateway_subnet_name or vpc_peering_connection_name must be provided."
        )

    if destination_cidr_block is None:
        raise SaltInvocationError("destination_cidr_block is required.")

    # Connection arguments forwarded unchanged to every helper call below.
    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}

    def _failure(message):
        # Uniform "could not resolve" result.
        return {"created": False, "error": {"message": message}}

    try:
        if route_table_name:
            route_table_id = _get_resource_id("route_table", route_table_name, **creds)
            if not route_table_id:
                return _failure(f"route table {route_table_name} does not exist.")

        if internet_gateway_name:
            gateway_id = _get_resource_id("internet_gateway", internet_gateway_name, **creds)
            if not gateway_id:
                return _failure(f"internet gateway {internet_gateway_name} does not exist.")

        if vpc_peering_connection_name:
            vpc_peering_connection_id = _get_resource_id(
                "vpc_peering_connection", vpc_peering_connection_name, **creds
            )
            if not vpc_peering_connection_id:
                return _failure(
                    f"VPC peering connection {vpc_peering_connection_name} does not exist."
                )

        # Resolve a NAT gateway through its subnet, by name first and then by ID.
        nat_subnet_lookups = (
            ("subnet_name", nat_gateway_subnet_name),
            ("subnet_id", nat_gateway_subnet_id),
        )
        for lookup_arg, subnet_ref in nat_subnet_lookups:
            if not subnet_ref:
                continue
            gateways = describe_nat_gateways(**{lookup_arg: subnet_ref}, **creds)
            if not gateways:
                return _failure(f"nat gateway for {subnet_ref} does not exist.")
            nat_gateway_id = gateways[0]["NatGatewayId"]

        ec2 = _get_conn("ec2", **creds)

        request = {
            "RouteTableId": route_table_id,
            "DestinationCidrBlock": destination_cidr_block,
        }
        # Only targets that were supplied or resolved are sent to the API.
        optional_targets = (
            ("GatewayId", gateway_id),
            ("InstanceId", instance_id),
            ("NetworkInterfaceId", interface_id),
            ("VpcPeeringConnectionId", vpc_peering_connection_id),
            ("NatGatewayId", nat_gateway_id),
        )
        for param, value in optional_targets:
            if value:
                request[param] = value

        ec2.create_route(**request)
    except botocore.exceptions.ClientError as err:
        return {"created": False, "error": boto3mod.get_error(err)}
    return {"created": True}
[docs]
def delete_route(
    route_table_id=None,
    destination_cidr_block=None,
    route_table_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Remove the route for ``destination_cidr_block`` from a route table.

    The route table is given either by ``route_table_id`` or by
    ``route_table_name`` (exactly one); a name is resolved to an ID first.

    destination_cidr_block
        CIDR block of the route to remove. Required.

    Returns ``{"deleted": True}`` on success, or
    ``{"deleted": False, "error": ...}`` when the named route table cannot be
    found or the EC2 API raises a ``ClientError``.

    Raises ``SaltInvocationError`` when the argument combination is invalid.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete_route 'rtb-1f382e7d' '10.0.0.0/16'
    """
    if not exactly_one((route_table_name, route_table_id)):
        raise SaltInvocationError(
            "One (but not both) of route_table_id or route_table_name must be provided."
        )
    if destination_cidr_block is None:
        raise SaltInvocationError("destination_cidr_block is required.")

    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        if route_table_name:
            route_table_id = _get_resource_id("route_table", route_table_name, **creds)
            if not route_table_id:
                return {
                    "deleted": False,
                    "error": {"message": f"route table {route_table_name} does not exist."},
                }
        ec2 = _get_conn("ec2", **creds)
        ec2.delete_route(
            DestinationCidrBlock=destination_cidr_block, RouteTableId=route_table_id
        )
    except botocore.exceptions.ClientError as err:
        return {"deleted": False, "error": boto3mod.get_error(err)}
    return {"deleted": True}
[docs]
def replace_route(
    route_table_id=None,
    destination_cidr_block=None,
    route_table_name=None,
    gateway_id=None,
    instance_id=None,
    interface_id=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
    vpc_peering_connection_id=None,
    nat_gateway_id=None,
):
    """
    Replace the target of an existing route in a route table.

    The route table is given either by ``route_table_id`` or by
    ``route_table_name`` (exactly one); a name is resolved to an ID first.
    Any of ``gateway_id``, ``instance_id``, ``interface_id``,
    ``vpc_peering_connection_id`` and ``nat_gateway_id`` that are set are
    forwarded to the EC2 ``replace_route`` call.

    destination_cidr_block
        CIDR block of the route to replace. Required.

    Returns ``{"replaced": True}`` on success, or
    ``{"replaced": False, "error": ...}`` when the named route table cannot
    be found or the EC2 API raises a ``ClientError``.

    Raises ``SaltInvocationError`` when the argument combination is invalid.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.replace_route 'rtb-1f382e7d' '10.0.0.0/16' gateway_id='vgw-a1b2c3'
    """
    if not exactly_one((route_table_name, route_table_id)):
        raise SaltInvocationError(
            "One (but not both) of route_table_id or route_table_name must be provided."
        )
    if destination_cidr_block is None:
        raise SaltInvocationError("destination_cidr_block is required.")

    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        if route_table_name:
            route_table_id = _get_resource_id("route_table", route_table_name, **creds)
            if not route_table_id:
                return {
                    "replaced": False,
                    "error": {"message": f"route table {route_table_name} does not exist."},
                }

        ec2 = _get_conn("ec2", **creds)

        request = {
            "RouteTableId": route_table_id,
            "DestinationCidrBlock": destination_cidr_block,
        }
        # Only targets that were supplied are sent to the API.
        optional_targets = (
            ("GatewayId", gateway_id),
            ("InstanceId", instance_id),
            ("NetworkInterfaceId", interface_id),
            ("VpcPeeringConnectionId", vpc_peering_connection_id),
            ("NatGatewayId", nat_gateway_id),
        )
        for param, value in optional_targets:
            if value:
                request[param] = value

        ec2.replace_route(**request)
    except botocore.exceptions.ClientError as err:
        return {"replaced": False, "error": boto3mod.get_error(err)}
    return {"replaced": True}
# Peering connection status codes passed as the ``status-code`` filter when
# this module looks up VPC peering connections.
_VPC_PEERING_ACTIVE_STATES = ("active", "pending-acceptance", "provisioning")
def _get_peering_connection_ids(name, conn):
    """
    Return the IDs of VPC peering connections whose ``Name`` tag equals
    ``name`` and whose status code is in ``_VPC_PEERING_ACTIVE_STATES``.

    ``conn`` is the EC2 client used for the lookup. The result is a list,
    empty when nothing matches.
    """
    response = conn.describe_vpc_peering_connections(
        Filters=[
            {"Name": "tag:Name", "Values": [name]},
            {"Name": "status-code", "Values": list(_VPC_PEERING_ACTIVE_STATES)},
        ]
    )
    matching_ids = []
    for peering in response.get("VpcPeeringConnections", []):
        matching_ids.append(peering["VpcPeeringConnectionId"])
    return matching_ids
def _vpc_peering_conn_id_for_name(name, conn):
    """
    Resolve a VPC peering connection name to a single connection ID.

    Returns ``None`` when no connection matches and raises
    ``CommandExecutionError`` when the name matches more than one connection.
    """
    matches = _get_peering_connection_ids(name, conn)
    if len(matches) > 1:
        raise CommandExecutionError(
            f"Found multiple VPC peering connections named {name}; use an ID instead."
        )
    return matches[0] if matches else None
[docs]
def request_vpc_peering_connection(
    requester_vpc_id=None,
    requester_vpc_name=None,
    peer_vpc_id=None,
    peer_vpc_name=None,
    name=None,
    peer_owner_id=None,
    peer_region=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
    dry_run=False,
):
    """
    Create a VPC peering connection request from one VPC to another.

    Each side is identified by exactly one of its ``*_vpc_id`` or
    ``*_vpc_name`` arguments; names are resolved to IDs first.

    name
        Optional ``Name`` tag for the new connection. The request is refused
        if a peering connection with this name is already found.

    peer_owner_id
        Forwarded as ``PeerOwnerId`` when set.

    peer_region
        Forwarded as ``PeerRegion`` when set.

    dry_run
        Forwarded as ``DryRun`` to the EC2 API.

    Returns ``{"msg": ...}`` on success, or ``{"error": ...}`` when a VPC name
    cannot be resolved or the EC2 API raises a ``ClientError``.

    Raises ``SaltInvocationError`` for an invalid argument combination or
    when ``name`` is already in use.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.request_vpc_peering_connection vpc-4a3e622e vpc-be82e9da \\
            name=my_vpc_connection
    """
    if not exactly_one((requester_vpc_id, requester_vpc_name)):
        raise SaltInvocationError(
            "Exactly one of requester_vpc_id or requester_vpc_name is required."
        )
    if not exactly_one((peer_vpc_id, peer_vpc_name)):
        raise SaltInvocationError("Exactly one of peer_vpc_id or peer_vpc_name is required.")

    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}
    try:
        ec2 = _get_conn("ec2", **creds)

        if name and _vpc_peering_conn_id_for_name(name, ec2):
            raise SaltInvocationError(f"A VPC peering connection named {name} already exists.")

        if requester_vpc_name:
            requester_vpc_id = _get_id(vpc_name=requester_vpc_name, **creds)
            if not requester_vpc_id:
                return {"error": f"Could not resolve VPC name {requester_vpc_name} to an ID"}

        if peer_vpc_name:
            peer_vpc_id = _get_id(vpc_name=peer_vpc_name, **creds)
            if not peer_vpc_id:
                return {"error": f"Could not resolve VPC name {peer_vpc_name} to an ID"}

        request = {"VpcId": requester_vpc_id, "PeerVpcId": peer_vpc_id, "DryRun": dry_run}
        for param, value in (("PeerOwnerId", peer_owner_id), ("PeerRegion", peer_region)):
            if value:
                request[param] = value

        response = ec2.create_vpc_peering_connection(**request)
        peering_id = response.get("VpcPeeringConnection", {}).get(
            "VpcPeeringConnectionId", "ERROR"
        )

        if not name:
            return {"msg": f"VPC peering {peering_id} requested."}

        # Tag the new connection so later lookups by name can find it.
        ec2.create_tags(Resources=[peering_id], Tags=[{"Key": "Name", "Value": name}])
        return {"msg": f"VPC peering {peering_id} requested. With name {name}."}
    except botocore.exceptions.ClientError as err:
        return {"error": boto3mod.get_error(err)}
[docs]
def describe_vpc_peering_connection(name, region=None, key=None, keyid=None, profile=None):
    """
    List the IDs of VPC peering connections with the given name that are
    ``active``, ``pending-acceptance`` or ``provisioning``.

    Returns ``{"VPC-Peerings": [<id>, ...]}``, or ``{"error": ...}`` when the
    EC2 API raises a ``ClientError``.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.describe_vpc_peering_connection salt-vpc
    """
    ec2 = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)
    try:
        peering_ids = _get_peering_connection_ids(name, ec2)
    except botocore.exceptions.ClientError as err:
        return {"error": boto3mod.get_error(err)}
    return {"VPC-Peerings": peering_ids}
[docs]
def accept_vpc_peering_connection(
    conn_id="",
    name="",
    region=None,
    key=None,
    keyid=None,
    profile=None,
    dry_run=False,
):
    """
    Accept a pending VPC peering connection request.

    conn_id
        ID of the peering connection to accept. Exclusive with ``name``.

    name
        ``Name`` tag of the peering connection to accept. Exclusive with
        ``conn_id``.

    dry_run
        Forwarded as ``DryRun`` to the EC2 API.

    Returns ``{"msg": ...}`` on success, or ``{"error": ...}`` when the EC2
    API raises a ``ClientError``, including during the name lookup.

    Raises ``SaltInvocationError`` when neither or both of ``conn_id`` and
    ``name`` are given, or when ``name`` does not resolve to an ID.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.accept_vpc_peering_connection name=salt-vpc
    """
    if not exactly_one((conn_id, name)):
        raise SaltInvocationError("One (but not both) of conn_id or name must be provided.")
    conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)
    try:
        if name:
            # The lookup calls the EC2 API, so it runs inside the try block:
            # a ClientError here is reported as {"error": ...} like any other
            # API failure instead of escaping to the caller.
            conn_id = _vpc_peering_conn_id_for_name(name, conn)
            if not conn_id:
                raise SaltInvocationError(
                    f"No ID found for VPC peering connection named {name}."
                )
        conn.accept_vpc_peering_connection(DryRun=dry_run, VpcPeeringConnectionId=conn_id)
        return {"msg": "VPC peering connection accepted."}
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}
[docs]
def delete_vpc_peering_connection(
    conn_id=None,
    conn_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
    dry_run=False,
):
    """
    Delete a VPC peering connection by ID or by name.

    conn_id
        ID of the peering connection to delete. Exclusive with ``conn_name``.

    conn_name
        ``Name`` tag of the peering connection to delete. Exclusive with
        ``conn_id``.

    dry_run
        Forwarded as ``DryRun`` to the EC2 API.

    Returns ``{"msg": ...}`` on success, or ``{"error": ...}`` when the EC2
    API raises a ``ClientError``, including during the name lookup.

    Raises ``SaltInvocationError`` when neither or both of ``conn_id`` and
    ``conn_name`` are given, or when ``conn_name`` does not resolve to an ID.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.delete_vpc_peering_connection conn_name=salt-vpc
    """
    if not exactly_one((conn_id, conn_name)):
        raise SaltInvocationError("Exactly one of conn_id or conn_name must be provided.")
    conn = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)
    try:
        if conn_name:
            # The lookup calls the EC2 API, so it runs inside the try block:
            # a ClientError here is reported as {"error": ...} like any other
            # API failure instead of escaping to the caller.
            conn_id = _vpc_peering_conn_id_for_name(conn_name, conn)
            if not conn_id:
                raise SaltInvocationError(
                    f"Couldn't resolve VPC peering connection {conn_name} to an ID."
                )
        conn.delete_vpc_peering_connection(DryRun=dry_run, VpcPeeringConnectionId=conn_id)
        return {"msg": "VPC peering connection deleted."}
    except botocore.exceptions.ClientError as exc:
        return {"error": boto3mod.get_error(exc)}
[docs]
def is_peering_connection_pending(
    conn_id=None,
    conn_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Report whether a VPC peering connection is in the ``pending-acceptance``
    state.

    The connection is identified by exactly one of ``conn_id`` or
    ``conn_name``. A lookup by name only considers connections whose status
    is ``active``, ``pending-acceptance`` or ``provisioning``.

    Returns ``False`` when no connection is found or when the EC2 API raises
    a ``ClientError`` (the error is logged).

    Raises ``SaltInvocationError`` for an invalid argument combination and
    ``CommandExecutionError`` when more than one connection matches.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.is_peering_connection_pending conn_name=salt-vpc
    """
    if not exactly_one((conn_id, conn_name)):
        raise SaltInvocationError("Exactly one of conn_id or conn_name must be provided.")

    ec2 = _get_conn("ec2", region=region, key=key, keyid=keyid, profile=profile)

    if conn_id:
        query = {"VpcPeeringConnectionIds": [conn_id]}
    else:
        query = {
            "Filters": [
                {"Name": "tag:Name", "Values": [conn_name]},
                {"Name": "status-code", "Values": list(_VPC_PEERING_ACTIVE_STATES)},
            ]
        }

    try:
        response = ec2.describe_vpc_peering_connections(**query)
    except botocore.exceptions.ClientError as err:
        log.error("Failed to describe VPC peering connections: %s", err)
        return False

    peerings = response.get("VpcPeeringConnections", [])
    if not peerings:
        return False
    if len(peerings) > 1:
        raise CommandExecutionError(
            f"Found more than one VPC peering connection for {conn_id or conn_name}."
        )
    return peerings[0]["Status"]["Code"] == "pending-acceptance"
[docs]
def peering_connection_pending_from_vpc(
    conn_id=None,
    conn_name=None,
    vpc_id=None,
    vpc_name=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Report whether a VPC peering connection requested by the given VPC is in
    the ``pending-acceptance`` state.

    The connection is identified by exactly one of ``conn_id`` or
    ``conn_name``, and the requester VPC by exactly one of ``vpc_id`` or
    ``vpc_name``. Only connections whose status is ``active``,
    ``pending-acceptance`` or ``provisioning`` are considered.

    Returns ``False`` when the VPC name cannot be resolved, when no
    connection is found, or when the EC2 API raises a ``ClientError`` (the
    error is logged).

    Raises ``SaltInvocationError`` for an invalid argument combination and
    ``CommandExecutionError`` when more than one connection matches.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_vpc.peering_connection_pending_from_vpc conn_name=salt-vpc \\
            vpc_name=myvpc
    """
    if not exactly_one((conn_id, conn_name)):
        raise SaltInvocationError("Exactly one of conn_id or conn_name must be provided.")
    if not exactly_one((vpc_id, vpc_name)):
        raise SaltInvocationError("Exactly one of vpc_id or vpc_name must be provided.")

    creds = {"region": region, "key": key, "keyid": keyid, "profile": profile}

    if vpc_name:
        vpc_id = check_vpc(vpc_name=vpc_name, **creds)
        if not vpc_id:
            return False

    ec2 = _get_conn("ec2", **creds)

    # The last filter selects the connection by ID or by Name tag.
    if conn_id:
        selector = {"Name": "vpc-peering-connection-id", "Values": [conn_id]}
    else:
        selector = {"Name": "tag:Name", "Values": [conn_name]}
    query_filters = [
        {"Name": "requester-vpc-info.vpc-id", "Values": [vpc_id]},
        {"Name": "status-code", "Values": list(_VPC_PEERING_ACTIVE_STATES)},
        selector,
    ]

    try:
        response = ec2.describe_vpc_peering_connections(Filters=query_filters)
    except botocore.exceptions.ClientError as err:
        log.error("Failed to describe VPC peering connections: %s", err)
        return False

    peerings = response.get("VpcPeeringConnections", [])
    if not peerings:
        return False
    if len(peerings) > 1:
        raise CommandExecutionError(
            f"Found more than one VPC peering connection for {conn_id or conn_name}."
        )
    return peerings[0]["Status"]["Code"] == "pending-acceptance"