"""
Minion data cache plugin for MySQL database.
.. versionadded:: 2018.3.0
It is up to the system administrator to set up and configure the MySQL
infrastructure. All that is needed for this plugin is a working MySQL server.
.. warning::
The mysql.database and mysql.table_name will be directly added into certain
queries. Salt treats these as trusted input.
The module requires the database (default ``salt_cache``) to exist but creates
its own table if needed. The keys are indexed using the ``bank`` and
``etcd_key`` columns.
To enable this cache plugin, the master will need the python client for
MySQL installed. This can be easily installed with pip:
.. code-block:: bash
pip install pymysql
Optionally, depending on the MySQL server configuration, the following values
could be set in the master config. These are the defaults:
.. code-block:: yaml
mysql.host: 127.0.0.1
mysql.port: 3306
mysql.user: None
mysql.password: None
mysql.database: salt_cache
mysql.table_name: salt
# This may be enabled to create a fresh connection on every call
mysql.fresh_connection: false
To use MySQL as a minion data cache backend, set the master ``cache`` config
value to ``mysql``:
.. code-block:: yaml
cache: mysql
.. _`MySQL documentation`: https://dev.mysql.com/doc/
"""
import copy
import logging
import time
import salt.payload
import salt.utils.stringutils
from salt.exceptions import SaltCacheError
# Locate a usable MySQL client library. MySQLdb (mysqlclient) is preferred and
# PyMySQL, installed under the MySQLdb name, is the fallback. Whatever happens,
# the names ``MySQLdb``, ``OperationalError`` and ``InterfaceError`` are bound
# afterwards so that code referencing them (for example an ``except`` clause)
# never hits a NameError.
try:
    # Trying to import MySQLdb
    import MySQLdb
    import MySQLdb.converters
    import MySQLdb.cursors
    from MySQLdb.connections import OperationalError

    # Define the interface error as a subclass of exception
    # It will never be thrown/used, it is defined to support the pymysql error below
    class InterfaceError(Exception):
        pass

except ImportError:
    try:
        # MySQLdb import failed, try to import PyMySQL
        import pymysql
        from pymysql.err import InterfaceError

        pymysql.install_as_MySQLdb()
        import MySQLdb
        import MySQLdb.converters
        import MySQLdb.cursors
        from MySQLdb.err import OperationalError
    except ImportError:
        # No client library at all: __virtual__() reports the module as
        # unavailable based on this sentinel.
        MySQLdb = None

        # Placeholders only; nothing raises them. They keep the exception
        # names resolvable when neither client library could be imported.
        class OperationalError(Exception):
            """Stand-in used when no MySQL client library is installed."""

        class InterfaceError(Exception):
            """Stand-in used when no MySQL client library is installed."""
# Database used by _init_client() when ``mysql.database`` is not configured
_DEFAULT_DATABASE_NAME = "salt_cache"
# NOTE(review): not referenced anywhere in the visible code; _init_client()
# falls back to the literal "salt" for ``mysql.table_name`` -- confirm which
# default is intended.
_DEFAULT_CACHE_TABLE_NAME = "cache"
# Seconds run_query() sleeps before it recreates the connection
_RECONNECT_INTERVAL_SEC = 0.050
log = logging.getLogger(__name__)
# Module properties
# Presumably the name the Salt loader registers this module under; it matches
# the ``cache: mysql`` setting described in the module docstring.
__virtualname__ = "mysql"
# Presumably lets the loader expose ls() as ``list`` without this module
# shadowing the builtin -- verify against the Salt loader documentation.
__func_alias__ = {"ls": "list"}
[docs]
def __virtual__():
    """
    Confirm that a python mysql client is installed.

    Returns a ``(loadable, reason)`` tuple. ``reason`` is an empty string when
    a client library was imported and an explanatory message otherwise.
    """
    if MySQLdb is None:
        return False, "No python mysql client installed."
    return bool(MySQLdb), ""
[docs]
def force_reconnect():
    """
    Drop the cached MySQL client from Salt's ``__context__`` so that a new
    connection has to be established.

    Nothing happens when no client is cached.
    """
    context_key = "mysql_client"
    # run_query() connects again once it finds no cached client
    __context__.pop(context_key, None)
