18. Samples of code

18.1. Processes

18.1.1. windows.current_process

import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.native_exec.simple_x86 as x86
import windows.native_exec.simple_x64 as x64
# Here is our current process
cp = windows.current_process

print("current process is {cp}".format(cp=windows.current_process))
print("current process is a <{cp.bitness}> bits process".format(cp=cp))
print("current process is a SysWow64 process ? <{cp.is_wow_64}>".format(cp=cp))
print("current process pid <{cp.pid}>  and ppid <{cp.ppid}>".format(cp=cp))
print("Here are the current process threads: <{cp.threads}>".format(cp=cp))

print("Let's execute some native code ! (0x41 + 1)")

if windows.current_process.bitness == 32:
    # Let's generate some native code
    code =  x86.MultipleInstr()
    code += x86.Mov("Eax", 0x41)
    code += x86.Inc("EAX")
    code += x86.Ret()
else:
    code =  x64.MultipleInstr()
    code += x64.Mov("RAX", 0x41)
    code += x64.Inc("RAX")
    code += x64.Ret()

native_code = code.get_code()

v = windows.current_process.execute(native_code)
print("Native code returned <{0}>".format(hex(v)))

print("Allocating memory in current process")
addr = cp.virtual_alloc(0x1000) # Default alloc is RWX (so secure !)
print("Allocated memory is at <{0}>".format(hex(addr)))

print("Writing 'SOME STUFF' in allocation memory")
cp.write_memory(addr, "SOME STUFF")
print("Reading memory : <{0}>".format(repr(cp.read_memory(addr, 20))))


Output

(cmd) python process\current_process.py
current process is <windows.winobject.process.CurrentProcess object at 0x06407430>
current process is a <32> bits process
current process is a SysWow64 process ? <True>
current process pid <11344>  and ppid <14984>
Here are the current process threads: <[<WinThread 17288 owner "CurrentProcess" at 0x6a68870>, <WinThread 15176 owner "CurrentProcess" at 0x6a687b0>, <WinThread 8972 owner "CurrentProcess" at 0x6a68bf0>, <WinThread 5568 owner "CurrentProcess" at 0x6a68dd0>]>
Let's execute some native code ! (0x41 + 1)
Native code returned <0x42>
Allocating memory in current process
Allocated memory is at <0x5510000>
Writing 'SOME STUFF' in allocation memory
Reading memory : <'SOME STUFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'>

18.1.2. Remote process : WinProcess

import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.native_exec.simple_x86 as x86
import windows.native_exec.simple_x64 as x64

print("Creating a notepad") ## Replaced calc.exe by notepad.exe cause of windows 10.
notepad = windows.utils.create_process(r"C:\windows\system32\notepad.exe")
# You don't need to do that in our case, but it's useful to now
print("Looking for notepads in the processes")
all_notepads = [proc for proc in windows.system.processes if proc.name == "notepad.exe"]
print("They are currently <{0}> notepads running on the system".format(len(all_notepads)))

print("Let's play with our notepad: <{notepad}>".format(notepad=notepad))
print("Our notepad pid is {notepad.pid}".format(notepad=notepad))
print("Our notepad is a <{notepad.bitness}> bits process".format(notepad=notepad))
print("Our notepad is a SysWow64 process ? <{notepad.is_wow_64}>".format(notepad=notepad))
print("Our notepad have threads ! <{notepad.threads}>".format(notepad=notepad))

# PEB STUFF
peb = notepad.peb
print("Exploring our notepad PEB ! {peb}".format(peb=peb))
print("Command line is {peb.commandline}".format(peb=peb))
modules = peb.modules
print("Here are 3 loaded modules: {0}".format(modules[:3]))
# See iat_hook.py for module exploration


# Remote alloc / read / write

print("Allocating memory in our notepad")
addr = notepad.virtual_alloc(0x1000)
print("Allocated memory is at <{0}>".format(hex(addr)))
print("Writing 'SOME STUFF' in allocated memory")
notepad.write_memory(addr, "SOME STUFF")
print("Reading allocated memory : <{0}>".format(repr(notepad.read_memory(addr, 20))))


# Remote Execution

print("Execution some native code in our notepad (write 0x424242 at allocated address + return 0x1337)")

if notepad.bitness == 32:
    # Let's generate some native code
    code =  x86.MultipleInstr()
    code += x86.Mov(x86.deref(addr), 0x42424242)
    code += x86.Mov("EAX", 0x1337)
    code += x86.Ret()
else:
    code =  x64.MultipleInstr()
    code += x64.Mov('RAX', addr)
    code += x64.Mov(x64.mem("[RAX]"), 0x42424242)
    code += x64.Mov("RAX", 0x1337)
    code += x64.Ret()

print("Executing native code !")
t = notepad.execute(code.get_code())
t.wait()
print("Return code = {0}".format(hex(t.exit_code)))
print("Reading allocated memory : <{0}>".format(repr(notepad.read_memory(addr, 20))))

print("Executing python code !")
# Make 'windows' importable in remote python
notepad.execute_python("import sys; sys.path.append(r'{0}')".format(sys.path[-1]))

notepad.execute_python("import windows")
# Let's write in the notepad 'current_process' memory :)
notepad.execute_python("addr = {addr}; windows.current_process.write_memory(addr, 'HELLO FROM notepad')".format(addr=addr))
print("Reading allocated memory : <{0}>".format(repr(notepad.read_memory(addr, 20))))

# python_execute is 'safe':
# - it waits for the thread completion
# - it raise an error if remote code raised some

try:
    print("Trying to import in remote module 'FAKE_MODULE'")
    notepad.execute_python("def func():\n   import FAKE_MODULE\nfunc()")
except windows.injection.RemotePythonError as e:
    print("Exception in remote process!")
    print(e)

print("That's all ! killing the notepad")
notepad.exit()







Output

(cmd) python process\remote_process.py
Creating a notepad
Looking for notepads in the processes
They are currently <1> notepads running on the system
Let's play with our notepad: <<WinProcess "notepad.exe" pid 8400 at 0x6238510>>
Our notepad pid is 8400
Our notepad is a <32> bits process
Our notepad is a SysWow64 process ? <True>
Our notepad have threads ! <[<WinThread 16028 owner "notepad.exe" at 0x6238f10>, <WinThread 5924 owner "notepad.exe" at 0x6238a10>, <WinThread 11620 owner "notepad.exe" at 0x6238fb0>, <WinThread 3480 owner "notepad.exe" at 0x6238ff0>, <WinThread 200 owner "notepad.exe" at 0x62389b0>, <WinThread 16804 owner "notepad.exe" at 0x62389f0>, <WinThread 13340 owner "notepad.exe" at 0x6238930>]>
Exploring our notepad PEB ! <windows.winobject.process.RemotePEB object at 0x061BDA80>
Command line is <Remote_LSA_UNICODE_STRING ""C:\windows\system32\notepad.exe"" at 0x61bdb20>
Here are 3 loaded modules: [<RemoteLoadedModule "notepad.exe" at 0x61bdad0>, <RemoteLoadedModule "ntdll.dll" at 0x61bd940>, <RemoteLoadedModule "kernel32.dll" at 0x61bd8f0>]
Allocating memory in our notepad
Allocated memory is at <0x6f00000>
Writing 'SOME STUFF' in allocated memory
Reading allocated memory : <'SOME STUFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'>
Execution some native code in our notepad (write 0x424242 at allocated address + return 0x1337)
Executing native code !
Return code = 0x1337L
Reading allocated memory : <'BBBB STUFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'>
Executing python code !
Reading allocated memory : <'HELLO FROM notepad\x00\x00'>
Trying to import in remote module 'FAKE_MODULE'
Exception in remote process!
Traceback (most recent call last):
  File "<string>", line 3, in <module>
  File "<string>", line 2, in func
ImportError: No module named FAKE_MODULE

That's all ! killing the notepad

18.1.3. PEB exploration

import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows

print("Exploring the current process PEB")
peb = windows.current_process.peb
print("PEB is <{0}>".format(peb))
commandline = peb.commandline
print("Commandline object is {0}".format(commandline))
print("Commandline string is {0}".format(repr(commandline.Buffer)))

imagepath = peb.imagepath
print("Imagepath  {0}".format(imagepath))

modules = peb.modules
print("Printing some modules: {0}".format("\n".join(str(m) for m in modules[:6])))

print("=== K32  ===")
print("Looking for kernel32.dll")
k32 = [m for m in modules if m.name == "kernel32.dll"][0]
print("Kernel32 module: {0}".format(k32))

print("Module name = <{0}> | Fullname = <{1}>".format(k32.name, k32.fullname))
print("Kernel32 is loaded at address {0}".format(hex(k32.baseaddr)))

print("=== K32 PE ===")
k32pe = k32.pe
print("PE Representation of k32: {0}".format(k32pe))
exports = k32pe.exports
some_exports = dict((k,v) for k,v in exports.items() if k in [0, 42, "VirtualAlloc", "CreateFileA"])
print("Here are some exports {0}".format(some_exports))

imports = k32pe.imports
print("Import DLL dependancies are (without api-*): {0}".format([x for x in imports.keys() if not x.startswith("api-")]))

NtCreateFile_iat = [x for x in imports["ntdll.dll"] if x.name == "NtCreateFile"][0]
print("IAT Entry for ntdll!NtCreateFile = {0} | addr = {1}".format(NtCreateFile_iat, hex(NtCreateFile_iat.addr)))
print("Sections: {0}".format(k32pe.sections))

Output

(cmd) python process\peb.py
Exploring the current process PEB
PEB is <<windows.winobject.process.PEB object at 0x05A6F710>>
Commandline object is <_LSA_UNICODE_STRING "C:\Python27\python.exe  process\peb.py" at 0x5a6f8a0>
Commandline string is 47063282
Imagepath  <_LSA_UNICODE_STRING "C:\Python27\python.exe" at 0x5a6f990>
Printing some modules: <LoadedModule "python.exe" at 0x60d2080>
<LoadedModule "ntdll.dll" at 0x60d2030>
<LoadedModule "kernel32.dll" at 0x60d2b20>
<LoadedModule "kernelbase.dll" at 0x60d2b70>
<LoadedModule "python27.dll" at 0x60d2bc0>
<LoadedModule "msvcr90.dll" at 0x60d2c10>
=== K32  ===
Looking for kernel32.dll
Kernel32 module: <LoadedModule "kernel32.dll" at 0x60d2b20>
Module name = <kernel32.dll> | Fullname = <c:\windows\system32\kernel32.dll>
Kernel32 is loaded at address 0x76930000
=== K32 PE ===
PE Representation of k32: <windows.pe_parse.PEFile object at 0x060CCA10>
Here are some exports {0: 1989445168L, 'CreateFileA': 1989795280L, 42: 1989636592L, 'VirtualAlloc': 1989437552L}
Import DLL dependancies are (without api-*): ['kernelbase.dll', 'ntdll.dll']
IAT Entry for ntdll!NtCreateFile = <IATEntry "NtCreateFile" ordinal 272> | addr = 0x769a1a28L
Sections: [<PESection ".text">, <PESection ".rdata">, <PESection ".data">, <PESection ".rsrc">, <PESection ".reloc">]

18.1.4. ApiSetMap

import windows

print("Computer is a <{0}>".format(windows.system.version_name))

cp = windows.current_process
apism = cp.peb.apisetmap

print("ApiSetMap: {0} (version = {1})".format(apism, apism.version))

dll_demos_fullname = 'api-ms-win-core-processthreads-l1-1-3'
dll_demos_utilname = 'api-ms-win-core-processthreads-l1-1-'

print("Entries in 'apisetmap_dict' are the full api-dll path extracted")
print(" * apisetmap.apisetmap_dict['{0}'] -> {1}".format(dll_demos_fullname, apism.apisetmap_dict[dll_demos_fullname]))
print("Entries in 'resolution_dict' are the contains the util-part check by windows")
print(" * apisetmap.resolution_dict['{0}'] -> {1}".format(dll_demos_utilname, apism.resolution_dict[dll_demos_utilname]))

print("ApiSetMap.resolve resolve a api-dll based on the util part")
for suffix in ["1", "2", "PART_IS_IGNORED"]:
    testname = dll_demos_utilname + suffix
    print(" * apisetmap.resolve('{0}') -> {1}".format(testname, apism.resolve(testname)))

testname = "BAD_DLL-3.dll"
try:
    print(" * apisetmap.resolve('{0}') -> {1}".format(testname, apism.resolve(testname)))
except KeyError as e:
    print(" * apisetmap.resolve('{0}') -> raised: {1!r}".format(testname, e))

Output

(cmd) python process\apisetmap.py
Computer is a <Windows 10>
ApiSetMap: <windows.winobject.apisetmap.ApiSetMapVersion6 object at 0x0645ECB0> (version = 6)
Entries in 'apisetmap_dict' are the full api-dll path extracted
 * apisetmap.apisetmap_dict['api-ms-win-core-processthreads-l1-1-3'] -> kernelbase.dll
Entries in 'resolution_dict' are the contains the util-part check by windows
 * apisetmap.resolution_dict['api-ms-win-core-processthreads-l1-1-'] -> kernelbase.dll
ApiSetMap.resolve resolve a api-dll based on the util part
 * apisetmap.resolve('api-ms-win-core-processthreads-l1-1-1') -> kernelbase.dll
 * apisetmap.resolve('api-ms-win-core-processthreads-l1-1-2') -> kernelbase.dll
 * apisetmap.resolve('api-ms-win-core-processthreads-l1-1-PART_IS_IGNORED') -> kernelbase.dll
 * apisetmap.resolve('BAD_DLL-3.dll') -> raised: KeyError('BAD_DLL-',)

18.1.5. IAT hooking

import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import _winreg
import windows

# Here is a demo of IAT hooking in python
# We will hook the 'RegOpenKeyExA' entry of Python27.dll because it is easy to trigger !

# First: let's create our hook
# windows.hooks.RegOpenKeyExACallback is generated based on windows.generated_def.winfuncs
@windows.hooks.RegOpenKeyExACallback
def open_reg_hook(hKey, lpSubKey, ulOptions, samDesired, phkResult, real_function):
    print("<in hook> Hook called | hKey = {0} | lpSubKey = <{1}>".format(hex(hKey), lpSubKey.value))
    # Our hook can choose to call the real_function or not
    if "SECRET" in lpSubKey.value:
        print("<in hook> Secret key asked, returning magic handle 0x12345678")
        # We must respect the hooked method return-value interface
        phkResult[0] = 0x12345678
        return 0
    if "FAIL" in lpSubKey.value:
        print("<in hook> Asked for a failing key: returning 0x2a")
        return 42
    print("<in hook> Non-secret key : calling normal function")
    return real_function()


# Get the peb of our process
peb = windows.current_process.peb

# Get the pythonxx.dll module
pythondll_module = [m for m in peb.modules if m.name.startswith("python") and m.name.endswith(".dll")][0]

# Get the iat entries for DLL advapi32.dll
adv_imports = pythondll_module.pe.imports['advapi32.dll']

# Get RegOpenKeyExA iat entry
RegOpenKeyExA_iat = [n for n in adv_imports if n.name == "RegOpenKeyExA"][0]

# Setup our hook
RegOpenKeyExA_iat.set_hook(open_reg_hook)

### !!!! You must keep the iat_entry alive !!!!
### If the hook is garbage collected while active -> python will crash

# Use python native module _winreg that call 'RegOpenKeyExA'
print("Asking for <MY_SECRET_KEY>")
v = _winreg.OpenKey(1234567, "MY_SECRET_KEY")
print("Result = " + hex(v.handle))

print("")
print("Asking for <MY_FAIL_KEY>")
try:
    v = _winreg.OpenKey(1234567, "MY_FAIL_KEY")
    print("Result = " + hex(v.handle))
except WindowsError as e:
    print(repr(e))

print("")
print("Asking for <HKEY_CURRENT_USER/Software>")
try:
    v = _winreg.OpenKey(_winreg.HKEY_CURRENT_USER, "Software")
    print("Result = " + hex(v.handle))
except WindowsError as e:
    print(repr(e))

Output

(cmd) python process\iat_hook.py
Asking for <MY_SECRET_KEY>
<in hook> Hook called | hKey = 0x12d687 | lpSubKey = <MY_SECRET_KEY>
<in hook> Secret key asked, returning magic handle 0x12345678
Result = 0x12345678

Asking for <MY_FAIL_KEY>
<in hook> Hook called | hKey = 0x12d687 | lpSubKey = <MY_FAIL_KEY>
<in hook> Asked for a failing key: returning 0x2a
WindowsError(42, 'Windows Error 0x2A')

Asking for <HKEY_CURRENT_USER/Software>
<in hook> Hook called | hKey = 0x80000001L | lpSubKey = <Software>
<in hook> Non-secret key : calling normal function
Result = 0x428

18.2. Token

import windows
import windows.generated_def as gdef

tok = windows.current_process.token
print("Our process token is {0}".format(tok))
print("Retrieving some infos")
print("Username: <{0}>".format(tok.username))
print("User: {0!r}".format(tok.user))
print("  - lookup : {0}".format(windows.utils.lookup_sid(tok.user)))
print("Primary group: {0!r}".format(tok.primary_group))
print("  - lookup : {0}".format(windows.utils.lookup_sid(tok.primary_group)))

print("")
groups = tok.groups
print("Token Groups is {0}".format(groups))
print("First group SID is {0!r}".format(groups.sids[0]))
print("Some sid and attributes:")
for i, group in zip(range(3), groups.sids_and_attributes):
    print(" - {0}: {1}".format(group.Sid, group.Attributes))

# Let's play with duplicate !
print("")
imp_tok = tok.duplicate(type=gdef.TokenImpersonation, impersonation_level=gdef.SecurityImpersonation)
print("Duplicate token is {0}".format(imp_tok))
print("Enabling <SeShutDownPrivilege>")
imp_tok.enable_privilege("SeShutDownPrivilege")

cur_thread = windows.current_thread
print("Current thread token is <{0}>".format(cur_thread.token))
print("Setting impersonation token !")
cur_thread.token = imp_tok
print("Current thread token is {0}".format(cur_thread.token))

Output

(cmd) python token\token_demo.py
Our process token is <Token TokenId=0x3f1f98fd Type=TokenPrimary(0x1L)>
Retrieving some infos
Username: <hakril>
User: <PSID "S-1-5-21-184905214-2723199098-2761450773-1001">
  - lookup : ('WILLIE', 'hakril')
Primary group: <PSID "S-1-5-21-184905214-2723199098-2761450773-513">
  - lookup : ('WILLIE', 'Aucun')

Token Groups is <TokenGroups count=15>
First group SID is <PSID "S-1-5-21-184905214-2723199098-2761450773-513">
Some sid and attributes:
 - S-1-5-21-184905214-2723199098-2761450773-513: 7
 - S-1-1-0: 7
 - S-1-5-114: 16

Duplicate token is <Token TokenId=0x3f1fac85 Type=TokenImpersonation(0x2L) ImpersonationLevel=SecurityImpersonation(0x2L)>
Enabling <SeShutDownPrivilege>
Current thread token is <None>
Setting impersonation token !
Current thread token is <Token TokenId=0x3f1fac85 Type=TokenImpersonation(0x2L) ImpersonationLevel=SecurityImpersonation(0x2L)>

18.3. windows.system

import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
system = windows.system

print("Basic system infos:")
print("    version = {0}".format(system.version))
print("    bitness = {0}".format(system.bitness))
print("    computer_name = {0}".format(system.computer_name))
print("    product_type = {0}".format(system.product_type))
print("    version_name = {0}".format(system.version_name))
print("")
print("There is {0} processes".format(len(system.processes)))
print("There is {0} threads".format(len(system.threads)))
print("")

print("Dumping first logical drive:")
drive = system.logicaldrives[0]
print("    " + str(drive))
print((" " * 8) + "name = {0}".format(drive.name))
print((" " * 8) + "type = {0}".format(drive.type))
print((" " * 8) + "path = {0}".format(drive.path))
print("")

print("Dumping first service:")
serv = windows.system.services[0]
print("    " + str(serv))
print((" " * 8) + "name = {0}".format(serv.name))
print((" " * 8) + "description = {0}".format(serv.description))
print((" " * 8) + "status = {0}".format(serv.status))
print((" " * 8) + "process = {0}".format(repr(serv.process)))
print("")

print("Finding a service in a user process:")
serv = [s for s in windows.system.services if s.process][0]
print("    " + str(serv))
print((" " * 8) + "name = {0}".format(serv.name))
print((" " * 8) + "description = {0}".format(serv.description))
print((" " * 8) + "status = {0}".format(serv.status))
print((" " * 8) + "process = {0}".format(repr(serv.process)))
print("")

