Source code for windows.winobject.wmi

import windows
import ctypes
import struct
import functools
from functools import partial
from collections import namedtuple

from ctypes.wintypes import *

import windows.com
import windows.generated_def as gdef
from windows.generated_def.winstructs import *

from windows.pycompat import basestring

# Common error check for all WMI COM interfaces
# This 'just' add the corresponding 'WBEMSTATUS' to the hresult error code
class WmiComInterface(object):
    """Base class used for COM call error checking for WMI interfaces"""
    def errcheck(self, result, func, args):
        if result < 0:
            wmitag = gdef.WBEMSTATUS.mapper[result & 0xffffffff]
            raise ctypes.WinError(result, repr(wmitag))
        return args

sentinel = object()
# POC
class QualifierSet(gdef.IWbemQualifierSet):
    def get_variant(self, name):
        """Retrieve the value of property ``name`` as a :class:`~windows.com.Variant`

        :return: :class:`~windows.com.Variant`
        """
        if not isinstance(name, basestring):
            nametype = type(name).__name__
            raise TypeError("WmiObject attributes name must be str, not <{0}>".format(nametype))
        variant_res = windows.com.Variant()
        self.Get(name, 0, variant_res, None)
        return variant_res

    def get(self, name, default=sentinel):
        """Return the value of the property ``name``. The return value depends of the type of the property and can vary"""
        try:
            return self.get_variant(name).value
        except WindowsError as e:
            if (e.winerror & 0xffffffff) != gdef.WBEM_E_NOT_FOUND:
                raise
            if default is sentinel:
                raise
            return default

    def names(self):
        res = POINTER(windows.com.SafeArray)()
        x = ctypes.pointer(res)
        self.GetNames(0, cast(x, POINTER(POINTER(gdef.SAFEARRAY))))
        # need to free the safearray / unlock ?
        properties = [p for p in res[0].to_list(BSTR)]
        return properties


# https://docs.microsoft.com/en-us/windows/desktop/api/wbemcli/nn-wbemcli-iwbemclassobject

WmiMethod = namedtuple("WmiMethod", ["name", "inparam", "outparam"])

