Source code for saltext.vcf.clients.vim_host_security

"""Per-ESXi security and storage config via SOAP.

Companion to :mod:`vim_host_config` (NTP/AD/services/advanced). This
module covers:

* Lockdown mode (``HostAccessManager``)
* Local account CRUD (``HostLocalAccountManager``)
* iSCSI software initiator (``HostStorageSystem`` + iSCSI manager)
"""

from pyVmomi import vim

from saltext.vcf.utils import vim as soap


def _host(opts, host_id_or_name, profile=None):
    content = soap.content(opts, profile=profile)
    container = content.viewManager.CreateContainerView(content.rootFolder, [vim.HostSystem], True)
    try:
        for h in container.view:
            if host_id_or_name in (h._moId, h.name):  # noqa: SLF001
                return h
    finally:
        container.Destroy()
    raise LookupError(f"host {host_id_or_name!r} not found")


# ---------------------------------------------------------------------------
# Lockdown mode
# ---------------------------------------------------------------------------


[docs] def lockdown_get(opts, host, profile=None): """Return ``{"mode": "lockdownDisabled"|"lockdownNormal"|"lockdownStrict", "exception_users": [...]}``.""" h = _host(opts, host, profile=profile) mode = str(h.config.adminDisabled and "lockdownNormal" or h.config.lockdownMode) if h.config.lockdownMode is not None: mode = str(h.config.lockdownMode) am = h.configManager.hostAccessManager if am is None: return {"mode": mode, "exception_users": []} users = ( list(am.QueryLockdownExceptions() or []) if hasattr(am, "QueryLockdownExceptions") else [] ) return {"mode": mode, "exception_users": users}
[docs] def lockdown_set(opts, host, mode, profile=None): """Set lockdown *mode* (``lockdownDisabled`` | ``lockdownNormal`` | ``lockdownStrict``).""" h = _host(opts, host, profile=profile) h.configManager.hostAccessManager.ChangeLockdownMode(mode=mode)
[docs] def lockdown_set_exception_users(opts, host, users, profile=None): """Replace the exception-user list (users exempt from lockdown).""" h = _host(opts, host, profile=profile) h.configManager.hostAccessManager.UpdateLockdownExceptions(users=list(users))
# --------------------------------------------------------------------------- # Local users (HostLocalAccountManager) # ---------------------------------------------------------------------------
[docs] def user_list(opts, host, search_str="", exact=False, find_users=True, profile=None): """List local accounts matching *search_str* (empty = all). Uses ``UserDirectory.RetrieveUserGroups`` on the host's userDirectory. Returns a list of ``{principal, full_name, id, group, ...}`` dicts. """ _host(opts, host, profile=profile) # The host's userDirectory exposes RetrieveUserGroups, but vCenter-managed # hosts route through their own content. We don't have direct access to # the host's content from here so fall back to the ServiceInstance content's # userDirectory which queries vCenter SSO. content = soap.content(opts, profile=profile) directory = content.userDirectory results = directory.RetrieveUserGroups( domain=None, searchStr=search_str, belongsToGroup=None, belongsToUser=None, exactMatch=bool(exact), findUsers=bool(find_users), findGroups=False, ) out = [] for u in results or []: out.append( { "principal": u.principal, "full_name": u.fullName, "group": bool(u.group), } ) return out
[docs] def user_create(opts, host, username, password, description="", profile=None): """Create a local user on *host*. Returns nothing on success.""" h = _host(opts, host, profile=profile) spec = vim.host.LocalAccountManager.AccountSpecification( id=username, password=password, description=description, ) h.configManager.accountManager.CreateUser(user=spec)
[docs] def user_update(opts, host, username, password=None, description=None, profile=None): """Update a local user (password and/or description).""" h = _host(opts, host, profile=profile) spec = vim.host.LocalAccountManager.AccountSpecification(id=username) if password is not None: spec.password = password if description is not None: spec.description = description h.configManager.accountManager.UpdateUser(user=spec)
def user_delete(opts, host, username, profile=None): h = _host(opts, host, profile=profile) h.configManager.accountManager.RemoveUser(userName=username) # --------------------------------------------------------------------------- # iSCSI software initiator # ---------------------------------------------------------------------------
[docs] def iscsi_status(opts, host, profile=None): """Return software iSCSI initiator status. Shape:: {"enabled": bool, "hba_device": str|None, "iqn": str|None, "static_targets": [...], "send_targets": [...], "auth_type": "chap"|"none"} """ h = _host(opts, host, profile=profile) storage = h.configManager.storageSystem hba = _iscsi_hba(storage) if hba is None: return { "enabled": False, "hba_device": None, "iqn": None, "static_targets": [], "send_targets": [], "auth_type": "none", } auth = hba.authenticationProperties return { "enabled": True, "hba_device": hba.device, "iqn": hba.iScsiName, "static_targets": [ {"address": t.address, "port": t.port, "iqn": t.iScsiName} for t in (hba.configuredStaticTarget or []) ], "send_targets": [ {"address": t.address, "port": t.port} for t in (hba.configuredSendTarget or []) ], "auth_type": "chap" if auth and auth.chapAuthEnabled else "none", }
[docs] def iscsi_enable(opts, host, profile=None): """Enable the software iSCSI initiator. Returns the HBA device name.""" h = _host(opts, host, profile=profile) storage = h.configManager.storageSystem storage.UpdateSoftwareInternetScsiEnabled(enabled=True) storage.RescanAllHba() hba = _iscsi_hba(storage) return hba.device if hba else None
def iscsi_disable(opts, host, profile=None): h = _host(opts, host, profile=profile) h.configManager.storageSystem.UpdateSoftwareInternetScsiEnabled(enabled=False)
[docs] def iscsi_add_send_target(opts, host, address, port=3260, profile=None): """Add a Send Targets discovery address; the initiator will discover LUNs from it.""" h = _host(opts, host, profile=profile) storage = h.configManager.storageSystem hba = _iscsi_hba(storage) if hba is None: raise LookupError("software iSCSI initiator not enabled") target = vim.host.InternetScsiHba.SendTarget(address=address, port=int(port)) storage.AddInternetScsiSendTargets(iScsiHbaDevice=hba.device, targets=[target]) storage.RescanHba(hbaDevice=hba.device)
def iscsi_remove_send_target(opts, host, address, port=3260, profile=None): h = _host(opts, host, profile=profile) storage = h.configManager.storageSystem hba = _iscsi_hba(storage) if hba is None: raise LookupError("software iSCSI initiator not enabled") target = vim.host.InternetScsiHba.SendTarget(address=address, port=int(port)) storage.RemoveInternetScsiSendTargets(iScsiHbaDevice=hba.device, targets=[target])
[docs] def iscsi_set_chap( opts, host, *, name, password, direction="prohibited", profile=None, ): """Configure CHAP on the software iSCSI initiator. *direction* is one of ``required`` (mutual CHAP), ``preferred`` (mutual if peer supports), ``discouraged`` (per-target settings win), or ``prohibited`` (CHAP off). """ h = _host(opts, host, profile=profile) storage = h.configManager.storageSystem hba = _iscsi_hba(storage) if hba is None: raise LookupError("software iSCSI initiator not enabled") auth_props = vim.host.InternetScsiHba.AuthenticationProperties( chapAuthEnabled=direction != "prohibited", chapName=name, chapSecret=password, chapAuthenticationType=f"chapAuthSetting.{direction}", ) storage.UpdateInternetScsiAuthenticationProperties( iScsiHbaDevice=hba.device, authenticationProperties=auth_props )
def _iscsi_hba(storage_system): for hba in storage_system.storageDeviceInfo.hostBusAdapter or []: if isinstance(hba, vim.host.InternetScsiHba): return hba return None