print("Enumerating handles:")
handles = system.handles
print("    There are {0} handles:".format(len(handles)))
print("    First handle is: " + str(handles[0]))

print("    Enumerating handles of the current process:")
cp_handles = [h for h in system.handles if h.dwProcessId == windows.current_process.pid]
print("        There are {0} handles for this process".format(len(cp_handles)))
print("    Looking for a File handle:")
file_h = [h for h in cp_handles if h.type == "File"][0]
print("        Handle is {0}".format(file_h))
print("        Name is <{0}>".format(file_h.name))
print("")


print("Dumping the first system module")
kmod = windows.system.modules[0]
print("    " + str(kmod))
print((" " * 8) + "ImageName = {0}".format(kmod.ImageName))
print((" " * 8) + "Base = {0:#x}".format(kmod.Base))
print((" " * 8) + "Size = {0:#x}".format(kmod.Size))
print((" " * 8) + "Flags = {0:#x}".format(kmod.Flags))
print((" " * 8) + "LoadCount = {0}".format(kmod.LoadCount))

Output

(cmd) python system.py
Basic system infos:
    version = (10, 0)
    bitness = 64
    computer_name = WILLIE
    product_type = 1
    version_name = Windows 10

There is 331 processes
There is 4010 threads

Dumping first logical drive:
    <LogicalDrive "B:\" (DRIVE_FIXED)>
        name = B:\
        type = 3
        path = \Device\HarddiskVolume7

Dumping first service:
    <Service "1394ohci" SERVICE_STOPPED(0x1)>
        name = 1394ohci
        description = 1394 OHCI Compliant Host Controller
        status = <_SERVICE_STATUS_PROCESS type=SERVICE_KERNEL_DRIVER(0x1) state=SERVICE_STOPPED(0x1)>
        process = None

Finding a service in a user process:
    <Service "AppIDSvc" SERVICE_RUNNING(0x4)>
        name = AppIDSvc
        description = Application Identity
        status = <_SERVICE_STATUS_PROCESS type=48L state=SERVICE_RUNNING(0x4)>
        process = <WinProcess "!cannot-retrieve-name" pid 6060 at 0x35851d0>

Enumerating handles:
    There are 208332 handles:
    First handle is: <HandleWow64 value=<0x4> in process pid=4>
    Enumerating handles of the current process:
        There are 275 handles for this process
    Looking for a File handle:
        Handle is <HandleWow64 value=<0x4> in process pid=15236>
        Name is <\Device\ConDrv>

Dumping the first system module
    <SystemModuleWow64 name="\SystemRoot\system32\ntoskrnl.exe" base=0xfffff80023000000>
        ImageName = \SystemRoot\system32\ntoskrnl.exe
        Base = 0xfffff80023000000
        Size = 0xab7000
        Flags = 0x8804000
        LoadCount = 240

18.4. Services

import windows
import windows.generated_def as gdef

print("Listing the first 3 services:")
for service in windows.system.services[:3]:
    print(" * {0}".format(service))
print("")

TARGET_SERVICE = b"TapiSrv"
print("Retriving service <{0}>".format(TARGET_SERVICE))
service = windows.system.services[TARGET_SERVICE]
print("{0}".format(service))
print(" - name: {0!r}".format(service.name))
print(" - description: {0!r}".format(service.description))
print(" - state: {0!r}".format(service.status.state))
print(" - type: {0!r}".format(service.status.type))
print(" - process: {0!r}".format(service.process))
print(" - security-description: {0}".format(service.security_descriptor))

if service.status.state == gdef.SERVICE_RUNNING:
    print("Service already running, not trying to start it")
else:
    print("Trying to start the service")
    service.start()
    while service.status.state != gdef.SERVICE_RUNNING:
        pass
    print("Service started !")
    print("{0}".format(service))
    print(" - state: {0!r}".format(service.status.state))
    print(" - process: {0!r}".format(service.process))

Output

(cmd) python service\service_demo.py
Listing the first 3 services:
 * <Service "1394ohci" SERVICE_STOPPED(0x1)>
 * <Service "3ware" SERVICE_STOPPED(0x1)>
 * <Service "ACPI" SERVICE_RUNNING(0x4)>

Retriving service <TapiSrv>
<Service "TapiSrv" SERVICE_STOPPED(0x1)>
 - name: 'TapiSrv'
 - description: 'Telephony'
 - state: SERVICE_STOPPED(0x1)
 - type: 48L
 - process: None
 - security-description: O:SYG:SYD:(A;;CCLCSWRPWPDTLOCRRC;;;SY)(A;;CCDCLCSWRPWPDTLOCRSDRCWDWO;;;BA)(A;;CCLCSWRPLOCRRC;;;IU)(A;;CCLCSWRPLOCRRC;;;SU)
Trying to start the service
Service started !
<Service "TapiSrv" SERVICE_RUNNING(0x4)>
 - state: SERVICE_RUNNING(0x4)
 - process: <WinProcess "!cannot-retrieve-name" pid 14700 at 0x4a2c4f0>

18.5. Network - socket exploration

import sys
import os.path
import socket
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows

if not windows.utils.check_is_elevated():
    print("!!! Demo will fail because closing a connection require elevated process !!!")

print("Working on ipv4")
conns = windows.system.network.ipv4

print("== Listening ==")
print("Some listening connections: {0}".format([c for c in conns if not c.established][:3]))
print("Listening ports are : {0}".format([c.local_port for c in conns if not c.established]))

print("== Established ==")
print("Some established connections: {0}".format([c for c in conns if c.established][:3]))

TARGET_HOST = "localhost"
TARGET_PORT = 80
print("== connection to {0}:{1} ==".format(TARGET_HOST, TARGET_PORT))
s = socket.create_connection((TARGET_HOST, TARGET_PORT))

our_connection = [c for c in windows.system.network.ipv4 if c.established and c.remote_port == TARGET_PORT and c.remote_addr == s.getpeername()[0]]

print("Our connection is {0}".format(our_connection))
print("Sending YOP")
s.send("YOP")
print("Closing socket")
our_connection[0].close()
print("Sending LAIT")
s.send("LAIT")

Output-New

(cmd) python network\network.py
Working on ipv4
== Listening ==
Some listening connections: [<TCP IPV4 Listening socket on 0.0.0.0:80>, <TCP IPV4 Listening socket on 0.0.0.0:135>, <TCP IPV4 Listening socket on 0.0.0.0:445>]
Listening ports are : [80, 135, 445, 902, 912, 27036, 49664, 49665, 49666, 49667, 49671, 49673, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 5556, 6463, 22885, 22886, 27060, 49330, 49331, 49794, 49795, 49867, 52541, 57125, 57138, 65000, 65001, 139, 5556, 57046, 57109, 57110, 57143, 57144, 139, 5556, 139, 5556]
== Established ==
Some established connections: [<TCP IPV4 Connection 127.0.0.1:912 -> 127.0.0.1:49488>, <TCP IPV4 Connection 127.0.0.1:912 -> 127.0.0.1:52332>, <TCP IPV4 Connection 127.0.0.1:49488 -> 127.0.0.1:912>]
== connection to localhost:80 ==
Our connection is [<TCP IPV4 Connection 127.0.0.1:57167 -> 127.0.0.1:80>]
Sending YOP
Closing socket
Sending LAIT
Traceback (most recent call last):
  File "network\network.py", line 34, in <module>
    s.send("LAIT")
socket.error: [Errno 10054] An existing connection was forcibly closed by the remote host

18.6. Registry

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows

registry = windows.system.registry
print("Registry is <{0}>".format(registry))

current_user = registry("HKEY_CURRENT_USER")
print("HKEY_CURRENT_USER is <{0}>".format(current_user))
subkeys_name = [s.name for s in current_user.subkeys]
print("HKEY_CURRENT_USER subkeys names are:")
pprint.pprint(subkeys_name)

print("Opening 'Software' in HKEY_CURRENT_USER: {0}".format(current_user("Software")))
print("We can also open it in one access: {0}".format(registry(r"HKEY_CURRENT_USER\Sofware")))
print("Looking at CurrentVersion")

windows_info = registry("HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion")
print("Key is {0}".format(windows_info))

print("values are:")
pprint.pprint(windows_info.values)

registered_owner = windows_info["RegisteredOwner"]
print("registered owner = <{0}>".format(registered_owner))

Output

(cmd) python registry\registry.py
Registry is <<windows.winobject.registry.Registry object at 0x061F89F0>>
HKEY_CURRENT_USER is <<PyHKey "HKEY_CURRENT_USER">>
HKEY_CURRENT_USER subkeys names are:
['AppEvents',
 'AppXBackupContentType',
 'Console',
 'Control Panel',
 'Environment',
 'EUDC',
 'Keyboard Layout',
 'Network',
 'Printers',
 'Software',
 'System',
 'Uninstall',
 'Volatile Environment']
Opening 'Software' in HKEY_CURRENT_USER: <PyHKey "HKEY_CURRENT_USER\Software">
We can also open it in one access: <PyHKey "HKEY_CURRENT_USER\Sofware">
Looking at CurrentVersion
Key is <PyHKey "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion">
values are:
[KeyValue(name='SoftwareType', value=u'System', type=1),
 KeyValue(name='RegisteredOwner', value=u'hakril', type=1),
 ...
 KeyValue(name='PathName', value=u'C:\\WINDOWS', type=1)]
registered owner = <KeyValue(name='RegisteredOwner', value=u'hakril', type=1)>

18.7. Scheduled tasks

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.generated_def as gdef

tscheduler = windows.system.task_scheduler
print("Task scheduler is {0}".format(tscheduler))
root = tscheduler.root
print("Root folder is {0}".format(root))
print ("Listing sub folders")
for subfolder in root.folders:
    print("   * {0}".format(subfolder))
    last_name = subfolder.name

demo_folder = "\Microsoft\Windows\AppID"
print("Manually opening subfolder <{0}>".format(demo_folder))
subfolder = root(demo_folder)
print("Working into {0}".format(subfolder))
for task in subfolder.tasks:
    print("   * {task.name}".format(task=task))

print("")
print("Analysing task {task}".format(task=task))
print("   * Name: <{task.name}>".format(task=task))
print("   * Path: <{task.path}>".format(task=task))
print("   * Definition: <{task.definition}>".format(task=task))
print("Listing actions:")
for action in task.definition.actions:
    print("   * Action: <{action}>".format(action=action))
    print("     * Type: <{action.type!r}>".format(action=action))
    if getattr(action, "path", None):
        print("     * path: <{action.path}>".format(action=action))
        print("     * arguments: <{action.arguments}>".format(action=action))

    # import pdb;pdb.set_trace()
print("Listing triggers:")
for trigger in task.definition.triggers:
    print("   * Trigger type: <{trigger.type!r}>".format(trigger=trigger))

print("")
DEMO_FOLDER_NAME = "PFW_DEMO_FOLDER"
DEMO_TASK_NAME = "PFW_DEMO_TASK"
print("Creating folder <{0}>".format(DEMO_FOLDER_NAME))
demo_folder = root.create_folder(DEMO_FOLDER_NAME)
print("Demo folder is {0}".format(demo_folder))

print("Creating Task definition")
# Create a Task definition
new_task_definition = tscheduler.create()
actions = new_task_definition.actions
# Add an TASK_ACTION_EXEC action to the task def
new_action = actions.create(gdef.TASK_ACTION_EXEC)
new_action.path = sys.executable
new_action.arguments = "-c 'Hello !'"
# Register the new task under 'DEMO_TASK_NAME'
print("Registering task definition as <{0}> in <{1}>".format(DEMO_TASK_NAME, demo_folder))
new_task = demo_folder.register(DEMO_TASK_NAME, new_task_definition)
print("Created task is {0}".format(new_task))

print("Deleting the demo task")
del demo_folder[DEMO_TASK_NAME]
print("Deleting the demo folder")
root.delete_folder(DEMO_FOLDER_NAME)

Output

(cmd) python scheduled_tasks\scheduled_task.py
Task scheduler is <TaskService at 0x6f2a1c0>
Root folder is <TaskFolder "\" at 0x6f2a2b0>
Listing sub folders
   * <TaskFolder "\ASUS" at 0x6f2a350>
   * <TaskFolder "\Intel" at 0x6f2a300>
   * <TaskFolder "\Microsoft" at 0x6f2a3a0>
Manually opening subfolder <\Microsoft\Windows\AppID>
Working into <TaskFolder "\Microsoft\Windows\AppID" at 0x6f2a3f0>
   * PolicyConverter
   * SmartScreenSpecific
   * VerifiedPublisherCertStoreCheck

Analysing task <Task "VerifiedPublisherCertStoreCheck" at 0x6f2a300>
   * Name: <VerifiedPublisherCertStoreCheck>
   * Path: <\Microsoft\Windows\AppID\VerifiedPublisherCertStoreCheck>
   * Definition: <<TaskDefinition at 0x6f2a3a0>>
Listing actions:
   * Action: <<ExecAction at 0x6f2a350>>
     * Type: <TASK_ACTION_EXEC(0x0L)>
     * path: <%windir%\system32\appidcertstorecheck.exe>
     * arguments: <None>
Listing triggers:
   * Trigger type: <TASK_TRIGGER_BOOT(0x8L)>

Creating folder <PFW_DEMO_FOLDER>
Demo folder is <TaskFolder "\PFW_DEMO_FOLDER" at 0x6f2a3a0>
Creating Task definition
Registering task definition as <PFW_DEMO_TASK> in <<TaskFolder "\PFW_DEMO_FOLDER" at 0x6f2a3a0>>
Created task is <Task "PFW_DEMO_TASK" at 0x6f2a530>
Deleting the demo task
Deleting the demo folder

18.8. Event Log

import windows
import windows.generated_def as gdef

evtlogmgr = windows.system.event_log
print("Event log Manager is: {0}".format(evtlogmgr))
print("They are <{0}> channels".format(len(list(evtlogmgr.channels))))
print("They are <{0}> publishers".format(len(list(evtlogmgr.publishers))))


FIREWALL_CHANNEL = "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall"
print("Openning channel <{0}>".format(FIREWALL_CHANNEL))
evtchan = evtlogmgr[FIREWALL_CHANNEL]
print("Channel is {0}".format(evtchan))
# Note that `evtchan.events` is an alias for `evtchan.query().all()`
print("The channel contains <{0}> events".format(len(evtchan.events)))

print("")
EVT_QUERY = "Event/EventData[Data='C:\\WINDOWS\\System32\\svchost.exe'] and Event/System[EventID=2006]"
print("""Querying "{0}">""".format(EVT_QUERY))
query = evtchan.query(EVT_QUERY)
print("Query is {0}".format(query))
event_list = list(query)
print("List contains {0} event".format(len(event_list)))
event = event_list[0]

print("")
print("First event is {0}".format(event))
print("System values:")
print(" * ID: {0}".format(event.id))
print(" * version: {0}".format(event.version))
print(" * level: {0}".format(event.level))
print(" * opcode: {0}".format(event.opcode))
print(" * time_created: {0}".format(event.time_created))
print(" * ID: {0}".format(event.id))

print("Event specific values:")
for name, value in event.data.items():
    print(" * <{0}> -> <{1}>".format(name, value))

print("")
evtmeta =  event.metadata
print("Event metadata is {0}".format(evtmeta))
print(" * id : {0}".format(evtmeta.id))
print(" * channel_id : {0}".format(evtmeta.channel_id))
print(" * message_id : {0}".format(evtmeta.message_id))
print(" * event_data : {0}".format(evtmeta.event_data))
print(" * EventData template :\n{0}".format(evtmeta.template.replace("\r\n", "\n")))


print("")
print("Exploring complex Evt types:")

print("Channel is still {0}".format(evtchan))
print("Channel config is {0}".format(evtchan.config))
publisher = evtchan.config.publisher
print("Channel publisher is {0}".format(publisher))
print("Channel publisher metadata is {0}".format(publisher.metadata))

print("Publisher's channels are:")
for chan in publisher.metadata.channels:
    print(" * {0}".format(chan))

print("Some publisher's event metadata are:")
for evtmeta in list(publisher.metadata.events_metadata)[:3]:
    print(" * {0}: id={1}".format(evtmeta, evtmeta.id))

Output

(cmd) python event_log\eventlog.py
Event log Manager is: <windows.winobject.event_log.EvtlogManager object at 0x0592DB70>
They are <1155> channels
They are <1179> publishers
Openning channel <Microsoft-Windows-Windows Firewall With Advanced Security/Firewall>
Channel is <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall">
The channel contains <1037> events

Querying "Event/EventData[Data='C:\WINDOWS\System32\svchost.exe'] and Event/System[EventID=2006]">
Query is <EvtQuery object at 0x06E9F440>
List contains 304 event

First event is <EvtEvent id="2006" time="2018-05-06 08:03:06.210109">
System values:
 * ID: 2006
 * version: 0
 * level: 4
 * opcode: 0
 * time_created: 131700673862101088
 * ID: 2006
Event specific values:
 * <ModifyingUser> -> <108703760>
 * <RuleName> -> <ByteCodeGeneration>
 * <ModifyingApplication> -> <C:\WINDOWS\System32\svchost.exe>
 * <RuleId> -> <{318EF1CF-A3FA-4B04-8AAC-712276661117}>

Event metadata is <EventMetadata object at 0x06EB96C0>
 * id : 2006
 * channel_id : 16
 * message_id : 2986346454
 * event_data : [u'RuleId', u'RuleName', u'ModifyingUser', u'ModifyingApplication']
 * EventData template :
<template xmlns="http://schemas.microsoft.com/win/2004/08/events">
  <data name="RuleId" inType="win:UnicodeString" outType="xs:string"/>
  <data name="RuleName" inType="win:UnicodeString" outType="xs:string"/>
  <data name="ModifyingUser" inType="win:SID" outType="xs:string"/>
  <data name="ModifyingApplication" inType="win:UnicodeString" outType="xs:string"/>
</template>


Exploring complex Evt types:
Channel is still <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall">
Channel config is <ChannelConfig "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall">
Channel publisher is <EvtPublisher "Microsoft-Windows-Windows Firewall With Advanced Security">
Channel publisher metadata is <PublisherMetadata "Microsoft-Windows-Windows Firewall With Advanced Security">
Publisher's channels are:
 * <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall">
 * <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/ConnectionSecurity">
 * <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/FirewallVerbose">
 * <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/ConnectionSecurityVerbose">
 * <EvtChannel "Network Isolation Operational">
Some publisher's event metadata are:
 * <EventMetadata object at 0x06EBE710>: id=2000
 * <EventMetadata object at 0x06EBE7B0>: id=2001
 * <EventMetadata object at 0x06EBE3F0>: id=2002

18.9. Object manager

import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.generated_def as gdef

object_manager = windows.system.object_manager
print("Object manager is {0}".format(object_manager))
root = object_manager.root
print("Root object is {0}".format(root))

print("")
print("Listing some of root-subobject:")
# Kernel object of type 'Directory' are iterable
for i, (name, obj) in enumerate(root.items()):
    print("  * {0}: {1}".format(name, obj))
    if i == 3:
        break

print("")
print(r"Retrieving <\Rpc Control\lsasspirpc>:")
# You can retrieve this value in one request
x1 = root[r"\Rpc Control\lsasspirpc"]
# Sub-directory also allow __getitem__
x2 = root["Rpc Control"]["lsasspirpc"]
# You can directly request the object manager that will request `root`
x3 = object_manager[r"\Rpc Control\lsasspirpc"]
assert x1.fullname == x2.fullname == x3.fullname

lsasspirpc = x1
print("Object is: {0}".format(lsasspirpc))
print("   * name: <{0}>".format(lsasspirpc.name))
print("   * path: <{0}>".format(lsasspirpc.path))
print("   * fullname: <{0}>".format(lsasspirpc.fullname))
print("   * type: <{0}>".format(lsasspirpc.type))
print("   * target: <{0}>".format(lsasspirpc.target)) # None on non-symlink

print("")
print("Looking for a SymbolicLink in <ArcName>")
slo = [o for o in root["ArcName"].values() if o.type == "SymbolicLink"][0]
print("Object is: {0}".format(slo))
print("   * name: <{0}>".format(slo.name))
print("   * target: <{0}>".format(slo.target))

Output

(cmd) python object_manager\object_manager.py
Object manager is <windows.winobject.object_manager.ObjectManager object at 0x0370ED10>
Root object is <KernelObject "\" (type="Directory")>

Listing some of root-subobject:
  * PendingRenameMutex: <KernelObject "\PendingRenameMutex" (type="Mutant")>
  * ObjectTypes: <KernelObject "\ObjectTypes" (type="Directory")>
  * storqosfltport: <KernelObject "\storqosfltport" (type="FilterConnectionPort")>
  * MicrosoftMalwareProtectionRemoteIoPortWD: <KernelObject "\MicrosoftMalwareProtectionRemoteIoPortWD" (type="FilterConnectionPort")>

