Source code for saltext.influxdb.modules.influxdbmod

"""
Interface with InfluxDB 0.9-1.x

.. important::
    You can optionally specify default connection parameters via the general :ref:`influxdb setup <influxdb-setup>`.

Most functions in this module allow you to override or provide some or all
of these settings via keyword arguments:

.. code-block:: bash

    salt '*' influxdb.foo_function influxdb_user='influxadmin' influxdb_password='s3cr1t'

This overrides ``user`` and ``password`` while still using the defaults for
``host`` and ``port``.
"""

import collections
import logging
from collections.abc import Sequence

import salt.utils.json
from salt.state import STATE_INTERNAL_KEYWORDS as _STATE_INTERNAL_KEYWORDS

try:
    import influxdb

    HAS_INFLUXDB = True
except ImportError:
    HAS_INFLUXDB = False


log = logging.getLogger(__name__)

__virtualname__ = "influxdb"


def __virtual__():
    if HAS_INFLUXDB:
        return __virtualname__
    return (
        False,
        "The influxdb execution module could not be loaded: influxdb library not available.",
    )


def _client(
    influxdb_user=None,
    influxdb_password=None,
    influxdb_host=None,
    influxdb_port=None,
    **client_args,
):
    if not influxdb_user:
        influxdb_user = __salt__["config.option"]("influxdb.user", "root")
    if not influxdb_password:
        influxdb_password = __salt__["config.option"]("influxdb.password", "root")
    if not influxdb_host:
        influxdb_host = __salt__["config.option"]("influxdb.host", "localhost")
    if not influxdb_port:
        influxdb_port = __salt__["config.option"]("influxdb.port", 8086)
    for ignore in _STATE_INTERNAL_KEYWORDS:
        if ignore in client_args:
            del client_args[ignore]
    return influxdb.InfluxDBClient(
        host=influxdb_host,
        port=influxdb_port,
        username=influxdb_user,
        password=influxdb_password,
        **client_args,
    )


