import os.path
from collections import defaultdict, namedtuple
from contextlib import contextmanager
import windows
import windows.generated_def as gdef
import windows.winobject.exception as winexception
import windows.native_exec.simple_x86 as x86
import windows.native_exec.simple_x64 as x64
from windows.winobject.process import WinProcess, WinThread
from windows.dbgprint import dbgprint
from windows import winproxy
from windows.generated_def.winstructs import *
from windows.generated_def import windef
from .breakpoints import *
#from windows.syswow64 import CS_32bits
from windows.winobject.exception import VectoredException
from windows.pycompat import basestring
PAGE_SIZE = 0x1000
class DebuggerError(Exception):
pass
class DEBUG_EVENT(DEBUG_EVENT):
KNOWN_EVENT_CODE = dict((x,x) for x in [EXCEPTION_DEBUG_EVENT,
CREATE_THREAD_DEBUG_EVENT, CREATE_PROCESS_DEBUG_EVENT,
EXIT_THREAD_DEBUG_EVENT, EXIT_PROCESS_DEBUG_EVENT, LOAD_DLL_DEBUG_EVENT,
UNLOAD_DLL_DEBUG_EVENT, OUTPUT_DEBUG_STRING_EVENT, RIP_EVENT])
@property
def code(self):
return self.KNOWN_EVENT_CODE.get(self.dwDebugEventCode, self.dwDebugEventCode)
WatchedPage = namedtuple('WatchedPage', ["original_prot", "bps"])
[docs]
class Debugger(object):
"""A debugger based on standard Win32 API. Handle :
* Standard BP (int3)
* Hardware-Exec BP (DrX)
* Memory BP (virtual_protect)
"""
[docs]
def __init__(self, target):
"""``target`` must be a debuggable :class:`WinProcess`."""
self._init_dispatch_handlers()
self.target = target
self.is_target_launched = False
self.processes = {}
self.threads = {}
self.current_process = None
self.current_thread = None
self.first_bp_encoutered = False
# List of breakpoints
self.breakpoints = {}
self._pending_breakpoints = {} #Breakpoints to put in new process / threads
# Values rewritten by "\xcc"
self._memory_save = dict()
# Dict of {tid : {drx taken : BP}}
self._hardware_breakpoint = {}
# Breakpoints to reput..
self._breakpoint_to_reput = {}
self._module_by_process = {}
self._pending_breakpoints_new = defaultdict(list)
self._explicit_single_step = {}
self._watched_pages = {}# Dict [page_modif] -> [mem bp on the page]
# [start] -> (size, current_proctection, original_prot)
self._virtual_protected_memory = [] # List of memory-range modified by a MemBP
self._current_debug_event = None
[docs]
@classmethod
def attach(cls, target):
"""attach to ``target`` (must be a :class:`WinProcess`)
:rtype: :class:`Debugger`
.. note::
see :ref:`Debugger.attach sample <sample_debugger_attach>`"""
winproxy.DebugActiveProcess(target.pid)
return cls(target)
[docs]
def detach(self, target=None):
"""Detach from all debugged processes or process ``target``"""
if target is None:
targets = self.processes.values()
if not targets:
# We are not following any process
# maybe a attach/detach with Debugger.loop
# Just detach from the initial target
if self.target:
tpid = self.target.pid
self.target = None # Remove ref to process -> GC -> CloseHandle -> process is destroyed
windows.winproxy.DebugActiveProcessStop(tpid)
return
for proc in list(targets):
self.detach(proc)
del targets
return
if not isinstance(target, WinProcess):
raise ValueError("Detach accept only WinProcess")
self.disable_all_memory_breakpoints(target)
for bp in list(self.breakpoints[target.pid].values()):
if not bp.apply_to_target(target):
target_threads = [t for t in target.threads if t.tid in self.threads]
bp_threads = []
# TODO: clean API tu request HXBP on a thread
for t in target_threads:
t_bps = [pos for pos, hbp in self._hardware_breakpoint[t.tid].items() if hbp == bp]
if t_bps:
bp_threads.append(t)
self.del_bp(bp, bp_threads)
else:
self.del_bp(bp, [target])
del self._breakpoint_to_reput[target.pid]
for thread in [t for t in target.threads if t.tid in self.threads]:
del self._explicit_single_step[thread.tid]
del self.threads[thread.tid]
ctx = thread.context
if ctx.EEFlags.TF: # Remove TRAPFlag before detaching (or it will lead to a crash)
ctx.EEFlags.TF = 0
thread.set_context(ctx)
del self.processes[target.pid]
del self._watched_pages[target.pid]
del self._module_by_process[target.pid]
if target is self.current_process: # Bug if CTRL+C and current_process changed ?
self._finish_debug_event(self._current_debug_event, DBG_CONTINUE)
self.current_process = None
self.current_thread = None
if self.target and target.pid == self.target.pid:
self.target = None
windows.winproxy.DebugActiveProcessStop(target.pid)
def _killed_in_action(self):
"""Return ``True`` if current process have been detached by user callback"""
# Fix ? _handle_exit_process remove from processes but need a FinishDebugEvent
return self.current_process is None or self.current_process.pid not in self.processes
[docs]
@classmethod
def debug(cls, path, args=None, dwCreationFlags=0, show_windows=False):
"""Create a process and debug it.
:rtype: :class:`Debugger`"""
dwCreationFlags |= DEBUG_PROCESS
c = windows.utils.create_process(path, args=args, dwCreationFlags=dwCreationFlags, show_windows=show_windows)
return cls(c)
def _init_dispatch_handlers(self):
dbg_evt_dispatch = {}
dbg_evt_dispatch[EXCEPTION_DEBUG_EVENT] = self._handle_exception
dbg_evt_dispatch[CREATE_THREAD_DEBUG_EVENT] = self._handle_create_thread
dbg_evt_dispatch[CREATE_PROCESS_DEBUG_EVENT] = self._handle_create_process
dbg_evt_dispatch[EXIT_PROCESS_DEBUG_EVENT] = self._handle_exit_process
dbg_evt_dispatch[EXIT_THREAD_DEBUG_EVENT] = self._handle_exit_thread
dbg_evt_dispatch[LOAD_DLL_DEBUG_EVENT] = self._handle_load_dll
dbg_evt_dispatch[UNLOAD_DLL_DEBUG_EVENT] = self._handle_unload_dll
dbg_evt_dispatch[RIP_EVENT] = self._handle_rip
dbg_evt_dispatch[OUTPUT_DEBUG_STRING_EVENT] = self._handle_output_debug_string
self._DebugEventCode_dispatch = dbg_evt_dispatch
def _debug_event_generator(self):
while True:
debug_event = gdef.DEBUG_EVENT()
try:
winproxy.WaitForDebugEvent(debug_event)
except KeyboardInterrupt as e:
# So we will go out of the loop because of a Ctrl+c
# BP trigger will not be called.
# So AT LEAST quit loop with a coherent context
# Fix thread PC if we just triggered a BP
if (debug_event.dwDebugEventCode == gdef.EXCEPTION_DEBUG_EVENT and
debug_event.u.Exception.ExceptionRecord.ExceptionCode in [gdef.EXCEPTION_BREAKPOINT, gdef.STATUS_WX86_BREAKPOINT]):
# This is a breakpoint: One of ours ?
bp_addr = debug_event.u.Exception.ExceptionRecord.ExceptionAddress
if bp_addr in self.breakpoints[debug_event.dwProcessId]:
# Leave the thread in a coherent state for detach
# Should it be done in detach ?
# And just stock the "Interrupted debug_event" here ?
thread = self.threads[debug_event.dwThreadId]
ctx = thread.context
ctx.pc -= 1
thread.set_context(ctx)
raise
finally: # If user Ctrl+c -> we could raise just at the return of WaitForDebugEvent
self._current_debug_event = debug_event
yield debug_event
def _finish_debug_event(self, event, action):
if self.current_thread:
dbgprint("Finishing event for TID <{0}>".format(self.current_thread.tid), "DBG")
else:
dbgprint("Finishing event", "DBG")
if action not in [windef.DBG_CONTINUE, windef.DBG_EXCEPTION_NOT_HANDLED]:
raise ValueError('Unknow action : <0>'.format(action))
winproxy.ContinueDebugEvent(event.dwProcessId, event.dwThreadId, action)
self._current_debug_event = None
def _add_exe_to_module_list(self, create_process_event):
"""Add the intial exe file described by create_process_event to the list of module in the process"""
exe_path = self.current_process.get_mapped_filename(create_process_event.lpBaseOfImage)
exe_name = os.path.basename(exe_path).lower()
if exe_name.endswith(".exe"):
exe_name = exe_name[:-len(".exe")]
#print("Exe name is {0}".format(exe_name))
self._module_by_process[self.current_process.pid][exe_name] = windows.pe_parse.GetPEFile(create_process_event.lpBaseOfImage, self.current_process)
#self._setup_pending_breakpoints_load_dll(exe_name) # Already setup in _setup_pending_breakpoints_new_process
def _update_debugger_state(self, debug_event):
self.current_process = self.processes[debug_event.dwProcessId]
self.current_thread = self.threads[debug_event.dwThreadId]
def _dispatch_debug_event(self, debug_event):
handler = self._DebugEventCode_dispatch.get(debug_event.dwDebugEventCode, self._handle_unknown_debug_event)
return handler(debug_event)
def _dispatch_breakpoint(self, exception, addr):
bp = self.breakpoints[self.current_process.pid][addr]
with self.DisabledMemoryBreakpoint():
x = bp.trigger(self, exception)
return x
def _resolve(self, addr, target):
dbgprint("Resolving <{0}> in <{1}>".format(addr, target), "DBG")
if not isinstance(addr, basestring):
return addr
dll, api = addr.split("!")
dll = dll.lower()
modules = self._module_by_process[target.pid]
mod = None
if dll in modules:
mod = [modules[dll]]
elif target.is_wow_64 and dll == "ntdll" and "ntdll32" in modules:
# https://twitter.com/hakril/status/1555473886321549312
mod = [modules["ntdll32"]]
if not mod:
return None
# TODO: optim exports are the same for whole system (32 vs 64 bits)
# I don't have to reparse the exports each time..
# Try to interpret api as an int
try:
api_int = int(api, 0)
return mod[0].baseaddr + api_int
except ValueError:
pass
exports = mod[0].exports
if api not in exports:
dbgprint("Error resolving <{0}> in <{1}>".format(addr, target), "DBG")
raise ValueError("Unknown API <{0}> in DLL {1}".format(api, dll))
target_addr = exports[api]
if isinstance(target_addr, basestring):
target_string = target_addr.replace(".", "!")
dbgprint("<{0}> is export proxy to <{1}>".format(addr, target_string), "DBG")
# Possible infinite loop ?
return self._resolve(target_string, target)
return target_addr
def add_pending_breakpoint(self, bp, target):
self._pending_breakpoints_new[target].append(bp)
def remove_pending_breakpoint(self, bp, target):
self._pending_breakpoints_new[target].remove(bp)
def _setup_breakpoint(self, bp, target):
_setup_method = getattr(self, "_setup_breakpoint_" + bp.type)
if target is None:
if bp.type in [STANDARD_BP, MEMORY_BREAKPOINT]: #TODO: better..
targets = self.processes.values()
else:
targets = self.threads.values()
else:
targets = [target]
for target in targets:
return _setup_method(bp, target)
def _restore_breakpoints(self):
for bp in self._breakpoint_to_reput[self.current_process.pid]:
if bp.type == HARDWARE_EXEC_BP:
raise NotImplementedError("Why is this here ? we use RF flags to pass HXBP")
restore = getattr(self, "_restore_breakpoint_" + bp.type)
restore(bp, self.current_process)
self._breakpoint_to_reput[self.current_process.pid].clear()
return
def _setup_breakpoint_BP(self, bp, target):
if not isinstance(target, WinProcess):
raise ValueError("Cannot setup STANDARD_BP on {0}".format(target))
addr = self._resolve(bp.addr, target)
# raise DebuggerError("Could not set breakpoint {0} at <{1}>".format(bp, bp.addr))
if addr is None:
return False
dbgprint("Setting soft-BP at <{0:#x}> in <{1}>".format(addr, target), "DBG")
bp._addr = addr
self._memory_save[target.pid][addr] = target.read_memory(addr, 1)
self.breakpoints[target.pid][addr] = bp
target.write_memory(addr, "\xcc")
return True
def _restore_breakpoint_BP(self, bp, target):
self._memory_save[target.pid][bp._addr] = target.read_memory(bp._addr, 1)
return target.write_memory(bp._addr, "\xcc")
def _remove_breakpoint_BP(self, bp, target):
if not isinstance(target, WinProcess):
raise ValueError("SETUP STANDARD_BP on {0}".format(target))
addr = self._resolve(bp.addr, target)
target.write_memory(addr, self._memory_save[target.pid][addr])
del self._memory_save[target.pid][addr]
del self.breakpoints[target.pid][addr]
return True
def _setup_breakpoint_HXBP(self, bp, target):
#print("Setup {0} into {1}".format(bp, target))
if not isinstance(target, WinThread):
raise ValueError("SETUP HXBP_BP on {0}".format(target))
# Todo: opti, not reparse exports for all thread of the same process..
addr = self._resolve(bp.addr, target.owner)
if addr is None:
return False
x = self._hardware_breakpoint[target.tid]
if all(pos in x for pos in range(4)):
raise ValueError("Cannot put {0} in {1} (DRx full)".format(bp, target))
empty_drx = str([pos for pos in range(4) if pos not in x][0])
ctx = target.context
# Windows DebugCtl aliasing in DR7
# See https://www.codeproject.com/Articles/517466/Last-branch-records-and-branch-tracing
ctx.EDr7.LE = 0 # bit 8 of DR7 represents bit 0 of DebugCtl. This is the LBR bit. (last branch record, will explain)
ctx.EDr7.GE = 0 # bit 9 of DR7 represents bit 1 of DebugCtl. This is the BTF bit. (single-step on branches)
setattr(ctx.EDr7, "L" + empty_drx, 1)
setattr(ctx, "Dr" + empty_drx, addr)
x[int(empty_drx)] = bp
target.set_context(ctx)
self.breakpoints[target.owner.pid][addr] = bp
dbgprint("Setting HXBP at <{0:#x}> in <{1}> (Dr{2})".format(addr, target, empty_drx), "DBG")
return True
def _remove_breakpoint_HXBP(self, bp, target):
if not isinstance(target, WinThread):
raise ValueError("SETUP HXBP_BP on {0}".format(target))
addr = self._resolve(bp.addr, target.owner)
bp_pos = [pos for pos, hbp in self._hardware_breakpoint[target.tid].items() if hbp == bp]
if not bp_pos:
raise ValueError("Asked to remove {0} from {1} but not present in hbp_list".format(bp, target))
bp_pos_str = str(bp_pos[0])
ctx = target.context
setattr(ctx.EDr7, "L" + bp_pos_str, 0)
setattr(ctx, "Dr" + bp_pos_str, 0)
target.set_context(ctx)
try: # TODO: vraiment faire les HXBP par thread ? ...
del self.breakpoints[target.owner.pid][addr]
except:
pass
return True
## MemBP internal helpers
def _compute_page_access_for_event(self, target, events):
if "R" in events:
return PAGE_NOACCESS
if set("WX").issubset(events):
return PAGE_READONLY
if events == set("W"):
return PAGE_EXECUTE_READ
if events == set("X"):
# Might have problem if DEP is not enabled
if target.bitness == 64:
has_DEP = True
elif windows.winproxy.is_implemented(windows.winproxy.GetProcessDEPPolicy):
has_DEP = DWORD()
permaned = LONG()
windows.winproxy.GetProcessDEPPolicy(target.handle, has_DEP, permaned)
has_DEP = has_DEP.value
else:
has_DEP = False
return PAGE_READWRITE if has_DEP else PAGE_NOACCESS
raise ValueError("Unexpected set of event for Membp: {0}".format(events))
def _setup_breakpoint_MEMBP(self, bp, target):
addr = self._resolve(bp.addr, target)
bp._addr = addr
self._events = set(bp.events)
if addr is None:
return False
# Split in affected pages:
protection_for_bp = self._compute_page_access_for_event(target, self._events)
affected_pages = range((addr >> 12) << 12, addr + bp.size, PAGE_SIZE)
old_prot = DWORD()
cp_watch_page = self._watched_pages[self.current_process.pid]
for page_addr in affected_pages:
if page_addr not in cp_watch_page:
target.virtual_protect(page_addr, PAGE_SIZE, protection_for_bp, old_prot)
# Page with no other MemBP
cp_watch_page[page_addr] = WatchedPage(old_prot.value, [bp])
else:
# Reduce the right of the page to the common need
cp_watch_page[page_addr].bps.append(bp)
full_page_events = set.union(*[bp.events for bp in cp_watch_page[page_addr].bps])
protection_for_page = self._compute_page_access_for_event(target, full_page_events)
target.virtual_protect(page_addr, PAGE_SIZE, protection_for_page, None)
# TODO: watch for overlap with other MEM breakpoints
return True
def _restore_breakpoint_MEMBP(self, bp, target):
(page_addr, page_prot) = bp._reput_page
return target.virtual_protect(page_addr, PAGE_SIZE, page_prot, None)
def _remove_breakpoint_MEMBP(self, bp, target):
affected_pages = range((bp._addr >> 12) << 12, bp._addr + bp.size, PAGE_SIZE)
vprot_begin = affected_pages[0]
vprot_size = PAGE_SIZE * len(affected_pages)
cp_watch_page = self._watched_pages[self.current_process.pid]
for page_addr in affected_pages:
cp_watch_page[page_addr].bps.remove(bp)
if not cp_watch_page[page_addr].bps:
try:
target.virtual_protect(page_addr, PAGE_SIZE, cp_watch_page[page_addr].original_prot, None)
except WindowsError as e:
# TODO
# What should we do if the virtual protect fail on a Non-Free page ?
# It may be because the page was dealloc + map as a view..
# For now: keep the page as-is
if not target.query_memory(page_addr).State == MEM_FREE:
pass
# If page is MEM_FREE ignore the error
del cp_watch_page[page_addr]
else:
full_page_events = set.union(*[bp.events for bp in cp_watch_page[page_addr].bps])
protection_for_page = self._compute_page_access_for_event(target, full_page_events)
try:
target.virtual_protect(page_addr, PAGE_SIZE, protection_for_page, None)
except Exception as e:
# if not target.query_memory(page_addr).State == MEM_FREE:
# raise
for bp in cp_watch_page[page_addr].bps:
bp.on_error(self, page_addr)
# TODO: handle case were it is mem-free ?
return True
def _setup_pending_breakpoints_new_process(self, new_process):
for bp in self._pending_breakpoints_new[None]:
if bp.apply_to_target(new_process): #BP for thread or process ?
_setup_method = getattr(self, "_setup_breakpoint_" + bp.type)
_setup_method(bp, new_process)
for bp in list(self._pending_breakpoints_new[new_process.pid]):
if bp.apply_to_target(new_process):
_setup_method = getattr(self, "_setup_breakpoint_" + bp.type)
if _setup_method(bp, new_process):
self._pending_breakpoints_new[new_process.pid].remove(bp)
def _setup_pending_breakpoints_new_thread(self, new_thread):
for bp in self._pending_breakpoints_new[None]:
if bp.apply_to_target(new_thread): #BP for thread or process ?
_setup_method = getattr(self, "_setup_breakpoint_" + bp.type)
_setup_method(bp, new_thread)
for bp in self._pending_breakpoints_new[new_thread.owner.pid]:
if bp.apply_to_target(new_thread):
_setup_method = getattr(self, "_setup_breakpoint_" + bp.type)
_setup_method(bp, new_thread)
for bp in list(self._pending_breakpoints_new[new_thread.tid]):
_setup_method = getattr(self, "_setup_breakpoint_" + bp.type)
if _setup_method(bp, new_thread):
self._pending_breakpoints_new[new_thread.tid].remove(bp)
def _setup_pending_breakpoints_load_dll(self, dll_name):
for bp in self._pending_breakpoints_new[None]:
if isinstance(bp.addr, basestring):
target_dll = bp.addr.lower().split("!")[0]
# Cannot work AS-IS yet. Implement it ?
# if target_dll == "*" or target_dll == dll_name:
if (target_dll == dll_name or
# https://twitter.com/hakril/status/1555473886321549312
(self.current_process.is_wow_64 and target_dll == "ntdll" and dll_name == "ntdll32")):
_setup_method = getattr(self, "_setup_breakpoint_" + bp.type)
if bp.apply_to_target(self.current_process):
_setup_method(bp, self.current_process)
else:
for t in [t for t in self.current_process.threads if t.tid in self.threads]:
_setup_method(bp, t)
for bp in self._pending_breakpoints_new[self.current_process.pid]:
if isinstance(bp.addr, basestring):
target_dll = bp.addr.split("!")[0]
if (target_dll == dll_name or
# https://twitter.com/hakril/status/1555473886321549312
(self.current_process.is_wow_64 and target_dll == "ntdll" and dll_name == "ntdll32")):
_setup_method = getattr(self, "_setup_breakpoint_" + bp.type)
_setup_method(bp, self.current_process)
for thread in self.current_process.threads:
for bp in self._pending_breakpoints_new[thread.tid]:
if isinstance(bp.addr, basestring):
target_dll = bp.addr.split("!")[0]
if (target_dll == dll_name or
# https://twitter.com/hakril/status/1555473886321549312
(self.current_process.is_wow_64 and target_dll == "ntdll" and dll_name == "ntdll32")):
_setup_method = getattr(self, "_setup_breakpoint_" + bp.type)
_setup_method(bp, self.thread)
def _pass_breakpoint(self, addr):
process = self.current_process
thread = self.current_thread
process.write_memory(addr, self._memory_save[process.pid][addr])
regs = thread.context
regs.EFlags |= (1 << 8)
#regs.pc -= 1 # Done in _handle_exception_breakpoint before dispatch
thread.set_context(regs)
bp = self.breakpoints[self.current_process.pid][addr]
self._breakpoint_to_reput[process.pid].add(bp) #Register pending breakpoint for next single step
def _pass_memory_breakpoint(self, bp, page_protect, fault_page):
cp = self.current_process
page_prot = DWORD()
cp.virtual_protect(fault_page, PAGE_SIZE, page_protect, page_prot)
thread = self.current_thread
ctx = thread.context
ctx.EEFlags.TF = 1
thread.set_context(ctx)
bp._reput_page = (fault_page, page_prot.value)
self._breakpoint_to_reput[cp.pid].add(bp)
# debug event handlers
def _handle_unknown_debug_event(self, debug_event):
raise NotImplementedError("dwDebugEventCode = {0}".format(debug_event.dwDebugEventCode))
def _handle_exception_breakpoint(self, exception, excp_addr):
excp_bitness = self.get_exception_bitness(exception)
YOLO = False
if self.current_thread.context.EEFlags.TF:
dbgprint("Single step as begin of _handle_exception_breakpoint", "DBG")
# import pdb;pdb.set_trace()
YOLO = True
# Sub-method _do_setup() ?
dbg_has_setup = None
if not self.first_bp_encoutered:
dbg_has_setup = not getattr(self.on_setup, "_abstract_on_setup_", False)
self.first_bp_encoutered = True
if dbg_has_setup:
with self.DisabledMemoryBreakpoint():
continue_flag = self.on_setup() # Handle single-step here ?
# Check killed in action ?
# What if setup + BP object() ?
if excp_addr in self.breakpoints[self.current_process.pid]:
thread = self.current_thread
if self.current_process.bitness == 32 and excp_bitness == 64:
ctx = thread.context_syswow
else:
ctx = thread.context
ctx.pc -= 1
if self.current_process.bitness == 32 and excp_bitness == 64:
thread.set_syswow_context(ctx)
else:
thread.set_context(ctx)
del thread
continue_flag = self._dispatch_breakpoint(exception, excp_addr)
if self._killed_in_action():
return continue_flag
self._explicit_single_step[self.current_thread.tid] = self.current_thread.context.EEFlags.TF and not YOLO
if self._explicit_single_step[self.current_thread.tid]:
dbgprint("Someone ask for an explicit Single step", "DBG")
if excp_addr in self.breakpoints[self.current_process.pid]:
# Setup BP if not suppressed
self._pass_breakpoint(excp_addr)
return continue_flag
if dbg_has_setup: # setup() was called on this BP-event: no on_exception()
return continue_flag
with self.DisabledMemoryBreakpoint():
return self.on_exception(exception)
def _handle_exception_singlestep(self, exception, excp_addr):
if self._breakpoint_to_reput.get(self.current_process.pid):
self._restore_breakpoints()
if self._explicit_single_step[self.current_thread.tid]:
with self.DisabledMemoryBreakpoint():
self.on_single_step(exception)
if not self._killed_in_action():
self._explicit_single_step[self.current_thread.tid] = self.current_thread.context.EEFlags.TF
if self._explicit_single_step[self.current_thread.tid]:
dbgprint("Someone ask for an explicit Single step - 7", "DBG")
return DBG_CONTINUE
elif excp_addr in self.breakpoints[self.current_process.pid]:
# Verif that's not a standard BP ?
bp = self.breakpoints[self.current_process.pid][excp_addr]
with self.DisabledMemoryBreakpoint():
bp.trigger(self, exception)
if self._killed_in_action():
return DBG_CONTINUE
ctx = self.current_thread.context
self._explicit_single_step[self.current_thread.tid] = ctx.EEFlags.TF
if self._explicit_single_step[self.current_thread.tid]:
dbgprint("Someone ask for an explicit Single step - 2", "DBG")
if excp_addr in self.breakpoints[self.current_process.pid]:
ctx.EEFlags.RF = 1
self.current_thread.set_context(ctx)
return DBG_CONTINUE
elif self._explicit_single_step[self.current_thread.tid]:
with self.DisabledMemoryBreakpoint():
continue_flag = self.on_single_step(exception)
if self._killed_in_action():
return continue_flag
# Does not handle case where EEFlags.TF was by the debugge before trigering the exception
# Should set the flag explicitly in single_step ? and not just use EEFlags.TF ?
self._explicit_single_step[self.current_thread.tid] = self.current_thread.context.EEFlags.TF
if self._explicit_single_step[self.current_thread.tid]:
dbgprint("Someone ask for an explicit Single step - 3", "DBG")
return continue_flag
else:
with self.DisabledMemoryBreakpoint():
continue_flag = self.on_exception(exception)
if self._killed_in_action():
return continue_flag
# Does not handle case where EEFlags.TF was by the debugge before trigering the exception
# Should set the flag explicitly in single_step ? and not just use EEFlags.TF ?
self._explicit_single_step[self.current_thread.tid] = self.current_thread.context.EEFlags.TF
if self._explicit_single_step[self.current_thread.tid]:
dbgprint("Someone ask for an explicit Single step - 4", "DBG")
return continue_flag
# === Testing PAGE_NOACCESS(0x1L) ===
# exception: access violation reading 0x00470000
# exception: access violation writing 0x00470000
# === Testing PAGE_READONLY(0x2L) ===
# exception: access violation writing 0x00470000
# === Testing PAGE_READWRITE(0x4L) ===
# === Testing PAGE_EXECUTE(0x10L) ===
# exception: access violation writing 0x00470000
# === Testing PAGE_EXECUTE_READ(0x20L) ===
# exception: access violation writing 0x00470000
# === Testing PAGE_EXECUTE_READWRITE(0x40L) ===
def _handle_exception_access_violation(self, exception, excp_addr):
READ = 0
WRITE = 1
EXEC = 2
EVENT_STR = "RWX"
fault_type = exception.ExceptionRecord.ExceptionInformation[0]
fault_addr = exception.ExceptionRecord.ExceptionInformation[1]
pc_addr = self.current_thread.context.pc
if fault_addr == pc_addr:
fault_type = EXEC
event = EVENT_STR[fault_type]
fault_page = (fault_addr >> 12) << 12
cp_watch_page = self._watched_pages[self.current_process.pid]
mem_bp = self.get_memory_breakpoint_at(fault_addr, self.current_process)
if mem_bp is False: # No BP on this page
with self.DisabledMemoryBreakpoint():
return self.on_exception(exception)
original_prot = cp_watch_page[fault_page].original_prot
if mem_bp is None or event not in mem_bp.events: # Page has MEMBP but None handle this address | event not asked by membp
# This hack is bad, find a BP on the page to restore original access..
bp = cp_watch_page[fault_page].bps[-1]
self._pass_memory_breakpoint(bp, original_prot, fault_page)
return DBG_CONTINUE
with self.DisabledMemoryBreakpoint():
continue_flag = mem_bp.trigger(self, exception)
if self._killed_in_action():
return continue_flag
self._explicit_single_step[self.current_thread.tid] = self.current_thread.context.EEFlags.TF
if self._explicit_single_step[self.current_thread.tid]:
dbgprint("Someone ask for an explicit Single step - 5", "DBG")
# If BP has not been removed in trigger, pas it
if fault_page in cp_watch_page and mem_bp in cp_watch_page[fault_page].bps:
self._pass_memory_breakpoint(mem_bp, original_prot, fault_page)
return continue_flag
# TODO: self._explicit_single_step setup by single_step() ? check at the end ? finally ?
def _handle_exception(self, debug_event):
"""Handle EXCEPTION_DEBUG_EVENT"""
exception = debug_event.u.Exception
self._update_debugger_state(debug_event)
if windows.current_process.bitness == 32:
exception.__class__ = winexception.EEXCEPTION_DEBUG_INFO32
else:
exception.__class__ = winexception.EEXCEPTION_DEBUG_INFO64
excp_code = exception.ExceptionRecord.ExceptionCode
excp_addr = exception.ExceptionRecord.ExceptionAddress
if excp_code in [EXCEPTION_BREAKPOINT, STATUS_WX86_BREAKPOINT]:
dbgprint("Handle exception as breakpoint in TID {0}".format(self.current_thread.tid), "DBG")
return self._handle_exception_breakpoint(exception, excp_addr)
elif excp_code in [EXCEPTION_SINGLE_STEP, STATUS_WX86_SINGLE_STEP]:
dbgprint("Handle exception as single step in TID {0}".format(self.current_thread.tid), "DBG")
return self._handle_exception_singlestep(exception, excp_addr)
elif excp_code == EXCEPTION_ACCESS_VIOLATION:
dbgprint("Handle exception as access_violation in TID {0}".format(self.current_thread.tid), "DBG")
return self._handle_exception_access_violation(exception, excp_addr)
else:
with self.DisabledMemoryBreakpoint():
dbgprint("Handle exception as on_exception".format(self.current_thread.tid), "DBG")
continue_flag = self.on_exception(exception)
if self._killed_in_action():
return continue_flag
self._explicit_single_step[self.current_thread.tid] = self.current_thread.context.EEFlags.TF
if self._explicit_single_step[self.current_thread.tid]:
dbgprint("Someone ask for an explicit Single step - 6", "DBG")
return continue_flag
def _get_loaded_dll(self, load_dll):
name_sufix = ""
pe = windows.pe_parse.GetPEFile(load_dll.lpBaseOfDll, self.current_process)
if self.current_process.bitness == 32 and pe.bitness == 64:
name_sufix = "64"
addr = None
if load_dll.lpImageName:
try:
addr = self.current_process.read_ptr(load_dll.lpImageName)
except:
pass
if not addr:
pe = windows.pe_parse.GetPEFile(load_dll.lpBaseOfDll, self.current_process)
dll_name = pe.export_name
if not dll_name:
dll_name = os.path.basename(self.current_process.get_mapped_filename(load_dll.lpBaseOfDll))
return dll_name + name_sufix
if load_dll.fUnicode:
return self.current_process.read_wstring(addr) + name_sufix
return self.current_process.read_string(addr) + name_sufix
def _handle_create_process(self, debug_event):
"""Handle CREATE_PROCESS_DEBUG_EVENT"""
create_process = debug_event.u.CreateProcessInfo
# Duplicate handle, so garbage collection of the process/thread does not
# break the debug API invariant (those x_event handle are close by the debug API itself)
proc_handle = HANDLE()
thread_handle = HANDLE()
cp_handle = windows.current_process.handle
winproxy.DuplicateHandle(cp_handle, create_process.hProcess, cp_handle, ctypes.byref(proc_handle), dwOptions=DUPLICATE_SAME_ACCESS)
winproxy.DuplicateHandle(cp_handle, create_process.hThread, cp_handle, ctypes.byref(thread_handle), dwOptions=DUPLICATE_SAME_ACCESS)
dbgprint(" Got PROC handle {0:#x}".format(create_process.hProcess, self), "HANDLE")
dbgprint(" PROC handle duplicated: {0:#x}".format(proc_handle.value), "HANDLE")
dbgprint(" Got THREAD handle {0:#x}".format(create_process.hThread, self), "HANDLE")
dbgprint(" THREAD handle duplicated: {0:#x}".format(thread_handle.value), "HANDLE")
self.current_process = WinProcess._from_handle(proc_handle.value)
self.current_thread = WinThread._from_handle(thread_handle.value)
dbgprint("New process: {0}".format(self.current_process), "DBG")
self.threads[self.current_thread.tid] = self.current_thread
self._explicit_single_step[self.current_thread.tid] = False
self._hardware_breakpoint[self.current_thread.tid] = {}
self.processes[self.current_process.pid] = self.current_process
self._watched_pages[self.current_process.pid] = {} #defaultdict(list)
self.breakpoints[self.current_process.pid] = {}
self._breakpoint_to_reput[self.current_process.pid] = set()
self._memory_save[self.current_process.pid] = {}
self._module_by_process[self.current_process.pid] = {}
self._internal_on_create_process(create_process) # Allow hook for symbol-debugger
self._update_debugger_state(debug_event)
self._add_exe_to_module_list(create_process)
self._setup_pending_breakpoints_new_process(self.current_process)
self._setup_pending_breakpoints_new_thread(self.current_thread)
with self.DisabledMemoryBreakpoint():
try:
return self.on_create_process(create_process)
finally:
if create_process.hFile:
winproxy.CloseHandle(create_process.hFile)
def _handle_exit_process(self, debug_event):
"""Handle EXIT_PROCESS_DEBUG_EVENT"""
self._update_debugger_state(debug_event)
exit_process = debug_event.u.ExitProcess
retvalue = self.on_exit_process(exit_process)
del self.threads[self.current_thread.tid]
del self._explicit_single_step[self.current_thread.tid]
del self._hardware_breakpoint[self.current_thread.tid]
del self._breakpoint_to_reput[self.current_process.pid]
del self.processes[self.current_process.pid]
del self._watched_pages[self.current_process.pid]
del self._memory_save[self.current_process.pid]
del self._module_by_process[self.current_process.pid]
cpid = self.current_process.pid
self.current_thread = None
self.current_process = None
if self.target and cpid == self.target.pid:
self.target = None
return retvalue
def _handle_create_thread(self, debug_event):
"""Handle CREATE_THREAD_DEBUG_EVENT"""
create_thread = debug_event.u.CreateThread
# Duplicate handle, so garbage collection of the thread does not
# break the debug API invariant (those x_event handle are close by the debug API itself)
thread_handle = HANDLE()
cp_handle = windows.current_process.handle
winproxy.DuplicateHandle(cp_handle, create_thread.hThread, cp_handle, ctypes.byref(thread_handle), dwOptions=DUPLICATE_SAME_ACCESS)
new_thread = WinThread._from_handle(thread_handle.value)
self.threads[new_thread.tid] = new_thread
# The new thread is on the thread pool: we can now update the debugger state
self._update_debugger_state(debug_event)
self._explicit_single_step[self.current_thread.tid] = False
self._hardware_breakpoint[self.current_thread.tid] = {}
self._setup_pending_breakpoints_new_thread(self.current_thread)
with self.DisabledMemoryBreakpoint():
return self.on_create_thread(create_thread)
def _handle_exit_thread(self, debug_event):
"""Handle EXIT_THREAD_DEBUG_EVENT"""
self._update_debugger_state(debug_event)
exit_thread = debug_event.u.ExitThread
with self.DisabledMemoryBreakpoint():
retvalue = self.on_exit_thread(exit_thread)
del self.threads[self.current_thread.tid]
del self._hardware_breakpoint[self.current_thread.tid]
del self._explicit_single_step[self.current_thread.tid]
return retvalue
def _internal_on_create_process(self, create_process):
return None
def _internal_on_load_dll(self, load_dll):
return None
def _handle_load_dll(self, debug_event):
"""Handle LOAD_DLL_DEBUG_EVENT"""
self._update_debugger_state(debug_event)
load_dll = debug_event.u.LoadDll
dll = self._get_loaded_dll(load_dll)
dll_name = os.path.basename(dll).lower()
if dll_name.endswith(".dll"):
dll_name = dll_name[:-4]
# Mais c'est debile..
# Si j'ai ntdll et ntdll64: les deux vont avoir le meme nom..
# A changer sur Win11 ou ntdll est remontee en tant que ntdll32.dll
# https://twitter.com/hakril/status/1555473886321549312
if dll_name.endswith(".dll64"):
dll_name = dll_name[:-6] + "64" # Crade..
#print("Load {0} -> {1}".format(dll, dll_name))
self._module_by_process[self.current_process.pid][dll_name] = windows.pe_parse.GetPEFile(load_dll.lpBaseOfDll, self.current_process)
self._internal_on_load_dll(load_dll) # Allow hook for symbol-debugger
self._setup_pending_breakpoints_load_dll(dll_name)
with self.DisabledMemoryBreakpoint():
try:
return self.on_load_dll(load_dll)
finally:
if load_dll.hFile:
winproxy.CloseHandle(load_dll.hFile)
def _handle_unload_dll(self, debug_event):
"""Handle UNLOAD_DLL_DEBUG_EVENT"""
self._update_debugger_state(debug_event)
unload_dll = debug_event.u.UnloadDll
with self.DisabledMemoryBreakpoint():
return self.on_unload_dll(unload_dll)
def _handle_output_debug_string(self, debug_event):
"""Handle OUTPUT_DEBUG_STRING_EVENT"""
self._update_debugger_state(debug_event)
debug_string = debug_event.u.DebugString
with self.DisabledMemoryBreakpoint():
return self.on_output_debug_string(debug_string)
def _handle_rip(self, debug_event):
"""Handle RIP_EVENT"""
self._update_debugger_state(debug_event)
rip_info = debug_event.u.RipInfo
with self.DisabledMemoryBreakpoint():
return self.on_rip(rip_info)
## Public API
[docs]
def loop(self):
"""Debugging loop: handle event / dispatch to breakpoint. Returns when all targets are dead/detached"""
for debug_event in self._debug_event_generator():
dbg_continue_flag = self._dispatch_debug_event(debug_event)
if dbg_continue_flag is None:
dbg_continue_flag = DBG_CONTINUE
if debug_event.dwDebugEventCode == EXIT_PROCESS_DEBUG_EVENT or not self._killed_in_action():
#if not self._killed_in_action():
# should we always _finish_debug_event even if process was killed ?
# rhaaa _killed_in_action is a REALLY bad name, it's not killed, it's detached
# TODO: FIXME
self._finish_debug_event(debug_event, dbg_continue_flag)
if not self.processes:
break
[docs]
def add_bp(self, bp, addr=None, type=None, target=None):
"""Add a breakpoint, bp can be:
* a :class:`Breakpoint` (addr and type must be ``None``)
* any callable (addr and type must NOT be ``None``) (NON-TESTED)
If the ``bp`` type is ``STANDARD_BP`` or ``MEMORY_BREAKPOINT``, target can be ``None`` (all targets) or a process.
If the ``bp`` type is ``HARDWARE_EXEC_BP``, target can be ``None`` (all targets), a process or a thread.
"""
if getattr(bp, "addr", None) is None:
if addr is None or type is None:
raise ValueError("Breakpoing should have addr attribute or function should receive explicite addr parameter")
bp = ProxyBreakpoint(bp, addr, type)
else:
if addr is not None or type is not None:
raise ValueError("Given <addr|type> by parameters but BP object have them")
del addr
del type
if target is None:
# Need to add it to all other breakpoint
self.add_pending_breakpoint(bp, None)
elif target is not None:
# Check that targets are accepted
if target not in list(self.processes.values()) + list(self.threads.values()):
# if target == self.target: # Original target (that have not been lauched yet)
return self.add_pending_breakpoint(bp, target)
# else:
# raise ValueError("Unknown target {0}".format(target))
return self._setup_breakpoint(bp, target)
[docs]
def del_bp(self, bp, targets=None):
"""Delete a breakpoint, if targets is ``None``: delete it from all targets"""
original_target = targets
_remove_method = getattr(self, "_remove_breakpoint_" + bp.type)
if targets is None:
if bp.type in [STANDARD_BP, MEMORY_BREAKPOINT]: #TODO: better..
targets = self.processes.values()
else:
targets = self.threads.values()
for target in targets:
_remove_method(bp, target)
if original_target is None:
return self.remove_pending_breakpoint(bp, original_target)
[docs]
def single_step(self):
"""Make the ``current_thread`` ``single_step``. ``Debugger.on_single_step`` will be called after that"""
t = self.current_thread
ctx = t.context
ctx.EEFlags.TF = 1
t.set_context(ctx)
## Memory Breakpoint helper
[docs]
def get_memory_breakpoint_at(self, addr, process=None):
"""Get the memory breakpoint that handle ``addr``
Return values are:
* ``False`` if the page has no memory breakpoint (real fault)
* ``None`` if the page as memBP but None handle ``addr``
* ``bp`` the MemBP that handle ``addr``
"""
if process is None:
process = self.current_process
fault_page = (addr >> 12) << 12
if fault_page not in self._watched_pages[process.pid]:
return False
for bp in self._watched_pages[process.pid][fault_page].bps:
if bp._addr <= addr < bp._addr + bp.size:
return bp
return None
[docs]
def disable_all_memory_breakpoints(self, target=None):
"""Restore all pages to their original access rights.
If target is ``None``, use ``current_process``
:return: a mapping of all disabled breakpoints that must be passed to :func:`restore_all_memory_breakpoints`"""
if target is None:
target = self.current_process
res = {}
cp_watch_page = self._watched_pages[target.pid]
page_protection = DWORD()
for page_addr, watched_page in cp_watch_page.items():
try:
target.virtual_protect(page_addr, PAGE_SIZE, watched_page.original_prot, page_protection)
except WindowsError as e:
# Check if page have been unmapped
# print("disable_all_memory_breakpoints failed on page {0:#x} in state {1:#x}".format(page_addr, target.query_memory(page_addr).State))
# if not target.query_memory(page_addr).State == MEM_FREE:
# import pdb;pdb.set_trace()
# raise
# If page have been unmap, warn the concerned Breakpoints.
for bp in watched_page.bps:
# TODO: Document
bp.on_error(self, page_addr)
res[page_addr] = page_protection.value
return res
[docs]
def restore_all_memory_breakpoints(self, data, target=None):
"""Re-setup all memory breakpoints, affecting pages access rights.
If target is ``None``, use ``current_process``
``data`` is the result of the corresponding call to :func:`disable_all_memory_breakpoints`"""
if target is None:
target = self.current_process
for page_addr, protection in data.items():
# Prevent restoring deleted breakpoints
if page_addr in self._watched_pages[target.pid]:
target.virtual_protect(page_addr, PAGE_SIZE, protection, None)
return
[docs]
@contextmanager
def DisabledMemoryBreakpoint(self, target=None):
"""A context-manager that disable all memory breakpoints and restore them on exit"""
data = self.disable_all_memory_breakpoints(target)
try:
yield
finally:
if not self._killed_in_action():
self.restore_all_memory_breakpoints(data, target)
[docs]
def get_exception_bitness(self, exc):
"""Return the bitness in which the exception occured.
Useful when debugingg a 32b process from a 64bits one
:return: :class:`int` -- 32 or 64"""
if windows.current_process.bitness == 32:
return 32
if exc.ExceptionRecord.ExceptionCode in [STATUS_WX86_BREAKPOINT, STATUS_WX86_SINGLE_STEP]:
return 32
return 64
[docs]
@staticmethod
def kill_on_exit(choice):
"""If set to True(default in Windows) will kill all attached process on thread exit.
Otherwise, the thread detaches from all processes being debugged on exit.
See: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-debugsetprocesskillonexit
"""
return windows.winproxy.DebugSetProcessKillOnExit(choice)
# Public callback
[docs]
def on_setup(self):
"""Called on the first breakpoint event occuring in the debugger.
This callback allow to setup hook / interact with the debugee when ready:
- If :func:`on_setup` is overriden by a subclass it will be called and :func:`on_exception` will NOT be called for this event (first BP).
- If :func:`on_setup` is not defined the first BP will trigger an :func:`on_exception`.
.. note::
see sample :ref:`sample_debugger_on_setup`
"""
return None
# Help detect if on_setup was override
on_setup._abstract_on_setup_ = True
[docs]
def on_exception(self, exception):
"""Called on exception event other that known breakpoint or requested single step. ``exception`` is one of the following type:
* :class:`windows.winobject.exception.EEXCEPTION_DEBUG_INFO32`
* :class:`windows.winobject.exception.EEXCEPTION_DEBUG_INFO64`
The default behaviour is to return ``DBG_CONTINUE`` for the known exception code
and ``DBG_EXCEPTION_NOT_HANDLED`` else
"""
dbgprint("Exception: {0} at ".format(exception.ExceptionRecord.ExceptionCode, exception.ExceptionRecord.ExceptionAddress), "DBG")
if not exception.ExceptionRecord.ExceptionCode in winexception.exception_name_by_value:
return DBG_EXCEPTION_NOT_HANDLED
return DBG_CONTINUE
[docs]
def on_single_step(self, exception):
"""Called on requested single step ``exception`` is one of the following type:
* :class:`windows.winobject.exception.EEXCEPTION_DEBUG_INFO32`
* :class:`windows.winobject.exception.EEXCEPTION_DEBUG_INFO64`
There is no default implementation, if you use ``Debugger.single_step()`` you should implement ``on_single_step``
"""
raise NotImplementedError("Debugger that explicitly single step should implement <on_single_step>")
[docs]
def on_create_process(self, create_process):
"""Called on create_process event
:param CREATE_PROCESS_DEBUG_INFO create_process:"""
pass
[docs]
def on_exit_process(self, exit_process):
"""Called on exit_process event
:param EXIT_PROCESS_DEBUG_INFO exit_process:"""
pass
[docs]
def on_create_thread(self, create_thread):
"""Called on create_thread event
:param CREATE_THREAD_DEBUG_INFO create_thread:"""
pass
[docs]
def on_exit_thread(self, exit_thread):
"""Called on exit_thread event
:param EXIT_THREAD_DEBUG_INFO exit_thread:"""
pass
[docs]
def on_load_dll(self, load_dll):
"""Called on load_dll event
:param LOAD_DLL_DEBUG_INFO load_dll:"""
pass
[docs]
def on_unload_dll(self, unload_dll):
"""Called on unload_dll event
:param UNLOAD_DLL_DEBUG_INFO unload_dll:"""
pass
[docs]
def on_output_debug_string(self, debug_string):
"""Called on debug_string event
:param OUTPUT_DEBUG_STRING_INFO debug_string:"""
pass
[docs]
def on_rip(self, rip_info):
"""Called on rip_info event
:param RIP_INFO rip_info:"""
pass