Retrieving <\Rpc Control\lsasspirpc>:
Object is: <KernelObject "\Rpc Control\lsasspirpc" (type="ALPC Port")>
   * name: <lsasspirpc>
   * path: <\Rpc Control>
   * fullname: <\Rpc Control\lsasspirpc>
   * type: <ALPC Port>
   * target: <None>

Looking for a SymbolicLink in <ArcName>
Object is: <KernelObject "\ArcName\multi(0)disk(0)rdisk(0)" (type="SymbolicLink")>
   * name: <multi(0)disk(0)rdisk(0)>
   * target: <\Device\Harddisk0\Partition0>

18.9.1. find objects

import argparse

import windows
import windows.generated_def as gdef

def obj_with_link(obj):
    target = obj.target
    if target is None:
        return str(obj)
    return "{0} -> <{1}>".format(obj, target)


def find_name(root, findname):
    TODO = [root]
    while TODO:
        try:
            for name, obj in TODO.pop().items():
                if findname in name or findname in obj.type:
                    print("* {0}".format(obj_with_link(obj)))
                if obj.type == "Directory":
                    TODO.append(obj)
        except gdef.NtStatusException as e:
            print("<{0}> -> {1}".format(obj.fullname, e.name))



parser = argparse.ArgumentParser(prog=__file__)
parser.add_argument('name', nargs='?', default="ls", help='The name of the object to find')
res = parser.parse_args()

objmanag = windows.system.object_manager
print("Looking for object name containing <{0}>".format(res.name))
find_name(objmanag.root, res.name)


Output

(cmd) python object_manager\findobj.py
Looking for object name containing <ls>
* <KernelObject "\KnownDlls32" (type="Directory")>
* <KernelObject "\Win32kCrossSessionGlobals" (type="Section")>
* <KernelObject "\KnownDlls" (type="Directory")>
<\DriverStores\SYSTEM> -> STATUS_ACCESS_DENIED
* <KernelObject "\Device\MailslotRedirector" (type="SymbolicLink")> -> <\Device\Mup\;MailslotRedirector>
* <KernelObject "\Device\Mailslot" (type="Device")>
<\Device\00000020> -> STATUS_ACCESS_DENIED
<\Device\00000020> -> STATUS_ACCESS_DENIED
<\Device\00000020> -> STATUS_ACCESS_DENIED
<\Device\00000020> -> STATUS_ACCESS_DENIED
<\Device\00000020> -> STATUS_ACCESS_DENIED
<\KernelObjects\PrefetchTracesReady> -> STATUS_ACCESS_DENIED
<\KnownDlls\powrprof.dll> -> STATUS_ACCESS_DENIED
* <KernelObject "\RPC Control\lsapolicylookup" (type="ALPC Port")>
* <KernelObject "\RPC Control\lsacap" (type="ALPC Port")>
* <KernelObject "\RPC Control\lsasspirpc" (type="ALPC Port")>
<\Windows\SbApiPort> -> STATUS_ACCESS_DENIED
<\Windows\SbApiPort> -> STATUS_ACCESS_DENIED
<\Sessions\BNOLINKS\1> -> STATUS_ACCESS_DENIED
<\Sessions\BNOLINKS\1> -> STATUS_ACCESS_DENIED
<\Sessions\BNOLINKS\1> -> STATUS_ACCESS_DENIED

18.10. Device manager

import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.generated_def as gdef

devmgr = windows.system.device_manager
print("Device manager is {0}".format(devmgr))

print("Enumerating the first 3 device classes")
for cls in devmgr.classes[:3]:
    print(" * {0}".format(cls))

print("Finding device class 'System'")
# Allow devmgr.classes["name"] ?
system_cls = [cls for cls in devmgr.classes if cls.name == b"System"][0]
print("  * {0}".format(system_cls))
print("  Enumerating some devices of 'System'")
devices = system_cls.devices.all()

for devinst in (devices[0], devices[25], devices[35]): # Some "random" devices to have interesting ones
    print("    * {0}".format(devinst))
    devconf = devinst.allocated_configuration
    if not devconf:
        continue
    print("        Enumerating allocated resources:")
    for resource in devconf.resources:
        print("          * {0}".format(resource))


# python64  samples\device\device_manager.py

# Device manager is <windows.winobject.device_manager.DeviceManager object at 0x0000000003669908>
# Enumerating the first 3 device classes
#  * <DeviceClass name="XboxComposite" guid=05F5CFE2-4733-4950-A6BB-07AAD01A3A84>
#  * <DeviceClass name="DXGKrnl" guid=1264760F-A5C8-4BFE-B314-D56A7B44A362>
#  * <DeviceClass name="RemotePosDevice" guid=13E42DFA-85D9-424D-8646-28A70F864F9C>
# Finding device class 'System'
#   * <DeviceClass name="System" guid=4D36E97D-E325-11CE-BFC1-08002BE10318>
#   Enumerating some devices of 'System'
#     * <DeviceInstance "Motherboard resources" (id=1)>
#     * <DeviceInstance "Microsoft ACPI-Compliant Embedded Controller" (id=26)>
#         Enumerating allocated resources:
#           * <IoResource : [0x00000000000062-0x00000000000062]>
#           * <IoResource : [0x00000000000066-0x00000000000066]>
#     * <DeviceInstance "High Definition Audio Controller" (id=36)>
#         Enumerating allocated resources:
#           * <MemoryResource : [0x000000f7080000-0x000000f7083fff]>
#           * <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
#           * <IrqResource : [0x00000000000011]>

Output

(cmd) python device_manager\device_manager.py
Device manager is <windows.winobject.device_manager.DeviceManager object at 0x00000000033442E8>
Enumerating the first 3 device classes
 * <DeviceClass name="XboxComposite" guid=05F5CFE2-4733-4950-A6BB-07AAD01A3A84>
 * <DeviceClass name="DXGKrnl" guid=1264760F-A5C8-4BFE-B314-D56A7B44A362>
 * <DeviceClass name="RemotePosDevice" guid=13E42DFA-85D9-424D-8646-28A70F864F9C>
Finding device class 'System'
  * <DeviceClass name="System" guid=4D36E97D-E325-11CE-BFC1-08002BE10318>
  Enumerating some devices of 'System'
    * <DeviceInstance "Motherboard resources" (id=1)>
    * <DeviceInstance "Microsoft ACPI-Compliant Embedded Controller" (id=26)>
        Enumerating allocated resources:
          * <IoResource : [0x00000000000062-0x00000000000062]>
          * <IoResource : [0x00000000000066-0x00000000000066]>
    * <DeviceInstance "High Definition Audio Controller" (id=36)>
        Enumerating allocated resources:
          * <MemoryResource : [0x000000f7080000-0x000000f7083fff]>
          * <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
          * <IrqResource : [0x00000000000011]>

18.10.1. Enumerate devices

import argparse

import windows
import windows.generated_def as gdef

devmgr = windows.system.device_manager

def class_generator(filter=None):
    for cls in devmgr.classes:
        if filter and cls.name != filter.encode():
            continue
        yield cls


def main(clsfilter, enumerate_devices, print_devinst_resources, attributes):
    for devcls in class_generator(clsfilter):
        print(devcls)
        if not enumerate_devices:
            continue
        # Enumerate devices
        for devinst in devcls.devices:
            print("  * {0}".format(devinst))

            # Print attributes
            if attributes:
                print("    Attributes:")
                for attr in attributes:
                    value = getattr(devinst, attr)
                    print("    * {0}={1}".format(attr, value))

            if not print_devinst_resources:
                continue
            # Device resources
            devconf = devinst.allocated_configuration
            if not devconf:
                # No allocated configuration
                # Check boot conf ?
                continue

            for resource in devconf.resources:
                print("    * {0}".format(resource))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--class", dest="clsfilter", default=None, help="The classe to list: default all")
    parser.add_argument("--no-print-devices", action="store_true", help="Prevent the listing of devices in the matching classes")
    parser.add_argument("--print-resources", action="store_true", help="Print the resources allocated to the device instance")
    parser.add_argument("--attributes", nargs="+", help="The list of attributes to print for each discovered device instance")

    args = parser.parse_args()
    print(args)
    main(args.clsfilter,
            enumerate_devices=not args.no_print_devices,
            print_devinst_resources=args.print_resources,
            attributes=args.attributes)



$ python64 device_manager\enum_devices.py --no-print-devices
Namespace(clsfilter=None, no_print_devices=True, print_resources=False)
<DeviceClass name="XboxComposite" guid=05F5CFE2-4733-4950-A6BB-07AAD01A3A84>
<DeviceClass name="DXGKrnl" guid=1264760F-A5C8-4BFE-B314-D56A7B44A362>
<DeviceClass name="RemotePosDevice" guid=13E42DFA-85D9-424D-8646-28A70F864F9C>
<DeviceClass name="DigitalMediaDevices" guid=14B62F50-3F15-11DD-AE16-0800200C9A66>
<DeviceClass name="PrintQueue" guid=1ED2BBF9-11F0-4084-B21F-AD83A8E6DCDC>
<DeviceClass name="WCEUSBS" guid=25DBCE51-6C8F-4A72-8A6D-B54C2B4FC835>
<DeviceClass name="SecurityAccelerator" guid=268C95A1-EDFE-11D3-95C3-0010DC4050A5>
<DeviceClass name="HidMsr" guid=2A9FE532-0CDC-44F9-9827-76192F2CA2FB>
<DeviceClass name="SystemRecovery" guid=2DB15374-706E-4131-A0C7-D7C78EB0289A>
....

$ python64 device_manager\enum_devices.py --class Processor --attributes name manufacturer device_object_name location_paths
Namespace(attributes=['name', 'manufacturer', 'device_object_name', 'location_paths'], clsfilter='Processor', no_print_devices=False, print_resources=False)
<DeviceClass name="Processor" guid=50127DC3-0F36-415E-A6CC-4CB3BE910B65>
  * <DeviceInstance "Intel Processor" (id=1)>
    Attributes:
    * name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
    * manufacturer=Intel
    * device_object_name=\Device\00000017
    * location_paths=[u'ACPI(_SB_)#ACPI(CPU0)']
  * <DeviceInstance "Intel Processor" (id=2)>
    Attributes:
    * name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
    * manufacturer=Intel
    * device_object_name=\Device\00000018
    * location_paths=[u'ACPI(_SB_)#ACPI(CPU1)']
  * <DeviceInstance "Intel Processor" (id=3)>
    Attributes:
    * name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
    * manufacturer=Intel
    * device_object_name=\Device\00000019
    * location_paths=[u'ACPI(_SB_)#ACPI(CPU2)']
  * <DeviceInstance "Intel Processor" (id=4)>
    Attributes:
    * name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
    * manufacturer=Intel
    * device_object_name=\Device\0000001a
    * location_paths=[u'ACPI(_SB_)#ACPI(CPU3)']


$ python64 device_manager\enum_devices.py --class Display --print-resources
Namespace(clsfilter='Display', no_print_devices=False, print_resources=True)
<DeviceClass name="Display" guid=4D36E968-E325-11CE-BFC1-08002BE10318>
  * <DeviceInstance "NVIDIA GeForce GTX 1070" (id=1)>
    * <MemoryResource : [0x000000f6000000-0x000000f6ffffff]>
    * <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
    * <MemoryResource : [0x000000e0000000-0x000000efffffff]>
    * <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
    * <MemoryResource : [0x000000f0000000-0x000000f1ffffff]>
    * <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
    * <IoResource : [0x0000000000e000-0x0000000000e07f]>
    ...

18.11. windows.wintrust

import sys
import os.path
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows.wintrust

TARGET_FILE = r"C:\windows\system32\ntdll.dll"
print("Checking signature of <{0}>".format(TARGET_FILE))
print(" is_signed: <{0}>".format(windows.wintrust.is_signed(TARGET_FILE)))
print(" check_signature: <{0}>".format(windows.wintrust.check_signature(TARGET_FILE)))

sign_info = windows.wintrust.full_signature_information(TARGET_FILE)
print(" full_signature_information:")
print("    * signed <{0}>".format(sign_info.signed))
print("    * catalog <{0}>".format(sign_info.catalog))
print("    * catalogsigned <{0}>".format(sign_info.catalogsigned))
print("    * additionalinfo <{0}>".format(sign_info.additionalinfo))

print("Checking signature of some loaded DLL")
for module in windows.current_process.peb.modules[:5]:
    path = module.fullname
    is_signed =  windows.wintrust.is_signed(path)
    if is_signed:
        print("<{0}> : {1}".format(path, is_signed))
    else:
        sign_info = windows.wintrust.full_signature_information(path)
        print("<{0}> : {1} ({2})".format(path, is_signed, sign_info[3]))


Output

(cmd) python crypto\wintrust.py
Checking signature of <C:\windows\system32\ntdll.dll>
 is_signed: <True>
 check_signature: <0>
 full_signature_information:
    * signed <True>
    * catalog <C:\WINDOWS\system32\CatRoot\{F750E6C3-38EE-11D1-85E5-00C04FC295EE}\Package_802_for_KB4054517~31bf3856ad364e35~amd64~~10.0.1.6.cat>
    * catalogsigned <True>
    * additionalinfo <0>
Checking signature of some loaded DLL
<c:\python27\python.exe> : False (TRUST_E_NOSIGNATURE(0x800b0100L))
<c:\windows\system32\ntdll.dll> : True
<c:\windows\system32\kernel32.dll> : True
<c:\windows\system32\kernelbase.dll> : True
<c:\windows\winsxs\x86_microsoft.vc90.crt_1fc8b3b9a1e18e3b_9.0.30729.9317_none_508dca76bcbcfe81\msvcr90.dll> : True

18.12. VectoredException()

18.12.1. In local process

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import ctypes
import windows
from windows.winobject.exception import VectoredException
import windows.generated_def.windef as windef
from windows.generated_def.winstructs import *


@VectoredException
def handler(exc):
    print("==Entry of VEH handler==")
    if exc[0].ExceptionRecord[0].ExceptionCode == EXCEPTION_ACCESS_VIOLATION:
        target_addr = ctypes.cast(exc[0].ExceptionRecord[0].ExceptionInformation[1], ctypes.c_void_p).value
        print("Instr at {0} accessed to addr {1}".format(hex(exc[0].ExceptionRecord[0].ExceptionAddress), hex(target_addr)))
        print("Resetting page protection to <PAGE_READWRITE>")
        windows.winproxy.VirtualProtect(target_page, 0x1000, windef.PAGE_READWRITE)
        exc[0].ContextRecord[0].EEFlags.TF = 1
        return windef.EXCEPTION_CONTINUE_EXECUTION
    else:
        print("Exception of type {0}".format(exc[0].ExceptionRecord[0].ExceptionCode))
        print("Resetting page protection to <PAGE_NOACCESS>")
        windows.winproxy.VirtualProtect(target_page, 0x1000, windef.PAGE_NOACCESS)
        return windef.EXCEPTION_CONTINUE_EXECUTION


windows.winproxy.AddVectoredExceptionHandler(0, handler)

target_page = windows.current_process.virtual_alloc(0x1000)
print("Protected page is at <{0}>".format(hex(target_page)))
print("Setting page protection to <PAGE_NOACCESS>")
windows.winproxy.VirtualProtect(target_page, 0x1000, windef.PAGE_NOACCESS)

print("")
v = ctypes.c_uint.from_address(target_page).value
print("Value 1 read")

print("")
v = ctypes.c_uint.from_address(target_page + 0x10).value
print("Value 2 read")

Output

(cmd) python process\veh_segv.py
Protected page is at <0x7270000>
Setting page protection to <PAGE_NOACCESS>

==Entry of VEH handler==
Instr at 0x6a54c166 accessed to addr 0x7270000
Resetting page protection to <PAGE_READWRITE>
==Entry of VEH handler==
Exception of type EXCEPTION_SINGLE_STEP(0x80000004L)
Resetting page protection to <PAGE_NOACCESS>
Value 1 read

==Entry of VEH handler==
Instr at 0x6a54c166 accessed to addr 0x7270010
Resetting page protection to <PAGE_READWRITE>
==Entry of VEH handler==
Exception of type EXCEPTION_SINGLE_STEP(0x80000004L)
Resetting page protection to <PAGE_NOACCESS>
Value 2 read

18.12.2. In remote process

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.test

from windows.generated_def.winstructs import *

python_code = """
import windows
import ctypes
import windows
from windows.winobject.exception import VectoredException
import windows.generated_def.windef as windef
from windows.generated_def.winstructs import *

windows.utils.create_console()

module_to_trace = "gdi32.dll"
nb_repeat = [5]

@VectoredException
def handler(exc):
    if exc[0].ExceptionRecord[0].ExceptionCode == EXCEPTION_ACCESS_VIOLATION:
        print("")
        target_addr = ctypes.cast(exc[0].ExceptionRecord[0].ExceptionInformation[1], ctypes.c_void_p).value
        print("Instr at {0} accessed to addr {1} ({2})".format(hex(exc[0].ExceptionRecord[0].ExceptionAddress), hex(target_addr), module_to_trace))
        windows.winproxy.VirtualProtect(target_page, code_size, windef.PAGE_EXECUTE_READWRITE)
        nb_repeat[0] -= 1
        if nb_repeat[0]:
            exc[0].ContextRecord[0].EEFlags.TF = 1
        else:
            print("No more tracing !")
        return windef.EXCEPTION_CONTINUE_EXECUTION
    else:
        print("Exception of type {0}".format(exc[0].ExceptionRecord[0].ExceptionCode))
        print("Resetting page protection to <PAGE_READWRITE>")
        windows.winproxy.VirtualProtect(target_page, code_size, windef.PAGE_READWRITE)
        return windef.EXCEPTION_CONTINUE_EXECUTION


windows.winproxy.AddVectoredExceptionHandler(0, handler)

print("Tracing execution in module: <{0}>".format(module_to_trace))

module = [x for x in windows.current_process.peb.modules if x.name == module_to_trace][0]
target_page = module.baseaddr
code_size = module.pe.get_OptionalHeader().SizeOfCode

print("Protected page is at {0}".format(hex(target_page)))
windows.winproxy.VirtualProtect(target_page, code_size, windef.PAGE_READWRITE)
"""

c = windows.test.pop_proc_64(dwCreationFlags=CREATE_SUSPENDED)
x = c.execute_python(python_code)

c.threads[0].resume()

import time
time.sleep(0.1)

for t in c.threads:
    t.suspend()

time.sleep(1)
c.exit()

Output:

(cmd λ) python.exe process\remote_veh_segv.py
(In another console)

Tracing execution in module: <gdi32.dll>
Protected page is at 0x7ffa3c700000L

Instr at 0x7ffa3c70f0f0L accessed to addr 0x7ffa3c70f0f0L (gdi32.dll)
Exception of type EXCEPTION_SINGLE_STEP(0x80000004L)
Resetting page protection to <PAGE_READWRITE>

Instr at 0x7ffa3c70f0f5L accessed to addr 0x7ffa3c70f0f5L (gdi32.dll)
Exception of type EXCEPTION_SINGLE_STEP(0x80000004L)
Resetting page protection to <PAGE_READWRITE>

Instr at 0x7ffa3c70f0faL accessed to addr 0x7ffa3c70f0faL (gdi32.dll)
Exception of type EXCEPTION_SINGLE_STEP(0x80000004L)
Resetting page protection to <PAGE_READWRITE>

Instr at 0x7ffa3c70f0ffL accessed to addr 0x7ffa3c70f0ffL (gdi32.dll)
Exception of type EXCEPTION_SINGLE_STEP(0x80000004L)
Resetting page protection to <PAGE_READWRITE>

Instr at 0x7ffa3c70f100L accessed to addr 0x7ffa3c70f100L (gdi32.dll)
No more tracing !

18.13. Debugging

18.13.1. Debugger

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.test
import windows.debug

from windows.generated_def.winstructs import *



class MyDebugger(windows.debug.Debugger):
    def on_exception(self, exception):
        code = exception.ExceptionRecord.ExceptionCode
        addr = exception.ExceptionRecord.ExceptionAddress
        print("Got exception {0} at 0x{1:x}".format(code, addr))


class PrintUnicodeString(windows.debug.Breakpoint):
    def __init__(self, addr, argument_position):
        super(PrintUnicodeString, self).__init__(addr)
        self.arg_pos = argument_position


    def trigger(self, dbg, exc):
        p = dbg.current_process
        t = dbg.current_thread
        esp = t.context.Esp

        unicode_string_addr = p.read_ptr(esp + (self.arg_pos + 1) * 4)
        wstring_addr = p.read_ptr(unicode_string_addr + 4)
        dll_loaded = p.read_wstring(wstring_addr).lower()
        print("Loading <{0}>".format(dll_loaded))

        if dll_loaded.endswith("comctl32.dll"):
            print("Ask to load <comctl32.dll>: exiting process")
            dbg.current_process.exit()


