import ctypes
import os.path
import xml.dom.minidom
from contextlib import contextmanager
import windows
import windows.generated_def as gdef
from windows import winproxy
from windows.pycompat import int_types, basestring
# Helpers
@contextmanager
def ClosingEvtHandle(handle):
try:
yield handle
finally:
winproxy.EvtClose(handle)
# low-level api helpers
def queryinfo(handle, propertyid):
size = 0x1000
buffer = ctypes.create_string_buffer(size)
evt = ImprovedEVT_VARIANT.from_buffer(buffer)
res = gdef.DWORD()
windows.winproxy.EvtGetLogInfo(handle, propertyid, size, evt, res)
return evt
def arrayproperty(handle, property, index, flags=0):
size = 0x1000
buffer = ctypes.create_string_buffer(size)
evt = ImprovedEVT_VARIANT.from_buffer(buffer)
res = gdef.DWORD()
windows.winproxy.EvtGetObjectArrayProperty(handle, property, index, flags, size, evt, res)
return evt
def generate_query_function(query_function):
def generated_query_function(handle, propertyid, flags=0):
size = 0x10000
buffer = ctypes.create_string_buffer(size)
evt = ImprovedEVT_VARIANT.from_buffer(buffer)
res = gdef.DWORD()
query_function(handle, propertyid, flags, size, evt, res)
return evt
return generated_query_function
chaninfo = generate_query_function(winproxy.EvtGetChannelConfigProperty)
eventinfo = generate_query_function(winproxy.EvtGetEventMetadataProperty)
publishinfo = generate_query_function(winproxy.EvtGetPublisherMetadataProperty)
class EvtHandle(gdef.EVT_HANDLE):
# Class attribute function
# Will pass (self) as first parameter (binding)
# No need to pass any param to close ourself :)
_close_function = windows.winproxy.EvtClose
def __del__(self):
if not bool(self):
return
self._close_function()
# Class high-level API
[docs]
class EvtQuery(EvtHandle):
"""Represent an Event-log query"""
DEFAULT_TIMEOUT = 0x1000
def __init__(self, handle=0, channel=None, timeout=None):
super(EvtQuery, self).__init__(handle)
self.channel = channel
self.timeout = timeout if timeout is not None else self.DEFAULT_TIMEOUT
def __next__(self):
"""Return the next :class:`EvtEvent` matching the query"""
try:
event = EvtEvent(channel=self.channel)
ret = gdef.DWORD()
windows.winproxy.EvtNext(self, 1, event, self.timeout, 0, ret)
except WindowsError as e:
if e.winerror == gdef.ERROR_NO_MORE_ITEMS:
raise StopIteration
raise
assert ret.value == 1
return event
def __iter__(self):
return self
[docs]
def seek(self, position, seek_flags=None):
"""Seek to ``position``.
``seek_flags`` can be one of:
* ``None``
* ``EvtSeekRelativeToFirst``
* ``EvtSeekRelativeToLast``
* ``EvtSeekRelativeToBookmark``
If ``seek_flags`` is None:
* ``position >= 0`` will use ``EvtSeekRelativeToFirst``
* ``position < 0`` will use ``EvtSeekRelativeToLast`` and with ``position+1``
* This allow retrieve the ``position`` lasts events
"""
if seek_flags is None:
if position >= 0:
seek_flags = gdef.EvtSeekRelativeToFirst
else:
# -1 + EvtSeekRelativeToLast will give us the last 2 events
# So passing (-1, None) will give us the last event only
# If user do not want this calcul it can directly pass seek_flags
seek_flags = gdef.EvtSeekRelativeToLast
position += 1
windows.winproxy.EvtSeek(self, position, 0, 0, seek_flags)
next = __next__ # Yep.. real name is 'next' in Py2 :D
[docs]
def all(self): # SqlAlchemy like :)
"""Return a list with all the query results
:rtype: [:class:`EvtEvent`] -- A list of Event
"""
return list(self)
[docs]
def first(self): # SqlAlchemy like :) -> allow testing in interactive console
"""Return the first query result
:rtype: :class:`EvtEvent` -- An Event
"""
return next(iter(self))
[docs]
class EvtEvent(EvtHandle):
"""An Event log"""
def __init__(self, handle=0, channel=None):
super(EvtEvent, self).__init__(handle)
self.channel = channel
def render(self, ctx, rtype):
size = 0x10000
buffer = ctypes.c_buffer(size)
rsize = gdef.DWORD()
elementnb = gdef.DWORD()
try:
windows.winproxy.EvtRender(ctx, self, rtype, size, buffer, rsize, elementnb)
except WindowsError as e:
if e.winerror != gdef.ERROR_INSUFFICIENT_BUFFER:
raise
size = rsize.value
buffer = ctypes.c_buffer(size)
windows.winproxy.EvtRender(ctx, self, rtype, size, buffer, rsize, elementnb)
# Adapting return value type
if rtype != gdef.EvtRenderEventValues:
# import pdb;pdb.set_trace()
# assert elementnb.value == 1
return buffer[:rsize.value]
# print("Got <{0}> elt".format(elementnb.value))
return list((ImprovedEVT_VARIANT * elementnb.value).from_buffer(buffer))
def render_xml(self):
xml = self.render(None, 1).decode("utf-16")
assert xml[-1] == "\x00"
return xml[:-1]
[docs]
def value(self, name, **kwargs):
"""Retrieve a value from the event.
``name`` is an XPath expressions that uniquely identify a node or attribute in the event.
(see https://msdn.microsoft.com/en-us/library/windows/desktop/aa385352(v=vs.85).aspx)
"""
values = self.get_values((name,), **kwargs)
assert len(values) == 1
return values[0]
def get_values(self, values, flags=gdef.EvtRenderContextValues):
nbelt = len(values)
pwstr_values = tuple(gdef.LPWSTR(v) for v in values)
pwstr_rarray = (gdef.LPWSTR * nbelt)(*pwstr_values)
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa385352(v=vs.85).aspx
# An array of XPath expressions that uniquely identify a node or attribute in the event that you want to render.
# Each value wil return 1 node :)
ctx = windows.winproxy.EvtCreateRenderContext(nbelt, pwstr_rarray, gdef.EvtRenderContextValues)
result = self.render(ctx, gdef.EvtRenderEventValues)
windows.winproxy.EvtClose(ctx)
return [r.value for r in result]
def system_values(self): # POC: use this for all @property based on system data ?
ctx = windows.winproxy.EvtCreateRenderContext(0, None, gdef.EvtRenderContextSystem)
result = self.render(ctx, gdef.EvtRenderEventValues)
windows.winproxy.EvtClose(ctx)
return [r.value for r in result]
[docs]
def event_values(self):
"""The values of the event in a list"""
ctx = windows.winproxy.EvtCreateRenderContext(0, None, gdef.EvtRenderContextUser)
result = self.render(ctx, gdef.EvtRenderEventValues)
windows.winproxy.EvtClose(ctx)
return [r.value for r in result]
def get_raw_values(self, values, flags=gdef.EvtRenderContextValues):
nbelt = len(values)
pwstr_values = tuple(gdef.LPWSTR(v) for v in values)
pwstr_rarray = (gdef.LPWSTR * nbelt)(*pwstr_values)
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa385352(v=vs.85).aspx
# An array of XPath expressions that uniquely identify a node or attribute in the event that you want to render.
# Each value will return 1 node :)
ctx = windows.winproxy.EvtCreateRenderContext(nbelt, pwstr_rarray, gdef.EvtRenderContextValues)
result = self.render(ctx, gdef.EvtRenderEventValues)
windows.winproxy.EvtClose(ctx)
return list(result)
# Properties arround common Event/System values
@property
def provider(self):
"""The provider of the event"""
return self.system_values()[gdef.EvtSystemProviderName]
@property
def computer(self):
"""The computer that generated the event"""
return self.system_values()[gdef.EvtSystemComputer]
@property
def id(self):
"""The ID of the Event"""
return self.value("Event/System/EventID")
@property
def version(self):
"""The version of the Event"""
return self.value("Event/System/Version")
@property
def level(self):
"""The level of the Event"""
return self.value("Event/System/Level")
@property
def opcode(self):
"""The opcode of the Event"""
return self.value("Event/System/Opcode")
@property
def time_created(self):
"""The creation time of the Event"""
return self.value("Event/System/TimeCreated/@SystemTime")
@property
def pid(self):
"""The process ID of the Event"""
return self.value("Event/System/Execution/@ProcessID")
@property
def tid(self):
"""The process ID of the Event"""
return self.value("Event/System/Execution/@ThreadID")
@property
def error_payload(self):
raw = self.value("Event/ProcessingErrorData/EventPayload")
return bytearray(raw) if raw is not None else None
@property
def user(self):
"""The User ID associated with the Event"""
return self.system_values()[gdef.EvtSystemUserID]
@property
def metadata(self):
"""The medata for the current Event
:type: :class:`EventMetadata`
"""
try:
return self.channel.get_event_metadata(self.id)
except KeyError as e:
if not self.channel.config.classic:
raise
# id not found: try via the Provider in the event (classic channel)
return self.channel.get_classic_event_metadata(self.id, self.provider)
# Test
@property
def data(self): # user/event specifique data
"""A dict of EventData Name:Value for the current dict.
:type: :class:`dict`
"""
# What about classic channels where there is no event_metadata ?
# Return a dict with [0-1-2-3-4] as key ? raise ?
# Juste use the render_xml ?
event_data_name = (i["name"] for i in self.metadata.event_data if i["type"] == "data")
return {k:v for k,v in zip(event_data_name, self.event_values())}
def xml_data(self):
xmlevt = xml.dom.minidom.parseString(self.render_xml())
res = {}
eventdata = xmlevt.getElementsByTagName("EventData")
if eventdata:
# <Data Name='FIELD_NAME'>FIELD_VALUE</Data>
for i, datanode in enumerate(xmlevt.getElementsByTagName("Data")):
name = datanode.getAttribute("Name")
if not name:
# Some Data in old EVTX have no name (Windows Powershell)
# Do the best we can by using the position of the event
name = str(i)
if datanode.hasChildNodes():
value = datanode.firstChild.nodeValue
else:
value = ""
if not (name not in res):
import pdb;pdb.set_trace()
res[name] = value
userdata = xmlevt.getElementsByTagName("UserData")
if userdata:
# <UserData>
# <EventXML xmlns="Event_NS">
# <FIELD_NAME>FIELD_VALUE</FIELD_NAME>
# </EventXML>
# </UserData>
for datanode in userdata[0].firstChild.childNodes:
name = datanode.tagName
if datanode.hasChildNodes():
value = datanode.firstChild.nodeValue
else:
value = ""
assert name not in res
res[name] = value
return res
@property
def date(self):
"""``Event.time_created`` as a :class:``datetime``"""
return windows.utils.datetime_from_filetime(self.time_created)
def __repr__(self):
creation_time = windows.utils.datetime_from_filetime(self.time_created)
return '<{0} id="{self.id}" time="{creation_time}">'.format(type(self).__name__, self=self, creation_time=creation_time)
class ImprovedEVT_VARIANT(gdef.EVT_VARIANT):
VALUE_MAPPER = {
gdef.EvtVarTypeNull : 'NoneValue',
gdef.EvtVarTypeString : 'StringVal',
gdef.EvtVarTypeAnsiString : 'AnsiStringVal',
gdef.EvtVarTypeSByte : 'SByteVal',
gdef.EvtVarTypeByte : 'ByteVal',
gdef.EvtVarTypeInt16 : 'Int16Val',
gdef.EvtVarTypeUInt16 : 'UInt16Val',
gdef.EvtVarTypeInt32 : 'Int32Val',
gdef.EvtVarTypeUInt32 : 'UInt32Val',
gdef.EvtVarTypeInt64 : 'Int64Val',
gdef.EvtVarTypeUInt64 : 'UInt64Val',
gdef.EvtVarTypeSingle : 'SingleVal',
gdef.EvtVarTypeDouble : 'DoubleVal',
gdef.EvtVarTypeBoolean : 'BooleanVal',
gdef.EvtVarTypeBinary : 'BinaryVal',
gdef.EvtVarTypeGuid : 'GuidVal',
gdef.EvtVarTypeSizeT : 'SizeTVal',
gdef.EvtVarTypeFileTime : 'FileTimeVal',
gdef.EvtVarTypeSysTime : 'SysTimeVal',
gdef.EvtVarTypeSid : 'SidVal',
gdef.EvtVarTypeHexInt32 : 'UInt32Val',
gdef.EvtVarTypeHexInt64 : 'UInt64Val',
gdef.EvtVarTypeEvtHandle : 'EvtHandleVal',
gdef.EvtVarTypeEvtXml : 'XmlVal',
# Array types: TODO: generic stuff
gdef.EvtVarTypeString + gdef.EVT_VARIANT_TYPE_ARRAY : "StringArr",
gdef.EvtVarTypeUInt16 + gdef.EVT_VARIANT_TYPE_ARRAY : "UInt16Arr",
gdef.EvtVarTypeUInt32 + gdef.EVT_VARIANT_TYPE_ARRAY : "UInt32Arr",
gdef.EvtVarTypeUInt64 + gdef.EVT_VARIANT_TYPE_ARRAY : "UInt64Arr",
}
NoneValue = None
@property
def Type(self):
raw_type = super(ImprovedEVT_VARIANT, self).Type
return gdef.EVT_VARIANT_TYPE.mapper.get(raw_type, raw_type)
@property
def value(self): # Prototype !!
attrname = self.VALUE_MAPPER[self.Type]
# print("Resolve type <{0}> -> {1}".format(self.Type, attrname))
v = getattr(self, attrname)
if self.Type == gdef.EvtVarTypeBinary:
v = v[:self.Count] # No need for a raw UBYTE ptr
elif self.Type == gdef.EvtVarTypeGuid:
v = v[0] # Deref LP_GUID
elif self.Type & gdef.EVT_VARIANT_TYPE_ARRAY:
# TODO: handle all array type
v = v[:self.Count]
return v
@classmethod
def from_value(cls, value, vtype=None):
if vtype is None:
# Guess type
if isinstance(value, int_types):
vtype = gdef.EvtVarTypeUInt64
elif isinstance(value, basestring):
vtype = gdef.EvtVarTypeString
elif isinstance(value, bytes):
# not basestring and bytes -> py3 bytes
vtype = gdef.EvtVarTypeBinary
value = windows.utils.BUFFER(gdef.BYTE).from_buffer_copy(value)
else:
raise NotImplementedError("LATER")
self = cls()
# import pdb;pdb.set_trace()
# Yolo test :)
super(ImprovedEVT_VARIANT, ImprovedEVT_VARIANT).Type.__set__(self, vtype)
# super(ImprovedEVT_VARIANT, self).Type = vtype
attrname = self.VALUE_MAPPER[self.Type]
setattr(self, attrname, value)
if self.Type in (gdef.EvtVarTypeBinary, gdef.EvtVarTypeString):
self.Count = len(value)
return self
def __repr__(self):
return "<{0} of type={1}>".format(type(self).__name__, self.Type)
[docs]
class EvtChannel(object):
"""An Event Log channel"""
DEFAULT_QUERY_FLAGS = gdef.EvtQueryChannelPath + gdef.EvtQueryForwardDirection
def __init__(self, name):
self.name = name
self.event_metadata_by_id = {}
self.classic_event_metadata_by_id = {} # For classic only
[docs]
def query(self, filter=None, ids=None, timeout=None):
"""Query the event with the ``ids`` or perform a query with the raw query ``filter``
Both parameters are mutually exclusive.
.. note:: Here are some query examples
List all events with a event data attribute named 'RuleName':
``Event/EventData/Data[@Name='RuleName']``
List all events with a event data value of 'C:\\\\WINDOWS\\\\System32\\\\svchost.exe':
``Event/EventData[Data='C:\\WINDOWS\\System32\\svchost.exe']``
List all events with an EventID of 2006:
``Event/System[EventID=2006]``
List all event with a given EventID while searching for a specific field value (Sysmon for the test here)
``Event/System[EventID=3] and Event/EventData/Data[@Name='DestinationIp'] and Event/EventData[Data='10.0.0.2']``
List all events with a given provider of Microsoft-Windows-TaskScheduler:
``Event/System/Provider[@Name='Microsoft-Windows-TaskScheduler']``
:rtype: :class:`EvtQuery`
"""
if ids and filter:
raise ValueError("<ids> and <filter> are mutually exclusive")
if ids is not None:
if isinstance(ids, int_types):
ids = (ids,)
ids_filter = " or ".join("EventID={0}".format(id) for id in ids)
filter = "Event/System[{0}]".format(ids_filter)
query_handle = winproxy.EvtQuery(None, self.name, filter, self.DEFAULT_QUERY_FLAGS)
return EvtQuery(query_handle, self, timeout=timeout)
@property
def events(self):
"""The list of all events in the channels, an alias for ``channel.query().all()``
:type: [:class:`EvtEvent`] -- A list of :class:`EvtEvent`
"""
return self.query().all()
@property
def config(self):
"""The configuration of the channel
:type: :class:`ChannelConfig`
"""
return ChannelConfig.from_channel_name(self.name)
def get_classic_event_metadata(self, id, providername):
if providername not in self.classic_event_metadata_by_id:
# print("CALCUL FOR PROVIDER: <{0}> !!!!!!!!!!!!!!".format(providername))
publisher = EvtPublisher(providername)
events_metadata = {x.id: x for x in publisher.metadata.events_metadata}
self.classic_event_metadata_by_id[providername] = events_metadata
return self.classic_event_metadata_by_id[providername][id]
def __repr__(self):
return '<{0} "{1}">'.format(type(self).__name__, self.name)
[docs]
class EvtFile(EvtChannel):
"""Represent an Evtx file"""
DEFAULT_QUERY_FLAGS = gdef.EvtQueryFilePath + gdef.EvtQueryForwardDirection
@property
def config(self):
"""Not implemented for EvtFile
:raise: :class:`NotImplementedError`
"""
raise NotImplementedError("Cannot retrieve the configuration of an EvtFile")
[docs]
class ChannelConfig(EvtHandle):
"""The configuration of a event channel"""
# TODO: https://learn.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtsetchannelconfigproperty
def __init__(self, handle, name=None):
super(ChannelConfig, self).__init__(handle)
self.name = name
[docs]
@classmethod
def from_channel_name(cls, name):
"""Return the :class:`ChannelConfig` for the channel ``name``"""
return cls(winproxy.EvtOpenChannelConfig(None, name, 0), name)
@property
def publisher(self):
"""The :class:`EvtPublisher` for the channel"""
return EvtPublisher(chaninfo(self, gdef.EvtChannelConfigOwningPublisher).value)
[docs]
def publishers(self):
""" The list of Publisher publishing on the channel. seems pertinent for Legacy channel like "System"
:type: [:class:`EvtPublisher`] - A list of :class:`EvtPublisher`
"""
return [EvtPublisher(pub) for pub in chaninfo(self, gdef.EvtChannelPublisherList).value]
@property
def keywords(self):
return int(chaninfo(self, gdef.EvtChannelPublishingConfigKeywords).value)
@property
def enabled(self):
return bool(chaninfo(self, gdef.EvtChannelConfigEnabled).value)
@property
def classic(self):
"""``True`` if the channel is a classic event channel (for example the Application or System log)"""
return bool(chaninfo(self, gdef.EvtChannelConfigClassicEventlog).value)
def __repr__(self):
return '<{0} "{1}">'.format(type(self).__name__, self.name)
class PublisherMetadataChannel(object):
"""Represent a PublisherMetadataChannel (see https://docs.microsoft.com/en-us/windows/win32/api/winevt/ne-winevt-evt_publisher_metadata_property_id)"""
def __init__(self, pub_metadata, channel_id):
super(PublisherMetadataChannel, self).__init__()
self.pub_metadata = pub_metadata
self._id = channel_id
def _query_channel_metadata_property(self, propertyid):
return self.pub_metadata.chanrefs.property(propertyid, self._id)
@property
def flags(self):
"""The flags of the ``PublisherMetadataChannel``"""
return int(self._query_channel_metadata_property(gdef.EvtPublisherMetadataChannelReferenceFlags))
@property
def name(self):
"""The name of the ``PublisherMetadataChannel``"""
return str(self._query_channel_metadata_property(gdef.EvtPublisherMetadataChannelReferencePath))
@property
def id(self):
"""The reference id of the ``PublisherMetadataChannel``"""
return int(self._query_channel_metadata_property(gdef.EvtPublisherMetadataChannelReferenceID))
@property
def index(self):
"""The reference index of the ``PublisherMetadataChannel``"""
return int(self._query_channel_metadata_property(gdef.EvtPublisherMetadataChannelReferenceIndex))
@property
def message_id(self):
"""The message id of the ``PublisherMetadataChannel``"""
return int(self._query_channel_metadata_property(gdef.EvtPublisherMetadataChannelReferenceMessageID))
class PublisherMetadataLevel(object):
"""Represent a PublisherMetadataLevel (see https://docs.microsoft.com/en-us/windows/win32/api/winevt/ne-winevt-evt_publisher_metadata_property_id)"""
def __init__(self, pub_metadata, channel_id):
super(PublisherMetadataLevel, self).__init__()
self.pub_metadata = pub_metadata
self._id = channel_id
def _query_level_metadata_property(self, propertyid):
return self.pub_metadata.levelrefs.property(propertyid, self._id)
@property
def name(self):
return str(self._query_level_metadata_property(gdef.EvtPublisherMetadataLevelName))
@property
def value(self):
return int(self._query_level_metadata_property(gdef.EvtPublisherMetadataLevelValue))
@property
def message_id(self):
return int(self._query_level_metadata_property(gdef.EvtPublisherMetadataLevelMessageID))
class PublisherMetadataOpcode(object):
"""Represent a PublisherMetadataOpcode (see https://docs.microsoft.com/en-us/windows/win32/api/winevt/ne-winevt-evt_publisher_metadata_property_id)"""
def __init__(self, pub_metadata, channel_id):
super(PublisherMetadataOpcode, self).__init__()
self.pub_metadata = pub_metadata
self._id = channel_id
def _query_opcode_metadata_property(self, propertyid):
return self.pub_metadata.opcoderefs.property(propertyid, self._id)
@property
def name(self):
"""The name of the ``PublisherMetadataOpcode``"""
return str(self._query_opcode_metadata_property(gdef.EvtPublisherMetadataOpcodeName))
@property
def value(self):
"""The opcode value of the ``PublisherMetadataOpcode``"""
return int(self._query_opcode_metadata_property(gdef.EvtPublisherMetadataOpcodeValue))
@property
def message_id(self):
"""The message id of the ``PublisherMetadataOpcode``"""
return int(self._query_opcode_metadata_property(gdef.EvtPublisherMetadataOpcodeMessageID))
class PublisherMetadataKeyword(object):
"""Represent a PublisherMetadataKeyword (see https://docs.microsoft.com/en-us/windows/win32/api/winevt/ne-winevt-evt_publisher_metadata_property_id)"""
def __init__(self, pub_metadata, channel_id):
super(PublisherMetadataKeyword, self).__init__()
self.pub_metadata = pub_metadata
self._id = channel_id
def _query_keyword_metadata_property(self, propertyid):
return self.pub_metadata.keywordrefs.property(propertyid, self._id)
@property
def name(self):
"""The name of the ``PublisherMetadataKeyword``"""
return str(self._query_keyword_metadata_property(gdef.EvtPublisherMetadataKeywordName))
@property
def value(self):
"""The value of the ``PublisherMetadataKeyword``"""
return int(self._query_keyword_metadata_property(gdef.EvtPublisherMetadataKeywordValue))
@property
def message_id(self):
"""The message id of the ``PublisherMetadataKeyword``"""
return int(self._query_keyword_metadata_property(gdef.EvtPublisherMetadataKeywordMessageID))
class PublisherMetadataTask(object):
"""Represent a PublisherMetadataTask (see https://docs.microsoft.com/en-us/windows/win32/api/winevt/ne-winevt-evt_publisher_metadata_property_id)"""
def __init__(self, pub_metadata, channel_id):
super(PublisherMetadataTask, self).__init__()
self.pub_metadata = pub_metadata
self._id = channel_id
def _query_keyword_metadata_property(self, propertyid):
return self.pub_metadata.taskrefs.property(propertyid, self._id)
@property
def name(self):
"""The name of the ``PublisherMetadataTask``"""
return str(self._query_keyword_metadata_property(gdef.EvtPublisherMetadataTaskName))
@property
def value(self):
"""The value of the ``PublisherMetadataTask``"""
return int(self._query_keyword_metadata_property(gdef.EvtPublisherMetadataTaskValue))
@property
def event_guid(self):
"""The event GUId of the ``PublisherMetadataTask``"""
return self._query_keyword_metadata_property(gdef.EvtPublisherMetadataTaskEventGuid)
@property
def message_id(self):
"""The message ID GUId of the ``PublisherMetadataTask``"""
return int(self._query_keyword_metadata_property(gdef.EvtPublisherMetadataTaskMessageID))
[docs]
class EvtPublisher(object):
"""An Event provider"""
def __init__(self, name):
self.name = name
@property
def metadata(self):
"""Return the metadata for this publisher
:type: :class:`PublisherMetadata`
"""
return PublisherMetadata.from_publisher_name(self.name)
def __repr__(self):
return '<{0} "{1}">'.format(type(self).__name__, self.name)
[docs]
class PropertyArray(gdef.EVT_OBJECT_ARRAY_PROPERTY_HANDLE):
"TODO"
@property
def size(self):
array_size = gdef.DWORD()
windows.winproxy.EvtGetObjectArraySize(self, array_size)
return array_size.value
def property(self, type, index):
return arrayproperty(self, type, index).value
[docs]
class EvtlogManager(object):
"""The main Evt class to open Evt channel/publisher and evtx file"""
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa385784(v=vs.85).aspx
[docs]
def is_implemented(self):
"""Return ``True`` if the new Evt-API is implemented on the current computer
see: https://msdn.microsoft.com/en-us/library/windows/desktop/aa385784(v=vs.85).aspx
"""
return windows.winproxy.is_implemented(windows.winproxy.EvtQuery)
@property
def channels(self):
h = windows.winproxy.EvtOpenChannelEnum(None, 0)
size = 0x1000
buffer = ctypes.create_unicode_buffer(size)
ressize = gdef.DWORD()
with ClosingEvtHandle(h):
while True:
try:
windows.winproxy.EvtNextChannelPath(h, size, buffer, ressize)
except WindowsError as e:
if e.winerror != gdef.ERROR_NO_MORE_ITEMS:
raise
return
assert buffer[ressize.value - 1] == "\x00"
name = buffer[:ressize.value - 1]
chan = EvtChannel(name)
yield chan
@property
def publishers(self):
h = windows.winproxy.EvtOpenPublisherEnum(None, 0)
size = 0x1000
buffer = ctypes.create_unicode_buffer(size)
ressize = gdef.DWORD()
with ClosingEvtHandle(h):
while True:
try:
windows.winproxy.EvtNextPublisherId(h, size, buffer, ressize)
except WindowsError as e:
if e.winerror != gdef.ERROR_NO_MORE_ITEMS:
raise
return
assert buffer[ressize.value - 1] == "\x00"
name = buffer[:ressize.value - 1]
publisher = EvtPublisher(name)
yield publisher
[docs]
def open_channel(self, name):
"""Open the Evt channel with ``name``
:rtype: :class:`EvtChannel`
"""
chan = EvtChannel(name)
chan.config # Force to retrieve a handle (check channel exists)
return chan
[docs]
def open_evtx_file(self, filename):
"""Open the evtx file with ``filename``
:rtype: :class:`EvtFile`
"""
with windows.utils.DisableWow64FsRedirection():
if not os.path.exists(filename):
raise WindowsError(gdef.ERROR_FILE_NOT_FOUND, "Could not find file <{0}>".format(filename))
file = EvtFile(filename)
return file
[docs]
def open_publisher(self, name):
"""Open the Evt publisher with ``name``
:rtype: :class:`EvtPublisher`
"""
publisher = EvtPublisher(name)
publisher.metadata # Force to retrieve a handle (check channel exists)
return publisher
[docs]
def __getitem__(self, name):
"""Open the Evt Channel/Publisher or Evtx file with ``name``
:rtype: :class:`EvtChannel` or :class:`EvtPublisher` or :class:`EvtFile`
"""
try:
return self.open_channel(name)
except WindowsError as e:
if e.winerror == gdef.ERROR_ACCESS_DENIED:
raise
try:
return self.open_publisher(name)
except WindowsError as e:
if e.winerror == gdef.ERROR_ACCESS_DENIED:
raise
# Raise FILE_NOT_FOUND if not found (last chance)
return self.open_evtx_file(name)