Source code for windows.winobject.registry

import sys
import ctypes
import itertools
import struct
from collections import namedtuple, defaultdict

import windows
from windows.dbgprint import dbgprint
import windows.generated_def as gdef
from windows import winproxy, security
from windows.pycompat import basestring, int_types, is_py3

WENCODING = "utf-16-le"

# So _winreg does not handle unicode stuff in Py2 :(
# Need to rewrite everything to get it working with unicode

class WinRegistryKey(gdef.HKEY):
    """A registry key handle that closes itself when it is destroyed."""

    # Function called on the handle in __del__
    _close_function = staticmethod(winproxy.RegCloseKey)

    def __del__(self):
        # Late shutdown (not sure winproxy is still up): do nothing
        interpreter_going_down = sys is None or sys.path is None
        if interpreter_going_down:
            return
        # NULL handle: nothing to close
        if not self:
            return
        dbgprint(u"Closing registry key handle {0:#x}".format(self.value), 'REGISTRY')
        self._close_function(self)




class ExpectWindowsError(object):
    """Context manager that swallows one expected Windows error.

    A ``winproxy.WinproxyError`` / ``WindowsError`` (or any subclass of them)
    raised inside the ``with`` body is suppressed when its ``winerror`` equals
    ``errornumber``. Anything else propagates unchanged.
    """

    def __init__(self, errornumber):
        # The only ``winerror`` value that will be suppressed
        self.errornumber = errornumber

    def __enter__(self):
        pass

    def __exit__(self, etype, e, tb):
        """Return True (suppress the exception) only for the expected error."""
        if etype is None:
            # The body finished without raising: nothing to suppress
            return False
        # issubclass (not an exact-type test) so that subclasses of
        # WindowsError carrying the expected error code are matched too
        if not issubclass(etype, (winproxy.WinproxyError, WindowsError)):
            return False
        return e.winerror == self.errornumber

# Translation methods: registry buffer <-> Python object
def Reg2Py_QWORD(buffer, size):
    """Decode a REG_QWORD: the first ULONG64 found in ``buffer``.

    ``size`` is accepted for signature uniformity but is not used.
    """
    qword_view = buffer.cast(gdef.PULONG64)
    return qword_view[0]

def Py2Reg_QWORD(obj):
    """Encode ``obj`` as 8 little-endian unsigned bytes (REG_QWORD)."""
    qword_layout = struct.Struct("<Q")
    return qword_layout.pack(obj)

def Reg2Py_DWORD(buffer, size):
    """Decode a REG_DWORD: the first DWORD found in ``buffer``.

    ``size`` is not checked (see the original "Check size ?" question).
    """
    dword_view = buffer.cast(gdef.LPDWORD)
    return dword_view[0]

def Py2Reg_DWORD(obj):
    """Encode ``obj`` as 4 little-endian unsigned bytes (REG_DWORD)."""
    dword_layout = struct.Struct("<I")
    return dword_layout.pack(obj)


def Reg2Py_DWORD_BIG_ENDIAN(buffer, size):
    """Decode a REG_DWORD_BIG_ENDIAN from the first 4 items of ``buffer``.

    ``buffer[0]`` is the most significant byte. ``size`` is not checked.
    """
    value = 0
    for position in range(4):
        # Shift what was accumulated so far and append the next byte
        value = (value << 8) + buffer[position]
    return value

def Py2Reg_DWORD_BIG_ENDIAN(obj):
    """Encode ``obj`` as 4 big-endian unsigned bytes (REG_DWORD_BIG_ENDIAN)."""
    dword_be_layout = struct.Struct(">I")
    return dword_be_layout.pack(obj)


def Reg2Py_BINARY(buffer, size):
    """Return the first ``size`` items of ``buffer`` as a ``bytes`` object."""
    payload = bytearray(buffer[:size])
    return bytes(payload)