calc = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
d = MyDebugger(calc)
d.add_bp(PrintUnicodeString("ntdll!LdrLoadDll", argument_position=2))
d.loop()

Output

(cmd) python debug\debugger_print_LdrLoaddll.py
Loading <kernel32.dll>
Got exception EXCEPTION_BREAKPOINT(0x80000003L) at 0x77e0dadf
Loading <api-ms-win-core-synch-l1-2-0>
Loading <api-ms-win-core-fibers-l1-1-1>
Loading <api-ms-win-core-fibers-l1-1-1>
Loading <api-ms-win-core-synch-l1-2-0>
Loading <api-ms-win-core-localization-l1-2-1>
Loading <kernel32>
Loading <api-ms-win-core-string-l1-1-0>
Loading <api-ms-win-core-datetime-l1-1-1>
Loading <api-ms-win-core-localization-obsolete-l1-2-0>
Loading <c:\windows\system32\imm32.dll>
Loading <api-ms-win-core-processthreads-l1-1-2.dll>
Loading <c:\windows\system32\uxtheme.dll>
Loading <c:\windows\system32\mrmcorer.dll>
Loading <c:\windows\system32\windows.storage.dll>
Loading <c:\windows\system32\efswrt.dll>
Loading <c:\windows\system32\twinapi.appcore.dll>
Loading <rpcrt4.dll>
Loading <c:\windows\system32\wintypes.dll>
Loading <c:\windows\syswow64\wintypes.dll>
Loading <comctl32.dll>
Ask to load <comctl32.dll>: exiting process

18.13.1.1. on_setup

import windows.debug

class MySetupDebugger(windows.debug.Debugger):
    def on_setup(self):
        super(MySetupDebugger, self).on_setup()
        print("Setup called: {0}".format(self.current_process))

    def on_exception(self, exc):
        print("Exception: {0}".format(exc.ExceptionRecord.ExceptionCode))

    def on_exit_process(self, evt):
        print("Process exit: {0}".format(self.current_process))

class SimpleDebugger(windows.debug.Debugger):
    def on_exception(self, exc):
        print("Exception: {0}".format(exc.ExceptionRecord.ExceptionCode))

    def on_exit_process(self, evt):
        print("Process exit: {0}".format(self.current_process))



print("== With on_setup ==")
dbg = MySetupDebugger.debug(r"c:\windows\system32\whoami.exe")
dbg.loop()

print("\n== Without on_setup ==")
dbg = SimpleDebugger.debug(r"c:\windows\system32\whoami.exe")
dbg.loop()

Output

(cmd) python debug\debugger_on_setup.py
== With on_setup ==
Setup called: <WinProcess "whoami.exe" pid 29796 at 0x4ccb790>
<whoami output>
Process exit: <WinProcess "whoami.exe" pid 29796 (DEAD) at 0x4ccb790>

== Without on_setup ==
Exception: EXCEPTION_BREAKPOINT(0x80000003L)
<whoami output>
Process exit: <WinProcess "whoami.exe" pid 33328 (DEAD) at 0x4ccbdb0>

18.13.1.2. Single stepping

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.test
import windows.debug

import windows.native_exec.simple_x86 as x86
from windows.generated_def.winstructs import *


class MyDebugger(windows.debug.Debugger):
    def __init__(self, *args, **kwargs):
        super(MyDebugger, self).__init__(*args, **kwargs)
        self.single_step_counter = 0

    def on_exception(self, exception):
        code = exception.ExceptionRecord.ExceptionCode
        addr = exception.ExceptionRecord.ExceptionAddress
        print("Got exception {0} at 0x{1:x}".format(code, addr))

    def on_single_step(self, exception):
        code = exception.ExceptionRecord.ExceptionCode
        addr = exception.ExceptionRecord.ExceptionAddress
        print("Got single_step {0} at 0x{1:x}".format(code, addr))
        self.single_step_counter -= 1
        if self.single_step_counter > 0:
            return self.single_step()
        else:
            print("No more single step: exiting")
            self.current_process.exit()


class SingleStepOnWrite(windows.debug.MemoryBreakpoint):
    """Check that BP/dbg can trigger single step and that instruction follows"""
    def trigger(self, dbg, exc):
        fault_addr = exc.ExceptionRecord.ExceptionInformation[1]
        eip = dbg.current_thread.context.pc
        print("Instruction at <{0:#x}> wrote at <{1:#x}>".format(eip, fault_addr))
        dbg.single_step_counter = 4
        return dbg.single_step()


calc = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
d = MyDebugger(calc)

code = calc.virtual_alloc(0x1000)
data = calc.virtual_alloc(0x1000)

injected = x86.MultipleInstr()
injected += x86.Mov("EAX", 0)
injected += x86.Mov(x86.deref(data), "EAX")
injected += x86.Add("EAX", 4)
injected += x86.Mov(x86.deref(data + 4), "EAX")
injected += x86.Add("EAX", 8)
injected += x86.Mov(x86.deref(data + 8), "EAX")
injected += x86.Nop()
injected += x86.Nop()
injected += x86.Ret()

calc.write_memory(code, injected.get_code())
d.add_bp(SingleStepOnWrite(data, size=8, events="W"))
calc.create_thread(code, 0)
d.loop()

Output

(cmd) python debug\debugger_membp_singlestep.py
Got exception EXCEPTION_BREAKPOINT(0x80000003L) at 0x77e0dadf
Instruction at <0x520006> wrote at <0x530000>
Got single_step EXCEPTION_SINGLE_STEP(0x80000004L) at 0x52000c
Got single_step EXCEPTION_SINGLE_STEP(0x80000004L) at 0x520011
Instruction at <0x520011> wrote at <0x530004>
Got single_step EXCEPTION_SINGLE_STEP(0x80000004L) at 0x520017
Got single_step EXCEPTION_SINGLE_STEP(0x80000004L) at 0x52001c
Got single_step EXCEPTION_SINGLE_STEP(0x80000004L) at 0x520022
Got single_step EXCEPTION_SINGLE_STEP(0x80000004L) at 0x520023
No more single step: exiting

18.13.1.3. windows.debug.FunctionCallBP

import sys
import os.path
import pprint
import threading
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.test
import windows.debug
import windows.generated_def as gdef

# The debugge python will just print the result of
# 3 call to IsDebuggerPresent
TARGET_PYTHON_CODE = '"\
import ctypes;\
import time;\
IsDebuggerPresent = ctypes.windll.kernel32.IsDebuggerPresent;\
print(\'[DEBUGGE] IsDebuggerPresent={0}\'.format(IsDebuggerPresent()));\
time.sleep(1);\
print(\'[DEBUGGE] IsDebuggerPresent={0}\'.format(IsDebuggerPresent()));\
time.sleep(1);\
print(\'[DEBUGGE] IsDebuggerPresent={0}\'.format(IsDebuggerPresent()));\
"'

# This breakpoint now nothing about its target argument
# It only now how to break at the return of the function
# This allow us to change the return value of any function
class IncrementReturnValue(windows.debug.FunctionCallBP):
    def __init__(self, addr, initialvalue):
        super(IncrementReturnValue, self).__init__(addr)
        self.initialvalue = initialvalue

    def trigger(self, dbg, exc):
        # Ask to break a the return of the function
        # callback is ret_trigger
        self.break_on_ret(dbg, exc)

    def ret_trigger(self, dbg, exc):
        ctx = dbg.current_thread.context
        # Func result is an alias to EAX/RAX
        ctx.func_result = self.initialvalue
        # Set the new context for the target thread
        dbg.current_thread.set_context(ctx)
        self.initialvalue += 1

d = windows.debug.Debugger.debug(sys.executable, [sys.executable, "-c", TARGET_PYTHON_CODE])
# We could also give the direct address of the function
# But it would require to wait for the module to be loaded
d.add_bp(IncrementReturnValue("kernelbase!IsDebuggerPresent", 42))
d.loop()

Output

(cmd) python debug\change_function_ret_value.py
[DEBUGGE] IsDebuggerPresent=42
[DEBUGGE] IsDebuggerPresent=43
[DEBUGGE] IsDebuggerPresent=44

18.13.1.4. windows.debug.FunctionBP

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.test
import windows.debug

from windows.generated_def.winstructs import *

class FollowNtCreateFile(windows.debug.FunctionBP):
    TARGET = windows.winproxy.NtCreateFile
    COUNTER = 3

    def trigger(self, dbg, exc):
        if not self.COUNTER:
            print("Exiting process")
            dbg.current_process.exit()
            return
        params = self.extract_arguments(dbg.current_process, dbg.current_thread)
        filename = params["ObjectAttributes"].contents.ObjectName.contents.Buffer
        handle_addr = params["FileHandle"].value
        self.data = (filename, handle_addr)
        self.break_on_ret(dbg, exc)

    def ret_trigger(self, dbg, exc):
        filename, handle_addr = self.data
        ret_value = dbg.current_thread.context.func_result # EAX / RAX depending of bitness
        handle_value = dbg.current_process.read_ptr(handle_addr)
        if ret_value:
            print("NtCreateFile of <{0}> FAILED (result={1:#x})".format(filename, ret_value))
            return
        print("NtCreateFile of <{0}>: handle = {1:#x}".format(filename, handle_value))
        # Manual verification
        fhandle = [h for h in windows.system.handles if h.dwProcessId == dbg.current_process.pid and h.wValue == handle_value]
        if not fhandle:
            raise ValueError("handle not found!")
        fhandle = fhandle[0]
        print("Handle manually found! typename=<{0}>, name=<{1}>".format(fhandle.type, fhandle.name))
        print("")
        self.COUNTER -= 1

if __name__ == "__main__":
    calc = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
    d = windows.debug.Debugger(calc)
    d.add_bp(FollowNtCreateFile())
    d.loop()

Output

(cmd) python debug\debug_functionbp.py
NtCreateFile of <86250608>: handle = 0x124
Handle manually found! typename=<File>, name=<\Device\HarddiskVolume2\Windows\SysWOW64\en-US\notepad.exe.mui>

NtCreateFile of <1996168876>: handle = 0x150
Handle manually found! typename=<File>, name=<\Device\DeviceApi>

NtCreateFile of <86206640>: handle = 0x280
Handle manually found! typename=<File>, name=<\Device\HarddiskVolume2\Windows\Fonts\StaticCache.dat>

Exiting process

18.13.1.5. Debugger.attach

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.test
import windows.debug

from windows.generated_def.winstructs import *

# Just a debugger that follow NtCreateFile and print filename & handler
from debug_functionbp import FollowNtCreateFile


def follow_create_file(pid):
    print("Finding process with pid <{0}>".format(pid))
    target = [p for p in windows.system.processes if p.pid == pid][0]
    print("Target is {0}".format(target))
    dbg = windows.debug.Debugger.attach(target)
    print("Debugger attached: {0}".format(dbg))
    print("")
    dbg.add_bp(FollowNtCreateFile())
    dbg.loop()

if __name__ == "__main__":
    # Create a non-debugged process safe to debug
    calc = windows.test.pop_proc_32(dwCreationFlags=0)
    # Give ovnly the PID to follow_create_file
    follow_create_file(calc.pid)

Output

(cmd) python debug\attach.py
Finding process with pid <12252>
Target is <WinProcess "notepad.exe" pid 12252 at 0x66971b0>
Debugger attached: <windows.debug.debugger.Debugger object at 0x0667CFD0>

NtCreateFile of <10573272>: handle = 0x144
Handle manually found! typename=<File>, name=<\Device\HarddiskVolume2\Windows\SysWOW64\en-US\notepad.exe.mui>

NtCreateFile of <1996168876>: handle = 0x170
Handle manually found! typename=<File>, name=<\Device\DeviceApi>

NtCreateFile of <10504328>: handle = 0x2a8
Handle manually found! typename=<File>, name=<\Device\HarddiskVolume2\Windows\Fonts\StaticCache.dat>

Exiting process

18.13.1.6. Native code tester

import sys
import argparse

import windows
import windows.test
import windows.debug as dbg
import windows.native_exec.simple_x86 as x86
import windows.native_exec.simple_x64 as x64
from windows.generated_def import *


def hexdump(string, start_addr=0):
    result = ""
    if len(string) == 0:
        return
    ascii = list("."*256)
    for i in range(1,0x7f):
        ascii[i] = chr(i)
    ascii[0x0] = "."
    ascii[0x7] = "."
    ascii[0x8] = "."
    ascii[0x9] = "."
    ascii[0xa] = "."
    ascii[0x1b] = "."
    ascii[0xd] = "."
    ascii[0xff] = "\x13"
    ascii = "".join(ascii)
    offset = 0
    while (offset+0x10) <= len(string):
        line = string[offset:(offset+0x10)]
        linebuf = " %08X " % (offset + start_addr)
        for i in range(0,16):
            if i == 8:
                linebuf += " "
            linebuf += "%02X " % ord(line[i])
        linebuf += " "
        for i in range(0,16):
            linebuf += ascii[ord(line[i])]
        result += linebuf+"\n"
        offset += 0x10
    if (len(string) % 0x10) > 0:
        linebuf = " %08X " % (offset + start_addr)
        for i in range((len(string)-(len(string) % 0x10)),(len(string))):
            if i == 8:
                linebuf += " "
            linebuf += "%02X " % ord(string[i])
        linebuf += "   "*(0x10-(len(string) % 0x10))
        linebuf += " "
        for i in range((len(string)-(len(string) % 0x10)),(len(string))):
            linebuf += ascii[ord(string[i])]
        result += linebuf+"\n"
    return result

class StartStepBP(dbg.breakpoints.HXBreakpoint):
    def trigger(self, dbg, exc):
        dbg.del_bp(self)
        dbg.on_single_step(exc) # Trigger single step processing

class CodeTesteur(dbg.Debugger):
    def __init__(self, process, code, register_start={}, steps=False):
        super(CodeTesteur, self).__init__(process)

        self.initial_code = code
        code += "\xcc"

        self.code_addr = self.write_code_in_target(process, code)
        register_start["pc"] = self.code_addr
        self.thread_exec = process.threads[0]
        self.context_exec = self.thread_exec.context
        self.setup_target_context(self.context_exec, register_start)
        print("Startup context is:")
        self.context_exec.dump()
        print(self.context_exec.EEFlags)
        self.thread_exec.suspend()
        self.thread_exec.set_context(self.context_exec)
        self.thread_exec.resume()
        self.init_breakpoint = False
        # Test code
        if steps:
            self.steps = True
            self.add_bp(StartStepBP(self.code_addr))
            self.last_context = self.context_exec
            self.last_code = self.initial_code

    def write_code_in_target(self, process, code):
        addr = process.virtual_alloc(len(code))
        process.write_memory(addr, code)
        return addr

    def setup_target_context(self, ctx, register_start):
        for name, value in register_start.items():
            if not hasattr(ctx, name):
                raise ValueError("Unknown register to setup <{0}>".format(name))
            setattr(ctx, name, value)

    def on_single_step(self, x):
        print("* New step !")
        # print("EIP = {0:#x}".format(self.current_thread.context.pc))
        self.report_ctx_diff(self.last_context, self.current_thread.context)
        self.last_context = self.current_thread.context
        self.last_code = self.report_code_diff(self.last_code)
        self.single_step()
        print ("")

    def on_exception(self, x):
        exc_code = x.ExceptionRecord.ExceptionCode
        exc_addr = x.ExceptionRecord.ExceptionAddress
        if not self.init_breakpoint and exc_code == EXCEPTION_BREAKPOINT:
            self.init_breakpoint = True
            return
        ctx = self.current_thread.context
        print("==Post-exec context==")
        ctx.dump()
        print(ctx.EEFlags)
        if exc_code == EXCEPTION_BREAKPOINT and exc_addr == self.context_exec.pc + len(self.initial_code):
            print("<Normal terminaison>")
        else:
            print("<{0}> at <{1:#x}>".format(exc_code, exc_addr))
            if exc_code == EXCEPTION_ACCESS_VIOLATION:
                exc_infos = x.ExceptionRecord.ExceptionInformation
                read_write = "write" if exc_infos[0] else "read"
                target_addr = exc_infos[1]
                print("   * Access of type <{0}> at address <{1:#x}>".format(read_write, target_addr))
        self.report_ctx_diff(self.context_exec, ctx)
        self.report_code_diff(self.initial_code)
        self.current_process.exit()
        return

    def report_ctx_diff(self, start, now):
        print("== DIFF ==")
        for name, start_value in start.regs():
            now_value = getattr(now, name)
            if start_value != now_value:
                diff = now_value - start_value
                print("{0}: {1:#x} -> {2:#x} ({3:+#x})".format(name, start_value, now_value, diff))
        if start.sp > now.sp:
            print("Negative Stack: dumping:")
            data = self.current_process.read_memory(now.sp, start.sp - now.sp)
            print(hexdump(data, start.sp))

    def report_code_diff(self, initial_code):
        code_size = len(initial_code)
        final_code = self.current_process.read_memory(self.code_addr, code_size)
        if final_code == initial_code:
            return initial_code
        print("== Executable code DIFF == ")
        print("Before:")
        print(hexdump(initial_code, self.code_addr))
        print("After:")
        print(hexdump(final_code, self.code_addr))
        return final_code

def test_code_x86(code, regs=None, raw=False, steps=False, **kwargs):
    print("Testing x86 code")
    process = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
    if raw:
        code = code.replace(" ", "").decode('hex')
    else:
        code = x86.assemble(code)

    start_register = {}
    if regs:
        for name_value in regs.split(";"):
            name, value = name_value.split("=")
            name = name.strip().capitalize()
            if name == "Eflags":
                name = "EFlags"
            value = int(value.strip(), 0)
            start_register[name] = value


    x = CodeTesteur(process, code, start_register, steps)
    x.loop()

def test_code_x64(code, regs=None, raw=False, **kwargs):
    print("Testing x64 code")
    if windows.current_process.bitness == 32:
        raise ValueError("Cannot debug a 64b process from 32b python")
    process = windows.test.pop_proc_64(dwCreationFlags=DEBUG_PROCESS)
    if raw:
        code = code.replace(" ", "").decode('hex')
    else:
        code = x64.assemble(code)

    start_register = {}
    if regs:
        for name_value in regs.split(";"):
            name, value = name_value.split("=")
            name = name.strip().capitalize()
            if name == "Eflags":
                name = "EFlags"
            value = int(value.strip(), 0)
            start_register[name] = value

    x = CodeTesteur(process, code, start_register)
    x.loop()


parser = argparse.ArgumentParser(prog=__file__)

parser.add_argument('--x64', action='store_const', dest="func", const=test_code_x64, default=test_code_x86, help='Code is x64')
parser.add_argument('--raw', action='store_true', help='argument is raw assembled code (in hex)')
parser.add_argument('--steps', action='store_true', help='Get all step info')
parser.add_argument('code', help='The code to execute')
parser.add_argument('regs', nargs="?", help='The default values of the registers')

res = parser.parse_args()

res.func(**res.__dict__)

Ouput

(cmd) python.exe test_code.py "mov eax, 0x42424242" "eax=0x11223344"
Testing x86 code
Startup context is:
Eip -> 0x3f0000L
Esp -> 0x3bfae4L
Eax -> 0x11223344L
Ebx -> 0x5a6000L
Ecx -> 0x0L
Edx -> 0x0L
Ebp -> 0x0L
Edi -> 0x0L
Esi -> 0x0L
EFlags -> 0x202L
EEflags(0x202L:IF)
==Post-exec context==
Eip -> 0x3f0007L
Esp -> 0x3bfae4L
Eax -> 0x42424242L
Ebx -> 0x5a6000L
Ecx -> 0x0L
Edx -> 0x0L
Ebp -> 0x0L
Edi -> 0x0L
Esi -> 0x0L
EFlags -> 0x202L
EEflags(0x202L:IF)
<Normal terminaison>
==DIFF==
Eip: 0x3f0000 -> 0x3f0007 (+0x7)
Eax: 0x11223344 -> 0x42424242 (+0x31200efe)


