Source code for windows.com

import sys
import struct
import ctypes
import functools
from ctypes import HRESULT, byref, cast

import windows
from windows import winproxy
from windows.generated_def.winstructs import *

import windows.generated_def as gdef
from windows.generated_def import RPC_C_IMP_LEVEL_IMPERSONATE, CLSCTX_INPROC_SERVER
from windows.generated_def import interfaces
from windows.generated_def.interfaces import generate_IID, IID

from windows.pycompat import int_types, basestring

# We have    windows.com.COMImplementation
# So we need windows.com.COMInterface
COMInterface = interfaces.COMInterface

# Simple raw -> UUID
# "-".join("{:02X}".format(c) for c in struct.unpack("<IHHHBBBBBB", x))

[docs] def init(): """Init COM with some default parameters""" try: t = winproxy.CoInitializeEx() except WindowsError as e: t = e.winerror if t: return t & 0xffffffff return initsecurity()
def initsecurity(): # Should take some parameters.. return winproxy.CoInitializeSecurity(0, -1, None, 0, 0, RPC_C_IMP_LEVEL_IMPERSONATE, 0,0,0) class Dispatch(interfaces.IDispatch): def TypeInfoCount(self): count = gdef.UINT() self.GetTypeInfoCount(count) return count def type_info(self, idx): type_info = TypeInfo() self.GetTypeInfo(idx, 0, type_info) return type_info class TypeInfo(interfaces.ITypeInfo): def func(self, idx): res = gdef.LPFUNCDESC() self.GetFuncDesc(idx, res) return res def attr(self): res = gdef.LPTYPEATTR() self.GetTypeAttr(res) return res def names(self, memid): size = gdef.UINT() x = (gdef.BSTR * 10)(*tuple(gdef.BSTR() for i in range(10))) self.GetNames(memid, x, 10, size) return x[:size.value] def docu(self, id): res = gdef.BSTR() self.GetDocumentation(id, res, None, None, None) return res class MemoryIStream(gdef.IStream): @classmethod def create(cls): self = cls() windows.winproxy.CreateStreamOnHGlobal(ppstm=self) return self
[docs] def create_instance(clsiid, targetinterface, custom_iid=None, context=CLSCTX_INPROC_SERVER | CLSCTX_LOCAL_SERVER): """A simple wrapper around ``CoCreateInstance <https://msdn.microsoft.com/en-us/library/windows/desktop/ms686615(v=vs.85).aspx>``""" if custom_iid is None: custom_iid = targetinterface.IID if isinstance(clsiid, basestring): clsiid = IID.from_string(clsiid) if isinstance(custom_iid, basestring): custom_iid = IID.from_string(custom_iid) winproxy.CoCreateInstance(byref(clsiid), None, context, byref(custom_iid), byref(targetinterface)) return targetinterface
def resolve_progid(progid): clsid = CLSID() winproxy.CLSIDFromProgID(progid, clsid) # We just filed the CLSID: refresh the __repr__ clsid.update_strid() return clsid # Improved COM object # Todo: ctypes_generation extended struct ? class SafeArray(SAFEARRAY): @classmethod def create(cls, elttype, dims): # Array of bound for each dimension bound_array = windows.utils.BUFFER(gdef.SAFEARRAYBOUND)(nbelt=len(dims)) for i, dim in enumerate(dims): bound_array[i].lLbound = 0 bound_array[i].cElements = dim array_ptr = windows.winproxy.SafeArrayCreate(elttype, len(dims), bound_array) return cls.from_PSAFEARRAY(array_ptr) @classmethod def of_type(cls, addr, t): self = cls.from_address(addr) self.elt_type = t return self @classmethod def from_PSAFEARRAY(self, psafearray): res = cast(psafearray, POINTER(SafeArray))[0] return res def to_list(self, t=None): if t is None: if hasattr(self, "elt_type"): t = self.elt_type else: raise ValueError("Missing type of the array") if self.cDims != 1: raise NotImplementedError("tagSAFEARRAY if dims != 1") nb_element = self.rgsabound[0].cElements llbound = self.rgsabound[0].lLbound if self.cbElements != ctypes.sizeof(t): raise ValueError("Size of elements != sizeof(type)") data = [t.from_address(self.pvData + (i + llbound) * ctypes.sizeof(t)).value for i in range(nb_element)] return data def put_element(self, indexes, element): indexes_array = windows.utils.BUFFER(gdef.LONG)(*indexes) if not isinstance(element, VARIANT): pass # element = Variant(element) # import pdb; pdb.set_trace() # element = ctypes.pointer(variant) # print(hex(ctypes.addressof(element))) # windows.utils.sprint(element) # import pdb; pdb.set_trace() windows.winproxy.SafeArrayPutElement(self, indexes_array, element) def get_element(self, indexes): indexes_array = windows.utils.BUFFER(gdef.LONG)(*indexes) element = ctypes.create_string_buffer(0x1000) windows.winproxy.SafeArrayGetElement(self, indexes_array, element) return element # Need cleaning ? def __repr__(self): bound_array = windows.utils.resized_array(self.rgsabound, self.cDims) dims = [int(b.cElements) for b in bound_array] return "<{0} of dimension {1}: {2}>".format(type(self).__name__, len(dims), dims) #VT_VALUE_TO_TYPE = { #VT_I2 : SHORT, #VT_I4 : LONG, #VT_BSTR : BSTR, #VT_VARIANT : VARIANT, #VT_UI1 : UCHAR, #VT_UI2 : USHORT, #VT_UI4 : DWORD, #VT_I8 : LONGLONG, #VT_UI8 : ULONG64, #VT_INT : INT, #VT_UINT : UINT, #VT_HRESULT : HRESULT, #VT_PTR : PVOID, #VT_LPSTR : LPCSTR, #VT_LPWSTR : LPWSTR, #} # VARIANT type checker # Allow to guess a VARIANT_TYPE og a python value def never_match(value): return False def check_type_null(value): return value is None def check_type_i4(value): # 31 ? as we may want to keep sign :) return isinstance(value, int_types) and (value).bit_length() <= 32 def check_type_i8(value): # 63 ? as we may want to keep sign :) return isinstance(value, int_types) and (value).bit_length() <= 64 def check_type_bstr(value): return isinstance(value, basestring) def check_type_bool(value): return isinstance(value, bool) def check_type_safearray(value): return isinstance(value, LPSAFEARRAY) def check_type_array(value): return True VARIAN_NAME_3_TYPE = [f[1] for f in VARIANT._fields_ if f[0] == "_VARIANT_NAME_3"][0] empty = object() class Variant(VARIANT): def __init__(self, value=empty, type=None): if type is not None: self.set_value_and_type(value, type) return elif value is empty: self.vt = VT_EMPTY return self.guess_type_and_set_value(value) # Copy raw-ctypes fields which is a descriptor :) rawvt = VARIANT.vt # Most of the value in the colunm[1] # are attribute of the sub-union _VARIANT_NAME_3 # This union must be ctypes-anonymous for this code to works # We want to access these directly from the VARIANT # to allow custom descriptor for complexe type to be referenced here CHECK_TYPE = [ # Order is important # as VT_I4 check may match VT_BOOL values # VT_BOOL check must be before VT_I4 one (VT_BOOL, "boolVal", check_type_bool), (VT_I4, "lVal", check_type_i4), (VT_I8, "llVal", check_type_i8), (VT_BSTR, "bstrVal", check_type_bstr), (VT_SAFEARRAY , "parray", check_type_safearray), (VT_NULL, None, check_type_null), (VT_EMPTY, None, never_match), (VT_DISPATCH, "pdispVal", never_match), # I cannot recognize DISPATCH ptr for now (VT_UNKNOWN, "punkVal", never_match), # recognise PFW ComInterface ? # Test: do not allow auto-creation of small int values # I don't know but a feel it may confuse some API expecting VT_I4 (VT_I2, "iVal", never_match), (VT_UI1, "bVal", never_match), ] VARIANT_TYPE_BY_NAME = {f[0]: f[1] for f in VARIAN_NAME_3_TYPE._fields_} QUICK_CHECK_TYPE = {x: y for x,y, _ in CHECK_TYPE} def get_vt(self): rawvt = super(Variant, self).vt return gdef.VARENUM.mapper[self.rawvt] def set_vt(self, value): self.rawvt = value vt = property(get_vt, set_vt) def set_value_and_type(self, value, type): attr = self.QUICK_CHECK_TYPE[type] # No check: user must be careful about non-match value&type setattr(self, attr, value) self.vt = type def get_value_based_on_type(self): rawvt = self.rawvt if rawvt & VT_ARRAY: realtype = rawvt & ~VT_ARRAY attr = self.QUICK_CHECK_TYPE[realtype] attrtype = self.VARIANT_TYPE_BY_NAME[attr] array = SafeArray.from_PSAFEARRAY(self._VARIANT_NAME_3.parray) return array.to_list(attrtype) attr = self.QUICK_CHECK_TYPE[rawvt] if attr is None: return None if attr == "punkVal": # Quick hack for COM interface type # Do something clean with CHECK_TYPE ? x = gdef.IUnknown(self.punkVal) x.AddRef() return x return getattr(self, attr) def guess_type_and_set_value(self, value): for t, attr, check in self.CHECK_TYPE: try: checkres = check(value) except TypeError as e: continue if checkres: self.vt = t if attr is not None: setattr(self, attr, value) return True raise ValueError("Could not guess VT_TYPE for <{0}> of type <{1}>".format(value, type(value))) value = property(get_value_based_on_type, guess_type_and_set_value) # quick_check: bypass python lookup-limitation def generate_getter(vt_type, transfo=(lambda x:x), quick_check=QUICK_CHECK_TYPE): attr = quick_check[vt_type] @property def getter(self): if not self.rawvt == vt_type: raise ValueError("Invalid vt-type for attribute expected <{0}> got <{1}>".format(vt_type, self.vt)) return transfo(getattr(self, attr)) return getter asbstr = generate_getter(VT_BSTR) aslong = generate_getter(VT_I4) asbool = generate_getter(VT_BOOL) asdispatch = generate_getter(VT_DISPATCH, transfo=interfaces.IDispatch) asshort = generate_getter(VT_I2) asbyte = generate_getter(VT_UI1) asunknown = generate_getter(VT_UNKNOWN) def __repr__(self): return """<{0} of type {1}>""".format(type(self).__name__, self.vt) # Deprecated: remove me when test pass :) # ImprovedVariant.MAPPER = { # VT_UI1: ImprovedVariant.asbyte.fget, # VT_I2: ImprovedVariant.asshort.fget, # VT_DISPATCH: ImprovedVariant.asdispatch.fget, # VT_BOOL: ImprovedVariant.asbool.fget, # VT_I4: ImprovedVariant.aslong.fget, # VT_BSTR: ImprovedVariant.asbstr.fget, # VT_EMPTY: (lambda x: None), # VT_NULL: (lambda x: None), # VT_UNKNOWN: ImprovedVariant.asunknown.fget, # (VT_ARRAY | VT_BSTR): ImprovedVariant.asbstr_array.fget, # (VT_ARRAY | VT_I4): ImprovedVariant.aslong_array.fget, # (VT_ARRAY | VT_UI1): ImprovedVariant.asbyte_array.fget, # (VT_ARRAY | VT_BOOL): ImprovedVariant.asbool_array.fget # }
[docs] class COMImplementation(object): """The base class to implements COM object respecting a given interface""" IMPLEMENT = None def get_index_of_method(self, method): # This code is horrible but not totally my fault # the PyCFuncPtrObject->index is not exposed to Python.. # repr is: '<COM method offset 2: WinFunctionType at 0x035DDBE8>' rpr = repr(method) if not rpr.startswith("<COM method offset ") or ":" not in rpr: raise ValueError("Could not extract offset of {0}".format(rpr)) return int(rpr[len("<COM method offset "): rpr.index(":")]) def extract_methods_order(self, interface): index_and_method = sorted((self.get_index_of_method(m),name, m) for name, m in interface._functions_.items()) return index_and_method def verify_implem(self, interface): for func_name in interface._functions_: implem = getattr(self, func_name, None) if implem is None: raise ValueError("<{0}> implementing <{1}> has no method <{2}>".format(type(self).__name__, self.IMPLEMENT.__name__, func_name)) if not callable(implem): raise ValueError("{0} implementing <{1}>: <{2}> is not callable".format(type(self).__name__, self.IMPLEMENT.__name__, func_name)) return True def _create_vtable(self, interface): implems = [] names = [] for index, name, method in self.extract_methods_order(interface): func_implem = getattr(self, name) #'this' is a COM-interface of the type we are implementing types = [method.restype, interface] + list(method.argtypes) implems.append(ctypes.WINFUNCTYPE(*types)(func_implem)) names.append(name) class Vtable(ctypes.Structure): _fields_ = [(name, ctypes.c_void_p) for name in names] return Vtable(*[ctypes.cast(x, ctypes.c_void_p) for x in implems]), implems def __init__(self): self.verify_implem(self.IMPLEMENT) vtable, implems = self._create_vtable(self.IMPLEMENT) self.vtable = vtable self.implems = implems self.vtable_pointer = ctypes.pointer(self.vtable) self._as_parameter_ = ctypes.addressof(self.vtable_pointer)
[docs] def QueryInterface(self, this, piid, result): """Default ``QueryInterface`` implementation that returns ``self`` if piid is the implemented interface""" if piid[0] in (gdef.IUnknown.IID, self.IMPLEMENT.IID): result[0] = this return 1 return E_NOINTERFACE
[docs] def AddRef(self, *args): """Default ``AddRef`` implementation that returns ``1``""" return 1
[docs] def Release(self, *args): """Default ``Release`` implementation that returns ``1``""" return 0