"""
Cassandra Database Module
.. versionadded:: 2015.5.0
This module works with Cassandra v2 and v3 and hence generates
queries based on the internal schema of said version.
:depends: DataStax Python Driver for Apache Cassandra
https://github.com/datastax/python-driver
pip install cassandra-driver
:referenced by: Salt's cassandra_cql returner
:configuration:
The Cassandra cluster members and connection port can either be specified
in the master or minion config, the minion's pillar or be passed to the module.
Example configuration in the config for a single node:
.. code-block:: yaml
cassandra:
cluster: 192.168.50.10
port: 9000
Example configuration in the config for a cluster:
.. code-block:: yaml
cassandra:
cluster:
- 192.168.50.10
- 192.168.50.11
- 192.168.50.12
port: 9000
username: cas_admin
.. versionchanged:: 2016.11.0
Added support for ``ssl_options`` and ``protocol_version``.
Example configuration with
`ssl options <http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.ssl_options>`_:
If ``ssl_options`` are present in cassandra config the cassandra_cql returner
will use SSL. SSL isn't used if ``ssl_options`` isn't specified.
.. code-block:: yaml
cassandra:
cluster:
- 192.168.50.10
- 192.168.50.11
- 192.168.50.12
port: 9000
username: cas_admin
ssl_options:
ca_certs: /etc/ssl/certs/ca-bundle.trust.crt
# SSL version should be one from the ssl module
# This is an optional parameter
ssl_version: PROTOCOL_TLSv1
Additionally you can also specify the ``protocol_version`` to
`use <http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version>`_.
.. code-block:: yaml
cassandra:
cluster:
- 192.168.50.10
- 192.168.50.11
- 192.168.50.12
port: 9000
username: cas_admin
# defaults to 4, if not set
protocol_version: 3
Also all configuration could be passed directly to module as arguments.
.. code-block:: bash
salt minion1 cassandra_cql.info contact_points=delme-nextgen-01 port=9042 cql_user=cassandra cql_pass=cassandra protocol_version=4
salt minion1 cassandra_cql.info ssl_options='{"ca_certs": "/path/to/-ca.crt"}'
We can also provide the load balancing policy as arguments
.. code-block:: bash
salt minion1 cassandra_cql.cql_query "alter user cassandra with password 'cassandra2' ;" contact_points=scylladb cql_user=user1 cql_pass=password port=9142 protocol_version=4 ssl_options='{"ca_certs": "path-to-client-ca.crt"}' load_balancing_policy=DCAwareRoundRobinPolicy load_balancing_policy_args='{"local_dc": "datacenter1"}'
"""
# pylint: disable=W0613,W0612,R1719,R1710
import logging
import re
import ssl
import salt.utils.json
import salt.utils.versions
from salt.exceptions import CommandExecutionError
# Key inside the configured ``ssl_options`` that names the ``ssl`` module
# protocol constant to use (see ``_get_ssl_opts``).
SSL_VERSION = "ssl_version"
# Module-level logger.
log = logging.getLogger(__name__)
# Name returned by ``__virtual__`` when the module can be loaded.
__virtualname__ = "cassandra_cql"
# Set to True further down once the cassandra driver imports succeed.
HAS_DRIVER = False
try:
# pylint: disable=import-error,no-name-in-module
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.cluster import NoHostAvailable
from cassandra.connection import ConnectionException
from cassandra.connection import ConnectionShutdown
from cassandra.connection import OperationTimedOut
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.policies import ExponentialReconnectionPolicy
from cassandra.policies import HostDistance
from cassandra.policies import HostFilterPolicy
from cassandra.policies import IdentityTranslator
from cassandra.policies import LoadBalancingPolicy
from cassandra.policies import NoSpeculativeExecutionPlan
from cassandra.policies import NoSpeculativeExecutionPolicy
from cassandra.policies import RetryPolicy
from cassandra.policies import RoundRobinPolicy
from cassandra.policies import SimpleConvictionPolicy
from cassandra.policies import TokenAwarePolicy
from cassandra.policies import WhiteListRoundRobinPolicy
from cassandra.query import dict_factory
# pylint: enable=import-error,no-name-in-module
HAS_DRIVER = True
LOAD_BALANCING_POLICY_MAP = {
"HostDistance": HostDistance,
"LoadBalancingPolicy": LoadBalancingPolicy,
"RoundRobinPolicy": RoundRobinPolicy,
"DCAwareRoundRobinPolicy": DCAwareRoundRobinPolicy,
"WhiteListRoundRobinPolicy": WhiteListRoundRobinPolicy,
"TokenAwarePolicy": TokenAwarePolicy,
"HostFilterPolicy": HostFilterPolicy,
"SimpleConvictionPolicy": SimpleConvictionPolicy,
"ExponentialReconnectionPolicy": ExponentialReconnectionPolicy,
"RetryPolicy": RetryPolicy,
"IdentityTranslator": IdentityTranslator,
"NoSpeculativeExecutionPlan": NoSpeculativeExecutionPlan,
"NoSpeculativeExecutionPolicy": NoSpeculativeExecutionPolicy,
}
except ImportError:
pass
[docs]
def __virtual__():
    """
    Decide whether this module can be loaded.

    :return: ``__virtualname__`` when the Cassandra python driver was imported,
        otherwise a ``(False, reason)`` tuple.
    :rtype: str or tuple
    """
    if not HAS_DRIVER:
        return (False, "Cannot load cassandra_cql module: python driver not found")
    return __virtualname__
def _async_log_errors(errors):
    """
    Log the error payload of an asynchronous Cassandra call.

    :param errors: The value to report; it is only written to the error log.
    """
    log.error("Cassandra_cql asynchronous call returned: %s", errors)
def _get_lbp_policy(name, **policy_args):
    """
    Instantiate a load balancing policy looked up by its class name.

    :param name: Key into ``LOAD_BALANCING_POLICY_MAP``.
    :type name: str
    :param policy_args: Keyword arguments forwarded to the policy constructor.
    :return: The policy instance, or ``None`` (after logging an error) when
        ``name`` is not a known policy.
    """
    if name not in LOAD_BALANCING_POLICY_MAP:
        log.error("The policy %s is not available", name)
        return None
    policy_factory = LOAD_BALANCING_POLICY_MAP[name]
    return policy_factory(**policy_args)
