Source code for saltext.redis.cache.redis_cache

r"""
Redis
=====

Redis plugin for the Salt caching subsystem.

.. versionadded:: 2017.7.0
.. versionchanged:: 3005

To enable this cache plugin, the master will need the python client for redis installed.
This can be easily installed with pip:

.. code-block:: bash

    salt \* pip.install redis

As Redis provides a simple mechanism for very fast key-value store, in order to
provide the necessary features for the Salt caching subsystem, the following
conventions are used:

- A Redis key consists of the bank name and the cache key separated by ``/``, e.g.:
  ``$KEY_minions/alpha/stuff`` where ``minions/alpha`` is the bank name
  and ``stuff`` is the key name.
- As the caching subsystem is organised as a tree, we need to store the caching
  path and identify the bank and its offspring.  At the same time, Redis is
  linear and we need to avoid doing ``keys <pattern>`` which is very
  inefficient as it goes through all the keys on the remote Redis server.
  Instead, each bank hierarchy has a Redis SET associated which stores the list
  of sub-banks. By default, these keys begin with ``$BANK_``.
- In addition, each key name is stored in a separate SET of all the keys within
  a bank. By default, these SETs begin with ``$BANKEYS_``.

For example, to store the key ``my-key`` under the bank ``root-bank/sub-bank/leaf-bank``,
the following hierarchy will be built:

.. code-block:: text

    127.0.0.1:6379> SMEMBERS $BANK_root-bank
    1) "sub-bank"
    127.0.0.1:6379> SMEMBERS $BANK_root-bank/sub-bank
    1) "leaf-bank"
    127.0.0.1:6379> SMEMBERS $BANKEYS_root-bank/sub-bank/leaf-bank
    1) "my-key"
    127.0.0.1:6379> GET $KEY_root-bank/sub-bank/leaf-bank/my-key
    "my-value"


There are four types of keys stored:

- ``$BANK_*`` is a Redis SET containing the list of banks under the current bank.
- ``$BANKEYS_*`` is a Redis SET containing the list of keys under the current bank.
- ``$KEY_*`` keeps the value of the key.
- ``$TSTAMP_*`` stores the last updated timestamp of the key.

These prefixes and the separator can be adjusted using the configuration options:

bank_prefix: ``$BANK``
    The prefix used for the name of the Redis key storing the list of sub-banks.

bank_keys_prefix: ``$BANKEYS``
    The prefix used for the name of the Redis key storing the list of keys under a certain bank.

key_prefix: ``$KEY``
    The prefix of the Redis keys having the value of the keys to be cached under
    a certain bank.

timestamp_prefix: ``$TSTAMP``
    The prefix for the last modified timestamp for keys.

    .. versionadded:: 3005

separator: ``_``
    The separator between the prefix and the key body.

The connection details can be specified using:

host: ``localhost``
    The hostname of the Redis server.

port: ``6379``
    The Redis server port.

cluster_mode: ``False``
    Whether cluster_mode is enabled or not

cluster.startup_nodes:
    A list of host, port dictionaries pointing to cluster members. At least one is required
    but multiple nodes are better

    .. code-block:: yaml

        cache.redis.cluster.startup_nodes
          - host: redis-member-1
            port: 6379
          - host: redis-member-2
            port: 6379

cluster.skip_full_coverage_check: ``False``
    Some cluster providers restrict certain redis commands such as CONFIG for enhanced security.
    Set this option to true to skip checks that required advanced privileges.

    .. note::

        Most cloud hosted redis clusters will require this to be set to ``True``

db: ``'0'``
    The database index.

    .. note::
        The database index must be specified as string not as integer value!

password:
    Redis connection password.

unix_socket_path:

    .. versionadded:: 2018.3.1

    Path to a UNIX socket for access. Overrides `host` / `port`.

Configuration Example:

.. code-block:: yaml

    cache.redis.host: localhost
    cache.redis.port: 6379
    cache.redis.db: '0'
    cache.redis.password: my pass
    cache.redis.bank_prefix: #BANK
    cache.redis.bank_keys_prefix: #BANKEYS
    cache.redis.key_prefix: #KEY
    cache.redis.timestamp_prefix: #TICKS
    cache.redis.separator: '@'

Cluster Configuration Example:

.. code-block:: yaml

    cache.redis.cluster_mode: true
    cache.redis.cluster.skip_full_coverage_check: true
    cache.redis.cluster.startup_nodes:
      - host: redis-member-1
        port: 6379
      - host: redis-member-2
        port: 6379
    cache.redis.db: '0'
    cache.redis.password: my pass
    cache.redis.bank_prefix: #BANK
    cache.redis.bank_keys_prefix: #BANKEYS
    cache.redis.key_prefix: #KEY
    cache.redis.separator: '@'
"""

