Source code for saltext.vcf.clients.vcenter_vm

"""Resource layer for vCenter VMs (/api/vcenter/vm)."""

import socket
import time

import requests

from saltext.vcf.clients import vcenter_cluster
from saltext.vcf.clients import vcenter_datacenter
from saltext.vcf.clients import vcenter_host
from saltext.vcf.clients import vim_vm
from saltext.vcf.clients import vim_vm_customization
from saltext.vcf.utils import vcenter

PATH = "/api/vcenter/vm"


def list_(opts, profile=None):
    return vcenter.api_get(opts, PATH, profile=profile)


def get(opts, vm, profile=None):
    return vcenter.api_get(opts, f"{PATH}/{vm}", profile=profile)


def get_or_none(opts, vm, profile=None):
    try:
        return get(opts, vm, profile=profile)
    except requests.HTTPError as exc:
        if exc.response is not None and exc.response.status_code == 404:
            return None
        raise


def get_by_name(opts, name, profile=None):
    matches = search(opts, names=[name], profile=profile)
    for vm in matches:
        if vm.get("name") == name:
            return vm
    return None


def _power(opts, vm, action, profile=None):
    return vcenter.api_post(opts, f"{PATH}/{vm}/power", params={"action": action}, profile=profile)


def power_on(opts, vm, profile=None):
    return _power(opts, vm, "start", profile=profile)


def power_off(opts, vm, profile=None):
    return _power(opts, vm, "stop", profile=profile)


def reset(opts, vm, profile=None):
    return _power(opts, vm, "reset", profile=profile)


[docs] def deploy(opts, name, spec, profile=None): """Deploy a VM for PoC validation. Supports cloning from an existing template/source VM when ``template`` or ``source`` is provided. Without a source, creates a bare VM. """ source = spec.get("source") or spec.get("template") if source: customization = _customization_spec(spec) return { "task": vim_vm.clone( opts, source, name, folder=spec.get("folder"), datastore=spec.get("datastore"), host=spec.get("host"), resource_pool=spec.get("resource_pool"), cluster=spec.get("cluster"), power_on=spec.get("power_on", True), cpu_count=spec.get("cpu_count"), memory_mb=spec.get("memory_mb"), annotation=spec.get("annotation"), customization=customization, profile=profile, ) } return { "task": vim_vm.create( opts, name, spec["folder"], spec["datastore"], cpu_count=spec.get("cpu_count", 1), memory_mb=spec.get("memory_mb", 1024), guest_id=spec.get("guest_id", "otherGuest64"), cluster=spec.get("cluster"), host=spec.get("host"), resource_pool=spec.get("resource_pool"), annotation=spec.get("annotation", ""), profile=profile, ) }
def wait_reachable(target_ip, port=22, timeout=120, interval=10): deadline = time.monotonic() + float(timeout) while True: try: with socket.create_connection((target_ip, int(port)), timeout=min(float(interval), 10)): return True except OSError: if time.monotonic() >= deadline: return False time.sleep(float(interval)) def _customization_spec(spec): customization = spec.get("customization") if not customization: return None ctype = customization.get("type", "linux") if ctype != "linux": raise ValueError(f"unsupported VM customization type {ctype!r}") return vim_vm_customization.linux_spec( customization.get("hostname") or spec.get("vm_name") or spec.get("name"), customization["domain"], time_zone=customization.get("time_zone", "UTC"), nics=customization.get("nics"), dns_servers=customization.get("dns_servers"), dns_search_path=customization.get("dns_search_path"), ) _FILTER_KEYS = ( ("power_states", "power_states"), ("names", "names"), ("hosts", "hosts"), ("clusters", "clusters"), ("folders", "folders"), ("datacenters", "datacenters"), ("resource_pools", "resource_pools"), ("vms", "vms"), )
[docs] def tree(opts, profile=None): """Compose a nested ``{datacenter: {cluster: {host: [vm,...]}}}`` map. Single REST call each for datacenters, clusters, hosts, vms, then aggregated in Python. VMs not bound to a host (templates, disconnected) land under the sentinel ``"__unbound__"`` host bucket within their cluster bucket. """ datacenters = vcenter_datacenter.list_(opts, profile=profile) clusters = vcenter_cluster.list_(opts, profile=profile) hosts = vcenter_host.list_(opts, profile=profile) vms = list_(opts, profile=profile) host_to_cluster = {h["host"]: h.get("cluster") for h in hosts} cluster_to_dc = {c["cluster"]: c.get("datacenter") for c in clusters} result = {} for dc in datacenters: result[dc["datacenter"]] = {"name": dc.get("name"), "clusters": {}} for c in clusters: bucket = result.setdefault(c.get("datacenter"), {"name": None, "clusters": {}}) bucket["clusters"].setdefault(c["cluster"], {"name": c.get("name"), "hosts": {}}) for h in hosts: dc_id = cluster_to_dc.get(h.get("cluster")) dc_bucket = result.setdefault(dc_id, {"name": None, "clusters": {}}) cl_bucket = dc_bucket["clusters"].setdefault(h.get("cluster"), {"name": None, "hosts": {}}) cl_bucket["hosts"].setdefault(h["host"], {"name": h.get("name"), "vms": []}) for v in vms: host = v.get("host") cluster = host_to_cluster.get(host) dc_id = cluster_to_dc.get(cluster) dc_bucket = result.setdefault(dc_id, {"name": None, "clusters": {}}) cl_bucket = dc_bucket["clusters"].setdefault(cluster, {"name": None, "hosts": {}}) host_key = host or "__unbound__" host_bucket = cl_bucket["hosts"].setdefault(host_key, {"name": None, "vms": []}) host_bucket["vms"].append(v) return result
[docs] def summary(opts, profile=None): """Aggregate counts from ``list_()``: by_power_state, by_cpu_count, total.""" vms = list_(opts, profile=profile) by_power_state = {} by_cpu = {} total_memory = 0 for v in vms: ps = v.get("power_state", "UNKNOWN") by_power_state[ps] = by_power_state.get(ps, 0) + 1 cpu = v.get("cpu_count") if cpu is not None: by_cpu[cpu] = by_cpu.get(cpu, 0) + 1 mem = v.get("memory_size_MiB") if isinstance(mem, int): total_memory += mem return { "total": len(vms), "by_power_state": by_power_state, "by_cpu_count": by_cpu, "total_memory_MiB": total_memory, }