import ctypes
import windows
from collections import namedtuple
from contextlib import contextmanager
from windows import utils
from windows import winproxy
from windows.pycompat import int_types
import windows.generated_def as gdef
from windows.generated_def import *
from windows import security
from windows.pycompat import basestring, urepr_encode
"""
``type`` might be one of:
* ``SERVICE_KERNEL_DRIVER(0x1L)``
* ``SERVICE_FILE_SYSTEM_DRIVER(0x2L)``
* ``SERVICE_WIN32_OWN_PROCESS(0x10L)``
* ``SERVICE_WIN32_SHARE_PROCESS(0x20L)``
* ``SERVICE_INTERACTIVE_PROCESS(0x100L)``
``state`` might be one of:
* ``SERVICE_STOPPED(0x1L)``
* ``SERVICE_START_PENDING(0x2L)``
* ``SERVICE_STOP_PENDING(0x3L)``
* ``SERVICE_RUNNING(0x4L)``
* ``SERVICE_CONTINUE_PENDING(0x5L)``
* ``SERVICE_PAUSE_PENDING(0x6L)``
* ``SERVICE_PAUSED(0x7L)``
``flags`` might be one of:
* ``0``
* ``SERVICE_RUNS_IN_SYSTEM_PROCESS(0x1L)``
"""
[docs]
class ServiceManager(utils.AutoHandle):
"""An object to query, list and explore services"""
_close_function = staticmethod(winproxy.CloseServiceHandle)
def __init__(self):
self.enum_flags = None #: Lazy computed at first enum
def _get_enum_service_type_flags(self):
if self.enum_flags is not None:
return self.enum_flags
if windows.system.version[0] < 10:
# Old value of SERVICE_TYPE_ALL
flags = (gdef.SERVICE_WIN32 |
gdef.SERVICE_ADAPTER |
gdef.SERVICE_DRIVER |
gdef.SERVICE_INTERACTIVE_PROCESS)
else:
flags = (SERVICE_WIN32 |
SERVICE_ADAPTER |
SERVICE_DRIVER |
SERVICE_INTERACTIVE_PROCESS |
SERVICE_USER_SERVICE |
SERVICE_USERSERVICE_INSTANCE)
if windows.system.build_number >= 14393:
# This flag was introduced in 14393 (Thank ProcessHacker & google)
flags |= gdef.SERVICE_PKG_SERVICE
self.enum_flags = flags
return flags
def _get_handle(self):
return windows.winproxy.OpenSCManagerW(dwDesiredAccess=gdef.MAXIMUM_ALLOWED)
def open_service(self, name, access=gdef.MAXIMUM_ALLOWED):
return windows.winproxy.OpenServiceW(self.handle, name, access) # Check service exists :)
[docs]
def get_service(self, key, access=gdef.MAXIMUM_ALLOWED):
"""Get a service by its name/index or a list of services via a slice
:return: :class:`Service` or [:class:`Service`] -- A :class:`Service` or list of :class:`Service`
"""
if isinstance(key, int_types):
return self.enumerate_services()[key]
if isinstance(key, slice):
# Get service list
servlist = self.enumerate_services()
# Extract indexes matching the slice
indexes = key.indices(len(servlist))
return [servlist[idx] for idx in range(*indexes)]
# Retrieve service by its name
handle = self.open_service(key, access)
return Service(name=key, handle=handle)
__getitem__ = get_service
"""Get a service by its name/index or a list of services via a slice
:return: :class:`Service` or [:class:`Service`] -- A :class:`Service` or list of :class:`Service`
"""
def get_service_display_name(self, name):
# This API is strange..
# Why can't we retrieve the display name for a service handle ?
BUFFER_SIZE = 0x1000
result = (WCHAR * BUFFER_SIZE)()
size_needed = gdef.DWORD(BUFFER_SIZE)
windows.winproxy.GetServiceDisplayNameW(self.handle, name, result, size_needed)
return result.value
def _enumerate_services_generator(self, flags=None):
"""The generator code behind __iter__.
Allow to iter over the services on the system
"""
size_needed = gdef.DWORD()
nb_services = gdef.DWORD()
counter = gdef.DWORD()
if flags is None:
flags = self._get_enum_service_type_flags()
try:
windows.winproxy.EnumServicesStatusExW(self.handle, SC_ENUM_PROCESS_INFO, flags, SERVICE_STATE_ALL, None, 0, ctypes.byref(size_needed), ctypes.byref(nb_services), byref(counter), None)
except WindowsError as e:
if e.winerror == gdef.ERROR_INVALID_PARAMETER:
raise # Invalid enum flags: better raise that risk infinite loop
while True:
size = size_needed.value
buffer = (BYTE * size)()
try:
windows.winproxy.EnumServicesStatusExW(self.handle, SC_ENUM_PROCESS_INFO, flags, SERVICE_STATE_ALL, buffer, size, ctypes.byref(size_needed), ctypes.byref(nb_services), byref(counter), None)
except WindowsError as e:
if e.winerror == gdef.ERROR_INVALID_PARAMETER:
raise # Invalid enum flags: better raise that risk infinite loop
continue
break
services_array = (gdef.ENUM_SERVICE_STATUS_PROCESSW * nb_services.value).from_buffer(buffer)
for service_info in services_array:
shandle = self.open_service(service_info.lpServiceName)
yield Service(handle=shandle, name=service_info.lpServiceName, description=service_info.lpDisplayName)
return
__iter__ = _enumerate_services_generator
"""Iter over the services on the system
:yield: :class:`Service`
"""
def enumerate_services(self):
return list(self._enumerate_services_generator())
def create(self, name, description, access, type, start, path):
newservice_handle = windows.winproxy.CreateServiceW(
self.handle, # hSCManager
name, # lpServiceName
description, # lpDisplayName
access, # dwDesiredAccess
type, # dwServiceType
start, # dwStartType
gdef.SERVICE_ERROR_NORMAL, # dwErrorControl
path, # lpBinaryPathName
None, # lpLoadOrderGroup
None, # lpdwTagId
None, # lpDependencies
None, # lpServiceStartName
None) # lpPassword
return Service(handle=newservice_handle, name=name, description=description)
[docs]
class Service(gdef.SC_HANDLE):
"""Represent a service on the system"""
def __init__(self, handle, name, description=None):
super(Service, self).__init__(handle)
self.name = name
"""The name of the service
:type: :class:`str`
"""
if description is not None:
self._description = description # Setup fixedpropety
@property
def description(self):
"""The description of the service
:type: :class:`str`
"""
return ServiceManager().get_service_display_name(self.name)
@property
def status(self):
"""The status of the service
:type: :class:`~windows.generated_def.winstructs.SERVICE_STATUS_PROCESS`
"""
buffer = windows.utils.BUFFER(gdef.SERVICE_STATUS_PROCESS)()
size_needed = gdef.DWORD()
windows.winproxy.QueryServiceStatusEx(self, gdef.SC_STATUS_PROCESS_INFO, buffer.cast(gdef.LPBYTE), ctypes.sizeof(buffer), size_needed)
return buffer[0]
@property # Can change if service is started/stopped when the object exist
def process(self):
"""The process running the service (if any)
:type: :class:`WinProcess <windows.winobject.process.WinProcess>` or ``None``
"""
pid = self.status.dwProcessId
if not pid:
return None
l = windows.WinProcess(pid=pid)
return l
@property
def security_descriptor(self):
"""The security descriptor of the service
:type: :class:`~windows.security.SecurityDescriptor`
"""
return security.SecurityDescriptor.from_service(self.name)
[docs]
def start(self, args=None):
"""Start the service
:param args: a list of :class:`str`
"""
nbelt = 0
if args is not None:
if isinstance(args, windows.pycompat.anybuff):
args = [args]
nbelt = len(args)
args = (gdef.LPWSTR * (nbelt))(*args)
return windows.winproxy.StartServiceW(self, nbelt, args)
[docs]
def stop(self):
"""Stop the service"""
status = SERVICE_STATUS()
windows.winproxy.ControlService(self, gdef.SERVICE_CONTROL_STOP, status)
return status
def __repr__(self):
return urepr_encode(u"""<{0} "{1}" {2!r}>""".format(type(self).__name__, self.name, self.status.state))
def __del__(self):
return windows.winproxy.CloseServiceHandle(self)