Source code for saltext.zookeeper.modules.zk_concurrency

"""
Concurrency controls in zookeeper
=================================

.. important::
    This module requires the general :ref:`Zookeeper setup <zookeeper-setup>`.

This module allows you to acquire and release a slot. This is primarily useful
for ensuring that no more than N hosts take a specific action at once. This can
also be used to coordinate between masters.
"""

import logging
import sys

try:
    from socket import gethostname

    import kazoo.client
    import kazoo.recipe.barrier
    import kazoo.recipe.lock
    import kazoo.recipe.party
    from kazoo.exceptions import CancelledError
    from kazoo.exceptions import NoNodeError
    from kazoo.retry import ForceRetryError

    # TODO: use the kazoo one, waiting for pull req:
    # https://github.com/python-zk/kazoo/pull/206
    class _Semaphore(kazoo.recipe.lock.Semaphore):
        def __init__(
            self,
            client,
            path,
            identifier=None,
            max_leases=1,
            ephemeral_lease=True,
        ):
            identifier = identifier or gethostname()
            kazoo.recipe.lock.Semaphore.__init__(
                self, client, path, identifier=identifier, max_leases=max_leases
            )
            self.ephemeral_lease = ephemeral_lease

            # if its not ephemeral, make sure we didn't already grab it
            if not self.ephemeral_lease:
                try:
                    for child in self.client.get_children(self.path):
                        try:
                            data, _ = self.client.get(self.path + "/" + child)
                            if identifier == data.decode("utf-8"):
                                self.create_path = self.path + "/" + child
                                self.is_acquired = True
                                break
                        except NoNodeError:  # pragma: nocover
                            pass
                except NoNodeError:  # pragma: nocover
                    pass

        def _get_lease(self, data=None):
            # Make sure the session is still valid
            if self._session_expired:
                raise ForceRetryError("Retry on session loss at top")

            # Make sure that the request hasn't been canceled
            if self.cancelled:
                raise CancelledError("Semaphore cancelled")

            # Get a list of the current potential lock holders. If they change,
            # notify our wake_event object. This is used to unblock a blocking
            # self._inner_acquire call.
            children = self.client.get_children(self.path, self._watch_lease_change)

            # If there are leases available, acquire one
            if len(children) < self.max_leases:
                self.client.create(self.create_path, self.data, ephemeral=self.ephemeral_lease)

            # Check if our acquisition was successful or not. Update our state.
            if self.client.exists(self.create_path):
                self.is_acquired = True
            else:
                self.is_acquired = False

            # Return current state
            return self.is_acquired

    HAS_DEPS = True
except ImportError:
    HAS_DEPS = False


__virtualname__ = "zk_concurrency"


def __virtual__():
    if not HAS_DEPS:
        return (False, "Module zk_concurrency: dependencies failed")

    __context__["semaphore_map"] = {}

    return __virtualname__


def _get_zk_conn(profile=None, **connection_args):
    if profile:
        prefix = "zookeeper:" + profile
    else:
        prefix = "zookeeper"

    def get(key, default=None):
        """
        look in connection_args first, then default to config file
        """
        return connection_args.get(key) or __salt__["config.get"](":".join([prefix, key]), default)

    hosts = get("hosts", "127.0.0.1:2181")
    scheme = get("scheme", None)
    username = get("username", None)
    password = get("password", None)
    default_acl = get("default_acl", None)

    if isinstance(hosts, list):
        hosts = ",".join(hosts)

    if username is not None and password is not None and scheme is None:
        scheme = "digest"

    auth_data = None
    if scheme and username and password:
        auth_data = [(scheme, ":".join([username, password]))]

    if default_acl is not None:
        if isinstance(default_acl, list):
            default_acl = [__salt__["zookeeper.make_digest_acl"](**acl) for acl in default_acl]
        else:
            default_acl = [__salt__["zookeeper.make_digest_acl"](**default_acl)]

    __context__.setdefault("zkconnection", {}).setdefault(
        profile or hosts,
        kazoo.client.KazooClient(hosts=hosts, default_acl=default_acl, auth_data=auth_data),
    )

    if not __context__["zkconnection"][profile or hosts].connected:
        __context__["zkconnection"][profile or hosts].start()

    return __context__["zkconnection"][profile or hosts]


