import ctypes
import windows
import windows.hooks as hooks
import windows.utils as utils
from windows.generated_def.winstructs import *
from windows.utils import transform_ctypes_fields
import windows.remotectypes as rctypes
IMAGE_ORDINAL_FLAG32 = 0x80000000
IMAGE_ORDINAL_FLAG64 = 0x8000000000000000
def get_structure_transformer_for_target(target, targetbitness=None):
current_bitness = windows.current_process.bitness
if target is None:
ctypes_structure_transformer = lambda x:x
create_structure_at = lambda structcls, addr: structcls.from_address(addr)
return ctypes_structure_transformer, create_structure_at
if targetbitness is None:
targetbitness = target.bitness
if targetbitness == 32 and current_bitness == 64:
ctypes_structure_transformer = rctypes.transform_type_to_remote32bits
elif targetbitness == 64 and current_bitness == 32:
ctypes_structure_transformer = rctypes.transform_type_to_remote64bits
elif targetbitness == current_bitness:
ctypes_structure_transformer = rctypes.transform_type_to_remote
else:
raise NotImplementedError("Parsing {0} PE from {1} Process".format(targetedbitness, proc_bitness))
def create_structure_at(structcls, addr): # Il reste une closure sur 'target' ici !!
return ctypes_structure_transformer(structcls)(addr, target)
return ctypes_structure_transformer, create_structure_at
def get_pe_bitness(baseaddr, target):
# We can force bitness as the field we access are bitness-independant
pe = GetPEFile(baseaddr, target, force_bitness=32)
machine = pe.get_NT_HEADER().FileHeader.Machine
if machine == 0x14c:
return 32
elif machine == 0x8664:
return 64
else:
raise ValueError("Unknow PE target machine <0x{0:x}>".format(machine))
## == PEPARSE V2 ==
import collections
CtypesStructureTransformers = collections.namedtuple("CtypesStructureTransformers", ["ctypes_structure_transformer", "create_structure_at"])
[docs]
def GetPEFile(baseaddr, target=None, force_bitness=None):
"""Returns a :class:`PEFile` to explore a PE loaded at `baseaddr` in process `target`.
:rtype: :class:`PEFile`
.. note::
If target is ``None`` it refers to the current process
"""
proc_bitness = windows.current_process.bitness
if force_bitness is None:
targetedbitness = get_pe_bitness(baseaddr, target)
else:
targetedbitness = force_bitness
transformers = get_structure_transformer_for_target(target, targetedbitness)
#ctypes_structure_transformer, create_structure_at = transformers
transfor_funcs = CtypesStructureTransformers(*transformers) # TODO: rename
return PEFile(target, baseaddr, targetedbitness, transfor_funcs)
class THUNK_DATA(ctypes.Union):
_fields_ = [
("Ordinal", PVOID),
("AddressOfData", PVOID)
]
# Special case for .NET PE32 rewrite as 64b
# We may have a PE in a 64b process with a 32b IAT
class THUNK_DATA_32(ctypes.Union):
_fields_ = [
("Ordinal", DWORD),
("AddressOfData", DWORD)
]
class IMPORT_BY_NAME(ctypes.Structure):
_fields_ = [
("Hint", WORD),
("Name", BYTE)
]
def get_string(target, addr):
if target is None:
return ctypes.c_char_p(addr).value.decode("latin1")
return target.read_string(addr)
class PESection(IMAGE_SECTION_HEADER):
@property
def name(self):
if self.target is None:
name = get_string(self.target, ctypes.addressof(self.Name))[:8]
else:
name = get_string(self.target, self._base_addr)[:8]
# Decode as UTF-8 as the MS doc say ?
return name
@property
def start(self):
return self.baseaddr + self.VirtualAddress
@property
def size(self):
return self.VirtualSize
def __repr__(self):
return "<PESection \"{0}\">".format(self.name)
@classmethod
def create(cls, pefile, addr):
self = pefile.transformers.create_structure_at(cls, addr)
self.baseaddr = pefile.baseaddr
self.target = pefile.target
return self
class IATPtr(PVOID):
@classmethod
def from_iatentry(cls, iat_entry):
self = cls.from_address(iat_entry.addr)
self.addr = iat_entry.addr
self.nonhookvalue = iat_entry.nonhookvalue
return self
[docs]
class IATEntry(ctypes.Structure):
"""Represent an entry in the IAT of a module
Can be used to get resolved value and setup hook
"""
_fields_ = [
("value", PVOID)]
@classmethod
def create(cls, addr, ord, name, target, transformers):
self = transformers.create_structure_at(cls, addr)
self.addr = addr
self.ord = ord
self.name = name
self.hook = None
self.nonhookvalue = self.value
self.target = target
return self
def __repr__(self):
return '<{0} "{1}" ordinal {2}>'.format(self.__class__.__name__, self.name, self.ord)
[docs]
def set_hook(self, callback, types=None):
"""Setup a hook on the entry and return it.
You MUST keep a reference to the hook while the hook is enabled.
:param callback: the hook
.. note::
see :ref:`hook_protocol`
:rtype: :class:`windows.hooks.IATHook`
.. warning::
This works only for PEFile with the current process as target.
"""
if self.target is not None:
raise NotImplementedError("Setting hook in remote process (use python code injection)")
hook = hooks.IATHook(self, callback, types)
import weakref
self.whook = weakref.ref(hook, self.on_destroy)
self.hook = hook
hook.enable()
return hook
def on_destroy(self, *args):
# We cannot know if the hook was enabled here..
print("DESTROY: {0} -> ".format(args, self.enabled))
# print(args[0]())
[docs]
def remove_hook(self):
"""Remove the hook on the entry"""
if self.hook is None:
return False
self.hook.disable()
self.hook = None
return True
# def __del__(self):
# print(self.hook)
# if self.hook:
# print("LOL BYE {0}".format(self.hook))
class IMAGE_IMPORT_DESCRIPTOR(IMAGE_IMPORT_DESCRIPTOR): # TODO: use explicite name winstructs.IMAGE_IMPORT_DESCRIPTOR
def get_INT(self, pe):
THUNK_DATA_TYPE = THUNK_DATA
if not self.OriginalFirstThunk:
return None
# We may have 32bits PE mapped in 32bits process (thanks to .NET PE)
if self.target is None and pe.bitness != windows.current_process.bitness:
assert windows.current_process.bitness == 64 and pe.bitness == 32, "Mapped 64b PE in current process 32b not handled"
THUNK_DATA_TYPE = THUNK_DATA_32
int_addr = self.OriginalFirstThunk + self.baseaddr
int_entry = self.transformers.create_structure_at(THUNK_DATA_TYPE, int_addr)
res = []
while int_entry.Ordinal:
if int_entry.Ordinal & self.IMAGE_ORDINAL_FLAG:
res += [(int_entry.Ordinal & 0x7fffffff, None)]
else:
import_by_name = self.transformers.create_structure_at(IMPORT_BY_NAME, self.baseaddr + int_entry.AddressOfData)
name_address = self.baseaddr + int_entry.AddressOfData + type(import_by_name).Name.offset
name = get_string(self.target, name_address)
res.append((import_by_name.Hint, name))
int_addr += ctypes.sizeof(type(int_entry))
int_entry = self.transformers.create_structure_at(THUNK_DATA_TYPE, int_addr)
return res
def get_IAT(self, pe):
THUNK_DATA_TYPE = THUNK_DATA
if self.target is None and pe.bitness != windows.current_process.bitness:
assert windows.current_process.bitness == 64 and pe.bitness == 32, "Mapped 64b PE in current process 32b not handled"
THUNK_DATA_TYPE = THUNK_DATA_32
iat_addr = self.FirstThunk + self.baseaddr
iat_entry = self.transformers.create_structure_at(THUNK_DATA_TYPE, iat_addr)
res = []
while iat_entry.Ordinal:
res.append(IATEntry.create(iat_addr, -1, "??", self.target, self.transformers))
iat_addr += ctypes.sizeof(type(iat_entry))
iat_entry = self.transformers.create_structure_at(THUNK_DATA_TYPE, iat_addr)
return res
@classmethod
def create(cls, pefile, addr):
self = pefile.transformers.create_structure_at(cls, addr)
self.baseaddr = pefile.baseaddr
self.transformers = pefile.transformers
self.IMAGE_ORDINAL_FLAG = pefile.IMAGE_ORDINAL_FLAG
self.target = pefile.target
return self
class IMAGE_EXPORT_DIRECTORY(IMAGE_EXPORT_DIRECTORY): # TODO: use explicite name winstructs._IMAGE_EXPORT_DIRECTORY
def get_exports(self):
NameOrdinals = self.transformers.create_structure_at((WORD * self.NumberOfNames), self.AddressOfNameOrdinals + self.baseaddr)
NameOrdinals = list(NameOrdinals)
Functions = self.transformers.create_structure_at((DWORD * self.NumberOfFunctions), self.AddressOfFunctions + self.baseaddr)
Names = self.transformers.create_structure_at((DWORD * self.NumberOfNames), self.AddressOfNames + self.baseaddr)
res = []
for nb, func in enumerate(Functions):
func += self.baseaddr
if nb in NameOrdinals:
name = get_string(self.target, Names[NameOrdinals.index(nb)] + self.baseaddr)
# Export name should be ascii
# Decode from ascii or return bytes ?
# https://docs.microsoft.com/en-us/windows/win32/debug/pe-format#export-address-table
else:
name = None
res.append((nb, func, name))
return res
@classmethod
def create(cls, pefile, addr):
self = pefile.transformers.create_structure_at(cls, addr)
self.transformers = pefile.transformers
self.target = pefile.target
self.baseaddr = pefile.baseaddr
return self
class IMAGE_DOS_HEADER(ctypes.Structure):
_fields_ = [
("e_magic", CHAR * 2),
("e_cblp", WORD),
("e_cp", WORD),
("e_crlc", WORD),
("e_cparhdr", WORD),
("e_minalloc", WORD),
("e_maxalloc", WORD),
("e_ss", WORD),
("e_sp", WORD),
("e_csum", WORD),
("e_ip", WORD),
("e_cs", WORD),
("e_lfarlc", WORD),
("e_ovno", WORD),
("e_res", WORD * 4),
("e_oemid", WORD),
("e_oeminfo", WORD),
("e_res2", WORD * 10),
("e_lfanew", DWORD),
]
[docs]
class PEFile(object):
"""Represent a PE loaded in a process (current or remote)"""
def __init__(self, target, baseaddr, targetedbitness, transformers):
self.target = target
self.baseaddr = baseaddr
self.bitness = targetedbitness
self.transformers = transformers
if targetedbitness == 32:
self.IMAGE_ORDINAL_FLAG = IMAGE_ORDINAL_FLAG32
else:
self.IMAGE_ORDINAL_FLAG = IMAGE_ORDINAL_FLAG64
def get_DOS_HEADER(self):
return self.transformers.create_structure_at(IMAGE_DOS_HEADER, self.baseaddr)
def get_NT_HEADER(self):
offset = self.get_DOS_HEADER().e_lfanew
if self.bitness == 32:
return self.transformers.create_structure_at(IMAGE_NT_HEADERS32, self.baseaddr + offset)
return self.transformers.create_structure_at(IMAGE_NT_HEADERS64, self.baseaddr + offset)
STANDARD_OPTIONAL_HEADER_TYPE_PER_MAGIC = {
IMAGE_NT_OPTIONAL_HDR32_MAGIC: IMAGE_OPTIONAL_HEADER32,
IMAGE_NT_OPTIONAL_HDR64_MAGIC: IMAGE_OPTIONAL_HEADER64,
}
STANDARD_OPTIONAL_HEADER_SIZE_PER_MAGIC = (
(IMAGE_NT_OPTIONAL_HDR32_MAGIC, ctypes.sizeof(IMAGE_OPTIONAL_HEADER32)),
(IMAGE_NT_OPTIONAL_HDR64_MAGIC, ctypes.sizeof(IMAGE_OPTIONAL_HEADER64)),
)
def get_OptionalHeader(self):
# We can have a 32bits PE with a 64 bits OptionalHeader
# Ex : PE32 .NET that allow to be loaded in 64b process
# See: https://github.com/dotnet/runtime/blob/8bbe33819464216becffb7cf8b7ea8dd3bab5836/src/coreclr/src/vm/peimagelayout.cpp#L599
# In this case the OptionalHeader is transformed in 64bits & OptionalHeader.Magic is changed accordingly
# So we cannot just rely on get_NT_HEADER() to give us the correct OptionalHeader type. some re-check are required
default_opth = self.get_NT_HEADER().OptionalHeader
# Cannot juste compare types with type(default_opth) as it may be a remoteType
current_opth_infos = (default_opth.Magic, ctypes.sizeof(default_opth))
if current_opth_infos in self.STANDARD_OPTIONAL_HEADER_SIZE_PER_MAGIC:
# The default OptionalHeader structure match what we expect based on the magic (most of the cases)
return default_opth
# Mismatch -> PE32 remapped as 64b (with OptionalHeader rewrite)
# Remap the correct OptionalHeader
opt_header_real_type = self.STANDARD_OPTIONAL_HEADER_TYPE_PER_MAGIC[default_opth.Magic]
opt_header_addr = default_opth._base_addr if self.target else ctypes.addressof(default_opth)
return self.transformers.create_structure_at(opt_header_real_type, opt_header_addr)
def get_DataDirectory(self):
return self.get_OptionalHeader().DataDirectory
def get_IMPORT_DESCRIPTORS(self):
import_datadir = self.get_DataDirectory()[IMAGE_DIRECTORY_ENTRY_IMPORT]
if import_datadir.VirtualAddress == 0:
return []
import_descriptor_addr = self.baseaddr + import_datadir.VirtualAddress
current_import_descriptor = IMAGE_IMPORT_DESCRIPTOR.create(self, import_descriptor_addr)
res = []
while current_import_descriptor.FirstThunk:
res.append(current_import_descriptor)
import_descriptor_addr += ctypes.sizeof(IMAGE_IMPORT_DESCRIPTOR)
current_import_descriptor = IMAGE_IMPORT_DESCRIPTOR.create(self, import_descriptor_addr)
return res
def get_EXPORT_DIRECTORY(self):
export_directory_rva = self.get_DataDirectory()[IMAGE_DIRECTORY_ENTRY_EXPORT].VirtualAddress
if export_directory_rva == 0:
return None
export_directory_addr = self.baseaddr + export_directory_rva
exp_dir = IMAGE_EXPORT_DIRECTORY.create(self, export_directory_addr)
return exp_dir
@utils.fixedpropety
def sections(self):
nt_header = self.get_NT_HEADER()
nb_section = nt_header.FileHeader.NumberOfSections
SizeOfOptionalHeader = self.get_NT_HEADER().FileHeader.SizeOfOptionalHeader
if self.target is None:
opt_header_addr = ctypes.addressof(self.get_NT_HEADER().OptionalHeader)
else:
opt_header_addr = self.get_NT_HEADER().OptionalHeader._base_addr
base_section = opt_header_addr + SizeOfOptionalHeader
#pe_section_type = IMAGE_SECTION_HEADER
return [PESection.create(self, base_section + (sizeof(IMAGE_SECTION_HEADER) * i)) for i in range(nb_section)]
#sections_array = self.transformers.create_structure_at((self.PESection * nb_section), base_section)
#return list(sections_array)
@utils.fixedpropety
def exports(self):
"""The exports of the PE in a dict. Keys are ordinal (:class:`int`) and name (:class:`str`).
The values are the addresses of the exports.
:type: {(:class:`int` or :class:`str`) : :class:`int`}"""
res = {}
exp_dir = self.get_EXPORT_DIRECTORY()
export_datadir = self.get_DataDirectory()[IMAGE_DIRECTORY_ENTRY_EXPORT]
export_start = self.baseaddr + export_datadir.VirtualAddress
export_end = export_start + export_datadir.Size
if exp_dir is None:
return res
raw_exports = exp_dir.get_exports()
for id, rva_addr, rva_name in raw_exports:
if export_start <= rva_addr < export_end:
# Export proxy...
# Contains the string to another Dll.Function
rva_addr = get_string(self.target, rva_addr) # Put the string proxy instead
res[id] = rva_addr
if rva_name is not None:
res[rva_name] = rva_addr
return res
@utils.fixedpropety
def export_name(self):
"""The Name attribute of the ``EXPORT_DIRECTORY``"""
exp_dir = self.get_EXPORT_DIRECTORY()
if exp_dir is None:
return None
if not exp_dir.Name:
return None
return get_string(self.target, self.baseaddr + exp_dir.Name)
# TODO: get imports by parsing other modules exports if no INT
@utils.fixedpropety
def imports(self):
"""The imports of the PE in a dict.
Keys are the names of DLL to import from and values are :class:`list`
of :class:`IATEntry`
:type: {:class:`str` : [:class:`IATEntry`]}"""
res = {}
for import_descriptor in self.get_IMPORT_DESCRIPTORS():
INT = import_descriptor.get_INT(self)
IAT = import_descriptor.get_IAT(self)
if INT is not None:
for iat_entry, (ord, name) in zip(IAT, INT):
# str(name.decode()) -> python2 and python3 compatible for str result
iat_entry.ord = ord
iat_entry.name = str(name) if name else ""
name = get_string(self.target, self.baseaddr + import_descriptor.Name)
res.setdefault(name.lower(), []).extend(IAT)
return res