Source code for saltext.boto3.modules.boto3_datapipeline

"""
Connection module for Amazon Data Pipeline using boto3.
=======================================================

    Renamed from ``boto_datapipeline`` to ``boto3_datapipeline`` and rewritten
    to use the boto3 ``datapipeline`` client APIs directly via
    :py:mod:`saltext.boto3.utils.boto3mod`.  The legacy boto2 code path
    (object-style access, retry loops) has been removed.

:depends:
  - boto3 >= 1.28.0
  - botocore >= 1.31.0

:configuration: This module accepts explicit Data Pipeline credentials but can
    also utilize IAM roles assigned to the instance through Instance Profiles.
    Dynamic credentials are then automatically obtained from the AWS API and no
    further configuration is necessary. More information is available at:

    .. code-block:: text

        http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html

    If IAM roles are not used, you need to specify the credentials either in the
    minion's config file or as a profile. For example, to specify them in the
    minion's config file:

.. code-block:: yaml

    datapipeline.keyid: GKTADJGHEIQSXMKKRBJ08H
    datapipeline.key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs

A region may also be specified in the configuration:

.. code-block:: yaml

    datapipeline.region: us-east-1

It's also possible to specify key, keyid and region via a profile, either
as a passed in dict, or as a string to pull from pillars or minion config:

.. code-block:: yaml

    myprofile:
        keyid: GKTADJGHEIQSXMKKRBJ08H
        key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
        region: us-east-1

.. versionadded:: 1.0.0
"""

import logging

from saltext.boto3.utils import boto3mod

try:
    # Exception types that the public functions of this module catch and
    # convert into an ``{"error": ...}`` return value.
    from botocore.exceptions import BotoCoreError
    from botocore.exceptions import ClientError

    # Raise the threshold of the boto3/botocore loggers to CRITICAL so that
    # only critical records from those libraries are emitted.
    logging.getLogger("boto3").setLevel(logging.CRITICAL)
    logging.getLogger("botocore").setLevel(logging.CRITICAL)
    HAS_BOTO3 = True
except ImportError:
    # Remember the failed import; __virtual__ checks this flag.
    HAS_BOTO3 = False

log = logging.getLogger(__name__)

# Name returned by __virtual__ when the module is allowed to load.
__virtualname__ = "boto3_datapipeline"


def _get_conn(service, region=None, key=None, keyid=None, profile=None):
    """
    Build a boto3 client for ``service`` through ``boto3mod.get_connection``.

    The module-level ``__opts__`` and ``__context__`` dunders are forwarded
    together with the caller-supplied ``region``, ``key``, ``keyid`` and
    ``profile`` values.
    """
    credentials = {
        "region": region,
        "key": key,
        "keyid": keyid,
        "profile": profile,
    }
    return boto3mod.get_connection(
        service, opts=__opts__, context=__context__, **credentials
    )


def __virtual__():
    """
    Decide whether this module may be loaded.

    Returns ``__virtualname__`` when the botocore imports at the top of the
    module succeeded, otherwise a ``(False, reason)`` tuple.
    """
    if not HAS_BOTO3:
        reason = "The boto3_datapipeline module could not be loaded: boto3 is not available."
        return (False, reason)
    return __virtualname__
def activate_pipeline(pipeline_id, region=None, key=None, keyid=None, profile=None):
    """
    Start processing pipeline tasks. This function is idempotent.

    Returns ``{"result": True}`` when the client call completes, or
    ``{"error": <message>}`` when a ``BotoCoreError`` or ``ClientError`` is
    raised.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_datapipeline.activate_pipeline my_pipeline_id
    """
    try:
        conn = _get_conn("datapipeline", region=region, key=key, keyid=keyid, profile=profile)
        conn.activate_pipeline(pipelineId=pipeline_id)
        return {"result": True}
    except (BotoCoreError, ClientError) as exc:
        return {"error": str(exc)}
def create_pipeline(
    name, unique_id, description="", region=None, key=None, keyid=None, profile=None
):
    """
    Create a new, empty pipeline. This function is idempotent.

    Returns ``{"result": <pipelineId>}`` taken from the client response, or
    ``{"error": <message>}`` when a ``BotoCoreError`` or ``ClientError`` is
    raised.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_datapipeline.create_pipeline my_name my_unique_id
    """
    request = {
        "name": name,
        "uniqueId": unique_id,
        "description": description,
    }
    try:
        conn = _get_conn("datapipeline", region=region, key=key, keyid=keyid, profile=profile)
        created = conn.create_pipeline(**request)
        return {"result": created["pipelineId"]}
    except (BotoCoreError, ClientError) as exc:
        return {"error": str(exc)}