[docs] def list_dbs(**client_args): """ List all InfluxDB databases. CLI Example: .. code-block:: bash salt '*' influxdb.list_dbs """ client = _client(**client_args) return client.get_list_database()
[docs] def db_exists(name, **client_args): """ Checks if a database exists in InfluxDB. name Name of the database to check. CLI Example: .. code-block:: bash salt '*' influxdb.db_exists <name> """ if name in [db["name"] for db in list_dbs(**client_args)]: return True return False
[docs] def create_db(name, **client_args): """ Create a database. name Name of the database to create. CLI Example: .. code-block:: bash salt '*' influxdb.create_db <name> """ if db_exists(name, **client_args): log.info("DB '%s' already exists", name) return False client = _client(**client_args) client.create_database(name) return True
[docs] def drop_db(name, **client_args): """ Drop a database. name Name of the database to drop. CLI Example: .. code-block:: bash salt '*' influxdb.drop_db <name> """ if not db_exists(name, **client_args): log.info("DB '%s' does not exist", name) return False client = _client(**client_args) client.drop_database(name) return True
[docs] def list_users(**client_args): """ List all users. CLI Example: .. code-block:: bash salt '*' influxdb.list_users """ client = _client(**client_args) return client.get_list_users()
[docs] def user_exists(name, **client_args): """ Check if a user exists. name Name of the user to check. CLI Example: .. code-block:: bash salt '*' influxdb.user_exists <name> """ if user_info(name, **client_args): return True return False
[docs] def user_info(name, **client_args): """ Get information about given user. name Name of the user for which to get information. CLI Example: .. code-block:: bash salt '*' influxdb.user_info <name> """ matching_users = (user for user in list_users(**client_args) if user.get("user") == name) try: return next(matching_users) except StopIteration: pass
[docs] def create_user(name, passwd, admin=False, **client_args): """ Create a user. name Name of the user to create. passwd Password of the new user. admin : False Whether the user should have cluster administration privileges or not. CLI Example: .. code-block:: bash salt '*' influxdb.create_user <name> <password> salt '*' influxdb.create_user <name> <password> admin=True """ if user_exists(name, **client_args): log.info("User '%s' already exists", name) return False client = _client(**client_args) client.create_user(name, passwd, admin) return True
[docs] def set_user_password(name, passwd, **client_args): """ Change password of a user. name Name of the user for whom to set the password. passwd New password of the user. CLI Example: .. code-block:: bash salt '*' influxdb.set_user_password <name> <password> """ if not user_exists(name, **client_args): log.info("User '%s' does not exist", name) return False client = _client(**client_args) client.set_user_password(name, passwd) return True
[docs] def grant_admin_privileges(name, **client_args): """ Grant cluster administration privileges to a user. name Name of the user to whom admin privileges will be granted. CLI Example: .. code-block:: bash salt '*' influxdb.grant_admin_privileges <name> """ client = _client(**client_args) client.grant_admin_privileges(name) return True
[docs] def revoke_admin_privileges(name, **client_args): """ Revoke cluster administration privileges from a user. name Name of the user from whom admin privileges will be revoked. CLI Example: .. code-block:: bash salt '*' influxdb.revoke_admin_privileges <name> """ client = _client(**client_args) client.revoke_admin_privileges(name) return True
[docs] def remove_user(name, **client_args): """ Remove a user. name Name of the user to remove CLI Example: .. code-block:: bash salt '*' influxdb.remove_user <name> """ if not user_exists(name, **client_args): log.info("User '%s' does not exist", name) return False client = _client(**client_args) client.drop_user(name) return True
[docs] def get_retention_policy(database, name, **client_args): """ Get an existing retention policy. database Name of the database for which the retention policy was defined. name Name of the retention policy. CLI Example: .. code-block:: bash salt '*' influxdb.get_retention_policy metrics default """ client = _client(**client_args) try: return next( p for p in client.get_list_retention_policies(database) if p.get("name") == name ) except StopIteration: return {}
[docs] def retention_policy_exists(database, name, **client_args): """ Check if retention policy with given name exists. database Name of the database for which the retention policy was defined. name Name of the retention policy to check. CLI Example: .. code-block:: bash salt '*' influxdb.retention_policy_exists metrics default """ if get_retention_policy(database, name, **client_args): return True return False
[docs] def drop_retention_policy(database, name, **client_args): """ Drop a retention policy. database Name of the database for which the retention policy will be dropped. name Name of the retention policy to drop. CLI Example: .. code-block:: bash salt '*' influxdb.drop_retention_policy mydb mypr """ client = _client(**client_args) client.drop_retention_policy(name, database) return True
[docs] def create_retention_policy(database, name, duration, replication, default=False, **client_args): """ Create a retention policy. database Name of the database for which the retention policy will be created. name Name of the new retention policy. duration Duration of the new retention policy. Durations such as 1h, 90m, 12h, 7d, and 4w, are all supported and mean 1 hour, 90 minutes, 12 hours, 7 day, and 4 weeks, respectively. For infinite retention – meaning the data will never be deleted – use 'INF' for duration. The minimum retention period is 1 hour. replication Replication factor of the retention policy. This determines how many independent copies of each data point are stored in a cluster. default : False Whether or not the policy as default will be set as default. CLI Example: .. code-block:: bash salt '*' influxdb.create_retention_policy metrics default 1d 1 """ client = _client(**client_args) client.create_retention_policy(name, duration, replication, database, default) return True
[docs] def alter_retention_policy(database, name, duration, replication, default=False, **client_args): """ Modify an existing retention policy. name Name of the retention policy to modify. database Name of the database for which the retention policy was defined. duration New duration of given retention policy. Durations such as 1h, 90m, 12h, 7d, and 4w, are all supported and mean 1 hour, 90 minutes, 12 hours, 7 day, and 4 weeks, respectively. For infinite retention – meaning the data will never be deleted – use 'INF' for duration. The minimum retention period is 1 hour. replication New replication of given retention policy. This determines how many independent copies of each data point are stored in a cluster. default : False Whether or not to set the modified policy as default. CLI Example: .. code-block:: bash salt '*' influxdb.alter_retention_policy metrics default 1d 1 """ client = _client(**client_args) client.alter_retention_policy(name, database, duration, replication, default) return True
[docs] def list_privileges(name, **client_args): """ List privileges from a user. name Name of the user from whom privileges will be listed. CLI Example: .. code-block:: bash salt '*' influxdb.list_privileges <name> """ client = _client(**client_args) res = {} for item in client.get_list_privileges(name): res[item["database"]] = item["privilege"].split()[0].lower() return res
[docs] def grant_privilege(database, privilege, username, **client_args): """ Grant a privilege on a database to a user. database Name of the database to grant the privilege on. privilege Privilege to grant. Can be one of 'read', 'write' or 'all'. username Name of the user to grant the privilege to. CLI Example: .. code-block:: bash salt '*' influxdb.grant_privilege db read user """ client = _client(**client_args) client.grant_privilege(privilege, database, username) return True
[docs] def revoke_privilege(database, privilege, username, **client_args): """ Revoke a privilege on a database from a user. database Name of the database to grant the privilege on. privilege Privilege to grant. Can be one of 'read', 'write' or 'all'. username Name of the user to grant the privilege to. CLI Example: .. code-block:: bash salt '*' influxdb.revoke_privilege db read user """ client = _client(**client_args) client.revoke_privilege(privilege, database, username) return True
[docs] def continuous_query_exists(database, name, **client_args): """ Check if continuous query with given name exists on the database. database Name of the database for which the continuous query was defined. name Name of the continuous query to check. CLI Example: .. code-block:: bash salt '*' influxdb.continuous_query_exists metrics default """ if get_continuous_query(database, name, **client_args): return True return False
[docs] def get_continuous_query(database, name, **client_args): """ Get an existing continuous query. database Name of the database for which the continuous query was defined. name Name of the continuous query to get. CLI Example: .. code-block:: bash salt '*' influxdb.get_continuous_query mydb cq_month """ client = _client(**client_args) try: for db, cqs in client.query("SHOW CONTINUOUS QUERIES").items(): if db[0] == database: return next(cq for cq in cqs if cq.get("name") == name) except StopIteration: return {} return {}
[docs] def create_continuous_query( database, name, query, resample_time=None, coverage_period=None, **client_args ): """ Create a continuous query. database Name of the database for which the continuous query will be created on. name Name of the continuous query to create. query The continuous query string. resample_time : None Duration between continuous query resampling. coverage_period : None Duration specifying time period per sample. CLI Example: .. code-block:: bash salt '*' influxdb.create_continuous_query mydb cq_month 'SELECT mean(*) INTO mydb.a_month.:MEASUREMENT FROM mydb.a_week./.*/ GROUP BY time(5m), *' """ client = _client(**client_args) full_query = "CREATE CONTINUOUS QUERY {name} ON {database}" if resample_time: full_query += " RESAMPLE EVERY {resample_time}" if coverage_period: full_query += " FOR {coverage_period}" full_query += " BEGIN {query} END" query = full_query.format( name=name, database=database, query=query, resample_time=resample_time, coverage_period=coverage_period, ) client.query(query) return True
[docs] def drop_continuous_query(database, name, **client_args): """ Drop a continuous query. database Name of the database for which the continuous query will be drop from. name Name of the continuous query to drop. CLI Example: .. code-block:: bash salt '*' influxdb.drop_continuous_query mydb my_cq """ client = _client(**client_args) query = f"DROP CONTINUOUS QUERY {name} ON {database}" client.query(query) return True
def _pull_query_results(resultset): """ Parses a ResultSet returned from InfluxDB into a dictionary of results, grouped by series names and optional JSON-encoded grouping tags. """ _results = collections.defaultdict(dict) for _header, _values in resultset.items(): _header, _group_tags = _header if _group_tags: _results[_header][salt.utils.json.dumps(_group_tags)] = list(_values) else: _results[_header] = list(_values) return dict(sorted(_results.items()))
[docs] def query(database, query, **client_args): """ Execute a query. database Name of the database to query on. query InfluxQL query string. CLI Example: .. code-block:: bash salt '*' influxdb.query mydb 'SELECT * FROM "foobar"' """ client = _client(**client_args) _result = client.query(query, database=database) if isinstance(_result, Sequence): return [_pull_query_results(_query_result) for _query_result in _result if _query_result] return [_pull_query_results(_result) if _result else {}]