Source code for saltext.zookeeper.states.zk_concurrency

"""
Control concurrency of steps within state execution using zookeeper
===================================================================

.. important::
    This module requires the general :ref:`Zookeeper setup <zookeeper-setup>`.

This module allows you to "wrap" a state's execution with concurrency control.
This is useful to protect against all hosts executing highstate simultaneously
if your services don't all HUP restart. The common way of protecting against this
is to run in batch mode, but that doesn't protect from another person running
the same batch command (and thereby having 2x the number of nodes deploying at once).

This module will block while acquiring a slot, meaning that however the command gets
called it will coordinate with zookeeper to ensure that no more than max_concurrency
steps are executing with a single path.

Example
-------
This example would allow the file state to change, but would limit the
concurrency of the trafficserver service restart to 4.

.. code-block:: yaml

    acquire_lock:
      zk_concurrency.lock:
        - name: /trafficserver
        - zk_hosts: 'zookeeper:2181'
        - max_concurrency: 4
        - prereq:
            - service: trafficserver

    trafficserver:
      service.running:
        - watch:
          - file: /etc/trafficserver/records.config

    /etc/trafficserver/records.config:
      file.managed:
        - source: salt://records.config

    release_lock:
      zk_concurrency.unlock:
        - name: /trafficserver
        - require:
            - service: trafficserver
"""

# Name under which this state module is exposed by ``__virtual__``.
__virtualname__ = "zk_concurrency"

# Execution module functions that must be present in ``__salt__`` for this
# state module to load.
# TODO: use depends decorator to make these per function deps, instead of all or nothing
REQUIRED_FUNCS = (
    "zk_concurrency.lock",
    "zk_concurrency.unlock",
    "zk_concurrency.party_members",
)


def __virtual__():
    """
    Load this state module only if every function listed in ``REQUIRED_FUNCS``
    is available in ``__salt__``.

    Returns ``__virtualname__`` on success, otherwise a ``(False, reason)`` tuple.
    """
    for required in REQUIRED_FUNCS:
        # Bail out on the first missing execution function.
        if required not in __salt__:
            return (False, "zk_concurrency module could not be loaded")
    return __virtualname__


def lock(
    name,
    zk_hosts=None,
    identifier=None,
    max_concurrency=1,
    timeout=None,
    ephemeral_lease=False,
    profile=None,
    scheme=None,
    username=None,
    password=None,
    default_acl=None,
):
    """
    Block state execution until you are able to get the lock (or hit the timeout)

    name
        Zookeeper path the lock is taken on.

    zk_hosts
        Zookeeper hosts, forwarded to ``zk_concurrency.lock``.

    identifier
        Identifier of the lock holder. Defaults to the ``id`` grain.

    max_concurrency
        Number of holders allowed on ``name`` at the same time.

    timeout
        Forwarded unchanged to ``zk_concurrency.lock``.

    ephemeral_lease
        Forwarded unchanged to ``zk_concurrency.lock``.

    profile, scheme, username, password, default_acl
        Connection settings, forwarded unchanged to ``zk_concurrency.lock``.

    Returns a state return dictionary. ``result`` is ``None`` in test mode,
    ``True`` when the execution module reports the lock as acquired and
    ``False`` otherwise.
    """
    ret = {"name": name, "changes": {}, "result": False, "comment": ""}
    conn_kwargs = {
        "profile": profile,
        "scheme": scheme,
        "username": username,
        "password": password,
        "default_acl": default_acl,
    }

    # Test mode must not touch zookeeper at all.
    if __opts__["test"]:
        ret["result"] = None
        ret["comment"] = "Attempt to acquire lock"
        return ret

    if identifier is None:
        identifier = __grains__["id"]

    locked = __salt__["zk_concurrency.lock"](
        name,
        zk_hosts,
        identifier=identifier,
        max_concurrency=max_concurrency,
        timeout=timeout,
        ephemeral_lease=ephemeral_lease,
        **conn_kwargs,
    )
    if locked:
        ret["result"] = True
        ret["comment"] = "lock acquired"
    else:
        ret["comment"] = "Unable to acquire lock"

    return ret
