"""VM snapshots via pyVmomi SOAP (vCenter REST has no snapshot surface in 9.2).
Probed against VCF 9.2: ``GET /api/vcenter/vm/{id}/snapshots`` returns 404.
This module uses ``vim.VirtualMachine.CreateSnapshot_Task`` and friends
through the SOAP service to author and manage snapshots.
"""
from pyVmomi import vim
from saltext.vcf.utils import vim as soap
def _find_vm(opts, vm_id_or_name, profile=None):
content = soap.content(opts, profile=profile)
container = content.viewManager.CreateContainerView(
content.rootFolder, [vim.VirtualMachine], True
)
try:
for vm in container.view:
if vm_id_or_name in (vm._moId, vm.name): # noqa: SLF001
return vm
finally:
container.Destroy()
raise LookupError(f"VM {vm_id_or_name!r} not found")
[docs]
def list_(opts, vm_id_or_name, profile=None):
"""Return a tree of snapshots: ``[{"id", "name", "description", "children": [...]}]``."""
vm = _find_vm(opts, vm_id_or_name, profile=profile)
if not vm.snapshot:
return []
return [_tree(node) for node in vm.snapshot.rootSnapshotList]
[docs]
def current(opts, vm_id_or_name, profile=None):
"""Return the current (active) snapshot dict, or None."""
vm = _find_vm(opts, vm_id_or_name, profile=profile)
if not vm.snapshot or not vm.snapshot.currentSnapshot:
return None
cur = vm.snapshot.currentSnapshot
return {
"id": cur._moId,
"name": _find_name_in_tree(vm.snapshot.rootSnapshotList, cur._moId),
}
[docs]
def create(
opts,
vm_id_or_name,
name,
*,
description="",
memory=False,
quiesce=False,
vss_options=None,
profile=None,
):
"""Create a snapshot, return the ``vim.Task`` moId.
*vss_options* — optional dict for Windows-guest VSS quiesce. Supported keys:
``vss_backup_type`` (1=full, 2=incremental, 5=differential, 6=log),
``vss_backup_context`` (e.g. ``backup_context_backup``),
``vss_partial_file_support`` (bool),
``vss_bootable_system_state`` (bool),
``timeout`` (seconds).
Setting any value implies ``quiesce=True`` and uses
``CreateSnapshotEx_Task`` with a WindowsQuiesceSpec.
"""
vm = _find_vm(opts, vm_id_or_name, profile=profile)
if vss_options:
spec = vim.vm.WindowsQuiesceSpec()
if "timeout" in vss_options:
spec.timeout = int(vss_options["timeout"])
if "vss_backup_type" in vss_options:
spec.vssBackupType = int(vss_options["vss_backup_type"])
if "vss_backup_context" in vss_options:
spec.vssBackupContext = vss_options["vss_backup_context"]
if "vss_partial_file_support" in vss_options:
spec.vssPartialFileSupport = bool(vss_options["vss_partial_file_support"])
if "vss_bootable_system_state" in vss_options:
spec.vssBootableSystemState = bool(vss_options["vss_bootable_system_state"])
task = vm.CreateSnapshotEx_Task(
name=name,
description=description,
memory=bool(memory),
quiesceSpec=spec,
)
else:
task = vm.CreateSnapshot_Task(
name=name, description=description, memory=bool(memory), quiesce=bool(quiesce)
)
return task._moId # noqa: SLF001
[docs]
def revert(opts, vm_id_or_name, snapshot_name, *, suppress_power_on=False, profile=None):
"""Revert to a named snapshot. Returns vim.Task moId."""
vm = _find_vm(opts, vm_id_or_name, profile=profile)
snap = _find_snap_by_name(vm.snapshot.rootSnapshotList, snapshot_name) if vm.snapshot else None
if snap is None:
raise LookupError(f"snapshot {snapshot_name!r} not found on {vm.name}")
task = snap.snapshot.RevertToSnapshot_Task(suppressPowerOn=bool(suppress_power_on))
return task._moId # noqa: SLF001
[docs]
def remove(opts, vm_id_or_name, snapshot_name, *, remove_children=False, profile=None):
"""Delete a snapshot by name. Returns vim.Task moId."""
vm = _find_vm(opts, vm_id_or_name, profile=profile)
snap = _find_snap_by_name(vm.snapshot.rootSnapshotList, snapshot_name) if vm.snapshot else None
if snap is None:
raise LookupError(f"snapshot {snapshot_name!r} not found on {vm.name}")
task = snap.snapshot.RemoveSnapshot_Task(removeChildren=bool(remove_children))
return task._moId # noqa: SLF001
[docs]
def remove_all(opts, vm_id_or_name, profile=None):
"""Delete every snapshot on the VM. Returns vim.Task moId."""
vm = _find_vm(opts, vm_id_or_name, profile=profile)
task = vm.RemoveAllSnapshots_Task()
return task._moId # noqa: SLF001
[docs]
def consolidate(opts, vm_id_or_name, profile=None):
"""Consolidate VM disks (merge orphaned snapshot deltas). Returns vim.Task moId."""
vm = _find_vm(opts, vm_id_or_name, profile=profile)
task = vm.ConsolidateVMDisks_Task()
return task._moId # noqa: SLF001
[docs]
def state(opts, vm_id_or_name, snapshot_name, profile=None):
"""Return ``{present, is_current, has_memory, has_quiesce, children}`` for one named snapshot.
Returns ``{"present": False}`` if the named snapshot does not exist.
"""
vm = _find_vm(opts, vm_id_or_name, profile=profile)
if not vm.snapshot:
return {"present": False}
node = _find_snap_by_name(vm.snapshot.rootSnapshotList, snapshot_name)
if node is None:
return {"present": False}
current_id = (
vm.snapshot.currentSnapshot._moId if vm.snapshot.currentSnapshot else None # noqa: SLF001
)
snap_id = node.snapshot._moId # noqa: SLF001
return {
"present": True,
"id": snap_id,
"is_current": snap_id == current_id,
"has_memory": bool(getattr(node, "backupManifest", None))
or bool(getattr(node, "vmsd", None)), # heuristic; pyVmomi exposes via state metadata
"has_quiesce": bool(getattr(node, "quiesced", False)),
"state": str(node.state) if getattr(node, "state", None) else None,
"children": [child.name for child in (node.childSnapshotList or [])],
}
# ---------------------------------------------------------------------------
# Tree helpers
# ---------------------------------------------------------------------------
def _tree(node):
return {
"id": node.snapshot._moId, # noqa: SLF001
"name": node.name,
"description": node.description,
"create_time": node.createTime.isoformat() if node.createTime else None,
"state": str(node.state),
"quiesced": bool(getattr(node, "quiesced", False)),
"children": [_tree(child) for child in (node.childSnapshotList or [])],
}
def _find_snap_by_name(tree, name):
for node in tree or []:
if node.name == name:
return node
found = _find_snap_by_name(node.childSnapshotList, name)
if found is not None:
return found
return None
def _find_name_in_tree(tree, snap_id):
for node in tree or []:
if node.snapshot._moId == snap_id: # noqa: SLF001
return node.name
found = _find_name_in_tree(node.childSnapshotList, snap_id)
if found is not None:
return found
return None