Source code for saltext.redis.returners.redis_return

"""
Return data to a redis server

To enable this returner the minion will need the python client for redis
installed and the following values configured in the minion or master
config, these are the defaults:

.. code-block:: yaml

    redis.db: '0'
    redis.host: 'salt'
    redis.port: 6379
    redis.password: ''

.. versionadded:: 2018.3.1

    Alternatively a UNIX socket can be specified by `unix_socket_path`:

.. code-block:: yaml

    redis.db: '0'
    redis.unix_socket_path: /var/run/redis/redis.sock

Cluster Mode Example:

.. code-block:: yaml

    redis.db: '0'
    redis.cluster_mode: true
    redis.cluster.skip_full_coverage_check: true
    redis.cluster.startup_nodes:
      - host: redis-member-1
        port: 6379
      - host: redis-member-2
        port: 6379

Alternative configuration values can be used by prefacing the configuration.
Any values not found in the alternative configuration will be pulled from
the default location:

.. code-block:: yaml

    alternative.redis.db: '0'
    alternative.redis.host: 'salt'
    alternative.redis.port: 6379
    alternative.redis.password: ''

To use the redis returner, append '--return redis' to the salt command.

.. code-block:: bash

    salt '*' test.ping --return redis

To use the alternative configuration, append '--return_config alternative' to the salt command.

.. versionadded:: 2015.5.0

.. code-block:: bash

    salt '*' test.ping --return redis --return_config alternative

To override individual configuration items, append --return_kwargs '{"key:": "value"}' to the salt command.

.. versionadded:: 2016.3.0

.. code-block:: bash

    salt '*' test.ping --return redis --return_kwargs '{"db": "another-salt"}'

Redis Cluster Mode Options:

cluster_mode: ``False``
    Whether cluster_mode is enabled or not

cluster.startup_nodes:
    A list of host, port dictionaries pointing to cluster members. At least one is required
    but multiple nodes are better

    .. code-block:: yaml

        redis.cluster.startup_nodes
          - host: redis-member-1
            port: 6379
          - host: redis-member-2
            port: 6379

cluster.skip_full_coverage_check: ``False``
    Some cluster providers restrict certain redis commands such as CONFIG for enhanced security.
    Set this option to true to skip checks that required advanced privileges.

    .. note::

        Most cloud hosted redis clusters will require this to be set to ``True``


"""

import logging

import salt.returners
import salt.utils.jid
import salt.utils.job
import salt.utils.json
import salt.utils.platform

try:
    import redis

    HAS_REDIS = True
except ImportError:
    HAS_REDIS = False

log = logging.getLogger(__name__)

try:
    # pylint: disable=no-name-in-module
    from rediscluster import StrictRedisCluster

    # pylint: enable=no-name-in-module

    HAS_REDIS_CLUSTER = True
except ImportError:
    HAS_REDIS_CLUSTER = False

REDIS_POOL = None

# Define the module's virtual name
__virtualname__ = "redis"