[docs]
def run_query(conn, query, args=None, retries=3):
    """
    Get a cursor and run a query. Reconnect up to ``retries`` times if
    needed.

    conn
        Connection to run the query on. ``None`` falls back to the client
        cached in ``__context__["mysql_client"]``. When
        ``mysql_fresh_connection`` is set in ``__context__`` the value is
        ignored and a brand-new connection is opened (and cached) instead.

    query
        SQL statement, optionally containing ``%s`` placeholders.

    args
        Optional parameters for the placeholders. A falsy value runs the
        query without parameters.

    retries
        Number of reconnect attempts left. Zero or a negative number disables
        reconnecting.

    Returns: cursor, affected rows counter
    Raises: SaltCacheError, AttributeError, OperationalError, InterfaceError
    """
    if __context__.get("mysql_fresh_connection"):
        # Create a new connection if configured
        conn = MySQLdb.connect(**__context__["mysql_kwargs"])
        __context__["mysql_client"] = conn
    if conn is None:
        conn = __context__.get("mysql_client")
    try:
        cur = conn.cursor()
        if not args:
            log.debug("Doing query: %s", query)
            out = cur.execute(query)
        else:
            # %r leaves the repr() to the logging framework, so the (possibly
            # large) serialized payload is only rendered when debug logging
            # is actually enabled. The emitted text is unchanged.
            log.debug("Doing query: %s args: %r ", query, args)
            out = cur.execute(query, args)
        return cur, out
    except (AttributeError, OperationalError, InterfaceError) as e:
        # "<=" rather than "==": a negative ``retries`` must not recurse
        # without bound.
        if retries <= 0:
            raise
        # reconnect creating new client
        if conn is None:
            # No client existed yet (``None.cursor()`` raised the
            # AttributeError), so there is nothing to back off from.
            log.debug("mysql_cache: creating db connection")
        else:
            log.info("mysql_cache: recreating db connection due to: %r", e)
            time.sleep(_RECONNECT_INTERVAL_SEC)
        __context__["mysql_client"] = MySQLdb.connect(**__context__["mysql_kwargs"])
        return run_query(
            conn=__context__.get("mysql_client"),
            query=query,
            args=args,
            retries=(retries - 1),
        )
    except Exception as e:  # pylint: disable=broad-except
        # Keep the error message readable for very long statements
        if len(query) > 150:
            query = query[:150] + "<...>"
        raise SaltCacheError(
            "Error running {}{}: {}".format(query, f" - args: {args}" if args else "", e)
        ) from e
def _create_table():
    """
    Make sure the cache table exists and has a ``last_update`` column.

    Three outcomes are possible: the table exists with the column (nothing to
    do), the table exists without the column (the column is added), or the
    table does not exist (it is created).
    """
    schema_args = (__context__["mysql_kwargs"]["db"], __context__["mysql_table_name"])

    # Explicitly check if the table already exists as the library logs a
    # warning on CREATE TABLE
    table_lookup = """SELECT COUNT(TABLE_NAME) FROM information_schema.tables
WHERE table_schema = %s AND table_name = %s"""
    cursor, _ = run_query(__context__.get("mysql_client"), table_lookup, args=schema_args)
    row = cursor.fetchone()
    cursor.close()

    if row[0] != 1:
        # Table is missing: create it together with its primary key
        create_stmt = """CREATE TABLE IF NOT EXISTS {} (
bank CHAR(255),
etcd_key CHAR(255),
data MEDIUMBLOB,
last_update TIMESTAMP NOT NULL
DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(bank, etcd_key)
);""".format(
            __context__["mysql_table_name"]
        )
        log.info("mysql_cache: creating table %s", __context__["mysql_table_name"])
        cursor, _ = run_query(__context__.get("mysql_client"), create_stmt)
        cursor.close()
        return

    # Table exists: find out whether it already has the last_update column
    column_lookup = """
SELECT COUNT(TABLE_NAME)
FROM
information_schema.columns
WHERE
table_schema = %s
AND table_name = %s
AND column_name = 'last_update'
"""
    cursor, _ = run_query(__context__["mysql_client"], column_lookup, args=schema_args)
    row = cursor.fetchone()
    cursor.close()
    if row[0] == 1:
        return

    # The table lacks the timestamp column, so add it
    alter_stmt = """
ALTER TABLE {}.{}
ADD COLUMN last_update TIMESTAMP NOT NULL
DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP
""".format(
        *schema_args
    )
    cursor, _ = run_query(__context__["mysql_client"], alter_stmt)
    cursor.close()
