Source code for saltext.mysql.returners.mysql

"""
Return data to a mysql server

:maintainer:    Dave Boucha <dave@saltstack.com>, Seth House <shouse@saltstack.com>
:maturity:      mature
:depends:       python-mysqldb
:platform:      all

To enable this returner, the minion will need the python client for mysql
installed and the following values configured in the minion or master
config. These are the defaults:

.. code-block:: yaml

    mysql.host: 'salt'
    mysql.user: 'salt'
    mysql.pass: 'salt'
    mysql.db: 'salt'
    mysql.port: 3306

SSL is optional. The defaults are set to None. If you do not want to use SSL,
either exclude these options or set them to None.

.. code-block:: yaml

    mysql.ssl_ca: None
    mysql.ssl_cert: None
    mysql.ssl_key: None

Alternative configuration values can be used by prefacing the configuration
with `alternative.`. Any values not found in the alternative configuration will
be pulled from the default location. As stated above, SSL configuration is
optional. The following ssl options are simply for illustration purposes:

.. code-block:: yaml

    alternative.mysql.host: 'salt'
    alternative.mysql.user: 'salt'
    alternative.mysql.pass: 'salt'
    alternative.mysql.db: 'salt'
    alternative.mysql.port: 3306
    alternative.mysql.ssl_ca: '/etc/pki/mysql/certs/localhost.pem'
    alternative.mysql.ssl_cert: '/etc/pki/mysql/certs/localhost.crt'
    alternative.mysql.ssl_key: '/etc/pki/mysql/certs/localhost.key'

Should you wish the returner data to be cleaned out every so often, set
`keep_jobs_seconds` to the number of hours for the jobs to live in the
tables.  Setting it to `0` will cause the data to stay in the tables. The
default setting for `keep_jobs_seconds` is set to `86400`.

Should you wish to archive jobs in a different table for later processing,
set `archive_jobs` to True.  Salt will create 3 archive tables

- `jids_archive`
- `salt_returns_archive`
- `salt_events_archive`

and move the contents of `jids`, `salt_returns`, and `salt_events` that are
more than `keep_jobs_seconds` seconds old to these tables.

Use the following mysql database schema:

.. code-block:: sql

    CREATE DATABASE  `salt`
      DEFAULT CHARACTER SET utf8
      DEFAULT COLLATE utf8_general_ci;

    USE `salt`;

    --
    -- Table structure for table `jids`
    --

    DROP TABLE IF EXISTS `jids`;
    CREATE TABLE `jids` (
      `jid` varchar(255) NOT NULL,
      `load` mediumtext NOT NULL,
      UNIQUE KEY `jid` (`jid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    --
    -- Table structure for table `salt_returns`
    --

    DROP TABLE IF EXISTS `salt_returns`;
    CREATE TABLE `salt_returns` (
      `fun` varchar(50) NOT NULL,
      `jid` varchar(255) NOT NULL,
      `return` mediumtext NOT NULL,
      `id` varchar(255) NOT NULL,
      `success` varchar(10) NOT NULL,
      `full_ret` mediumtext NOT NULL,
      `alter_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
      KEY `id` (`id`),
      KEY `jid` (`jid`),
      KEY `fun` (`fun`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    --
    -- Table structure for table `salt_events`
    --

    DROP TABLE IF EXISTS `salt_events`;
    CREATE TABLE `salt_events` (
    `id` BIGINT NOT NULL AUTO_INCREMENT,
    `tag` varchar(255) NOT NULL,
    `data` mediumtext NOT NULL,
    `alter_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `master_id` varchar(255) NOT NULL,
    PRIMARY KEY (`id`),
    KEY `tag` (`tag`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Required python modules: MySQLdb

To use the mysql returner, append '--return mysql' to the salt command.

.. code-block:: bash

    salt '*' test.ping --return mysql

To use the alternative configuration, append '--return_config alternative' to the salt command.

.. versionadded:: 2015.5.0

.. code-block:: bash

    salt '*' test.ping --return mysql --return_config alternative

To override individual configuration items, append --return_kwargs '{"key:": "value"}' to the salt command.

.. versionadded:: 2016.3.0

.. code-block:: bash

    salt '*' test.ping --return mysql --return_kwargs '{"db": "another-salt"}'

"""