import itertools
import logging
import time

import salt.payload
import salt.utils.stringutils
from salt.exceptions import SaltCacheError

# Import salt

try:
    import redis
    from redis.exceptions import ConnectionError as RedisConnectionError
    from redis.exceptions import ResponseError as RedisResponseError

    HAS_REDIS = True
except ImportError:
    HAS_REDIS = False

try:
    from rediscluster import RedisCluster  # pylint: disable=no-name-in-module

    HAS_REDIS_CLUSTER = True
except ImportError:
    HAS_REDIS_CLUSTER = False


# -----------------------------------------------------------------------------
# module properties
# -----------------------------------------------------------------------------

__virtualname__ = "redis"
__func_alias__ = {"list_": "list"}

log = logging.getLogger(__file__)

_BANK_PREFIX = "$BANK"
_KEY_PREFIX = "$KEY"
_TIMESTAMP_PREFIX = "$TSTAMP"
_BANK_KEYS_PREFIX = "$BANKEYS"
_SEPARATOR = "_"

REDIS_SERVER = None

# -----------------------------------------------------------------------------
# property functions
# -----------------------------------------------------------------------------


[docs] def __virtual__(): """ The redis library must be installed for this module to work. The redis redis cluster library must be installed if cluster_mode is True """ if not HAS_REDIS: return (False, "Please install the python-redis package.") if not HAS_REDIS_CLUSTER and _get_redis_cache_opts()["cluster_mode"]: return (False, "Please install the redis-py-cluster package.") return __virtualname__
# ----------------------------------------------------------------------------- # helper functions -- will not be exported # ----------------------------------------------------------------------------- # pylint: disable=unused-argument
[docs] def init_kwargs(kwargs): """ Effectively a noop. Return an empty dictionary. """ return {}
def _get_redis_cache_opts(): """ Return the Redis server connection details from the __opts__. """ return { "host": __opts__.get("cache.redis.host", "localhost"), "port": __opts__.get("cache.redis.port", 6379), "unix_socket_path": __opts__.get("cache.redis.unix_socket_path", None), "db": __opts__.get("cache.redis.db", "0"), "password": __opts__.get("cache.redis.password", ""), "cluster_mode": __opts__.get("cache.redis.cluster_mode", False), "startup_nodes": __opts__.get("cache.redis.cluster.startup_nodes", {}), "skip_full_coverage_check": __opts__.get( "cache.redis.cluster.skip_full_coverage_check", False ), } def _get_redis_server(opts=None): """ Return the Redis server instance. Caching the object instance. """ global REDIS_SERVER # pylint: disable=global-statement if REDIS_SERVER: return REDIS_SERVER if not opts: opts = _get_redis_cache_opts() if opts["cluster_mode"]: REDIS_SERVER = RedisCluster( startup_nodes=opts["startup_nodes"], skip_full_coverage_check=opts["skip_full_coverage_check"], ) else: REDIS_SERVER = redis.StrictRedis( opts["host"], opts["port"], unix_socket_path=opts["unix_socket_path"], db=opts["db"], password=opts["password"], ) return REDIS_SERVER def _get_redis_keys_opts(): """ Build the key opts based on the user options. """ return { "bank_prefix": __opts__.get("cache.redis.bank_prefix", _BANK_PREFIX), "bank_keys_prefix": __opts__.get("cache.redis.bank_keys_prefix", _BANK_KEYS_PREFIX), "key_prefix": __opts__.get("cache.redis.key_prefix", _KEY_PREFIX), "separator": __opts__.get("cache.redis.separator", _SEPARATOR), "timestamp_prefix": __opts__.get("cache.redis.timestamp_prefix", _TIMESTAMP_PREFIX), } def _get_bank_redis_key(bank): """ Return the Redis key for the bank given the name. """ opts = _get_redis_keys_opts() return "{prefix}{separator}{bank}".format( # pylint: disable=consider-using-f-string prefix=opts["bank_prefix"], separator=opts["separator"], bank=bank ) def _get_timestamp_key(bank, key): opts = _get_redis_keys_opts() return "{}{}{}/{}".format( # pylint: disable=consider-using-f-string opts["timestamp_prefix"], opts["separator"], {bank}, {key} ) # Use this line when we can use modern python # return f"{opts['timestamp_prefix']}{opts['separator']}{bank}/{key}" def _get_key_redis_key(bank, key): """ Return the Redis key given the bank name and the key name. """ opts = _get_redis_keys_opts() return "{prefix}{separator}{bank}/{key}".format( # pylint: disable=consider-using-f-string prefix=opts["key_prefix"], separator=opts["separator"], bank=bank, key=salt.utils.stringutils.to_str(key), ) def _get_bank_keys_redis_key(bank): """ Return the Redis key for the SET of keys under a certain bank, given the bank name. """ opts = _get_redis_keys_opts() return "{prefix}{separator}{bank}".format( # pylint: disable=consider-using-f-string prefix=opts["bank_keys_prefix"], separator=opts["separator"], bank=bank ) def _build_bank_hier(bank, redis_pipe): """ Build the bank hierarchy from the root of the tree. If already exists, it won't rewrite. It's using the Redis pipeline, so there will be only one interaction with the remote server. """ def joinbanks(*banks): return "/".join(banks) for bank_path in itertools.accumulate(bank.split("/"), joinbanks): bank_set = _get_bank_redis_key(bank_path) log.debug("Adding %s to %s", bank, bank_set) redis_pipe.sadd(bank_set, ".") def _get_banks_to_remove(redis_server, bank, path=""): """ A simple tree traversal algorithm that builds the list of banks to remove, starting from an arbitrary node in the tree. """ current_path = bank if not path else f"{path}/{bank}" bank_paths_to_remove = [current_path] # as you got here, you'll be removed bank_key = _get_bank_redis_key(current_path) child_banks = redis_server.smembers(bank_key) if not child_banks: return bank_paths_to_remove # this bank does not have any child banks so we stop here for child_bank in child_banks: bank_paths_to_remove.extend( _get_banks_to_remove(redis_server, child_bank, path=current_path) ) # go one more level deeper # and also remove the children of this child bank (if any) return bank_paths_to_remove # ----------------------------------------------------------------------------- # cache subsystem functions # -----------------------------------------------------------------------------
[docs] def store(bank, key, data): """ Store the data in a Redis key. """ redis_server = _get_redis_server() redis_pipe = redis_server.pipeline() redis_key = _get_key_redis_key(bank, key) redis_bank_keys = _get_bank_keys_redis_key(bank) try: _build_bank_hier(bank, redis_pipe) value = salt.payload.dumps(data) redis_pipe.set(redis_key, value) log.debug("Setting the value for %s under %s (%s)", key, bank, redis_key) redis_pipe.sadd(redis_bank_keys, key) # localfs cache truncates the timestamp to int only. We'll do the same. redis_pipe.set( _get_timestamp_key(bank=bank, key=key), salt.payload.dumps(int(time.time())), ) log.debug("Adding %s to %s", key, redis_bank_keys) redis_pipe.execute() except (RedisConnectionError, RedisResponseError) as rerr: mesg = f"Cannot set the Redis cache key {redis_key}: {rerr}" log.error(mesg) raise SaltCacheError(mesg) from rerr
[docs] def fetch(bank, key): """ Fetch data from the Redis cache. """ redis_server = _get_redis_server() redis_key = _get_key_redis_key(bank, key) redis_value = None try: redis_value = redis_server.get(redis_key) except (RedisConnectionError, RedisResponseError) as rerr: mesg = f"Cannot fetch the Redis cache key {redis_key}: {rerr}" log.error(mesg) raise SaltCacheError(mesg) from rerr if redis_value is None: return {} return salt.payload.loads(redis_value)
[docs] def flush(bank, key=None): """ Remove the key from the cache bank with all the key content. If no key is specified, remove the entire bank with all keys and sub-banks inside. This function is using the Redis pipelining for best performance. However, when removing a whole bank, in order to re-create the tree, there are a couple of requests made. In total: - one for node in the hierarchy sub-tree, starting from the bank node - one pipelined request to get the keys under all banks in the sub-tree - one pipeline request to remove the corresponding keys This is not quite optimal, as if we need to flush a bank having a very long list of sub-banks, the number of requests to build the sub-tree may grow quite big. An improvement for this would be loading a custom Lua script in the Redis instance of the user (using the ``register_script`` feature) and call it whenever we flush. This script would only need to build this sub-tree causing problems. It can be added later and the behaviour should not change as the user needs to explicitly allow Salt inject scripts in their Redis instance. """ redis_server = _get_redis_server() redis_pipe = redis_server.pipeline() if key is None: # will remove all bank keys bank_paths_to_remove = _get_banks_to_remove(redis_server, bank) # tree traversal to get all bank hierarchy for bank_to_remove in bank_paths_to_remove: bank_keys_redis_key = _get_bank_keys_redis_key(bank_to_remove) # Redis key of the SET that stores the bank keys redis_pipe.smembers(bank_keys_redis_key) # fetch these keys log.debug( "Fetching the keys of the %s bank (%s)", bank_to_remove, bank_keys_redis_key, ) try: log.debug("Executing the pipe...") subtree_keys = redis_pipe.execute() # here are the keys under these banks to be removed # this retunrs a list of sets, e.g.: # [set([]), set(['my-key']), set(['my-other-key', 'yet-another-key'])] # one set corresponding to a bank except (RedisConnectionError, RedisResponseError) as rerr: mesg = "Cannot retrieve the keys under these cache banks: {rbanks}: {rerr}".format( # pylint: disable=consider-using-f-string rbanks=", ".join(bank_paths_to_remove), rerr=rerr ) log.error(mesg) raise SaltCacheError(mesg) from rerr total_banks = len(bank_paths_to_remove) # bank_paths_to_remove and subtree_keys have the same length (see above) for index in range(total_banks): bank_keys = subtree_keys[index] # all the keys under this bank bank_path = bank_paths_to_remove[index] for key in bank_keys: redis_key = _get_key_redis_key(bank_path, key) redis_pipe.delete(redis_key) # kill 'em all! timestamp_key = _get_timestamp_key(bank=bank_path, key=key.decode()) redis_pipe.delete(timestamp_key) log.debug( "Removing the key %s under the %s bank (%s)", key, bank_path, redis_key, ) bank_keys_redis_key = _get_bank_keys_redis_key(bank_path) redis_pipe.delete(bank_keys_redis_key) log.debug( "Removing the bank-keys key for the %s bank (%s)", bank_path, bank_keys_redis_key, ) # delete the Redis key where are stored # the list of keys under this bank bank_key = _get_bank_redis_key(bank_path) redis_pipe.delete(bank_key) log.debug("Removing the %s bank (%s)", bank_path, bank_key) # delete the bank key itself else: redis_key = _get_key_redis_key(bank, key) redis_pipe.delete(redis_key) # delete the key cached timestamp_key = _get_timestamp_key(bank=bank, key=key) redis_pipe.delete(timestamp_key) log.debug("Removing the key %s under the %s bank (%s)", key, bank, redis_key) bank_keys_redis_key = _get_bank_keys_redis_key(bank) redis_pipe.srem(bank_keys_redis_key, key) log.debug( "De-referencing the key %s from the bank-keys of the %s bank (%s)", key, bank, bank_keys_redis_key, ) # but also its reference from $BANKEYS list try: redis_pipe.execute() # Fluuuush except (RedisConnectionError, RedisResponseError) as rerr: mesg = f"Cannot flush the Redis cache bank {bank}: {rerr}" log.error(mesg) raise SaltCacheError(mesg) from rerr return True
[docs] def list_(bank): """ Lists entries stored in the specified bank. """ redis_server = _get_redis_server() bank_redis_key = _get_bank_keys_redis_key(bank) try: banks = redis_server.smembers(bank_redis_key) except (RedisConnectionError, RedisResponseError) as rerr: mesg = f"Cannot list the Redis cache key {bank_redis_key}: {rerr}" log.error(mesg) raise SaltCacheError(mesg) from rerr if not banks: return [] return [bank.decode() for bank in banks if bank != b"."]
[docs] def contains(bank, key): """ Checks if the specified bank contains the specified key. """ redis_server = _get_redis_server() bank_redis_key = _get_bank_keys_redis_key(bank) try: if key is None: return salt.utils.stringutils.to_str(redis_server.type(bank_redis_key)) != "none" else: return redis_server.sismember(bank_redis_key, key) except (RedisConnectionError, RedisResponseError) as rerr: mesg = f"Cannot retrieve the Redis cache key {bank_redis_key}: {rerr}" log.error(mesg) raise SaltCacheError(mesg) from rerr
[docs] def updated(bank, key): """ Return the Unix Epoch timestamp of when the key was last updated. Return None if key is not found. """ redis_server = _get_redis_server() timestamp_key = _get_timestamp_key(bank=bank, key=key) value = redis_server.get(timestamp_key) if value is not None: value = salt.payload.loads(value) return value