def _init_client():
    """
    Prepare the connection settings and create the table if needed.

    Does nothing when a client is already cached in ``__context__``.
    Otherwise the ``mysql.*`` options are read from ``__opts__`` and the
    results are stored in ``__context__`` under ``mysql_kwargs``,
    ``mysql_table_name`` and ``mysql_fresh_connection``.
    """
    if __context__.get("mysql_client") is not None:
        return

    # Work on a copy so that popping the handled keys leaves __opts__ intact
    opts = copy.deepcopy(__opts__)
    mysql_kwargs = {
        "autocommit": True,
        "host": opts.pop("mysql.host", "127.0.0.1"),
        "user": opts.pop("mysql.user", None),
        "passwd": opts.pop("mysql.password", None),
        "db": opts.pop("mysql.database", _DEFAULT_DATABASE_NAME),
        "port": opts.pop("mysql.port", 3306),
        "unix_socket": opts.pop("mysql.unix_socket", None),
        "connect_timeout": opts.pop("mysql.connect_timeout", None),
    }
    # NOTE(review): the fallback table name is "salt", while the module-level
    # _DEFAULT_CACHE_TABLE_NAME constant says "cache" -- confirm the intent.
    __context__["mysql_table_name"] = opts.pop("mysql.table_name", "salt")
    __context__["mysql_fresh_connection"] = opts.pop("mysql.fresh_connection", False)

    # Gather up any additional MySQL configuration options; the keys handled
    # above were popped already, so only the remaining ``mysql.*`` ones match.
    extra_kwargs = {
        name.split(".")[1]: opts.get(name) for name in opts if name.startswith("mysql.")
    }
    mysql_kwargs.update(extra_kwargs)
    # TODO: handle SSL connection parameters

    # Unset values must not be handed to the connect() call
    mysql_kwargs = {name: value for name, value in mysql_kwargs.items() if value is not None}

    # Never write the password to the log
    loggable_kwargs = dict(mysql_kwargs, passwd="<hidden>")
    log.info("mysql_cache: Setting up client with params: %r", loggable_kwargs)

    __context__["mysql_kwargs"] = mysql_kwargs
    # The MySQL client is created later on by run_query
    _create_table()
[docs]
def store(bank, key, data):
    """
    Serialize ``data`` and write it to the cache table under ``bank`` and
    ``key``, replacing any existing row.

    Raises SaltCacheError when the statement reports an affected-row count
    other than 1 or 2.
    """
    _init_client()
    serialized = salt.payload.dumps(data)
    statement = "REPLACE INTO {} (bank, etcd_key, data) values(%s,%s,%s)".format(
        __context__["mysql_table_name"]
    )
    cursor, cnt = run_query(
        __context__.get("mysql_client"), statement, args=(bank, key, serialized)
    )
    cursor.close()
    if cnt in (1, 2):
        return
    raise SaltCacheError(f"Error storing {bank} {key} returned {cnt}")
[docs]
def fetch(bank, key):
    """
    Load and deserialize the data stored under ``bank`` and ``key``.

    Returns an empty dict when no matching row exists.
    """
    _init_client()
    statement = "SELECT data FROM {} WHERE bank=%s AND etcd_key=%s".format(
        __context__["mysql_table_name"]
    )
    cursor, _ = run_query(__context__.get("mysql_client"), statement, args=(bank, key))
    row = cursor.fetchone()
    cursor.close()
    return {} if row is None else salt.payload.loads(row[0])
[docs]
def flush(bank, key=None):
    """
    Delete cached data: the single ``key`` of ``bank`` when a key is given,
    otherwise every row of the bank.
    """
    _init_client()
    statement = "DELETE FROM {} WHERE bank=%s".format(__context__["mysql_table_name"])
    params = (bank,)
    if key is not None:
        # Narrow the delete down to one entry
        statement += " AND etcd_key=%s"
        params = (bank, key)
    cursor, _ = run_query(__context__.get("mysql_client"), statement, args=params)
    cursor.close()
[docs]
def ls(bank):
    """
    Return a list with the keys of all entries stored in the specified bank.
    """
    _init_client()
    table = __context__["mysql_table_name"]
    statement = "SELECT etcd_key FROM {} WHERE bank=%s".format(table)
    cursor, _ = run_query(__context__.get("mysql_client"), statement, args=(bank,))
    # Each result row holds just the key column
    keys = [entry[0] for entry in cursor.fetchall()]
    cursor.close()
    return keys
[docs]
def contains(bank, key):
    """
    Checks if the specified bank contains the specified key.

    bank
        Name of the bank to look in.

    key
        Key to look for. ``None`` asks whether the bank holds any entry at
        all.

    Returns ``True`` when at least one matching row exists, else ``False``.
    """
    _init_client()
    query = "SELECT COUNT(data) FROM {} WHERE bank=%s".format(__context__["mysql_table_name"])
    data = (bank,)
    if key is not None:
        query += " AND etcd_key=%s"
        data = (bank, key)
    cur, _ = run_query(__context__.get("mysql_client"), query, args=data)
    r = cur.fetchone()
    cur.close()
    # A bank-wide lookup may count many rows, so test for "any" rather than
    # "exactly one"; a bank with several entries still exists.
    return r[0] > 0
[docs]
def updated(bank, key):
    """
    Return the integer Unix epoch update timestamp of the specified bank and
    key.

    When no row matches, the empty fetch result (``None``) is returned as is.
    """
    _init_client()
    statement = (
        "SELECT UNIX_TIMESTAMP(last_update) FROM {} WHERE bank=%s "
        "AND etcd_key=%s".format(__context__["mysql_table_name"])
    )
    cursor, _ = run_query(__context__.get("mysql_client"), query=statement, args=(bank, key))
    row = cursor.fetchone()
    cursor.close()
    if not row:
        return row
    return int(row[0])