[docs] def lock_holders( path, zk_hosts=None, identifier=None, max_concurrency=1, timeout=None, # pylint: disable=unused-argument ephemeral_lease=False, profile=None, scheme=None, username=None, password=None, default_acl=None, ): """ Return an un-ordered list of lock holders CLI Example: .. code-block:: bash salt '*' zk_concurrency.lock_holders some/path path The path in zookeeper where the lock is zk_hosts zookeeper connect string identifier Name to identify this minion, if unspecified defaults to hostname max_concurrency Maximum number of lock holders timeout timeout to wait for the lock. A None timeout will block forever ephemeral_lease Whether the locks in zookeper should be ephemeral Example: .. code-block:: bash salt minion zk_concurrency.lock_holders /lock/path host1:1234,host2:1234 """ zk = _get_zk_conn( profile=profile, hosts=zk_hosts, scheme=scheme, username=username, password=password, default_acl=default_acl, ) if path not in __context__["semaphore_map"]: __context__["semaphore_map"][path] = _Semaphore( zk, path, identifier, max_leases=max_concurrency, ephemeral_lease=ephemeral_lease, ) return __context__["semaphore_map"][path].lease_holders()
[docs] def lock( path, zk_hosts=None, identifier=None, max_concurrency=1, timeout=None, ephemeral_lease=False, force=False, # foricble get the lock regardless of open slots profile=None, scheme=None, username=None, password=None, default_acl=None, ): """ Get lock (with optional timeout) CLI Example: .. code-block:: bash salt '*' zk_concurrency.lock some/path path The path in zookeeper where the lock is zk_hosts zookeeper connect string identifier Name to identify this minion, if unspecified defaults to the hostname max_concurrency Maximum number of lock holders timeout timeout to wait for the lock. A None timeout will block forever ephemeral_lease Whether the locks in zookeper should be ephemeral force Forcibly acquire the lock regardless of available slots Example: .. code-block:: bash salt minion zk_concurrency.lock /lock/path host1:1234,host2:1234 """ zk = _get_zk_conn( profile=profile, hosts=zk_hosts, scheme=scheme, username=username, password=password, default_acl=default_acl, ) if path not in __context__["semaphore_map"]: __context__["semaphore_map"][path] = _Semaphore( zk, path, identifier, max_leases=max_concurrency, ephemeral_lease=ephemeral_lease, ) # forcibly get the lock regardless of max_concurrency if force: __context__["semaphore_map"][path].assured_path = True __context__["semaphore_map"][path].max_leases = sys.maxint # block waiting for lock acquisition if timeout: logging.info("Acquiring lock %s with timeout=%s", path, timeout) __context__["semaphore_map"][path].acquire(timeout=timeout) else: logging.info("Acquiring lock %s with no timeout", path) __context__["semaphore_map"][path].acquire() return __context__["semaphore_map"][path].is_acquired
[docs] def unlock( path, zk_hosts=None, # in case you need to unlock without having run lock (failed execution for example) identifier=None, max_concurrency=1, ephemeral_lease=False, scheme=None, profile=None, username=None, password=None, default_acl=None, ): """ Remove lease from semaphore CLI Example: .. code-block:: bash salt '*' zk_concurrency.unlock some/path path The path in zookeeper where the lock is zk_hosts zookeeper connect string identifier Name to identify this minion, if unspecified defaults to hostname max_concurrency Maximum number of lock holders timeout timeout to wait for the lock. A None timeout will block forever ephemeral_lease Whether the locks in zookeper should be ephemeral Example: .. code-block:: bash salt minion zk_concurrency.unlock /lock/path host1:1234,host2:1234 """ # if someone passed in zk_hosts, and the path isn't in __context__['semaphore_map'], lets # see if we can find it zk = _get_zk_conn( profile=profile, hosts=zk_hosts, scheme=scheme, username=username, password=password, default_acl=default_acl, ) if path not in __context__["semaphore_map"]: __context__["semaphore_map"][path] = _Semaphore( zk, path, identifier, max_leases=max_concurrency, ephemeral_lease=ephemeral_lease, ) if path in __context__["semaphore_map"]: __context__["semaphore_map"][path].release() del __context__["semaphore_map"][path] return True else: logging.error("Unable to find lease for path %s", path) return False
[docs] def party_members( path, zk_hosts=None, min_nodes=1, blocking=False, profile=None, scheme=None, username=None, password=None, default_acl=None, ): """ Get the List of identifiers in a particular party, optionally waiting for the specified minimum number of nodes (min_nodes) to appear CLI Example: .. code-block:: bash salt '*' zk_concurrency.party_members some/path path The path in zookeeper where the lock is zk_hosts zookeeper connect string min_nodes The minimum number of nodes expected to be present in the party blocking The boolean indicating if we need to block until min_nodes are available Example: .. code-block:: bash salt minion zk_concurrency.party_members /lock/path host1:1234,host2:1234 salt minion zk_concurrency.party_members /lock/path host1:1234,host2:1234 min_nodes=3 blocking=True """ zk = _get_zk_conn( profile=profile, hosts=zk_hosts, scheme=scheme, username=username, password=password, default_acl=default_acl, ) party = kazoo.recipe.party.ShallowParty(zk, path) if blocking: barrier = kazoo.recipe.barrier.DoubleBarrier(zk, path, min_nodes) barrier.enter() party = kazoo.recipe.party.ShallowParty(zk, path) barrier.leave() return list(party)