import ctypes
import msvcrt
import os
import sys
import code
import math
import datetime
import warnings
from collections import namedtuple
import windows
from windows.dbgprint import dbgprint
import windows.generated_def as gdef
from .. import winproxy
from ..generated_def.winstructs import *
# Function resolution !
# should be in winproxy ?
def get_func_addr(dll_name, func_name):
# Load the DLL
ctypes.WinDLL(dll_name)
modules = windows.current_process.peb.modules
if not dll_name.lower().endswith(".dll"):
dll_name += ".dll"
mod = [x for x in modules if x.name == dll_name][0]
return mod.pe.exports[func_name]
def get_remote_func_addr(target, dll_name, func_name):
name_modules = [m for m in target.peb.modules if m.name == dll_name]
if not len(name_modules):
raise ValueError("Module <{0}> not loaded in target <{1}>".format(dll_name, target))
mod = name_modules[0]
return mod.pe.exports[func_name]
def is_wow_64(hProcess):
try:
fnIsWow64Process = get_func_addr("kernel32.dll", "IsWow64Process")
except winproxy.WinproxyError:
return False
IsWow64Process = ctypes.WINFUNCTYPE(BOOL, HANDLE, ctypes.POINTER(BOOL))(fnIsWow64Process)
Wow64Process = BOOL()
res = IsWow64Process(hProcess, ctypes.byref(Wow64Process))
if res:
return bool(Wow64Process)
raise ctypes.WinError()
[docs]
def create_file_from_handle(handle, mode="r"):
"""Return a Python :class:`file` around a ``Windows`` HANDLE"""
flags = os.O_BINARY if "b" in mode else os.O_TEXT
fd = msvcrt.open_osfhandle(handle, flags)
kwargs = {}
if windows.pycompat.is_py3 and flags == os.O_TEXT:
# Buffering, encoding
args = (100, "ascii")
else:
# Buffering
args = (0,)
# In py2 os.fdopen do not accept kwargs
return os.fdopen(fd, mode, *args)
[docs]
def get_handle_from_file(f):
"""Get the ``Windows`` HANDLE of a python :class:`file`"""
return msvcrt.get_osfhandle(f.fileno())
[docs]
def create_console():
"""Create a new console displaying STDOUT.
Useful in injection of GUI process"""
winproxy.AllocConsole()
stdout_handle = winproxy.GetStdHandle(gdef.STD_OUTPUT_HANDLE)
console_stdout = create_file_from_handle(stdout_handle, "w")
sys.stdout = console_stdout
stdin_handle = winproxy.GetStdHandle(gdef.STD_INPUT_HANDLE)
console_stdin = create_file_from_handle(stdin_handle, "r")
sys.stdin = console_stdin
stderr_handle = winproxy.GetStdHandle(gdef.STD_ERROR_HANDLE)
console_stderr = create_file_from_handle(stderr_handle, "w")
sys.stderr = console_stderr
[docs]
def create_process(path, args=None, dwCreationFlags=0, show_windows=True):
"""A convenient wrapper arround :func:`windows.winproxy.CreateProcessW`"""
proc_info = PROCESS_INFORMATION()
lpStartupInfo = None
if show_windows:
StartupInfo = STARTUPINFOW()
StartupInfo.cb = ctypes.sizeof(StartupInfo)
StartupInfo.dwFlags = 0
lpStartupInfo = ctypes.byref(StartupInfo)
lpCommandLine = None
if isinstance(path, bytes):
path = path.decode()
if args:
unicode_args = []
for arg in args:
if isinstance(arg, bytes):
arg = arg.decode()
unicode_args.append(arg)
lpCommandLine = (" ".join(unicode_args))
windows.winproxy.CreateProcessW(path, lpCommandLine=lpCommandLine, dwCreationFlags=dwCreationFlags, lpProcessInformation=ctypes.byref(proc_info), lpStartupInfo=lpStartupInfo)
dbgprint("CreateProcessW new process handle {:#x}".format(proc_info.hProcess), "HANDLE")
dbgprint("CreateProcessW new thread handle {:#x}".format(proc_info.hThread), "HANDLE")
dbgprint("Automatic close of thread handle {:#x}".format(proc_info.hThread), "HANDLE")
windows.winproxy.CloseHandle(proc_info.hThread) # Give access to a WinThread in addition of the WinProcess ?
return windows.winobject.process.WinProcess(pid=proc_info.dwProcessId, handle=proc_info.hProcess)
def device_io_control(handle, iocode, buffer):
outbuffer = ctypes.c_buffer(0x1000)
returned_size = gdef.DWORD()
windows.winproxy.DeviceIoControl(handle, iocode, buffer, lpOutBuffer=outbuffer, lpBytesReturned=returned_size)
return outbuffer[:returned_size.value]
# TODO: remove/rename me
# Real API with WinProcess.create() ?
def tmp_cp_as(path, token):
proc_info = PROCESS_INFORMATION()
windows.winproxy.CreateProcessAsUserW(token, path, lpCommandLine=None, dwCreationFlags=gdef.CREATE_NEW_CONSOLE, lpProcessInformation=ctypes.byref(proc_info), lpStartupInfo=None)
return windows.winobject.process.WinProcess(pid=proc_info.dwProcessId, handle=proc_info.hProcess)
def find_handle(proc, value):
return [h for h in windows.system.handles if h.dwProcessId == proc.pid and h.wValue == value]
def lookup_privilege_value(privilege_name):
luid = LUID()
winproxy.LookupPrivilegeValueW(None, privilege_name, byref(luid))
return luid
def lookup_privilege_name(privilege_value):
if isinstance(privilege_value, tuple):
luid = LUID(privilege_value[1], privilege_value[0])
privilege_value = luid
size = DWORD(0x100)
buff = ctypes.create_unicode_buffer(size.value)
winproxy.LookupPrivilegeNameW(None, privilege_value, buff, size)
return buff[:size.value]
def lookup_sid(psid):
"""Retrieves the name of the Computer/Domain and the name of the Account for a given SID
:returns: (:class:`unicode`, :class:`unicode`) - A tuple of two unicode strings
"""
usernamesize = gdef.DWORD(0x1000)
computernamesize = gdef.DWORD(0x1000)
username = ctypes.create_unicode_buffer(usernamesize.value)
computername = ctypes.create_unicode_buffer(computernamesize.value)
peUse = gdef.SID_NAME_USE()
winproxy.LookupAccountSidW(None, psid, username, usernamesize, computername, computernamesize, peUse)
return computername[:computernamesize.value], username[:usernamesize.value]
def lookup_name(computer, user):
sid_size = gdef.DWORD()
domain_size = gdef.DWORD(0)
stype = gdef.SID_NAME_USE()
try:
windows.winproxy.LookupAccountNameW(computer, user, None, sid_size, None, domain_size, stype)
except WindowsError as e:
pass
domain = ctypes.create_unicode_buffer(domain_size.value)
sid = ctypes.c_buffer(sid_size.value)
windows.winproxy.LookupAccountNameW(computer, user, sid, sid_size, domain, domain_size, stype)
psid = PSID.from_buffer_copy(ctypes.pointer(sid))
psid._save_value = sid # Save the SID object so that it's now deleted prematurly
return domain[:domain_size.value], psid, stype.value
[docs]
def enable_privilege(lpszPrivilege, bEnablePrivilege):
"""
Enable or disable a privilege::
enable_privilege(SE_DEBUG_NAME, True)
"""
tp = TOKEN_PRIVILEGES()
luid = LUID()
hToken = HANDLE()
winproxy.OpenProcessToken(winproxy.GetCurrentProcess(), TOKEN_ALL_ACCESS, byref(hToken))
winproxy.LookupPrivilegeValueW(None, lpszPrivilege, byref(luid))
tp.PrivilegeCount = 1
tp.Privileges[0].Luid = luid
if bEnablePrivilege:
tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED
else:
tp.Privileges[0].Attributes = 0
winproxy.AdjustTokenPrivileges(hToken, False, byref(tp), sizeof(TOKEN_PRIVILEGES))
winproxy.CloseHandle(hToken)
if winproxy.GetLastError() == gdef.ERROR_NOT_ALL_ASSIGNED:
raise ValueError("Failed to get privilege {0}".format(lpszPrivilege))
return True
[docs]
def check_is_elevated():
"""Return ``True`` if process is Admin"""
hToken = HANDLE()
elevation = TOKEN_ELEVATION()
cbsize = DWORD()
winproxy.OpenProcessToken(winproxy.GetCurrentProcess(), TOKEN_ALL_ACCESS, byref(hToken))
winproxy.GetTokenInformation(hToken, TokenElevation, byref(elevation), sizeof(elevation), byref(cbsize))
winproxy.CloseHandle(hToken)
return elevation.TokenIsElevated
[docs]
def check_debug():
"""Check that kernel is in debug mode (beware of NOUMEX):
https://msdn.microsoft.com/en-us/library/windows/hardware/ff556253(v=vs.85).aspx#_______noumex______
"""
options = windows.system.registry(r'HKEY_LOCAL_MACHINE\System\CurrentControlSet\Control')['SystemStartOptions']
control = options.value
if "DEBUG" not in control:
# print "[-] Enable debug boot!"
# print "> bcdedit /debug on"
return False
if "DEBUG=NOUMEX" not in control:
pass
# print "[*] Warning noumex not set!"
# print "> bcdedit /set noumex on"
return True
UNIX_EPOCH = datetime.datetime(1970, 1, 1, 0, 0)
WINDOWS_EPOCH = datetime.datetime(1601, 1, 1, 0, 0)
# https://docs.microsoft.com/en-us/cpp/atl-mfc-shared/date-type?view=vs-2019
# Why keep it simple and have only one epoch ? :D
# I don't want to name this "DATE_EPOCH" as everything is a DATE
# So let's go with COMDATE as this structure seems very related to COM/AUTOMATION
COMDATE_EPOCH = datetime.datetime(1899, 12, 30, 0, 0)
WIN_TO_UNIX_EPOCH_SECOND = int((UNIX_EPOCH - WINDOWS_EPOCH).total_seconds())
WIN_TICK_PER_SECOND_INT = 10**7
WIN_TICK_PER_SECOND_FLOAT = 10.0**7
WIN_TO_UNIX_EPOCH_WIN_TICKS = WIN_TO_UNIX_EPOCH_SECOND * WIN_TICK_PER_SECOND_INT
# TODO: look in python stblib how filetime -> unix timestamp translation is down (os.stat code ?)
def unix_timestamp_from_filetime(filetime):
# Round the filetime
last_number = (filetime % 10)
# We do some sort of "manual rounding cause of py2 vs py3
# PY2: round(0.5) == 1
# PY3: round(0.5) == 0
if last_number == 5:
rounding = 1
else:
rounding = round(last_number / 10.0)
round_win_ticks = ((filetime // 10) + int(rounding)) * 10
return round((round_win_ticks - WIN_TO_UNIX_EPOCH_WIN_TICKS) / WIN_TICK_PER_SECOND_FLOAT, 7)
def datetime_from_filetime(filetime):
"""return a :class:`datetime.datetime` from a ``windows`` FILETIME int"""
# Manual non-approx rounding as filetime will not have a perfect representation as Python float
# We do some sort of "manual rounding cause of py2 vs py3
# PY2: round(0.5) == 1
# PY3: round(0.5) == 0
last_number = (filetime % 10)
if last_number == 5:
rounding = 1
else:
rounding = round(last_number / 10.0)
round_microsecond = (filetime // 10) + int(rounding)
return WINDOWS_EPOCH + datetime.timedelta(microseconds=round_microsecond)
def filetime_from_datetime(dtime):
"""Return the FILETIME value from a :class:`datetime.datetime` in a python :class:`int`"""
return int((dtime - WINDOWS_EPOCH).total_seconds()) * WIN_TICK_PER_SECOND_INT
def datetime_from_comdate(comtime):
# Hour values are expressed as the absolute value of the fractional part of the number.
if comtime < 0:
# The date timeline becomes discontinuous for date values less than 0 (before 30 December 1899). This is because the whole-number portion of the date value is treated as signed, while the fractional part is treated as unsigned.
# other words, the whole-number part of the date value may be positive or negative, while the fractional part of the date value is always added to the overall logical date.
# WTF :D
dec, nb = math.modf(comtime)
final_delta = nb + abs(dec)
return COMDATE_EPOCH + datetime.timedelta(final_delta)
return COMDATE_EPOCH + datetime.timedelta(comtime)
def datetime_from_systemtime(systime):
return datetime.datetime(
year=systime.wYear,
month=systime.wMonth,
day=systime.wDay,
hour=systime.wHour,
minute=systime.wMinute,
second=systime.wSecond,
microsecond=systime.wMilliseconds * 1000,
)
class FixedInteractiveConsole(code.InteractiveConsole):
def raw_input(self, prompt=">>>"):
sys.stdout.write(prompt)
return raw_input("")
[docs]
def pop_shell(locs=None):
"""Pop a console with an InterativeConsole"""
if locs is None:
locs = globals()
create_console()
FixedInteractiveConsole(locs).interact()
def get_kernel_modules():
warnings.warn("get_kernel_modules() will be removed: use windows.system.modules instead", DeprecationWarning)
return windows.system.modules
class FileStreamInformation(gdef.FILE_STREAM_INFORMATION):
@property
def name(self):
return gdef.LPWSTR(ctypes.addressof(self) + type(self).StreamName.offset).value
@property
def next(self):
if not self.NextEntryOffset:
return None
return type(self).from_address(ctypes.addressof(self) + self.NextEntryOffset)
def all(self):
return list(self)
def __iter__(self):
while self:
yield self
self = self.next
def __repr__(self):
return "<ADS name='{0}'>".format(self.name)
ntqueryinformationfile_info_structs = {
gdef.FileAccessInformation: gdef.FILE_ACCESS_INFORMATION,
gdef.FileAlignmentInformation: gdef.FILE_ALIGNMENT_INFORMATION,
gdef.FileAllInformation: gdef.FILE_ALL_INFORMATION,
gdef.FileAttributeTagInformation: gdef.FILE_ATTRIBUTE_TAG_INFORMATION,
gdef.FileBasicInformation: gdef.FILE_BASIC_INFORMATION,
gdef.FileEaInformation: gdef.FILE_EA_INFORMATION ,
gdef.FileInternalInformation: gdef.FILE_INTERNAL_INFORMATION,
gdef.FileIoPriorityHintInformation: gdef.FILE_IO_PRIORITY_HINT_INFORMATION,
gdef.FileModeInformation: gdef.FILE_MODE_INFORMATION,
gdef.FileNetworkOpenInformation: gdef.FILE_NETWORK_OPEN_INFORMATION,
gdef.FileNameInformation: gdef.FILE_NAME_INFORMATION,
gdef.FilePositionInformation: gdef.FILE_POSITION_INFORMATION,
gdef.FileStandardInformation: gdef.FILE_STANDARD_INFORMATION,
gdef.FileIsRemoteDeviceInformation: gdef.FILE_IS_REMOTE_DEVICE_INFORMATION,
gdef.FileStreamInformation: FileStreamInformation,
}
def query_file_information(file_or_handle, file_info_class):
if not isinstance(file_or_handle, windows.pycompat.int_types):
file_or_handle = windows.utils.get_handle_from_file(file_or_handle)
handle = file_or_handle
io_status = gdef.IO_STATUS_BLOCK()
info = ntqueryinformationfile_info_structs[file_info_class]()
# Do helper for 'is_pointer' / get pointed_size & co ? (useful for winproxy)
pinfo = ctypes.pointer(info)
try:
windows.winproxy.NtQueryInformationFile(handle, io_status, pinfo, ctypes.sizeof(info), FileInformationClass=file_info_class)
except Exception as e:
if not (e.winerror & 0xffffffff) == gdef.STATUS_BUFFER_OVERFLOW:
raise
# STATUS_BUFFER_OVERFLOW -> Guess we have a FILE_NAME_INFORMATION somewhere that need a bigger buffer
if file_info_class == gdef.FileNameInformation:
file_name_length = pinfo[0].FileNameLength
elif file_info_class == gdef.FileAllInformation:
file_name_length = pinfo[0].NameInformation.FileNameLength
elif file_info_class == gdef.FileStreamInformation:
file_name_length = 0x10000
else:
raise
full_size = ctypes.sizeof(info) + file_name_length # We add a little too much size for the sake of simplicity
buffer = ctypes.c_buffer(full_size)
windows.winproxy.NtQueryInformationFile(handle, io_status, buffer, full_size, FileInformationClass=file_info_class)
pinfo = ctypes.cast(buffer, ctypes.POINTER(ntqueryinformationfile_info_structs[file_info_class]))
info = pinfo[0]
# return list of ADS if FileStreamInformation ?
return info
class EAInfo(gdef.FILE_FULL_EA_INFORMATION):
@property
def name(self):
return gdef.LPCSTR(ctypes.addressof(self) + type(self).EaName.offset).value
@property
def value(self):
value_addr = ctypes.addressof(self) + type(self).EaName.offset + self.EaNameLength + 1 # +1 -> Name \x00
return (ctypes.c_char * self.EaValueLength).from_address(value_addr)[:]
@property
def next(self):
# NextEntryOffset is Relative to our current offset
if not self.NextEntryOffset:
return None
try: # First entry
raw_buffer = self._b_base_._raw_buffer_
except AttributeError as e:
raw_buffer = self._raw_buffer_
curoffset = getattr(self, "_raw_buffer_offset_", 0)
new = type(self).from_buffer(raw_buffer, curoffset + self.NextEntryOffset)
# Keep the underlying buffer easily accessible
new._raw_buffer_ = raw_buffer
new._raw_buffer_offset_ = curoffset + self.NextEntryOffset
return new
def __iter__(self):
while self:
yield self
self = self.next
def __repr__(self):
return '<{0} name="{1}">'.format(type(self).__name__, self.name)
MAXIMUM_EA_SIZE = 0x0000ffff
def query_extended_attributes(file_or_handle):
if isinstance(file_or_handle, file):
file_or_handle = windows.utils.get_handle_from_file(file_or_handle)
# Check EaSize
x = windows.utils.query_file_information(file_or_handle, gdef.FileEaInformation)
if not x.EaSize:
return
io_status = gdef.IO_STATUS_BLOCK()
# Handle Win10 / Win7
# Saw on Win10 -> EaSize > MAXIMUM_EA_SIZE
# Saw on Win7 -> EaSize not enought (STATUS_BUFFER_OVERFLOW)
buffsize = max(MAXIMUM_EA_SIZE, x.EaSize)
buffer = windows.utils.BUFFER(EAInfo)(size=buffsize)
windows.winproxy.NtQueryEaFile(file_or_handle, io_status, buffer, buffsize, False, None, 0, None, True)
return buffer[0]
ntqueryvolumeinformationfile_info_structs = {
gdef.FileFsAttributeInformation: gdef.FILE_FS_ATTRIBUTE_INFORMATION,
gdef.FileFsControlInformation: gdef.FILE_FS_CONTROL_INFORMATION,
gdef.FileFsDeviceInformation: gdef.FILE_FS_DEVICE_INFORMATION,
gdef.FileFsDriverPathInformation: gdef.FILE_FS_DRIVER_PATH_INFORMATION,
gdef.FileFsFullSizeInformation: gdef.FILE_FS_FULL_SIZE_INFORMATION,
gdef.FileFsObjectIdInformation: gdef.FILE_FS_OBJECTID_INFORMATION,
gdef.FileFsSizeInformation: gdef.FILE_FS_SIZE_INFORMATION,
gdef.FileFsVolumeInformation: gdef.FILE_FS_VOLUME_INFORMATION,
gdef.FileFsSectorSizeInformation: gdef.FILE_FS_SECTOR_SIZE_INFORMATION,
}
# TODO: FileFsDriverPathInformation
# TODO: Extended FILE_FS_VOLUME_INFORMATION that can read the real value of 'VolumeLabel'
def query_volume_information(file_or_handle, volume_info_class):
if not isinstance(file_or_handle, windows.pycompat.int_types):
file_or_handle = get_handle_from_file(file_or_handle)
handle = file_or_handle
io_status = gdef.IO_STATUS_BLOCK()
info = ntqueryvolumeinformationfile_info_structs[volume_info_class]()
# Do helper for 'is_pointer' / get pointed_size & co ? (useful for winproxy)
pinfo = ctypes.pointer(info)
try:
windows.winproxy.NtQueryVolumeInformationFile(handle, io_status, pinfo, ctypes.sizeof(info), FsInformationClass=volume_info_class)
except WindowsError as e:
# import pdb;pdb.set_trace()
if not (e.winerror & 0xffffffff) == gdef.STATUS_BUFFER_OVERFLOW:
raise
if volume_info_class == gdef.FileFsAttributeInformation:
file_name_length = pinfo[0].FileSystemNameLength
elif volume_info_class == gdef.FileFsVolumeInformation:
# Well VolumeLabelLength is clearly broken (after testing..) so we are adding some bytes to it..
file_name_length = pinfo[0].VolumeLabelLength + 0x100 # I have seen cases where the VolumeLabelLength is not even enough..
else:
raise
full_size = ctypes.sizeof(info) + file_name_length # We add a little too much size for the sake of simplicity
buffer = ctypes.c_buffer(full_size)
windows.winproxy.NtQueryVolumeInformationFile(handle, io_status, buffer, full_size, FsInformationClass=volume_info_class)
pinfo = ctypes.cast(buffer, ctypes.POINTER(ntqueryvolumeinformationfile_info_structs[volume_info_class]))
info = pinfo[0]
return info
return info
# String stuff
def ntstatus(code):
return windows.generated_def.ntstatus.NtStatusException(code)
_WINERROR_BY_VALUE = None
def winerror(code):
global _WINERROR_BY_VALUE
if not _WINERROR_BY_VALUE: # Lazy init
_WINERROR_BY_VALUE = gdef.FlagMapper(*(getattr(gdef, error) for error in gdef.meta.errors))
val = _WINERROR_BY_VALUE[code]
if val is code: # Not found
val = _WINERROR_BY_VALUE[code & 0xffff] # Hresult: extract code (https://en.wikipedia.org/wiki/HRESULT)
return val
[docs]
def get_long_path(path):
"""Return the long path form for ``path``.
:raise: :class:`~windows.winproxy.WinproxyError` if ``path`` does not exists
:param path: a valid Windows path
:type path: :class:`str` | :obj:`unicode`
:returns: :class:`str` | :obj:`unicode` -- same type as ``path`` parameter
"""
size = 0x1000
buffer = ctypes.create_unicode_buffer(size)
rsize = winproxy.GetLongPathNameW(path, buffer, size)
return buffer[:rsize]
[docs]
def get_short_path(path):
"""Return the short path form for ``path``
:raise: :class:`~windows.winproxy.WinproxyError` if ``path`` does not exists
:param path: a valid Windows path
:type path: :class:`str` | :obj:`unicode`
:returns: :class:`str` | :obj:`unicode` -- same type as ``path`` parameter
"""
size = 0x1000
buffer = ctypes.create_unicode_buffer(size)
rsize = winproxy.GetShortPathNameW(path, buffer, size)
return buffer[:rsize]
def dospath_to_ntpath(dospath):
ustring = gdef.UNICODE_STRING()
windows.winproxy.RtlDosPathNameToNtPathName_U(dospath, ustring, None, None)
return ustring.str
def get_shared_mapping(name=None, handle=INVALID_HANDLE_VALUE, size=0x1000):
# TODO: real code
h = windows.winproxy.CreateFileMappingW(handle, dwMaximumSizeLow=size, lpName=name)
addr = windows.winproxy.MapViewOfFile(h, dwNumberOfBytesToMap=size)
return addr
def create_file(name, access=gdef.GENERIC_READ, share=gdef.FILE_SHARE_READ, security=None, creation=gdef.OPEN_EXISTING, flags=gdef.FILE_ATTRIBUTE_NORMAL):
return windows.winproxy.CreateFileW(name, access, share, security, creation, flags, 0)
def mapfile(file):
fhandle = get_handle_from_file(file)
h = windows.winproxy.CreateFileMappingA(fhandle, None, PAGE_READONLY, 0, 0, None)
addr = windows.winproxy.MapViewOfFile(h, dwDesiredAccess=FILE_MAP_READ, dwNumberOfBytesToMap=0)
return addr
def decompress_buffer(buffer, comptype=gdef.COMPRESSION_FORMAT_LZNT1, uncompress_size=None):
if uncompress_size is None:
uncompress_size = len(buffer) * 10
result_size = DWORD()
uncompressed = ctypes.c_buffer(uncompress_size)
windows.winproxy.RtlDecompressBuffer(comptype, uncompressed, uncompress_size, buffer, len(buffer), result_size)
return uncompressed[:result_size.value]
def compress_buffer(buffer, comptype=gdef.COMPRESSION_FORMAT_LZNT1):
uncompress_size = len(buffer)
CompressedBufferSize = uncompress_size + 0x1000
CompressedBuffer = ctypes.c_buffer(CompressedBufferSize)
chunk = 4096
final_size = gdef.DWORD()
work_space_size = gdef.ULONG()
ignore_data = gdef.ULONG()
windows.winproxy.RtlGetCompressionWorkSpaceSize(comptype, work_space_size, ignore_data)
work_space = ctypes.c_buffer(work_space_size.value)
windows.winproxy.RtlCompressBuffer(comptype, buffer, uncompress_size, CompressedBuffer, CompressedBufferSize, chunk, final_size, work_space)
return CompressedBuffer[:final_size.value]
# sid.py + real SID type ?
def get_known_sid(sid_type):
size = DWORD()
try:
windows.winproxy.CreateWellKnownSid(sid_type, None, None, size)
except WindowsError:
pass
buffer = ctypes.c_buffer(size.value)
windows.winproxy.CreateWellKnownSid(sid_type, None, buffer, size)
return ctypes.cast(buffer, PSID)
UnloadEventTraceInfo = namedtuple("UnloadEventTraceInfo", ["size", "nb_elt", "array_ptr"])
def get_unload_event_trace():
x = PULONG()
y = PULONG()
z = PVOID()
windows.winproxy.RtlGetUnloadEventTraceEx(x, y, z)
return UnloadEventTraceInfo(x[0], y[0], z.value)
[docs]
class VirtualProtected(object):
"""
A context manager usable like `VirtualProtect` that will restore the old protection at exit ::
with utils.VirtualProtected(IATentry.addr, ctypes.sizeof(PVOID), gdef.PAGE_EXECUTE_READWRITE):
IATentry.value = 0x42424242
"""
def __init__(self, addr, size, new_protect):
if (addr % 0x1000):
addr = addr - addr % 0x1000
self.addr = addr
self.size = size
self.new_protect = new_protect
def __enter__(self):
self.old_protect = DWORD()
winproxy.VirtualProtect(self.addr, self.size, self.new_protect, ctypes.byref(self.old_protect))
return self
def __exit__(self, exc_type, exc_value, traceback):
winproxy.VirtualProtect(self.addr, self.size, self.old_protect.value, ctypes.byref(self.old_protect))
return False
[docs]
class DisableWow64FsRedirection(object):
"""
A context manager that disable the SysWow64 Filesystem Redirection ::
if is_process_32_bits:
def pop_calc_64():
with windows.utils.DisableWow64FsRedirection():
return windows.utils.create_process(r"C:\Windows\system32\calc.exe", True)
"""
def __enter__(self):
if windows.current_process.bitness == 64 or windows.system.bitness == 32:
return self
self.OldValue = PVOID()
winproxy.Wow64DisableWow64FsRedirection(ctypes.byref(self.OldValue))
return self
def __exit__(self, exc_type, exc_value, traceback):
if windows.current_process.bitness == 64 or windows.system.bitness == 32:
return False
winproxy.Wow64RevertWow64FsRedirection(self.OldValue)
return False