def delete_pipeline(pipeline_id, region=None, key=None, keyid=None, profile=None):
    """
    Delete a pipeline, its pipeline definition, and its run history. This
    function is idempotent.

    Returns ``{"result": True}`` when the client call completes, or
    ``{"error": <message>}`` when a ``BotoCoreError`` or ``ClientError`` is
    raised.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_datapipeline.delete_pipeline my_pipeline_id
    """
    try:
        conn = _get_conn("datapipeline", region=region, key=key, keyid=keyid, profile=profile)
        conn.delete_pipeline(pipelineId=pipeline_id)
        return {"result": True}
    except (BotoCoreError, ClientError) as exc:
        return {"error": str(exc)}
def describe_pipelines(pipeline_ids, region=None, key=None, keyid=None, profile=None):
    """
    Retrieve metadata about one or more pipelines.

    ``pipeline_ids`` is passed unchanged as the ``pipelineIds`` argument of
    the client call.  Returns ``{"result": <client response>}``, or
    ``{"error": <message>}`` when a ``BotoCoreError`` or ``ClientError`` is
    raised.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_datapipeline.describe_pipelines ['my_pipeline_id']
    """
    try:
        conn = _get_conn("datapipeline", region=region, key=key, keyid=keyid, profile=profile)
        return {"result": conn.describe_pipelines(pipelineIds=pipeline_ids)}
    except (BotoCoreError, ClientError) as exc:
        return {"error": str(exc)}
def get_pipeline_definition(
    pipeline_id, version="latest", region=None, key=None, keyid=None, profile=None
):
    """
    Get the definition of the specified pipeline.

    ``version`` is forwarded to the client call and defaults to ``latest``.
    Returns ``{"result": <client response>}``, or ``{"error": <message>}``
    when a ``BotoCoreError`` or ``ClientError`` is raised.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_datapipeline.get_pipeline_definition my_pipeline_id
    """
    try:
        conn = _get_conn("datapipeline", region=region, key=key, keyid=keyid, profile=profile)
        definition = conn.get_pipeline_definition(pipelineId=pipeline_id, version=version)
        return {"result": definition}
    except (BotoCoreError, ClientError) as exc:
        return {"error": str(exc)}
def list_pipelines(region=None, key=None, keyid=None, profile=None):
    """
    Get a list of pipeline ids and names for all pipelines.

    Every page of the ``list_pipelines`` paginator is read and the entries of
    each page's ``pipelineIdList`` are concatenated.  Returns
    ``{"result": <list>}``, or ``{"error": <message>}`` when a
    ``BotoCoreError`` or ``ClientError`` is raised.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_datapipeline.list_pipelines profile=myprofile
    """
    try:
        conn = _get_conn("datapipeline", region=region, key=key, keyid=keyid, profile=profile)
        pages = conn.get_paginator("list_pipelines").paginate()
        # Flatten the per-page lists into a single list, page order preserved.
        collected = [entry for page in pages for entry in page["pipelineIdList"]]
        return {"result": collected}
    except (BotoCoreError, ClientError) as exc:
        return {"error": str(exc)}
def pipeline_id_from_name(name, region=None, key=None, keyid=None, profile=None):
    """
    Get the pipeline id, if it exists, for the given name.

    The lookup goes through ``list_pipelines``; if that call reports an
    error, its return value is passed back unchanged.  Otherwise the id of the
    first listed pipeline whose ``name`` equals ``name`` is returned as
    ``{"result": <id>}``, or an ``{"error": ...}`` dict when none matches.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_datapipeline.pipeline_id_from_name my_pipeline_name
    """
    listing = list_pipelines(region=region, key=key, keyid=keyid, profile=profile)
    if "error" in listing:
        return listing

    for entry in listing["result"]:
        if entry["name"] == name:
            return {"result": entry["id"]}

    return {"error": f"No pipeline found with name={name}"}
def put_pipeline_definition(
    pipeline_id,
    pipeline_objects,
    parameter_objects=None,
    parameter_values=None,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Add tasks, schedules, and preconditions to the specified pipeline. This
    function is idempotent and will replace an existing definition.

    Falsy ``parameter_objects`` / ``parameter_values`` are sent as empty
    lists.  Returns ``{"result": <client response>}`` when the response's
    ``errored`` flag is falsy, ``{"error": <validationErrors>}`` when it is
    truthy, and ``{"error": <message>}`` when a ``BotoCoreError`` or
    ``ClientError`` is raised.

    CLI Example:

    .. code-block:: bash

        salt myminion boto3_datapipeline.put_pipeline_definition my_pipeline_id my_pipeline_objects
    """
    request = {
        "pipelineId": pipeline_id,
        "pipelineObjects": pipeline_objects,
        "parameterObjects": parameter_objects if parameter_objects else [],
        "parameterValues": parameter_values if parameter_values else [],
    }
    try:
        conn = _get_conn("datapipeline", region=region, key=key, keyid=keyid, profile=profile)
        outcome = conn.put_pipeline_definition(**request)
        if outcome["errored"]:
            # The service accepted the request but rejected the definition.
            return {"error": outcome["validationErrors"]}
        return {"result": outcome}
    except (BotoCoreError, ClientError) as exc:
        return {"error": str(exc)}