(cmd) python64 test_code.py --x64 "mov r15, 0x11223344; push r14; call r15" "rcx=1; r14=0x4242424243434343"
Testing x64 code
Startup context is:
Rip -> 0x205a1d60000L
Rsp -> 0xe24a88fa88L
Rax -> 0x0L
Rbx -> 0x0L
Rcx -> 0x1L
Rdx -> 0xe24aaf9000L
Rbp -> 0x0L
Rdi -> 0x0L
Rsi -> 0x0L
R8 -> 0x0L
R9 -> 0x0L
R10 -> 0x0L
R11 -> 0x0L
R12 -> 0x0L
R13 -> 0x0L
R14 -> 0x4242424243434343L
R15 -> 0x0L
EFlags -> 0x200L
EEflags(0x200L:IF)
==Post-exec context==
Rip -> 0x11223344L
Rsp -> 0xe24a88fa78L
Rax -> 0x0L
Rbx -> 0x0L
Rcx -> 0x1L
Rdx -> 0xe24aaf9000L
Rbp -> 0x0L
Rdi -> 0x0L
Rsi -> 0x0L
R8 -> 0x0L
R9 -> 0x0L
R10 -> 0x0L
R11 -> 0x0L
R12 -> 0x0L
R13 -> 0x0L
R14 -> 0x4242424243434343L
R15 -> 0x11223344L
EFlags -> 0x10202L
EEflags(0x10202L:IF|RF)
<EXCEPTION_ACCESS_VIOLATION(0xc0000005L)> at <0x11223344>
==DIFF==
Rip: 0x205a1d60000 -> 0x11223344 (-0x20590b3ccbc)
Rsp: 0xe24a88fa88 -> 0xe24a88fa78 (-0x10)
R15: 0x0 -> 0x11223344 (+0x11223344)
EFlags: 0x200 -> 0x10202 (+0x10002)
Negative Stack: dumping:
E24A88FA88 0C 00 D6 A1 05 02 00 00  43 43 43 43 42 42 42 42 ........CCCCBBBB

18.13.2. SymbolDebugger

import argparse
import os

import windows
import windows.debug
import windows.test


parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
args = parser.parse_args()
print(args)

if args.dbghelp:
    symbols.set_dbghelp_path(args.dbghelp)
else:
    if "PFW_DBGHELP_PATH" not in os.environ:
        print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")


class MyInfoBP(windows.debug.Breakpoint):
    COUNT = 0
    def trigger(self, dbg, exc):
        cursym = dbg.current_resolver[exc.ExceptionRecord.ExceptionAddress]
        print("Breakpoint triggered at: {0}".format(cursym))
        print(repr(cursym))
        MyInfoBP.COUNT += 1
        if MyInfoBP.COUNT == 4:
            print("Quitting")
            dbg.current_process.exit()
        print("")

dbg = windows.debug.SymbolDebugger.debug(b"c:\\windows\\system32\\notepad.exe")
dbg.add_bp(MyInfoBP("kernelbase!CreateFileInternal+2"))
dbg.add_bp(MyInfoBP("ntdll!LdrpInitializeProcess"))
dbg.loop()

Output

(cmd) python debug\symbol_debugger.py
Namespace(dbghelp=None)
Breakpoint triggered at: ntdll!LdrpInitializeProcess
<SymbolInfoA name="LdrpInitializeProcess" start=0x77c676c0 tag=SymTagPublicSymbol>

Breakpoint triggered at: KERNELBASE!CreateFileInternal+0x2
<SymbolInfoA name="CreateFileInternal" start=0x75be2120 displacement=0x2 tag=SymTagFunction>

Breakpoint triggered at: KERNELBASE!CreateFileInternal+0x2
<SymbolInfoA name="CreateFileInternal" start=0x75be2120 displacement=0x2 tag=SymTagFunction>

Breakpoint triggered at: KERNELBASE!CreateFileInternal+0x2
<SymbolInfoA name="CreateFileInternal" start=0x75be2120 displacement=0x2 tag=SymTagFunction>
Quitting

18.13.3. LocalDebugger

18.13.3.1. In current process

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.debug
from windows.generated_def.winstructs import *
import windows.native_exec.simple_x86 as x86

class SingleSteppingDebugger(windows.debug.LocalDebugger):
    SINGLE_STEP_COUNT = 4
    def on_exception(self, exc):
        code = self.get_exception_code()
        context = self.get_exception_context()
        print("EXCEPTION !!!! Got a {0!r} at 0x{1:x}".format(code, context.pc))
        self.SINGLE_STEP_COUNT -= 1
        if self.SINGLE_STEP_COUNT:
            return self.single_step()
        return EXCEPTION_CONTINUE_EXECUTION

class RewriteBreakpoint(windows.debug.HXBreakpoint):
    def trigger(self, dbg, exc):
        context = dbg.get_exception_context()
        print("GOT AN HXBP at 0x{0:x}".format(context.pc))
        # Rewrite the infinite loop with 2 nop
        windows.current_process.write_memory(self.addr, b"\x90\x90")
        # Ask for a single stepping
        return dbg.single_step()


d = SingleSteppingDebugger()
# Infinite loop + nop + ret
code = x86.assemble("label :begin; jmp :begin; nop; ret")
func = windows.native_exec.create_function(code, [PVOID])
print("Code addr = 0x{0:x}".format(func.code_addr))
# Create a thread that will infinite loop
t = windows.current_process.create_thread(func.code_addr, 0)
# Add a breakpoint on the infitine loop
d.add_bp(RewriteBreakpoint(func.code_addr))
t.wait()
print("Done!")


Output

(cmd) python debug\local_debugger.py
Code addr = 0x5450002
GOT AN HXBP at 0x5450002
EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004L) at 0x5450003
EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004L) at 0x5450004
EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004L) at 0x5450005
EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004L) at 0x76948654
Done!

18.13.3.2. In remote process

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import ctypes
import windows
import windows.test

from windows.generated_def.winstructs import *

remote_code = """
import windows
from windows.generated_def.winstructs import *

windows.utils.create_console()

class YOLOHXBP(windows.debug.HXBreakpoint):
    def trigger(self, dbg, exc):
        p = windows.current_process
        arg_pos = 2
        context = dbg.get_exception_context()
        esp = context.Esp
        unicode_string_addr = p.read_ptr(esp + (arg_pos + 1) * 4)
        wstring_addr = p.read_ptr(unicode_string_addr + 4)
        dll_loaded = p.read_wstring(wstring_addr)
        print("I AM LOADING <{0}>".format(dll_loaded))

d = windows.debug.LocalDebugger()

exp = windows.current_process.peb.modules[1].pe.exports
#windows.utils.FixedInteractiveConsole(locals()).interact()
ldr = exp["LdrLoadDll"]
d.add_bp(YOLOHXBP(ldr))

"""

c = windows.test.pop_proc_32(dwCreationFlags=CREATE_SUSPENDED)
c.execute_python(remote_code)
c.threads[0].resume()

import time
time.sleep(2)
c.exit()

Ouput:

(cmd λ) python.exe debug\local_debugger_remote_process.py
(In another console)
I AM LOADING <C:\Windows\system32\uxtheme.dll>
I AM LOADING <C:\Windows\system32\uxtheme.dll>
I AM LOADING <C:\Windows\system32\uxtheme.dll>
I AM LOADING <C:\Windows\system32\uxtheme.dll>
I AM LOADING <kernel32.dll>
I AM LOADING <C:\Windows\WinSxS\x86_microsoft.windows.gdiplus_6595b64144ccf1df_1.1.9600.17415_none_dad8722c5bcc2d8f\gdiplus.dll>
I AM LOADING <comctl32.dll>
I AM LOADING <comctl32.dll>
I AM LOADING <comctl32.dll>
I AM LOADING <comctl32.dll>
I AM LOADING <comctl32.dll>
I AM LOADING <comctl32>
I AM LOADING <C:\Windows\SysWOW64\oleacc.dll>
I AM LOADING <OLEAUT32.DLL>
I AM LOADING <C:\Windows\system32\ole32.dll>
I AM LOADING <C:\Windows\system32\MSCTF.dll>
I AM LOADING <C:\Windows\SysWOW64\msxml6.dll>
I AM LOADING <C:\Windows\system32\shell32.dll>
I AM LOADING <C:\Windows\SYSTEM32\WINMM.dll>
I AM LOADING <C:\Windows\system32\ole32.dll>

18.14. Symbols

18.14.1. VirtualSymbolHandler

import os
import windows
import windows.generated_def as gdef
from windows.debug import symbols
import argparse


parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
args = parser.parse_args()
print(args)

if args.dbghelp:
    symbols.set_dbghelp_path(args.dbghelp)
else:
    if "PFW_DBGHELP_PATH" not in os.environ:
        print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")


symbols.engine.options = 0 # Disable defered load
sh = symbols.VirtualSymbolHandler()

ntmod = sh.load_file(r"c:\windows\system32\ntdll.dll", addr=0x420000)

print("Ntdll module is: {0}".format(ntmod))
print("  * name = {0}".format(ntmod.name))
print("  * addr = {0:#x}".format(ntmod.addr))
print("  * path = {0:}".format(ntmod.path))
print("  * type = {0:}".format(ntmod.type))
print("  * pdb = {0:}".format(ntmod.pdb))

print("")
TEST_FUNCTION = "LdrLoadDll"
print("Resolving function <{0}>".format(TEST_FUNCTION))
loaddll = sh["ntdll!" + TEST_FUNCTION]
print("Symbol found !")
print("  * __repr__: {0!r}".format(loaddll))
print("  * __str__: {0}".format(loaddll))
print("  * addr: {0:#x}".format(loaddll.addr))
print("  * name: {0}".format(loaddll.name))
print("  * fullname: {0}".format(loaddll.fullname))
print("  * module: {0}".format(loaddll.module))

print("")
print("Loading kernelbase")
kbasemod = sh.load_file(r"c:\windows\system32\kernelbase.dll", addr=0x1230000)
print("Loaded modules are: {0}".format(sh.modules))
LOOKUP_ADDR = 0x1231242
print("Looking up address: {0:#x}".format(LOOKUP_ADDR))
lookupsym = sh[LOOKUP_ADDR]
print("Symbol resolved !")
print("  * __repr__: {0!r}".format(lookupsym))
print("  * __str__: {0}".format(lookupsym))
print("  * start: {0:#x}".format(lookupsym.start))
print("  * addr: {0:#x}".format(lookupsym.addr))
print("  * displacement: {0:#x}".format(lookupsym.displacement))
print("  * name: {0}".format(lookupsym.name))
print("  * fullname: {0}".format(lookupsym.fullname))
print("  * module: {0}".format(lookupsym.module))

Output

(cmd) python debug\symbols\virtsymdemo.py
Namespace(dbghelp=None)
Ntdll module is: <SymbolModule name="ntdll" type=SymPdb pdb="wntdll.pdb" addr=0x420000>
  * name = ntdll
  * addr = 0x420000
  * path = c:\windows\system32\ntdll.dll
  * type = <SYM_TYPE SymPdb(0x3L)>
  * pdb = d:\symbols\wntdll.pdb\3D038F31BBBF51C701937460DBAB1F531\wntdll.pdb

Resolving function <LdrLoadDll>
Symbol found !
  * __repr__: <SymbolInfoA name="LdrLoadDll" start=0x464d30 tag=SymTagFunction>
  * __str__: ntdll!LdrLoadDll
  * addr: 0x464d30
  * name: LdrLoadDll
  * fullname: ntdll!LdrLoadDll
  * module: <SymbolModule name="ntdll" type=SymPdb pdb="wntdll.pdb" addr=0x420000>

Loading kernelbase
Loaded modules are: [<SymbolModule name="ntdll" type=SymPdb pdb="wntdll.pdb" addr=0x420000>, <SymbolModule name="kernelbase" type=SymPdb pdb="wkernelbase.pdb" addr=0x1230000>]
Looking up address: 0x1231242
Symbol resolved !
  * __repr__: <SymbolInfoA name="__load_config_used" start=0x1231230 displacement=0x12 tag=SymTagPublicSymbol>
  * __str__: kernelbase!__load_config_used+0x12
  * start: 0x1231230
  * addr: 0x1231242
  * displacement: 0x12
  * name: __load_config_used
  * fullname: kernelbase!__load_config_used+0x12
  * module: <SymbolModule name="kernelbase" type=SymPdb pdb="wkernelbase.pdb" addr=0x1230000>

18.14.2. ProcessSymbolHandler

import os
import argparse

import windows
import windows.test
import windows.generated_def as gdef
from windows.debug import symbols


parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
args = parser.parse_args()
print(args)

if args.dbghelp:
    symbols.set_dbghelp_path(args.dbghelp)
else:
    if "PFW_DBGHELP_PATH" not in os.environ:
        print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")


if windows.current_process.bitness == 32:
    target = windows.test.pop_proc_32()
else:
    target = windows.test.pop_proc_64()

print("Target is {0}".format(target))
sh = symbols.ProcessSymbolHandler(target)
import time;time.sleep(0.1) # Just wait for the process initialisation
sh.refresh() # Refresh symbol list (Only meaningful for ProcessSymbolHandler)

print("Some loaded modules are:".format())
for sm in sh.modules[:3]:
    print(" * {0}".format(sm))

createserv = sh["advapi32!CreateServiceEx"]

print("")
TEST_FUNCTION = "advapi32!CreateServiceEx"
print("Resolving function <{0}>".format(TEST_FUNCTION))
createserv = sh[TEST_FUNCTION]
print("Symbol found !")
print("  * __repr__: {0!r}".format(createserv))
print("  * __str__: {0}".format(createserv))
print("  * addr: {0:#x}".format(createserv.addr))
print("  * name: {0}".format(createserv.name))
print("  * fullname: {0}".format(createserv.fullname))
print("  * module: {0}".format(createserv.module))

target.exit()

Output

(cmd) python debug\symbols\processsymdemo.py
Namespace(dbghelp=None)
Target is <WinProcess "notepad.exe" pid 14280 at 0x4e177b0>
Some loaded modules are:
 * <SymbolModule name="notepad" type=SymDeferred pdb="" addr=0xc10000>
 * <SymbolModule name="ntdll" type=SymDeferred pdb="" addr=0x77bc0000>
 * <SymbolModule name="KERNEL32" type=SymDeferred pdb="" addr=0x77480000>

Resolving function <advapi32!CreateServiceEx>
Symbol found !
  * __repr__: <SymbolInfoA name="CreateServiceEx" start=0x764647b0 tag=SymTagPublicSymbol>
  * __str__: advapi32!CreateServiceEx
  * addr: 0x764647b0
  * name: CreateServiceEx
  * fullname: advapi32!CreateServiceEx
  * module: <SymbolModule name="advapi32" type=SymPdb pdb="advapi32.pdb" addr=0x76440000>

18.15. WMI

18.15.1. WMI requests

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows

print("WMI requester is {0}".format(windows.system.wmi))
print("Selecting * from 'Win32_Process'")
result = windows.system.wmi.select("Win32_Process")

print("They are <{0}> processes".format(len(result)))
print("Looking for ourself via pid")
us = [p for p in result if int(p["ProcessId"]) == windows.current_process.pid][0]

print("Some info about our process:")
print("    * {0} -> {1}".format("Name", us["Name"]))
print("    * {0} -> {1}".format("ProcessId", us["ProcessId"]))
print("    * {0} -> {1}".format("OSName", us["OSName"]))
print("    * {0} -> {1}".format("UserModeTime", us["UserModeTime"]))
print("    * {0} -> {1}".format("WindowsVersion", us["WindowsVersion"]))
print("    * {0} -> {1}".format("CommandLine", us["CommandLine"]))

print("<Select Caption,FileSystem,FreeSpace from Win32_LogicalDisk>:")
for vol in windows.system.wmi.query("select Caption,FileSystem,FreeSpace from Win32_LogicalDisk"):
    # Filter out system-properties for the sample
    print("    * " + str({k:v for k,v in vol.items() if not k.startswith("_")}))

print("\n ==== Advanced use ====")
print("Listing some namespaces:")
for namespace in [ns for ns in windows.system.wmi.namespaces if "2" in ns]:
    print("    * {0}".format(namespace))

security2 = windows.system.wmi["root\\SecurityCenter2"]
print("Querying non-default namespace: {0}".format(security2))
print("Listing some available classes:")
for clsname in [x for x in security2.classes if x["__CLASS"].endswith("Product")]:
    print("    * {0}".format(clsname))

print("Listing <AntiVirusProduct>:")
for av in security2.select("AntiVirusProduct"):
    print("    * {0}".format(av["displayName"]))



Output

(cmd) python wmi\wmi_request.py
WMI requester is <windows.winobject.wmi.WmiManager object at 0x04258918>
Selecting * from 'Win32_Process'
They are <308> processes
Looking for ourself via pid
Some info about our process:
    * Name -> python.exe
    * ProcessId -> 27144
    * OSName -> Microsoft Windows 10 Pro|C:\WINDOWS|\Device\Harddisk0\Partition2
    * UserModeTime -> 2812500
    * WindowsVersion -> 10.0.17134
    * CommandLine -> C:\Python27\python.exe  wmi\wmi_request.py
<Select Caption,FileSystem,FreeSpace from Win32_LogicalDisk>:
    * {u'Caption': u'B:', u'FreeSpace': None, u'FileSystem': None}
    * {u'Caption': u'C:', u'FreeSpace': u'5701517312', u'FileSystem': u'NTFS'}
    * {u'Caption': u'D:', u'FreeSpace': u'47657324544', u'FileSystem': u'NTFS'}
    * {u'Caption': u'E:', u'FreeSpace': u'89507512320', u'FileSystem': u'NTFS'}

 ==== Advanced use ====
Listing some namespaces:
    * CIMV2
    * SecurityCenter2
    * StandardCimv2
Querying non-default namespace: <WmiNamespace "root\SecurityCenter2">
Listing some available classes:
    * <WmiObject class "FirewallProduct">
    * <WmiObject class "AntiVirusProduct">
    * <WmiObject class "AntiSpywareProduct">
Listing <AntiVirusProduct>:
    * Windows Defender

18.15.2. WMI Create Process

import time
import windows


wmispace = windows.system.wmi["root\\cimv2"]
print("WMI namespace is <{0}>".format(wmispace))
proc_class = wmispace.get_object("Win32_process")
print("Process class is {0}".format(proc_class))

inparam_cls = proc_class.get_method("Create").inparam
print("Method Create InParams is <{0}>".format(inparam_cls))
print("Method Create InParams properties are <{0}>".format(inparam_cls.properties))
print("Creating instance of inparam")

inparam = inparam_cls()
print("InParam instance is <{0}>".format(inparam))
print("Setting <CommandLine>")
inparam["CommandLine"] = r"c:\windows\system32\notepad.exe"

print("Executing method")
# This API may change for something that better wraps cls/object/Parameters handling
outparam = wmispace.exec_method(proc_class, "Create", inparam)

print("OutParams is {0}".format(outparam))
print("Out params values are: {0}".format(outparam.properties))
target = windows.WinProcess(pid=int(outparam["ProcessId"]))
print("Created process is {0}".format(target))
print("Waiting 1s")
time.sleep(1)
print("Killing the process")
target.exit(0)



Output

(cmd) python wmi\create_process.py
WMI namespace is <<WmiNamespace "root\cimv2">>
Process class is <WmiObject class "Win32_Process">
Method Create InParams is <<WmiObject class "__PARAMETERS">>
Method Create InParams properties are <[u'CommandLine', u'CurrentDirectory', u'ProcessStartupInformation']>
Creating instance of inparam
InParam instance is <<WmiObject instance of "__PARAMETERS">>
Setting <CommandLine>
Executing method
OutParams is <WmiObject instance of "__PARAMETERS">
Out params values are: [u'ProcessId', u'ReturnValue']
Created process is <WinProcess "notepad.exe" pid 24036 at 0x3d04390>
Waiting 1s
Killing the process

18.16. windows.com

18.16.1. INetFwPolicy2

import sys
import os.path
import pprint
sys.path.append(os.path.abspath(__file__ + "\..\.."))

import windows
import windows.generated_def as gdef
from windows.generated_def import interfaces

# This code is a simple version of the firewall code in windows.winoject.network
print("Initialisation of COM")
windows.com.init()
print("Creating INetFwPolicy2 variable")
firewall = interfaces.INetFwPolicy2()
print("{0} (value = {1})".format(firewall, firewall.value))
print("")

print("Generating CLSID")
NetFwPolicy2CLSID = windows.com.IID.from_string("E2B3C97F-6AE1-41AC-817A-F6F92166D7DD")
print(NetFwPolicy2CLSID)
print("")

print("Creating COM instance")
windows.com.create_instance(NetFwPolicy2CLSID, firewall)
print("{0} (value = 0x{1:0})".format(firewall, firewall.value))
print("")

print("Checking for enabled profiles")
for profile in [gdef.NET_FW_PROFILE2_DOMAIN, gdef.NET_FW_PROFILE2_PRIVATE, gdef.NET_FW_PROFILE2_PUBLIC]:
    enabled = gdef.VARIANT_BOOL()
    firewall.get_FirewallEnabled(profile, enabled)
    print("   * {0} -> {1}".format(profile, enabled.value))

Output

(cmd) python com\com_inetfwpolicy2.py
Initialisation of COM
Creating INetFwPolicy2 variable
<INetFwPolicy2 object at 0x060677B0> (value = None)