def _load_properties(property_name, config_option, set_default=False, default=None):
    """
    Resolve a connection property, falling back to the salt configuration.

    A truthy ``property_name`` is returned as-is. Otherwise the ``cassandra``
    config option is read and ``config_option`` is looked up in it.

    :param property_name: The value passed to the module function, if any.
    :type property_name: str or list of str
    :param config_option: The key to look up under ``cassandra`` in the config.
    :type config_option: str
    :param set_default: Whether ``default`` may be used when nothing is configured.
    :type set_default: bool
    :param default: The fallback value used when ``set_default`` is true.
    :type default: str or int
    :return: The passed value, the configured value, or the default.
    :rtype: str or list of str
    :raises CommandExecutionError: If nothing is configured and no default is allowed.
    """
    if property_name:
        return property_name

    log.debug("No property specified in function, trying to load from salt configuration")
    try:
        cassandra_config = __salt__["config.option"]("cassandra", default={})
    except BaseException as exc:
        log.error("Failed to get cassandra config options. Reason: %s", exc)
        raise

    configured = cassandra_config.get(config_option)
    if configured:
        return configured

    if not set_default:
        log.error(
            "No cassandra %s specified in the configuration or passed to the module.",
            config_option,
        )
        raise CommandExecutionError(f"ERROR: Cassandra {config_option} cannot be empty.")

    log.debug("Setting default Cassandra %s to %s", config_option, default)
    return default
def _get_ssl_opts():
    """
    Build the ``ssl_options`` dict for the Cassandra cluster connection from
    the ``ssl_options`` entry of the ``cassandra`` salt configuration.

    :return: ``None`` when no ``ssl_options`` are configured, otherwise a dict
        holding ``ca_certs`` and, if configured, ``ssl_version`` resolved to the
        matching constant of the :mod:`ssl` module.
    :rtype: dict or None
    :raises CommandExecutionError: If the configured ``ssl_version`` is not the
        name of a ``PROTOCOL_*`` constant available in the :mod:`ssl` module.
    """
    sslopts = __salt__["config.option"]("cassandra", default={}).get("ssl_options", None)
    if not sslopts:
        return None

    ssl_opts = {"ca_certs": sslopts["ca_certs"]}
    if SSL_VERSION in sslopts:
        requested = sslopts[SSL_VERSION]
        # Reject both names without the PROTOCOL_ prefix and PROTOCOL_ names
        # this interpreter's ssl module does not provide, so the caller gets
        # one consistent, descriptive error instead of a bare AttributeError.
        if not (requested.startswith("PROTOCOL_") and hasattr(ssl, requested)):
            valid_opts = ", ".join(x for x in dir(ssl) if x.startswith("PROTOCOL_"))
            raise CommandExecutionError(
                "Invalid protocol_version specified! Please make sure "
                "that the ssl protocol version is one from the SSL "
                f"module. Valid options are {valid_opts}"
            )
        ssl_opts[SSL_VERSION] = getattr(ssl, requested)
    return ssl_opts