def unlock(
    name,
    # in case you need to unlock without having run lock (failed execution for example)
    zk_hosts=None,
    identifier=None,
    max_concurrency=1,
    ephemeral_lease=False,
    profile=None,
    scheme=None,
    username=None,
    password=None,
    default_acl=None,
):
    """
    Remove lease from semaphore.

    name
        Zookeeper path the lease was taken on.

    zk_hosts
        Zookeeper hosts, forwarded to ``zk_concurrency.unlock``.

    identifier
        Identifier of the lease holder. Defaults to the ``id`` grain.

    max_concurrency
        Forwarded unchanged to ``zk_concurrency.unlock``.

    ephemeral_lease
        Forwarded unchanged to ``zk_concurrency.unlock``.

    profile, scheme, username, password, default_acl
        Connection settings, forwarded unchanged to ``zk_concurrency.unlock``.

    Returns a state return dictionary. ``result`` is ``None`` in test mode,
    ``True`` when the execution module reports the lease as released and
    ``False`` (with an explanatory comment) otherwise.
    """
    ret = {"name": name, "changes": {}, "result": False, "comment": ""}
    conn_kwargs = {
        "profile": profile,
        "scheme": scheme,
        "username": username,
        "password": password,
        "default_acl": default_acl,
    }

    # Test mode must not touch zookeeper at all.
    if __opts__["test"]:
        ret["result"] = None
        ret["comment"] = "Released lock if it is here"
        return ret

    if identifier is None:
        identifier = __grains__["id"]

    unlocked = __salt__["zk_concurrency.unlock"](
        name,
        zk_hosts=zk_hosts,
        identifier=identifier,
        max_concurrency=max_concurrency,
        ephemeral_lease=ephemeral_lease,
        **conn_kwargs,
    )
    if unlocked:
        ret["result"] = True
    else:
        ret["comment"] = f"Unable to find lease for path {name}"

    return ret
def min_party(
    name,
    zk_hosts,
    min_nodes,
    blocking=False,
    profile=None,
    scheme=None,
    username=None,
    password=None,
    default_acl=None,
):
    """
    Ensure that there are `min_nodes` in the party at `name`, optionally blocking if not available.

    name
        Zookeeper path of the party.

    zk_hosts
        Zookeeper hosts, forwarded to ``zk_concurrency.party_members``.

    min_nodes
        Minimum number of party members required for the state to succeed.

    blocking
        Forwarded to ``zk_concurrency.party_members``. When true the state
        succeeds with whatever member list the call returns.

    profile, scheme, username, password, default_acl
        Connection settings, forwarded unchanged to
        ``zk_concurrency.party_members``.

    Returns a state return dictionary. ``result`` is ``None`` in test mode,
    ``False`` when the execution module does not return a list or too few
    nodes are present in non-blocking mode, and ``True`` otherwise.
    """
    ret = {"name": name, "changes": {}, "result": False, "comment": ""}
    conn_kwargs = {
        "profile": profile,
        "scheme": scheme,
        "username": username,
        "password": password,
        "default_acl": default_acl,
    }

    # Test mode must not touch zookeeper at all.
    if __opts__["test"]:
        ret["result"] = None
        ret["comment"] = "Attempt to ensure min_party"
        return ret

    nodes = __salt__["zk_concurrency.party_members"](
        name, zk_hosts, min_nodes, blocking=blocking, **conn_kwargs
    )
    if not isinstance(nodes, list):
        ret["comment"] = f"Error from zk_concurrency.party_members, return was not a list: {nodes}"
        return ret

    num_nodes = len(nodes)
    if blocking:
        # A blocking call is treated as success regardless of the count returned.
        ret["result"] = True
        ret["comment"] = (
            f"Blocked until {min_nodes} nodes were available. Unblocked after {num_nodes} nodes became"
            " available"
        )
    elif num_nodes >= min_nodes:
        ret["result"] = True
        ret["comment"] = f"Currently {num_nodes} nodes, which is >= {min_nodes}"
    else:
        ret["comment"] = f"Currently {num_nodes} nodes, which is < {min_nodes}"

    return ret