Generating CLSID
<IID "E2B3C97F-6AE1-41AC-817A-F6F92166D7DD">

Creating COM instance
<INetFwPolicy2 object at 0x060677B0> (value = 0x85678080)

Checking for enabled profiles
   * NET_FW_PROFILE2_DOMAIN(0x1L) -> True
   * NET_FW_PROFILE2_PRIVATE(0x2L) -> True
   * NET_FW_PROFILE2_PUBLIC(0x4L) -> True

18.16.2. ICallInterceptor

import windows
import windows.generated_def as gdef
from windows import winproxy

# POC of ICallInterceptor
# Based on works by Pavel Yosifovich
# http://blogs.microsoft.co.il/pavely/2018/02/28/intercepting-com-objects-with-cogetinterceptor/

windows.com.init()

# Create an interceptor for the firewall (INetFwPolicy2)
interceptor = gdef.ICallInterceptor()
winproxy.CoGetInterceptor(gdef.INetFwPolicy2.IID, None, interceptor.IID, interceptor)

# The PythonForWindows firewall object is a real/valid INetFwPolicy2
# used for demos of ICallFrameEvents.Invoke
real_firewall = windows.system.network.firewall

# Custom Python ICallFrameEvents implementation
class MySink(windows.com.COMImplementation):
    IMPLEMENT = gdef.ICallFrameEvents

    def OnCall(self, this, frame):
        ifname = gdef.PWSTR()
        methodname = gdef.PWSTR()
        print("Hello from python sink !")
        frame.GetNames(ifname, methodname)
        print("Catching call to <{0}.{1}>".format(ifname.value, methodname.value))
        param0info = gdef.CALLFRAMEPARAMINFO()
        param0 = windows.com.Variant()
        frame.GetParamInfo(0, param0info)
        frame.GetParam(0, param0)
        print("Info about parameters 0:")
        windows.utils.sprint(param0info, name=" * param0info")
        print("param0 value = {0}".format(param0.aslong))
        frame.Invoke(real_firewall)
        frame.SetReturnValue(1234)
        print("Leaving the sink !")
        return 0

# Create and register our ICallFrameEvents sink
xsink = MySink()
interceptor.RegisterSink(xsink)
# Create the INetFwPolicy2 interceptor interface
fakefirewall = gdef.INetFwPolicy2()
interceptor.QueryInterface(fakefirewall.IID, fakefirewall)

# Calling one of the INetFwPolicy2 function for testing
# Testing on https://msdn.microsoft.com/en-us/library/windows/desktop/aa365316(v=vs.85).aspx
enabled = gdef.VARIANT_BOOL()
res = fakefirewall.get_FirewallEnabled(2, enabled)
print("return value = {0}".format(res))
print("firewall enabled = {0}".format(enabled))

# Test a function taking a POINTER(ICallFrameEvents) (PTR to interface)
print("Testing a function taking a PTR to a COM interface")
sink2 = gdef.ICallFrameEvents()
print("Before call: {0}".format((sink2, sink2.value)))
interceptor.GetRegisteredSink(sink2)
print("After call: {0}".format((sink2, sink2.value)))


# (cmd) python samples\com\icallinterceptor.py
# Hello from python sink !
# Catching call to <INetFwPolicy2.FirewallEnabled>
# Info about parameters 0:
#  * param0info.fIn -> 0x1
#  * param0info.fOut -> 0x0
#  * param0info.stackOffset -> 0x4L
#  * param0info.cbParam -> 0x4L
# param0 value = 2
# Leaving the sink !
# return value = 1234
# firewall enabled = VARIANT_BOOL(True)
# Testing a function taking a PTR to a COM interface
# Before call: (<ICallFrameEvents object at 0x066EF3F0>, None)
# After call: (<ICallFrameEvents object at 0x066EF3F0>, 107934504)

Output

(cmd) python com\icallinterceptor.py
Hello from python sink !
Catching call to <INetFwPolicy2.FirewallEnabled>
Info about parameters 0:
 * param0info.fIn -> 0x1
 * param0info.fOut -> 0x0
 * param0info.stackOffset -> 0x4L
 * param0info.cbParam -> 0x4L
param0 value = 2
Leaving the sink !
return value = 1234
firewall enabled = VARIANT_BOOL(True)
Testing a function taking a PTR to a COM interface
Before call: (<ICallFrameEvents object at 0x066EF3F0>, None)
After call: (<ICallFrameEvents object at 0x066EF3F0>, 107934504)

18.17. windows.crypto

18.17.1. Encryption demo

This sample is a working POC able to generate key-pair, encrypt and decrypt file.

import argparse
import getpass

import windows.crypto as crypto
from windows import winproxy
from windows.generated_def import *

import windows.crypto.generation as gencrypt

# http://stackoverflow.com/questions/1461272/basic-questions-on-microsoft-cryptoapi

def crypt(src, dst, certs, **kwargs):
    """Encrypt the content of 'src' file with the certifacts in 'certs' into 'dst'"""
    # Open every certificates in the certs list
    certlist = [crypto.Certificate.from_file(x) for x in certs]
    # Encrypt the content of 'src' with all the public keys(certs)
    res = crypto.encrypt(certlist, src.read())
    print("Encryption done. Result:")
    print(repr(res))
    # Write the result in 'dst'
    dst.write(res)
    dst.close()
    src.close()

def decrypt(src, pfxfile, password, outfile=None, **kwargs):
    """Decrypt the content of 'src' with the private key in 'pfxfile'. the 'pfxfile' is open using the 'password'"""
    # Open the 'pfx' with the given password
    if password is None:
        password = getpass.getpass()
    pfx = crypto.import_pfx(pfxfile.read(), password)
    # Decrypt the content of the file
    decrypted = crypto.decrypt(pfx, src.read())
    if outfile is None:
        print(u"Result = <{0}>".format(decrypted))
    else:
        with open(outfile, "wb") as f:
            f.write(decrypted)
    return decrypted


def sign(src, pfxfile, password, outfile=None, **kwargs):
    if password is None:
        password = getpass.getpass()
    pfx = crypto.import_pfx(pfxfile.read(), password)
    # Decrypt the content of the file
    certs = pfx.certs
    if len(certs) != 1:
        raise ValueError("PFX containing exactly one certificate expected for signature")
    cert = certs[0]
    signed = crypto.sign(cert, src.read())
    if outfile is None:
        print(u"Result = <{0}>".format(signed))
    else:
        with open(outfile, "wb") as f:
            f.write(signed)
    return signed

PFW_TMP_KEY_CONTAINER = "PythonForWindowsTMPContainer"

def genkeys(common_name, pfxpassword, outname, keysize=2048, **kwargs):
    """Generate a SHA256/RSA key pair. A self-signed certificate with 'common_name' is stored as 'outname'.cer.
    The private key is stored in 'outname'.pfx protected with 'pfxpassword'"""
    cert_store = crypto.CertificateStore.new_in_memory()
    # Create a TMP context that will hold our newly generated key-pair
    with crypto.CryptContext(PFW_TMP_KEY_CONTAINER, None, PROV_RSA_FULL, 0, retrycreate=True) as ctx:
        key = HCRYPTKEY()
        keysize_flags = keysize << 16
        # Generate a key-pair that is exportable
        winproxy.CryptGenKey(ctx, AT_KEYEXCHANGE, CRYPT_EXPORTABLE | keysize_flags, key)
        # It does NOT destroy the key-pair from the container,
        # It only release the key handle
        # https://msdn.microsoft.com/en-us/library/windows/desktop/aa379918(v=vs.85).aspx
        winproxy.CryptDestroyKey(key)

    # Descrption of the key-container that will be used to generate the certificate
    KeyProvInfo = CRYPT_KEY_PROV_INFO()
    KeyProvInfo.pwszContainerName = PFW_TMP_KEY_CONTAINER
    KeyProvInfo.pwszProvName = None
    KeyProvInfo.dwProvType = PROV_RSA_FULL
    KeyProvInfo.dwFlags = 0
    KeyProvInfo.cProvParam = 0
    KeyProvInfo.rgProvParam = None
    #KeyProvInfo.dwKeySpec = AT_SIGNATURE
    KeyProvInfo.dwKeySpec = AT_KEYEXCHANGE

    crypt_algo = CRYPT_ALGORITHM_IDENTIFIER()
    crypt_algo.pszObjId = szOID_RSA_SHA256RSA

    certif_name = "CN={0}".format(common_name)
    # Generate a self-signed certificate based on the given key-container and signature algorithme
    certif = gencrypt.generate_selfsigned_certificate(certif_name, key_info=KeyProvInfo, signature_algo=crypt_algo)
    # Add the newly created certificate to our TMP cert-store
    cert_store.add_certificate(certif)
    # Generate a pfx from the TMP cert-store
    print("Password is <{0}>".format(pfxpassword))
    pfx = gencrypt.generate_pfx(cert_store, pfxpassword)
    if outname is None:
        outname = common_name.lower()

    # Dump the certif (public key) and pfx (public + private keys)
    with open(outname + ".cer", "wb") as f:
        # The encoded certif only contains the public key
        f.write(certif.encoded)
    with open(outname + ".pfx", "wb") as f:
        f.write(pfx)
    print(certif)
    # Destroy the TMP key container
    prov = HCRYPTPROV()
    winproxy.CryptAcquireContextW(prov, PFW_TMP_KEY_CONTAINER, None, PROV_RSA_FULL, CRYPT_DELETEKEYSET)

# Openssl commands to check ce certif/pfx

## Read certificate info (.cer)
### openssl x509 -inform der -in {certif} -text -noout

## Read pfx info (ask to another password to encrypt Private key before print/export)
### openssl pkcs12 -info -in {pfx} -nokeys
## Read pfx info !!!! PRINT PRIVATE KEY !!!!
### openssl pkcs12 -info -in {pfx} -nodes

## Read ASN1 data
### openssl asn1parse -inform DER -in {file}




parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
subparsers = parser.add_subparsers(description='valid subcommands')

cryptparse = subparsers.add_parser('crypt')
cryptparse.set_defaults(func=crypt)

cryptparse.add_argument('src', type=argparse.FileType('rb'), help='File to encrypt')
cryptparse.add_argument('dst', type=argparse.FileType('wb'), help='The encrypted file')
cryptparse.add_argument('certs', type=str, nargs='+',
                    help='List of certfile used to encrypt the src')

decryptparse = subparsers.add_parser('decrypt')
decryptparse.set_defaults(func=decrypt)
decryptparse.add_argument('src', type=argparse.FileType('rb'), help='File to decrypt')
decryptparse.add_argument('pfxfile', type=argparse.FileType('rb'), help='PFX file to use')
decryptparse.add_argument('--password', help='Password of the PFX')
decryptparse.add_argument('--outfile', default=None, help='The outputfile default is print')


decryptparse = subparsers.add_parser('sign')
decryptparse.set_defaults(func=sign)
decryptparse.add_argument('src', type=argparse.FileType('rb'), help='File to sign')
decryptparse.add_argument('pfxfile', type=argparse.FileType('rb'), help='PFX file to use')
decryptparse.add_argument('--password', help='Password of the PFX')
decryptparse.add_argument('--outfile', default=None, help='The outputfile default is print')


genkeysparse = subparsers.add_parser('genkey', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
genkeysparse.set_defaults(func=genkeys)
genkeysparse.add_argument('common_name', nargs='?', metavar='CommonName', default='DEFAULT', help='the common name of the certificate')
genkeysparse.add_argument('outname', nargs='?',help='The filename base for the generated files')
genkeysparse.add_argument('--pfxpassword', nargs='?', help='Password to protect the PFX')
genkeysparse.add_argument('--keysize', nargs='?', type=lambda x: int(x, 0), default=2048, help='The size of the RSA key')

res = parser.parse_args()
res.func(**res.__dict__)

Ouput:

(cmd λ) python crypto\encryption_demo.py genkey YOLOCERTIF mykey --pfxpassword MYPASSWORD
<CertificatContext "YOLOCERTIF" serial="1b a4 3e 17 f7 ed ec ab 4f f8 11 46 48 e9 29 25">

(cmd λ) ls
mykey.cer  mykey.pfx

(cmd λ) echo|set /p="my secret message" > message.txt

(cmd λ) python crypto\encryption_demo.py crypt message.txt message.crypt mykey.cer
Encryption done. Result:
bytearray(b'0\x82\x01\x19\x06\t*\x86H\x86\xf7\r\x01\x07\x03\xa0\x82\x01\n0\x82\x01\x06\x02\x01\x001\x81\xc30\x81
\xc0\x02\x01\x000)0\x151\x130\x11\x06\x03U\x04\x03\x13\nYOLOCERTIF\x02\x10\x1b\xa4>\x17\xf7\xed\xec\xabO\xf8\x11
FH\xe9)%0\r\x06\t*\x86H\x86\xf7\r\x01\x01\x01\x05\x00\x04\x81\x80V\x89)\xf5\xaaM\x99cEA\x17^\xa2D~\x94\xe3\xf2\x1f
\x05Y\xc2\xbb\xb2\xbbYBpU6\x870\xce\xe7\xd2M{\xbb\xb9K\xa0\xf5\xe5\x93\xca\xedF\x80.x\xdc\xf2\x0c\xa6UO\x01\r\xaf
\xd0Z\xd9\xabnzR\xd4j=\xca\xc2RG\xcd\x11u\x82\x7f\x8c\xd8t\xb9\xf9\xe8%\xfal\xaaHPj;\xecKk]\t%\xfd\x91\xcc\xe0lWf
\xc6\x12x\x1am\xc8\x01t\xac\xa6\xf3#\x02\xd4J \x8eZ\xbb\x10W\xe1 0;\x06\t*\x86H\x86\xf7\r\x01\x07\x010\x14\x06\x08*
\x86H\x86\xf7\r\x03\x07\x04\x08\x14F\x04\xad\xed9\xed<\x80\x18\x80]6\xccTV\xbc\xb8*\x84QY!~\xb3\n\x1aV\xd4\rf\xd1n:')

(cmd λ) python crypto\encryption_demo.py decrypt --password BADPASS message.crypt mykey.pfx
Traceback (most recent call last):
File "..\samples\encryption_demo.py", line 103, in <module>
    res.func(**res.__dict__)
File "..\samples\encryption_demo.py", line 26, in decrypt
    pfx = crypto.import_pfx(pfxfile.read(), password)
File "c:\users\hakril\documents\work\pythonforwindows\windows\crypto\certificate.py", line 153, in import_pfx
    cert_store = winproxy.PFXImportCertStore(pfx, password, flags)
File "c:\users\hakril\documents\work\pythonforwindows\windows\winproxy.py", line 1065, in PFXImportCertStore
    return PFXImportCertStore.ctypes_function(pPFX, szPassword, dwFlags)
File "c:\users\hakril\documents\work\pythonforwindows\windows\winproxy.py", line 148, in perform_call
    return self._cprototyped(*args)
File "c:\users\hakril\documents\work\pythonforwindows\windows\winproxy.py", line 69, in kernel32_error_check
    raise WinproxyError(func_name)
windows.winproxy.error.WinproxyError: PFXImportCertStore: [Error 86] The specified network password is not correct.

(cmd λ) python crypto\encryption_demo.py decrypt --password MYPASSWORD message.crypt mykey.pfx
Result = <my secret message>

18.17.2. Certificate demo

import hashlib
import base64
import windows.crypto

windowscert = b"""-----BEGIN CERTIFICATE-----
MIIFBDCCA+ygAwIBAgITMwAAAQZuwyXEMckYDgAAAAABBjANBgkqhkiG9w0BAQsF
ADCBhDELMAkGA1UEBhMCVVMxEzARBgNVBAgTCldhc2hpbmd0b24xEDAOBgNVBAcT
B1JlZG1vbmQxHjAcBgNVBAoTFU1pY3Jvc29mdCBDb3Jwb3JhdGlvbjEuMCwGA1UE
AxMlTWljcm9zb2Z0IFdpbmRvd3MgUHJvZHVjdGlvbiBQQ0EgMjAxMTAeFw0xNjEw
MTEyMDM5MzFaFw0xODAxMTEyMDM5MzFaMHAxCzAJBgNVBAYTAlVTMRMwEQYDVQQI
EwpXYXNoaW5ndG9uMRAwDgYDVQQHEwdSZWRtb25kMR4wHAYDVQQKExVNaWNyb3Nv
ZnQgQ29ycG9yYXRpb24xGjAYBgNVBAMTEU1pY3Jvc29mdCBXaW5kb3dzMIIBIjAN
BgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyWcaCYghNInk3ecpyu2uZ7LCV9QS
7GWYr41ufTkcL66ewHxlAoWjmkKG6W2Bp9BYYQok10iDeDGACE9Vjr6m4Jdh+YuN
RLxMnHC8JTGzk96CzmdBPAuUWdAcHNmTkIWQF6AXzsbBWsekQejvDBygAOCuIYh4
sBgNa5cjTxQc7Iyp9c7RxBmThV5BNFTOnSN6D9N8zU+ENgIZuyHxGvqzRdrhU4G4
Cg/h1CkI4TgeZQZCeUNPnWV6DMuvPCiqGEia5phOJZyENKND0Sx6eQZrYnuz1gMn
YaEnO+ggegtt4pWpqg8Ch0jNrkL1fb3Kzz7E34/K9dcTgaOymfF6qUKabQIDAQAB
o4IBgDCCAXwwHwYDVR0lBBgwFgYKKwYBBAGCNwoDBgYIKwYBBQUHAwMwHQYDVR0O
BBYEFBEciVg/vsVmKtr/hmHt7KM6g8lSMFIGA1UdEQRLMEmkRzBFMQ0wCwYDVQQL
EwRNT1BSMTQwMgYDVQQFEysyMjk4NzkrMTQ3NDQ5YmUtMTVhOC00ZWJhLTkzZjMt
ZDExMGE1YzQ1NTUyMB8GA1UdIwQYMBaAFKkpAjmOFsSXeM2Q+Z5PmuF8Va9TMFQG
A1UdHwRNMEswSaBHoEWGQ2h0dHA6Ly93d3cubWljcm9zb2Z0LmNvbS9wa2lvcHMv
Y3JsL01pY1dpblByb1BDQTIwMTFfMjAxMS0xMC0xOS5jcmwwYQYIKwYBBQUHAQEE
VTBTMFEGCCsGAQUFBzAChkVodHRwOi8vd3d3Lm1pY3Jvc29mdC5jb20vcGtpb3Bz
L2NlcnRzL01pY1dpblByb1BDQTIwMTFfMjAxMS0xMC0xOS5jcnQwDAYDVR0TAQH/
BAIwADANBgkqhkiG9w0BAQsFAAOCAQEAvYC1iawgKoxXAotQXaN0lj1J5VX01/un
7JybZF4sPMG4acoFT85Ao5U6TK5ATPB7yPUulAivp8908DwTGqN+Ju6iH+UkvAb+
a/WcHVEMxQXK5eOFNE6yekUArBGbMNWlTFrpwklmVTnL9R+4aApTEe6ITT1KLDio
5uFw98n5Sqgh+In073czyiTG7MVhBexbOfhgnciXoufeyhwy1pYgjouSqSQZs4bj
cUwQTwGlS2Gd5a+3nblhjn+QhSszIo1K5n1udLPFWtn29BuGlSrtTXPv5OCfNtLO
l2ec6CyjDQc6HcQBNCsbJVq6qGtQbYNE+ih+KhIU4tO5jf25xthf2g==
-----END CERTIFICATE-----"""


raw_cert = base64.decodestring(b"".join(windowscert.split(b"\n")[1:-1]))
cert = windows.crypto.Certificate.from_buffer(raw_cert)

print("Analysing certificate: {0}".format(cert))
print("    * name: <{0}>".format(cert.name))
print("    * issuer: <{0}>".format(cert.issuer))
print("    * raw_serial: <{0}>".format(cert.raw_serial))
print("    * serial: <{0}>".format(cert.serial))
print("    * encoded start: <{0!r}>".format(cert.encoded[:20]))

print("")
chains = cert.chains
print("This certificate has {0} certificate chain(s)".format(len(chains)))
for i, chain in enumerate(chains):
    print("Chain {0}:".format(i))
    for ccert in chain:

        print("  {0}:".format(ccert))
        print("    * issuer: <{0}>".format(ccert.issuer))

print ("")
cert_to_verif = ccert
print("Looking for <{0}> in trusted certificates".format(cert_to_verif.name))
root_store = windows.crypto.CertificateStore.from_system_store("Root")
# This is not the correct way verify the validity of a certificate chain.
# I would say that if the goal is to verify the signature of the certificate: use wintrust.
# (or maybe CertVerifyCertificateChainPolicy : https://msdn.microsoft.com/en-us/library/windows/desktop/aa377163(v=vs.85).aspx)
matchs = [c for c in root_store.certs if c == cert_to_verif]
print("matches = {0}".format(matchs))
if matchs:
    print("Found it !")
