Source code for saltext.sqs_events.engines.sqs_events_mod

"""
Salt engine module
"""

import logging
import time

import salt.utils.event
import salt.utils.json

try:
    import boto3
    from botocore.exceptions import ClientError
    from botocore.exceptions import NoCredentialsError
    from botocore.exceptions import NoRegionError

    HAS_BOTO3 = True
except ImportError:
    HAS_BOTO3 = False


def __virtual__():
    if not HAS_BOTO3:
        return (
            False,
            "Cannot import engine sqs_events_boto3 because the required boto3 module is missing",
        )
    return True


log = logging.getLogger(__name__)


def _get_sqs_client(profile, region=None, key=None, keyid=None):
    """
    Get a boto3 client to SQS.
    Nutzt explizite Keys aus profile/__opts__ oder, falls nicht gesetzt,
    Standard-Credential-Provider-Chain (inkl. IMDSv2).
    """
    session_kwargs = {}
    client_kwargs = {}

    if profile:
        if isinstance(profile, str):
            _profile = __opts__[profile]
        elif isinstance(profile, dict):
            _profile = profile
        else:
            _profile = {}

        key = _profile.get("key", key)
        keyid = _profile.get("keyid", keyid)
        region = _profile.get("region", region)

    if not region:
        region = __opts__.get("sqs.region", "us-east-1")

    session_kwargs["region_name"] = region

    # Wenn key/keyid gesetzt sind, explizit verwenden, sonst Default Chain
    if keyid and key:
        client_kwargs["aws_access_key_id"] = keyid
        client_kwargs["aws_secret_access_key"] = key

    try:
        session = boto3.Session(**session_kwargs)
        client = session.client("sqs", **client_kwargs)
    except (NoCredentialsError, NoRegionError) as exc:
        log.error(
            "Error creating boto3 SQS client for sqs_events engine: %s",
            exc,
        )
        return None

    return client


def _get_queue_url(sqs_client, queue_name, owner_acct_id=None):
    """
    Resolve queue name to URL.
    """
    try:
        params = {"QueueName": queue_name}
        if owner_acct_id:
            params["QueueOwnerAWSAccountId"] = owner_acct_id
        resp = sqs_client.get_queue_url(**params)
        return resp["QueueUrl"]
    except ClientError as exc:
        log.warning(
            "Error getting queue URL for %s (owner_acct_id=%s): %s",
            queue_name,
            owner_acct_id,
            exc,
        )
        return None


def _process_queue(
    sqs_client,
    queue_url,
    q_name,
    fire_master,
    tag="salt/engine/sqs",
    owner_acct_id=None,
    message_format=None,
):
    if not queue_url:
        log.warning(
            "Failure resolving queue URL for: %s, waiting 10 seconds.",
            ":".join([_f for _f in (str(owner_acct_id), q_name) if _f]),
        )
        time.sleep(10)
        return

    try:
        # Long polling wie vorher (20 Sekunden)
        resp = sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
    except ClientError as exc:
        log.warning(
            "Error receiving messages from SQS queue %s: %s",
            q_name,
            exc,
        )
        time.sleep(10)
        return

    messages = resp.get("Messages", [])
    for msg in messages:
        body = msg.get("Body", "")
        if message_format == "json":
            try:
                data = salt.utils.json.loads(body)
            except Exception as exc:  # pylint: disable=broad-except
                log.warning(
                    "Failed to decode JSON body from SQS message on queue %s: %s",
                    q_name,
                    exc,
                )
                data = body
            fire_master(tag=tag, data={"message": data})
        else:
            fire_master(tag=tag, data={"message": body})

        # Nachricht löschen, nachdem sie verarbeitet wurde
        try:
            sqs_client.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=msg["ReceiptHandle"],
            )
        except ClientError as exc:
            log.warning(
                "Failed to delete SQS message from queue %s: %s",
                q_name,
                exc,
            )


[docs] def start(queue, profile=None, tag="salt/engine/sqs", owner_acct_id=None): """ Listen to SQS and fire messages on Salt event bus (boto3-Version). """ if __opts__.get("__role") == "master": fire_master = salt.utils.event.get_master_event( __opts__, __opts__["sock_dir"], listen=False ).fire_event else: fire_master = __salt__["event.send"] message_format = __opts__.get("sqs.message_format", None) sqs_client = _get_sqs_client(profile) if not sqs_client: # Falls keine Credentials/Region etc. -> Retry-Schleife while not sqs_client: log.warning("Failed to create SQS boto3 client, retrying in 10 seconds.") time.sleep(10) sqs_client = _get_sqs_client(profile) queue_url = None while True: if not queue_url: queue_url = _get_queue_url(sqs_client, queue, owner_acct_id=owner_acct_id) _process_queue( sqs_client, queue_url, queue, fire_master, tag=tag, owner_acct_id=owner_acct_id, message_format=message_format, )