def Py2Reg_BINARY(obj):
    """Encode a binary registry value: ``obj`` is handed back untouched."""
    # Open question kept from the original: latin-1 encode if py3 & type is str ?
    encoded = obj
    return encoded


def Reg2Py_SZ(buffer, size):
    """Decode a REG_SZ / REG_EXPAND_SZ value.

    ``buffer`` holds UTF-16 data and is an extended buffer (it provides
    ``as_wstring``); ``size`` is the number of meaningful bytes in it.
    """
    if size == 0:
        return u""
    ends_with_wide_null = buffer[size - 1] == 0 and buffer[size - 2] == 0
    if ends_with_wide_null:
        # NULL terminated: let the buffer read its own string
        return buffer.as_wstring()
    # Not NULL terminated: keep every character, including the last one
    assert not size % 2
    wchar_array_type = gdef.WCHAR * (size // 2)
    return wchar_array_type.from_buffer(buffer)[:]

def Py2Reg_SZ(obj):
    """Encode the string ``obj`` with ``WENCODING`` for a REG_SZ value."""
    encoded = obj.encode(WENCODING)
    return encoded

def Reg2Py_Multi_SZ(buffer, size):
    """Decode a REG_MULTI_SZ value into a list of strings.

    :param buffer: the byte buffer filled by the registry API
    :param size: the number of meaningful bytes in ``buffer``
    :return: a list of unicode strings, or a list of byte strings if the
        data is not valid UTF-16 (best effort). ``[]`` when ``size`` is 0.
    """
    if not size:
        return []
    # Only the first ``size`` bytes belong to the value: the buffer may have
    # been allocated larger than the data it received. Same construction as
    # Reg2Py_BINARY, valid on both py2 and py3.
    rawstr = bytes(bytearray(buffer[:size]))
    try:
        # Simple path: valid UTF-16
        unistr = rawstr.decode(WENCODING)
    except UnicodeDecodeError:
        # Complex path: this is not valid UTF-16.
        # Try our best to extract some stuff from the raw bytes
        return rawstr.rstrip(b"\x00").split(b"\x00")
    return unistr.rstrip(u"\x00").split(u"\x00")


def Py2Reg_Multi_SZ(obj):
    """Encode an iterable of strings as a REG_MULTI_SZ byte string."""
    # UTF-16 NULL character (2 \x00), used as separator and as terminator
    wide_null = b"\x00\x00"
    # Join already-encoded values (to prevent str/unicode errors)
    joined = wide_null.join([item.encode(WENCODING) for item in obj])
    # One NULL ends the last string, one more ends the list (4 \x00)
    return joined + wide_null + wide_null

# Indexes into the (decoder, encoder) tuples stored in the tables below
DECODE_METHOD = 0
ENCODE_METHOD = 1


# Registry value type -> (buffer-to-Python decoder, Python-to-buffer encoder)
KNOWN_ENCODE_DECODE_METHODS = {
    gdef.REG_SZ: (Reg2Py_SZ, Py2Reg_SZ),
    gdef.REG_EXPAND_SZ: (Reg2Py_SZ, Py2Reg_SZ),
    gdef.REG_MULTI_SZ: (Reg2Py_Multi_SZ, Py2Reg_Multi_SZ),
    gdef.REG_DWORD: (Reg2Py_DWORD, Py2Reg_DWORD),
    gdef.REG_DWORD_BIG_ENDIAN: (Reg2Py_DWORD_BIG_ENDIAN, Py2Reg_DWORD_BIG_ENDIAN),
    gdef.REG_QWORD: (Reg2Py_QWORD, Py2Reg_QWORD),
    # Binary formats
    gdef.REG_LINK: (Reg2Py_BINARY, Py2Reg_BINARY), # TESTING
    gdef.REG_BINARY: (Reg2Py_BINARY, Py2Reg_BINARY),
    gdef.REG_NONE: (Reg2Py_BINARY, Py2Reg_BINARY),
}

# All unknown formats are seen as binary data
UNKNOWM_FORMAT = (Reg2Py_BINARY, Py2Reg_BINARY)
# Same content as KNOWN_ENCODE_DECODE_METHODS, but looking up a type that is
# not listed yields UNKNOWM_FORMAT instead of raising KeyError
ENCODE_DECODE_METHODS = defaultdict(lambda: UNKNOWM_FORMAT, KNOWN_ENCODE_DECODE_METHODS)

def decode_registry_buffer(type, buffer, size):
    """Decode ``size`` bytes of ``buffer`` according to the registry ``type``.

    The decoder is picked from ``ENCODE_DECODE_METHODS``. If it raises
    ``UnicodeDecodeError`` the first ``size`` items of the buffer are
    returned as a string built with ``chr`` (best effort).
    """
    decoder = ENCODE_DECODE_METHODS[type][DECODE_METHOD]
    try:
        return decoder(buffer, size)
    except UnicodeDecodeError:
        # Best effort if any decoding error happens
        return "".join(map(chr, buffer[:size]))


KeyValue = namedtuple("KeyValue", ("name", "value", "type"))
"""A registry value (name, value, type)"""


class PyHKey(object):
    """A windows registry key"""

    def __init__(self, surkey, name, sam=gdef.KEY_READ):
        """Describe the key ``name`` located under ``surkey``.

        No handle is opened here: the key is opened on first access to
        :attr:`phkey`.

        :param surkey: the parent key object. Its ``phkey`` is used to open
            this key; its ``fullname`` (or ``name`` when ``name`` is empty)
            is used to build :attr:`fullname`
        :param name: the name of the key, relative to ``surkey``
        :param sam: the access mask used when the key is opened
        """
        self.surkey = surkey
        self.name = name
        # An empty name designates ``surkey`` itself
        self.fullname = self.surkey.fullname + "\\" + self.name if self.name else self.surkey.name
        self.sam = sam
        # Lazily-opened handle, see the ``phkey`` property
        self._phkey = None

    def __repr__(self):
        return '<PyHKey "{0}">'.format(self.fullname)

    def _open_key(self, handle, name, sam):
        """Open subkey ``name`` of ``handle`` with access ``sam``.

        :return: a new :class:`WinRegistryKey`
        """
        result = WinRegistryKey()
        # TODO: options REG_OPTION_OPEN_LINK
        winproxy.RegOpenKeyExW(handle, name, 0, sam, result)
        dbgprint(u"Opening registry key <{0}> (handle={1:#x})".format(name, result.value), "REGISTRY")
        return result

    def _create_key(self, parent, name, sam):
        """Create subkey ``name`` of ``parent`` with access ``sam``.

        :return: a new :class:`WinRegistryKey`
        """
        result = WinRegistryKey()
        flags = 0
        winproxy.RegCreateKeyExW(parent, name, 0, None, flags, sam, None, result, None)
        dbgprint(u"Creating registry key <{0}> (handle={1:#x})".format(name, result.value), "REGISTRY")
        return result

    @property
    def phkey(self):
        """The handle of the key, opened with ``self.sam`` on first access.

        :raises WindowsError: if the key cannot be opened (same ``winerror``
            as the underlying error, message extended with the key name)
        """
        if self._phkey is not None:
            return self._phkey
        try:
            self._phkey = self._open_key(self.surkey.phkey, self.name, self.sam)
        except WindowsError as e:
            raise WindowsError(e.winerror, "Could not open registry key <{0}> ({1})".format(self.fullname, e.strerror))
        return self._phkey

    @property
    def exists(self):
        """``True`` if the key can currently be queried or opened."""
        # May have been deleted in between
        # So <self._phkey> tells us nothing
        if self._phkey: # Not None + pointer not NULL
            try:
                self.get_key_size_info()
            except WindowsError as e:
                if e.winerror == gdef.ERROR_KEY_DELETED:
                    return False
                raise
            return True
        try:
            # The temporary handle will be garbage collected and auto-closed
            self._open_key(self.surkey.phkey, self.name, gdef.KEY_READ)
        except WindowsError:
            return False
        return True

    @property
    def subkeys(self):
        """The subkeys of the registry key

        :type: [:class:`PyHKey`] - A list of keys"""
        res = []
        # 259 == ERROR_NO_MORE_ITEMS: raised once the enumeration is over
        with ExpectWindowsError(259):
            # Size (in characters) of the buffer handed to RegEnumKeyExW
            default_name_size = 256 + 1
            name_size = gdef.DWORD(default_name_size)
            name_buffer = ctypes.create_unicode_buffer(name_size.value)
            for i in itertools.count():
                # name_size is an in/out parameter: reset it before each call
                name_size.value = default_name_size
                winproxy.RegEnumKeyExW(self.phkey, i, name_buffer, name_size, None, None, None, None)
                res.append(name_buffer[:name_size.value]) # Will allow key name with \x00 inside
        return [PyHKey(self, n) for n in res]

    def get_key_size_info(self):
        """Query the key for its size limits.

        :return: ``(max_name_len, max_value_len)`` as filled in by
            ``RegQueryInfoKeyW``
        """
        max_name_len = gdef.DWORD()
        max_value_len = gdef.DWORD()
        winproxy.RegQueryInfoKeyW(self.phkey, None, None, None, None, None, None, None, max_name_len, max_value_len, None, None)
        return (max_name_len.value, max_value_len.value)

    @property
    def values(self):
        """The values of the registry key

        :type: [:class:`KeyValue`] - A list of values"""
        res = []
        # Get max info keys
        max_name_size, max_data_size = self.get_key_size_info()
        # Null terminators
        max_name_size += 1
        max_data_size += 2
        # 259 == ERROR_NO_MORE_ITEMS: raised once the enumeration is over
        with ExpectWindowsError(259):
            for i in itertools.count():
                value_type = gdef.DWORD()
                namesize = gdef.DWORD(max_name_size)
                keyname = ctypes.create_unicode_buffer(namesize.value)
                datasize = gdef.DWORD(max_data_size)
                databuffer = windows.utils.BUFFER(gdef.BYTE, nbelt=datasize.value)()
                # A value can have been added in-between.
                # So recheck the size given by get_key_size_info :)
                # But check 10 times max as RegEnumValueW may bug (seen) and
                # always return ERROR_MORE_DATA even with enough size
                for _ in range(10):
                    try:
                        winproxy.RegEnumValueW(self.phkey, i, keyname, namesize, None, value_type, databuffer, datasize)
                        break
                    except WindowsError as e:
                        if e.winerror != gdef.ERROR_MORE_DATA:
                            raise
                        # I found some strange Windows where even with a big enough buffer:
                        #  - the data was filled
                        #  - ERROR_MORE_DATA was returned
                        # To prevent such a bug from triggering an infinite loop, two things:
                        #  - If the returned sizes fit the passed sizes and keyname is not empty -> keep the data
                        #  - Max 10 tries to prevent an infinite loop
                        if ((namesize.value <= max_name_size) and
                                (datasize.value <= max_data_size) and
                                (keyname[:namesize.value].count("\x00") < namesize.value)): # Not just zeros ?
                            break
                    # Update the sizes / buffers & try again :)
                    max_name_size, max_data_size = self.get_key_size_info()
                    max_name_size = max(max_name_size + 1, namesize.value + 1) # namesize.value may be > to max_name_size apparently (guessed)
                    max_data_size = max(max_data_size + 2, datasize.value + 2) # datasize.value may be > to max_data_size apparently (seen)
                    namesize = gdef.DWORD(max_name_size)
                    keyname = ctypes.create_unicode_buffer(namesize.value)
                    datasize = gdef.DWORD(max_data_size)
                    databuffer = windows.utils.BUFFER(gdef.BYTE, nbelt=datasize.value)()
                else:
                    # Probably a windows bug that prevents us from retrieving the data
                    # Raise something (thus preventing getting the other values..) ? ignore it ?
                    raise ValueError("Could not extract registry key values, problably a Windows/hook bug")
                vobj = decode_registry_buffer(value_type.value, databuffer, datasize.value)
                res.append(KeyValue(keyname.value, vobj, value_type.value))
        return res

    @property
    def info(self):
        """``(number of subkeys, number of values, last write time)``"""
        # Need other stuff ?
        nb_key = gdef.DWORD()
        nb_values = gdef.DWORD()
        last_modif = gdef.FILETIME()
        winproxy.RegQueryInfoKeyW(self.phkey, None, None, None, nb_key, None, None, nb_values, None, None, None, last_modif)
        return nb_key.value, nb_values.value, int(last_modif)

    @property
    def last_write(self):
        """The last write time of the key (third item of :attr:`info`)"""
        return self.info[2]

    def get(self, value_name):
        """Retrieves the value ``value_name``

        :rtype: :class:`KeyValue`
        """
        value_type = gdef.DWORD(0)
        size = gdef.DWORD(0x100)
        while True:
            # A fresh buffer of the current size for each attempt
            buffer = windows.utils.BUFFER(gdef.BYTE, nbelt=size.value)()
            try:
                winproxy.RegQueryValueExW(self.phkey, value_name, None, value_type, buffer, size)
                break
            except WindowsError as e:
                if e.winerror != gdef.ERROR_MORE_DATA:
                    raise
                # Buffer too small: grow and retry
                size.value *= 2
        vobj = decode_registry_buffer(value_type.value, buffer, size.value)
        return KeyValue(value_name, vobj, value_type.value)

    def _guess_value_type(self, value):
        """Guess the registry type fitting ``value``.

        :return: ``REG_SZ`` for strings, ``REG_DWORD`` for integers
        :raises ValueError: for any other kind of value
        """
        if isinstance(value, basestring):
            return gdef.REG_SZ
        elif isinstance(value, int_types):
            return gdef.REG_DWORD
        # TODO: guess REG_MULTI_SZ for a list/tuple of strings (not implemented)
        raise ValueError("Cannot guest registry type of value to set <{0}>".format(value))

    def set(self, name, value, type=None):
        """Set the value for ``name`` to ``value``. if ``type`` is None try to guess items"""
        if type is None:
            type = self._guess_value_type(value)
        buffer = ENCODE_DECODE_METHODS[type][ENCODE_METHOD](value)
        if isinstance(buffer, bytes): # Should not be unicode at this point
            buffer = windows.utils.BUFFER(gdef.BYTE).from_buffer_copy(buffer)
        return winproxy.RegSetValueExW(self.phkey, name, 0, type, buffer, len(buffer))

    def delete_value(self, name):
        """Delete the value with ``name``"""
        return winproxy.RegDeleteValueW(self.phkey, name)

    def open_subkey(self, name, sam=None):
        """Open the subkey ``name``

        :rtype: :class:`PyHKey`
        """
        if sam is None:
            sam = self.sam
        return PyHKey(self, name, sam)

    def reopen(self, sam):
        """Reopen the registry key with a new ``sam``

        :rtype: :class:`PyHKey`
        """
        return PyHKey(self.surkey, self.name, sam)

    def create(self):
        """Create the registry key"""
        try:
            self._phkey = self._create_key(self.surkey.phkey, self.name, self.sam)
        except WindowsError as e:
            raise WindowsError(e.winerror, "Could not create registry key <{0}> ({1})".format(self.fullname, e.strerror))
        return self

    def delete(self):
        """Delete the registry key"""
        # Allow a 'recursive' param to empty before delete ?
        try:
            windows.winproxy.RegDeleteKeyExW(self.surkey.phkey, self.name, self.sam, 0)
        except WindowsError as e:
            raise WindowsError(e.winerror, "Could not delete registry key <{0}> ({1})".format(self.fullname, e.strerror))
        return None

    def empty(self):
        """Call ``RegDeleteTreeW`` on the key with no subkey name."""
        windows.winproxy.RegDeleteTreeW(self.phkey, None)

    ## Security API
    def get_security_descriptor(self, query_sacl=False, flags=security.SecurityDescriptor.DEFAULT_SECURITY_INFORMATION):
        """Return the security descriptor of the key.

        :param query_sacl: also request ``ACCESS_SYSTEM_SECURITY`` when
            opening the key and forward the flag to ``from_handle``
        :param flags: forwarded to ``SecurityDescriptor.from_handle``
        """
        open_flags = gdef.READ_CONTROL
        if query_sacl:
            open_flags |= gdef.ACCESS_SYSTEM_SECURITY
        # NOTE(review): opens <fullname> with a None parent handle whereas the
        # setters below use (self.surkey.phkey, self.name) -- confirm intended
        # tmp_handle will autoclose if SecurityDescriptor.from_handle fails
        tmp_handle = self._open_key(None, self.fullname, open_flags)
        return security.SecurityDescriptor.from_handle(tmp_handle, query_sacl=query_sacl, flags=flags, obj_type="key")

    def set_security_descriptor(self, sd):
        """Apply the security descriptor ``sd`` to the key.

        :param sd: a security descriptor object, or a string that is parsed
            with ``SecurityDescriptor.from_string``
        """
        if isinstance(sd, basestring):
            sd = security.SecurityDescriptor.from_string(sd)
        open_flags = gdef.WRITE_OWNER | gdef.WRITE_DAC
        if (sd.sacl and any(ace.Header.AceType != gdef.SYSTEM_MANDATORY_LABEL_ACE_TYPE for ace in sd.sacl)):
            # Print error if requested but no SecurityPrivilege ?
            open_flags |= gdef.ACCESS_SYSTEM_SECURITY
        # tmp_handle will autoclose if applying the descriptor fails
        tmp_handle = self._open_key(self.surkey.phkey, self.name, open_flags)
        sd._apply_to_handle_and_type(tmp_handle, gdef.SE_KERNEL_OBJECT)

    security_descriptor = property(get_security_descriptor, set_security_descriptor)

    # Marks the parameters of update_security_descriptor that were not given
    _sentinel = object()

    def update_security_descriptor(self, owner=_sentinel, dacl=_sentinel, sacl=_sentinel, group=_sentinel):
        """[WIP] Api for simple SD update may change"""
        # In any case, open the key with the bitness explicitly stated in the sam
        open_flags = self.sam & (gdef.KEY_WOW64_64KEY | gdef.KEY_WOW64_32KEY)
        set_security_info_flags = 0
        temp_sd = security.SecurityDescriptor.create()
        if owner is not self._sentinel:
            open_flags |= gdef.WRITE_OWNER
            set_security_info_flags |= gdef.OWNER_SECURITY_INFORMATION
            temp_sd.owner = owner
        if dacl is not self._sentinel:
            open_flags |= gdef.WRITE_DAC
            set_security_info_flags |= gdef.DACL_SECURITY_INFORMATION
            # The DACL goes in the dacl field (it used to overwrite the owner)
            temp_sd.dacl = dacl
        if sacl is not self._sentinel:
            temp_sd.sacl = sacl
            set_security_info_flags |= gdef.LABEL_SECURITY_INFORMATION
            if any(ace.Header.AceType != gdef.SYSTEM_MANDATORY_LABEL_ACE_TYPE for ace in sacl):
                open_flags |= gdef.ACCESS_SYSTEM_SECURITY
                set_security_info_flags |= gdef.SACL_SECURITY_INFORMATION
        if group is not self._sentinel:
            raise NotImplementedError("update_security_descriptor with group")
        # Create the temporary handle for assignment
        # tmp_handle will autoclose if applying the descriptor fails
        tmp_handle = self._open_key(self.surkey.phkey, self.name, open_flags)
        temp_sd._apply_to_handle_and_type(tmp_handle, gdef.SE_KERNEL_OBJECT, set_security_info_flags)
    ##! Security API

    def __setitem__(self, name, value):
        """``key[name] = value`` or ``key[name] = (value, type)``"""
        rtype = None
        if not (isinstance(value, basestring) or isinstance(value, int_types)):
            # Neither a string nor an integer: expect a (value, type) pair
            value, rtype = value
        return self.set(name, value, rtype)

    __getitem__ = get
    __delitem__ = delete_value
    __call__ = open_subkey
class DummyPHKEY(object):
    """Minimal parent object for the predefined root keys.

    It only carries the two attributes (``phkey`` and ``name``) set below.
    """

    def __init__(self, phkey, name):
        self.phkey = phkey
        self.name = name


# Predefined root keys: an empty key name on top of the predefined handle
HKEY_LOCAL_MACHINE = PyHKey(
    DummyPHKEY(gdef.HKEY_LOCAL_MACHINE, "HKEY_LOCAL_MACHINE"), "", gdef.KEY_READ)
HKEY_CLASSES_ROOT = PyHKey(
    DummyPHKEY(gdef.HKEY_CLASSES_ROOT, "HKEY_CLASSES_ROOT"), "", gdef.KEY_READ)
HKEY_CURRENT_USER = PyHKey(
    DummyPHKEY(gdef.HKEY_CURRENT_USER, "HKEY_CURRENT_USER"), "", gdef.KEY_READ)
HKEY_DYN_DATA = PyHKey(
    DummyPHKEY(gdef.HKEY_DYN_DATA, "HKEY_DYN_DATA"), "", gdef.KEY_READ)
HKEY_PERFORMANCE_DATA = PyHKey(
    DummyPHKEY(gdef.HKEY_PERFORMANCE_DATA, "HKEY_PERFORMANCE_DATA"), "", gdef.KEY_READ)
HKEY_USERS = PyHKey(
    DummyPHKEY(gdef.HKEY_USERS, "HKEY_USERS"), "", gdef.KEY_READ)
class Registry(object):
    """The ``Windows`` registry"""

    # Name of each predefined root key -> its PyHKey
    registry_base_keys = {
        "HKEY_LOCAL_MACHINE" : HKEY_LOCAL_MACHINE,
        "HKEY_CLASSES_ROOT" : HKEY_CLASSES_ROOT,
        "HKEY_CURRENT_USER" : HKEY_CURRENT_USER,
        "HKEY_DYN_DATA" : HKEY_DYN_DATA,
        "HKEY_PERFORMANCE_DATA": HKEY_PERFORMANCE_DATA,
        "HKEY_USERS" : HKEY_USERS
    }

    def __init__(self, sam=gdef.KEY_READ):
        # Default access mask for the keys returned by __call__
        self.sam = sam

    @classmethod
    def reopen(cls, sam):
        """Return a new :class:`Registry` using ``sam`` as the new default

        :rtype: :class:`Registry`
        """
        return cls(sam)

    def __call__(self, name, sam=None):
        """Get a registry key::

            registry(r"HKEY_LOCAL_MACHINE\\Software")
            registry("HKEY_LOCAL_MACHINE")("Software")

        :rtype: :class:`PyHKey`
        """
        if sam is None:
            sam = self.sam
        roots = self.registry_base_keys
        if name in roots:
            # A bare root name: reuse the shared object when the access matches
            root = roots[name]
            return root if sam == root.sam else root.reopen(sam)
        base_name, separator, subkey = name.partition("\\")
        if not separator:
            raise ValueError("Unknow registry base key <{0}>".format(name))
        if base_name not in roots:
            raise ValueError("Unknow registry base key <{0}>".format(base_name))
        return roots[base_name](subkey, sam)