# https://docs.microsoft.com/en-us/windows/desktop/WmiSdk/calling-a-method
[docs] class WmiObject(gdef.IWbemClassObject, WmiComInterface): """The WmiObject (which wrap ``IWbemClassObject``) contains and manipulates both class definitions and class object instances. Can be used as a mapping to access properties. """
[docs] def get_variant(self, name): """Retrieve the value of property ``name`` as a :class:`~windows.com.Variant` :return: :class:`~windows.com.Variant` """ if not isinstance(name, basestring): nametype = type(name).__name__ raise TypeError("WmiObject attributes name must be str, not <{0}>".format(nametype)) variant_res = windows.com.Variant() self.Get(name, 0, variant_res, None, None) return variant_res
[docs] def get(self, name): """Return the value of the property ``name``. The return value depends of the type of the property and can vary""" return self.get_variant(name).value
[docs] def get_method(self, name): """Return the information about the method ``name`` :returns: :class:`WmiMethod` """ inpararm = type(self)() outpararm = type(self)() variant_res = windows.com.Variant() self.GetMethod(name, 0, inpararm, outpararm) return WmiMethod(name, inpararm, outpararm)
def get_text(self, flags=0): text = gdef.BSTR() self.GetObjectText(flags, text) return text.value def put_variant(self, name, variant): if not isinstance(name, basestring): nametype = type(name).__name__ raise TypeError("WmiObject attributes name must be str, not <{0}>".format(nametype)) return self.Put(name, 0, variant, 0)
[docs] def put(self, name, value): """Set the property ``name`` to ``value``""" if isinstance(value, gdef.VARIANT): variant_value = value else: variant_value = windows.com.Variant(value) return self.put_variant(name, variant_value)
[docs] def spawn_instance(self): """Create a new object of the class represented by the current :class:`WmiObject` :returns: :class:`WmiObject` """ instance = type(self)() self.SpawnInstance(0, instance) return instance
@property def genus(self): """The genus of the object. :returns: ``WBEM_GENUS_CLASS(0x1L)`` if the :class:`WmiObject` is a Class and ``WBEM_GENUS_INSTANCE(0x2L)`` for instances and events. """ return gdef.tag_WBEM_GENUS_TYPE.mapper[self.get("__GENUS")] ## Higher level API
[docs] def get_properties(self, system_properties=False): """Return the list of properties names available for the current object. If ``system_properties`` is ``False`` property names begining with ``_`` are ignored. :returns: [:class:`str`] -- A list of string .. note: About system properties: https://docs.microsoft.com/en-us/windows/desktop/wmisdk/wmi-system-properties """ res = POINTER(windows.com.SafeArray)() x = ctypes.pointer(res) self.GetNames(None, 0, None, cast(x, POINTER(POINTER(gdef.SAFEARRAY)))) # need to free the safearray / unlock ? properties = [p for p in res[0].to_list(BSTR) if system_properties or (not p.startswith("_"))] return properties
properties = property(get_properties) #: The properties of the object (exclude system properties) def get_methods(self): self.BeginMethodEnumeration(0) methods = {} while True: name = gdef.BSTR() inpararm = type(self)() outpararm = type(self)() if self.NextMethod(0, name, inpararm, outpararm) == gdef.WBEM_S_NO_MORE_DATA: self.EndMethodEnumeration() return methods namestr = name.value methods[namestr] = WmiMethod(namestr, inpararm, outpararm) windows.winproxy.SysFreeString(name) del name methods = property(get_methods) @property def qualifier_set(self): # changer de nom ? res = QualifierSet() self.GetQualifierSet(res) return res def get_p_set(self, name): # Changer de nom ? res = QualifierSet() self.GetPropertyQualifierSet(name, res) return res # Make WmiObject a mapping object
[docs] def keys(self): """The properties of the object (include system properties)""" return self.get_properties(system_properties=True)
__getitem__ = get __setitem__ = put def items(self): return [(k, self.get(k)) for k in self.properties] def values(self): # Not sur anyone will use this but keep the dict interface return [x[1] for x in self.items()] ## Make it callable like any class :D __call__ = spawn_instance def __repr__(self): if not self: return """<{0} (NULL)>""".format(type(self).__name__,) if self.genus == gdef.WBEM_GENUS_CLASS: return """<{0} class "{1}">""".format(type(self).__name__, self.get("__Class")) return """<{0} instance of "{1}">""".format(type(self).__name__, self.get("__Class")) def __sprint__(self): return """ {0}\n {1} """.format(repr(self), "\n".join(": ".join([x[0], str(x[1])]) for x in sorted(self.items())))
class WmiTextSrc(gdef.IWbemObjectTextSrc, WmiComInterface): CLSID_WbemObjectTextSrc = "8D1C559D-84F0-4bb3-A7D5-56A7435A9BA6" # CLSID_WbemObjectTextSrc @classmethod def create(cls): self = cls() windows.com.create_instance(cls.CLSID_WbemObjectTextSrc, self) return self def get_text(self, obj, textformat): btext = gdef.BSTR() self.GetText(0, obj, textformat, None, btext) result = btext.value print("TODO: SysFreeString") return result
[docs] class WmiEnumeration(gdef.IEnumWbemClassObject, WmiComInterface): """Represent an enumeration of object that can be itered""" DEFAULT_TIMEOUT = gdef.WBEM_INFINITE #: The default timeout
[docs] def next(self, timeout=None): """Return the next object in the enumeration with `timeout`. :raises: ``WindowsError(WBEM_S_TIMEDOUT)`` if timeout expire :returns: :class:`WmiObject` """ timeout = self.DEFAULT_TIMEOUT if timeout is None else timeout # For now the count is hardcoded to 1 obj = WmiObject() return_count = gdef.ULONG(0) error = self.Next(timeout, 1, obj, return_count) if error == gdef.WBEM_S_TIMEDOUT: raise ctypes.WinError(gdef.WBEM_S_TIMEDOUT, "Wmi timeout") elif error == WBEM_S_FALSE: return None else: return obj
[docs] def __iter__(self): """Return an iterator with ``DEFAULT_TIMEOUT``""" return self.iter_timeout(self.DEFAULT_TIMEOUT)
[docs] def iter_timeout(self, timeout=None): """Return an iterator with a custom ``timeout``""" while True: obj = self.next(timeout) if obj is None: return yield obj
[docs] def all(self): """Return all elements in the enumeration as a list :returns: [:class:`WmiObject`] - A list of :class:`WmiObject` """ return list(self) # SqlAlchemy like :)
[docs] class WmiCallResult(gdef.IWbemCallResult, WmiComInterface): """The result of a WMI call/query. Real result value type depends of the context""" def __init__(self, result_type=None, namespace_name=None): self.result_type = result_type self.namespace_name = namespace_name
[docs] def get_call_status(self, timeout=gdef.WBEM_INFINITE): """The status of the call""" status = gdef.LONG() self.GetCallStatus(timeout, status) return WBEMSTATUS.mapper[status.value & 0xffffffff]
[docs] def get_result_object(self, timeout=gdef.WBEM_INFINITE): """The result as a :class:`WmiObject` (returned by :func:`WmiNamespace.exec_method`)""" result = WmiObject() self.GetResultObject(timeout, result) return result
[docs] def get_result_string(self, timeout=gdef.WBEM_INFINITE): """The result as a :class:`WmiObject` (returned by :func:`WmiNamespace.put_instance`)""" result = gdef.BSTR() self.GetResultString(timeout, result) return result
[docs] def get_result_service(self, timeout=gdef.WBEM_INFINITE): """The result as a :class:`WmiNamespace` (not used yet)""" result = WmiNamespace() self.GetResultServices(timeout, result) return result
@property def result(self): """The result of the correct type based on ``self.result_type``""" if self.result_type is None: raise ValueError("Cannot call <result> with no result_type") return getattr(self, "get_result_" + self.result_type)()
class WmiLocator(gdef.IWbemLocator, WmiComInterface): pass # Just for the WMI errcheck callback # !TEST CODE
[docs] class WmiNamespace(gdef.IWbemServices, WmiComInterface): r"""An object to perform wmi request to a given ``namespace``""" #CLSID_WbemAdministrativeLocator_IID = windows.com.IID.from_string('CB8555CC-9128-11D1-AD9B-00C04FD8FDFF') WbemLocator_CLSID = windows.com.IID.from_string('4590F811-1D3A-11D0-891F-00AA004B2E24') DEFAULT_ENUM_FLAGS = (gdef.WBEM_FLAG_RETURN_IMMEDIATELY | WBEM_FLAG_FORWARD_ONLY) #: The defauls flags used for enumeration. ``(WBEM_FLAG_RETURN_IMMEDIATELY | WBEM_FLAG_FORWARD_ONLY)`` def __init__(self, namespace): self.name = namespace
[docs] @classmethod def connect(cls, namespace, user=None, password=None): """Connect to ``namespace`` using ``user`` and ``password`` for authentification if given :return: :class:`WmiNamespace` - The connected :class:`WmiNamespace`""" # this method assert com is initialised self = cls(namespace) # IWbemServices subclass locator = WmiLocator() windows.com.create_instance(cls.WbemLocator_CLSID, locator) locator.ConnectServer(namespace, user, password , None, gdef.WBEM_FLAG_CONNECT_USE_MAX_WAIT, None, None, self) locator.Release() return self
[docs] def query(self, query): """Return the list of :class:`WmiObject` matching ``query``. This API is the `simple one`, if you need timeout or complexe feature see :func:`exec_query` :return: [:class:`WmiObject`] - A list of :class:`WmiObject` """ return list(self.exec_query(query))
[docs] def select(self, clsname, deep=True): """Return the list of :class:`WmiObject` that are instance of ``clsname``. Deep has the same meaning as in :func:`create_instance_enum`. This API is the `simple one`, if you need timeout or complexe feature see :func:`create_instance_enum` :return: [:class:`WmiObject`] - A list of :class:`WmiObject` """ return list(self.create_instance_enum(clsname, deep=deep))
[docs] def exec_query(self, query, flags=DEFAULT_ENUM_FLAGS, ctx=None): """Execute a WQL query with custom flags and returns a ::class:`WmiEnumeration` that can be used to iter the result with timeouts :returns: :class:`WmiEnumeration` """ enumerator = WmiEnumeration() self.ExecQuery("WQL", query, flags, ctx, enumerator) return enumerator
# Create friendly name for create_class_enum & create_instance_enum ?
[docs] def create_class_enum(self, superclass, flags=DEFAULT_ENUM_FLAGS, deep=True): """Enumerate the classes in the ``namespace`` that match ``superclass``. if ``superclass`` is None will enumerate all top-level class. ``deep`` allow to returns all subclasses :returns: :class:`WmiEnumeration` .. note:: See https://docs.microsoft.com/en-us/windows/desktop/api/wbemcli/nf-wbemcli-iwbemservices-createclassenum """ flags |= gdef.WBEM_FLAG_DEEP if deep else gdef.WBEM_FLAG_SHALLOW enumerator = WmiEnumeration() self.CreateClassEnum(superclass, flags, None, enumerator) return enumerator
@property def classes(self): """The list of classes in the namespace. This a a wrapper arround :func:`create_class_enum`. :return: [:class:`WmiObject`] - A list of :class:`WmiObject` """ return self.create_class_enum(None, deep=True)
[docs] def create_instance_enum(self, clsname, flags=DEFAULT_ENUM_FLAGS, deep=True): """Enumerate the instances of ``clsname``. Deep allows to enumerate the instance of subclasses as well :returns: :class:`WmiEnumeration` Example: >>> windows.system.wmi["root\\subscription"].create_instance_enum("__EventConsumer", deep=False).all() [] >>> windows.system.wmi["root\\subscription"].create_instance_enum("__EventConsumer", deep=True).all() [<WmiObject instance of "NTEventLogEventConsumer">] .. note:: See https://docs.microsoft.com/en-us/windows/desktop/api/wbemcli/nf-wbemcli-iwbemservices-createinstanceenum """ flags |= gdef.WBEM_FLAG_DEEP if deep else gdef.WBEM_FLAG_SHALLOW enumerator = WmiEnumeration() self.CreateInstanceEnum(clsname, flags, None, enumerator) return enumerator
[docs] def get_object(self, path): """Return the object matching ``path``. If ``path`` is a class name return the class object`` :return: :class:`WmiObject` """ result = WmiObject() self.GetObject(path, gdef.WBEM_FLAG_RETURN_WBEM_COMPLETE, None, result, None) return result
[docs] def put_instance(self, instance, flags=gdef.WBEM_FLAG_CREATE_ONLY): """Creates or updates an instance of an existing class in the namespace :return: :class:`WmiCallResult` ``(string)`` - Used to retrieve the string representing the path of the object created/updated """ res = WmiCallResult(result_type="string") self.PutInstance(instance, flags, None, res) return res
[docs] def delete_instance(self, instance, flags=0): """TODO: Document""" if isinstance(instance, gdef.IWbemClassObject): instance = instance["__Path"] return self.DeleteInstance(instance, flags, None, None)
[docs] def exec_method(self, obj, method, inparam, flags=0): """Exec method named on ``object`` with ``inparam``. :params obj: The :class:`WmiObject` or path of the object the call apply to :params method: The name of the method to call on the object :params inparam: The :class:`WmiObject` representing the input parameters and retrieve using :func:`WmiObject.get_method` :returns: :class:`WmiCallResult` ``(object)`` if flag `WBEM_FLAG_RETURN_IMMEDIATELY` was passed :returns: :class:`WmiObject` the outparam object if flag `WBEM_FLAG_RETURN_IMMEDIATELY` was NOT passed .. note:: This API will lakely change to better wrap with WmiObject/inparam/Dict & co """ if flags & gdef.WBEM_FLAG_RETURN_IMMEDIATELY: # semisynchronous call -> WmiCallResult result = WmiCallResult(result_type="object") outparam = None else: # Synchronous call -> WmiObject (outparam) result = None outparam = WmiObject() if isinstance(obj, gdef.IWbemClassObject): obj = obj.get("__Path") # Flags 0 -> synchronous call # No WmiCallResult result is directly in outparam self.ExecMethod(obj, method, 0, None, inparam, outparam, result) return outparam or result
def __repr__(self): null = "" if self else " (NULL)" return """<{0} "{1}"{2}>""".format(type(self).__name__, self.name, null)
[docs] class WmiManager(dict): """The main WMI class exposed, used to list and access differents WMI namespace, can be used as a dict to access :class:`WmiNamespace` by name Example: >>> windows.system.wmi["root\\SecurityCenter2"] <WmiNamespace "root\SecurityCenter2"> """ DEFAULT_NAMESPACE = "root\\cimv2" #: The default namespace for :func:`select` & :func:`query` def __init__(self): # Someone is going to use wmi: let's init com ! windows.com.init() self.wmi_requester_by_namespace = {} @property def default_namespace(self): return self[self.DEFAULT_NAMESPACE] @property def select(self): r""":func:`WmiRequester.select` for default WMI namespace 'root\\cimv2'""" return self.default_namespace.select @property def query(self): r""":func:`WmiRequester.query` for default WMI namespace 'root\\cimv2'""" return self.default_namespace.query def get_subnamespaces(self, root="root"): return [x["Name"] for x in self[root].select("__NameSpace")] namespaces = property(get_subnamespaces) """The list of available WMI namespaces""" def _open_wmi_requester(self, namespace): return WmiNamespace.connect(namespace) def __missing__(self, key): self[key] = self._open_wmi_requester(key) return self[key] def __repr__(self): return object.__repr__(self)