def _connect(
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    Connect to a Cassandra cluster.

    The cluster and session are cached in ``__context__``; when both are
    already present they are returned unchanged and all arguments are ignored.
    Arguments left as ``None`` are resolved through ``_load_properties``.

    :param contact_points: The Cassandra cluster addresses, can either be a
        list or a comma-separated string.
    :type contact_points: str or list of str
    :param port: The Cassandra cluster port, defaults to 9042 when not configured.
    :type port: int
    :param cql_user: The Cassandra user if authentication is turned on.
    :type cql_user: str
    :param cql_pass: The Cassandra user password if authentication is turned on.
    :type cql_pass: str
    :param protocol_version: Cassandra protocol version to use.
    :type protocol_version: int
    :param load_balancing_policy: cassandra.policy class name to use
    :type load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type load_balancing_policy_args: dict
    :param ssl_options: SSL options handed to the cluster; when ``None`` they
        are read from the configuration via ``_get_ssl_opts``.
    :type ssl_options: dict
    :return: The cluster and session objects.
    :rtype: cluster object, session object
    :raises CommandExecutionError: If the connection cannot be established or
        the connection arguments are rejected with a ``TypeError``.
    """
    # Lazy load the Cassandra cluster and session for this module by creating a
    # cluster and session when cql_query is called the first time. Get the
    # Cassandra cluster and session from this module's __context__ after it is
    # loaded the first time cql_query is called.
    #
    # TODO: Call cluster.shutdown() when the module is unloaded on
    # master/minion shutdown. Currently, Master.shutdown() and Minion.shutdown()
    # do nothing to allow loaded modules to gracefully handle resources stored
    # in __context__ (i.e. connection pools). This means that the connection
    # pool is orphaned and Salt relies on Cassandra to reclaim connections.
    # Perhaps if Master/Minion daemons could be enhanced to call an "__unload__"
    # function, or something similar for each loaded module, connection pools
    # and the like can be gracefully reclaimed/shutdown.
    if (
        __context__
        and "cassandra_cql_returner_cluster" in __context__
        and "cassandra_cql_returner_session" in __context__
    ):
        return (
            __context__["cassandra_cql_returner_cluster"],
            __context__["cassandra_cql_returner_session"],
        )

    if contact_points is None:
        contact_points = _load_properties(property_name=contact_points, config_option="cluster")
    contact_points = (
        contact_points if isinstance(contact_points, list) else contact_points.split(",")
    )
    if port is None:
        port = _load_properties(
            property_name=port, config_option="port", set_default=True, default=9042
        )
    if cql_user is None:
        cql_user = _load_properties(
            property_name=cql_user,
            config_option="username",
            set_default=True,
            default="cassandra",
        )
    if cql_pass is None:
        cql_pass = _load_properties(
            property_name=cql_pass,
            config_option="password",
            set_default=True,
            default="cassandra",
        )
    if protocol_version is None:
        protocol_version = _load_properties(
            property_name=protocol_version,
            config_option="protocol_version",
            set_default=True,
            default=4,
        )
    if load_balancing_policy_args is None:
        load_balancing_policy_args = _load_properties(
            property_name=load_balancing_policy_args,
            config_option="load_balancing_policy_args",
            set_default=True,
            default={},
        )
    if load_balancing_policy is None:
        load_balancing_policy = _load_properties(
            property_name=load_balancing_policy,
            config_option="load_balancing_policy",
            set_default=True,
            default="RoundRobinPolicy",
        )

    # Empty/absent policy args simply mean "construct with no arguments".
    lbp_policy = _get_lbp_policy(load_balancing_policy, **(load_balancing_policy_args or {}))

    try:
        auth_provider = PlainTextAuthProvider(username=cql_user, password=cql_pass)
        ssl_opts = _get_ssl_opts() if ssl_options is None else ssl_options

        cluster_kwargs = {
            "port": port,
            "auth_provider": auth_provider,
            "protocol_version": protocol_version,
            "load_balancing_policy": lbp_policy,
            "compression": True,
        }
        # Only hand ssl_options to the driver when there is something to hand over.
        if ssl_opts:
            cluster_kwargs["ssl_options"] = ssl_opts
        cluster = Cluster(contact_points, **cluster_kwargs)

        # Up to three connection attempts; the third timeout is re-raised.
        for recontimes in range(1, 4):
            try:
                session = cluster.connect()
                break
            except OperationTimedOut:
                log.warning("Cassandra cluster.connect timed out, try %s", recontimes)
                if recontimes >= 3:
                    raise

        # TODO: Call cluster.shutdown() when the module is unloaded on shutdown.
        __context__["cassandra_cql_returner_cluster"] = cluster
        __context__["cassandra_cql_returner_session"] = session
        __context__["cassandra_cql_prepared"] = {}

        log.debug("Successfully connected to Cassandra cluster at %s", contact_points)
        return cluster, session
    except TypeError as err:
        # Previously swallowed, which made this function return None and the
        # callers fail later while unpacking the result. Report it here instead.
        log.error(
            "Invalid arguments while connecting to Cassandra cluster at %s: %s",
            contact_points,
            err,
        )
        raise CommandExecutionError(
            f"ERROR: Invalid Cassandra connection arguments: {err}"
        ) from err
    except (ConnectionException, ConnectionShutdown, NoHostAvailable) as err:
        log.error("Could not connect to Cassandra cluster at %s", contact_points)
        raise CommandExecutionError(str(err)) from err
[docs]
def _select_versioned_query(queries, cluster_version):
    """
    Pick the query text for ``cluster_version`` from a version-keyed dict.

    The exact version string is tried first, then the major version, and
    finally the entry with the highest key.
    """
    if cluster_version in queries:
        return queries[cluster_version]
    match = re.match(r"^(\d+)\.(\d+)(?:\.(\d+))?", cluster_version or "")
    major = match.group(1) if match else None
    if major in queries:
        return queries[major]
    # Fall back to the query stored under the highest key (not the key itself).
    return queries[max(queries)]


def cql_query(
    query,
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    Run a query on a Cassandra cluster and return a list of row dictionaries.

    :param query: The query to execute. May also be a dict mapping Cassandra
        versions (e.g. ``"2"``, ``"3"``) to query strings, in which case the
        entry matching the cluster version is executed.
    :type query: str | dict[str, str]
    :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type contact_points: str | list[str]
    :param cql_user: The Cassandra user if authentication is turned on.
    :type cql_user: str
    :param cql_pass: The Cassandra user password if authentication is turned on.
    :type cql_pass: str
    :param port: The Cassandra cluster port, defaults to None.
    :type port: int
    :param protocol_version: Cassandra protocol version to use.
    :type protocol_version: int
    :param load_balancing_policy: cassandra.policy class name to use
    :type load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type load_balancing_policy_args: dict
    :param ssl_options: SSL options to use for the cluster connection.
    :type ssl_options: dict
    :return: One dictionary per returned row.
    :rtype: list[dict]
    :raises CommandExecutionError: If connecting or executing the query fails.

    CLI Example:

    .. code-block:: bash

        salt 'cassandra-server' cassandra_cql.cql_query "SELECT * FROM users_by_name WHERE first_name = 'jane'"
    """
    try:
        _, session = _connect(
            contact_points=contact_points,
            port=port,
            cql_user=cql_user,
            cql_pass=cql_pass,
            protocol_version=protocol_version,
            load_balancing_policy=load_balancing_policy,
            load_balancing_policy_args=load_balancing_policy_args,
            ssl_options=ssl_options,
        )
    except CommandExecutionError:
        log.critical("Could not get Cassandra cluster session.")
        raise
    except BaseException as e:
        log.critical("Unexpected error while getting Cassandra cluster session: %s", e)
        raise

    session.row_factory = dict_factory

    # Cassandra changed their internal schema from v2 to v3
    # If the query contains a dictionary sorted by versions
    # Find the query for the current cluster version.
    # https://issues.apache.org/jira/browse/CASSANDRA-6717
    if isinstance(query, dict):
        cluster_version = version(
            contact_points=contact_points,
            port=port,
            cql_user=cql_user,
            cql_pass=cql_pass,
            protocol_version=protocol_version,
            load_balancing_policy=load_balancing_policy,
            load_balancing_policy_args=load_balancing_policy_args,
            ssl_options=ssl_options,
        )
        query = _select_versioned_query(query, cluster_version)
        log.debug("New query is: %s", query)

    try:
        results = session.execute(query)
    except BaseException as e:
        log.error("Failed to execute query: %s\n reason: %s", query, e)
        msg = f"ERROR: Cassandra query failed: {query} reason: {e}"
        raise CommandExecutionError(msg) from e

    ret = []
    if results:
        for row in results:
            # Salt won't return dictionaries with odd types like uuid.UUID, so
            # everything except strings and Cassandra's set/list/map
            # collections is converted to its string form.
            ret.append(
                {
                    key: value if isinstance(value, (str, set, list, dict)) else str(value)
                    for key, value in row.items()
                }
            )
    return ret
[docs]
def cql_query_with_prepare(
    query,
    statement_name,
    statement_arguments,
    asynchronous=False,
    callback_errors=None,
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
    **kwargs,
):
    """
    Run a prepared query on a Cassandra cluster and return a list of row dictionaries.

    This function should not be used asynchronously for SELECTs -- it will not
    return anything and we don't currently have a mechanism for handling a future
    that will return results.

    :param query: The query to execute.
    :type query: str
    :param statement_name: Name to assign the prepared statement in the __context__ dictionary
    :type statement_name: str
    :param statement_arguments: Bind parameters for the SQL statement
    :type statement_arguments: list[str]
    :param asynchronous: Run this query in asynchronous mode
    :type asynchronous: bool
    :param async: Run this query in asynchronous mode (an alias to 'asynchronous')
        NOTE: currently it overrides 'asynchronous' and it will be dropped in version 3001!
    :type async: bool
    :param callback_errors: Function to call after query runs if there is an error.
        Accepted for compatibility; it is not used by this function.
    :type callback_errors: Function callable
    :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type contact_points: str | list[str]
    :param cql_user: The Cassandra user if authentication is turned on.
    :type cql_user: str
    :param cql_pass: The Cassandra user password if authentication is turned on.
    :type cql_pass: str
    :param port: The Cassandra cluster port, defaults to None.
    :type port: int
    :param protocol_version: Cassandra protocol version to use.
    :type protocol_version: int
    :param load_balancing_policy: cassandra.policy class name to use
    :type load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type load_balancing_policy_args: dict
    :param ssl_options: SSL options to use for the cluster connection.
    :type ssl_options: dict
    :return: One dictionary per returned row; always an empty list for asynchronous calls.
    :rtype: list[dict]
    :raises CommandExecutionError: If connecting or executing the query fails.

    CLI Example:

    .. code-block:: bash

        # Insert data asynchronously
        salt this-node cassandra_cql.cql_query_with_prepare "INSERT INTO USERS (first_name, last_name) VALUES (?, ?)" "name_insert" statement_arguments=['John','Doe'] asynchronous=True

        # Select data, should not be asynchronous because there is not currently a facility to return data from a future
        salt this-node cassandra_cql.cql_query_with_prepare "SELECT * FROM USERS WHERE first_name=?" "name_select" statement_arguments=['John']
    """
    # Backward-compatibility with Python 3.7: "async" is a reserved word
    if "async" in kwargs:
        asynchronous = kwargs.get("async", False)

    try:
        _, session = _connect(
            contact_points=contact_points,
            port=port,
            cql_user=cql_user,
            cql_pass=cql_pass,
            # Forward the caller's values; they used to be replaced by None here.
            protocol_version=protocol_version,
            load_balancing_policy=load_balancing_policy,
            load_balancing_policy_args=load_balancing_policy_args,
            ssl_options=ssl_options,
        )
    except CommandExecutionError:
        log.critical("Could not get Cassandra cluster session.")
        raise
    except BaseException as e:
        log.critical("Unexpected error while getting Cassandra cluster session: %s", e)
        raise

    # Prepared statements are cached per statement_name for later calls.
    prepared_statements = __context__["cassandra_cql_prepared"]
    if statement_name not in prepared_statements:
        try:
            bound_statement = session.prepare(query)
            prepared_statements[statement_name] = bound_statement
        except BaseException as e:
            log.critical("Unexpected error while preparing SQL statement: %s", e)
            raise
    else:
        bound_statement = prepared_statements[statement_name]

    session.row_factory = dict_factory

    results = None
    try:
        if asynchronous:
            # The returned future is intentionally dropped: no callbacks are
            # attached and no results are collected for asynchronous calls.
            session.execute_async(bound_statement.bind(statement_arguments))
        else:
            results = session.execute(bound_statement.bind(statement_arguments))
    except BaseException as e:
        log.error("Failed to execute query: %s\n reason: %s", query, e)
        msg = f"ERROR: Cassandra query failed: {query} reason: {e}"
        raise CommandExecutionError(msg) from e

    ret = []
    if results and not asynchronous:
        for row in results:
            # Salt won't return dictionaries with odd types like uuid.UUID, so
            # everything except strings and Cassandra's set/list/map
            # collections is converted to its string form.
            ret.append(
                {
                    key: value if isinstance(value, (str, set, list, dict)) else str(value)
                    for key, value in row.items()
                }
            )

    # If this was a synchronous call, then we either have an empty list
    # because there was no return, or we have a return
    # If this was an asynchronous call we only return the empty list
    return ret
[docs]
def version(
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    Show the Cassandra version.

    :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type contact_points: str | list[str]
    :param cql_user: The Cassandra user if authentication is turned on.
    :type cql_user: str
    :param cql_pass: The Cassandra user password if authentication is turned on.
    :type cql_pass: str
    :param port: The Cassandra cluster port, defaults to None.
    :type port: int
    :param protocol_version: Cassandra protocol version to use.
    :type protocol_version: int
    :param load_balancing_policy: cassandra.policy class name to use
    :type load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type load_balancing_policy_args: dict
    :param ssl_options: SSL options to use for the cluster connection.
    :type ssl_options: dict
    :return: The ``release_version`` value of the first row of ``system.local``.
    :rtype: str

    CLI Example:

    .. code-block:: bash

        salt 'minion1' cassandra_cql.version
        salt 'minion1' cassandra_cql.version contact_points=minion1
    """
    release_query = "select release_version from system.local limit 1;"
    connection_args = {
        "contact_points": contact_points,
        "port": port,
        "cql_user": cql_user,
        "cql_pass": cql_pass,
        "protocol_version": protocol_version,
        "load_balancing_policy": load_balancing_policy,
        "load_balancing_policy_args": load_balancing_policy_args,
        "ssl_options": ssl_options,
    }

    try:
        rows = cql_query(release_query, **connection_args)
    except CommandExecutionError:
        log.critical("Could not get Cassandra version.")
        raise
    except BaseException as exc:
        log.critical("Unexpected error while getting Cassandra version: %s", exc)
        raise

    first_row = rows[0]
    return first_row.get("release_version")
[docs]
def info(
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    Show the Cassandra information for this cluster.

    :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type contact_points: str | list[str]
    :param cql_user: The Cassandra user if authentication is turned on.
    :type cql_user: str
    :param cql_pass: The Cassandra user password if authentication is turned on.
    :type cql_pass: str
    :param port: The Cassandra cluster port, defaults to None.
    :type port: int
    :param protocol_version: Cassandra protocol version to use.
    :type protocol_version: int
    :param load_balancing_policy: cassandra.policy class name to use
    :type load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type load_balancing_policy_args: dict
    :param ssl_options: SSL options to use for the cluster connection.
    :type ssl_options: dict
    :return: The rows returned by the ``system.local`` query, as produced by ``cql_query``.
    :rtype: list[dict]

    CLI Example:

    .. code-block:: bash

        salt 'minion1' cassandra_cql.info
        salt 'minion1' cassandra_cql.info contact_points=minion1
    """
    # Assembled line by line; the resulting text is one column per line.
    info_query = (
        "select cluster_name,\n"
        "data_center,\n"
        "partitioner,\n"
        "host_id,\n"
        "rack,\n"
        "release_version,\n"
        "cql_version,\n"
        "schema_version,\n"
        "thrift_version\n"
        "from system.local\n"
        "limit 1;"
    )
    connection_args = {
        "contact_points": contact_points,
        "port": port,
        "cql_user": cql_user,
        "cql_pass": cql_pass,
        "protocol_version": protocol_version,
        "load_balancing_policy": load_balancing_policy,
        "load_balancing_policy_args": load_balancing_policy_args,
        "ssl_options": ssl_options,
    }

    try:
        return cql_query(info_query, **connection_args)
    except CommandExecutionError:
        log.critical("Could not list Cassandra info.")
        raise
    except BaseException as exc:
        log.critical("Unexpected error while listing Cassandra info: %s", exc)
        raise
[docs]
def list_keyspaces(
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    List keyspaces in a Cassandra cluster.

    :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type contact_points: str | list[str]
    :param cql_user: The Cassandra user if authentication is turned on.
    :type cql_user: str
    :param cql_pass: The Cassandra user password if authentication is turned on.
    :type cql_pass: str
    :param port: The Cassandra cluster port, defaults to None.
    :type port: int
    :param protocol_version: Cassandra protocol version to use.
    :type protocol_version: int
    :param load_balancing_policy: cassandra.policy class name to use
    :type load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type load_balancing_policy_args: dict
    :param ssl_options: SSL options to use for the cluster connection.
    :type ssl_options: dict
    :return: The keyspaces in this Cassandra cluster.
    :rtype: list[dict]

    CLI Example:

    .. code-block:: bash

        salt 'minion1' cassandra_cql.list_keyspaces
        salt 'minion1' cassandra_cql.list_keyspaces contact_points=minion1 port=9000
    """
    # One query per Cassandra major version; cql_query picks the right one.
    versioned_queries = {
        "2": "select keyspace_name from system.schema_keyspaces;",
        "3": "select keyspace_name from system_schema.keyspaces;",
    }
    connection_args = {
        "contact_points": contact_points,
        "port": port,
        "cql_user": cql_user,
        "cql_pass": cql_pass,
        "protocol_version": protocol_version,
        "load_balancing_policy": load_balancing_policy,
        "load_balancing_policy_args": load_balancing_policy_args,
        "ssl_options": ssl_options,
    }

    try:
        return cql_query(versioned_queries, **connection_args)
    except CommandExecutionError:
        log.critical("Could not list keyspaces.")
        raise
    except BaseException as exc:
        log.critical("Unexpected error while listing keyspaces: %s", exc)
        raise
[docs]
def list_column_families(
    keyspace=None,
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    List column families in a Cassandra cluster for all keyspaces or just the provided one.

    :param keyspace: The keyspace to provide the column families for, optional.
    :type keyspace: str
    :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type contact_points: str | list[str]
    :param cql_user: The Cassandra user if authentication is turned on.
    :type cql_user: str
    :param cql_pass: The Cassandra user password if authentication is turned on.
    :type cql_pass: str
    :param port: The Cassandra cluster port, defaults to None.
    :type port: int
    :param protocol_version: Cassandra protocol version to use.
    :type protocol_version: int
    :param load_balancing_policy: cassandra.policy class name to use
    :type load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type load_balancing_policy_args: dict
    :param ssl_options: SSL options to use for the cluster connection.
    :type ssl_options: dict
    :return: The column families in this Cassandra cluster.
    :rtype: list[dict]

    CLI Example:

    .. code-block:: bash

        salt 'minion1' cassandra_cql.list_column_families
        salt 'minion1' cassandra_cql.list_column_families contact_points=minion1
        salt 'minion1' cassandra_cql.list_column_families keyspace=system
    """
    where_clause = ""
    if keyspace:
        # Double embedded single quotes so the value cannot terminate the CQL
        # string literal it is placed in.
        escaped_keyspace = str(keyspace).replace("'", "''")
        where_clause = f"where keyspace_name = '{escaped_keyspace}'"

    # NOTE(review): the v3 query reads column_name from system_schema.columns,
    # which looks like it lists columns rather than tables -- confirm whether
    # system_schema.tables was intended before changing the returned keys.
    query = {
        "2": f"select columnfamily_name from system.schema_columnfamilies {where_clause};",
        "3": f"select column_name from system_schema.columns {where_clause};",
    }

    try:
        return cql_query(
            query,
            contact_points=contact_points,
            port=port,
            cql_user=cql_user,
            cql_pass=cql_pass,
            protocol_version=protocol_version,
            load_balancing_policy=load_balancing_policy,
            load_balancing_policy_args=load_balancing_policy_args,
            ssl_options=ssl_options,
        )
    except CommandExecutionError:
        log.critical("Could not list column families.")
        raise
    except BaseException as e:
        log.critical("Unexpected error while listing column families: %s", e)
        raise
[docs]
def keyspace_exists(
    keyspace,
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    Check if a keyspace exists in a Cassandra cluster.

    :param keyspace: The keyspace name to check for.
    :type keyspace: str
    :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type contact_points: str | list[str]
    :param cql_user: The Cassandra user if authentication is turned on.
    :type cql_user: str
    :param cql_pass: The Cassandra user password if authentication is turned on.
    :type cql_pass: str
    :param port: The Cassandra cluster port, defaults to None.
    :type port: int
    :param protocol_version: Cassandra protocol version to use.
    :type protocol_version: int
    :param load_balancing_policy: cassandra.policy class name to use
    :type load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type load_balancing_policy_args: dict
    :param ssl_options: SSL options to use for the cluster connection.
    :type ssl_options: dict
    :return: ``True`` if the lookup returned at least one row, otherwise ``False``.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt 'minion1' cassandra_cql.keyspace_exists keyspace=system
    """
    # Double embedded single quotes so the value cannot terminate the CQL
    # string literal it is placed in.
    escaped_keyspace = str(keyspace).replace("'", "''")
    query = {
        "2": (
            "select keyspace_name from system.schema_keyspaces "
            f"where keyspace_name = '{escaped_keyspace}';"
        ),
        "3": (
            "select keyspace_name from system_schema.keyspaces "
            f"where keyspace_name = '{escaped_keyspace}';"
        ),
    }

    try:
        ret = cql_query(
            query,
            contact_points=contact_points,
            port=port,
            cql_user=cql_user,
            cql_pass=cql_pass,
            protocol_version=protocol_version,
            load_balancing_policy=load_balancing_policy,
            load_balancing_policy_args=load_balancing_policy_args,
            ssl_options=ssl_options,
        )
    except CommandExecutionError:
        log.critical("Could not determine if keyspace exists.")
        raise
    except BaseException as e:
        log.critical("Unexpected error while determining if keyspace exists: %s", e)
        raise

    return bool(ret)
[docs]
def create_keyspace(
    keyspace,
    replication_strategy="SimpleStrategy",
    replication_factor=1,
    replication_datacenters=None,
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    Create a new keyspace in Cassandra, unless it already exists.

    :param keyspace: The keyspace name
    :type keyspace: str
    :param replication_strategy: either `SimpleStrategy` or `NetworkTopologyStrategy`
    :type replication_strategy: str
    :param replication_factor: number of replicas of data on multiple nodes. not used if using NetworkTopologyStrategy
    :type replication_factor: int
    :param replication_datacenters: string or dict of datacenter names to replication factors, required if using
        NetworkTopologyStrategy (will be a dict if coming from state file).
    :type replication_datacenters: str | dict[str, int]
    :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type contact_points: str | list[str]
    :param cql_user: The Cassandra user if authentication is turned on.
    :type cql_user: str
    :param cql_pass: The Cassandra user password if authentication is turned on.
    :type cql_pass: str
    :param port: The Cassandra cluster port, defaults to None.
    :type port: int
    :param protocol_version: Cassandra protocol version to use.
    :type protocol_version: int
    :param load_balancing_policy: cassandra.policy class name to use
    :type load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type load_balancing_policy_args: dict
    :param ssl_options: SSL options to use for the cluster connection.
    :type ssl_options: dict
    :return: ``False`` when a ``replication_datacenters`` string cannot be
        loaded, otherwise ``None``.
    :rtype: bool | None

    CLI Example:

    .. code-block:: bash

        # CLI Example:
        salt 'minion1' cassandra_cql.create_keyspace keyspace=newkeyspace

        salt 'minion1' cassandra_cql.create_keyspace keyspace=newkeyspace replication_strategy=NetworkTopologyStrategy \
        replication_datacenters='{"datacenter_1": 3, "datacenter_2": 2}'
    """
    connection_args = {
        "contact_points": contact_points,
        "port": port,
        "cql_user": cql_user,
        "cql_pass": cql_pass,
        "protocol_version": protocol_version,
        "load_balancing_policy": load_balancing_policy,
        "load_balancing_policy_args": load_balancing_policy_args,
        "ssl_options": ssl_options,
    }

    # Nothing to do when the keyspace is already there.
    if keyspace_exists(keyspace, **connection_args):
        return None

    # Add the strategy, replication_factor, etc.
    replication_map = {"class": replication_strategy}
    if not replication_datacenters:
        replication_map["replication_factor"] = replication_factor
    elif isinstance(replication_datacenters, str):
        try:
            datacenter_map = salt.utils.json.loads(replication_datacenters)
            replication_map.update(**datacenter_map)
        except BaseException:  # pylint: disable=W0703
            log.error("Could not load json replication_datacenters.")
            return False
    else:
        replication_map.update(**replication_datacenters)

    create_statement = f"create keyspace {keyspace} with replication = {replication_map} and durable_writes = true;"

    try:
        cql_query(create_statement, **connection_args)
    except CommandExecutionError:
        log.critical("Could not create keyspace.")
        raise
    except BaseException as exc:
        log.critical("Unexpected error while creating keyspace: %s", exc)
        raise
    return None
[docs]
def drop_keyspace(
    keyspace,
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    Drop a keyspace from a Cassandra cluster, but only when it is present.

    The keyspace is looked up with ``keyspace_exists`` first; the
    ``drop keyspace`` statement is issued only if that lookup is truthy.

    :param keyspace:        The keyspace to drop.
    :type  keyspace:        str
    :param contact_points:  The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type  contact_points:  str | list[str]
    :param port:            The Cassandra cluster port, defaults to None.
    :type  port:            int
    :param cql_user:        The Cassandra user if authentication is turned on.
    :type  cql_user:        str
    :param cql_pass:        The Cassandra user password if authentication is turned on.
    :type  cql_pass:        str
    :param protocol_version:  Cassandra protocol version to use.
    :type  protocol_version:  int
    :param load_balancing_policy: cassandra.policy class name to use
    :type  load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type  load_balancing_policy_args: dict
    :param ssl_options:     SSL options for the connection (see ``ssl_options`` in the module configuration).
    :type  ssl_options:     dict
    :return:                True, both when the keyspace was dropped and when it did not exist.
    :rtype:                 bool
    :raises CommandExecutionError: re-raised after logging when the drop statement fails.

    CLI Example:

    .. code-block:: bash

        salt 'minion1' cassandra_cql.drop_keyspace keyspace=test

        salt 'minion1' cassandra_cql.drop_keyspace keyspace=test contact_points=minion1
    """
    # The same connection settings are used for the lookup and for the drop.
    connection_args = {
        "contact_points": contact_points,
        "port": port,
        "cql_user": cql_user,
        "cql_pass": cql_pass,
        "protocol_version": protocol_version,
        "load_balancing_policy": load_balancing_policy,
        "load_balancing_policy_args": load_balancing_policy_args,
        "ssl_options": ssl_options,
    }

    # Nothing to drop: report success without sending any statement.
    if not keyspace_exists(keyspace, **connection_args):
        return True

    statement = f"drop keyspace {keyspace};"
    try:
        cql_query(statement, **connection_args)
    except CommandExecutionError:
        log.critical("Could not drop keyspace.")
        raise
    except BaseException as e:
        # Log anything unexpected as well, then let it propagate unchanged.
        log.critical("Unexpected error while dropping keyspace: %s", e)
        raise

    return True
[docs]
def list_users(
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    Run ``list users;`` against the Cassandra cluster and return the result.

    :param contact_points:  The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type  contact_points:  str | list[str]
    :param port:            The Cassandra cluster port, defaults to None.
    :type  port:            int
    :param cql_user:        The Cassandra user if authentication is turned on.
    :type  cql_user:        str
    :param cql_pass:        The Cassandra user password if authentication is turned on.
    :type  cql_pass:        str
    :param protocol_version:  Cassandra protocol version to use.
    :type  protocol_version:  int
    :param load_balancing_policy: cassandra.policy class name to use
    :type  load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type  load_balancing_policy_args: dict
    :param ssl_options:     SSL options for the connection (see ``ssl_options`` in the module configuration).
    :type  ssl_options:     dict
    :return:                The existing users, exactly as returned by ``cql_query``.
    :raises CommandExecutionError: re-raised after logging when the query fails.

    CLI Example:

    .. code-block:: bash

        salt 'minion1' cassandra_cql.list_users

        salt 'minion1' cassandra_cql.list_users contact_points=minion1
    """
    # Every failure path re-raises, so the query result can be returned
    # directly without a placeholder value.
    try:
        return cql_query(
            "list users;",
            contact_points=contact_points,
            port=port,
            cql_user=cql_user,
            cql_pass=cql_pass,
            protocol_version=protocol_version,
            load_balancing_policy=load_balancing_policy,
            load_balancing_policy_args=load_balancing_policy_args,
            ssl_options=ssl_options,
        )
    except CommandExecutionError:
        log.critical("Could not list users.")
        raise
    except BaseException as e:
        log.critical("Unexpected error while listing users: %s", e)
        raise
[docs]
def create_user(
    username,
    password,
    superuser=False,
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    Create a new cassandra user with credentials and superuser status.

    The statement uses ``create user if not exists``, so an already existing
    user is not treated as an error by this function.

    :param username:        The name of the new user.
    :type  username:        str
    :param password:        The password of the new user. Single quotes in the
                            password are escaped before it is placed in the statement.
    :type  password:        str
    :param superuser:       Is the new user going to be a superuser? default: False
    :type  superuser:       bool
    :param contact_points:  The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type  contact_points:  str | list[str]
    :param port:            The Cassandra cluster port, defaults to None.
    :type  port:            int
    :param cql_user:        The Cassandra user if authentication is turned on.
    :type  cql_user:        str
    :param cql_pass:        The Cassandra user password if authentication is turned on.
    :type  cql_pass:        str
    :param protocol_version:  Cassandra protocol version to use.
    :type  protocol_version:  int
    :param load_balancing_policy: cassandra.policy class name to use
    :type  load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type  load_balancing_policy_args: dict
    :param ssl_options:     SSL options for the connection (see ``ssl_options`` in the module configuration).
    :type  ssl_options:     dict
    :return:                True once the statement has been executed without raising.
    :rtype:                 bool
    :raises CommandExecutionError: re-raised after logging when the statement fails.

    CLI Example:

    .. code-block:: bash

        salt 'minion1' cassandra_cql.create_user username=joe password=secret

        salt 'minion1' cassandra_cql.create_user username=joe password=secret superuser=True

        salt 'minion1' cassandra_cql.create_user username=joe password=secret superuser=True contact_points=minion1
    """
    superuser_cql = "superuser" if superuser else "nosuperuser"

    # The password is embedded in a single-quoted CQL string literal. CQL
    # escapes a single quote inside such a literal by doubling it; without this
    # a password containing "'" would end the literal early and either break
    # the statement or let the remainder be interpreted as CQL.
    # str() keeps non-string passwords (e.g. numeric CLI values) working as
    # they did with plain f-string interpolation.
    escaped_password = str(password).replace("'", "''")

    query = (
        f"create user if not exists {username} "
        f"with password '{escaped_password}' {superuser_cql};"
    )

    # Deliberately log only the username and role, never the password.
    log.debug(
        "Attempting to create a new user with username=%s superuser=%s",
        username,
        superuser_cql,
    )

    # The create user query doesn't actually return anything if the query succeeds.
    # If the query fails, catch the exception, log a message and raise it again.
    try:
        cql_query(
            query,
            contact_points=contact_points,
            port=port,
            cql_user=cql_user,
            cql_pass=cql_pass,
            protocol_version=protocol_version,
            load_balancing_policy=load_balancing_policy,
            load_balancing_policy_args=load_balancing_policy_args,
            ssl_options=ssl_options,
        )
    except CommandExecutionError:
        log.critical("Could not create user.")
        raise
    except BaseException as e:
        log.critical("Unexpected error while creating user: %s", e)
        raise

    return True
[docs]
def list_permissions(
    username=None,
    resource=None,
    resource_type="keyspace",
    permission=None,
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    List permissions, optionally narrowed by user, resource and permission name.

    The statement has the form
    ``list <permission> on <resource> [of <username>]``.

    :param username:        The name of the user to list permissions for; if empty, no
                            ``of <username>`` clause is added.
    :type  username:        str
    :param resource:        The resource (keyspace or table), if None, permissions on all
                            keyspaces are listed and ``resource_type`` is not used.
    :type  resource:        str
    :param resource_type:   The resource_type (keyspace or table), defaults to 'keyspace'.
    :type  resource_type:   str
    :param permission:      A permission name (e.g. select), if None, all permissions are listed.
    :type  permission:      str
    :param contact_points:  The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type  contact_points:  str | list[str]
    :param port:            The Cassandra cluster port, defaults to None.
    :type  port:            int
    :param cql_user:        The Cassandra user if authentication is turned on.
    :type  cql_user:        str
    :param cql_pass:        The Cassandra user password if authentication is turned on.
    :type  cql_pass:        str
    :param protocol_version:  Cassandra protocol version to use.
    :type  protocol_version:  int
    :param load_balancing_policy: cassandra.policy class name to use
    :type  load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type  load_balancing_policy_args: dict
    :param ssl_options:     SSL options for the connection (see ``ssl_options`` in the module configuration).
    :type  ssl_options:     dict
    :return:                The matching permissions, exactly as returned by ``cql_query``.
    :raises CommandExecutionError: re-raised after logging when the query fails.

    CLI Example:

    .. code-block:: bash

        salt 'minion1' cassandra_cql.list_permissions

        salt 'minion1' cassandra_cql.list_permissions username=joe resource=test_keyspace permission=select

        salt 'minion1' cassandra_cql.list_permissions username=joe resource=test_table resource_type=table \
        permission=select contact_points=minion1
    """
    # What to list: one named permission, or everything.
    if permission:
        what_cql = f"{permission} permission"
    else:
        what_cql = "all permissions"

    # Where to look: one named resource, or every keyspace.
    if resource:
        where_cql = f"{resource_type} {resource}"
    else:
        where_cql = "all keyspaces"

    query = f"list {what_cql} on {where_cql}"
    if username:
        query = f"{query} of {username}"

    log.debug("Attempting to list permissions with query '%s'", query)

    # Every failure path re-raises, so the result is returned directly.
    try:
        return cql_query(
            query,
            contact_points=contact_points,
            port=port,
            cql_user=cql_user,
            cql_pass=cql_pass,
            protocol_version=protocol_version,
            load_balancing_policy=load_balancing_policy,
            load_balancing_policy_args=load_balancing_policy_args,
            ssl_options=ssl_options,
        )
    except CommandExecutionError:
        log.critical("Could not list permissions.")
        raise
    except BaseException as e:
        log.critical("Unexpected error while listing permissions: %s", e)
        raise
[docs]
def grant_permission(
    username,
    resource=None,
    resource_type="keyspace",
    permission=None,
    contact_points=None,
    port=None,
    cql_user=None,
    cql_pass=None,
    protocol_version=None,
    load_balancing_policy=None,
    load_balancing_policy_args=None,
    ssl_options=None,
):
    """
    Grant permissions to a user.

    The statement has the form
    ``grant <permission> on <resource> to <username>``.

    :param username:        The name of the user to grant permissions to.
    :type  username:        str
    :param resource:        The resource (keyspace or table), if None, permissions are granted
                            on all keyspaces and ``resource_type`` is not used.
    :type  resource:        str
    :param resource_type:   The resource_type (keyspace or table), defaults to 'keyspace'.
    :type  resource_type:   str
    :param permission:      A permission name (e.g. select), if None, all permissions are granted.
    :type  permission:      str
    :param contact_points:  The Cassandra cluster addresses, can either be a string or a list of IPs.
    :type  contact_points:  str | list[str]
    :param port:            The Cassandra cluster port, defaults to None.
    :type  port:            int
    :param cql_user:        The Cassandra user if authentication is turned on.
    :type  cql_user:        str
    :param cql_pass:        The Cassandra user password if authentication is turned on.
    :type  cql_pass:        str
    :param protocol_version:  Cassandra protocol version to use.
    :type  protocol_version:  int
    :param load_balancing_policy: cassandra.policy class name to use
    :type  load_balancing_policy: str
    :param load_balancing_policy_args: cassandra.policy constructor args
    :type  load_balancing_policy_args: dict
    :param ssl_options:     SSL options for the connection (see ``ssl_options`` in the module configuration).
    :type  ssl_options:     dict
    :return:                True once the statement has been executed without raising.
    :rtype:                 bool
    :raises CommandExecutionError: re-raised after logging when the statement fails.

    CLI Example:

    .. code-block:: bash

        salt 'minion1' cassandra_cql.grant_permission username=joe

        salt 'minion1' cassandra_cql.grant_permission username=joe resource=test_keyspace permission=select

        salt 'minion1' cassandra_cql.grant_permission username=joe resource=test_table resource_type=table \
        permission=select contact_points=minion1
    """
    # Fall back to the broadest grant / scope when a filter is not supplied.
    granted = permission if permission else "all permissions"
    scope = f"{resource_type} {resource}" if resource else "all keyspaces"
    query = f"grant {granted} on {scope} to {username}"

    log.debug("Attempting to grant permissions with query '%s'", query)

    connection_args = {
        "contact_points": contact_points,
        "port": port,
        "cql_user": cql_user,
        "cql_pass": cql_pass,
        "protocol_version": protocol_version,
        "load_balancing_policy": load_balancing_policy,
        "load_balancing_policy_args": load_balancing_policy_args,
        "ssl_options": ssl_options,
    }
    try:
        cql_query(query, **connection_args)
    except CommandExecutionError:
        log.critical("Could not grant permissions.")
        raise
    except BaseException as e:
        log.critical("Unexpected error while granting permissions: %s", e)
        raise

    return True