[docs] def __virtual__(): """ The redis library must be installed for this module to work. The redis redis cluster library must be installed if cluster_mode is True """ if not HAS_REDIS: return ( False, "Could not import redis returner; redis python client is not installed.", ) if not HAS_REDIS_CLUSTER and _get_options().get("cluster_mode", False): return (False, "Please install the redis-py-cluster package.") return __virtualname__
def _get_options(ret=None): """ Get the redis options from salt. """ attrs = { "host": "host", "port": "port", "unix_socket_path": "unix_socket_path", "db": "db", "password": "password", "cluster_mode": "cluster_mode", "startup_nodes": "cluster.startup_nodes", "skip_full_coverage_check": "cluster.skip_full_coverage_check", } if salt.utils.platform.is_proxy(): return { "host": __opts__.get("redis.host", "salt"), "port": __opts__.get("redis.port", 6379), "unix_socket_path": __opts__.get("redis.unix_socket_path", None), "db": __opts__.get("redis.db", "0"), "password": __opts__.get("redis.password", None), "cluster_mode": __opts__.get("redis.cluster_mode", False), "startup_nodes": __opts__.get("redis.cluster.startup_nodes", {}), "skip_full_coverage_check": __opts__.get( "redis.cluster.skip_full_coverage_check", False ), } _options = salt.returners.get_returner_options( __virtualname__, ret, attrs, __salt__=__salt__, __opts__=__opts__ ) return _options def _get_serv(ret=None): """ Return a redis server object """ _options = _get_options(ret) global REDIS_POOL # pylint: disable=global-statement if REDIS_POOL: return REDIS_POOL elif _options.get("cluster_mode"): REDIS_POOL = StrictRedisCluster( startup_nodes=_options.get("startup_nodes"), skip_full_coverage_check=_options.get("skip_full_coverage_check"), decode_responses=True, ) else: REDIS_POOL = redis.StrictRedis( host=_options.get("host"), port=_options.get("port"), unix_socket_path=_options.get("unix_socket_path", None), db=_options.get("db"), password=_options.get("password"), decode_responses=True, ) return REDIS_POOL def _get_ttl(): return salt.utils.job.get_keep_jobs_seconds(__opts__)
[docs] def returner(ret): """ Return data to a redis data store """ serv = _get_serv(ret) pipeline = serv.pipeline(transaction=False) minion, jid = ret["id"], ret["jid"] pipeline.hset(f"ret:{jid}", minion, salt.utils.json.dumps(ret)) pipeline.expire(f"ret:{jid}", _get_ttl()) pipeline.set("{}:{}".format(minion, ret["fun"]), jid) # pylint: disable=consider-using-f-string pipeline.sadd("minions", minion) pipeline.execute()
# pylint: disable=unused-argument
[docs] def save_load(jid, load, minions=None): """ Save the load to the specified jid """ serv = _get_serv(ret=None) serv.setex(f"load:{jid}", _get_ttl(), salt.utils.json.dumps(load))
[docs] def save_minions(jid, minions, syndic_id=None): # pylint: disable=unused-argument """ Included for API consistency """
[docs] def get_load(jid): """ Return the load data that marks a specified jid """ serv = _get_serv(ret=None) data = serv.get(f"load:{jid}") if data: return salt.utils.json.loads(data) return {}
[docs] def get_jid(jid): """ Return the information returned when the specified job id was executed """ serv = _get_serv(ret=None) ret = {} for minion, data in serv.hgetall(f"ret:{jid}").items(): if data: ret[minion] = salt.utils.json.loads(data) return ret
[docs] def get_fun(fun): """ Return a dict of the last function called for all minions """ serv = _get_serv(ret=None) ret = {} for minion in serv.smembers("minions"): ind_str = f"{minion}:{fun}" try: jid = serv.get(ind_str) except Exception: # pylint: disable=broad-except continue if not jid: continue data = serv.get(f"{minion}:{jid}") if data: ret[minion] = salt.utils.json.loads(data) return ret
[docs] def get_jids(): """ Return a dict mapping all job ids to job information """ serv = _get_serv(ret=None) ret = {} for s in serv.mget(serv.keys("load:*")): if s is None: continue load = salt.utils.json.loads(s) jid = load["jid"] ret[jid] = salt.utils.jid.format_jid_instance(jid, load) return ret
[docs] def get_minions(): """ Return a list of minions """ serv = _get_serv(ret=None) return list(serv.smembers("minions"))
[docs] def clean_old_jobs(): """ Clean out minions's return data for old jobs. Normally, hset 'ret:<jid>' are saved with a TTL, and will eventually get cleaned by redis.But for jobs with some very late minion return, the corresponding hset's TTL will be refreshed to a too late timestamp, we'll do manually cleaning here. """ serv = _get_serv(ret=None) ret_jids = serv.keys("ret:*") living_jids = set(serv.keys("load:*")) to_remove = [] for ret_key in ret_jids: load_key = ret_key.replace("ret:", "load:", 1) if load_key not in living_jids: to_remove.append(ret_key) if len(to_remove) != 0: serv.delete(*to_remove) log.debug("clean old jobs: %s", to_remove)
[docs] def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument """ Do any work necessary to prepare a JID, including sending a custom id """ return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__)