# Source code for saltext.vcf.states.vcf_esxi_firewall

"""State module for ESXi firewall rules."""

from saltext.vcf.clients import esxi_firewall as c

__virtualname__ = "vcf_esxi_firewall"


def __virtual__():
    """Return the virtual name under which this module is exposed."""
    virtual_name = __virtualname__
    return virtual_name


def _ret(name):
    """Build a fresh default state return dict for *name*.

    The dict starts out as a success with no changes and an empty comment;
    callers overwrite the fields they need.
    """
    return dict(name=name, changes={}, result=True, comment="")


def rule_enabled(name, enabled=True, allowed_ips=None, all_ip=None, profile=None):
    """Ensure firewall rule *name* matches the desired enabled / allowed-IP state.

    *allowed_ips* and *all_ip* are optional — when both are omitted the
    allowed-host portion is left untouched. When only one of them is given,
    the other keeps the value currently reported for the rule.

    name
        Name of the rule, as passed to the client lookup.
    enabled
        Desired enabled state; compared and applied by truthiness.
    allowed_ips
        Iterable of allowed IP entries. A single string is treated as one
        entry rather than being split into characters.
    all_ip
        Desired "allow all IPs" flag; evaluated by truthiness when given.
    profile
        Forwarded unchanged to every client call.

    Returns the state return dict: ``result`` is ``False`` when the rule is
    not found, ``None`` in test mode when changes are pending, and ``True``
    otherwise. ``changes`` is only populated after changes were applied.
    """
    ret = _ret(name)
    rule = c.get_or_none(__opts__, name, profile=profile)
    if rule is None:
        ret["result"] = False
        ret["comment"] = f"Rule {name} not found"
        return ret

    actions = []
    changes = {}

    want_enabled = bool(enabled)
    if rule.get("enabled") != want_enabled:
        actions.append(f"enabled={enabled}")
        changes["enabled"] = {"old": rule.get("enabled"), "new": want_enabled}

    if allowed_ips is not None or all_ip is not None:
        current_hosts = rule.get("allowed_hosts") or {}
        current_all = bool(current_hosts.get("all_ip", False))
        current_ips = list(current_hosts.get("ip_addresses") or [])

        # Whichever half of the allowed-hosts pair was not supplied keeps
        # its current value.
        desired_all = current_all if all_ip is None else bool(all_ip)
        if allowed_ips is None:
            desired_ips = list(current_ips)
        elif isinstance(allowed_ips, str):
            # list("10.0.0.1") would yield single characters; a lone string
            # means a single entry.
            desired_ips = [allowed_ips]
        else:
            desired_ips = list(allowed_ips)

        # Order of the IP list is not significant for the comparison.
        if current_all != desired_all or sorted(current_ips) != sorted(desired_ips):
            actions.append("allowed_hosts")
            changes["allowed_hosts"] = {
                "old": {"all_ip": current_all, "ip_addresses": current_ips},
                "new": {"all_ip": desired_all, "ip_addresses": desired_ips},
            }

    if not actions:
        ret["comment"] = f"Rule {name} already matches"
        return ret

    if __opts__["test"]:
        ret["result"] = None
        ret["comment"] = f"Rule {name} would change: {', '.join(actions)}"
        return ret

    if "enabled" in changes:
        # Apply the same normalized value that was compared and reported.
        c.set_enabled(__opts__, name, want_enabled, profile=profile)
    if "allowed_hosts" in changes:
        new_hosts = changes["allowed_hosts"]["new"]
        c.set_allowed_ips(
            __opts__,
            name,
            new_hosts["ip_addresses"],
            all_ip=new_hosts["all_ip"],
            profile=profile,
        )

    ret["changes"] = changes
    ret["comment"] = f"Rule {name} updated: {', '.join(actions)}"
    return ret


def rule_disabled(name, profile=None):
    """Ensure firewall rule *name* is disabled.

    Equivalent to calling ``rule_enabled`` with ``enabled=False``; the
    allowed-host arguments are not passed.
    """
    outcome = rule_enabled(name, profile=profile, enabled=False)
    return outcome