import sys
import ctypes
from collections import namedtuple
import windows
from windows import winproxy
from windows import generated_def as gdef
import windows.pycompat
## For 64b python
# 0x1f: 0x80000000: ALPC_MESSAGE_SECURITY_ATTRIBUTE(0x80000000) : size=0x18?
# 0x1e: 0x40000000: ALPC_MESSAGE_VIEW_ATTRIBUTE(0x40000000): size=0x20
# 0x1d: 0x20000000: ALPC_MESSAGE_CONTEXT_ATTRIBUTE(0x20000000): size=0x20
# 0x1c: 0x10000000: ALPC_MESSAGE_HANDLE_ATTRIBUTE(0x10000000): size=0x18
# 0x1b: 0x8000000: ALPC_MESSAGE_TOKEN_ATTRIBUTE(0x8000000): size=0x18
# 0x1a: 0x4000000: ALPC_MESSAGE_DIRECT_ATTRIBUTE(0x4000000) size=0x8
# 0x19: 0x2000000: ALPC_MESSAGE_WORK_ON_BEHALF_ATTRIBUTE(0x2000000) size=0x8
DEFAULT_MESSAGE_SIZE = 0x1000
[docs]
class AlpcMessage(object):
"""Represent a full ALPC Message: a :class:`AlpcMessagePort` and a :class:`MessageAttribute`"""
# PORT_MESSAGE + MessageAttribute
def __init__(self, msg_or_size=DEFAULT_MESSAGE_SIZE, attributes=None):
# Init the PORT_MESSAGE
if isinstance(msg_or_size, windows.pycompat.int_types):
self.port_message_buffer_size = msg_or_size
self.port_message_raw_buffer = ctypes.c_buffer(msg_or_size)
self.port_message = AlpcMessagePort.from_buffer(self.port_message_raw_buffer)
self.port_message.set_datalen(0)
elif isinstance(msg_or_size, AlpcMessagePort):
self.port_message = msg_or_size
self.port_message_raw_buffer = self.port_message.raw_buffer
self.port_message_buffer_size = len(self.port_message_raw_buffer)
else:
raise NotImplementedError("Uneexpected type for <msg_or_size>: {0}".format(msg_or_size))
# Init the MessageAttributes
if attributes is None:
# self.attributes = MessageAttribute.with_all_attributes()
self.attributes = MessageAttribute.with_all_attributes() ## Testing
else:
self.attributes = attributes
# PORT_MESSAGE wrappers
@property
def type(self):
"""The type of the message (``PORT_MESSAGE.u2.s2.Type``)"""
return self.port_message.u2.s2.Type
def get_port_message_data(self):
return self.port_message.data
def set_port_message_data(self, data):
self.port_message.data = data
data = property(get_port_message_data, set_port_message_data)
"The data of the message (located after the PORT_MESSAGE header)"
# MessageAttributes wrappers
## Low level attributes access
@property
def security_attribute(self):
"""The :data:`~windows.generated_def.ALPC_MESSAGE_SECURITY_ATTRIBUTE` of the message
:type: :class:`ALPC_SECURITY_ATTR`
"""
return self.attributes.get_attribute(gdef.ALPC_MESSAGE_SECURITY_ATTRIBUTE)
@property
def view_attribute(self):
"""The :data:`~windows.generated_def.ALPC_MESSAGE_VIEW_ATTRIBUTE` of the message:
:type: :class:`ALPC_DATA_VIEW_ATTR`
"""
return self.attributes.get_attribute(gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE)
@property
def context_attribute(self):
"""The :data:`~windows.generated_def.ALPC_MESSAGE_CONTEXT_ATTRIBUTE` of the message:
:type: :class:`ALPC_CONTEXT_ATTR`
"""
return self.attributes.get_attribute(gdef.ALPC_MESSAGE_CONTEXT_ATTRIBUTE)
@property
def handle_attribute(self):
"""The :data:`~windows.generated_def.ALPC_MESSAGE_HANDLE_ATTRIBUTE` of the message:
:type: :class:`ALPC_HANDLE_ATTR`
"""
return self.attributes.get_attribute(gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE)
## Low level validity check (Test)
@property
def view_is_valid(self): # Change the name ?
"""True if :data:`~windows.generated_def.ALPC_MESSAGE_VIEW_ATTRIBUTE` is a ValidAttributes"""
return self.attributes.is_valid(gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE)
@property
def security_is_valid(self): # Change the name ?
"""True if :data:`~windows.generated_def.ALPC_MESSAGE_SECURITY_ATTRIBUTE` is a ValidAttributes"""
return self.attributes.is_valid(gdef.ALPC_MESSAGE_SECURITY_ATTRIBUTE)
@property
def handle_is_valid(self): # Change the name ?
"""True if :data:`~windows.generated_def.ALPC_MESSAGE_HANDLE_ATTRIBUTE` is a ValidAttributes"""
return self.attributes.is_valid(gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE)
@property
def context_is_valid(self): # Change the name ?
"""True if :data:`~windows.generated_def.ALPC_MESSAGE_CONTEXT_ATTRIBUTE` is a ValidAttributes"""
return self.attributes.is_valid(gdef.ALPC_MESSAGE_CONTEXT_ATTRIBUTE)
@property
def valid_attributes(self):
"""The list of valid attributes
:type: [:class:`~windows.generated_def.Flag`]
"""
return self.attributes.valid_list
@property
def allocated_attributes(self):
"""The list of allocated attributes
:type: [:class:`~windows.generated_def.Flag`]
"""
return self.attributes.allocated_list
## High level setup (Test)
def setup_view(self, size, section_handle=0, flags=None):
raise NotImplementedError(self.setup_view)
[docs]
class AlpcMessagePort(gdef.PORT_MESSAGE):
"""The effective ALPC Message composed of a ``PORT_MESSAGE`` structure followed by the data"""
# Constructeur
[docs]
@classmethod
def from_buffer(self, buffer):
# A sort of super(AlpcMessagePort).from_buffer
# But from_buffer is from the Metaclass of AlpcMessagePort so we use 'type(AlpcMessagePort)'
# To access the standard version of from_buffer.
self = type(AlpcMessagePort).from_buffer(AlpcMessagePort, buffer)
self.buffer_size = len(buffer)
self.raw_buffer = buffer
self.header_size = ctypes.sizeof(self)
self.max_datasize = self.buffer_size - self.header_size
return self
@classmethod
def from_buffer_size(cls, buffer_size):
buffer = ctypes.c_buffer(buffer_size)
return cls.from_buffer(buffer)
def read_data(self):
return self.raw_buffer[ctypes.sizeof(self):ctypes.sizeof(self) + self.u1.s1.DataLength]
def write_data(self, data):
if len(data) > self.max_datasize:
import pdb; pdb.set_trace()
raise ValueError("Cannot write data of len <{0}> (raw_buffer size == <{1}>)".format(len(data), self.buffer_size))
self.raw_buffer[self.header_size: self.header_size + len(data)] = data
self.set_datalen(len(data))
data = property(read_data, write_data)
"The data of the message (located after the header)"
def set_datalen(self, datalen):
self.u1.s1.TotalLength = self.header_size + datalen
self.u1.s1.DataLength = datalen
def get_datalen(self):
return self.u1.s1.DataLength
datalen = property(get_datalen, set_datalen)
"""The length of the data"""
KNOWN_ALPC_ATTRIBUTES = (gdef.ALPC_MESSAGE_SECURITY_ATTRIBUTE,
gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE,
gdef.ALPC_MESSAGE_CONTEXT_ATTRIBUTE,
gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE,
gdef.ALPC_MESSAGE_TOKEN_ATTRIBUTE,
gdef.ALPC_MESSAGE_DIRECT_ATTRIBUTE,
gdef.ALPC_MESSAGE_WORK_ON_BEHALF_ATTRIBUTE)
KNOWN_ALPC_ATTRIBUTES_MAPPING = gdef.FlagMapper(*KNOWN_ALPC_ATTRIBUTES)
[docs]
class MessageAttribute(gdef.ALPC_MESSAGE_ATTRIBUTES):
"""The attributes of an ALPC message"""
ATTRIBUTE_BY_FLAG = [(gdef.ALPC_MESSAGE_SECURITY_ATTRIBUTE, gdef.ALPC_SECURITY_ATTR),
(gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE, gdef.ALPC_DATA_VIEW_ATTR),
(gdef.ALPC_MESSAGE_CONTEXT_ATTRIBUTE, gdef.ALPC_CONTEXT_ATTR),
(gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE, gdef.ALPC_HANDLE_ATTR),
(gdef.ALPC_MESSAGE_TOKEN_ATTRIBUTE, gdef.ALPC_TOKEN_ATTR),
(gdef.ALPC_MESSAGE_DIRECT_ATTRIBUTE, gdef.ALPC_DIRECT_ATTR),
(gdef.ALPC_MESSAGE_WORK_ON_BEHALF_ATTRIBUTE, gdef.ALPC_WORK_ON_BEHALF_ATTR),
]
[docs]
@classmethod
def with_attributes(cls, attributes):
"""Create a new :class:`MessageAttribute` with ``attributes`` allocated
:returns: :class:`MessageAttribute`
"""
size = cls._get_required_buffer_size(attributes)
buffer = ctypes.c_buffer(size)
self = cls.from_buffer(buffer)
self.raw_buffer = buffer
res = gdef.DWORD()
winproxy.AlpcInitializeMessageAttribute(attributes, self, len(self.raw_buffer), res)
return self
[docs]
@classmethod
def with_all_attributes(cls):
"""Create a new :class:`MessageAttribute` with the following attributes allocated:
- :class:`ALPC_MESSAGE_SECURITY_ATTRIBUTE`
- :class:`ALPC_MESSAGE_VIEW_ATTRIBUTE`
- :class:`ALPC_MESSAGE_CONTEXT_ATTRIBUTE`
- :class:`ALPC_MESSAGE_HANDLE_ATTRIBUTE`
- :class:`ALPC_MESSAGE_TOKEN_ATTRIBUTE`
- :class:`ALPC_MESSAGE_DIRECT_ATTRIBUTE`
- :class:`ALPC_MESSAGE_WORK_ON_BEHALF_ATTRIBUTE`
:returns: :class:`MessageAttribute`
"""
return cls.with_attributes(gdef.ALPC_MESSAGE_SECURITY_ATTRIBUTE |
gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE |
gdef.ALPC_MESSAGE_CONTEXT_ATTRIBUTE |
gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE |
gdef.ALPC_MESSAGE_TOKEN_ATTRIBUTE |
gdef.ALPC_MESSAGE_DIRECT_ATTRIBUTE |
gdef.ALPC_MESSAGE_WORK_ON_BEHALF_ATTRIBUTE)
@staticmethod
def _get_required_buffer_size(flags):
res = gdef.DWORD()
try:
windows.winproxy.AlpcInitializeMessageAttribute(flags, None, 0, res)
except windows.generated_def.ntstatus.NtStatusException as e:
# Buffer too small: osef
return res.value
return res.value
[docs]
def is_allocated(self, attribute):
"""Return ``True`` if ``attribute`` is allocated"""
return bool(self.AllocatedAttributes & attribute)
[docs]
def is_valid(self, attribute):
"""Return ``True`` if ``attribute`` is valid"""
return bool(self.ValidAttributes & attribute)
def get_attribute(self, attribute):
if not self.is_allocated(attribute):
raise ValueError("Cannot get non-allocated attribute <{0}>".format(attribute))
offset = ctypes.sizeof(self)
for sflag, struct in self.ATTRIBUTE_BY_FLAG:
if sflag == attribute:
# print("Attr {0:#x} was at offet {1:#x}".format(attribute, offset))
return struct.from_address(ctypes.addressof(self) + offset)
elif self.is_allocated(sflag):
offset += ctypes.sizeof(struct)
raise ValueError("ALPC Attribute <{0}> not found :(".format(attribute))
def _extract_alpc_attributes_values(self, value):
attrs = []
for mask in (1 << i for i in range(64)):
if value & mask:
attrs.append(mask)
return [KNOWN_ALPC_ATTRIBUTES_MAPPING[x] for x in attrs]
@property
def valid_list(self):
"""The list of valid attributes
:type: [:class:`~windows.generated_def.Flag`]
"""
return self._extract_alpc_attributes_values(self.ValidAttributes)
@property
def allocated_list(self):
"""The list of allocated attributes
:type: [:class:`~windows.generated_def.Flag`]
"""
return self._extract_alpc_attributes_values(self.AllocatedAttributes)
AlpcSection = namedtuple("AlpcSection", ["handle", "size"])
class AlpcTransportBase(object):
def send_receive(self, alpc_message, receive_msg=None, flags=gdef.ALPC_MSGFLG_SYNC_REQUEST, timeout=None):
"""Send and receive a message with ``flags``.
:param alpc_message: The message to send. If ``alpc_message`` is a :class:`str` it build an AlpcMessage with the message as data.
:type alpc_message: AlpcMessage or str
:param receive_msg: The message to send. If ``receive_msg`` is a ``None`` it create and return a simple :class:`AlpcMessage`
:type receive_msg: AlpcMessage or None
:param int flags: The flags for :func:`NtAlpcSendWaitReceivePort`
"""
if isinstance(alpc_message, windows.pycompat.anybuff):
raw_alpc_message = alpc_message
alpc_message = AlpcMessage(max(0x1000, len(alpc_message) + 0x200))
alpc_message.port_message.data = raw_alpc_message
if receive_msg is None:
receive_msg = AlpcMessage(DEFAULT_MESSAGE_SIZE)
receive_size = gdef.SIZE_T(receive_msg.port_message_buffer_size)
winproxy.NtAlpcSendWaitReceivePort(self.handle, flags, alpc_message.port_message, alpc_message.attributes, receive_msg.port_message, receive_size, receive_msg.attributes, timeout)
return receive_msg
def send(self, alpc_message, flags=0):
"""Send the ``alpc_message`` with ``flags``
:param alpc_message: The message to send. If ``alpc_message`` is a :class:`str` it build an AlpcMessage with the message as data.
:type alpc_message: AlpcMessage or str
:param int flags: The flags for :func:`NtAlpcSendWaitReceivePort`
"""
if isinstance(alpc_message, windows.pycompat.anybuff):
raw_alpc_message = alpc_message
alpc_message = AlpcMessage(max(0x1000, len(alpc_message) + 0x200))
alpc_message.port_message.data = raw_alpc_message
winproxy.NtAlpcSendWaitReceivePort(self.handle, flags, alpc_message.port_message, alpc_message.attributes, None, None, None, None)
def recv(self, receive_msg=None, flags=0):
"""Receive a message into ``alpc_message`` with ``flags``.
:param receive_msg: The message to send. If ``receive_msg`` is a ``None`` it create and return a simple :class:`AlpcMessage`
:type receive_msg: AlpcMessage or None
:param int flags: The flags for :func:`NtAlpcSendWaitReceivePort`
"""
if receive_msg is None:
receive_msg = AlpcMessage(DEFAULT_MESSAGE_SIZE)
receive_size = gdef.SIZE_T(receive_msg.port_message_buffer_size)
winproxy.NtAlpcSendWaitReceivePort(self.handle, flags, None, None, receive_msg.port_message, receive_size, receive_msg.attributes, None)
return receive_msg
def _close_port(self, port_handle):
windows.winproxy.NtAlpcDisconnectPort(port_handle, 0)
windows.winproxy.CloseHandle(port_handle)
[docs]
class AlpcClient(AlpcTransportBase):
"An ALPC client able to connect to a port and send/receive messages"
def __init__(self, port_name=None):
"""Init the :class:`AlpcClient` automatically connect to ``port_name`` using default values if given"""
self.handle = None
self.port_name = None #: The name of the ALPC port the client is connect to.
if port_name is not None:
x = self.connect_to_port(port_name, "")
def _alpc_port_to_unicode_string(self, name):
return gdef.UNICODE_STRING.from_string(name)
[docs]
def connect_to_port(self, port_name, connect_message=None,
port_attr=None, port_attr_flags=0x10000, obj_attr=None,
flags=gdef.ALPC_MSGFLG_SYNC_REQUEST, timeout=None):
"""Connect to the ALPC port ``port_name``. Most of the parameters have defauls value is ``None`` is passed.
:param AlpcMessage connect_message: The message send with the connection request, if not ``None`` the function will return an :class:`AlpcMessage`
:param ALPC_PORT_ATTRIBUTES port_attr: The port attributes, one with default value will be used if this parameter is ``None``
:param int port_attr_flags: ``ALPC_PORT_ATTRIBUTES.Flags`` used if ``port_attr`` is ``None`` (MUTUALY EXCLUSINVE WITH ``port_attr``)
:param OBJECT_ATTRIBUTES obj_attr: The attributes of the port (can be None)
:param int flags: The flags for :func:`NtAlpcConnectPort`
:param int timeout: The timeout of the request
"""
# TODO raise on mutual exclusive parameter
if self.handle is not None:
raise ValueError("Client already connected")
handle = gdef.HANDLE()
port_name_unicode = self._alpc_port_to_unicode_string(port_name)
if port_attr is None:
port_attr = gdef.ALPC_PORT_ATTRIBUTES()
port_attr.Flags = port_attr_flags # Flag qui fonctionne pour l'UAC
port_attr.MaxMessageLength = DEFAULT_MESSAGE_SIZE
port_attr.MemoryBandwidth = 0
port_attr.MaxPoolUsage = 0xffffffff
port_attr.MaxSectionSize = 0xffffffff
port_attr.MaxViewSize = 0xffffffff
port_attr.MaxTotalSectionSize = 0xffffffff
port_attr.DupObjectTypes = 0xffffffff
port_attr.SecurityQos.Length = ctypes.sizeof(port_attr.SecurityQos)
port_attr.SecurityQos.ImpersonationLevel = gdef.SecurityImpersonation
port_attr.SecurityQos.ContextTrackingMode = 0
port_attr.SecurityQos.EffectiveOnly = 0
if connect_message is None:
send_msg = None
send_msg_attr = None
buffersize = None
elif isinstance(connect_message, windows.pycompat.anybuff):
buffersize = gdef.DWORD(len(connect_message) + 0x1000)
send_msg = AlpcMessagePort.from_buffer_size(buffersize.value)
send_msg.data = connect_message
send_msg_attr = MessageAttribute.with_all_attributes()
elif isinstance(connect_message, AlpcMessage):
send_msg = connect_message.port_message
send_msg_attr = connect_message.attributes
buffersize = gdef.DWORD(connect_message.port_message_buffer_size)
else:
raise ValueError("Don't know how to send <{0!r}> as connect message".format(connect_message))
receive_attr = MessageAttribute.with_all_attributes()
winproxy.NtAlpcConnectPort(handle, port_name_unicode, obj_attr, port_attr, flags, None, send_msg, buffersize, send_msg_attr, receive_attr, timeout)
# If send_msg is not None, it contains the ClientId.UniqueProcess : PID of the server :)
self.handle = handle.value
self.port_name = port_name
return AlpcMessage(send_msg, receive_attr) if send_msg is not None else None
def create_port_section(self, Flags, SectionHandle, SectionSize):
AlpcSectionHandle = gdef.HANDLE()
ActualSectionSize = gdef.SIZE_T()
# RPCRT4 USE FLAGS 0x40000 ALPC_VIEWFLG_NOT_SECURE ?
winproxy.NtAlpcCreatePortSection(self.handle, Flags, SectionHandle, SectionSize, AlpcSectionHandle, ActualSectionSize)
return AlpcSection(AlpcSectionHandle.value, ActualSectionSize.value)
def map_section(self, section_handle, size, flags=0):
view_attributes = gdef.ALPC_DATA_VIEW_ATTR()
view_attributes.Flags = 0
view_attributes.SectionHandle = section_handle
view_attributes.ViewBase = 0
view_attributes.ViewSize = size
r = winproxy.NtAlpcCreateSectionView(self.handle, flags, view_attributes)
return view_attributes
def disconnect(self):
if self.handle:
self._close_port(self.handle)
def __del__(self):
if sys.path is not None:
self.disconnect()
[docs]
class AlpcServer(AlpcTransportBase):
"""An ALPC server able to create a port, accept connections and send/receive messages"""
def __init__(self, port_name=None):
self.port_name = None
self.communication_port_list = []
self.handle = None
if port_name is not None:
self.create_port(port_name)
def _alpc_port_to_unicode_string(self, name):
return gdef.UNICODE_STRING.from_string(name)
[docs]
def create_port(self, port_name, msglen=None, port_attr_flags=0, obj_attr=None, port_attr=None):
"""Create the ALPC port ``port_name``. Most of the parameters have defauls value is ``None`` is passed.
:param str port_name: The port's name to create.
:param int msglen: ``ALPC_PORT_ATTRIBUTES.MaxMessageLength`` used if ``port_attr`` is ``None`` (MUTUALY EXCLUSINVE WITH ``port_attr``)
:param int port_attr_flags: ``ALPC_PORT_ATTRIBUTES.Flags`` used if ``port_attr`` is ``None`` (MUTUALY EXCLUSINVE WITH ``port_attr``)
:param OBJECT_ATTRIBUTES obj_attr: The attributes of the port, one with default value will be used if this parameter is ``None``
:param ALPC_PORT_ATTRIBUTES port_attr: The port attributes, one with default value will be used if this parameter is ``None``
"""
# TODO raise on mutual exclusive parameter (port_attr + port_attr_flags | obj_attr + msglen)
handle = gdef.HANDLE()
raw_name = port_name
if not raw_name.startswith("\\"):
raw_name = "\\" + port_name
port_name = self._alpc_port_to_unicode_string(raw_name)
if msglen is None:
msglen = DEFAULT_MESSAGE_SIZE
if obj_attr is None:
obj_attr = gdef.OBJECT_ATTRIBUTES()
obj_attr.Length = ctypes.sizeof(obj_attr)
obj_attr.RootDirectory = None
obj_attr.ObjectName = ctypes.pointer(port_name)
obj_attr.Attributes = 0
obj_attr.SecurityDescriptor = None
obj_attr.SecurityQualityOfService = None
if port_attr is None:
port_attr = gdef.ALPC_PORT_ATTRIBUTES()
port_attr.Flags = port_attr_flags
# port_attr.Flags = 0x2080000
# port_attr.Flags = 0x90000
port_attr.MaxMessageLength = msglen
port_attr.MemoryBandwidth = 0
port_attr.MaxPoolUsage = 0xffffffff
port_attr.MaxSectionSize = 0xffffffff
port_attr.MaxViewSize = 0xffffffff
port_attr.MaxTotalSectionSize = 0xffffffff
port_attr.DupObjectTypes = 0xffffffff
# windows.utils.print_ctypes_struct(port_attr, " - PORT_ATTR", hexa=True)
winproxy.NtAlpcCreatePort(handle, obj_attr, port_attr)
self.port_name = raw_name
self.handle = handle.value
[docs]
def accept_connection(self, msg, port_attr=None, port_context=None):
"""Accept the connection for a ``LPC_CONNECTION_REQUEST`` message.
``msg.MessageId`` must be the same as the connection requesting message.
:param AlpcMessage msg: The response message.
:param ALPC_PORT_ATTRIBUTES port_attr: The attributes of the port, one with default value will be used if this parameter is ``None``
:param PVOID port_context: A value that will be copied in ``ALPC_CONTEXT_ATTR.PortContext`` of every message on this connection.
"""
rhandle = gdef.HANDLE()
if port_attr is None:
port_attr = gdef.ALPC_PORT_ATTRIBUTES()
port_attr.Flags = 0x80000
# port_attr.Flags = 0x80000 + 0x2000000
# port_attr.Flags = 0x2000000
port_attr.MaxMessageLength = DEFAULT_MESSAGE_SIZE
port_attr.MemoryBandwidth = 0
port_attr.MaxPoolUsage = 0xffffffff
port_attr.MaxSectionSize = 0xffffffff
port_attr.MaxViewSize = 0xffffffff
port_attr.MaxTotalSectionSize = 0xffffffff
port_attr.DupObjectTypes = 0xffffffff
# windows.utils.print_ctypes_struct(port_attr, " - CONN_PORT_ATTR", hexa=True)
winproxy.NtAlpcAcceptConnectPort(rhandle, self.handle, 0, None, port_attr, port_context, msg.port_message, None, True)
self.communication_port_list.append(rhandle.value)
return msg
def disconnect(self):
if self.handle:
self._close_port(self.handle)
for com_port_handle in self.communication_port_list:
self._close_port(com_port_handle)
# TODO: add an API to close a communication port ?
def __del__(self):
if sys.path is not None:
self.disconnect()