Source code for saltext.influxdb.returners.influxdb_return

"""
Return data to an InfluxDB <v2 server.

.. important::
    Using this returner requires :ref:`minion configuration <influxdb-returner-setup>`.

Usage
-----

To use the influxdb returner, append '--return influxdb' to the Salt command.

.. code-block:: bash

    salt '*' test.ping --return influxdb

To use the alternative configuration, append '--return_config alternative' to the Salt command.

.. code-block:: bash

    salt '*' test.ping --return influxdb --return_config alternative

To override individual configuration items, append --return_kwargs '{"key:": "value"}' to the Salt command.

.. code-block:: bash

    salt '*' test.ping --return influxdb --return_kwargs '{"db": "another-salt"}'

"""

import logging

import requests
import salt.returners
import salt.utils.jid
from salt.utils.decorators import memoize

try:
    import influxdb
    import influxdb.influxdb08

    HAS_INFLUXDB = True
except ImportError:
    HAS_INFLUXDB = False

# HTTP API header used to check the InfluxDB version
influxDBVersionHeader = "X-Influxdb-Version"  # pylint: disable=invalid-name

log = logging.getLogger(__name__)

__virtualname__ = "influxdb"


def __virtual__():
    if not HAS_INFLUXDB:
        return (
            False,
            "Could not import influxdb returner; influxdb python client is not installed.",
        )
    return __virtualname__


def _get_options(ret=None):
    """
    Get the influxdb options from salt.
    """
    attrs = {
        "host": "host",
        "port": "port",
        "db": "db",
        "user": "user",
        "password": "password",
    }

    _options = salt.returners.get_returner_options(
        __virtualname__, ret, attrs, __salt__=__salt__, __opts__=__opts__
    )
    return _options


@memoize
def _get_version(host, port, user, password):
    version = None
    # check the InfluxDB version via the HTTP API
    try:
        result = requests.get(f"http://{host}:{port}/ping", auth=(user, password), timeout=120)
        if influxDBVersionHeader in result.headers:
            version = result.headers[influxDBVersionHeader]
    except Exception as ex:  # pylint: disable=broad-except
        log.critical(
            "Failed to query InfluxDB version from HTTP API within InfluxDB returner: %s",
            ex,
        )
    return version


def _get_serv(ret=None):
    """
    Return an influxdb client object
    """
    _options = _get_options(ret)
    host = _options.get("host")
    port = _options.get("port")
    database = _options.get("db")
    user = _options.get("user")
    password = _options.get("password")
    version = _get_version(host, port, user, password)

    if version and "v0.8" in version:
        return influxdb.influxdb08.InfluxDBClient(
            host=host, port=port, username=user, password=password, database=database
        )
    else:
        return influxdb.InfluxDBClient(
            host=host, port=port, username=user, password=password, database=database
        )


[docs] def returner(ret): """ Return data to a influxdb data store """ serv = _get_serv(ret) # strip the 'return' key to avoid data duplication in the database json_return = salt.utils.json.dumps(ret["return"]) del ret["return"] json_full_ret = salt.utils.json.dumps(ret) # create legacy request in case an InfluxDB 0.8.x version is used if "influxdb08" in serv.__module__: req = [ { "name": "returns", "columns": ["fun", "id", "jid", "return", "full_ret"], "points": [[ret["fun"], ret["id"], ret["jid"], json_return, json_full_ret]], } ] # create InfluxDB 0.9+ version request else: req = [ { "measurement": "returns", "tags": {"fun": ret["fun"], "id": ret["id"], "jid": ret["jid"]}, "fields": {"return": json_return, "full_ret": json_full_ret}, } ] try: serv.write_points(req) except Exception as ex: # pylint: disable=broad-except log.critical("Failed to store return with InfluxDB returner: %s", ex)
[docs] def save_load(jid, load, minions=None): # pylint: disable=unused-argument """ Save the load to the specified jid """ serv = _get_serv(ret=None) # create legacy request in case an InfluxDB 0.8.x version is used if "influxdb08" in serv.__module__: req = [ { "name": "jids", "columns": ["jid", "load"], "points": [[jid, salt.utils.json.dumps(load)]], } ] # create InfluxDB 0.9+ version request else: req = [ { "measurement": "jids", "tags": {"jid": jid}, "fields": {"load": salt.utils.json.dumps(load)}, } ] try: serv.write_points(req) except Exception as ex: # pylint: disable=broad-except log.critical("Failed to store load with InfluxDB returner: %s", ex)
[docs] def save_minions(jid, minions, syndic_id=None): # pylint: disable=unused-argument """ Included for API consistency """
[docs] def get_load(jid): """ Return the load data that marks a specified jid """ serv = _get_serv(ret=None) sql = f"select load from jids where jid = '{jid}'" log.debug(">> Now in get_load %s", jid) data = serv.query(sql) log.debug(">> Now Data: %s", data) if data: return data return {}
[docs] def get_jid(jid): """ Return the information returned when the specified job id was executed """ serv = _get_serv(ret=None) sql = f"select id, full_ret from returns where jid = '{jid}'" data = serv.query(sql) ret = {} if data: points = data[0]["points"] for point in points: ret[point[3]] = salt.utils.json.loads(point[2]) return ret
[docs] def get_fun(fun): """ Return a dict of the last function called for all minions """ serv = _get_serv(ret=None) sql = """select first(id) as fid, first(full_ret) as fret from returns where fun = '{}' group by fun, id """.format( fun ) data = serv.query(sql) ret = {} if data: points = data[0]["points"] for point in points: ret[point[1]] = salt.utils.json.loads(point[2]) return ret
[docs] def get_jids(): """ Return a list of all job ids """ serv = _get_serv(ret=None) sql = "select distinct(jid) from jids group by load" # [{u'points': [[0, jid, load], # [0, jid, load]], # u'name': u'jids', # u'columns': [u'time', u'distinct', u'load']}] data = serv.query(sql) ret = {} if data: for _, jid, load in data[0]["points"]: ret[jid] = salt.utils.jid.format_jid_instance(jid, salt.utils.json.loads(load)) return ret
[docs] def get_minions(): """ Return a list of minions """ serv = _get_serv(ret=None) sql = "select distinct(id) from returns" data = serv.query(sql) ret = [] if data: for jid in data[0]["points"]: ret.append(jid[1]) return ret
[docs] def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument """ Do any work necessary to prepare a JID, including sending a custom id """ return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__)