Concurrency controls in zookeeper

.. important::
    This module requires the general :ref:`Zookeeper setup <zookeeper-setup>`.

This module allows you to acquire and release a slot. This is primarily useful
for ensuring that no more than N hosts take a specific action at once. This can
also be used to coordinate between masters.

import logging
import sys

    from socket import gethostname

    import kazoo.client
    import kazoo.recipe.barrier
    import kazoo.recipe.lock
    from kazoo.exceptions import CancelledError
    from kazoo.exceptions import NoNodeError
    from kazoo.retry import ForceRetryError

    # TODO: use the kazoo one, waiting for pull req:
    class _Semaphore(kazoo.recipe.lock.Semaphore):
        def __init__(
            identifier = identifier or gethostname()
                self, client, path, identifier=identifier, max_leases=max_leases
            self.ephemeral_lease = ephemeral_lease

            # if its not ephemeral, make sure we didn't already grab it
            if not self.ephemeral_lease:
                    for child in self.client.get_children(self.path):
                            data, _ = self.client.get(self.path + "/" + child)
                            if identifier == data.decode("utf-8"):
                                self.create_path = self.path + "/" + child
                                self.is_acquired = True
                        except NoNodeError:  # pragma: nocover
                except NoNodeError:  # pragma: nocover

        def _get_lease(self, data=None):
            # Make sure the session is still valid
            if self._session_expired:
                raise ForceRetryError("Retry on session loss at top")

            # Make sure that the request hasn't been canceled
            if self.cancelled:
                raise CancelledError("Semaphore cancelled")

            # Get a list of the current potential lock holders. If they change,
            # notify our wake_event object. This is used to unblock a blocking
            # self._inner_acquire call.
            children = self.client.get_children(self.path, self._watch_lease_change)

            # If there are leases available, acquire one
            if len(children) < self.max_leases:
                self.client.create(self.create_path,, ephemeral=self.ephemeral_lease)

            # Check if our acquisition was successful or not. Update our state.
            if self.client.exists(self.create_path):
                self.is_acquired = True
                self.is_acquired = False

            # Return current state
            return self.is_acquired

    HAS_DEPS = True
except ImportError:
    HAS_DEPS = False

__virtualname__ = "zk_concurrency"

def __virtual__():
    if not HAS_DEPS:
        return (False, "Module zk_concurrency: dependencies failed")

    __context__["semaphore_map"] = {}

    return __virtualname__

def _get_zk_conn(profile=None, **connection_args):
    if profile:
        prefix = "zookeeper:" + profile
        prefix = "zookeeper"

    def get(key, default=None):
        look in connection_args first, then default to config file
        return connection_args.get(key) or __salt__["config.get"](":".join([prefix, key]), default)

    hosts = get("hosts", "")
    scheme = get("scheme", None)
    username = get("username", None)
    password = get("password", None)
    default_acl = get("default_acl", None)

    if isinstance(hosts, list):
        hosts = ",".join(hosts)

    if username is not None and password is not None and scheme is None:
        scheme = "digest"

    auth_data = None
    if scheme and username and password:
        auth_data = [(scheme, ":".join([username, password]))]

    if default_acl is not None:
        if isinstance(default_acl, list):
            default_acl = [__salt__["zookeeper.make_digest_acl"](**acl) for acl in default_acl]
            default_acl = [__salt__["zookeeper.make_digest_acl"](**default_acl)]

    __context__.setdefault("zkconnection", {}).setdefault(
        profile or hosts,
        kazoo.client.KazooClient(hosts=hosts, default_acl=default_acl, auth_data=auth_data),

    if not __context__["zkconnection"][profile or hosts].connected:
        __context__["zkconnection"][profile or hosts].start()

    return __context__["zkconnection"][profile or hosts]

[docs] def lock_holders( path, zk_hosts=None, identifier=None, max_concurrency=1, timeout=None, # pylint: disable=unused-argument ephemeral_lease=False, profile=None, scheme=None, username=None, password=None, default_acl=None, ): """ Return an un-ordered list of lock holders CLI Example: .. code-block:: bash salt '*' zk_concurrency.lock_holders some/path path The path in zookeeper where the lock is zk_hosts zookeeper connect string identifier Name to identify this minion, if unspecified defaults to hostname max_concurrency Maximum number of lock holders timeout timeout to wait for the lock. A None timeout will block forever ephemeral_lease Whether the locks in zookeper should be ephemeral Example: .. code-block:: bash salt minion zk_concurrency.lock_holders /lock/path host1:1234,host2:1234 """ zk = _get_zk_conn( profile=profile, hosts=zk_hosts, scheme=scheme, username=username, password=password, default_acl=default_acl, ) if path not in __context__["semaphore_map"]: __context__["semaphore_map"][path] = _Semaphore( zk, path, identifier, max_leases=max_concurrency, ephemeral_lease=ephemeral_lease, ) return __context__["semaphore_map"][path].lease_holders()
[docs] def lock( path, zk_hosts=None, identifier=None, max_concurrency=1, timeout=None, ephemeral_lease=False, force=False, # foricble get the lock regardless of open slots profile=None, scheme=None, username=None, password=None, default_acl=None, ): """ Get lock (with optional timeout) CLI Example: .. code-block:: bash salt '*' zk_concurrency.lock some/path path The path in zookeeper where the lock is zk_hosts zookeeper connect string identifier Name to identify this minion, if unspecified defaults to the hostname max_concurrency Maximum number of lock holders timeout timeout to wait for the lock. A None timeout will block forever ephemeral_lease Whether the locks in zookeper should be ephemeral force Forcibly acquire the lock regardless of available slots Example: .. code-block:: bash salt minion zk_concurrency.lock /lock/path host1:1234,host2:1234 """ zk = _get_zk_conn( profile=profile, hosts=zk_hosts, scheme=scheme, username=username, password=password, default_acl=default_acl, ) if path not in __context__["semaphore_map"]: __context__["semaphore_map"][path] = _Semaphore( zk, path, identifier, max_leases=max_concurrency, ephemeral_lease=ephemeral_lease, ) # forcibly get the lock regardless of max_concurrency if force: __context__["semaphore_map"][path].assured_path = True __context__["semaphore_map"][path].max_leases = sys.maxint # block waiting for lock acquisition if timeout:"Acquiring lock %s with timeout=%s", path, timeout) __context__["semaphore_map"][path].acquire(timeout=timeout) else:"Acquiring lock %s with no timeout", path) __context__["semaphore_map"][path].acquire() return __context__["semaphore_map"][path].is_acquired
[docs] def unlock( path, zk_hosts=None, # in case you need to unlock without having run lock (failed execution for example) identifier=None, max_concurrency=1, ephemeral_lease=False, scheme=None, profile=None, username=None, password=None, default_acl=None, ): """ Remove lease from semaphore CLI Example: .. code-block:: bash salt '*' zk_concurrency.unlock some/path path The path in zookeeper where the lock is zk_hosts zookeeper connect string identifier Name to identify this minion, if unspecified defaults to hostname max_concurrency Maximum number of lock holders timeout timeout to wait for the lock. A None timeout will block forever ephemeral_lease Whether the locks in zookeper should be ephemeral Example: .. code-block:: bash salt minion zk_concurrency.unlock /lock/path host1:1234,host2:1234 """ # if someone passed in zk_hosts, and the path isn't in __context__['semaphore_map'], lets # see if we can find it zk = _get_zk_conn( profile=profile, hosts=zk_hosts, scheme=scheme, username=username, password=password, default_acl=default_acl, ) if path not in __context__["semaphore_map"]: __context__["semaphore_map"][path] = _Semaphore( zk, path, identifier, max_leases=max_concurrency, ephemeral_lease=ephemeral_lease, ) if path in __context__["semaphore_map"]: __context__["semaphore_map"][path].release() del __context__["semaphore_map"][path] return True else: logging.error("Unable to find lease for path %s", path) return False
[docs] def party_members( path, zk_hosts=None, min_nodes=1, blocking=False, profile=None, scheme=None, username=None, password=None, default_acl=None, ): """ Get the List of identifiers in a particular party, optionally waiting for the specified minimum number of nodes (min_nodes) to appear CLI Example: .. code-block:: bash salt '*' zk_concurrency.party_members some/path path The path in zookeeper where the lock is zk_hosts zookeeper connect string min_nodes The minimum number of nodes expected to be present in the party blocking The boolean indicating if we need to block until min_nodes are available Example: .. code-block:: bash salt minion zk_concurrency.party_members /lock/path host1:1234,host2:1234 salt minion zk_concurrency.party_members /lock/path host1:1234,host2:1234 min_nodes=3 blocking=True """ zk = _get_zk_conn( profile=profile, hosts=zk_hosts, scheme=scheme, username=username, password=password, default_acl=default_acl, ) party =, path) if blocking: barrier = kazoo.recipe.barrier.DoubleBarrier(zk, path, min_nodes) barrier.enter() party =, path) barrier.leave() return list(party)