import logging
import sys
from contextlib import contextmanager

import salt.exceptions
import salt.returners
import salt.utils.data
import salt.utils.job
import salt.utils.json

try:
    # Trying to import MySQLdb
    import MySQLdb
    import MySQLdb.converters
    import MySQLdb.cursors
    from MySQLdb.connections import OperationalError
except ImportError:
    try:
        # MySQLdb import failed, try to import PyMySQL
        import pymysql

        pymysql.install_as_MySQLdb()
        import MySQLdb
        import MySQLdb.converters
        import MySQLdb.cursors
        from MySQLdb.err import OperationalError
    except ImportError:
        MySQLdb = None

log = logging.getLogger(__name__)

# Define the module's virtual name
__virtualname__ = "mysql"


[docs] def __virtual__(): """ Confirm that a python mysql client is installed. """ return bool(MySQLdb), "No python mysql client installed." if MySQLdb is None else ""
def _get_options(ret=None): """ Returns options used for the MySQL connection. """ defaults = { "host": "salt", "user": "salt", "pass": "salt", "db": "salt", "port": 3306, "ssl_ca": None, "ssl_cert": None, "ssl_key": None, } attrs = { "host": "host", "user": "user", "pass": "pass", "db": "db", "port": "port", "ssl_ca": "ssl_ca", "ssl_cert": "ssl_cert", "ssl_key": "ssl_key", } _options = salt.returners.get_returner_options( __virtualname__, ret, attrs, __salt__=__salt__, __opts__=__opts__, defaults=defaults, ) # post processing for k, v in _options.items(): if isinstance(v, str) and v.lower() == "none": # Ensure 'None' is rendered as None _options[k] = None if k == "port": # Ensure port is an int _options[k] = int(v) return _options @contextmanager def _get_serv(ret=None, commit=False): """ Return a mysql cursor """ _options = _get_options(ret) connect = True if __context__ and "mysql_returner_conn" in __context__: try: log.debug("Trying to reuse MySQL connection pool") conn = __context__["mysql_returner_conn"] conn.ping() connect = False except OperationalError as exc: log.debug("OperationalError on ping: %s", exc) if connect: log.debug("Generating new MySQL connection pool") try: # An empty ssl_options dictionary passed to MySQLdb.connect will # effectively connect w/o SSL. ssl_options = {} if _options.get("ssl_ca"): ssl_options["ca"] = _options.get("ssl_ca") if _options.get("ssl_cert"): ssl_options["cert"] = _options.get("ssl_cert") if _options.get("ssl_key"): ssl_options["key"] = _options.get("ssl_key") conn = MySQLdb.connect( host=_options.get("host"), user=_options.get("user"), passwd=_options.get("pass"), db=_options.get("db"), port=_options.get("port"), ssl=ssl_options, ) try: __context__["mysql_returner_conn"] = conn except TypeError: pass except OperationalError as exc: raise salt.exceptions.SaltMasterError( f"MySQL returner could not connect to database: {exc}" ) cursor = conn.cursor() try: yield cursor except MySQLdb.DatabaseError as err: error = err.args sys.stderr.write(str(error)) cursor.execute("ROLLBACK") raise else: if commit: cursor.execute("COMMIT") else: cursor.execute("ROLLBACK")
[docs] def returner(ret): """ Return data to a mysql server """ # if a minion is returning a standalone job, get a jobid if ret["jid"] == "req": ret["jid"] = prep_jid(nocache=ret.get("nocache", False)) save_load(ret["jid"], ret) try: with _get_serv(ret, commit=True) as cur: sql = """INSERT INTO `salt_returns` (`fun`, `jid`, `return`, `id`, `success`, `full_ret`) VALUES (%s, %s, %s, %s, %s, %s)""" cleaned_return = salt.utils.data.decode(ret) cur.execute( sql, ( ret["fun"], ret["jid"], salt.utils.json.dumps(cleaned_return["return"]), ret["id"], ret.get("success", False), salt.utils.json.dumps(cleaned_return), ), ) except salt.exceptions.SaltMasterError as exc: log.critical(exc) log.critical("Could not store return with MySQL returner. MySQL server unavailable.")
[docs] def event_return(events): """ Return event to mysql server Requires that configuration be enabled via 'event_return' option in master config. """ with _get_serv(events, commit=True) as cur: for event in events: tag = event.get("tag", "") data = event.get("data", "") sql = """INSERT INTO `salt_events` (`tag`, `data`, `master_id`) VALUES (%s, %s, %s)""" cur.execute(sql, (tag, salt.utils.json.dumps(data), __opts__["id"]))
# pylint: disable=unused-argument
[docs] def save_load(jid, load, minions=None): """ Save the load to the specified jid id """ with _get_serv(commit=True) as cur: sql = """INSERT INTO `jids` (`jid`, `load`) VALUES (%s, %s)""" json_data = salt.utils.json.dumps(salt.utils.data.decode(load)) try: cur.execute(sql, (jid, json_data)) except MySQLdb.IntegrityError: # https://github.com/saltstack/salt/issues/22171 # Without this try/except we get tons of duplicate entry errors # which result in job returns not being stored properly pass
[docs] def save_minions(jid, minions, syndic_id=None): # pylint: disable=unused-argument """ Included for API consistency """
[docs] def get_load(jid): """ Return the load data that marks a specified jid """ with _get_serv(ret=None, commit=True) as cur: sql = """SELECT `load` FROM `jids` WHERE `jid` = %s;""" cur.execute(sql, (jid,)) data = cur.fetchone() if data: return salt.utils.json.loads(data[0]) return {}
[docs] def get_jid(jid): """ Return the information returned when the specified job id was executed """ with _get_serv(ret=None, commit=True) as cur: sql = """SELECT id, full_ret FROM `salt_returns` WHERE `jid` = %s""" cur.execute(sql, (jid,)) data = cur.fetchall() ret = {} if data: for minion, full_ret in data: ret[minion] = salt.utils.json.loads(full_ret) return ret
[docs] def get_fun(fun): """ Return a dict of the last function called for all minions """ with _get_serv(ret=None, commit=True) as cur: sql = """SELECT s.id,s.jid, s.full_ret FROM `salt_returns` s JOIN ( SELECT MAX(`jid`) as jid from `salt_returns` GROUP BY fun, id) max ON s.jid = max.jid WHERE s.fun = %s """ cur.execute(sql, (fun,)) data = cur.fetchall() ret = {} if data: for minion, _, full_ret in data: ret[minion] = salt.utils.json.loads(full_ret) return ret
[docs] def get_jids(): """ Return a list of all job ids """ with _get_serv(ret=None, commit=True) as cur: sql = """SELECT DISTINCT `jid`, `load` FROM `jids`""" cur.execute(sql) data = cur.fetchall() ret = {} for jid in data: ret[jid[0]] = salt.utils.jid.format_jid_instance(jid[0], salt.utils.json.loads(jid[1])) return ret
[docs] def get_jids_filter(count, filter_find_job=True): """ Return a list of all job ids :param int count: show not more than the count of most recent jobs :param bool filter_find_jobs: filter out 'saltutil.find_job' jobs """ with _get_serv(ret=None, commit=True) as cur: sql = """SELECT * FROM ( SELECT DISTINCT `jid` ,`load` FROM `jids` {0} ORDER BY `jid` DESC limit {1} ) `tmp` ORDER BY `jid`;""" where = """WHERE `load` NOT LIKE '%"fun": "saltutil.find_job"%' """ cur.execute(sql.format(where if filter_find_job else "", count)) data = cur.fetchall() ret = [] for jid in data: ret.append( salt.utils.jid.format_jid_instance_ext(jid[0], salt.utils.json.loads(jid[1])) ) return ret
[docs] def get_minions(): """ Return a list of minions """ with _get_serv(ret=None, commit=True) as cur: sql = """SELECT DISTINCT id FROM `salt_returns`""" cur.execute(sql) data = cur.fetchall() ret = [] for minion in data: ret.append(minion[0]) return ret
[docs] def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument """ Do any work necessary to prepare a JID, including sending a custom id """ return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__)
def _purge_jobs(timestamp): """ Purge records from the returner tables. :param job_age_in_seconds: Purge jobs older than this :return: """ with _get_serv() as cur: try: sql = ( "delete from `jids` where jid in (select distinct jid from salt_returns" " where alter_time < %s)" ) cur.execute(sql, (timestamp,)) cur.execute("COMMIT") except MySQLdb.Error as e: log.error("mysql returner archiver was unable to delete contents of table 'jids'") log.error(str(e)) raise salt.exceptions.SaltRunnerError(str(e)) try: sql = "delete from `salt_returns` where alter_time < %s" cur.execute(sql, (timestamp,)) cur.execute("COMMIT") except MySQLdb.Error as e: log.error( "mysql returner archiver was unable to delete contents of table 'salt_returns'" ) log.error(str(e)) raise salt.exceptions.SaltRunnerError(str(e)) try: sql = "delete from `salt_events` where alter_time < %s" cur.execute(sql, (timestamp,)) cur.execute("COMMIT") except MySQLdb.Error as e: log.error( "mysql returner archiver was unable to delete contents of table 'salt_events'" ) log.error(str(e)) raise salt.exceptions.SaltRunnerError(str(e)) return True def _archive_jobs(timestamp): """ Copy rows to a set of backup tables, then purge rows. :param timestamp: Archive rows older than this timestamp :return: """ source_tables = ["jids", "salt_returns", "salt_events"] with _get_serv() as cur: target_tables = {} for table_name in source_tables: try: tmp_table_name = table_name + "_archive" sql = f"create table if not exists {tmp_table_name} like {table_name}" cur.execute(sql) cur.execute("COMMIT") target_tables[table_name] = tmp_table_name except MySQLdb.Error as e: log.error("mysql returner archiver was unable to create the archive tables.") log.error(str(e)) raise salt.exceptions.SaltRunnerError(str(e)) try: sql = ( "insert into `{}` select * from `{}` where jid in (select distinct jid" " from salt_returns where alter_time < %s)".format(target_tables["jids"], "jids") ) cur.execute(sql, (timestamp,)) cur.execute("COMMIT") except MySQLdb.Error as e: log.error("mysql returner archiver was unable to copy contents of table 'jids'") log.error(str(e)) raise salt.exceptions.SaltRunnerError(str(e)) except Exception as e: # pylint: disable=broad-except log.error(e) raise try: sql = "insert into `{}` select * from `{}` where alter_time < %s".format( target_tables["salt_returns"], "salt_returns" ) cur.execute(sql, (timestamp,)) cur.execute("COMMIT") except MySQLdb.Error as e: log.error("mysql returner archiver was unable to copy contents of table 'salt_returns'") log.error(str(e)) raise salt.exceptions.SaltRunnerError(str(e)) try: sql = "insert into `{}` select * from `{}` where alter_time < %s".format( target_tables["salt_events"], "salt_events" ) cur.execute(sql, (timestamp,)) cur.execute("COMMIT") except MySQLdb.Error as e: log.error("mysql returner archiver was unable to copy contents of table 'salt_events'") log.error(str(e)) raise salt.exceptions.SaltRunnerError(str(e)) return _purge_jobs(timestamp)
[docs] def clean_old_jobs(): """ Called in the master's event loop every loop_interval. Archives and/or deletes the events and job details from the database. :return: """ keep_jobs_seconds = int(salt.utils.job.get_keep_jobs_seconds(__opts__)) if keep_jobs_seconds > 0: try: with _get_serv() as cur: sql = "select date_sub(now(), interval {} second) as stamp;".format( keep_jobs_seconds ) cur.execute(sql) rows = cur.fetchall() stamp = rows[0][0] if __opts__.get("archive_jobs", False): _archive_jobs(stamp) else: _purge_jobs(stamp) except MySQLdb.Error as e: log.error("Mysql returner was unable to get timestamp for purge/archive of jobs") log.error(str(e)) raise salt.exceptions.SaltRunnerError(str(e))