"""
Manage snaps, plugs and snap services statefully.
"""
import json
import logging
import time
from pathlib import Path
from salt.defaults import NOT_SET
from salt.exceptions import CommandExecutionError
from salt.exceptions import SaltInvocationError
log = logging.getLogger(__name__)
__virtualname__ = "snap"
[docs]
class NoServices(CommandExecutionError):
"""
Raised when no services to manage can be found
"""
def __virtual__():
try:
__salt__["snap.list"] # pylint: disable=pointless-statement
return __virtualname__
except KeyError:
return False, "Did not find `snap` execution module"
[docs]
def connected(name, connector, target=None):
"""
Ensure a snap's plug is connected.
name
The name of the snap to connect.
connector
The name of the snap's plug to connect.
target
A specification for the slot to connect to. Optional.
Full spec: ``<snap_name>:<slot_name>``.
If left unspecified, connects the plug to a slot of the
core snap with a name matching ``plug``.
If specified as a snap name only, connects the plug to
the only slot in the provided snap that matches the connection
interface, but fails if multiple potential slots exist.
"""
ret = {
"name": name,
"result": True,
"comment": "The connection is already established",
"changes": {},
}
try:
if target is None:
target = f"core:{connector}"
elif ":" not in target:
try:
interface = __salt__["snap.plugs"](name)[connector]["interface"]
except KeyError as err:
raise SaltInvocationError(
f"The snap '{name}' does not have a plug named '{connector}'"
) from err
try:
possible_slots = __salt__["snap.slots"](target, interface=interface)
except CommandExecutionError as err:
if "is not installed" not in str(err) or not __opts__["test"]:
raise
ret["result"] = None
ret["comment"] = str(err) + " - if we weren't testing, this would be an error"
return ret
if len(possible_slots) != 1:
raise SaltInvocationError(
f"The target snap '{target}' does not expose exactly one slot with interface type '{interface}', but {len(possible_slots)}"
)
target = f"{target}:{next(iter(possible_slots))}"
try:
curr = __salt__["snap.connections"](name)
except CommandExecutionError as err:
if "is not installed" not in str(err) or not __opts__["test"]:
raise
ret["result"] = None
ret["comment"] = str(err) + " - if we weren't testing, this would be an error"
return ret
target_snap, target_slot = target.split(":", maxsplit=1)
for conn in curr["established"]:
if (
conn["plug"]["plug"] == connector
and conn["slot"]["snap"] == target_snap
and conn["slot"]["slot"] == target_slot
):
return ret
if __opts__["test"]:
ret["result"] = None
ret["comment"] = (
f"Would have connected plug {name}:{connector} to slot "
f"{target_snap}:{target_slot}"
)
ret["changes"]["connected"] = f"{target_snap}:{target_slot}"
return ret
__salt__["snap.connect"](name, connector, target=target)
curr_new = __salt__["snap.connections"](name)
for conn in curr_new["established"]:
if (
conn["plug"]["plug"] == connector
and conn["slot"]["snap"] == target_snap
and conn["slot"]["slot"] == target_slot
):
ret["comment"] = (
f"Connected plug {name}:{connector} to slot {target_snap}:{target_slot}"
)
ret["changes"]["connected"] = f"{target_snap}:{target_slot}"
return ret
raise CommandExecutionError("Tried to connect the plug, but the connection is not reported")
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
ret["changes"] = {}
return ret
[docs]
def disconnected(name, connector, target=None):
"""
Ensure a snap's plug(s)/slot(s) is/are connected.
name
The name of the snap.
connector
The name of the snap's plug/slot to disconnect.
target
A specification for the slot/plug to disconnect from in the format
`<snap_name>:<slot_or_plug_name>``. Optional.
If left unspecified, disconnects all connections of the plug/slot.
"""
ret = {"name": name, "result": True, "comment": "The connection is already cut", "changes": {}}
try:
try:
if __salt__["snap.plugs"](name, connector):
typ = "slot"
this = "plug"
elif __salt__["snap.slots"](name, connector):
typ = "plug"
this = "slot"
else:
raise SaltInvocationError(
"The snap carries neither a slot nor a plug with this name"
)
except CommandExecutionError as err:
if "is not installed" not in str(err) or not __opts__["test"]:
raise
ret["result"] = None
ret["comment"] = str(err) + " - if we weren't testing, this would be an error"
return ret
curr = __salt__["snap.connections"](name)
disconnect = []
target_snap = target_entity = None
if target:
target_snap, target_entity = target.split(":", maxsplit=1)
for conn in curr["established"]:
if conn[this]["snap"] != name or conn[this][this] != connector:
continue
if target and (conn[typ]["snap"] != target_snap or conn[typ][typ] != target_entity):
continue
disconnect.append((conn[typ]["snap"], conn[typ][typ]))
if not disconnect:
return ret
if __opts__["test"]:
ret["result"] = None
ret["comment"] = f"Would have disconnected some {typ}s"
ret["changes"]["disconnected"] = {
f"{typ}s": [f"{tgt}:{ent}" for tgt, ent in disconnect]
}
return ret
__salt__["snap.disconnect"](name, connector, target=target)
curr_new = __salt__["snap.connections"](name)
still_present = []
for conn in curr_new["established"]:
if conn[this]["snap"] != name or conn[this][this] != connector:
continue
if target and (conn[typ]["snap"] != target_snap or conn[typ][typ] != target_entity):
continue
still_present.append((conn[typ]["snap"], conn[typ][typ]))
actually_disconnected = [
f"{tgt}:{ent}" for tgt, ent in set(disconnect).difference(still_present)
]
if actually_disconnected:
ret["changes"]["disconnected"] = {
f"{typ}s": [
f"{tgt}:{ent}" for tgt, ent in set(disconnect).difference(still_present)
]
}
if still_present:
ret["result"] = False
ret["comment"] = f"Tried to disconnect some {typ}s, but some connections remained"
return ret
ret["comment"] = f"Disconnected some {typ}s"
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
ret["changes"] = {}
return ret
[docs]
def enabled(name):
"""
Ensure a snap is enabled.
name
The name of the snap.
"""
ret = {"name": name, "result": True, "comment": "The snap is already enabled", "changes": {}}
try:
try:
if __salt__["snap.is_enabled"](name):
return ret
except CommandExecutionError as err:
if "is not installed" not in str(err) or not __opts__["test"]:
raise
ret["result"] = None
ret["comment"] = str(err) + " - if we weren't testing, this would be an error"
return ret
ret["changes"]["enabled"] = name
if __opts__["test"]:
ret["result"] = None
ret["comment"] = "Would have enabled the snap"
return ret
__salt__["snap.enable"](name)
if not __salt__["snap.is_enabled"](name):
raise CommandExecutionError("Tried to enable the snap, but it is still disabled")
ret["comment"] = "Enabled the snap"
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
ret["changes"] = {}
return ret
[docs]
def disabled(name):
"""
Ensure a snap is disabled.
name
The name of the snap.
"""
ret = {"name": name, "result": True, "comment": "The snap is already disabled", "changes": {}}
try:
try:
if not __salt__["snap.is_enabled"](name):
return ret
except CommandExecutionError as err:
if "is not installed" not in str(err) or not __opts__["test"]:
raise
ret["result"] = None
ret["comment"] = str(err) + " - if we weren't testing, this would be an error"
return ret
ret["changes"]["disabled"] = name
if __opts__["test"]:
ret["result"] = None
ret["comment"] = "Would have disabled the snap"
return ret
__salt__["snap.disable"](name)
if __salt__["snap.is_enabled"](name):
raise CommandExecutionError("Tried to disable the snap, but it is still enabled")
ret["comment"] = "Disabled the snap"
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
ret["changes"] = {}
return ret
[docs]
def installed(
name, channel="latest/stable", revision=None, classic=False, held=None, assertions=None
):
"""
Ensure a snap is installed.
name
The name of the snap or a path to a local ``.snap`` file.
In the latter case, ensure you also possess the accompanying
assertions and specify the ``assertions`` parameter or import
them in some other way before this state runs.
channel
Follow this channel instead of ``latest/stable``.
revision
Install this revision instead of the latest one available
in the channel. Optional. If unset, will not upgrade after
installation. Set this to ``latest`` to ensure this snap
is kept up to date.
classic
Enable classic mode and disable security confinement.
Defaults to false.
held
Whether the snap should be excluded from general refreshes.
Optional. If unset, leaves the setting unmanaged.
assertions
When installing a snap from a file, import accompanying assertions
from this file. Optional (if the assertions are imported otherwise).
"""
def _check_changes(curr, isfile):
nonlocal revision
changes = {}
if not isfile:
if channel and curr["channel"] != channel:
changes["channel"] = {"old": curr["channel"], "new": channel}
if revision == "latest":
upgrades = __salt__["snap.list_upgrades"]()
if name in upgrades:
revision = upgrades[name]["revision"]
else:
revision = None
if held is not None and curr["held"] is not held:
changes["held"] = {"old": curr["held"], "new": held}
if revision is not None and curr["revision"] != str(revision):
changes["revision"] = {"old": curr["revision"], "new": str(revision)}
return changes
ret = {
"name": name,
"result": True,
"comment": "The snap is already installed as specified",
"changes": {},
}
try:
snap_name = name
isfile = name.endswith(".snap") # this suffix is required by snap
if isfile:
path = Path(name)
if not path.is_absolute():
raise SaltInvocationError(f"Specified path '{name}' is not absolute")
if not path.exists():
msg = f"Specified path '{name}' does not exist"
if __opts__["test"]:
ret["result"] = None
ret["comment"] = f"{msg}. If we weren't testing, ths would be an error"
return ret
raise SaltInvocationError(msg)
snapinfo = __salt__["snap.info"](name, verbose=True)
snap_name = snapinfo["name"]
match = {"snap-sha3-384": snapinfo["sha3-384"]}
curr_assrts = __salt__["snap.known"]("snap-revision", **match)
if not curr_assrts:
if __opts__["test"]:
if assertions:
msg = "Would have imported assertions and installed the snap"
ret["changes"]["installed"] = name
else:
msg = "Missing assertions. If we weren't testing, this would be an error"
ret["result"] = None
ret["comment"] = msg
return ret
if not assertions:
raise CommandExecutionError(
"Missing assertions, either import them or pass the assertions file in `assertions`"
)
__salt__["snap.ack"](assertions)
curr_assrts = __salt__["snap.known"]("snap-revision", **match)
if not curr_assrts:
raise CommandExecutionError(
"Imported assertions, but still could not find snap-revision"
)
revision = curr_assrts[0]["snap-revision"]
curr = __salt__["snap.list"](snap_name)
if curr:
changes = _check_changes(curr[snap_name], isfile)
verb = "modified"
else:
changes = {"installed": name}
verb = "installed"
if not changes:
return ret
if __opts__["test"]:
ret["result"] = None
ret["comment"] = f"Would have {verb} the snap"
ret["changes"] = changes
return ret
held_change = changes.pop("held", None)
if changes:
__salt__["snap.install"](
name,
channel=channel,
revision=revision if not isfile else None,
classic=classic,
refresh=bool(curr) and not isfile,
)
ret["changes"].update(changes)
if held_change:
func = "hold" if held else "unhold"
__salt__[f"snap.{func}"](name)
ret["changes"].update({"held": held_change})
new = __salt__["snap.list"](snap_name)
if not new:
raise CommandExecutionError(
f"{verb.capitalize()} the snap, but it could not be found afterwards"
)
new_changes = _check_changes(new[snap_name], isfile)
if new_changes:
ret["result"] = False
ret["comment"] = (
f"{verb.capitalize()} the snap, but there are still "
f"pending changes: {json.dumps(new_changes)}"
)
for change in new_changes:
ret["changes"].pop(change, None)
return ret
ret["comment"] = f"{verb.capitalize()} the snap"
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
ret["changes"] = {}
return ret
[docs]
def option_managed(name, option=None, value=NOT_SET, options=None):
"""
Ensure a snap's options are set as specified.
name
The name of the snap.
option
The name of the option. Either this and ``value`` or ``options`` is required.
value
The value to set.. Either this and ``option`` or ``options`` is required.
If the value is specified as None/``null`` (YAML), the value will be unset.
options
A mapping of option names to their values to set. If an option's value
is None, it will be unset.
"""
def _check_changes(data):
changes = {}
to_set = []
to_unset = []
for opt, val in options.items():
if val is None:
if opt in data:
changes[opt] = {"old": data[opt], "new": val, "unset": True}
to_unset.append(opt)
elif opt not in data:
changes[opt] = {"old": None, "new": val, "set": True}
to_set.append(opt)
elif data[opt] != val:
changes[opt] = {"old": data[opt], "new": val}
to_set.append(opt)
return changes, to_set, to_unset
ret = {
"name": name,
"result": True,
"comment": "All options are in the correct state",
"changes": {},
}
try:
single = option and value is not NOT_SET
if not (single or options):
raise SaltInvocationError("Either option and value or options are required")
if single and options:
raise SaltInvocationError(
"Either specify option and value or options, not both variants"
)
options = options or {option: value}
try:
curr = __salt__["snap.options"](name)
except CommandExecutionError as err:
if "is not installed" not in str(err) or not __opts__["test"]:
raise
ret["result"] = None
ret["comment"] = str(err) + " - if we weren't testing, this would be an error"
return ret
changes, to_set, to_unset = _check_changes(curr)
if not changes:
return ret
if __opts__["test"]:
ret["result"] = None
ret["comment"] = "Would have modified some options"
ret["changes"] = changes
return ret
errs = {}
for opt in to_unset:
try:
__salt__["snap.option_unset"](name, opt)
ret["changes"][opt] = changes[opt]
except CommandExecutionError as err:
errs[opt] = str(err)
for opt in to_set:
try:
__salt__["snap.option_set"](name, opt, changes[opt]["new"])
ret["changes"][opt] = changes[opt]
except CommandExecutionError as err:
errs[opt] = str(err)
if errs:
msg = "\n".join(f"{opt}: {err}" for opt, err in errs.items())
raise CommandExecutionError(f"Encountered some errors:\n{msg}")
new_changes, _, _ = _check_changes(__salt__["snap.options"](name))
if new_changes:
ret["result"] = False
ret["comment"] = (
f"Modified some snap options, but there are still "
f"pending changes: {json.dumps(new_changes)}"
)
for change in new_changes:
ret["changes"].pop(change, None)
return ret
ret["comment"] = "Modified some options"
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
return ret
[docs]
def removed(name, purge=False):
"""
Ensure a snap is removed.
name
The name of the snap.
purge
Don't save a snapshot of its data. Defaults to false.
"""
ret = {"name": name, "result": True, "comment": "The snap is already absent", "changes": {}}
try:
curr = __salt__["snap.list"](name)
if not curr:
return ret
ret["changes"]["removed"] = name
if __opts__["test"]:
ret["result"] = None
ret["comment"] = "Would have removed the snap"
return ret
__salt__["snap.remove"](name, purge=purge)
new = __salt__["snap.list"](name)
if new:
raise CommandExecutionError("Tried to remove the snap, but it is still present")
ret["comment"] = "Removed the snap"
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
ret["changes"] = {}
return ret
[docs]
def service_running(
name, service=None, enabled=None, timeout=10
): # pylint: disable=redefined-outer-name
"""
Ensure a snap service is running.
This state supports the ``watch`` requisite.
name
The name of the snap.
service
Restrict this state to one of its services. Optional.
If unset, will manage all services of the snap.
enabled
Ensure the service is enabled to start at boot time.
Optional. Can only be set to true.
timeout
This state waits for each service. If it is still not at the desired
state after this amount of seconds, it will fail. Defaults to 10.
reload
When the ``watch`` requisite causes a restart, attempt to reload the
service instead. Defaults to false.
"""
ret = {
"name": name,
"result": True,
"comment": "All services are in the correct state",
"changes": {},
}
try:
try:
start, enable = _check_service_changes(
name, service, running=True, enabled=enabled or None
)
except NoServices as err:
if not __opts__["test"]:
raise
ret["result"] = None
ret["comment"] = str(err) + " - if we weren't testing, this would be an error"
return ret
if not (enable or start):
return ret
if __opts__["test"]:
ret["result"] = None
ret["comment"] = "Would have started/enabled some services"
if start:
ret["changes"]["started"] = start
if enable:
ret["changes"]["enabled"] = enable
return ret
errs = {}
ret["changes"] = {"enabled": [], "started": []}
for serv in set(start + enable):
try:
__salt__["snap.service_start"](serv, enable=enabled)
if serv in start:
_timer(
__salt__["snap.service_running"],
serv,
True,
timeout,
"Tried to start the snap service, but it is still not running",
)
ret["changes"]["started"].append(serv)
if serv in enable:
_timer(
__salt__["snap.service_enabled"],
serv,
True,
timeout if serv not in start else 1,
"Tried to enable the snap service, but it is still not enabled",
)
ret["changes"]["enabled"].append(serv)
except CommandExecutionError as err:
errs[serv] = str(err)
if errs:
msg = "\n".join(f"{service}: {err}" for service, err in errs.items())
raise CommandExecutionError(f"Encountered some errors:\n{msg}")
ret["comment"] = "Started/enabled some services"
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
for typ in ("enabled", "started"):
if typ in ret["changes"] and not ret["changes"][typ]:
ret["changes"].pop(typ)
return ret
[docs]
def service_dead(
name, service=None, disabled=None, timeout=10
): # pylint: disable=redefined-outer-name
"""
Ensure a snap service is dead.
This state supports the ``watch`` requisite.
name
The name of the snap.
service
Restrict this state to one of its services. Optional.
If unset, will manage all services of the snap.
disabled
Ensure the service is disabled (does not start at boot time).
Optional. Can only be set to true.
timeout
This state waits for each service. If it is still not at the desired
state after this amount of seconds, it will fail. Defaults to 10.
"""
ret = {
"name": name,
"result": True,
"comment": "All services are in the correct state",
"changes": {},
}
try:
try:
stop, disable = _check_service_changes(
name, service, running=False, enabled=False if disabled else None
)
except NoServices as err:
if not __opts__["test"]:
raise
ret["result"] = None
ret["comment"] = str(err) + " - if we weren't testing, this would be an error"
return ret
if not (stop or disable):
return ret
if __opts__["test"]:
ret["result"] = None
ret["comment"] = "Would have stopped/disabled some services"
if stop:
ret["changes"]["stopped"] = stop
if disable:
ret["changes"]["disabled"] = disable
return ret
errs = {}
ret["changes"] = {"disabled": [], "stopped": []}
for serv in set(stop + disable):
try:
__salt__["snap.service_stop"](serv, disable=disabled)
if serv in stop:
_timer(
__salt__["snap.service_running"],
serv,
False,
timeout,
"Tried to stop the snap service, but it is still running",
)
ret["changes"]["stopped"].append(serv)
if serv in disable:
_timer(
__salt__["snap.service_enabled"],
serv,
False,
timeout if serv not in stop else 1,
"Tried to disable the snap service, but it is still enabled",
)
ret["changes"]["disabled"].append(serv)
except CommandExecutionError as err:
errs[serv] = str(err)
if errs:
msg = "\n".join(f"{service}: {err}" for service, err in errs.items())
raise CommandExecutionError(f"Encountered some errors:\n{msg}")
ret["comment"] = "Stopped/disabled some services"
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
for typ in ("disabled", "stopped"):
if typ in ret["changes"] and not ret["changes"][typ]:
ret["changes"].pop(typ)
return ret
def _check_service_changes(name, service, running, enabled): # pylint: disable=redefined-outer-name
if service:
services = __salt__["snap.services"](f"{name}.{service}")
else:
services = __salt__["snap.services"](snap=name)
if not services:
raise NoServices("Did not find any service to manage")
running_state = []
enable_state = []
for sname, status in services.items():
if status["running"] is not running:
running_state.append(sname)
if enabled is not None and status["enabled"] is not enabled:
enable_state.append(sname)
return running_state, enable_state
def _timer(func, serv, check_exp, timeout, msg):
start_time = time.time()
while func(serv) is not check_exp:
if time.time() - start_time > timeout:
raise CommandExecutionError(msg)
time.sleep(0.25)
[docs]
def mod_watch(name, sfun=None, reload=False, **kwargs):
"""
Support the ``watch`` requisite for ``snap.service_running`` and ``snap.service_dead``.
"""
ret = {"name": name, "changes": {}, "result": True, "comment": ""}
pp_suffix = "ed"
services = {}
typ_func = {}
try:
# Note that we don't have to worry about enabled/disabled since
# mod_watch would otherwise not run (the normal state would report changes).
# https://docs.saltproject.io/en/latest/ref/states/requisites.html#watch
if sfun in ["service_dead", "service_running"]:
try:
running_services, _ = _check_service_changes(
name, kwargs.get("service"), running=False, enabled=None
)
except NoServices as err:
if not __opts__["test"]:
raise
ret["result"] = None
ret["comment"] = str(err) + " - if we weren't testing, this would be an error"
return ret
if sfun == "service_dead":
if not running_services:
ret["comment"] = "All snap services are already stopped."
return ret
verb = "stop"
pp_suffix = "ped"
typ_func[verb] = __salt__["snap.service_stop"]
services = {verb: running_services}
check_exp = False
# "service_running" == sfun evidently
else:
check_exp = True
dead_services, _ = _check_service_changes(
name, kwargs.get("service"), running=True, enabled=None
)
typ_func = {
"start": __salt__["snap.service_start"],
"restart": __salt__["snap.service_restart"],
}
services["restart"] = running_services
services["start"] = dead_services
verb = "(re)start"
else:
ret["comment"] = f"Unable to trigger watch for snap.{sfun}"
ret["result"] = False
return ret
if __opts__["test"]:
ret["result"] = None
ret["comment"] = f"Would have {verb}{pp_suffix} some services."
ret["changes"] = {typ + pp_suffix: lst for typ, lst in services.items() if lst}
if reload and "restarted" in ret["changes"]:
ret["changes"]["reloaded"] = ret["changes"].pop("restarted")
return ret
errs = {}
ret["changes"] = {typ + pp_suffix: [] for typ in services}
for typ, lst in services.items():
params = {}
if typ == "restart":
params["reload"] = reload
for serv in lst:
try:
typ_func[typ](serv, **params)
_timer(
__salt__["snap.service_running"],
serv,
check_exp,
kwargs.get("timeout", 10),
f"Tried to {typ} the snap service, but it is still not {sfun.split('_')[-1]}.",
)
ret["changes"][typ + pp_suffix].append(serv)
except CommandExecutionError as err:
errs[serv] = str(err)
if errs:
msg = "\n".join(f"{service}: {err}" for service, err in errs.items())
raise CommandExecutionError(f"Encountered some errors:\n{msg}")
ret["comment"] = f"{verb}{pp_suffix} some services"
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
for typ in services:
lookup = typ + pp_suffix
if lookup in ret["changes"] and not ret["changes"][lookup]:
ret["changes"].pop(lookup)
if reload and "restarted" in ret["changes"]:
ret["changes"]["reloaded"] = ret["changes"].pop("restarted")
return ret