else:
    print("Not found :(")

## Extract certificates of a PE file
## This code is not a fixed API and the current state of my tests

print ("")
print ("== PE Analysis ==")
TARGET_FILE = r"C:\windows\system32\ntdll.dll"
print("Target sha1 = <{0}>".format(hashlib.sha1(open(TARGET_FILE, "rb").read()).hexdigest()))
cryptobj = windows.crypto.CryptObject(TARGET_FILE)
print("Analysing {0}".format(cryptobj))
print("File has {0} signer(s):".format(cryptobj.crypt_msg.nb_signer))
for i, signer in enumerate(cryptobj.crypt_msg.signers):
    print("Signer {0}:".format(i))
    print("   * Issuer: {0!r}".format(signer.Issuer.data))
    print("   * HashAlgorithme: {0}".format(signer.HashAlgorithm.pszObjId))
    cert = cryptobj.cert_store.find(signer.Issuer, signer.SerialNumber)
    print("   * Certificate: {0}".format(cert))

print("")
print("File embdeds {0} certificate(s):".format(cryptobj.crypt_msg.nb_cert))
for i, certificate in enumerate(cryptobj.crypt_msg.certs):
    print("   * {0}) {1}".format(i, certificate))

Output

(cmd) python crypto\certificate.py
Analysing certificate: <Certificate "Microsoft Windows" serial="33 00 00 01 06 6e c3 25 c4 31 c9 18 0e 00 00 00 00 01 06">
    * name: <Microsoft Windows>
    * issuer: <Microsoft Windows Production PCA 2011>
    * raw_serial: <[51, 0, 0, 1, 6, 110, 195, 37, 196, 49, 201, 24, 14, 0, 0, 0, 0, 1, 6]>
    * serial: <33 00 00 01 06 6e c3 25 c4 31 c9 18 0e 00 00 00 00 01 06>
    * encoded start: <bytearray(b'0\x82\x05\x040\x82\x03\xec\xa0\x03\x02\x01\x02\x02\x133\x00\x00\x01\x06')>

This certificate has 1 certificate chain(s)
Chain 0:
  <Certificate "Microsoft Windows" serial="33 00 00 01 06 6e c3 25 c4 31 c9 18 0e 00 00 00 00 01 06">:
    * issuer: <Microsoft Windows Production PCA 2011>
  <Certificate "Microsoft Windows Production PCA 2011" serial="61 07 76 56 00 00 00 00 00 08">:
    * issuer: <Microsoft Root Certificate Authority 2010>
  <Certificate "Microsoft Root Certificate Authority 2010" serial="28 cc 3a 25 bf ba 44 ac 44 9a 9b 58 6b 43 39 aa">:
    * issuer: <Microsoft Root Certificate Authority 2010>

Looking for <Microsoft Root Certificate Authority 2010> in trusted certificates
matches = [<Certificate "Microsoft Root Certificate Authority 2010" serial="28 cc 3a 25 bf ba 44 ac 44 9a 9b 58 6b 43 39 aa">]
Found it !

== PE Analysis ==
Target sha1 = <339e4c236e716d4b6f6b10359c425eb3d1478df7>
Analysing <CryptObject "C:\windows\system32\ntdll.dll" content_type=CERT_QUERY_CONTENT_PKCS7_SIGNED_EMBED(0xaL)>
File has 1 signer(s):
Signer 0:
   * Issuer: bytearray(b'0\x81\x841\x0b0\t\x06\x03U\x04\x06\x13\x02US1\x130\x11\x06\x03U\x04\x08\x13\nWashington1\x100\x0e\x06\x03U\x04\x07\x13\x07Redmond1\x1e0\x1c\x06\x03U\x04\n\x13\x15Microsoft Corporation1.0,\x06\x03U\x04\x03\x13%Microsoft Windows Production PCA 2011')
   * HashAlgorithme: 2.16.840.1.101.3.4.2.1
   * Certificate: <Certificate "Microsoft Windows" serial="33 00 00 01 73 30 31 07 26 65 b8 b9 b3 00 00 00 00 01 73">

File embdeds 2 certificate(s):
   * 0) <Certificate "Microsoft Windows" serial="33 00 00 01 73 30 31 07 26 65 b8 b9 b3 00 00 00 00 01 73">
   * 1) <Certificate "Microsoft Windows Production PCA 2011" serial="61 07 76 56 00 00 00 00 00 08">

18.18. windows.alpc

18.18.1. simple alpc communication

import multiprocessing

import windows.alpc
from windows.generated_def import LPC_CONNECTION_REQUEST, LPC_REQUEST

PORT_NAME = r"\RPC Control\PythonForWindowsPORT"


def alpc_server():
    server = windows.alpc.AlpcServer(PORT_NAME) # Create the ALPC Port
    print("[SERV] PORT <{0}> CREATED".format(PORT_NAME))

    msg = server.recv() # Wait for a message
    print("[SERV] Message type = {0:#x}".format(msg.type))
    print("[SERV] Received data: <{0}>".format(msg.data))
    assert msg.type & 0xfff  == LPC_CONNECTION_REQUEST # Check that message is a connection request
    print("[SERV] Connection request")
    server.accept_connection(msg)

    msg = server.recv() # Wait for a real message
    print ""
    print("[SERV] Received message: <{0}>".format(msg.data))
    print("[SERV] Message type = {0:#x}".format(msg.type))
    assert msg.type & 0xfff  == LPC_REQUEST
    # We can reply by two ways:
    #    - Send the same message with modified data
    #    - Recreate a Message and copy the MessageId
    msg.data = "REQUEST '{0}' DONE".format(msg.data)
    server.send(msg)



def alpc_client():
    print("Client pid = {0}".format(windows.current_process.pid))
    # Creation an 'AlpcClient' with a port name will connect to the port with an empty message
    client = windows.alpc.AlpcClient(PORT_NAME)
    print("[CLIENT] Connected: {0}".format(client))
    # Send a message / wait for the response
    response = client.send_receive("Hello world !")
    print("[CLIENT] Response: <{0}>".format(response.data))
    # You can also send message without waiting for a response with 'client.send'


if __name__ == "__main__":
    proc = multiprocessing.Process(target=alpc_server, args=())
    proc.start()
    import time; time.sleep(0.5)
    alpc_client()
    print("BYE")
    proc.terminate()

Output

(cmd) python alpc\simple_alpc.py
[SERV] PORT <\RPC Control\PythonForWindowsPORT> CREATED
Client pid = 15840
[SERV] Message type = 0x300a
[SERV] Received data: <>
[SERV] Connection request
[CLIENT] Connected: <windows.alpc.AlpcClient object at 0x06919290>

[SERV] Received message: <Hello world !>
[SERV] Message type = 0x3001
[CLIENT] Response: <REQUEST 'Hello world !' DONE>
BYE

18.18.2. advanced alpc communication

import sys
import multiprocessing

import windows.alpc
from windows.generated_def import LPC_CONNECTION_REQUEST, LPC_REQUEST
import windows.generated_def as gdef

import ctypes
import tempfile

PORT_NAME = r"\RPC Control\PythonForWindowsPORT_2"
PORT_CONTEXT = 0x11223344


def full_alpc_server():
    print("server pid = {0}".format(windows.current_process.pid))
    server = windows.alpc.AlpcServer(PORT_NAME)
    print("[SERV] PORT <{0}> CREATED".format(PORT_NAME))
    msg = server.recv()
    print("[SERV] == Message received ==")
    if msg.type & 0xfff == LPC_CONNECTION_REQUEST:
        print(" * ALPC connection request: <{0}>".format(msg.data))
        msg.data = "Connection message response"
        server.accept_connection(msg, port_context=PORT_CONTEXT)
    else:
        raise ValueError("Expected connection")

    while True:
        msg = server.recv()
        print("[SERV] == Message received ==")
        # print("       * Data: {0}".format(msg.data))
        # print("[SERV] RECV Message type = {0:#x}".format(msg.type))
        # print("[SERV] RECV Message Valid ATTRS = {0:#x}".format(msg.attributes.ValidAttributes))
        # print("[SERV] RECV Message ATTRS = {0:#x}".format(msg.attributes.AllocatedAttributes))
        if msg.type & 0xfff == LPC_REQUEST:
            print(" * ALPC request: <{0}>".format(msg.data))
            print(" * view_is_valid <{0}>".format(msg.view_is_valid))
            if msg.view_is_valid:
                print("   * message view attribute:")
                windows.utils.print_ctypes_struct(msg.view_attribute, "       - VIEW", hexa=True)
                view_data = windows.current_process.read_string(msg.view_attribute.ViewBase)
                print("   * Reading view content: <{0}>".format(view_data))
                # Needed in Win7 - TODO: why is there a different behavior ?
                msg.attributes.ValidAttributes -= gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE
            print(" * security_is_valid <{0}>".format(msg.security_is_valid))
            print(" * handle_is_valid <{0}>".format(msg.handle_is_valid))
            if msg.handle_is_valid:
                if msg.handle_attribute.Handle:
                    print("   * message handle attribute:")
                    windows.utils.print_ctypes_struct(msg.handle_attribute, "       - HANDLE", hexa=True)
                    if msg.handle_attribute.ObjectType == 1:
                        f = windows.utils.create_file_from_handle(msg.handle_attribute.Handle)
                        print("   - File: {0}".format(f))
                        print("   - content: <{0}>".format(f.read()))
                    else:
                        print("  - unknow object type == {0}".format(msg.handle_attribute.ObjectType))
                msg.attributes.ValidAttributes -= gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE

            print(" * context_is_valid <{0}>".format(msg.context_is_valid))
            if msg.context_is_valid:
                print("   * message context attribute:")
                windows.utils.print_ctypes_struct(msg.context_attribute, "     - CTX", hexa=True)

            if msg.attributes.ValidAttributes & gdef.ALPC_MESSAGE_TOKEN_ATTRIBUTE:
                print(" * message token attribute:")
                token_struct = msg.attributes.get_attribute(gdef.ALPC_MESSAGE_TOKEN_ATTRIBUTE)
                windows.utils.print_ctypes_struct(token_struct, "   - TOKEN", hexa=True)

            # We can reply by to way:
            #    - Send the same message with modified data
            #    - Recreate a Message and copy the MessageId
            msg.data = "REQUEST '{0}' DONE".format(msg.data)
            sys.stdout.flush()
            server.send(msg)
        else:
            print ValueError("Unexpected message type <{0}>".format(msg.type & 0xfff))


def send_message_with_handle(client):
    print ""
    print("[Client] == Sending a message with a handle ==")

    # Craft a file with some data
    f = tempfile.NamedTemporaryFile()
    f.write("Tempfile data <3")
    f.seek(0)

    # New message with a Handle
    msg = windows.alpc.AlpcMessage()
    msg.attributes.ValidAttributes |= gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE
    msg.handle_attribute.Flags = gdef.ALPC_HANDLEFLG_DUPLICATE_SAME_ACCESS
    msg.handle_attribute.Handle = windows.utils.get_handle_from_file(f)
    msg.handle_attribute.ObjectType = 0
    msg.handle_attribute.DesiredAccess = 0
    msg.data = "some message with a file"
    client.send_receive(msg)

def send_message_with_view(client):
    print ""
    print("[Client] == Sending a message with a view ==")

    # Create View
    section = client.create_port_section(0, 0, 0x4000)
    view = client.map_section(section[0], 0x4000)

    # New message with a View
    msg = windows.alpc.AlpcMessage(0x2000)
    msg.attributes.ValidAttributes |= gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE
    msg.view_attribute.Flags = 0
    msg.view_attribute.ViewBase = view.ViewBase
    msg.view_attribute.SectionHandle = view.SectionHandle
    msg.view_attribute.ViewSize = 0x4000
    msg.data = "some message with a view"
    windows.current_process.write_memory(view.ViewBase, "The content of the view :)\x00")
    client.send_receive(msg)

def alpc_client():
    print("Client pid = {0}".format(windows.current_process.pid))
    client = windows.alpc.AlpcClient()

    # You can create a non-connected AlpcClient and send a custom
    # 'AlpcMessage' for complexe alpc port connection.
    connect_message = windows.alpc.AlpcMessage()
    connect_message.data = "Connection request client message"
    print("[CLIENT] == Connecting to port ==")
    connect_response = client.connect_to_port(PORT_NAME, connect_message)
    print("[CLIENT] Connected with response: <{0}>".format(connect_response.data))

    # AlpcClient send/recv/send_receive methods accept both string or
    # AlpcMessage for complexe message.
    print""
    print("[CLIENT] == Sending a message ==")
    msg = windows.alpc.AlpcMessage()
    msg.data = "Complex Message 1"
    print(" * Sending Message <{0}>".format(msg.data))
    response = client.send_receive(msg)
    print("[CLIENT] Server response: <{0}>".format(response.data))
    print("[CLIENT] RESP Message Valid ATTRS = {0}".format(response.valid_attributes))

    send_message_with_handle(client)
    send_message_with_view(client)
    sys.stdout.flush()


if __name__ == "__main__":
    proc = multiprocessing.Process(target=full_alpc_server, args=())
    proc.start()
    import time; time.sleep(0.5)
    alpc_client()
    import time; time.sleep(0.5)
    print("BYE")
    proc.terminate()

Output

(cmd) python alpc\advanced_alpc.py
server pid = 2300
[SERV] PORT <\RPC Control\PythonForWindowsPORT_2> CREATED
Client pid = 14848
[CLIENT] == Connecting to port ==
[SERV] == Message received ==
 * ALPC connection request: <Connection request client message>
[CLIENT] Connected with response: <Connection message response>

[CLIENT] == Sending a message ==
 * Sending Message <Complex Message 1>
[SERV] == Message received ==
 * ALPC request: <Complex Message 1>
 * view_is_valid <False>
 * security_is_valid <False>
 * handle_is_valid <False>
 * context_is_valid <True>
   * message context attribute:
     - CTX.PortContext -> 0x11223344
     - CTX.MessageContext -> None
     - CTX.Sequence -> 0x1L
     - CTX.MessageId -> 0x0L
     - CTX.CallbackId -> 0x0L
 * message token attribute:
   - TOKEN.TokenId -> 0x4101ef8eL
   - TOKEN.AuthenticationId -> 0x54177L
   - TOKEN.ModifiedId -> 0x4077ab89L
[CLIENT] Server response: <REQUEST 'Complex Message 1' DONE>
[CLIENT] RESP Message Valid ATTRS = [ALPC_MESSAGE_CONTEXT_ATTRIBUTE(0x20000000L)]

[Client] == Sending a message with a handle ==
[SERV] == Message received ==
 * ALPC request: <some message with a file>
 * view_is_valid <False>
 * security_is_valid <False>
 * handle_is_valid <True>
   * message handle attribute:
       - HANDLE.Flags -> 0x0L
       - HANDLE.Handle -> 0x2cc
       - HANDLE.ObjectType -> 0x1L
       - HANDLE.DesiredAccess -> 0x13019fL
   - File: <open file '<fdopen>', mode 'r' at 0x049ECA18>
   - content: <Tempfile data <3>
 * context_is_valid <True>
   * message context attribute:
     - CTX.PortContext -> 0x11223344
     - CTX.MessageContext -> None
     - CTX.Sequence -> 0x2L
     - CTX.MessageId -> 0x0L
     - CTX.CallbackId -> 0x0L
 * message token attribute:
   - TOKEN.TokenId -> 0x4101ef8eL
   - TOKEN.AuthenticationId -> 0x54177L
   - TOKEN.ModifiedId -> 0x4077ab89L

[Client] == Sending a message with a view ==
[SERV] == Message received ==
 * ALPC request: <some message with a view>
 * view_is_valid <True>
   * message view attribute:
       - VIEW.Flags -> 0x0L
       - VIEW.SectionHandle -> None
       - VIEW.ViewBase -> 0x4780000
       - VIEW.ViewSize -> 0x4000
   * Reading view content: <The content of the view :)>
 * security_is_valid <False>
 * handle_is_valid <False>
 * context_is_valid <True>
   * message context attribute:
     - CTX.PortContext -> 0x11223344
     - CTX.MessageContext -> None
     - CTX.Sequence -> 0x3L
     - CTX.MessageId -> 0x0L
     - CTX.CallbackId -> 0x0L
 * message token attribute:
   - TOKEN.TokenId -> 0x4101ef8eL
   - TOKEN.AuthenticationId -> 0x54177L
   - TOKEN.ModifiedId -> 0x4077ab89L
[SERV] == Message received ==
Unexpected message type <12>
[SERV] == Message received ==
Unexpected message type <5>
[SERV] == Message received ==
Unexpected message type <12>
[SERV] == Message received ==
Unexpected message type <12>
BYE

18.19. windows.rpc

18.19.1. Manual UAC

import argparse
import sys

import windows.rpc
import windows.generated_def as gdef
from windows.rpc import ndr

# NDR Descriptions
class NDRPoint(ndr.NdrStructure):
    MEMBERS = [ndr.NdrLong, ndr.NdrLong]

class NdrUACStartupInfo(ndr.NdrStructure):
    MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),
                ndr.NdrLong,
                ndr.NdrLong,
                ndr.NdrLong,
                ndr.NdrLong,
                ndr.NdrLong,
                ndr.NdrLong,
                ndr.NdrLong,
                ndr.NdrLong,
                ndr.NdrLong,
                NDRPoint]

class RAiLaunchAdminProcessParameters(ndr.NdrParameters):
    MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),
                ndr.NdrUniquePTR(ndr.NdrWString),
                ndr.NdrLong,
                ndr.NdrLong,
                ndr.NdrWString,
                ndr.NdrWString,
                NdrUACStartupInfo,
                ndr.NdrLong,
                ndr.NdrLong]

class NdrProcessInformation(ndr.NdrParameters):
    MEMBERS = [ndr.NdrLong] * 4

# Parsing args
parser = argparse.ArgumentParser(prog=__file__)
parser.add_argument('--target', default=sys.executable, help='Executable to launch')
parser.add_argument('--cmdline', default="", help='The commandline for the process')
parser.add_argument('--uacflags', type=lambda x: int(x, 0), default=0x11)
parser.add_argument('--creationflags', type=lambda x: int(x, 0), default=gdef.CREATE_UNICODE_ENVIRONMENT)
params = parser.parse_args()
print(params)

# Connecting to RPC Interface.
UAC_UIID = "201ef99a-7fa0-444c-9399-19ba84f12a1a"
client = windows.rpc.find_alpc_endpoint_and_connect(UAC_UIID)
iid = client.bind(UAC_UIID)

# Marshalling parameters.
parameters = RAiLaunchAdminProcessParameters.pack([
    params.target, # Application Path
    params.cmdline, # Commandline
    params.uacflags, # UAC-Request Flag
    params.creationflags, # dwCreationFlags
    "", # StartDirectory
    "WinSta0\\Default", # Station
        # Startup Info
        (None, # Title
        0, # dwX
        0, # dwY
        0, # dwXSize
        0, # dwYSize
        0, # dwXCountChars
        0, # dwYCountChars
        0, # dwFillAttribute
        0, # dwFlags
        5, # wShowWindow
        # Point structure: Use MonitorFromPoint to setup StartupInfo.hStdOutput
        (0, 0)),
    0, # Window-Handle to know if UAC can steal focus
    0xffffffff]) # UAC Timeout

result = client.call(iid, 0, parameters)
stream = ndr.NdrStream(result)

ph, th, pid, tid = NdrProcessInformation.unpack(stream)
return_value = ndr.NdrLong.unpack(stream)
print("Return value = {0:#x}".format(return_value))
target = windows.winobject.process.WinProcess(handle=ph)
print("Created process is {0}".format(target))
print(" * bitness is {0}".format(target.bitness))
print(" * integrity: {0}".format(target.token.integrity))
print(" * elevated: {0}".format(target.token.is_elevated))

Output:

(cmd λ) python rpc\uac.py
Namespace(cmdline='', creationflags=CREATE_UNICODE_ENVIRONMENT(0x400L), target='C:\\Python27\\python.exe', uacflags=17)
# UAC pop - asking to execute python.exe | Clicking Yes
Return value = 0x6
Created process is <WinProcess "python.exe" pid 19304 at 0x455f7d0>
* bitness is 32
* integrity: SECURITY_MANDATORY_HIGH_RID(0x3000L)
* elevated: True

# The new python.exe in another window
>>> windows.current_process.token.integrity
SECURITY_MANDATORY_HIGH_RID(0x3000L)
>>> windows.current_process.token.is_elevated
True

18.19.2. Manual LsarEnumeratePrivileges

