Source code for saltext.vcf.clients.vim_scheduled_task

"""Scheduled tasks via ``ScheduledTaskManager`` (SOAP)."""

from pyVmomi import vim

from saltext.vcf.utils import vim as soap


def _stm(opts, profile=None):
    content = soap.content(opts, profile=profile)
    return content.scheduledTaskManager


def _find_task(opts, task_id_or_name, profile=None):
    stm = _stm(opts, profile=profile)
    for task in stm.scheduledTask or []:
        if task_id_or_name in (task._moId, task.info.name):  # noqa: SLF001
            return task
    raise LookupError(f"scheduled task {task_id_or_name!r} not found")


def _to_dict(task):
    info = task.info
    return {
        "id": task._moId,  # noqa: SLF001
        "name": info.name,
        "description": info.description,
        "enabled": bool(info.enabled),
        "entity": info.entity._moId if info.entity else None,  # noqa: SLF001
        "scheduler": type(info.scheduler).__name__ if info.scheduler else None,
        "last_modified_user": info.lastModifiedUser,
        "next_run_time": info.nextRunTime.isoformat() if info.nextRunTime else None,
        "state": str(info.state) if info.state else None,
    }


[docs] def list_(opts, entity=None, profile=None): """List scheduled tasks. If *entity* is given (moId), filter to tasks scoped to it.""" stm = _stm(opts, profile=profile) if entity is not None: # Find the managed entity by moId content = soap.content(opts, profile=profile) ent = None for view_type in (vim.HostSystem, vim.VirtualMachine, vim.ClusterComputeResource): container = content.viewManager.CreateContainerView( content.rootFolder, [view_type], True ) try: for e in container.view: if e._moId == entity: # noqa: SLF001 ent = e break finally: container.Destroy() if ent is not None: break if ent is None: return [] tasks = stm.RetrieveEntityScheduledTask(entity=ent) else: tasks = stm.scheduledTask or [] return [_to_dict(t) for t in tasks]
def get(opts, task_id_or_name, profile=None): return _to_dict(_find_task(opts, task_id_or_name, profile=profile)) def get_or_none(opts, task_id_or_name, profile=None): try: return get(opts, task_id_or_name, profile=profile) except LookupError: return None
[docs] def create(opts, entity_moid, spec, profile=None): """Create a scheduled task. *spec* is a dict matching ``vim.scheduler.ScheduledTaskSpec``. Required keys: ``name``, ``description``, ``scheduler`` (dict with ``type`` like ``OnceTaskScheduler``, ``WeeklyTaskScheduler``, etc.), ``action`` (dict with ``method`` and ``arguments``). ``enabled`` defaults to True. """ stm = _stm(opts, profile=profile) content = soap.content(opts, profile=profile) # Locate entity ent = None for view_type in (vim.HostSystem, vim.VirtualMachine, vim.ClusterComputeResource): container = content.viewManager.CreateContainerView(content.rootFolder, [view_type], True) try: for e in container.view: if e._moId == entity_moid: # noqa: SLF001 ent = e break finally: container.Destroy() if ent is not None: break if ent is None: raise LookupError(f"entity {entity_moid!r} not found") spec_obj = _build_task_spec(spec) new_task = stm.CreateScheduledTask(entity=ent, spec=spec_obj) return new_task._moId # noqa: SLF001
def _build_task_spec(spec): task_spec = vim.scheduler.ScheduledTaskSpec() task_spec.name = spec["name"] task_spec.description = spec.get("description", "") task_spec.enabled = bool(spec.get("enabled", True)) sched = spec.get("scheduler") or {} sched_type = sched.get("type", "OnceTaskScheduler") sched_cls = getattr(vim.scheduler, sched_type) sched_obj = sched_cls() for k, v in sched.items(): if k == "type": continue setattr(sched_obj, k, v) task_spec.scheduler = sched_obj action = spec.get("action") or {} method_action = vim.action.MethodAction() method_action.name = action.get("method", "") method_action.argument = [ vim.action.MethodActionArgument(value=v) for v in action.get("arguments", []) ] task_spec.action = method_action if "notification" in spec: task_spec.notification = spec["notification"] return task_spec def reconfigure(opts, task_id_or_name, spec, profile=None): task = _find_task(opts, task_id_or_name, profile=profile) spec_obj = _build_task_spec(spec) task.ReconfigureScheduledTask(spec=spec_obj) return get(opts, task_id_or_name, profile=profile) def delete(opts, task_id_or_name, profile=None): task = _find_task(opts, task_id_or_name, profile=profile) task.RemoveScheduledTask() return True def run_now(opts, task_id_or_name, profile=None): task = _find_task(opts, task_id_or_name, profile=profile) task.RunScheduledTask() return True