18. Samples of code¶
18.1. Processes¶
18.1.1. windows.current_process
¶
import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.native_exec.simple_x86 as x86
import windows.native_exec.simple_x64 as x64
# Here is our current process
cp = windows.current_process
print("current process is {cp}".format(cp=windows.current_process))
print("current process is a <{cp.bitness}> bits process".format(cp=cp))
print("current process is a SysWow64 process ? <{cp.is_wow_64}>".format(cp=cp))
print("current process pid <{cp.pid}> and ppid <{cp.ppid}>".format(cp=cp))
print("Here are the current process threads: <{cp.threads}>".format(cp=cp))
print("Let's execute some native code ! (0x41 + 1)")
if windows.current_process.bitness == 32:
# Let's generate some native code
code = x86.MultipleInstr()
code += x86.Mov("Eax", 0x41)
code += x86.Inc("EAX")
code += x86.Ret()
else:
code = x64.MultipleInstr()
code += x64.Mov("RAX", 0x41)
code += x64.Inc("RAX")
code += x64.Ret()
native_code = code.get_code()
v = windows.current_process.execute(native_code)
print("Native code returned <{0}>".format(hex(v)))
print("Allocating memory in current process")
addr = cp.virtual_alloc(0x1000) # Default alloc is RWX (so secure !)
print("Allocated memory is at <{0}>".format(hex(addr)))
print("Writing 'SOME STUFF' in allocation memory")
cp.write_memory(addr, "SOME STUFF")
print("Reading memory : <{0}>".format(repr(cp.read_memory(addr, 20))))
Output
(cmd) python process\current_process.py
current process is <windows.winobject.process.CurrentProcess object at 0x000001DD8BC5CA10>
current process is a <64> bits process
current process is a SysWow64 process ? <False>
current process pid <26976> and ppid <28256>
Here are the current process threads: <[<WinThread 15220 owner "CurrentProcess" at 0x1dd8d20b0d0>, <WinThread 27912 owner "CurrentProcess" at 0x1dd8d20afd0>, <WinThread 27832 owner "CurrentProcess" at 0x1dd8d20af10>, <WinThread 26820 owner "CurrentProcess" at 0x1dd8d20ae90>]>
Let's execute some native code ! (0x41 + 1)
Native code returned <0x42>
Allocating memory in current process
Allocated memory is at <0x1dd8d2f0000>
Writing 'SOME STUFF' in allocation memory
Reading memory : <b'SOME STUFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'>
18.1.2. Remote process : WinProcess
¶
import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.native_exec.simple_x86 as x86
import windows.native_exec.simple_x64 as x64
print("Creating a notepad") ## Replaced calc.exe by notepad.exe cause of windows 10.
notepad = windows.utils.create_process(r"C:\windows\system32\notepad.exe")
# You don't need to do that in our case, but it's useful to now
print("Looking for notepads in the processes")
all_notepads = [proc for proc in windows.system.processes if proc.name == "notepad.exe"]
print("They are currently <{0}> notepads running on the system".format(len(all_notepads)))
print("Let's play with our notepad: <{notepad}>".format(notepad=notepad))
print("Our notepad pid is {notepad.pid}".format(notepad=notepad))
print("Our notepad is a <{notepad.bitness}> bits process".format(notepad=notepad))
print("Our notepad is a SysWow64 process ? <{notepad.is_wow_64}>".format(notepad=notepad))
print("Our notepad have threads ! <{notepad.threads}>".format(notepad=notepad))
# PEB STUFF
peb = notepad.peb
print("Exploring our notepad PEB ! {peb}".format(peb=peb))
print("Command line is {peb.commandline}".format(peb=peb))
modules = peb.modules
print("Here are 3 loaded modules: {0}".format(modules[:3]))
# See iat_hook.py for module exploration
# Remote alloc / read / write
print("Allocating memory in our notepad")
addr = notepad.virtual_alloc(0x1000)
print("Allocated memory is at <{0}>".format(hex(addr)))
print("Writing 'SOME STUFF' in allocated memory")
notepad.write_memory(addr, "SOME STUFF")
print("Reading allocated memory : <{0}>".format(repr(notepad.read_memory(addr, 20))))
# Remote Execution
print("Execution some native code in our notepad (write 0x424242 at allocated address + return 0x1337)")
if notepad.bitness == 32:
# Let's generate some native code
code = x86.MultipleInstr()
code += x86.Mov(x86.deref(addr), 0x42424242)
code += x86.Mov("EAX", 0x1337)
code += x86.Ret()
else:
code = x64.MultipleInstr()
code += x64.Mov('RAX', addr)
code += x64.Mov(x64.mem("[RAX]"), 0x42424242)
code += x64.Mov("RAX", 0x1337)
code += x64.Ret()
print("Executing native code !")
t = notepad.execute(code.get_code())
t.wait()
print("Return code = {0}".format(hex(t.exit_code)))
print("Reading allocated memory : <{0}>".format(repr(notepad.read_memory(addr, 20))))
print("Executing python code !")
# Make 'windows' importable in remote python
notepad.execute_python("import sys; sys.path.append(r'{0}')".format(sys.path[-1]))
notepad.execute_python("import windows")
# Let's write in the notepad 'current_process' memory :)
notepad.execute_python("addr = {addr}; windows.current_process.write_memory(addr, 'HELLO FROM notepad')".format(addr=addr))
print("Reading allocated memory : <{0}>".format(repr(notepad.read_memory(addr, 20))))
# python_execute is 'safe':
# - it waits for the thread completion
# - it raise an error if remote code raised some
try:
print("Trying to import in remote module 'FAKE_MODULE'")
notepad.execute_python("def func():\n import FAKE_MODULE\nfunc()")
except windows.injection.RemotePythonError as e:
print("Exception in remote process!")
print(e)
print("That's all ! killing the notepad")
notepad.exit()
Output
(cmd) python process\remote_process.py
Creating a notepad
Looking for notepads in the processes
They are currently <1> notepads running on the system
Let's play with our notepad: <<WinProcess "notepad.exe" pid 8400 at 0x6238510>>
Our notepad pid is 8400
Our notepad is a <32> bits process
Our notepad is a SysWow64 process ? <True>
Our notepad have threads ! <[<WinThread 16028 owner "notepad.exe" at 0x6238f10>, <WinThread 5924 owner "notepad.exe" at 0x6238a10>, <WinThread 11620 owner "notepad.exe" at 0x6238fb0>, <WinThread 3480 owner "notepad.exe" at 0x6238ff0>, <WinThread 200 owner "notepad.exe" at 0x62389b0>, <WinThread 16804 owner "notepad.exe" at 0x62389f0>, <WinThread 13340 owner "notepad.exe" at 0x6238930>]>
Exploring our notepad PEB ! <windows.winobject.process.RemotePEB object at 0x061BDA80>
Command line is <Remote_LSA_UNICODE_STRING ""C:\windows\system32\notepad.exe"" at 0x61bdb20>
Here are 3 loaded modules: [<RemoteLoadedModule "notepad.exe" at 0x61bdad0>, <RemoteLoadedModule "ntdll.dll" at 0x61bd940>, <RemoteLoadedModule "kernel32.dll" at 0x61bd8f0>]
Allocating memory in our notepad
Allocated memory is at <0x6f00000>
Writing 'SOME STUFF' in allocated memory
Reading allocated memory : <'SOME STUFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'>
Execution some native code in our notepad (write 0x424242 at allocated address + return 0x1337)
Executing native code !
Return code = 0x1337L
Reading allocated memory : <'BBBB STUFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'>
Executing python code !
Reading allocated memory : <'HELLO FROM notepad\x00\x00'>
Trying to import in remote module 'FAKE_MODULE'
Exception in remote process!
Traceback (most recent call last):
File "<string>", line 3, in <module>
File "<string>", line 2, in func
ImportError: No module named FAKE_MODULE
That's all ! killing the notepad
18.1.3. PEB
exploration¶
import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
print("Exploring the current process PEB")
peb = windows.current_process.peb
print("PEB is <{0}>".format(peb))
commandline = peb.commandline
print("Commandline object is {0}".format(commandline))
print("Commandline string is {0}".format(repr(commandline.Buffer)))
imagepath = peb.imagepath
print("Imagepath {0}".format(imagepath))
modules = peb.modules
print("Printing some modules: {0}".format("\n".join(str(m) for m in modules[:6])))
print("=== K32 ===")
print("Looking for kernel32.dll")
k32 = [m for m in modules if m.name == "kernel32.dll"][0]
print("Kernel32 module: {0}".format(k32))
print("Module name = <{0}> | Fullname = <{1}>".format(k32.name, k32.fullname))
print("Kernel32 is loaded at address {0}".format(hex(k32.baseaddr)))
print("=== K32 PE ===")
k32pe = k32.pe
print("PE Representation of k32: {0}".format(k32pe))
exports = k32pe.exports
some_exports = dict((k,v) for k,v in exports.items() if k in [0, 42, "VirtualAlloc", "CreateFileA"])
print("Here are some exports {0}".format(some_exports))
imports = k32pe.imports
print("Import DLL dependancies are (without api-*): {0}".format([x for x in imports.keys() if not x.startswith("api-")]))
NtCreateFile_iat = [x for x in imports["ntdll.dll"] if x.name == "NtCreateFile"][0]
print("IAT Entry for ntdll!NtCreateFile = {0} | addr = {1}".format(NtCreateFile_iat, hex(NtCreateFile_iat.addr)))
print("Sections: {0}".format(k32pe.sections))
Output
(cmd) python process\peb.py
Exploring the current process PEB
PEB is <<windows.winobject.process.PEB object at 0x05A6F710>>
Commandline object is <_LSA_UNICODE_STRING "C:\Python27\python.exe process\peb.py" at 0x5a6f8a0>
Commandline string is 47063282
Imagepath <_LSA_UNICODE_STRING "C:\Python27\python.exe" at 0x5a6f990>
Printing some modules: <LoadedModule "python.exe" at 0x60d2080>
<LoadedModule "ntdll.dll" at 0x60d2030>
<LoadedModule "kernel32.dll" at 0x60d2b20>
<LoadedModule "kernelbase.dll" at 0x60d2b70>
<LoadedModule "python27.dll" at 0x60d2bc0>
<LoadedModule "msvcr90.dll" at 0x60d2c10>
=== K32 ===
Looking for kernel32.dll
Kernel32 module: <LoadedModule "kernel32.dll" at 0x60d2b20>
Module name = <kernel32.dll> | Fullname = <c:\windows\system32\kernel32.dll>
Kernel32 is loaded at address 0x76930000
=== K32 PE ===
PE Representation of k32: <windows.pe_parse.PEFile object at 0x060CCA10>
Here are some exports {0: 1989445168L, 'CreateFileA': 1989795280L, 42: 1989636592L, 'VirtualAlloc': 1989437552L}
Import DLL dependancies are (without api-*): ['kernelbase.dll', 'ntdll.dll']
IAT Entry for ntdll!NtCreateFile = <IATEntry "NtCreateFile" ordinal 272> | addr = 0x769a1a28L
Sections: [<PESection ".text">, <PESection ".rdata">, <PESection ".data">, <PESection ".rsrc">, <PESection ".reloc">]
18.1.4. ApiSetMap¶
import windows
print("Computer is a <{0}>".format(windows.system.version_name))
cp = windows.current_process
apism = cp.peb.apisetmap
print("ApiSetMap: {0} (version = {1})".format(apism, apism.version))
# Find the current version of "api-ms-win-core-processthreads" used by windows
dll_demos_fullname = [x for x in windows.current_process.peb.apisetmap.apisetmap_dict if "api-ms-win-core-processthreads" in x][0]
dll_demos_utilname = 'api-ms-win-core-processthreads-l1-1-'
print("Entries in 'apisetmap_dict' are the full api-dll path extracted")
print(" * apisetmap.apisetmap_dict['{0}'] -> {1}".format(dll_demos_fullname, apism.apisetmap_dict[dll_demos_fullname]))
print("Entries in 'resolution_dict' are the contains the util-part check by windows")
print(" * apisetmap.resolution_dict['{0}'] -> {1}".format(dll_demos_utilname, apism.resolution_dict[dll_demos_utilname]))
print("ApiSetMap.resolve resolve a api-dll based on the util part")
for suffix in ["1", "2", "PART_IS_IGNORED"]:
testname = dll_demos_utilname + suffix
print(" * apisetmap.resolve('{0}') -> {1}".format(testname, apism.resolve(testname)))
testname = "BAD_DLL-3.dll"
try:
print(" * apisetmap.resolve('{0}') -> {1}".format(testname, apism.resolve(testname)))
except KeyError as e:
print(" * apisetmap.resolve('{0}') -> raised: {1!r}".format(testname, e))
Output
(cmd) python process\apisetmap.py
Computer is a <Windows 10>
ApiSetMap: <windows.winobject.apisetmap.ApiSetMapVersion6 object at 0x0645ECB0> (version = 6)
Entries in 'apisetmap_dict' are the full api-dll path extracted
* apisetmap.apisetmap_dict['api-ms-win-core-processthreads-l1-1-3'] -> kernelbase.dll
Entries in 'resolution_dict' are the contains the util-part check by windows
* apisetmap.resolution_dict['api-ms-win-core-processthreads-l1-1-'] -> kernelbase.dll
ApiSetMap.resolve resolve a api-dll based on the util part
* apisetmap.resolve('api-ms-win-core-processthreads-l1-1-1') -> kernelbase.dll
* apisetmap.resolve('api-ms-win-core-processthreads-l1-1-2') -> kernelbase.dll
* apisetmap.resolve('api-ms-win-core-processthreads-l1-1-PART_IS_IGNORED') -> kernelbase.dll
* apisetmap.resolve('BAD_DLL-3.dll') -> raised: KeyError('BAD_DLL-',)
18.1.5. IAT hooking¶
import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))
try:
import winreg
except ImportError:
import _winreg as winreg
import windows
# Here is a demo of IAT hooking in python
# We will hook the 'RegOpenKeyExA' entry of Python27.dll because it is easy to trigger !
# First: let's create our hook
# windows.hooks.RegOpenKeyExACallback is generated based on windows.generated_def.winfuncs
@windows.hooks.RegOpenKeyExACallback
def open_reg_hook(hKey, lpSubKey, ulOptions, samDesired, phkResult, real_function):
print("<in hook> Hook called | hKey = {0} | lpSubKey = <{1}>".format(hex(hKey), lpSubKey.value))
# Our hook can choose to call the real_function or not
if "SECRET" in lpSubKey.value:
print("<in hook> Secret key asked, returning magic handle 0x12345678")
# We must respect the hooked method return-value interface
phkResult[0] = 0x12345678
return 0
if "FAIL" in lpSubKey.value:
print("<in hook> Asked for a failing key: returning 0x2a")
return 42
print("<in hook> Non-secret key : calling normal function")
return real_function()
## Wide version of the Hook for python3 !
@windows.hooks.RegOpenKeyExWCallback
def open_reg_hookw(hKey, lpSubKey, ulOptions, samDesired, phkResult, real_function):
print("<in hook> Hook called | hKey = {0} | lpSubKey = <{1}>".format(hex(hKey), lpSubKey.value))
# Our hook can choose to call the real_function or not
if "SECRET" in lpSubKey.value:
print("<in hook> Secret key asked, returning magic handle 0x12345678")
# We must respect the hooked method return-value interface
phkResult[0] = 0x12345678
return 0
if "FAIL" in lpSubKey.value:
print("<in hook> Asked for a failing key: returning 0x2a")
return 42
print("<in hook> Non-secret key : calling normal function")
return real_function()
# Get the peb of our process
peb = windows.current_process.peb
# Get the pythonxx.dll module
pythondll_module = [m for m in peb.modules if m.name.startswith("python") and m.name.endswith(".dll")][0]
# Get the iat entries for DLL advapi32.dll
adv_imports = pythondll_module.pe.imports['advapi32.dll']
# Get RegOpenKeyExA iat entry
RegOpenKeyEx_iat = [n for n in adv_imports if n.name == "RegOpenKeyExA"]
if not RegOpenKeyEx_iat: # Py3
RegOpenKeyEx_iat = [n for n in adv_imports if n.name == "RegOpenKeyExW"]
open_reg_hook = open_reg_hookw
RegOpenKeyEx_iat = RegOpenKeyEx_iat[0]
# Setup our hook
RegOpenKeyEx_iat.set_hook(open_reg_hook)
### !!!! You must keep the iat_entry alive !!!!
### If the hook is garbage collected while active -> python will crash
# Use python native module _winreg that call 'RegOpenKeyExA'
print("Asking for <MY_SECRET_KEY>")
v = winreg.OpenKey(1234567, "MY_SECRET_KEY")
print("Result = " + hex(v.handle))
print("")
print("Asking for <MY_FAIL_KEY>")
try:
v = winreg.OpenKey(1234567, "MY_FAIL_KEY")
print("Result = " + hex(v.handle))
except WindowsError as e:
print(repr(e))
print("")
print("Asking for <HKEY_CURRENT_USER/Software>")
try:
v = winreg.OpenKey(winreg.HKEY_CURRENT_USER, "Software")
print("Result = " + hex(v.handle))
except WindowsError as e:
print(repr(e))
Output
(cmd) python process\iat_hook.py
Asking for <MY_SECRET_KEY>
<in hook> Hook called | hKey = 0x12d687 | lpSubKey = <MY_SECRET_KEY>
<in hook> Secret key asked, returning magic handle 0x12345678
Result = 0x12345678
Asking for <MY_FAIL_KEY>
<in hook> Hook called | hKey = 0x12d687 | lpSubKey = <MY_FAIL_KEY>
<in hook> Asked for a failing key: returning 0x2a
WindowsError(42, 'Windows Error 0x2A')
Asking for <HKEY_CURRENT_USER/Software>
<in hook> Hook called | hKey = 0x80000001L | lpSubKey = <Software>
<in hook> Non-secret key : calling normal function
Result = 0x428
18.2. Token¶
import windows
import windows.generated_def as gdef
tok = windows.current_process.token
print("Our process token is {0}".format(tok))
print("Retrieving some infos")
print("Username: <{0}>".format(tok.username))
print("User: {0!r}".format(tok.user))
print(" - lookup : {0}".format(windows.utils.lookup_sid(tok.user)))
print("Primary group: {0!r}".format(tok.primary_group))
print(" - lookup : {0}".format(windows.utils.lookup_sid(tok.primary_group)))
print("")
groups = tok.groups
print("Token Groups is {0}".format(groups))
print("First group SID is {0!r}".format(groups.sids[0]))
print("Some sid and attributes:")
for i, group in zip(range(3), groups.sids_and_attributes):
print(" - {0}: {1}".format(group.Sid, group.Attributes))
# Let's play with duplicate !
print("")
imp_tok = tok.duplicate(type=gdef.TokenImpersonation, impersonation_level=gdef.SecurityImpersonation)
print("Duplicate token is {0}".format(imp_tok))
print("Enabling <SeShutDownPrivilege>")
imp_tok.enable_privilege("SeShutDownPrivilege")
cur_thread = windows.current_thread
print("Current thread token is <{0}>".format(cur_thread.token))
print("Setting impersonation token !")
cur_thread.token = imp_tok
print("Current thread token is {0}".format(cur_thread.token))
Output
(cmd) python token\token_demo.py
Our process token is <Token TokenId=0x3f1f98fd Type=TokenPrimary(0x1L)>
Retrieving some infos
Username: <hakril>
User: <PSID "S-1-5-21-184905214-2723199098-2761450773-1001">
- lookup : ('WILLIE', 'hakril')
Primary group: <PSID "S-1-5-21-184905214-2723199098-2761450773-513">
- lookup : ('WILLIE', 'Aucun')
Token Groups is <TokenGroups count=15>
First group SID is <PSID "S-1-5-21-184905214-2723199098-2761450773-513">
Some sid and attributes:
- S-1-5-21-184905214-2723199098-2761450773-513: 7
- S-1-1-0: 7
- S-1-5-114: 16
Duplicate token is <Token TokenId=0x3f1fac85 Type=TokenImpersonation(0x2L) ImpersonationLevel=SecurityImpersonation(0x2L)>
Enabling <SeShutDownPrivilege>
Current thread token is <None>
Setting impersonation token !
Current thread token is <Token TokenId=0x3f1fac85 Type=TokenImpersonation(0x2L) ImpersonationLevel=SecurityImpersonation(0x2L)>
18.3. windows.system
¶
import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
system = windows.system
print("Basic system infos:")
print(" version = {0}".format(system.version))
print(" bitness = {0}".format(system.bitness))
print(" computer_name = {0}".format(system.computer_name))
print(" product_type = {0}".format(system.product_type))
print(" version_name = {0}".format(system.version_name))
print("")
print("There is {0} processes".format(len(system.processes)))
print("There is {0} threads".format(len(system.threads)))
print("")
print("Dumping first logical drive:")
drive = system.logicaldrives[0]
print(" " + str(drive))
print((" " * 8) + "name = {0}".format(drive.name))
print((" " * 8) + "type = {0}".format(drive.type))
print((" " * 8) + "path = {0}".format(drive.path))
print("")
print("Dumping first service:")
serv = windows.system.services[0]
print(" " + str(serv))
print((" " * 8) + "name = {0}".format(serv.name))
print((" " * 8) + "description = {0}".format(serv.description))
print((" " * 8) + "status = {0}".format(serv.status))
print((" " * 8) + "process = {0}".format(repr(serv.process)))
print("")
print("Finding a service in a user process:")
serv = [s for s in windows.system.services if s.process][0]
print(" " + str(serv))
print((" " * 8) + "name = {0}".format(serv.name))
print((" " * 8) + "description = {0}".format(serv.description))
print((" " * 8) + "status = {0}".format(serv.status))
print((" " * 8) + "process = {0}".format(repr(serv.process)))
print("")
print("Enumerating handles:")
handles = system.handles
print(" There are {0} handles:".format(len(handles)))
print(" First handle is: " + str(handles[0]))
print(" Enumerating handles of the current process:")
cp_handles = [h for h in system.handles if h.dwProcessId == windows.current_process.pid]
print(" There are {0} handles for this process".format(len(cp_handles)))
print(" Looking for a File handle:")
file_h = [h for h in cp_handles if h.type == "File"][0]
print(" Handle is {0}".format(file_h))
print(" Name is <{0}>".format(file_h.name))
print("")
print("Dumping the first system module")
kmod = windows.system.modules[0]
print(" " + str(kmod))
print((" " * 8) + "ImageName = {0}".format(kmod.ImageName))
print((" " * 8) + "Base = {0:#x}".format(kmod.Base))
print((" " * 8) + "Size = {0:#x}".format(kmod.Size))
print((" " * 8) + "Flags = {0:#x}".format(kmod.Flags))
print((" " * 8) + "LoadCount = {0}".format(kmod.LoadCount))
Output
(cmd) python system.py
Basic system infos:
version = (10, 0)
bitness = 64
computer_name = WILLIE
product_type = 1
version_name = Windows 10
There is 331 processes
There is 4010 threads
Dumping first logical drive:
<LogicalDrive "B:\" (DRIVE_FIXED)>
name = B:\
type = 3
path = \Device\HarddiskVolume7
Dumping first service:
<Service "1394ohci" SERVICE_STOPPED(0x1)>
name = 1394ohci
description = 1394 OHCI Compliant Host Controller
status = <_SERVICE_STATUS_PROCESS type=SERVICE_KERNEL_DRIVER(0x1) state=SERVICE_STOPPED(0x1)>
process = None
Finding a service in a user process:
<Service "AppIDSvc" SERVICE_RUNNING(0x4)>
name = AppIDSvc
description = Application Identity
status = <_SERVICE_STATUS_PROCESS type=48L state=SERVICE_RUNNING(0x4)>
process = <WinProcess "!cannot-retrieve-name" pid 6060 at 0x35851d0>
Enumerating handles:
There are 208332 handles:
First handle is: <HandleWow64 value=<0x4> in process pid=4>
Enumerating handles of the current process:
There are 275 handles for this process
Looking for a File handle:
Handle is <HandleWow64 value=<0x4> in process pid=15236>
Name is <\Device\ConDrv>
Dumping the first system module
<SystemModuleWow64 name="\SystemRoot\system32\ntoskrnl.exe" base=0xfffff80023000000>
ImageName = \SystemRoot\system32\ntoskrnl.exe
Base = 0xfffff80023000000
Size = 0xab7000
Flags = 0x8804000
LoadCount = 240
18.4. Services¶
import windows
import windows.generated_def as gdef
print("Listing the first 3 services:")
for service in windows.system.services[:3]:
print(" * {0}".format(service))
print("")
TARGET_SERVICE = "TapiSrv"
print("Retriving service <{0}>".format(TARGET_SERVICE))
service = windows.system.services[TARGET_SERVICE]
print("{0}".format(service))
print(" - name: {0!r}".format(service.name))
print(" - description: {0!r}".format(service.description))
print(" - state: {0!r}".format(service.status.state))
print(" - type: {0!r}".format(service.status.type))
print(" - process: {0!r}".format(service.process))
print(" - security-description: {0}".format(service.security_descriptor))
if service.status.state == gdef.SERVICE_RUNNING:
print("Service already running, not trying to start it")
else:
print("Trying to start the service")
service.start()
while service.status.state != gdef.SERVICE_RUNNING:
pass
print("Service started !")
print("{0}".format(service))
print(" - state: {0!r}".format(service.status.state))
print(" - process: {0!r}".format(service.process))
Output
(cmd) python service\service_demo.py
Listing the first 3 services:
* <Service "1394ohci" SERVICE_STOPPED(0x1)>
* <Service "3ware" SERVICE_STOPPED(0x1)>
* <Service "ACPI" SERVICE_RUNNING(0x4)>
Retriving service <TapiSrv>
<Service "TapiSrv" SERVICE_STOPPED(0x1)>
- name: 'TapiSrv'
- description: 'Telephony'
- state: SERVICE_STOPPED(0x1)
- type: 48L
- process: None
- security-description: O:SYG:SYD:(A;;CCLCSWRPWPDTLOCRRC;;;SY)(A;;CCDCLCSWRPWPDTLOCRSDRCWDWO;;;BA)(A;;CCLCSWRPLOCRRC;;;IU)(A;;CCLCSWRPLOCRRC;;;SU)
Trying to start the service
Service started !
<Service "TapiSrv" SERVICE_RUNNING(0x4)>
- state: SERVICE_RUNNING(0x4)
- process: <WinProcess "!cannot-retrieve-name" pid 14700 at 0x4a2c4f0>
18.5. Network
- socket exploration¶
import sys
import os.path
import socket
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
if not windows.utils.check_is_elevated():
print("!!! Demo will fail because closing a connection require elevated process !!!")
print("Working on ipv4")
conns = windows.system.network.ipv4
print("== Listening ==")
print("Some listening connections: {0}".format([c for c in conns if not c.established][:3]))
print("Listening ports are : {0}".format([c.local_port for c in conns if not c.established]))
print("== Established ==")
print("Some established connections: {0}".format([c for c in conns if c.established][:3]))
TARGET_HOST = "localhost"
TARGET_PORT = 80
print("== connection to {0}:{1} ==".format(TARGET_HOST, TARGET_PORT))
s = socket.create_connection((TARGET_HOST, TARGET_PORT))
our_connection = [c for c in windows.system.network.ipv4 if c.established and c.remote_port == TARGET_PORT and c.remote_addr == s.getpeername()[0]]
print("Our connection is {0}".format(our_connection))
print("Sending YOP")
s.send("YOP")
print("Closing socket")
our_connection[0].close()
print("Sending LAIT")
s.send("LAIT")
Output-New
(cmd) python network\network.py
Working on ipv4
== Listening ==
Some listening connections: [<TCP IPV4 Listening socket on 0.0.0.0:80>, <TCP IPV4 Listening socket on 0.0.0.0:135>, <TCP IPV4 Listening socket on 0.0.0.0:445>]
Listening ports are : [80, 135, 445, 902, 912, 27036, 49664, 49665, 49666, 49667, 49671, 49673, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 5556, 6463, 22885, 22886, 27060, 49330, 49331, 49794, 49795, 49867, 52541, 57125, 57138, 65000, 65001, 139, 5556, 57046, 57109, 57110, 57143, 57144, 139, 5556, 139, 5556]
== Established ==
Some established connections: [<TCP IPV4 Connection 127.0.0.1:912 -> 127.0.0.1:49488>, <TCP IPV4 Connection 127.0.0.1:912 -> 127.0.0.1:52332>, <TCP IPV4 Connection 127.0.0.1:49488 -> 127.0.0.1:912>]
== connection to localhost:80 ==
Our connection is [<TCP IPV4 Connection 127.0.0.1:57167 -> 127.0.0.1:80>]
Sending YOP
Closing socket
Sending LAIT
Traceback (most recent call last):
File "network\network.py", line 34, in <module>
s.send("LAIT")
socket.error: [Errno 10054] An existing connection was forcibly closed by the remote host
18.6. Registry
¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
registry = windows.system.registry
print("Registry is <{0}>".format(registry))
current_user = registry("HKEY_CURRENT_USER")
print("HKEY_CURRENT_USER is <{0}>".format(current_user))
subkeys_name = [s.name for s in current_user.subkeys]
print("HKEY_CURRENT_USER subkeys names are:")
pprint.pprint(subkeys_name)
print("Opening 'Software' in HKEY_CURRENT_USER: {0}".format(current_user("Software")))
print("We can also open it in one access: {0}".format(registry(r"HKEY_CURRENT_USER\Sofware")))
print("Looking at CurrentVersion")
windows_info = registry("HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion")
print("Key is {0}".format(windows_info))
print("values are:")
pprint.pprint(windows_info.values)
registered_owner = windows_info["RegisteredOwner"]
print("registered owner = <{0}>".format(registered_owner))
Output
(cmd) python registry\registry.py
Registry is <<windows.winobject.registry.Registry object at 0x061F89F0>>
HKEY_CURRENT_USER is <<PyHKey "HKEY_CURRENT_USER">>
HKEY_CURRENT_USER subkeys names are:
['AppEvents',
'AppXBackupContentType',
'Console',
'Control Panel',
'Environment',
'EUDC',
'Keyboard Layout',
'Network',
'Printers',
'Software',
'System',
'Uninstall',
'Volatile Environment']
Opening 'Software' in HKEY_CURRENT_USER: <PyHKey "HKEY_CURRENT_USER\Software">
We can also open it in one access: <PyHKey "HKEY_CURRENT_USER\Sofware">
Looking at CurrentVersion
Key is <PyHKey "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion">
values are:
[KeyValue(name='SoftwareType', value=u'System', type=1),
KeyValue(name='RegisteredOwner', value=u'hakril', type=1),
...
KeyValue(name='PathName', value=u'C:\\WINDOWS', type=1)]
registered owner = <KeyValue(name='RegisteredOwner', value=u'hakril', type=1)>
18.7. Scheduled tasks¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.generated_def as gdef
tscheduler = windows.system.task_scheduler
print("Task scheduler is {0}".format(tscheduler))
root = tscheduler.root
print("Root folder is {0}".format(root))
print ("Listing sub folders")
for subfolder in root.folders:
print(" * {0}".format(subfolder))
last_name = subfolder.name
demo_folder = "\Microsoft\Windows\AppID"
print("Manually opening subfolder <{0}>".format(demo_folder))
subfolder = root(demo_folder)
print("Working into {0}".format(subfolder))
for task in subfolder.tasks:
print(" * {task.name}".format(task=task))
print("")
print("Analysing task {task}".format(task=task))
print(" * Name: <{task.name}>".format(task=task))
print(" * Path: <{task.path}>".format(task=task))
print(" * Definition: <{task.definition}>".format(task=task))
print("Listing actions:")
for action in task.definition.actions:
print(" * Action: <{action}>".format(action=action))
print(" * Type: <{action.type!r}>".format(action=action))
if getattr(action, "path", None):
print(" * path: <{action.path}>".format(action=action))
print(" * arguments: <{action.arguments}>".format(action=action))
# import pdb;pdb.set_trace()
print("Listing triggers:")
for trigger in task.definition.triggers:
print(" * Trigger type: <{trigger.type!r}>".format(trigger=trigger))
print("")
DEMO_FOLDER_NAME = "PFW_DEMO_FOLDER"
DEMO_TASK_NAME = "PFW_DEMO_TASK"
print("Creating folder <{0}>".format(DEMO_FOLDER_NAME))
demo_folder = root.create_folder(DEMO_FOLDER_NAME)
print("Demo folder is {0}".format(demo_folder))
print("Creating Task definition")
# Create a Task definition
new_task_definition = tscheduler.create()
actions = new_task_definition.actions
# Add an TASK_ACTION_EXEC action to the task def
new_action = actions.create(gdef.TASK_ACTION_EXEC)
new_action.path = sys.executable
new_action.arguments = "-c 'Hello !'"
# Register the new task under 'DEMO_TASK_NAME'
print("Registering task definition as <{0}> in <{1}>".format(DEMO_TASK_NAME, demo_folder))
new_task = demo_folder.register(DEMO_TASK_NAME, new_task_definition)
print("Created task is {0}".format(new_task))
print("Deleting the demo task")
del demo_folder[DEMO_TASK_NAME]
print("Deleting the demo folder")
root.delete_folder(DEMO_FOLDER_NAME)
Output
(cmd) python scheduled_tasks\scheduled_task.py
Task scheduler is <TaskService at 0x6f2a1c0>
Root folder is <TaskFolder "\" at 0x6f2a2b0>
Listing sub folders
* <TaskFolder "\ASUS" at 0x6f2a350>
* <TaskFolder "\Intel" at 0x6f2a300>
* <TaskFolder "\Microsoft" at 0x6f2a3a0>
Manually opening subfolder <\Microsoft\Windows\AppID>
Working into <TaskFolder "\Microsoft\Windows\AppID" at 0x6f2a3f0>
* PolicyConverter
* SmartScreenSpecific
* VerifiedPublisherCertStoreCheck
Analysing task <Task "VerifiedPublisherCertStoreCheck" at 0x6f2a300>
* Name: <VerifiedPublisherCertStoreCheck>
* Path: <\Microsoft\Windows\AppID\VerifiedPublisherCertStoreCheck>
* Definition: <<TaskDefinition at 0x6f2a3a0>>
Listing actions:
* Action: <<ExecAction at 0x6f2a350>>
* Type: <TASK_ACTION_EXEC(0x0L)>
* path: <%windir%\system32\appidcertstorecheck.exe>
* arguments: <None>
Listing triggers:
* Trigger type: <TASK_TRIGGER_BOOT(0x8L)>
Creating folder <PFW_DEMO_FOLDER>
Demo folder is <TaskFolder "\PFW_DEMO_FOLDER" at 0x6f2a3a0>
Creating Task definition
Registering task definition as <PFW_DEMO_TASK> in <<TaskFolder "\PFW_DEMO_FOLDER" at 0x6f2a3a0>>
Created task is <Task "PFW_DEMO_TASK" at 0x6f2a530>
Deleting the demo task
Deleting the demo folder
18.8. Event Log¶
import windows
import windows.generated_def as gdef
evtlogmgr = windows.system.event_log
print("Event log Manager is: {0}".format(evtlogmgr))
print("They are <{0}> channels".format(len(list(evtlogmgr.channels))))
print("They are <{0}> publishers".format(len(list(evtlogmgr.publishers))))
FIREWALL_CHANNEL = "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall"
print("Openning channel <{0}>".format(FIREWALL_CHANNEL))
evtchan = evtlogmgr[FIREWALL_CHANNEL]
print("Channel is {0}".format(evtchan))
# Note that `evtchan.events` is an alias for `evtchan.query().all()`
print("The channel contains <{0}> events".format(len(evtchan.events)))
print("")
EVT_QUERY = "Event/EventData[Data='C:\\WINDOWS\\System32\\svchost.exe'] and Event/System[EventID=2006]"
print("""Querying "{0}">""".format(EVT_QUERY))
query = evtchan.query(EVT_QUERY)
print("Query is {0}".format(query))
event_list = list(query)
print("List contains {0} event".format(len(event_list)))
event = event_list[0]
print("")
print("First event is {0}".format(event))
print("System values:")
print(" * ID: {0}".format(event.id))
print(" * version: {0}".format(event.version))
print(" * level: {0}".format(event.level))
print(" * opcode: {0}".format(event.opcode))
print(" * time_created: {0}".format(event.time_created))
print(" * ID: {0}".format(event.id))
print("Event specific values:")
for name, value in event.data.items():
print(" * <{0}> -> <{1}>".format(name, value))
print("")
evtmeta = event.metadata
print("Event metadata is {0}".format(evtmeta))
print(" * id : {0}".format(evtmeta.id))
print(" * channel_id : {0}".format(evtmeta.channel_id))
print(" * message_id : {0}".format(evtmeta.message_id))
print(" * event_data : {0}".format(evtmeta.event_data))
print(" * EventData template :\n{0}".format(evtmeta.template.replace("\r\n", "\n")))
print("")
print("Exploring complex Evt types:")
print("Channel is still {0}".format(evtchan))
print("Channel config is {0}".format(evtchan.config))
publisher = evtchan.config.publisher
print("Channel publisher is {0}".format(publisher))
print("Channel publisher metadata is {0}".format(publisher.metadata))
print("Publisher's channels are:")
for chan in publisher.metadata.channels:
print(" * {0}".format(chan))
print("Some publisher's event metadata are:")
for evtmeta in list(publisher.metadata.events_metadata)[:3]:
print(" * {0}: id={1}".format(evtmeta, evtmeta.id))
Output
(cmd) python event_log\eventlog.py
Event log Manager is: <windows.winobject.event_log.EvtlogManager object at 0x0592DB70>
They are <1155> channels
They are <1179> publishers
Openning channel <Microsoft-Windows-Windows Firewall With Advanced Security/Firewall>
Channel is <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall">
The channel contains <1037> events
Querying "Event/EventData[Data='C:\WINDOWS\System32\svchost.exe'] and Event/System[EventID=2006]">
Query is <EvtQuery object at 0x06E9F440>
List contains 304 event
First event is <EvtEvent id="2006" time="2018-05-06 08:03:06.210109">
System values:
* ID: 2006
* version: 0
* level: 4
* opcode: 0
* time_created: 131700673862101088
* ID: 2006
Event specific values:
* <ModifyingUser> -> <108703760>
* <RuleName> -> <ByteCodeGeneration>
* <ModifyingApplication> -> <C:\WINDOWS\System32\svchost.exe>
* <RuleId> -> <{318EF1CF-A3FA-4B04-8AAC-712276661117}>
Event metadata is <EventMetadata object at 0x06EB96C0>
* id : 2006
* channel_id : 16
* message_id : 2986346454
* event_data : [u'RuleId', u'RuleName', u'ModifyingUser', u'ModifyingApplication']
* EventData template :
<template xmlns="http://schemas.microsoft.com/win/2004/08/events">
<data name="RuleId" inType="win:UnicodeString" outType="xs:string"/>
<data name="RuleName" inType="win:UnicodeString" outType="xs:string"/>
<data name="ModifyingUser" inType="win:SID" outType="xs:string"/>
<data name="ModifyingApplication" inType="win:UnicodeString" outType="xs:string"/>
</template>
Exploring complex Evt types:
Channel is still <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall">
Channel config is <ChannelConfig "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall">
Channel publisher is <EvtPublisher "Microsoft-Windows-Windows Firewall With Advanced Security">
Channel publisher metadata is <PublisherMetadata "Microsoft-Windows-Windows Firewall With Advanced Security">
Publisher's channels are:
* <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall">
* <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/ConnectionSecurity">
* <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/FirewallVerbose">
* <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/ConnectionSecurityVerbose">
* <EvtChannel "Network Isolation Operational">
Some publisher's event metadata are:
* <EventMetadata object at 0x06EBE710>: id=2000
* <EventMetadata object at 0x06EBE7B0>: id=2001
* <EventMetadata object at 0x06EBE3F0>: id=2002
18.9. Object manager¶
import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.generated_def as gdef
object_manager = windows.system.object_manager
print("Object manager is {0}".format(object_manager))
root = object_manager.root
print("Root object is {0}".format(root))
print("")
print("Listing some of root-subobject:")
# Kernel object of type 'Directory' are iterable
for i, (name, obj) in enumerate(root.items()):
print(" * {0}: {1}".format(name, obj))
if i == 3:
break
print("")
print(r"Retrieving <\Rpc Control\lsasspirpc>:")
# You can retrieve this value in one request
x1 = root[r"\Rpc Control\lsasspirpc"]
# Sub-directory also allow __getitem__
x2 = root["Rpc Control"]["lsasspirpc"]
# You can directly request the object manager that will request `root`
x3 = object_manager[r"\Rpc Control\lsasspirpc"]
assert x1.fullname == x2.fullname == x3.fullname
lsasspirpc = x1
print("Object is: {0}".format(lsasspirpc))
print(" * name: <{0}>".format(lsasspirpc.name))
print(" * path: <{0}>".format(lsasspirpc.path))
print(" * fullname: <{0}>".format(lsasspirpc.fullname))
print(" * type: <{0}>".format(lsasspirpc.type))
print(" * target: <{0}>".format(lsasspirpc.target)) # None on non-symlink
print("")
print("Looking for a SymbolicLink in <ArcName>")
slo = [o for o in root["ArcName"].values() if o.type == "SymbolicLink"][0]
print("Object is: {0}".format(slo))
print(" * name: <{0}>".format(slo.name))
print(" * target: <{0}>".format(slo.target))
Output
(cmd) python object_manager\object_manager.py
Object manager is <windows.winobject.object_manager.ObjectManager object at 0x0370ED10>
Root object is <KernelObject "\" (type="Directory")>
Listing some of root-subobject:
* PendingRenameMutex: <KernelObject "\PendingRenameMutex" (type="Mutant")>
* ObjectTypes: <KernelObject "\ObjectTypes" (type="Directory")>
* storqosfltport: <KernelObject "\storqosfltport" (type="FilterConnectionPort")>
* MicrosoftMalwareProtectionRemoteIoPortWD: <KernelObject "\MicrosoftMalwareProtectionRemoteIoPortWD" (type="FilterConnectionPort")>
Retrieving <\Rpc Control\lsasspirpc>:
Object is: <KernelObject "\Rpc Control\lsasspirpc" (type="ALPC Port")>
* name: <lsasspirpc>
* path: <\Rpc Control>
* fullname: <\Rpc Control\lsasspirpc>
* type: <ALPC Port>
* target: <None>
Looking for a SymbolicLink in <ArcName>
Object is: <KernelObject "\ArcName\multi(0)disk(0)rdisk(0)" (type="SymbolicLink")>
* name: <multi(0)disk(0)rdisk(0)>
* target: <\Device\Harddisk0\Partition0>
18.9.1. find objects¶
import argparse
import windows
import windows.generated_def as gdef
def obj_with_link(obj):
target = obj.target
if target is None:
return str(obj)
return "{0} -> <{1}>".format(obj, target)
def find_name(root, findname):
TODO = [root]
while TODO:
try:
for name, obj in TODO.pop().items():
if findname in name or findname in obj.type:
print("* {0}".format(obj_with_link(obj)))
if obj.type == "Directory":
TODO.append(obj)
except gdef.NtStatusException as e:
print("<{0}> -> {1}".format(obj.fullname, e.name))
parser = argparse.ArgumentParser(prog=__file__)
parser.add_argument('name', nargs='?', default="ls", help='The name of the object to find')
res = parser.parse_args()
objmanag = windows.system.object_manager
print("Looking for object name containing <{0}>".format(res.name))
find_name(objmanag.root, res.name)
Output
(cmd) python object_manager\findobj.py
Looking for object name containing <ls>
* <KernelObject "\KnownDlls32" (type="Directory")>
* <KernelObject "\Win32kCrossSessionGlobals" (type="Section")>
* <KernelObject "\KnownDlls" (type="Directory")>
<\DriverStores\SYSTEM> -> STATUS_ACCESS_DENIED
* <KernelObject "\Device\MailslotRedirector" (type="SymbolicLink")> -> <\Device\Mup\;MailslotRedirector>
* <KernelObject "\Device\Mailslot" (type="Device")>
<\Device\00000020> -> STATUS_ACCESS_DENIED
<\Device\00000020> -> STATUS_ACCESS_DENIED
<\Device\00000020> -> STATUS_ACCESS_DENIED
<\Device\00000020> -> STATUS_ACCESS_DENIED
<\Device\00000020> -> STATUS_ACCESS_DENIED
<\KernelObjects\PrefetchTracesReady> -> STATUS_ACCESS_DENIED
<\KnownDlls\powrprof.dll> -> STATUS_ACCESS_DENIED
* <KernelObject "\RPC Control\lsapolicylookup" (type="ALPC Port")>
* <KernelObject "\RPC Control\lsacap" (type="ALPC Port")>
* <KernelObject "\RPC Control\lsasspirpc" (type="ALPC Port")>
<\Windows\SbApiPort> -> STATUS_ACCESS_DENIED
<\Windows\SbApiPort> -> STATUS_ACCESS_DENIED
<\Sessions\BNOLINKS\1> -> STATUS_ACCESS_DENIED
<\Sessions\BNOLINKS\1> -> STATUS_ACCESS_DENIED
<\Sessions\BNOLINKS\1> -> STATUS_ACCESS_DENIED
18.10. Device manager¶
import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.pipe
import windows.generated_def as gdef
devmgr = windows.system.device_manager
print("Device manager is {0}".format(devmgr))
print("Enumerating the first 3 device classes")
for cls in devmgr.classes[:3]:
print(" * {0}".format(cls))
print("Finding device class 'System'")
# Allow devmgr.classes["name"] ?
system_cls = [cls for cls in devmgr.classes if cls.name == "System"][0]
print(" * {0}".format(system_cls))
print(" Enumerating some devices of 'System'")
devices = system_cls.devices.all()
for devinst in (devices[0], devices[25], devices[35]): # Some "random" devices to have interesting ones
print(" * {0}".format(devinst))
devconf = devinst.allocated_configuration
if not devconf:
continue
print(" Enumerating allocated resources:")
for resource in devconf.resources:
print(" * {0}".format(resource))
# python64 samples\device\device_manager.py
# Device manager is <windows.winobject.device_manager.DeviceManager object at 0x0000000003669908>
# Enumerating the first 3 device classes
# * <DeviceClass name="XboxComposite" guid=05F5CFE2-4733-4950-A6BB-07AAD01A3A84>
# * <DeviceClass name="DXGKrnl" guid=1264760F-A5C8-4BFE-B314-D56A7B44A362>
# * <DeviceClass name="RemotePosDevice" guid=13E42DFA-85D9-424D-8646-28A70F864F9C>
# Finding device class 'System'
# * <DeviceClass name="System" guid=4D36E97D-E325-11CE-BFC1-08002BE10318>
# Enumerating some devices of 'System'
# * <DeviceInstance "Motherboard resources" (id=1)>
# * <DeviceInstance "Microsoft ACPI-Compliant Embedded Controller" (id=26)>
# Enumerating allocated resources:
# * <IoResource : [0x00000000000062-0x00000000000062]>
# * <IoResource : [0x00000000000066-0x00000000000066]>
# * <DeviceInstance "High Definition Audio Controller" (id=36)>
# Enumerating allocated resources:
# * <MemoryResource : [0x000000f7080000-0x000000f7083fff]>
# * <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
# * <IrqResource : [0x00000000000011]>
Output
(cmd) python device_manager\device_manager.py
Device manager is <windows.winobject.device_manager.DeviceManager object at 0x00000000033442E8>
Enumerating the first 3 device classes
* <DeviceClass name="XboxComposite" guid=05F5CFE2-4733-4950-A6BB-07AAD01A3A84>
* <DeviceClass name="DXGKrnl" guid=1264760F-A5C8-4BFE-B314-D56A7B44A362>
* <DeviceClass name="RemotePosDevice" guid=13E42DFA-85D9-424D-8646-28A70F864F9C>
Finding device class 'System'
* <DeviceClass name="System" guid=4D36E97D-E325-11CE-BFC1-08002BE10318>
Enumerating some devices of 'System'
* <DeviceInstance "Motherboard resources" (id=1)>
* <DeviceInstance "Microsoft ACPI-Compliant Embedded Controller" (id=26)>
Enumerating allocated resources:
* <IoResource : [0x00000000000062-0x00000000000062]>
* <IoResource : [0x00000000000066-0x00000000000066]>
* <DeviceInstance "High Definition Audio Controller" (id=36)>
Enumerating allocated resources:
* <MemoryResource : [0x000000f7080000-0x000000f7083fff]>
* <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
* <IrqResource : [0x00000000000011]>
18.10.1. Enumerate devices¶
import argparse
import windows
import windows.generated_def as gdef
devmgr = windows.system.device_manager
def class_generator(filter=None):
for cls in devmgr.classes:
if filter and cls.name != filter.encode():
continue
yield cls
def main(clsfilter, enumerate_devices, print_devinst_resources, attributes):
for devcls in class_generator(clsfilter):
print(devcls)
if not enumerate_devices:
continue
# Enumerate devices
for devinst in devcls.devices:
print(" * {0}".format(devinst))
# Print attributes
if attributes:
print(" Attributes:")
for attr in attributes:
value = getattr(devinst, attr)
print(" * {0}={1}".format(attr, value))
if not print_devinst_resources:
continue
# Device resources
devconf = devinst.allocated_configuration
if not devconf:
# No allocated configuration
# Check boot conf ?
continue
for resource in devconf.resources:
print(" * {0}".format(resource))
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--class", dest="clsfilter", default=None, help="The classe to list: default all")
parser.add_argument("--no-print-devices", action="store_true", help="Prevent the listing of devices in the matching classes")
parser.add_argument("--print-resources", action="store_true", help="Print the resources allocated to the device instance")
parser.add_argument("--attributes", nargs="+", help="The list of attributes to print for each discovered device instance")
args = parser.parse_args()
print(args)
main(args.clsfilter,
enumerate_devices=not args.no_print_devices,
print_devinst_resources=args.print_resources,
attributes=args.attributes)
$ python64 device_manager\enum_devices.py --no-print-devices
Namespace(clsfilter=None, no_print_devices=True, print_resources=False)
<DeviceClass name="XboxComposite" guid=05F5CFE2-4733-4950-A6BB-07AAD01A3A84>
<DeviceClass name="DXGKrnl" guid=1264760F-A5C8-4BFE-B314-D56A7B44A362>
<DeviceClass name="RemotePosDevice" guid=13E42DFA-85D9-424D-8646-28A70F864F9C>
<DeviceClass name="DigitalMediaDevices" guid=14B62F50-3F15-11DD-AE16-0800200C9A66>
<DeviceClass name="PrintQueue" guid=1ED2BBF9-11F0-4084-B21F-AD83A8E6DCDC>
<DeviceClass name="WCEUSBS" guid=25DBCE51-6C8F-4A72-8A6D-B54C2B4FC835>
<DeviceClass name="SecurityAccelerator" guid=268C95A1-EDFE-11D3-95C3-0010DC4050A5>
<DeviceClass name="HidMsr" guid=2A9FE532-0CDC-44F9-9827-76192F2CA2FB>
<DeviceClass name="SystemRecovery" guid=2DB15374-706E-4131-A0C7-D7C78EB0289A>
....
$ python64 device_manager\enum_devices.py --class Processor --attributes name manufacturer device_object_name location_paths
Namespace(attributes=['name', 'manufacturer', 'device_object_name', 'location_paths'], clsfilter='Processor', no_print_devices=False, print_resources=False)
<DeviceClass name="Processor" guid=50127DC3-0F36-415E-A6CC-4CB3BE910B65>
* <DeviceInstance "Intel Processor" (id=1)>
Attributes:
* name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
* manufacturer=Intel
* device_object_name=\Device\00000017
* location_paths=[u'ACPI(_SB_)#ACPI(CPU0)']
* <DeviceInstance "Intel Processor" (id=2)>
Attributes:
* name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
* manufacturer=Intel
* device_object_name=\Device\00000018
* location_paths=[u'ACPI(_SB_)#ACPI(CPU1)']
* <DeviceInstance "Intel Processor" (id=3)>
Attributes:
* name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
* manufacturer=Intel
* device_object_name=\Device\00000019
* location_paths=[u'ACPI(_SB_)#ACPI(CPU2)']
* <DeviceInstance "Intel Processor" (id=4)>
Attributes:
* name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
* manufacturer=Intel
* device_object_name=\Device\0000001a
* location_paths=[u'ACPI(_SB_)#ACPI(CPU3)']
$ python64 device_manager\enum_devices.py --class Display --print-resources
Namespace(clsfilter='Display', no_print_devices=False, print_resources=True)
<DeviceClass name="Display" guid=4D36E968-E325-11CE-BFC1-08002BE10318>
* <DeviceInstance "NVIDIA GeForce GTX 1070" (id=1)>
* <MemoryResource : [0x000000f6000000-0x000000f6ffffff]>
* <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
* <MemoryResource : [0x000000e0000000-0x000000efffffff]>
* <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
* <MemoryResource : [0x000000f0000000-0x000000f1ffffff]>
* <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
* <IoResource : [0x0000000000e000-0x0000000000e07f]>
...
18.11. windows.wintrust
¶
import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows.wintrust
TARGET_FILE = r"C:\windows\system32\ntdll.dll"
print("Checking signature of <{0}>".format(TARGET_FILE))
print(" is_signed: <{0}>".format(windows.wintrust.is_signed(TARGET_FILE)))
print(" check_signature: <{0}>".format(windows.wintrust.check_signature(TARGET_FILE)))
sign_info = windows.wintrust.full_signature_information(TARGET_FILE)
print(" full_signature_information:")
print(" * signed <{0}>".format(sign_info.signed))
print(" * catalog <{0}>".format(sign_info.catalog))
print(" * catalogsigned <{0}>".format(sign_info.catalogsigned))
print(" * additionalinfo <{0}>".format(sign_info.additionalinfo))
print("Checking signature of some loaded DLL")
for module in windows.current_process.peb.modules[:5]:
path = module.fullname
is_signed = windows.wintrust.is_signed(path)
if is_signed:
print("<{0}> : {1}".format(path, is_signed))
else:
sign_info = windows.wintrust.full_signature_information(path)
print("<{0}> : {1} ({2})".format(path, is_signed, sign_info[3]))
Output
(cmd) python crypto\wintrust.py
Checking signature of <C:\windows\system32\ntdll.dll>
is_signed: <True>
check_signature: <0>
full_signature_information:
* signed <True>
* catalog <C:\Windows\system32\CatRoot\{F750E6C3-38EE-11D1-85E5-00C04FC295EE}\Microsoft-Windows-Client-Desktop-Required-Package051420~31bf3856ad364e35~amd64~~10.0.22621.3593.cat>
* catalogsigned <True>
* additionalinfo <0>
Checking signature of some loaded DLL
<c:\users\cleme\appdata\local\programs\python\python311\python.exe> : True
<c:\windows\system32\ntdll.dll> : True
<c:\windows\system32\kernel32.dll> : True
<c:\windows\system32\kernelbase.dll> : True
<c:\windows\system32\ucrtbase.dll> : True
18.12. VectoredException()
¶
18.12.1. In local process¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import ctypes
import windows
from windows.winobject.exception import VectoredException
import windows.generated_def.windef as windef
from windows.generated_def.winstructs import *
@VectoredException
def handler(exc):
print("==Entry of VEH handler==")
if exc[0].ExceptionRecord[0].ExceptionCode == EXCEPTION_ACCESS_VIOLATION:
target_addr = ctypes.cast(exc[0].ExceptionRecord[0].ExceptionInformation[1], ctypes.c_void_p).value
print("Instr at {0} accessed to addr {1}".format(hex(exc[0].ExceptionRecord[0].ExceptionAddress), hex(target_addr)))
print("Resetting page protection to <PAGE_READWRITE>")
windows.winproxy.VirtualProtect(target_page, 0x1000, windef.PAGE_READWRITE)
exc[0].ContextRecord[0].EEFlags.TF = 1
return windef.EXCEPTION_CONTINUE_EXECUTION
else:
print("Exception of type {0}".format(exc[0].ExceptionRecord[0].ExceptionCode))
print("Resetting page protection to <PAGE_NOACCESS>")
windows.winproxy.VirtualProtect(target_page, 0x1000, windef.PAGE_NOACCESS)
return windef.EXCEPTION_CONTINUE_EXECUTION
windows.winproxy.AddVectoredExceptionHandler(0, handler)
target_page = windows.current_process.virtual_alloc(0x1000)
print("Protected page is at <{0}>".format(hex(target_page)))
print("Setting page protection to <PAGE_NOACCESS>")
windows.winproxy.VirtualProtect(target_page, 0x1000, windef.PAGE_NOACCESS)
print("")
v = ctypes.c_uint.from_address(target_page).value
print("Value 1 read")
print("")
v = ctypes.c_uint.from_address(target_page + 0x10).value
print("Value 2 read")
Output
(cmd) python process\veh_segv.py
Protected page is at <0x289b6bc0000>
Setting page protection to <PAGE_NOACCESS>
==Entry of VEH handler==
Instr at 0x7ff8bda3e718 accessed to addr 0x289b6bc0000
Resetting page protection to <PAGE_READWRITE>
==Entry of VEH handler==
Exception of type EXCEPTION_SINGLE_STEP(0x80000004)
Resetting page protection to <PAGE_NOACCESS>
Value 1 read
==Entry of VEH handler==
Instr at 0x7ff8bda3e718 accessed to addr 0x289b6bc0010
Resetting page protection to <PAGE_READWRITE>
==Entry of VEH handler==
Exception of type EXCEPTION_SINGLE_STEP(0x80000004)
Resetting page protection to <PAGE_NOACCESS>
Value 2 read
18.12.2. In remote process¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.test
from windows.generated_def.winstructs import *
python_code = """
import windows
import ctypes
import windows
from windows.winobject.exception import VectoredException
import windows.generated_def.windef as windef
from windows.generated_def.winstructs import *
windows.utils.create_console()
module_to_trace = "gdi32.dll"
nb_repeat = [5]
@VectoredException
def handler(exc):
if exc[0].ExceptionRecord[0].ExceptionCode == EXCEPTION_ACCESS_VIOLATION:
print("")
target_addr = ctypes.cast(exc[0].ExceptionRecord[0].ExceptionInformation[1], ctypes.c_void_p).value
print("Instr at {0} accessed to addr {1} ({2})".format(hex(exc[0].ExceptionRecord[0].ExceptionAddress), hex(target_addr), module_to_trace))
windows.winproxy.VirtualProtect(target_page, code_size, windef.PAGE_EXECUTE_READWRITE)
nb_repeat[0] -= 1
if nb_repeat[0]:
exc[0].ContextRecord[0].EEFlags.TF = 1
else:
print("No more tracing !")
return windef.EXCEPTION_CONTINUE_EXECUTION
else:
print("Exception of type {0}".format(exc[0].ExceptionRecord[0].ExceptionCode))
print("Resetting page protection to <PAGE_READWRITE>")
windows.winproxy.VirtualProtect(target_page, code_size, windef.PAGE_READWRITE)
return windef.EXCEPTION_CONTINUE_EXECUTION
windows.winproxy.AddVectoredExceptionHandler(0, handler)
print("Tracing execution in module: <{0}>".format(module_to_trace))
module = [x for x in windows.current_process.peb.modules if x.name == module_to_trace][0]
target_page = module.baseaddr
code_size = module.pe.get_OptionalHeader().SizeOfCode
print("Protected page is at {0}".format(hex(target_page)))
windows.winproxy.VirtualProtect(target_page, code_size, windef.PAGE_READWRITE)
"""
c = windows.test.pop_proc_64(dwCreationFlags=CREATE_SUSPENDED)
x = c.execute_python(python_code)
c.threads[0].resume()
import time
time.sleep(0.1)
for t in c.threads:
t.suspend()
time.sleep(1)
c.exit()
Output:
(cmd λ) python.exe process\remote_veh_segv.py
(In another console)
Tracing execution in module: <gdi32.dll>
Protected page is at 0x7ffa3c700000L
Instr at 0x7ffa3c70f0f0L accessed to addr 0x7ffa3c70f0f0L (gdi32.dll)
Exception of type EXCEPTION_SINGLE_STEP(0x80000004L)
Resetting page protection to <PAGE_READWRITE>
Instr at 0x7ffa3c70f0f5L accessed to addr 0x7ffa3c70f0f5L (gdi32.dll)
Exception of type EXCEPTION_SINGLE_STEP(0x80000004L)
Resetting page protection to <PAGE_READWRITE>
Instr at 0x7ffa3c70f0faL accessed to addr 0x7ffa3c70f0faL (gdi32.dll)
Exception of type EXCEPTION_SINGLE_STEP(0x80000004L)
Resetting page protection to <PAGE_READWRITE>
Instr at 0x7ffa3c70f0ffL accessed to addr 0x7ffa3c70f0ffL (gdi32.dll)
Exception of type EXCEPTION_SINGLE_STEP(0x80000004L)
Resetting page protection to <PAGE_READWRITE>
Instr at 0x7ffa3c70f100L accessed to addr 0x7ffa3c70f100L (gdi32.dll)
No more tracing !
18.13. Debugging¶
18.13.1. Debugger
¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.test
import windows.debug
from windows.generated_def.winstructs import *
class MyDebugger(windows.debug.Debugger):
def on_exception(self, exception):
code = exception.ExceptionRecord.ExceptionCode
addr = exception.ExceptionRecord.ExceptionAddress
print("Got exception {0} at 0x{1:x}".format(code, addr))
class PrintUnicodeString(windows.debug.Breakpoint):
def __init__(self, addr, argument_position):
super(PrintUnicodeString, self).__init__(addr)
self.arg_pos = argument_position
def trigger(self, dbg, exc):
p = dbg.current_process
t = dbg.current_thread
esp = t.context.Esp
unicode_string_addr = p.read_ptr(esp + (self.arg_pos + 1) * 4)
wstring_addr = p.read_ptr(unicode_string_addr + 4)
dll_loaded = p.read_wstring(wstring_addr).lower()
print("Loading <{0}>".format(dll_loaded))
if dll_loaded.endswith("comctl32.dll"):
print("Ask to load <comctl32.dll>: exiting process")
dbg.current_process.exit()
calc = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
d = MyDebugger(calc)
d.add_bp(PrintUnicodeString("ntdll!LdrLoadDll", argument_position=2))
d.loop()
Output
(cmd) python debug\debugger_print_LdrLoaddll.py
Got exception EXCEPTION_BREAKPOINT(0x80000003) at 0x7ff8e5aebd44
Loading <kernel32.dll>
Got exception UNKNOW_EXCEPTION(0x4000001f) at 0x77e58727
Loading <api-ms-win-core-synch-l1-2-0>
Loading <api-ms-win-core-fibers-l1-1-1>
Loading <api-ms-win-core-fibers-l1-1-0>
Loading <api-ms-win-core-synch-l1-2-0>
Loading <api-ms-win-core-localization-l1-2-1>
Loading <kernel32>
Loading <api-ms-win-core-string-l1-1-0>
Loading <api-ms-win-core-datetime-l1-1-1>
Loading <api-ms-win-core-localization-obsolete-l1-2-0>
Loading <c:\windows\system32\imm32.dll>
Loading <comctl32.dll>
Ask to load <comctl32.dll>: exiting process
18.13.1.1. on_setup¶
import windows.debug
class MySetupDebugger(windows.debug.Debugger):
def on_setup(self):
super(MySetupDebugger, self).on_setup()
print("Setup called: {0}".format(self.current_process))
def on_exception(self, exc):
print("Exception: {0}".format(exc.ExceptionRecord.ExceptionCode))
def on_exit_process(self, evt):
print("Process exit: {0}".format(self.current_process))
class SimpleDebugger(windows.debug.Debugger):
def on_exception(self, exc):
print("Exception: {0}".format(exc.ExceptionRecord.ExceptionCode))
def on_exit_process(self, evt):
print("Process exit: {0}".format(self.current_process))
print("== With on_setup ==")
dbg = MySetupDebugger.debug(r"c:\windows\system32\whoami.exe")
dbg.loop()
print("\n== Without on_setup ==")
dbg = SimpleDebugger.debug(r"c:\windows\system32\whoami.exe")
dbg.loop()
Output
(cmd) python debug\debugger_on_setup.py
== With on_setup ==
Setup called: <WinProcess "whoami.exe" pid 29796 at 0x4ccb790>
<whoami output>
Process exit: <WinProcess "whoami.exe" pid 29796 (DEAD) at 0x4ccb790>
== Without on_setup ==
Exception: EXCEPTION_BREAKPOINT(0x80000003L)
<whoami output>
Process exit: <WinProcess "whoami.exe" pid 33328 (DEAD) at 0x4ccbdb0>
18.13.1.2. Single stepping¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.test
import windows.debug
import windows.native_exec.simple_x86 as x86
from windows.generated_def.winstructs import *
class MyDebugger(windows.debug.Debugger):
def __init__(self, *args, **kwargs):
super(MyDebugger, self).__init__(*args, **kwargs)
self.single_step_counter = 0
def on_exception(self, exception):
code = exception.ExceptionRecord.ExceptionCode
addr = exception.ExceptionRecord.ExceptionAddress
print("Got exception {0} at 0x{1:x}".format(code, addr))
def on_single_step(self, exception):
code = exception.ExceptionRecord.ExceptionCode
addr = exception.ExceptionRecord.ExceptionAddress
print("Got single_step {0} at 0x{1:x}".format(code, addr))
self.single_step_counter -= 1
if self.single_step_counter > 0:
return self.single_step()
else:
print("No more single step: exiting")
self.current_process.exit()
class SingleStepOnWrite(windows.debug.MemoryBreakpoint):
"""Check that BP/dbg can trigger single step and that instruction follows"""
def trigger(self, dbg, exc):
fault_addr = exc.ExceptionRecord.ExceptionInformation[1]
eip = dbg.current_thread.context.pc
print("Instruction at <{0:#x}> wrote at <{1:#x}>".format(eip, fault_addr))
dbg.single_step_counter = 4
return dbg.single_step()
calc = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
d = MyDebugger(calc)
code = calc.virtual_alloc(0x1000)
data = calc.virtual_alloc(0x1000)
injected = x86.MultipleInstr()
injected += x86.Mov("EAX", 0)
injected += x86.Mov(x86.deref(data), "EAX")
injected += x86.Add("EAX", 4)
injected += x86.Mov(x86.deref(data + 4), "EAX")
injected += x86.Add("EAX", 8)
injected += x86.Mov(x86.deref(data + 8), "EAX")
injected += x86.Nop()
injected += x86.Nop()
injected += x86.Ret()
calc.write_memory(code, injected.get_code())
d.add_bp(SingleStepOnWrite(data, size=8, events="W"))
calc.create_thread(code, 0)
d.loop()
Output
(cmd) python debug\debugger_membp_singlestep.py
Got exception EXCEPTION_BREAKPOINT(0x80000003) at 0x7ff8e5aebd44
Got exception UNKNOW_EXCEPTION(0x4000001f) at 0x77e58727
Instruction at <0xa50006> wrote at <0xa60000>
Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa5000c
Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa50011
Instruction at <0xa50011> wrote at <0xa60004>
Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa50017
Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa5001c
Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa50022
Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa50023
No more single step: exiting
18.13.1.3. windows.debug.FunctionCallBP
¶
import sys
import os.path
import pprint
import threading
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.test
import windows.debug
import windows.generated_def as gdef
# The debugge python will just print the result of
# 3 call to IsDebuggerPresent
TARGET_PYTHON_CODE = '"\
import ctypes;\
import time;\
IsDebuggerPresent = ctypes.windll.kernel32.IsDebuggerPresent;\
print(\'[DEBUGGE] IsDebuggerPresent={0}\'.format(IsDebuggerPresent()));\
time.sleep(1);\
print(\'[DEBUGGE] IsDebuggerPresent={0}\'.format(IsDebuggerPresent()));\
time.sleep(1);\
print(\'[DEBUGGE] IsDebuggerPresent={0}\'.format(IsDebuggerPresent()));\
"'
# This breakpoint now nothing about its target argument
# It only now how to break at the return of the function
# This allow us to change the return value of any function
class IncrementReturnValue(windows.debug.FunctionCallBP):
def __init__(self, addr, initialvalue):
super(IncrementReturnValue, self).__init__(addr)
self.initialvalue = initialvalue
def trigger(self, dbg, exc):
# Ask to break a the return of the function
# callback is ret_trigger
self.break_on_ret(dbg, exc)
def ret_trigger(self, dbg, exc):
ctx = dbg.current_thread.context
# Func result is an alias to EAX/RAX
ctx.func_result = self.initialvalue
# Set the new context for the target thread
dbg.current_thread.set_context(ctx)
self.initialvalue += 1
d = windows.debug.Debugger.debug(sys.executable, [sys.executable, "-c", TARGET_PYTHON_CODE])
# We could also give the direct address of the function
# But it would require to wait for the module to be loaded
d.add_bp(IncrementReturnValue("kernelbase!IsDebuggerPresent", 42))
d.loop()
Output
(cmd) python debug\change_function_ret_value.py
[DEBUGGE] IsDebuggerPresent=42
[DEBUGGE] IsDebuggerPresent=43
[DEBUGGE] IsDebuggerPresent=44
18.13.1.4. windows.debug.FunctionBP
¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.test
import windows.debug
from windows.generated_def.winstructs import *
class FollowNtCreateFile(windows.debug.FunctionBP):
TARGET = windows.winproxy.NtCreateFile
COUNTER = 3
def trigger(self, dbg, exc):
if not self.COUNTER:
print("Exiting process")
dbg.current_process.exit()
return
params = self.extract_arguments(dbg.current_process, dbg.current_thread)
filename = params["ObjectAttributes"].contents.ObjectName.contents.Buffer
handle_addr = params["FileHandle"].value
self.data = (filename, handle_addr)
self.break_on_ret(dbg, exc)
def ret_trigger(self, dbg, exc):
filename, handle_addr = self.data
ret_value = dbg.current_thread.context.func_result # EAX / RAX depending of bitness
handle_value = dbg.current_process.read_ptr(handle_addr)
if ret_value:
print("NtCreateFile of <{0}> FAILED (result={1:#x})".format(filename, ret_value))
return
print("NtCreateFile of <{0}>: handle = {1:#x}".format(filename, handle_value))
# Manual verification
fhandle = [h for h in windows.system.handles if h.dwProcessId == dbg.current_process.pid and h.wValue == handle_value]
if not fhandle:
raise ValueError("handle not found!")
fhandle = fhandle[0]
print("Handle manually found! typename=<{0}>, name=<{1}>".format(fhandle.type, fhandle.name))
print("")
self.COUNTER -= 1
if __name__ == "__main__":
calc = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
d = windows.debug.Debugger(calc)
d.add_bp(FollowNtCreateFile())
d.loop()
Output
(cmd) python debug\debug_functionbp.py
NtCreateFile of <50173784>: handle = 0x1c4
Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Globalization\Sorting\SortDefault.nls>
NtCreateFile of <50181528>: handle = 0x1ec
Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Fonts\StaticCache.dat>
NtCreateFile of <50195912>: handle = 0x1fc
Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Branding\Basebrd\basebrd.dll>
Exiting process
18.13.1.5. Debugger.attach
¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.test
import windows.debug
from windows.generated_def.winstructs import *
# Just a debugger that follow NtCreateFile and print filename & handler
from debug_functionbp import FollowNtCreateFile
def follow_create_file(pid):
print("Finding process with pid <{0}>".format(pid))
target = [p for p in windows.system.processes if p.pid == pid][0]
print("Target is {0}".format(target))
dbg = windows.debug.Debugger.attach(target)
print("Debugger attached: {0}".format(dbg))
print("")
dbg.add_bp(FollowNtCreateFile())
dbg.loop()
if __name__ == "__main__":
# Create a non-debugged process safe to debug
calc = windows.test.pop_proc_32(dwCreationFlags=0)
# Give ovnly the PID to follow_create_file
follow_create_file(calc.pid)
Output
(cmd) python debug\attach.py
Finding process with pid <27576>
Target is <WinProcess "winver.exe" pid 27576 at 0x223f0862d90>
Debugger attached: <windows.debug.debugger.Debugger object at 0x00000223F084A110>
NtCreateFile of <54203712>: handle = 0x1c8
Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Globalization\Sorting\SortDefault.nls>
NtCreateFile of <54268840>: handle = 0x1f0
Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Fonts\StaticCache.dat>
NtCreateFile of <54280288>: handle = 0x200
Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Branding\Basebrd\basebrd.dll>
Exiting process
18.13.1.6. Native code tester¶
import sys
import argparse
import windows
import windows.test
import windows.debug as dbg
import windows.native_exec.simple_x86 as x86
import windows.native_exec.simple_x64 as x64
from windows.generated_def import *
def hexdump(string, start_addr=0):
result = ""
if len(string) == 0:
return
ascii = list("."*256)
for i in range(1,0x7f):
ascii[i] = chr(i)
ascii[0x0] = "."
ascii[0x7] = "."
ascii[0x8] = "."
ascii[0x9] = "."
ascii[0xa] = "."
ascii[0x1b] = "."
ascii[0xd] = "."
ascii[0xff] = "\x13"
ascii = "".join(ascii)
offset = 0
while (offset+0x10) <= len(string):
line = string[offset:(offset+0x10)]
linebuf = " %08X " % (offset + start_addr)
for i in range(0,16):
if i == 8:
linebuf += " "
linebuf += "%02X " % ord(line[i])
linebuf += " "
for i in range(0,16):
linebuf += ascii[ord(line[i])]
result += linebuf+"\n"
offset += 0x10
if (len(string) % 0x10) > 0:
linebuf = " %08X " % (offset + start_addr)
for i in range((len(string)-(len(string) % 0x10)),(len(string))):
if i == 8:
linebuf += " "
linebuf += "%02X " % ord(string[i])
linebuf += " "*(0x10-(len(string) % 0x10))
linebuf += " "
for i in range((len(string)-(len(string) % 0x10)),(len(string))):
linebuf += ascii[ord(string[i])]
result += linebuf+"\n"
return result
class StartStepBP(dbg.breakpoints.HXBreakpoint):
def trigger(self, dbg, exc):
dbg.del_bp(self)
dbg.on_single_step(exc) # Trigger single step processing
class CodeTesteur(dbg.Debugger):
def __init__(self, process, code, register_start={}, steps=False):
super(CodeTesteur, self).__init__(process)
self.initial_code = code
code += "\xcc"
self.code_addr = self.write_code_in_target(process, code)
register_start["pc"] = self.code_addr
self.thread_exec = process.threads[0]
self.context_exec = self.thread_exec.context
self.setup_target_context(self.context_exec, register_start)
print("Startup context is:")
self.context_exec.dump()
print(self.context_exec.EEFlags)
self.thread_exec.suspend()
self.thread_exec.set_context(self.context_exec)
self.thread_exec.resume()
self.init_breakpoint = False
# Test code
if steps:
self.steps = True
self.add_bp(StartStepBP(self.code_addr))
self.last_context = self.context_exec
self.last_code = self.initial_code
def write_code_in_target(self, process, code):
addr = process.virtual_alloc(len(code))
process.write_memory(addr, code)
return addr
def setup_target_context(self, ctx, register_start):
for name, value in register_start.items():
if not hasattr(ctx, name):
raise ValueError("Unknown register to setup <{0}>".format(name))
setattr(ctx, name, value)
def on_single_step(self, x):
print("* New step !")
# print("EIP = {0:#x}".format(self.current_thread.context.pc))
self.report_ctx_diff(self.last_context, self.current_thread.context)
self.last_context = self.current_thread.context
self.last_code = self.report_code_diff(self.last_code)
self.single_step()
print ("")
def on_exception(self, x):
exc_code = x.ExceptionRecord.ExceptionCode
exc_addr = x.ExceptionRecord.ExceptionAddress
if not self.init_breakpoint and exc_code == EXCEPTION_BREAKPOINT:
self.init_breakpoint = True
return
ctx = self.current_thread.context
print("==Post-exec context==")
ctx.dump()
print(ctx.EEFlags)
if exc_code == EXCEPTION_BREAKPOINT and exc_addr == self.context_exec.pc + len(self.initial_code):
print("<Normal terminaison>")
else:
print("<{0}> at <{1:#x}>".format(exc_code, exc_addr))
if exc_code == EXCEPTION_ACCESS_VIOLATION:
exc_infos = x.ExceptionRecord.ExceptionInformation
read_write = "write" if exc_infos[0] else "read"
target_addr = exc_infos[1]
print(" * Access of type <{0}> at address <{1:#x}>".format(read_write, target_addr))
self.report_ctx_diff(self.context_exec, ctx)
self.report_code_diff(self.initial_code)
self.current_process.exit()
return
def report_ctx_diff(self, start, now):
print("== DIFF ==")
for name, start_value in start.regs():
now_value = getattr(now, name)
if start_value != now_value:
diff = now_value - start_value
print("{0}: {1:#x} -> {2:#x} ({3:+#x})".format(name, start_value, now_value, diff))
if start.sp > now.sp:
print("Negative Stack: dumping:")
data = self.current_process.read_memory(now.sp, start.sp - now.sp)
print(hexdump(data, start.sp))
def report_code_diff(self, initial_code):
code_size = len(initial_code)
final_code = self.current_process.read_memory(self.code_addr, code_size)
if final_code == initial_code:
return initial_code
print("== Executable code DIFF == ")
print("Before:")
print(hexdump(initial_code, self.code_addr))
print("After:")
print(hexdump(final_code, self.code_addr))
return final_code
def test_code_x86(code, regs=None, raw=False, steps=False, **kwargs):
print("Testing x86 code")
process = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
if raw:
code = code.replace(" ", "").decode('hex')
else:
code = x86.assemble(code)
start_register = {}
if regs:
for name_value in regs.split(";"):
name, value = name_value.split("=")
name = name.strip().capitalize()
if name == "Eflags":
name = "EFlags"
value = int(value.strip(), 0)
start_register[name] = value
x = CodeTesteur(process, code, start_register, steps)
x.loop()
def test_code_x64(code, regs=None, raw=False, **kwargs):
print("Testing x64 code")
if windows.current_process.bitness == 32:
raise ValueError("Cannot debug a 64b process from 32b python")
process = windows.test.pop_proc_64(dwCreationFlags=DEBUG_PROCESS)
if raw:
code = code.replace(" ", "").decode('hex')
else:
code = x64.assemble(code)
start_register = {}
if regs:
for name_value in regs.split(";"):
name, value = name_value.split("=")
name = name.strip().capitalize()
if name == "Eflags":
name = "EFlags"
value = int(value.strip(), 0)
start_register[name] = value
x = CodeTesteur(process, code, start_register)
x.loop()
parser = argparse.ArgumentParser(prog=__file__)
parser.add_argument('--x64', action='store_const', dest="func", const=test_code_x64, default=test_code_x86, help='Code is x64')
parser.add_argument('--raw', action='store_true', help='argument is raw assembled code (in hex)')
parser.add_argument('--steps', action='store_true', help='Get all step info')
parser.add_argument('code', help='The code to execute')
parser.add_argument('regs', nargs="?", help='The default values of the registers')
res = parser.parse_args()
res.func(**res.__dict__)
Ouput
(cmd) python.exe test_code.py "mov eax, 0x42424242" "eax=0x11223344"
Testing x86 code
Startup context is:
Eip -> 0x3f0000L
Esp -> 0x3bfae4L
Eax -> 0x11223344L
Ebx -> 0x5a6000L
Ecx -> 0x0L
Edx -> 0x0L
Ebp -> 0x0L
Edi -> 0x0L
Esi -> 0x0L
EFlags -> 0x202L
EEflags(0x202L:IF)
==Post-exec context==
Eip -> 0x3f0007L
Esp -> 0x3bfae4L
Eax -> 0x42424242L
Ebx -> 0x5a6000L
Ecx -> 0x0L
Edx -> 0x0L
Ebp -> 0x0L
Edi -> 0x0L
Esi -> 0x0L
EFlags -> 0x202L
EEflags(0x202L:IF)
<Normal terminaison>
==DIFF==
Eip: 0x3f0000 -> 0x3f0007 (+0x7)
Eax: 0x11223344 -> 0x42424242 (+0x31200efe)
(cmd) python64 test_code.py --x64 "mov r15, 0x11223344; push r14; call r15" "rcx=1; r14=0x4242424243434343"
Testing x64 code
Startup context is:
Rip -> 0x205a1d60000L
Rsp -> 0xe24a88fa88L
Rax -> 0x0L
Rbx -> 0x0L
Rcx -> 0x1L
Rdx -> 0xe24aaf9000L
Rbp -> 0x0L
Rdi -> 0x0L
Rsi -> 0x0L
R8 -> 0x0L
R9 -> 0x0L
R10 -> 0x0L
R11 -> 0x0L
R12 -> 0x0L
R13 -> 0x0L
R14 -> 0x4242424243434343L
R15 -> 0x0L
EFlags -> 0x200L
EEflags(0x200L:IF)
==Post-exec context==
Rip -> 0x11223344L
Rsp -> 0xe24a88fa78L
Rax -> 0x0L
Rbx -> 0x0L
Rcx -> 0x1L
Rdx -> 0xe24aaf9000L
Rbp -> 0x0L
Rdi -> 0x0L
Rsi -> 0x0L
R8 -> 0x0L
R9 -> 0x0L
R10 -> 0x0L
R11 -> 0x0L
R12 -> 0x0L
R13 -> 0x0L
R14 -> 0x4242424243434343L
R15 -> 0x11223344L
EFlags -> 0x10202L
EEflags(0x10202L:IF|RF)
<EXCEPTION_ACCESS_VIOLATION(0xc0000005L)> at <0x11223344>
==DIFF==
Rip: 0x205a1d60000 -> 0x11223344 (-0x20590b3ccbc)
Rsp: 0xe24a88fa88 -> 0xe24a88fa78 (-0x10)
R15: 0x0 -> 0x11223344 (+0x11223344)
EFlags: 0x200 -> 0x10202 (+0x10002)
Negative Stack: dumping:
E24A88FA88 0C 00 D6 A1 05 02 00 00 43 43 43 43 42 42 42 42 ........CCCCBBBB
18.13.2. SymbolDebugger
¶
import argparse
import os
import windows
import windows.debug
import windows.test
parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
args = parser.parse_args()
print(args)
if args.dbghelp:
symbols.set_dbghelp_path(args.dbghelp)
else:
if "PFW_DBGHELP_PATH" not in os.environ:
print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")
class MyInfoBP(windows.debug.Breakpoint):
COUNT = 0
def trigger(self, dbg, exc):
cursym = dbg.current_resolver[exc.ExceptionRecord.ExceptionAddress]
print("Breakpoint triggered at: {0}".format(cursym))
print(repr(cursym))
MyInfoBP.COUNT += 1
if MyInfoBP.COUNT == 4:
print("Quitting")
dbg.current_process.exit()
print("")
dbg = windows.debug.SymbolDebugger.debug("C:\\windows\\system32\\notepad.exe")
dbg.add_bp(MyInfoBP("kernelbase!CreateFileInternal"))
dbg.add_bp(MyInfoBP("ntdll!LdrpInitializeProcess"))
dbg.loop()
Output
(cmd) python debug\symbol_debugger.py
Namespace(dbghelp=None)
Breakpoint triggered at: ntdll!LdrpInitializeProcess
<SymbolInfoW name="LdrpInitializeProcess" start=0x7ff8e5aecca0 tag=SymTagPublicSymbol>
Breakpoint triggered at: KERNELBASE!CreateFileInternal
<SymbolInfoW name="CreateFileInternal" start=0x7ff8e33b4b70 tag=SymTagPublicSymbol>
Breakpoint triggered at: KERNELBASE!CreateFileInternal
<SymbolInfoW name="CreateFileInternal" start=0x7ff8e33b4b70 tag=SymTagPublicSymbol>
Breakpoint triggered at: KERNELBASE!CreateFileInternal
<SymbolInfoW name="CreateFileInternal" start=0x7ff8e33b4b70 tag=SymTagPublicSymbol>
Quitting
18.13.3. LocalDebugger
¶
18.13.3.1. In current process¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.debug
from windows.generated_def.winstructs import *
import windows.native_exec.simple_x86 as x86
class SingleSteppingDebugger(windows.debug.LocalDebugger):
SINGLE_STEP_COUNT = 4
def on_exception(self, exc):
code = self.get_exception_code()
context = self.get_exception_context()
print("EXCEPTION !!!! Got a {0!r} at 0x{1:x}".format(code, context.pc))
self.SINGLE_STEP_COUNT -= 1
if self.SINGLE_STEP_COUNT:
return self.single_step()
return EXCEPTION_CONTINUE_EXECUTION
class RewriteBreakpoint(windows.debug.HXBreakpoint):
def trigger(self, dbg, exc):
context = dbg.get_exception_context()
print("GOT AN HXBP at 0x{0:x}".format(context.pc))
# Rewrite the infinite loop with 2 nop
windows.current_process.write_memory(self.addr, b"\x90\x90")
# Ask for a single stepping
return dbg.single_step()
d = SingleSteppingDebugger()
# Infinite loop + nop + ret
code = x86.assemble("label :begin; jmp :begin; nop; ret")
func = windows.native_exec.create_function(code, [PVOID])
print("Code addr = 0x{0:x}".format(func.code_addr))
# Create a thread that will infinite loop
t = windows.current_process.create_thread(func.code_addr, 0)
# Add a breakpoint on the infitine loop
d.add_bp(RewriteBreakpoint(func.code_addr))
t.wait()
print("Done!")
Output
(cmd) python debug\local_debugger.py
Code addr = 0x25f8532000e
GOT AN HXBP at 0x25f8532000e
EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004) at 0x7ff8e377257d
EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004) at 0x7ff8e5a6aa80
EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004) at 0x7ff8e5aafde0
EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004) at 0x7ff8e5aafdf4
Done!
18.13.3.2. In remote process¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import ctypes
import windows
import windows.test
from windows.generated_def.winstructs import *
remote_code = """
import windows
from windows.generated_def.winstructs import *
windows.utils.create_console()
class YOLOHXBP(windows.debug.HXBreakpoint):
def trigger(self, dbg, exc):
p = windows.current_process
arg_pos = 2
context = dbg.get_exception_context()
esp = context.Esp
unicode_string_addr = p.read_ptr(esp + (arg_pos + 1) * 4)
wstring_addr = p.read_ptr(unicode_string_addr + 4)
dll_loaded = p.read_wstring(wstring_addr)
print("I AM LOADING <{0}>".format(dll_loaded))
d = windows.debug.LocalDebugger()
exp = windows.current_process.peb.modules[1].pe.exports
#windows.utils.FixedInteractiveConsole(locals()).interact()
ldr = exp["LdrLoadDll"]
d.add_bp(YOLOHXBP(ldr))
"""
c = windows.test.pop_proc_32(dwCreationFlags=CREATE_SUSPENDED)
c.execute_python(remote_code)
c.threads[0].resume()
import time
time.sleep(2)
c.exit()
Ouput:
(cmd λ) python.exe debug\local_debugger_remote_process.py
(In another console)
I AM LOADING <C:\Windows\system32\uxtheme.dll>
I AM LOADING <C:\Windows\system32\uxtheme.dll>
I AM LOADING <C:\Windows\system32\uxtheme.dll>
I AM LOADING <C:\Windows\system32\uxtheme.dll>
I AM LOADING <kernel32.dll>
I AM LOADING <C:\Windows\WinSxS\x86_microsoft.windows.gdiplus_6595b64144ccf1df_1.1.9600.17415_none_dad8722c5bcc2d8f\gdiplus.dll>
I AM LOADING <comctl32.dll>
I AM LOADING <comctl32.dll>
I AM LOADING <comctl32.dll>
I AM LOADING <comctl32.dll>
I AM LOADING <comctl32.dll>
I AM LOADING <comctl32>
I AM LOADING <C:\Windows\SysWOW64\oleacc.dll>
I AM LOADING <OLEAUT32.DLL>
I AM LOADING <C:\Windows\system32\ole32.dll>
I AM LOADING <C:\Windows\system32\MSCTF.dll>
I AM LOADING <C:\Windows\SysWOW64\msxml6.dll>
I AM LOADING <C:\Windows\system32\shell32.dll>
I AM LOADING <C:\Windows\SYSTEM32\WINMM.dll>
I AM LOADING <C:\Windows\system32\ole32.dll>
18.14. Symbols¶
18.14.1. VirtualSymbolHandler¶
import os
import windows
import windows.generated_def as gdef
from windows.debug import symbols
import argparse
parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
args = parser.parse_args()
print(args)
if args.dbghelp:
symbols.set_dbghelp_path(args.dbghelp)
else:
if "PFW_DBGHELP_PATH" not in os.environ:
print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")
symbols.engine.options = 0 # Disable defered load
sh = symbols.VirtualSymbolHandler()
ntmod = sh.load_file(r"c:\windows\system32\ntdll.dll", addr=0x420000)
print("Ntdll module is: {0}".format(ntmod))
print(" * name = {0}".format(ntmod.name))
print(" * addr = {0:#x}".format(ntmod.addr))
print(" * path = {0:}".format(ntmod.path))
print(" * type = {0:}".format(ntmod.type))
print(" * pdb = {0:}".format(ntmod.pdb))
print("")
TEST_FUNCTION = "LdrLoadDll"
print("Resolving function <{0}>".format(TEST_FUNCTION))
loaddll = sh["ntdll!" + TEST_FUNCTION]
print("Symbol found !")
print(" * __repr__: {0!r}".format(loaddll))
print(" * __str__: {0}".format(loaddll))
print(" * addr: {0:#x}".format(loaddll.addr))
print(" * name: {0}".format(loaddll.name))
print(" * fullname: {0}".format(loaddll.fullname))
print(" * module: {0}".format(loaddll.module))
print("")
print("Loading kernelbase")
kbasemod = sh.load_file(r"c:\windows\system32\kernelbase.dll", addr=0x1230000)
print("Loaded modules are: {0}".format(sh.modules))
LOOKUP_ADDR = 0x1231242
print("Looking up address: {0:#x}".format(LOOKUP_ADDR))
lookupsym = sh[LOOKUP_ADDR]
print("Symbol resolved !")
print(" * __repr__: {0!r}".format(lookupsym))
print(" * __str__: {0}".format(lookupsym))
print(" * start: {0:#x}".format(lookupsym.start))
print(" * addr: {0:#x}".format(lookupsym.addr))
print(" * displacement: {0:#x}".format(lookupsym.displacement))
print(" * name: {0}".format(lookupsym.name))
print(" * fullname: {0}".format(lookupsym.fullname))
print(" * module: {0}".format(lookupsym.module))
Output
(cmd) python debug\symbols\virtsymdemo.py
Namespace(dbghelp=None)
Ntdll module is: <SymbolModule name="ntdll" type=SymPdb pdb="ntdll.pdb" addr=0x420000>
* name = ntdll
* addr = 0x420000
* path = c:\windows\system32\ntdll.dll
* type = <SYM_TYPE SymPdb(0x3)>
* pdb = c:\Symbols\ntdll.pdb\8D5D5ED5D5B8AA609A82600C14E3004D1\ntdll.pdb
Resolving function <LdrLoadDll>
Symbol found !
* __repr__: <SymbolInfoW name="LdrLoadDll" start=0x44a160 tag=SymTagFunction>
* __str__: ntdll!LdrLoadDll
* addr: 0x44a160
* name: LdrLoadDll
* fullname: ntdll!LdrLoadDll
* module: <SymbolModule name="ntdll" type=SymPdb pdb="ntdll.pdb" addr=0x420000>
Loading kernelbase
Loaded modules are: [<SymbolModule name="ntdll" type=SymPdb pdb="ntdll.pdb" addr=0x420000>, <SymbolModule name="kernelbase" type=SymPdb pdb="kernelbase.pdb" addr=0x1230000>]
Looking up address: 0x1231242
Symbol resolved !
* __repr__: <SymbolInfoW name="PsspThunkWin32Nt_HANDLE_ENTRY" start=0x1231240 displacement=0x2 tag=SymTagPublicSymbol>
* __str__: kernelbase!PsspThunkWin32Nt_HANDLE_ENTRY+0x2
* start: 0x1231240
* addr: 0x1231242
* displacement: 0x2
* name: PsspThunkWin32Nt_HANDLE_ENTRY
* fullname: kernelbase!PsspThunkWin32Nt_HANDLE_ENTRY+0x2
* module: <SymbolModule name="kernelbase" type=SymPdb pdb="kernelbase.pdb" addr=0x1230000>
18.14.2. ProcessSymbolHandler¶
import os
import argparse
import windows
import windows.test
import windows.generated_def as gdef
from windows.debug import symbols
parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
args = parser.parse_args()
print(args)
if args.dbghelp:
symbols.set_dbghelp_path(args.dbghelp)
else:
if "PFW_DBGHELP_PATH" not in os.environ:
print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")
if windows.current_process.bitness == 32:
target = windows.test.pop_proc_32()
else:
target = windows.test.pop_proc_64()
print("Target is {0}".format(target))
sh = symbols.ProcessSymbolHandler(target)
import time;time.sleep(0.1) # Just wait for the process initialisation
sh.refresh() # Refresh symbol list (Only meaningful for ProcessSymbolHandler)
print("Some loaded modules are:".format())
for sm in sh.modules[:3]:
print(" * {0}".format(sm))
createserv = sh["advapi32!CreateServiceEx"]
print("")
TEST_FUNCTION = "advapi32!CreateServiceEx"
print("Resolving function <{0}>".format(TEST_FUNCTION))
createserv = sh[TEST_FUNCTION]
print("Symbol found !")
print(" * __repr__: {0!r}".format(createserv))
print(" * __str__: {0}".format(createserv))
print(" * addr: {0:#x}".format(createserv.addr))
print(" * name: {0}".format(createserv.name))
print(" * fullname: {0}".format(createserv.fullname))
print(" * module: {0}".format(createserv.module))
target.exit()
Output
(cmd) python debug\symbols\processsymdemo.py
Namespace(dbghelp=None)
Target is <WinProcess "winver.exe" pid 18600 at 0x23a15d4cdd0>
Some loaded modules are:
* <SymbolModule name="winver" type=SymDeferred pdb="" addr=0x7ff658a30000>
* <SymbolModule name="ntdll" type=SymDeferred pdb="" addr=0x7ff8e5a10000>
* <SymbolModule name="KERNEL32" type=SymDeferred pdb="" addr=0x7ff8e3760000>
Resolving function <advapi32!CreateServiceEx>
Symbol found !
* __repr__: <SymbolInfoW name="CreateServiceEx" start=0x7ff8e4b2d2e0 tag=SymTagPublicSymbol>
* __str__: advapi32!CreateServiceEx
* addr: 0x7ff8e4b2d2e0
* name: CreateServiceEx
* fullname: advapi32!CreateServiceEx
* module: <SymbolModule name="advapi32" type=SymPdb pdb="advapi32.pdb" addr=0x7ff8e4b10000>
18.14.3. Symbol search¶
import argparse
import os
import windows
import windows.debug.symbols as symbols
parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('pattern')
parser.add_argument('file', help="The PE file to load")
parser.add_argument('--addr', type=lambda x: int(x, 0), default=0, help="The load address of the PE")
parser.add_argument('--tag', type=lambda x: int(x, 0), default=0)
parser.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
args = parser.parse_args()
if args.dbghelp:
symbols.set_dbghelp_path(args.dbghelp)
else:
if "PFW_DBGHELP_PATH" not in os.environ:
print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")
sh = symbols.VirtualSymbolHandler()
mod = sh.load_file(path=args.file, addr=args.addr)
res = sh.search(args.pattern, mod=mod, tag=args.tag)
print("{0} symbols found:".format(len(res)))
for sym in res:
print(" * {0!r}".format(sym))
Output
$ python64 debug\symbols\symsearch.py "CreateFile*" c:\windows\system32\kernelbase.dll
Namespace(addr=0, dbghelp=None, file='c:\\windows\\system32\\kernelbase.dll', pattern='CreateFile*', tag=0)
16 symbols found:
* <SymbolInfoA name="CreateFileInternal" start=0x180024250 tag=SymTagFunction>
* <SymbolInfoA name="CreateFileMappingFromApp" start=0x1800809d0 tag=SymTagFunction>
* <SymbolInfoA name="CreateFileMoniker" start=0x180087e40 tag=SymTagFunction>
* <SymbolInfoA name="CreateFile2" start=0x180074550 tag=SymTagFunction>
* <SymbolInfoA name="CreateFileA" start=0x1800240d0 tag=SymTagFunction>
* <SymbolInfoA name="CreateFileMappingNumaW" start=0x18002ca40 tag=SymTagFunction>
* <SymbolInfoA name="CreateFileMapping2" start=0x1800fb6d0 tag=SymTagPublicSymbol>
* <SymbolInfoA name="CreateFileInternal" start=0x180024250 tag=SymTagPublicSymbol>
* <SymbolInfoA name="CreateFileMappingW" start=0x18002cd00 tag=SymTagPublicSymbol>
* <SymbolInfoA name="CreateFileDowngrade_Win7" start=0x180082ff0 tag=SymTagPublicSymbol>
* <SymbolInfoA name="CreateFileDowngrade_Vista" start=0x18007eba0 tag=SymTagPublicSymbol>
* <SymbolInfoA name="CreateFileMappingFromApp" start=0x1800809d0 tag=SymTagPublicSymbol>
* <SymbolInfoA name="CreateFile2" start=0x180074550 tag=SymTagPublicSymbol>
* <SymbolInfoA name="CreateFileW" start=0x1800241d0 tag=SymTagPublicSymbol>
* <SymbolInfoA name="CreateFileA" start=0x1800240d0 tag=SymTagPublicSymbol>
* <SymbolInfoA name="CreateFileMappingNumaW" start=0x18002ca40 tag=SymTagPublicSymbol>
$ python64 debug\symbols\symsearch.py "NtCreate*" c:\windows\system32\ntdll.dll --addr 0x42000000
Namespace(addr=1107296256, dbghelp=None, file='c:\\windows\\system32\\ntdll.dll', pattern='NtCreate*', tag=0)
47 symbols found:
* <SymbolInfoA name="NtCreateProcessEx" start=0x4209ca00 tag=SymTagPublicSymbol>
* <SymbolInfoA name="NtCreateIRTimer" start=0x4209d530 tag=SymTagPublicSymbol>
* <SymbolInfoA name="NtCreateRegistryTransaction" start=0x4209d750 tag=SymTagPublicSymbol>
* <SymbolInfoA name="NtCreateTimer" start=0x4209d810 tag=SymTagPublicSymbol>
* <SymbolInfoA name="NtCreateKeyedEvent" start=0x4209d5d0 tag=SymTagPublicSymbol>
* <SymbolInfoA name="NtCreateFile" start=0x4209cb00 tag=SymTagPublicSymbol>
* <SymbolInfoA name="NtCreateSymbolicLinkObject" start=0x4209d7d0 tag=SymTagPublicSymbol>
* <SymbolInfoA name="NtCreatePrivateNamespace" start=0x4209d6d0 tag=SymTagPublicSymbol>
* <SymbolInfoA name="NtCreateKey" start=0x4209c400 tag=SymTagPublicSymbol>
...
18.15. WMI¶
18.15.1. WMI requests¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
print("WMI requester is {0}".format(windows.system.wmi))
print("Selecting * from 'Win32_Process'")
result = windows.system.wmi.select("Win32_Process")
print("They are <{0}> processes".format(len(result)))
print("Looking for ourself via pid")
us = [p for p in result if int(p["ProcessId"]) == windows.current_process.pid][0]
print("Some info about our process:")
print(" * {0} -> {1}".format("Name", us["Name"]))
print(" * {0} -> {1}".format("ProcessId", us["ProcessId"]))
print(" * {0} -> {1}".format("OSName", us["OSName"]))
print(" * {0} -> {1}".format("UserModeTime", us["UserModeTime"]))
print(" * {0} -> {1}".format("WindowsVersion", us["WindowsVersion"]))
print(" * {0} -> {1}".format("CommandLine", us["CommandLine"]))
print("<Select Caption,FileSystem,FreeSpace from Win32_LogicalDisk>:")
for vol in windows.system.wmi.query("select Caption,FileSystem,FreeSpace from Win32_LogicalDisk"):
# Filter out system-properties for the sample
print(" * " + str({k:v for k,v in vol.items() if not k.startswith("_")}))
print("\n ==== Advanced use ====")
print("Listing some namespaces:")
for namespace in [ns for ns in windows.system.wmi.namespaces if "2" in ns]:
print(" * {0}".format(namespace))
security2 = windows.system.wmi["root\\SecurityCenter2"]
print("Querying non-default namespace: {0}".format(security2))
print("Listing some available classes:")
for clsname in [x for x in security2.classes if x["__CLASS"].endswith("Product")]:
print(" * {0}".format(clsname))
print("Listing <AntiVirusProduct>:")
for av in security2.select("AntiVirusProduct"):
print(" * {0}".format(av["displayName"]))
Output
(cmd) python wmi\wmi_request.py
WMI requester is <windows.winobject.wmi.WmiManager object at 0x000001CD1A46E150>
Selecting * from 'Win32_Process'
They are <329> processes
Looking for ourself via pid
Some info about our process:
* Name -> python.exe
* ProcessId -> 28460
* OSName -> Microsoft Windows 11 Pro|C:\Windows|\Device\Harddisk0\Partition3
* UserModeTime -> 0
* WindowsVersion -> 10.0.22631
* CommandLine -> C:\Users\cleme\AppData\Local\Programs\Python\Python311\python.exe wmi\wmi_request.py
<Select Caption,FileSystem,FreeSpace from Win32_LogicalDisk>:
* {'Caption': 'C:', 'FileSystem': 'NTFS', 'FreeSpace': '925749731328'}
==== Advanced use ====
Listing some namespaces:
* CIMV2
* SecurityCenter2
* StandardCimv2
Querying non-default namespace: <WmiNamespace "root\SecurityCenter2">
Listing some available classes:
* <WmiObject class "AntiSpywareProduct">
* <WmiObject class "AntiVirusProduct">
* <WmiObject class "FirewallProduct">
Listing <AntiVirusProduct>:
* Windows Defender
18.15.2. WMI Create Process¶
import time
import windows
wmispace = windows.system.wmi["root\\cimv2"]
print("WMI namespace is <{0}>".format(wmispace))
proc_class = wmispace.get_object("Win32_process")
print("Process class is {0}".format(proc_class))
inparam_cls = proc_class.get_method("Create").inparam
print("Method Create InParams is <{0}>".format(inparam_cls))
print("Method Create InParams properties are <{0}>".format(inparam_cls.properties))
print("Creating instance of inparam")
inparam = inparam_cls()
print("InParam instance is <{0}>".format(inparam))
print("Setting <CommandLine>")
inparam["CommandLine"] = r"c:\windows\system32\notepad.exe"
print("Executing method")
# This API may change for something that better wraps cls/object/Parameters handling
outparam = wmispace.exec_method(proc_class, "Create", inparam)
print("OutParams is {0}".format(outparam))
print("Out params values are: {0}".format(outparam.properties))
target = windows.WinProcess(pid=int(outparam["ProcessId"]))
print("Created process is {0}".format(target))
print("Waiting 1s")
time.sleep(1)
print("Killing the process")
target.exit(0)
Output
(cmd) python wmi\create_process.py
WMI namespace is <<WmiNamespace "root\cimv2">>
Process class is <WmiObject class "Win32_Process">
Method Create InParams is <<WmiObject class "__PARAMETERS">>
Method Create InParams properties are <['CommandLine', 'CurrentDirectory', 'ProcessStartupInformation']>
Creating instance of inparam
InParam instance is <<WmiObject instance of "__PARAMETERS">>
Setting <CommandLine>
Executing method
OutParams is <WmiObject instance of "__PARAMETERS">
Out params values are: ['ProcessId', 'ReturnValue']
Created process is <WinProcess "notepad.exe" pid 26744 at 0x180e84d48d0>
Waiting 1s
Killing the process
18.16. windows.com
¶
18.16.1. INetFwPolicy2
¶
import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))
import windows
import windows.generated_def as gdef
from windows.generated_def import interfaces
# This code is a simple version of the firewall code in windows.winoject.network
print("Initialisation of COM")
windows.com.init()
print("Creating INetFwPolicy2 variable")
firewall = interfaces.INetFwPolicy2()
print("{0} (value = {1})".format(firewall, firewall.value))
print("")
print("Generating CLSID")
NetFwPolicy2CLSID = windows.com.IID.from_string("E2B3C97F-6AE1-41AC-817A-F6F92166D7DD")
print(repr(NetFwPolicy2CLSID))
print("")
print("Creating COM instance")
windows.com.create_instance(NetFwPolicy2CLSID, firewall)
print("{0} (value = 0x{1:0})".format(firewall, firewall.value))
print("")
print("Checking for enabled profiles")
for profile in [gdef.NET_FW_PROFILE2_DOMAIN, gdef.NET_FW_PROFILE2_PRIVATE, gdef.NET_FW_PROFILE2_PUBLIC]:
enabled = gdef.VARIANT_BOOL()
firewall.get_FirewallEnabled(profile, enabled)
print(" * {0} -> {1}".format(profile, enabled.value))
Output
(cmd) python com\com_inetfwpolicy2.py
Initialisation of COM
Creating INetFwPolicy2 variable
<INetFwPolicy2<NULL> at 0x2925a3e2350> (value = None)
Generating CLSID
<GUID "E2B3C97F-6AE1-41AC-817A-F6F92166D7DD">
Creating COM instance
<INetFwPolicy2 at 0x2925a3e2350> (value = 0x2827524184096)
Checking for enabled profiles
* NET_FW_PROFILE_TYPE2_.NET_FW_PROFILE2_DOMAIN(0x1) -> True
* NET_FW_PROFILE_TYPE2_.NET_FW_PROFILE2_PRIVATE(0x2) -> True
* NET_FW_PROFILE_TYPE2_.NET_FW_PROFILE2_PUBLIC(0x4) -> True
18.16.2. ICallInterceptor
¶
import windows
import windows.generated_def as gdef
from windows import winproxy
# POC of ICallInterceptor
# Based on works by Pavel Yosifovich
# http://blogs.microsoft.co.il/pavely/2018/02/28/intercepting-com-objects-with-cogetinterceptor/
windows.com.init()
# Create an interceptor for the firewall (INetFwPolicy2)
interceptor = gdef.ICallInterceptor()
winproxy.CoGetInterceptor(gdef.INetFwPolicy2.IID, None, interceptor.IID, interceptor)
# The PythonForWindows firewall object is a real/valid INetFwPolicy2
# used for demos of ICallFrameEvents.Invoke
real_firewall = windows.system.network.firewall
# Custom Python ICallFrameEvents implementation
class MySink(windows.com.COMImplementation):
IMPLEMENT = gdef.ICallFrameEvents
def OnCall(self, this, frame):
ifname = gdef.PWSTR()
methodname = gdef.PWSTR()
print("Hello from python sink !")
frame.GetNames(ifname, methodname)
print("Catching call to <{0}.{1}>".format(ifname.value, methodname.value))
param0info = gdef.CALLFRAMEPARAMINFO()
param0 = windows.com.Variant()
frame.GetParamInfo(0, param0info)
frame.GetParam(0, param0)
print("Info about parameters 0:")
windows.utils.sprint(param0info, name=" * param0info")
print("param0 value = {0}".format(param0.aslong))
frame.Invoke(real_firewall)
frame.SetReturnValue(1234)
print("Leaving the sink !")
return 0
# Create and register our ICallFrameEvents sink
xsink = MySink()
interceptor.RegisterSink(xsink)
# Create the INetFwPolicy2 interceptor interface
fakefirewall = gdef.INetFwPolicy2()
interceptor.QueryInterface(fakefirewall.IID, fakefirewall)
# Calling one of the INetFwPolicy2 function for testing
# Testing on https://msdn.microsoft.com/en-us/library/windows/desktop/aa365316(v=vs.85).aspx
enabled = gdef.VARIANT_BOOL()
res = fakefirewall.get_FirewallEnabled(2, enabled)
print("return value = {0}".format(res))
print("firewall enabled = {0}".format(enabled))
# Test a function taking a POINTER(ICallFrameEvents) (PTR to interface)
print("Testing a function taking a PTR to a COM interface")
sink2 = gdef.ICallFrameEvents()
print("Before call: {0}".format((sink2, sink2.value)))
interceptor.GetRegisteredSink(sink2)
print("After call: {0}".format((sink2, sink2.value)))
# (cmd) python samples\com\icallinterceptor.py
# Hello from python sink !
# Catching call to <INetFwPolicy2.FirewallEnabled>
# Info about parameters 0:
# * param0info.fIn -> 0x1
# * param0info.fOut -> 0x0
# * param0info.stackOffset -> 0x4L
# * param0info.cbParam -> 0x4L
# param0 value = 2
# Leaving the sink !
# return value = 1234
# firewall enabled = VARIANT_BOOL(True)
# Testing a function taking a PTR to a COM interface
# Before call: (<ICallFrameEvents object at 0x066EF3F0>, None)
# After call: (<ICallFrameEvents object at 0x066EF3F0>, 107934504)
Output
(cmd) python com\icallinterceptor.py
Hello from python sink !
Catching call to <INetFwPolicy2.FirewallEnabled>
Info about parameters 0:
* param0info.fIn -> 0x1
* param0info.fOut -> 0x0
* param0info.stackOffset -> 0x8
* param0info.cbParam -> 0x8
param0 value = 2
Leaving the sink !
return value = 1234
firewall enabled = VARIANT_BOOL(True)
Testing a function taking a PTR to a COM interface
Before call: (<ICallFrameEvents<NULL> at 0x1fb65de5550>, None)
After call: (<ICallFrameEvents at 0x1fb65de5550>, 2179257488408)
18.17. windows.crypto
¶
18.17.1. Encryption demo¶
This sample is a working POC able to generate key-pair, encrypt and decrypt file.
import argparse
import getpass
import windows.crypto as crypto
from windows import winproxy
from windows.generated_def import *
import windows.crypto.generation as gencrypt
# http://stackoverflow.com/questions/1461272/basic-questions-on-microsoft-cryptoapi
def crypt(src, dst, certs, **kwargs):
"""Encrypt the content of 'src' file with the certifacts in 'certs' into 'dst'"""
# Open every certificates in the certs list
certlist = [crypto.Certificate.from_file(x) for x in certs]
# Encrypt the content of 'src' with all the public keys(certs)
res = crypto.encrypt(certlist, src.read())
print("Encryption done. Result:")
print(repr(res))
# Write the result in 'dst'
dst.write(res)
dst.close()
src.close()
def decrypt(src, pfxfile, password, outfile=None, **kwargs):
"""Decrypt the content of 'src' with the private key in 'pfxfile'. the 'pfxfile' is open using the 'password'"""
# Open the 'pfx' with the given password
if password is None:
password = getpass.getpass()
pfx = crypto.import_pfx(pfxfile.read(), password)
# Decrypt the content of the file
decrypted = crypto.decrypt(pfx, src.read())
if outfile is None:
print(u"Result = <{0}>".format(decrypted))
else:
with open(outfile, "wb") as f:
f.write(decrypted)
return decrypted
def sign(src, pfxfile, password, outfile=None, **kwargs):
if password is None:
password = getpass.getpass()
pfx = crypto.import_pfx(pfxfile.read(), password)
# Decrypt the content of the file
certs = pfx.certs
if len(certs) != 1:
raise ValueError("PFX containing exactly one certificate expected for signature")
cert = certs[0]
signed = crypto.sign(cert, src.read())
if outfile is None:
print(u"Result = <{0}>".format(signed))
else:
with open(outfile, "wb") as f:
f.write(signed)
return signed
PFW_TMP_KEY_CONTAINER = "PythonForWindowsTMPContainer"
def genkeys(common_name, pfxpassword, outname, keysize=2048, **kwargs):
"""Generate a SHA256/RSA key pair. A self-signed certificate with 'common_name' is stored as 'outname'.cer.
The private key is stored in 'outname'.pfx protected with 'pfxpassword'"""
cert_store = crypto.CertificateStore.new_in_memory()
# Create a TMP context that will hold our newly generated key-pair
with crypto.CryptContext(PFW_TMP_KEY_CONTAINER, None, PROV_RSA_FULL, 0, retrycreate=True) as ctx:
key = HCRYPTKEY()
keysize_flags = keysize << 16
# Generate a key-pair that is exportable
winproxy.CryptGenKey(ctx, AT_KEYEXCHANGE, CRYPT_EXPORTABLE | keysize_flags, key)
# It does NOT destroy the key-pair from the container,
# It only release the key handle
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa379918(v=vs.85).aspx
winproxy.CryptDestroyKey(key)
# Descrption of the key-container that will be used to generate the certificate
KeyProvInfo = CRYPT_KEY_PROV_INFO()
KeyProvInfo.pwszContainerName = PFW_TMP_KEY_CONTAINER
KeyProvInfo.pwszProvName = None
KeyProvInfo.dwProvType = PROV_RSA_FULL
KeyProvInfo.dwFlags = 0
KeyProvInfo.cProvParam = 0
KeyProvInfo.rgProvParam = None
#KeyProvInfo.dwKeySpec = AT_SIGNATURE
KeyProvInfo.dwKeySpec = AT_KEYEXCHANGE
crypt_algo = CRYPT_ALGORITHM_IDENTIFIER()
crypt_algo.pszObjId = szOID_RSA_SHA256RSA
certif_name = "CN={0}".format(common_name)
# Generate a self-signed certificate based on the given key-container and signature algorithme
certif = gencrypt.generate_selfsigned_certificate(certif_name, key_info=KeyProvInfo, signature_algo=crypt_algo)
# Add the newly created certificate to our TMP cert-store
cert_store.add_certificate(certif)
# Generate a pfx from the TMP cert-store
print("Password is <{0}>".format(pfxpassword))
pfx = gencrypt.generate_pfx(cert_store, pfxpassword)
if outname is None:
outname = common_name.lower()
# Dump the certif (public key) and pfx (public + private keys)
with open(outname + ".cer", "wb") as f:
# The encoded certif only contains the public key
f.write(certif.encoded)
with open(outname + ".pfx", "wb") as f:
f.write(pfx)
print(certif)
# Destroy the TMP key container
prov = HCRYPTPROV()
winproxy.CryptAcquireContextW(prov, PFW_TMP_KEY_CONTAINER, None, PROV_RSA_FULL, CRYPT_DELETEKEYSET)
# Openssl commands to check ce certif/pfx
## Read certificate info (.cer)
### openssl x509 -inform der -in {certif} -text -noout
## Read pfx info (ask to another password to encrypt Private key before print/export)
### openssl pkcs12 -info -in {pfx} -nokeys
## Read pfx info !!!! PRINT PRIVATE KEY !!!!
### openssl pkcs12 -info -in {pfx} -nodes
## Read ASN1 data
### openssl asn1parse -inform DER -in {file}
parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
subparsers = parser.add_subparsers(description='valid subcommands')
cryptparse = subparsers.add_parser('crypt')
cryptparse.set_defaults(func=crypt)
cryptparse.add_argument('src', type=argparse.FileType('rb'), help='File to encrypt')
cryptparse.add_argument('dst', type=argparse.FileType('wb'), help='The encrypted file')
cryptparse.add_argument('certs', type=str, nargs='+',
help='List of certfile used to encrypt the src')
decryptparse = subparsers.add_parser('decrypt')
decryptparse.set_defaults(func=decrypt)
decryptparse.add_argument('src', type=argparse.FileType('rb'), help='File to decrypt')
decryptparse.add_argument('pfxfile', type=argparse.FileType('rb'), help='PFX file to use')
decryptparse.add_argument('--password', help='Password of the PFX')
decryptparse.add_argument('--outfile', default=None, help='The outputfile default is print')
decryptparse = subparsers.add_parser('sign')
decryptparse.set_defaults(func=sign)
decryptparse.add_argument('src', type=argparse.FileType('rb'), help='File to sign')
decryptparse.add_argument('pfxfile', type=argparse.FileType('rb'), help='PFX file to use')
decryptparse.add_argument('--password', help='Password of the PFX')
decryptparse.add_argument('--outfile', default=None, help='The outputfile default is print')
genkeysparse = subparsers.add_parser('genkey', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
genkeysparse.set_defaults(func=genkeys)
genkeysparse.add_argument('common_name', nargs='?', metavar='CommonName', default='DEFAULT', help='the common name of the certificate')
genkeysparse.add_argument('outname', nargs='?',help='The filename base for the generated files')
genkeysparse.add_argument('--pfxpassword', nargs='?', help='Password to protect the PFX')
genkeysparse.add_argument('--keysize', nargs='?', type=lambda x: int(x, 0), default=2048, help='The size of the RSA key')
res = parser.parse_args()
res.func(**res.__dict__)
Ouput:
(cmd λ) python crypto\encryption_demo.py genkey YOLOCERTIF mykey --pfxpassword MYPASSWORD
<CertificatContext "YOLOCERTIF" serial="1b a4 3e 17 f7 ed ec ab 4f f8 11 46 48 e9 29 25">
(cmd λ) ls
mykey.cer mykey.pfx
(cmd λ) echo|set /p="my secret message" > message.txt
(cmd λ) python crypto\encryption_demo.py crypt message.txt message.crypt mykey.cer
Encryption done. Result:
bytearray(b'0\x82\x01\x19\x06\t*\x86H\x86\xf7\r\x01\x07\x03\xa0\x82\x01\n0\x82\x01\x06\x02\x01\x001\x81\xc30\x81
\xc0\x02\x01\x000)0\x151\x130\x11\x06\x03U\x04\x03\x13\nYOLOCERTIF\x02\x10\x1b\xa4>\x17\xf7\xed\xec\xabO\xf8\x11
FH\xe9)%0\r\x06\t*\x86H\x86\xf7\r\x01\x01\x01\x05\x00\x04\x81\x80V\x89)\xf5\xaaM\x99cEA\x17^\xa2D~\x94\xe3\xf2\x1f
\x05Y\xc2\xbb\xb2\xbbYBpU6\x870\xce\xe7\xd2M{\xbb\xb9K\xa0\xf5\xe5\x93\xca\xedF\x80.x\xdc\xf2\x0c\xa6UO\x01\r\xaf
\xd0Z\xd9\xabnzR\xd4j=\xca\xc2RG\xcd\x11u\x82\x7f\x8c\xd8t\xb9\xf9\xe8%\xfal\xaaHPj;\xecKk]\t%\xfd\x91\xcc\xe0lWf
\xc6\x12x\x1am\xc8\x01t\xac\xa6\xf3#\x02\xd4J \x8eZ\xbb\x10W\xe1 0;\x06\t*\x86H\x86\xf7\r\x01\x07\x010\x14\x06\x08*
\x86H\x86\xf7\r\x03\x07\x04\x08\x14F\x04\xad\xed9\xed<\x80\x18\x80]6\xccTV\xbc\xb8*\x84QY!~\xb3\n\x1aV\xd4\rf\xd1n:')
(cmd λ) python crypto\encryption_demo.py decrypt --password BADPASS message.crypt mykey.pfx
Traceback (most recent call last):
File "..\samples\encryption_demo.py", line 103, in <module>
res.func(**res.__dict__)
File "..\samples\encryption_demo.py", line 26, in decrypt
pfx = crypto.import_pfx(pfxfile.read(), password)
File "c:\users\hakril\documents\work\pythonforwindows\windows\crypto\certificate.py", line 153, in import_pfx
cert_store = winproxy.PFXImportCertStore(pfx, password, flags)
File "c:\users\hakril\documents\work\pythonforwindows\windows\winproxy.py", line 1065, in PFXImportCertStore
return PFXImportCertStore.ctypes_function(pPFX, szPassword, dwFlags)
File "c:\users\hakril\documents\work\pythonforwindows\windows\winproxy.py", line 148, in perform_call
return self._cprototyped(*args)
File "c:\users\hakril\documents\work\pythonforwindows\windows\winproxy.py", line 69, in kernel32_error_check
raise WinproxyError(func_name)
windows.winproxy.error.WinproxyError: PFXImportCertStore: [Error 86] The specified network password is not correct.
(cmd λ) python crypto\encryption_demo.py decrypt --password MYPASSWORD message.crypt mykey.pfx
Result = <my secret message>
18.17.2. Certificate demo¶
import hashlib
import base64
import windows.crypto
windowscert = b"""-----BEGIN CERTIFICATE-----
MIIFBDCCA+ygAwIBAgITMwAAAQZuwyXEMckYDgAAAAABBjANBgkqhkiG9w0BAQsF
ADCBhDELMAkGA1UEBhMCVVMxEzARBgNVBAgTCldhc2hpbmd0b24xEDAOBgNVBAcT
B1JlZG1vbmQxHjAcBgNVBAoTFU1pY3Jvc29mdCBDb3Jwb3JhdGlvbjEuMCwGA1UE
AxMlTWljcm9zb2Z0IFdpbmRvd3MgUHJvZHVjdGlvbiBQQ0EgMjAxMTAeFw0xNjEw
MTEyMDM5MzFaFw0xODAxMTEyMDM5MzFaMHAxCzAJBgNVBAYTAlVTMRMwEQYDVQQI
EwpXYXNoaW5ndG9uMRAwDgYDVQQHEwdSZWRtb25kMR4wHAYDVQQKExVNaWNyb3Nv
ZnQgQ29ycG9yYXRpb24xGjAYBgNVBAMTEU1pY3Jvc29mdCBXaW5kb3dzMIIBIjAN
BgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyWcaCYghNInk3ecpyu2uZ7LCV9QS
7GWYr41ufTkcL66ewHxlAoWjmkKG6W2Bp9BYYQok10iDeDGACE9Vjr6m4Jdh+YuN
RLxMnHC8JTGzk96CzmdBPAuUWdAcHNmTkIWQF6AXzsbBWsekQejvDBygAOCuIYh4
sBgNa5cjTxQc7Iyp9c7RxBmThV5BNFTOnSN6D9N8zU+ENgIZuyHxGvqzRdrhU4G4
Cg/h1CkI4TgeZQZCeUNPnWV6DMuvPCiqGEia5phOJZyENKND0Sx6eQZrYnuz1gMn
YaEnO+ggegtt4pWpqg8Ch0jNrkL1fb3Kzz7E34/K9dcTgaOymfF6qUKabQIDAQAB
o4IBgDCCAXwwHwYDVR0lBBgwFgYKKwYBBAGCNwoDBgYIKwYBBQUHAwMwHQYDVR0O
BBYEFBEciVg/vsVmKtr/hmHt7KM6g8lSMFIGA1UdEQRLMEmkRzBFMQ0wCwYDVQQL
EwRNT1BSMTQwMgYDVQQFEysyMjk4NzkrMTQ3NDQ5YmUtMTVhOC00ZWJhLTkzZjMt
ZDExMGE1YzQ1NTUyMB8GA1UdIwQYMBaAFKkpAjmOFsSXeM2Q+Z5PmuF8Va9TMFQG
A1UdHwRNMEswSaBHoEWGQ2h0dHA6Ly93d3cubWljcm9zb2Z0LmNvbS9wa2lvcHMv
Y3JsL01pY1dpblByb1BDQTIwMTFfMjAxMS0xMC0xOS5jcmwwYQYIKwYBBQUHAQEE
VTBTMFEGCCsGAQUFBzAChkVodHRwOi8vd3d3Lm1pY3Jvc29mdC5jb20vcGtpb3Bz
L2NlcnRzL01pY1dpblByb1BDQTIwMTFfMjAxMS0xMC0xOS5jcnQwDAYDVR0TAQH/
BAIwADANBgkqhkiG9w0BAQsFAAOCAQEAvYC1iawgKoxXAotQXaN0lj1J5VX01/un
7JybZF4sPMG4acoFT85Ao5U6TK5ATPB7yPUulAivp8908DwTGqN+Ju6iH+UkvAb+
a/WcHVEMxQXK5eOFNE6yekUArBGbMNWlTFrpwklmVTnL9R+4aApTEe6ITT1KLDio
5uFw98n5Sqgh+In073czyiTG7MVhBexbOfhgnciXoufeyhwy1pYgjouSqSQZs4bj
cUwQTwGlS2Gd5a+3nblhjn+QhSszIo1K5n1udLPFWtn29BuGlSrtTXPv5OCfNtLO
l2ec6CyjDQc6HcQBNCsbJVq6qGtQbYNE+ih+KhIU4tO5jf25xthf2g==
-----END CERTIFICATE-----"""
raw_cert = base64.decodebytes(b"".join(windowscert.split(b"\n")[1:-1]))
cert = windows.crypto.Certificate.from_buffer(raw_cert)
print("Analysing certificate: {0}".format(cert))
print(" * name: <{0}>".format(cert.name))
print(" * issuer: <{0}>".format(cert.issuer))
print(" * raw_serial: <{0}>".format(cert.raw_serial))
print(" * serial: <{0}>".format(cert.serial))
print(" * encoded start: <{0!r}>".format(cert.encoded[:20]))
print("")
chains = cert.chains
print("This certificate has {0} certificate chain(s)".format(len(chains)))
for i, chain in enumerate(chains):
print("Chain {0}:".format(i))
for ccert in chain:
print(" {0}:".format(ccert))
print(" * issuer: <{0}>".format(ccert.issuer))
print ("")
cert_to_verif = ccert
print("Looking for <{0}> in trusted certificates".format(cert_to_verif.name))
root_store = windows.crypto.CertificateStore.from_system_store("Root")
# This is not the correct way verify the validity of a certificate chain.
# I would say that if the goal is to verify the signature of the certificate: use wintrust.
# (or maybe CertVerifyCertificateChainPolicy : https://msdn.microsoft.com/en-us/library/windows/desktop/aa377163(v=vs.85).aspx)
matchs = [c for c in root_store.certs if c == cert_to_verif]
print("matches = {0}".format(matchs))
if matchs:
print("Found it !")
else:
print("Not found :(")
## Extract certificates of a PE file
## This code is not a fixed API and the current state of my tests
print ("")
print ("== PE Analysis ==")
TARGET_FILE = r"C:\windows\system32\ntdll.dll"
print("Target sha1 = <{0}>".format(hashlib.sha1(open(TARGET_FILE, "rb").read()).hexdigest()))
cryptobj = windows.crypto.CryptObject(TARGET_FILE)
print("Analysing {0}".format(cryptobj))
print("File has {0} signer(s):".format(cryptobj.crypt_msg.nb_signer))
for i, signer in enumerate(cryptobj.crypt_msg.signers):
print("Signer {0}:".format(i))
print(" * Issuer: {0!r}".format(signer.Issuer.data))
print(" * HashAlgorithme: {0}".format(signer.HashAlgorithm.pszObjId))
cert = cryptobj.cert_store.find(signer.Issuer, signer.SerialNumber)
print(" * Certificate: {0}".format(cert))
print("")
print("File embdeds {0} certificate(s):".format(cryptobj.crypt_msg.nb_cert))
for i, certificate in enumerate(cryptobj.crypt_msg.certs):
print(" * {0}) {1}".format(i, certificate))
Output
(cmd) python crypto\certificate.py
Analysing certificate: <Certificate "Microsoft Windows" serial="33 00 00 01 06 6e c3 25 c4 31 c9 18 0e 00 00 00 00 01 06">
* name: <Microsoft Windows>
* issuer: <Microsoft Windows Production PCA 2011>
* raw_serial: <[51, 0, 0, 1, 6, 110, 195, 37, 196, 49, 201, 24, 14, 0, 0, 0, 0, 1, 6]>
* serial: <33 00 00 01 06 6e c3 25 c4 31 c9 18 0e 00 00 00 00 01 06>
* encoded start: <bytearray(b'0\x82\x05\x040\x82\x03\xec\xa0\x03\x02\x01\x02\x02\x133\x00\x00\x01\x06')>
This certificate has 1 certificate chain(s)
Chain 0:
<Certificate "Microsoft Windows" serial="33 00 00 01 06 6e c3 25 c4 31 c9 18 0e 00 00 00 00 01 06">:
* issuer: <Microsoft Windows Production PCA 2011>
<Certificate "Microsoft Windows Production PCA 2011" serial="61 07 76 56 00 00 00 00 00 08">:
* issuer: <Microsoft Root Certificate Authority 2010>
<Certificate "Microsoft Root Certificate Authority 2010" serial="28 cc 3a 25 bf ba 44 ac 44 9a 9b 58 6b 43 39 aa">:
* issuer: <Microsoft Root Certificate Authority 2010>
Looking for <Microsoft Root Certificate Authority 2010> in trusted certificates
matches = []
Not found :(
== PE Analysis ==
Target sha1 = <5f7905b56952e28316edcbce0206d7bec34de2a1>
Analysing <CryptObject "C:\windows\system32\ntdll.dll" content_type=CERT_QUERY_CONTENT_PKCS7_SIGNED_EMBED(0xa)>
File has 1 signer(s):
Signer 0:
* Issuer: bytearray(b'0\x81\x841\x0b0\t\x06\x03U\x04\x06\x13\x02US1\x130\x11\x06\x03U\x04\x08\x13\nWashington1\x100\x0e\x06\x03U\x04\x07\x13\x07Redmond1\x1e0\x1c\x06\x03U\x04\n\x13\x15Microsoft Corporation1.0,\x06\x03U\x04\x03\x13%Microsoft Windows Production PCA 2011')
* HashAlgorithme: b'2.16.840.1.101.3.4.2.1'
* Certificate: <Certificate "Microsoft Windows" serial="33 00 00 04 5c 3d 56 72 66 6c b7 54 17 00 00 00 00 04 5c">
File embdeds 2 certificate(s):
* 0) <Certificate "Microsoft Windows" serial="33 00 00 04 5c 3d 56 72 66 6c b7 54 17 00 00 00 00 04 5c">
* 1) <Certificate "Microsoft Windows Production PCA 2011" serial="61 07 76 56 00 00 00 00 00 08">
18.18. windows.alpc
¶
18.18.1. simple alpc communication¶
import multiprocessing
import windows.alpc
from windows.generated_def import LPC_CONNECTION_REQUEST, LPC_REQUEST
PORT_NAME = r"\RPC Control\PythonForWindowsPORT"
def alpc_server():
server = windows.alpc.AlpcServer(PORT_NAME) # Create the ALPC Port
print("[SERV] PORT <{0}> CREATED".format(PORT_NAME))
msg = server.recv() # Wait for a message
print("[SERV] Message type = {0:#x}".format(msg.type))
print("[SERV] Received data: <{0}>".format(msg.data.decode()))
assert msg.type & 0xfff == LPC_CONNECTION_REQUEST # Check that message is a connection request
print("[SERV] Connection request")
server.accept_connection(msg)
msg = server.recv() # Wait for a real message
print("")
print("[SERV] Received message: <{0}>".format(msg.data.decode()))
print("[SERV] Message type = {0:#x}".format(msg.type))
assert msg.type & 0xfff == LPC_REQUEST
# We can reply by two ways:
# - Send the same message with modified data
# - Recreate a Message and copy the MessageId
msg.data = u"REQUEST '{0}' DONE".format(msg.data.decode()).encode()
server.send(msg)
def alpc_client():
print("Client pid = {0}".format(windows.current_process.pid))
# Creation an 'AlpcClient' with a port name will connect to the port with an empty message
client = windows.alpc.AlpcClient(PORT_NAME)
print("[CLIENT] Connected: {0}".format(client))
# Send a message / wait for the response
response = client.send_receive(b"Hello world !")
print("[CLIENT] Response: <{0}>".format(response.data.decode()))
# You can also send message without waiting for a response with 'client.send'
if __name__ == "__main__":
proc = multiprocessing.Process(target=alpc_server, args=())
proc.start()
import time; time.sleep(1)
alpc_client()
print("BYE")
proc.terminate()
Output
(cmd) python alpc\simple_alpc.py
[SERV] PORT <\RPC Control\PythonForWindowsPORT> CREATED
Client pid = 15840
[SERV] Message type = 0x300a
[SERV] Received data: <>
[SERV] Connection request
[CLIENT] Connected: <windows.alpc.AlpcClient object at 0x06919290>
[SERV] Received message: <Hello world !>
[SERV] Message type = 0x3001
[CLIENT] Response: <REQUEST 'Hello world !' DONE>
BYE
18.18.2. advanced alpc communication¶
import sys
import multiprocessing
import windows.alpc
from windows.generated_def import LPC_CONNECTION_REQUEST, LPC_REQUEST
import windows.generated_def as gdef
import ctypes
import tempfile
PORT_NAME = r"\RPC Control\PythonForWindowsPORT_2"
PORT_CONTEXT = 0x11223344
def full_alpc_server():
print("server pid = {0}".format(windows.current_process.pid))
server = windows.alpc.AlpcServer(PORT_NAME)
print("[SERV] PORT <{0}> CREATED".format(PORT_NAME))
msg = server.recv()
print("[SERV] == Message received ==")
if msg.type & 0xfff == LPC_CONNECTION_REQUEST:
print(" * ALPC connection request: <{0}>".format(msg.data.decode()))
msg.data = b"Connection message response"
server.accept_connection(msg, port_context=PORT_CONTEXT)
else:
raise ValueError("Expected connection")
while True:
msg = server.recv()
print("[SERV] == Message received ==")
# print(" * Data: {0}".format(msg.data))
# print("[SERV] RECV Message type = {0:#x}".format(msg.type))
# print("[SERV] RECV Message Valid ATTRS = {0:#x}".format(msg.attributes.ValidAttributes))
# print("[SERV] RECV Message ATTRS = {0:#x}".format(msg.attributes.AllocatedAttributes))
if msg.type & 0xfff == LPC_REQUEST:
print(" * ALPC request: <{0}>".format(msg.data.decode()))
print(" * view_is_valid <{0}>".format(msg.view_is_valid))
if msg.view_is_valid:
print(" * message view attribute:")
windows.utils.print_ctypes_struct(msg.view_attribute, " - VIEW", hexa=True)
view_data = windows.current_process.read_string(msg.view_attribute.ViewBase)
print(" * Reading view content: <{0}>".format(view_data))
# Needed in Win7 - TODO: why is there a different behavior ?
msg.attributes.ValidAttributes -= gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE
print(" * security_is_valid <{0}>".format(msg.security_is_valid))
print(" * handle_is_valid <{0}>".format(msg.handle_is_valid))
if msg.handle_is_valid:
if msg.handle_attribute.Handle:
print(" * message handle attribute:")
windows.utils.print_ctypes_struct(msg.handle_attribute, " - HANDLE", hexa=True)
if msg.handle_attribute.ObjectType == 1:
f = windows.utils.create_file_from_handle(msg.handle_attribute.Handle)
print(" - File: {0}".format(f))
print(" - content: <{0}>".format(f.read()))
else:
print(" - unknow object type == {0}".format(msg.handle_attribute.ObjectType))
msg.attributes.ValidAttributes -= gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE
print(" * context_is_valid <{0}>".format(msg.context_is_valid))
if msg.context_is_valid:
print(" * message context attribute:")
windows.utils.print_ctypes_struct(msg.context_attribute, " - CTX", hexa=True)
if msg.attributes.ValidAttributes & gdef.ALPC_MESSAGE_TOKEN_ATTRIBUTE:
print(" * message token attribute:")
token_struct = msg.attributes.get_attribute(gdef.ALPC_MESSAGE_TOKEN_ATTRIBUTE)
windows.utils.print_ctypes_struct(token_struct, " - TOKEN", hexa=True)
# We can reply by to way:
# - Send the same message with modified data
# - Recreate a Message and copy the MessageId
msg.data = "REQUEST '{0}' DONE".format(msg.data.decode()).encode()
sys.stdout.flush()
server.send(msg)
else:
print(ValueError("Unexpected message type <{0}>".format(msg.type & 0xfff)))
def send_message_with_handle(client):
print("")
print("[Client] == Sending a message with a handle ==")
# Craft a file with some data
f = tempfile.NamedTemporaryFile()
f.write(b"Tempfile data <3")
f.seek(0)
# New message with a Handle
msg = windows.alpc.AlpcMessage()
msg.attributes.ValidAttributes |= gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE
msg.handle_attribute.Flags = gdef.ALPC_HANDLEFLG_DUPLICATE_SAME_ACCESS
msg.handle_attribute.Handle = windows.utils.get_handle_from_file(f)
msg.handle_attribute.ObjectType = 0
msg.handle_attribute.DesiredAccess = 0
msg.data = b"some message with a file"
client.send_receive(msg)
def send_message_with_view(client):
print("")
print("[Client] == Sending a message with a view ==")
# Create View
section = client.create_port_section(0, 0, 0x4000)
view = client.map_section(section[0], 0x4000)
# New message with a View
msg = windows.alpc.AlpcMessage(0x2000)
msg.attributes.ValidAttributes |= gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE
msg.view_attribute.Flags = 0
msg.view_attribute.ViewBase = view.ViewBase
msg.view_attribute.SectionHandle = view.SectionHandle
msg.view_attribute.ViewSize = 0x4000
msg.data = b"some message with a view"
windows.current_process.write_memory(view.ViewBase, b"The content of the view :)\x00")
client.send_receive(msg)
def alpc_client():
print("Client pid = {0}".format(windows.current_process.pid))
client = windows.alpc.AlpcClient()
# You can create a non-connected AlpcClient and send a custom
# 'AlpcMessage' for complexe alpc port connection.
connect_message = windows.alpc.AlpcMessage()
connect_message.data = b"Connection request client message"
print("[CLIENT] == Connecting to port ==")
connect_response = client.connect_to_port(PORT_NAME, connect_message)
print("[CLIENT] Connected with response: <{0}>".format(connect_response.data.decode()))
# AlpcClient send/recv/send_receive methods accept both string or
# AlpcMessage for complexe message.
print("")
print("[CLIENT] == Sending a message ==")
msg = windows.alpc.AlpcMessage()
msg.data = b"Complex Message 1"
print(" * Sending Message <{0}>".format(msg.data.decode()))
response = client.send_receive(msg)
print("[CLIENT] Server response: <{0}>".format(response.data.decode()))
print("[CLIENT] RESP Message Valid ATTRS = {0}".format(response.valid_attributes))
send_message_with_handle(client)
send_message_with_view(client)
sys.stdout.flush()
if __name__ == "__main__":
proc = multiprocessing.Process(target=full_alpc_server, args=())
proc.start()
import time; time.sleep(0.5)
alpc_client()
import time; time.sleep(0.5)
print("BYE")
proc.terminate()
Output
(cmd) python alpc\advanced_alpc.py
server pid = 2300
[SERV] PORT <\RPC Control\PythonForWindowsPORT_2> CREATED
Client pid = 14848
[CLIENT] == Connecting to port ==
[SERV] == Message received ==
* ALPC connection request: <Connection request client message>
[CLIENT] Connected with response: <Connection message response>
[CLIENT] == Sending a message ==
* Sending Message <Complex Message 1>
[SERV] == Message received ==
* ALPC request: <Complex Message 1>
* view_is_valid <False>
* security_is_valid <False>
* handle_is_valid <False>
* context_is_valid <True>
* message context attribute:
- CTX.PortContext -> 0x11223344
- CTX.MessageContext -> None
- CTX.Sequence -> 0x1L
- CTX.MessageId -> 0x0L
- CTX.CallbackId -> 0x0L
* message token attribute:
- TOKEN.TokenId -> 0x4101ef8eL
- TOKEN.AuthenticationId -> 0x54177L
- TOKEN.ModifiedId -> 0x4077ab89L
[CLIENT] Server response: <REQUEST 'Complex Message 1' DONE>
[CLIENT] RESP Message Valid ATTRS = [ALPC_MESSAGE_CONTEXT_ATTRIBUTE(0x20000000L)]
[Client] == Sending a message with a handle ==
[SERV] == Message received ==
* ALPC request: <some message with a file>
* view_is_valid <False>
* security_is_valid <False>
* handle_is_valid <True>
* message handle attribute:
- HANDLE.Flags -> 0x0L
- HANDLE.Handle -> 0x2cc
- HANDLE.ObjectType -> 0x1L
- HANDLE.DesiredAccess -> 0x13019fL
- File: <open file '<fdopen>', mode 'r' at 0x049ECA18>
- content: <Tempfile data <3>
* context_is_valid <True>
* message context attribute:
- CTX.PortContext -> 0x11223344
- CTX.MessageContext -> None
- CTX.Sequence -> 0x2L
- CTX.MessageId -> 0x0L
- CTX.CallbackId -> 0x0L
* message token attribute:
- TOKEN.TokenId -> 0x4101ef8eL
- TOKEN.AuthenticationId -> 0x54177L
- TOKEN.ModifiedId -> 0x4077ab89L
[Client] == Sending a message with a view ==
[SERV] == Message received ==
* ALPC request: <some message with a view>
* view_is_valid <True>
* message view attribute:
- VIEW.Flags -> 0x0L
- VIEW.SectionHandle -> None
- VIEW.ViewBase -> 0x4780000
- VIEW.ViewSize -> 0x4000
* Reading view content: <The content of the view :)>
* security_is_valid <False>
* handle_is_valid <False>
* context_is_valid <True>
* message context attribute:
- CTX.PortContext -> 0x11223344
- CTX.MessageContext -> None
- CTX.Sequence -> 0x3L
- CTX.MessageId -> 0x0L
- CTX.CallbackId -> 0x0L
* message token attribute:
- TOKEN.TokenId -> 0x4101ef8eL
- TOKEN.AuthenticationId -> 0x54177L
- TOKEN.ModifiedId -> 0x4077ab89L
[SERV] == Message received ==
Unexpected message type <12>
[SERV] == Message received ==
Unexpected message type <5>
[SERV] == Message received ==
Unexpected message type <12>
[SERV] == Message received ==
Unexpected message type <12>
BYE
18.19. windows.rpc
¶
18.19.1. Manual UAC¶
import argparse
import sys
import windows.rpc
import windows.generated_def as gdef
from windows.rpc import ndr
# NDR Descriptions
class NDRPoint(ndr.NdrStructure):
MEMBERS = [ndr.NdrLong, ndr.NdrLong]
class NdrUACStartupInfo(ndr.NdrStructure):
MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),
ndr.NdrLong,
ndr.NdrLong,
ndr.NdrLong,
ndr.NdrLong,
ndr.NdrLong,
ndr.NdrLong,
ndr.NdrLong,
ndr.NdrLong,
ndr.NdrLong,
NDRPoint]
class RAiLaunchAdminProcessParameters(ndr.NdrParameters):
MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),
ndr.NdrUniquePTR(ndr.NdrWString),
ndr.NdrLong,
ndr.NdrLong,
ndr.NdrWString,
ndr.NdrWString,
NdrUACStartupInfo,
ndr.NdrLong,
ndr.NdrLong]
class NdrProcessInformation(ndr.NdrParameters):
MEMBERS = [ndr.NdrLong] * 4
# Parsing args
parser = argparse.ArgumentParser(prog=__file__)
parser.add_argument('--target', default=sys.executable, help='Executable to launch')
parser.add_argument('--cmdline', default="", help='The commandline for the process')
parser.add_argument('--uacflags', type=lambda x: int(x, 0), default=0x11)
parser.add_argument('--creationflags', type=lambda x: int(x, 0), default=gdef.CREATE_UNICODE_ENVIRONMENT)
params = parser.parse_args()
print(params)
# Connecting to RPC Interface.
UAC_UIID = "201ef99a-7fa0-444c-9399-19ba84f12a1a"
client = windows.rpc.find_alpc_endpoint_and_connect(UAC_UIID)
iid = client.bind(UAC_UIID)
# Marshalling parameters.
parameters = RAiLaunchAdminProcessParameters.pack([
params.target, # Application Path
params.cmdline, # Commandline
params.uacflags, # UAC-Request Flag
params.creationflags, # dwCreationFlags
"", # StartDirectory
"WinSta0\\Default", # Station
# Startup Info
(None, # Title
0, # dwX
0, # dwY
0, # dwXSize
0, # dwYSize
0, # dwXCountChars
0, # dwYCountChars
0, # dwFillAttribute
0, # dwFlags
5, # wShowWindow
# Point structure: Use MonitorFromPoint to setup StartupInfo.hStdOutput
(0, 0)),
0, # Window-Handle to know if UAC can steal focus
0xffffffff]) # UAC Timeout
result = client.call(iid, 0, parameters)
stream = ndr.NdrStream(result)
ph, th, pid, tid = NdrProcessInformation.unpack(stream)
return_value = ndr.NdrLong.unpack(stream)
print("Return value = {0:#x}".format(return_value))
target = windows.winobject.process.WinProcess(handle=ph)
print("Created process is {0}".format(target))
print(" * bitness is {0}".format(target.bitness))
print(" * integrity: {0}".format(target.token.integrity))
print(" * elevated: {0}".format(target.token.is_elevated))
Output:
(cmd λ) python rpc\uac.py
Namespace(cmdline='', creationflags=CREATE_UNICODE_ENVIRONMENT(0x400L), target='C:\\Python27\\python.exe', uacflags=17)
# UAC pop - asking to execute python.exe | Clicking Yes
Return value = 0x6
Created process is <WinProcess "python.exe" pid 19304 at 0x455f7d0>
* bitness is 32
* integrity: SECURITY_MANDATORY_HIGH_RID(0x3000L)
* elevated: True
# The new python.exe in another window
>>> windows.current_process.token.integrity
SECURITY_MANDATORY_HIGH_RID(0x3000L)
>>> windows.current_process.token.is_elevated
True
18.19.2. Manual LsarEnumeratePrivileges
¶
import windows.rpc
from windows.rpc import ndr
class PLSAPR_OBJECT_ATTRIBUTES(ndr.NdrStructure):
MEMBERS = [ndr.NdrLong,
ndr.NdrUniquePTR(ndr.NdrWString),
ndr.NdrUniquePTR(ndr.NdrLong), # We dont care of the subtype as we will pass None
ndr.NdrLong,
ndr.NdrUniquePTR(ndr.NdrLong), # We dont care of the subtype as we will pass None
ndr.NdrUniquePTR(ndr.NdrLong)] # We dont care of the subtype as we will pass None
## From: RPCVIEW
# long Proc44_LsarOpenPolicy2(
# [in][unique][string] wchar_t* arg_0,
# [in]struct Struct_364_t* arg_1,
# [in]long arg_2,
# [out][context_handle] void** arg_3);
# This function has a [out][context_handle] meaning it return a context_handle
# Context handle are represented by 5 NdrLong where the first one is always 0
# PythonForWindows represent context_handle using NdrContextHandle
class LsarOpenPolicy2Parameter(ndr.NdrParameters):
MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),
PLSAPR_OBJECT_ATTRIBUTES,
ndr.NdrLong]
## From: RPCVIEW
# long Proc2_LsarEnumeratePrivileges(
# [in][context_handle] void* arg_0,
# [in][out]long *arg_1,
# [out]struct Struct_110_t* arg_2,
# [in]long arg_3);
# This function has a [in][context_handle] meaning it expect a context_handle
# We can pass the NdrContextHandle returned by Proc44_LsarOpenPolicy2
class LsarEnumeratePrivilegesParameter(ndr.NdrParameters):
MEMBERS = [ndr.NdrContextHandle,
ndr.NdrLong,
ndr.NdrLong]
class LSAPR_POLICY_PRIVILEGE_DEF(object):
@classmethod
def unpack(cls, stream):
size1 = ndr.NdrShort.unpack(stream)
ptr = ndr.NdrShort.unpack(stream)
size2 = ndr.NdrLong.unpack(stream)
luid = ndr.NdrHyper.unpack(stream)
return ptr, luid
class LSAPR_PRIVILEGE_ENUM_BUFFER(object):
@classmethod
def unpack(cls, stream):
entries = ndr.NdrLong.unpack(stream)
array_size = ndr.NdrLong.unpack(stream)
array_ptr = ndr.NdrLong.unpack(stream)
# Unpack pointed array
array_size2 = ndr.NdrLong.unpack(stream)
assert array_size == array_size2
x = []
# unpack each elements LSAPR_POLICY_PRIVILEGE_DEF
for i in range(array_size2):
ptr, luid = LSAPR_POLICY_PRIVILEGE_DEF.unpack(stream)
if ptr:
x.append(luid)
# unpack pointed strings
result = []
for luid in x:
name = ndr.NdrWcharConformantVaryingArrays.unpack(stream)
result.append((luid, name))
return result
# Actual code
## LSASS alpc endpoints is fixed, no need for the epmapper
client = windows.rpc.RPCClient(r"\RPC Control\lsasspirpc")
## Bind to the desired interface
iid = client.bind('12345778-1234-abcd-ef00-0123456789ab', version=(0,0))
## Craft parameters and call 'LsarOpenPolicy2'
params = LsarOpenPolicy2Parameter.pack([None, (0, None, None, 0, None, None), 0x20000000])
res = client.call(iid, 44, params)
## Unpack the resulting handle
handle = ndr.NdrContextHandle.unpack(ndr.NdrStream(res))
# As context_handle have 4 NdrLong of effective data
# We can represent them as GUID
# NdrContextHandle is just a wrapper packing/unpacking GUID and taking
# care of the leading NdrLong(0) in the actual ndr representation of context_handle
print("Context Handle is: {0}\n".format(handle))
## Craft parameters and call 'LsarEnumeratePrivileges'
x = LsarEnumeratePrivilegesParameter.pack([handle, 0, 10000]);
res = client.call(iid, 2, x)
print("Privileges:")
## Unpack the resulting 'LSAPR_PRIVILEGE_ENUM_BUFFER'
priviledges = LSAPR_PRIVILEGE_ENUM_BUFFER.unpack(ndr.NdrStream(res))
for priv in priviledges:
print(priv)
Output
(cmd) python rpc\lsass.py
(2, u'SeCreateTokenPrivilege')
(3, u'SeAssignPrimaryTokenPrivilege')
(4, u'SeLockMemoryPrivilege')
(5, u'SeIncreaseQuotaPrivilege')
(6, u'SeMachineAccountPrivilege')
(7, u'SeTcbPrivilege')
(8, u'SeSecurityPrivilege')
(9, u'SeTakeOwnershipPrivilege')
(10, u'SeLoadDriverPrivilege')
(11, u'SeSystemProfilePrivilege')
(12, u'SeSystemtimePrivilege')
(13, u'SeProfileSingleProcessPrivilege')
(14, u'SeIncreaseBasePriorityPrivilege')
(15, u'SeCreatePagefilePrivilege')
(16, u'SeCreatePermanentPrivilege')
(17, u'SeBackupPrivilege')
(18, u'SeRestorePrivilege')
(19, u'SeShutdownPrivilege')
(20, u'SeDebugPrivilege')
(21, u'SeAuditPrivilege')
(22, u'SeSystemEnvironmentPrivilege')
(23, u'SeChangeNotifyPrivilege')
(24, u'SeRemoteShutdownPrivilege')
(25, u'SeUndockPrivilege')
(26, u'SeSyncAgentPrivilege')
(27, u'SeEnableDelegationPrivilege')
(28, u'SeManageVolumePrivilege')
(29, u'SeImpersonatePrivilege')
(30, u'SeCreateGlobalPrivilege')
(31, u'SeTrustedCredManAccessPrivilege')
(32, u'SeRelabelPrivilege')
(33, u'SeIncreaseWorkingSetPrivilege')
(34, u'SeTimeZonePrivilege')
(35, u'SeCreateSymbolicLinkPrivilege')
(36, u'SeDelegateSessionUserImpersonatePrivilege')
18.20. windows.pipe
¶
18.20.1. Communication with an injected process¶
import windows
import windows.test
import windows.pipe
p = windows.test.pop_proc_32()
print("Child is {0}".format(p))
PIPE_NAME = "PFW_Pipe"
rcode = """
import windows
import windows.pipe
f = open('tst.txt', "w+")
fh = windows.utils.get_handle_from_file(f)
hm = windows.winproxy.CreateFileMappingA(fh, dwMaximumSizeLow=0x1000, lpName=None)
addr = windows.winproxy.MapViewOfFile(hm, dwNumberOfBytesToMap=0x1000)
windows.pipe.send_object("{pipe}", addr)
"""
with windows.pipe.create(PIPE_NAME) as np:
print("Created pipe is {0}".format(np))
p.execute_python(rcode.format(pipe=PIPE_NAME))
print("Receiving object from injected process")
addr = np.recv()
print("Remote Address = {0:#x}".format(addr))
print("Querying memory in target at <{0:#x}>".format(addr))
print(" * {0}".format(p.query_memory(addr)))
print("Querying mapped file in target at <{0:#x}>".format(addr))
print(" * {0}".format(p.get_mapped_filename(addr)))
p.exit()
Output
(cmd) python pipe\child_send_object.py
Child is <WinProcess "notepad.exe" pid 4316 at 0x672fcf0>
Created pipe is <PipeConnection name="\\.\pipe\PFW_Pipe" server=True>
Receiving object from injected process
Remote Address = 0x97a0000
Querying memory in target at <0x97a0000>
* <MEMORY_BASIC_INFORMATION32 BaseAddress=0x97a0000 RegionSize=0x001000 State=MEM_COMMIT(0x1000L) Type=MEM_MAPPED(0x40000L) Protect=PAGE_READWRITE(0x4L)>
Querying mapped file in target at <0x97a0000>
* \Device\HarddiskVolume2\Users\hakril\Documents\projets\PythonForWindows\samples\tst.txt
18.21. windows.security
¶
18.21.1. Security Descriptor¶
import windows.security
SDDL = "O:BAG:AND:(A;OI;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)(D;CIIO;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)"
sd = windows.security.SecurityDescriptor.from_string(SDDL)
print("Security descriptor is: {0}".format(sd))
print("Owner: {0}".format(sd.owner))
print(" - lookup: {0}".format(windows.utils.lookup_sid(sd.owner)))
print("Group: {0}".format(sd.group))
print(" - lookup: {0}".format(windows.utils.lookup_sid(sd.group)))
dacl = sd.dacl
print("Dacl: {0}".format(dacl))
for i, ace in enumerate(dacl):
print("")
print(" ACE [{0}]: {1}".format(i, ace))
print(" - Header-AceType: {0}".format(ace.Header.AceType))
print(" - Header-AceFlags: {0}".format(ace.Header.AceFlags))
print(" - Header-flags: {0}".format(ace.Header.flags))
print(" - Mask: {0}".format(ace.Mask))
print(" - mask: {0}".format(ace.mask))
print(" - Sid: {0}".format(ace.sid))
Output
(cmd) python security\security_descriptor.py
Security descriptor is: O:BAG:AND:(A;OI;CCDCLCSWRPWPRCWDWOGA;;;S-1-0-0)(D;CIIO;CCDCLCSWRPWPRCWDWOGA;;;S-1-0-0)
Owner: S-1-5-32-544
- lookup: ('BUILTIN', 'Administrators')
Group: S-1-5-7
- lookup: ('NT AUTHORITY', 'ANONYMOUS LOGON')
Dacl: <Acl count=2>
ACE [0]: <AccessAllowedACE mask=269353023>
- Header-AceType: ACCESS_ALLOWED_ACE_TYPE(0x0)
- Header-AceFlags: 1
- Header-flags: [OBJECT_INHERIT_ACE(0x1)]
- Mask: 269353023
- mask: [1, 2, 4, 8, 16, 32, READ_CONTROL(0x20000), WRITE_DAC(0x40000), WRITE_OWNER(0x80000), GENERIC_ALL(0x10000000)]
- Sid: S-1-0-0
ACE [1]: <AccessDeniedACE mask=269353023>
- Header-AceType: ACCESS_DENIED_ACE_TYPE(0x1)
- Header-AceFlags: 10
- Header-flags: [CONTAINER_INHERIT_ACE(0x2), INHERIT_ONLY_ACE(0x8)]
- Mask: 269353023
- mask: [1, 2, 4, 8, 16, 32, READ_CONTROL(0x20000), WRITE_DAC(0x40000), WRITE_OWNER(0x80000), GENERIC_ALL(0x10000000)]
- Sid: S-1-0-0
18.21.2. Query SACL¶
import sys
import windows.security
TARGET = r"C:\windows\notepad.exe" # On WIN10 (at least) notepad.exe has a AuditACE
if not windows.current_process.token.elevated:
print(ValueError("This sample should be run as admin to demonstration SACL access"))
print("")
print("[NO-PRIV] Querying <{0}> SecurityDescriptor without SACL".format(TARGET))
sd = windows.security.SecurityDescriptor.from_filename(TARGET)
print("sacl = {0}".format(sd.sacl))
print("")
print("[NO-PRIV] Querying <{0}> SecurityDescriptor with SACL".format(TARGET))
try:
sd = windows.security.SecurityDescriptor.from_filename(TARGET, query_sacl=True)
print("sacl = {0}".format(sd.sacl))
except WindowsError as e:
print(e)
print("")
print("Enabling <SeSecurityPrivilege>")
try:
windows.current_process.token.enable_privilege("SeSecurityPrivilege")
except ValueError as e:
print("[ERROR] {0}".format(e))
exit(1)
print("")
print("[PRIV] Querying <{0}> SecurityDescriptor with SACL".format(TARGET))
sd = windows.security.SecurityDescriptor.from_filename(TARGET, query_sacl=True)
print("sacl = {0}".format(sd.sacl))
print(list(sd.sacl))
Output
(cmd) python security\query_sacl.py
This sample should be run as admin to demonstration SACL access
[NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor without SACL
sacl = <Acl count=0>
[NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor with SACL
None: [Error 1314] A required privilege is not held by the client.
Enabling <SeSecurityPrivilege>
[ERROR] <Token TokenId=0xd6db5cc Type=TokenPrimary(0x1L)> has no privilege <SeSecurityPrivilege>
(cmd-admin) python security\query_sacl.py
[NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor without SACL
sacl = <Acl count=0>
[NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor with SACL
None: [Error 1314] A required privilege is not held by the client.
Enabling <SeSecurityPrivilege>
[PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor with SACL
sacl = <Acl count=1>
[<SystemAuditACE mask=852246>]
18.22. ETW (Event Tracing for Windows)¶
18.22.1. Trace processing¶
import ctypes
import struct
import windows
import windows.generated_def as gdef
makeg = gdef.GUID.from_string
# This sample record the ETW event of provider CBB61B6D-A2CF-471A-9A58-A4CD5C08FFBA
# related to the UAC (service AppInfo)
# The ETW session is called MY_UAC_MONITOR
# Is then trigger the UAC and display the retrieved event afterward
def show(event):
print("{0:#x}: {1}".format(event.EventHeader.TimeStamp, event))
print(" guid: {0}".format(event.guid))
print(" id: {0}".format(event.id))
print(" opcode: {0}".format(event.opcode))
print(" level: {0}".format(event.level))
print(" data: {0!r}".format(event.user_data.replace("\x00", "")))
return 0
session_name = "MY_UAC_MONITOR"
logfile_name = "uac.trace"
print("Recording UAC event in file <{0}> using session named <{1}>".format(logfile_name, session_name))
my_trace = windows.system.etw.open_trace(session_name, logfile=logfile_name)
my_trace.stop(soft=True) # Stop previous trace with this name if exists
my_trace.start()
my_trace.enable("CBB61B6D-A2CF-471A-9A58-A4CD5C08FFBA", 0xff, 0xff)
# Trigger UAC
windows.winproxy.ShellExecuteA(None, "runas", "mmc.exe", "BAD_MMC_FILENAME", None , 5)
my_trace.stop()
my_trace.process(show) #: Process the events registered in the trace (and logfile)
Output
(cmd) python etw\uac_trace.py
Recording UAC event in file <uac.trace> using session named <MY_UAC_MONITOR>
0x1d65bb1febaceea: <EventRecord provider="68FDD900-4A3E-11D1-84F4-0000F80464E3" id=0>
guid: 68FDD900-4A3E-11D1-84F4-0000F80464E3
id: 0
opcode: 0
level: 0
data: '\x01\n\x01\x05\xbbG\x04-D\xd5\xff\xb1[\xd6\x01Zb\x02\t\x01\x04\xf0\x0c\t\x06\xc4\xff\xff\xff@tzres.dll,-302\n\x05\x03@tzres.dll,-301\x03\x05\x02\xc4\xff\xff\xff\xc0\x92\x1c\xd2D[\xd6\x01\x80\x96\x98\xea\xce\xba\xfe\xb1[\xd6\x01\x01MY_UAC_MONITORC:\\Users\\hakril\\Documents\\projets\\PythonForWindows\\samples\\uac.trace'
[...]
0x1d65bb1fec4c011: <EventRecord provider="DEB74A23-5444-3F3B-924B-0E653973F55A" id=11>
guid: DEB74A23-5444-3F3B-924B-0E653973F55A
id: 11
opcode: 0
level: 0
data: '\x9e\x06\x04\x19\x14\x04\x08\x04\xff\xff\xff\xffWinSta0\\DefaultC:\\Windows\\System32\\mmc.exe"C:\\Windows\\System32\\mmc.exe" BAD_MMC_FILENAMEC:\\Users\\hakril\\Documents\\projets\\PythonForWindows\\samples'
0x1d65bb1fec4e48b: <EventRecord provider="C0B508D3-5459-339F-A213-889C238CA5B1" id=10>
guid: C0B508D3-5459-339F-A213-889C238CA5B1
id: 10
opcode: 0
level: 0
data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe"C:\\WINDOWS\\SysWOW64\\mmc.exe" BAD_MMC_FILENAME` '
0x1d65bb1fec7c8eb: <EventRecord provider="C0B508D3-5459-339F-A213-889C238CA5B1" id=13>
guid: C0B508D3-5459-339F-A213-889C238CA5B1
id: 13
opcode: 0
level: 0
data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe'
0x1d65bb1fec7c8f0: <EventRecord provider="C0B508D3-5459-339F-A213-889C238CA5B1" id=14>
guid: C0B508D3-5459-339F-A213-889C238CA5B1
id: 14
opcode: 0
level: 0
data: '"C:\\WINDOWS\\SysWOW64\\mmc.exe" BAD_MMC_FILENAME'
[...]
0x1d65bb1feca018d: <EventRecord provider="172FF31C-2D80-31A6-FCA8-EB000D380666" id=11>
guid: 172FF31C-2D80-31A6-FCA8-EB000D380666
id: 11
opcode: 0
level: 0
data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe'
0x1d65bb1feca030d: <EventRecord provider="172FF31C-2D80-31A6-FCA8-EB000D380666" id=25>
guid: 172FF31C-2D80-31A6-FCA8-EB000D380666
id: 25
opcode: 0
level: 0
data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe\x08\x02TRUEFALSE\x10'
0x1d65bb1fecfcdc0: <EventRecord provider="172FF31C-2D80-31A6-FCA8-EB000D380666" id=27>
guid: 172FF31C-2D80-31A6-FCA8-EB000D380666
id: 27
opcode: 0
level: 0
data: '\x05TRUETRUE'
[...]
18.22.2. Enumeration¶
import windows
etwmgr = windows.system.etw
print("ETW Manager is: {0}".format(etwmgr))
print("")
print("Listing some ETW sessions:")
for sess in etwmgr.sessions[:2]:
print(" * {0}".format(sess))
print(" * name: {0}".format(sess.name))
print(" * guid: {0}".format(sess.guid))
print(" * id: {0}".format(sess.id))
print(" * logfile: {0}".format(sess.logfile))
sess = etwmgr.sessions[1]
target_id = sess.id
NB_MATCH = 0
print("")
print("Looking for providers for: {0}".format(sess))
for provider in windows.system.etw.providers:
if NB_MATCH == 3:
break
for instance in provider.instances:
if NB_MATCH == 3:
break
for session in instance.sessions:
if session.LoggerId == target_id and instance.Pid:
proc = [p for p in windows.system.processes if p.pid == instance.Pid][0]
print("Found a provider/session for target:")
print(" * Provider: {0}".format(provider))
print(" * Instance: {0}".format(instance))
print(" * Process: {0}".format(proc))
NB_MATCH += 1
if NB_MATCH == 3:
break
break
Output
(cmd) python etw\etw_enumeration.py
ETW Manager is: <windows.winobject.event_trace.EtwManager object at 0x03AFBF70>
Listing some ETW sessions:
* <EventTraceProperties name="AppModel" guid=A922A8BE-2450-438E-9520-FBCDFB46B0BD>
* name: AppModel
* guid: A922A8BE-2450-438E-9520-FBCDFB46B0BD
* id: 4
* logfile:
* <EventTraceProperties name="LwtNetLog" guid=603BA31E-EC5A-4CDE-BE87-ED0A16C3B170>
* name: LwtNetLog
* guid: 603BA31E-EC5A-4CDE-BE87-ED0A16C3B170
* id: 14
* logfile: C:\WINDOWS\System32\LogFiles\WMI\LwtNetLog.etl
Looking for providers for: <EventTraceProperties name="LwtNetLog" guid=603BA31E-EC5A-4CDE-BE87-ED0A16C3B170>
Found a provider/session for target:
* Provider: <TraceProvider for "43D1A55C-76D6-4F7E-995C-64C711E5CAFE">
* Instance: <TraceProviderInstanceInfo Pid=5256 EnableCount=1>
* Process: <WinProcess "RuntimeBroker.exe" pid 5256 at 0x54c39d0>
Found a provider/session for target:
* Provider: <TraceProvider for "43D1A55C-76D6-4F7E-995C-64C711E5CAFE">
* Instance: <TraceProviderInstanceInfo Pid=10768 EnableCount=1>
* Process: <WinProcess "chrome.exe" pid 10768 at 0x54c3930>
Found a provider/session for target:
* Provider: <TraceProvider for "43D1A55C-76D6-4F7E-995C-64C711E5CAFE">
* Instance: <TraceProviderInstanceInfo Pid=10236 EnableCount=1>
* Process: <WinProcess "YourPhone.exe" pid 10236 at 0x54c37d0>