import windows.rpc
from windows.rpc import ndr


class PLSAPR_OBJECT_ATTRIBUTES(ndr.NdrStructure):
    MEMBERS = [ndr.NdrLong,
                ndr.NdrUniquePTR(ndr.NdrWString),
                ndr.NdrUniquePTR(ndr.NdrLong), # We dont care of the subtype as we will pass None
                ndr.NdrLong,
                ndr.NdrUniquePTR(ndr.NdrLong), # We dont care of the subtype as we will pass None
                ndr.NdrUniquePTR(ndr.NdrLong)] # We dont care of the subtype as we will pass None

## From: RPCVIEW
# long Proc44_LsarOpenPolicy2(
# 	[in][unique][string] wchar_t* arg_0,
# 	[in]struct Struct_364_t* arg_1,
# 	[in]long arg_2,
# 	[out][context_handle] void** arg_3);

# This function has a [out][context_handle] meaning it return a context_handle
# Context handle are represented by 5 NdrLong where the first one is always 0
# PythonForWindows represent context_handle using NdrContextHandle
class LsarOpenPolicy2Parameter(ndr.NdrParameters):
    MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),
                PLSAPR_OBJECT_ATTRIBUTES,
                ndr.NdrLong]

## From: RPCVIEW
# long Proc2_LsarEnumeratePrivileges(
# 	[in][context_handle] void* arg_0,
# 	[in][out]long *arg_1,
# 	[out]struct Struct_110_t* arg_2,
# 	[in]long arg_3);

# This function has a [in][context_handle] meaning it expect a context_handle
# We can pass the NdrContextHandle returned by Proc44_LsarOpenPolicy2
class LsarEnumeratePrivilegesParameter(ndr.NdrParameters):
    MEMBERS = [ndr.NdrContextHandle,
                ndr.NdrLong,
                ndr.NdrLong]


class LSAPR_POLICY_PRIVILEGE_DEF(object):
    @classmethod
    def unpack(cls, stream):
        size1 = ndr.NdrShort.unpack(stream)
        ptr = ndr.NdrShort.unpack(stream)
        size2 = ndr.NdrLong.unpack(stream)
        luid = ndr.NdrHyper.unpack(stream)
        return ptr, luid


class LSAPR_PRIVILEGE_ENUM_BUFFER(object):
    @classmethod
    def unpack(cls, stream):
        entries = ndr.NdrLong.unpack(stream)
        array_size = ndr.NdrLong.unpack(stream)
        array_ptr = ndr.NdrLong.unpack(stream)
        # Unpack pointed array
        array_size2 = ndr.NdrLong.unpack(stream)
        assert array_size == array_size2
        x = []
        # unpack each elements LSAPR_POLICY_PRIVILEGE_DEF
        for i in range(array_size2):
            ptr, luid = LSAPR_POLICY_PRIVILEGE_DEF.unpack(stream)
            if ptr:
                x.append(luid)
        # unpack pointed strings
        result = []
        for luid in x:
            name = ndr.NdrWcharConformantVaryingArrays.unpack(stream)
            result.append((luid, name))
        return result


# Actual code

## LSASS alpc endpoints is fixed, no need for the epmapper
client = windows.rpc.RPCClient(r"\RPC Control\lsasspirpc")
## Bind to the desired interface
iid = client.bind('12345778-1234-abcd-ef00-0123456789ab', version=(0,0))

## Craft parameters and call 'LsarOpenPolicy2'
params = LsarOpenPolicy2Parameter.pack([None, (0, None, None, 0, None, None), 0x20000000])
res = client.call(iid, 44, params)
## Unpack the resulting handle
handle = ndr.NdrContextHandle.unpack(ndr.NdrStream(res))

# As context_handle have 4 NdrLong of effective data
# We can represent them as GUID
# NdrContextHandle is just a wrapper packing/unpacking GUID and taking
# care of the leading NdrLong(0) in the actual ndr representation of context_handle
print("Context Handle is: {0}\n".format(handle))

## Craft parameters and call 'LsarEnumeratePrivileges'
x = LsarEnumeratePrivilegesParameter.pack([handle, 0, 10000]);
res = client.call(iid, 2, x)

print("Privileges:")
## Unpack the resulting 'LSAPR_PRIVILEGE_ENUM_BUFFER'
priviledges = LSAPR_PRIVILEGE_ENUM_BUFFER.unpack(ndr.NdrStream(res))
for priv in priviledges:
    print priv

Output

(cmd) python rpc\lsass.py
(2, u'SeCreateTokenPrivilege')
(3, u'SeAssignPrimaryTokenPrivilege')
(4, u'SeLockMemoryPrivilege')
(5, u'SeIncreaseQuotaPrivilege')
(6, u'SeMachineAccountPrivilege')
(7, u'SeTcbPrivilege')
(8, u'SeSecurityPrivilege')
(9, u'SeTakeOwnershipPrivilege')
(10, u'SeLoadDriverPrivilege')
(11, u'SeSystemProfilePrivilege')
(12, u'SeSystemtimePrivilege')
(13, u'SeProfileSingleProcessPrivilege')
(14, u'SeIncreaseBasePriorityPrivilege')
(15, u'SeCreatePagefilePrivilege')
(16, u'SeCreatePermanentPrivilege')
(17, u'SeBackupPrivilege')
(18, u'SeRestorePrivilege')
(19, u'SeShutdownPrivilege')
(20, u'SeDebugPrivilege')
(21, u'SeAuditPrivilege')
(22, u'SeSystemEnvironmentPrivilege')
(23, u'SeChangeNotifyPrivilege')
(24, u'SeRemoteShutdownPrivilege')
(25, u'SeUndockPrivilege')
(26, u'SeSyncAgentPrivilege')
(27, u'SeEnableDelegationPrivilege')
(28, u'SeManageVolumePrivilege')
(29, u'SeImpersonatePrivilege')
(30, u'SeCreateGlobalPrivilege')
(31, u'SeTrustedCredManAccessPrivilege')
(32, u'SeRelabelPrivilege')
(33, u'SeIncreaseWorkingSetPrivilege')
(34, u'SeTimeZonePrivilege')
(35, u'SeCreateSymbolicLinkPrivilege')
(36, u'SeDelegateSessionUserImpersonatePrivilege')

18.20. windows.pipe

18.20.1. Communication with an injected process

import windows
import windows.test

p = windows.test.pop_proc_32()
print("Child is {0}".format(p))

PIPE_NAME = "PFW_Pipe"

rcode = """
import windows

f = open('tst.txt', "w+")
fh = windows.utils.get_handle_from_file(f)
hm = windows.winproxy.CreateFileMappingA(fh, dwMaximumSizeLow=0x1000, lpName=None)
addr = windows.winproxy.MapViewOfFile(hm, dwNumberOfBytesToMap=0x1000)

windows.pipe.send_object("{pipe}", addr)
"""

with windows.pipe.create(PIPE_NAME) as np:
    print("Created pipe is {0}".format(np))
    p.execute_python(rcode.format(pipe=PIPE_NAME))
    print("Receiving object from injected process")
    addr = np.recv()

print("Remote Address = {0:#x}".format(addr))
print("Querying memory in target at <{0:#x}>".format(addr))
print("    * {0}".format(p.query_memory(addr)))
print("Querying mapped file in target at <{0:#x}>".format(addr))
print("    * {0}".format(p.get_mapped_filename(addr)))
p.exit()

Output

(cmd) python pipe\child_send_object.py
Child is <WinProcess "notepad.exe" pid 4316 at 0x672fcf0>
Created pipe is <PipeConnection name="\\.\pipe\PFW_Pipe" server=True>
Receiving object from injected process
Remote Address = 0x97a0000
Querying memory in target at <0x97a0000>
    * <MEMORY_BASIC_INFORMATION32 BaseAddress=0x97a0000 RegionSize=0x001000 State=MEM_COMMIT(0x1000L) Type=MEM_MAPPED(0x40000L) Protect=PAGE_READWRITE(0x4L)>
Querying mapped file in target at <0x97a0000>
    * \Device\HarddiskVolume2\Users\hakril\Documents\projets\PythonForWindows\samples\tst.txt

18.21. windows.security

18.21.1. Security Descriptor

import windows.security

SDDL = "O:BAG:AND:(A;OI;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)(D;CIIO;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)"

sd = windows.security.SecurityDescriptor.from_string(SDDL)
print("Security descriptor is: {0}".format(sd))

print("Owner: {0}".format(sd.owner))
print("  - lookup: {0}".format(windows.utils.lookup_sid(sd.owner)))
print("Group: {0}".format(sd.group))
print("  - lookup: {0}".format(windows.utils.lookup_sid(sd.group)))

dacl = sd.dacl
print("Dacl: {0}".format(dacl))

for i, ace in enumerate(dacl):
    print("")
    print("  ACE [{0}]: {1}".format(i, ace))
    print("    - Header-AceType: {0}".format(ace.Header.AceType))
    print("    - Header-AceFlags: {0}".format(ace.Header.AceFlags))
    print("    - Header-flags: {0}".format(ace.Header.flags))
    print("    - Mask: {0}".format(ace.Mask))
    print("    - mask: {0}".format(ace.mask))
    print("    - Sid:  {0}".format(ace.sid))

Output

(cmd) python security\security_descriptor.py
Security descriptor is: <SecurityDescriptor object at 0x03E151C0>
Owner: S-1-5-32-544
  - lookup: (u'BUILTIN', u'Administrateurs')
Group: S-1-5-7
  - lookup: (u'AUTORITE NT', u'ANONYMOUS LOGON')
Dacl: <Acl count=2>

  ACE [0]: <AccessAllowedACE mask=269353023>
    - Header-AceType: ACCESS_ALLOWED_ACE_TYPE(0x0L)
    - Header-AceFlags: 1
    - Header-flags: [OBJECT_INHERIT_ACE(0x1L)]
    - Mask: 269353023
    - mask: [1L, 2L, 4L, 8L, 16L, 32L, READ_CONTROL(0x20000L), WRITE_DAC(0x40000L), WRITE_OWNER(0x80000L), GENERIC_ALL(0x10000000L)]
    - Sid:  S-1-0-0

  ACE [1]: <AccessDeniedACE mask=269353023>
    - Header-AceType: ACCESS_DENIED_ACE_TYPE(0x1L)
    - Header-AceFlags: 10
    - Header-flags: [CONTAINER_INHERIT_ACE(0x2L), INHERIT_ONLY_ACE(0x8L)]
    - Mask: 269353023
    - mask: [1L, 2L, 4L, 8L, 16L, 32L, READ_CONTROL(0x20000L), WRITE_DAC(0x40000L), WRITE_OWNER(0x80000L), GENERIC_ALL(0x10000000L)]
    - Sid:  S-1-0-0

18.21.2. Query SACL

import sys
import windows.security

TARGET = r"C:\windows\notepad.exe" # On WIN10 (at least) notepad.exe has a AuditACE

if not windows.current_process.token.elevated:
    print(ValueError("This sample should be run as admin to demonstration SACL access"))

print("")
print("[NO-PRIV] Querying <{0}> SecurityDescriptor without SACL".format(TARGET))
sd = windows.security.SecurityDescriptor.from_filename(TARGET)
print("sacl = {0}".format(sd.sacl))

print("")
print("[NO-PRIV] Querying <{0}> SecurityDescriptor with SACL".format(TARGET))
try:
    sd = windows.security.SecurityDescriptor.from_filename(TARGET, query_sacl=True)
    print("sacl = {0}".format(sd.sacl))
except WindowsError as e:
    print(e)

print("")
print("Enabling <SeSecurityPrivilege>")
try:
    windows.current_process.token.enable_privilege("SeSecurityPrivilege")
except ValueError as e:
    print("[ERROR] {0}".format(e))
    exit(1)

print("")
print("[PRIV] Querying <{0}> SecurityDescriptor with SACL".format(TARGET))
sd = windows.security.SecurityDescriptor.from_filename(TARGET, query_sacl=True)
print("sacl = {0}".format(sd.sacl))
print(list(sd.sacl))

Output

(cmd) python security\query_sacl.py
This sample should be run as admin to demonstration SACL access

[NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor without SACL
sacl = <Acl count=0>

[NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor with SACL
None: [Error 1314] A required privilege is not held by the client.

Enabling <SeSecurityPrivilege>
[ERROR] <Token TokenId=0xd6db5cc Type=TokenPrimary(0x1L)> has no privilege <SeSecurityPrivilege>


(cmd-admin) python security\query_sacl.py

[NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor without SACL
sacl = <Acl count=0>

[NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor with SACL
None: [Error 1314] A required privilege is not held by the client.

Enabling <SeSecurityPrivilege>

[PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor with SACL
sacl = <Acl count=1>
[<SystemAuditACE mask=852246>]

18.22. ETW (Event Tracing for Windows)

18.22.1. Trace processing

import ctypes
import struct
import windows
import windows.generated_def as gdef
makeg = gdef.GUID.from_string

# This sample record the ETW event of provider CBB61B6D-A2CF-471A-9A58-A4CD5C08FFBA
# related to the UAC (service AppInfo)
# The ETW session is called MY_UAC_MONITOR

# Is then trigger the UAC and display the retrieved event afterward

def show(event):
    print("{0:#x}: {1}".format(event.EventHeader.TimeStamp, event))
    print("    guid: {0}".format(event.guid))
    print("    id: {0}".format(event.id))
    print("    opcode: {0}".format(event.opcode))
    print("    level: {0}".format(event.level))
    print("    data: {0!r}".format(event.user_data.replace("\x00", "")))
    return 0

session_name = "MY_UAC_MONITOR"
logfile_name = "uac.trace"

print("Recording UAC event in file <{0}> using session named <{1}>".format(logfile_name, session_name))

my_trace = windows.system.etw.open_trace(session_name, logfile=logfile_name)
my_trace.stop(soft=True) # Stop previous trace with this name if exists
my_trace.start()
my_trace.enable("CBB61B6D-A2CF-471A-9A58-A4CD5C08FFBA", 0xff, 0xff)

# Trigger UAC
windows.winproxy.ShellExecuteA(None, "runas", "mmc.exe", "BAD_MMC_FILENAME", None , 5)

my_trace.stop()
my_trace.process(show) #: Process the events registered in the trace (and logfile)

Output

(cmd) python etw\uac_trace.py
Recording UAC event in file <uac.trace> using session named <MY_UAC_MONITOR>
0x1d65bb1febaceea: <EventRecord provider="68FDD900-4A3E-11D1-84F4-0000F80464E3" id=0>
    guid: 68FDD900-4A3E-11D1-84F4-0000F80464E3
    id: 0
    opcode: 0
    level: 0
    data: '\x01\n\x01\x05\xbbG\x04-D\xd5\xff\xb1[\xd6\x01Zb\x02\t\x01\x04\xf0\x0c\t\x06\xc4\xff\xff\xff@tzres.dll,-302\n\x05\x03@tzres.dll,-301\x03\x05\x02\xc4\xff\xff\xff\xc0\x92\x1c\xd2D[\xd6\x01\x80\x96\x98\xea\xce\xba\xfe\xb1[\xd6\x01\x01MY_UAC_MONITORC:\\Users\\hakril\\Documents\\projets\\PythonForWindows\\samples\\uac.trace'
[...]
0x1d65bb1fec4c011: <EventRecord provider="DEB74A23-5444-3F3B-924B-0E653973F55A" id=11>
    guid: DEB74A23-5444-3F3B-924B-0E653973F55A
    id: 11
    opcode: 0
    level: 0
    data: '\x9e\x06\x04\x19\x14\x04\x08\x04\xff\xff\xff\xffWinSta0\\DefaultC:\\Windows\\System32\\mmc.exe"C:\\Windows\\System32\\mmc.exe" BAD_MMC_FILENAMEC:\\Users\\hakril\\Documents\\projets\\PythonForWindows\\samples'
0x1d65bb1fec4e48b: <EventRecord provider="C0B508D3-5459-339F-A213-889C238CA5B1" id=10>
    guid: C0B508D3-5459-339F-A213-889C238CA5B1
    id: 10
    opcode: 0
    level: 0
    data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe"C:\\WINDOWS\\SysWOW64\\mmc.exe" BAD_MMC_FILENAME` '
0x1d65bb1fec7c8eb: <EventRecord provider="C0B508D3-5459-339F-A213-889C238CA5B1" id=13>
    guid: C0B508D3-5459-339F-A213-889C238CA5B1
    id: 13
    opcode: 0
    level: 0
    data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe'
0x1d65bb1fec7c8f0: <EventRecord provider="C0B508D3-5459-339F-A213-889C238CA5B1" id=14>
    guid: C0B508D3-5459-339F-A213-889C238CA5B1
    id: 14
    opcode: 0
    level: 0
    data: '"C:\\WINDOWS\\SysWOW64\\mmc.exe" BAD_MMC_FILENAME'
[...]
0x1d65bb1feca018d: <EventRecord provider="172FF31C-2D80-31A6-FCA8-EB000D380666" id=11>
    guid: 172FF31C-2D80-31A6-FCA8-EB000D380666
    id: 11
    opcode: 0
    level: 0
    data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe'
0x1d65bb1feca030d: <EventRecord provider="172FF31C-2D80-31A6-FCA8-EB000D380666" id=25>
    guid: 172FF31C-2D80-31A6-FCA8-EB000D380666
    id: 25
    opcode: 0
    level: 0
    data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe\x08\x02TRUEFALSE\x10'
0x1d65bb1fecfcdc0: <EventRecord provider="172FF31C-2D80-31A6-FCA8-EB000D380666" id=27>
    guid: 172FF31C-2D80-31A6-FCA8-EB000D380666
    id: 27
    opcode: 0
    level: 0
    data: '\x05TRUETRUE'
[...]

18.22.2. Enumeration

import windows

etwmgr = windows.system.etw

print("ETW Manager is: {0}".format(etwmgr))

print("")
print("Listing some ETW sessions:")
for sess in etwmgr.sessions[:2]:
    print("  * {0}".format(sess))
    print("     * name: {0}".format(sess.name))
    print("     * guid: {0}".format(sess.guid))
    print("     * id: {0}".format(sess.id))
    print("     * logfile: {0}".format(sess.logfile))

target_id = sess.id
NB_MATCH = 0
print("")
print("Looking for providers for: {0}".format(sess))
for provider in windows.system.etw.providers:
    if NB_MATCH == 3:
        break
    for instance in provider.instances:
        if NB_MATCH == 3:
            break
        for session in instance.sessions:
            if session.LoggerId == target_id and instance.Pid:
                proc = [p for p in windows.system.processes if p.pid == instance.Pid][0]
                print("Found a provider/session for target:")
                print("  * Provider: {0}".format(provider))
                print("  * Instance: {0}".format(instance))
                print("  * Process: {0}".format(proc))
                NB_MATCH += 1
                if NB_MATCH == 3:
                    break
                break

Output

(cmd) python etw\etw_enumeration.py
ETW Manager is: <windows.winobject.event_trace.EtwManager object at 0x03AFBF70>

Listing some ETW sessions:
  * <EventTraceProperties name="AppModel" guid=A922A8BE-2450-438E-9520-FBCDFB46B0BD>
     * name: AppModel
     * guid: A922A8BE-2450-438E-9520-FBCDFB46B0BD
     * id: 4
     * logfile: 
  * <EventTraceProperties name="LwtNetLog" guid=603BA31E-EC5A-4CDE-BE87-ED0A16C3B170>
     * name: LwtNetLog
     * guid: 603BA31E-EC5A-4CDE-BE87-ED0A16C3B170
     * id: 14
     * logfile: C:\WINDOWS\System32\LogFiles\WMI\LwtNetLog.etl

Looking for providers for: <EventTraceProperties name="LwtNetLog" guid=603BA31E-EC5A-4CDE-BE87-ED0A16C3B170>
Found a provider/session for target:
  * Provider: <TraceProvider for "43D1A55C-76D6-4F7E-995C-64C711E5CAFE">
  * Instance: <TraceProviderInstanceInfo Pid=5256 EnableCount=1>
  * Process: <WinProcess "RuntimeBroker.exe" pid 5256 at 0x54c39d0>
Found a provider/session for target:
  * Provider: <TraceProvider for "43D1A55C-76D6-4F7E-995C-64C711E5CAFE">
  * Instance: <TraceProviderInstanceInfo Pid=10768 EnableCount=1>
  * Process: <WinProcess "chrome.exe" pid 10768 at 0x54c3930>
Found a provider/session for target:
  * Provider: <TraceProvider for "43D1A55C-76D6-4F7E-995C-64C711E5CAFE">
  * Instance: <TraceProviderInstanceInfo Pid=10236 EnableCount=1>
  * Process: <WinProcess "YourPhone.exe" pid 10236 at 0x54c37d0>