Source code for windows.winobject.network

import windows
import ctypes
import socket
import struct

from windows import winproxy
import windows.generated_def as gdef
from windows.com import interfaces as cominterfaces
from windows.generated_def.winstructs import *
from windows.generated_def.windef import *


[docs]class TCP4Connection(MIB_TCPROW_OWNER_PID): """A TCP4 socket (connected or listening)""" @property def established(self): """``True`` if connection is established else it's a listening socket""" return self.dwState == MIB_TCP_STATE_ESTAB @property def remote_port(self): """:type: :class:`int`""" if not self.established: return None return socket.ntohs(self.dwRemotePort) @property def local_port(self): """:type: :class:`int`""" return socket.ntohs(self.dwLocalPort) @property def local_addr(self): """Local address IP (x.x.x.x) :type: :class:`str`""" return socket.inet_ntoa(struct.pack("<I", self.dwLocalAddr)) @property def remote_addr(self): """remote address IP (x.x.x.x) :type: :class:`str`""" if not self.established: return None return socket.inet_ntoa(struct.pack("<I", self.dwRemoteAddr)) @property def remote_proto(self): """Identification of the protocol associated with the remote port. Equals ``remote_port`` if no protocol is associated with it. :type: :class:`str` or :class:`int` """ try: return socket.getservbyport(self.remote_port, 'tcp') except socket.error: return self.remote_port @property def remote_host(self): """Identification of the remote hostname. Equals ``remote_addr`` if the resolution fails :type: :class:`str` or :class:`int` """ try: return socket.gethostbyaddr(self.remote_addr) except socket.error: return self.remote_addr
[docs] def close(self): """Close the connection <require elevated process>""" closing = MIB_TCPROW() closing.dwState = MIB_TCP_STATE_DELETE_TCB closing.dwLocalAddr = self.dwLocalAddr closing.dwLocalPort = self.dwLocalPort closing.dwRemoteAddr = self.dwRemoteAddr closing.dwRemotePort = self.dwRemotePort return winproxy.SetTcpEntry(ctypes.byref(closing))
def __repr__(self): if not self.established: return "<TCP IPV4 Listening socket on {0}:{1}>".format(self.local_addr, self.local_port) return "<TCP IPV4 Connection {s.local_addr}:{s.local_port} -> {s.remote_addr}:{s.remote_port}>".format(s=self)
[docs]class TCP6Connection(MIB_TCP6ROW_OWNER_PID): """A TCP6 socket (connected or listening)""" @staticmethod def _str_ipv6_addr(addr): return ":".join(c.encode('hex') for c in addr) @property def established(self): """``True`` if connection is established else it's a listening socket""" return self.dwState == MIB_TCP_STATE_ESTAB @property def remote_port(self): """:type: :class:`int`""" if not self.established: return None return socket.ntohs(self.dwRemotePort) @property def local_port(self): """:type: :class:`int`""" return socket.ntohs(self.dwLocalPort) @property def local_addr(self): """Local address IP :type: :class:`str`""" return self._str_ipv6_addr(self.ucLocalAddr) @property def remote_addr(self): """remote address IP :type: :class:`str`""" if not self.established: return None return self._str_ipv6_addr(self.ucRemoteAddr) @property def remote_proto(self): """Equals to ``self.remote_port`` for Ipv6""" return self.remote_port @property def remote_host(self): """Equals to ``self.remote_addr`` for Ipv6""" return self.remote_addr def close(self): raise NotImplementedError("Closing IPV6 connection non implemented") def __repr__(self): if not self.established: return "<TCP IPV6 Listening socket on {0}:{1}>".format(self.local_addr, self.local_port) return "<TCP IPV6 Connection {0}:{1} -> {2}:{3}>".format(self.local_addr, self.local_port, self.remote_addr, self.remote_port)
def get_MIB_TCPTABLE_OWNER_PID_from_buffer(buffer): x = windows.generated_def.winstructs.MIB_TCPTABLE_OWNER_PID.from_buffer(buffer) nb_entry = x.dwNumEntries class _GENERATED_MIB_TCPTABLE_OWNER_PID(ctypes.Structure): _fields_ = [ ("dwNumEntries", DWORD), ("table", TCP4Connection * nb_entry), ] return _GENERATED_MIB_TCPTABLE_OWNER_PID.from_buffer(buffer) def get_MIB_TCP6TABLE_OWNER_PID_from_buffer(buffer): x = windows.generated_def.winstructs.MIB_TCP6TABLE_OWNER_PID.from_buffer(buffer) nb_entry = x.dwNumEntries # Struct _MIB_TCP6TABLE_OWNER_PID definitions class _GENERATED_MIB_TCP6TABLE_OWNER_PID(Structure): _fields_ = [ ("dwNumEntries", DWORD), ("table", TCP6Connection * nb_entry), ] return _GENERATED_MIB_TCP6TABLE_OWNER_PID.from_buffer(buffer)
[docs]class Firewall(cominterfaces.INetFwPolicy2): """The windows firewall""" @property def rules(self): """The rules of the firewall :type: [:class:`FirewallRule`] -- A list of rule """ ifw_rules = cominterfaces.INetFwRules() self.get_Rules(ifw_rules) nb_rules = gdef.LONG() ifw_rules.get_Count(nb_rules) unknw = cominterfaces.IUnknown() ifw_rules.get__NewEnum(unknw) pVariant = cominterfaces.IEnumVARIANT() unknw.QueryInterface(pVariant.IID, pVariant) count = gdef.ULONG() var = windows.com.Variant() rules = [] for i in range(nb_rules.value): pVariant.Next(1, var, count) if not count.value: break rule = FirewallRule() idisp = var.asdispatch idisp.QueryInterface(rule.IID, rule) rules.append(rule) return rules @property def current_profile_types(self): """Mask of the profiles currently enabled :type: :class:`long` """ cpt = gdef.LONG() self.get_CurrentProfileTypes(cpt) return cpt.value @property def enabled(self): """A maping of the active firewall profiles { ``NET_FW_PROFILE_TYPE2_.NET_FW_PROFILE2_DOMAIN(0x1L)``: ``True`` or ``False``, ``NET_FW_PROFILE_TYPE2_.NET_FW_PROFILE2_PRIVATE(0x2L)``: ``True`` or ``False``, ``NET_FW_PROFILE_TYPE2_.NET_FW_PROFILE2_PUBLIC(0x4L)``: ``True`` or ``False``, } :type: :class:`dict` """ profiles = [gdef.NET_FW_PROFILE2_DOMAIN, gdef.NET_FW_PROFILE2_PRIVATE, gdef.NET_FW_PROFILE2_PUBLIC] return {prof: self.enabled_for_profile_type(prof) for prof in profiles} def enabled_for_profile_type(self, profile_type): enabled = gdef.VARIANT_BOOL() self.get_FirewallEnabled(profile_type, enabled) return enabled.value
[docs]class FirewallRule(cominterfaces.INetFwRule): """A rule of the firewall""" @property def name(self): """Name of the rule :type: :class:`unicode` """ name = gdef.BSTR() self.get_Name(name) return name.value @property def description(self): """Description of the rule :type: :class:`unicode` """ description = gdef.BSTR() self.get_Description(description) return description.value @property def application_name(self): """Name of the application to which apply the rule :type: :class:`unicode` """ applicationname = gdef.BSTR() self.get_ApplicationName(applicationname) return applicationname.value @property def service_name(self): """Name of the service to which apply the rule :type: :class:`unicode` """ servicename = gdef.BSTR() self.get_ServiceName(servicename) return servicename.value @property def protocol(self): """Protocol to which apply the rule :type: :class:`long` """ protocol = gdef.LONG() self.get_Protocol(protocol) return protocol.value @property def local_address(self): """Local address of the rule :type: :class:`unicode` """ local_address = gdef.BSTR() self.get_LocalAddresses(local_address) return local_address.value @property def remote_address(self): """Remote address of the rule :type: :class:`unicode` """ remote_address = gdef.BSTR() self.get_RemoteAddresses(remote_address) return remote_address.value @property def direction(self): """Direction of the rule, values might be: * ``NET_FW_RULE_DIRECTION_.NET_FW_RULE_DIR_IN(0x1L)`` * ``NET_FW_RULE_DIRECTION_.NET_FW_RULE_DIR_OUT(0x2L)`` subclass of :class:`long` """ direction = gdef.NET_FW_RULE_DIRECTION() self.get_Direction(direction) return direction.value @property def interface_types(self): """Types of interface of the rule :type: :class:`unicode` """ interface_type = gdef.BSTR() self.get_InterfaceTypes(interface_type) return interface_type.value @property def local_port(self): """Local port of the rule :type: :class:`unicode` """ local_port = gdef.BSTR() self.get_LocalPorts(local_port) return local_port.value @property def remote_port(self): """Remote port of the rule :type: :class:`unicode` """ remote_port = gdef.BSTR() self.get_RemotePorts(remote_port) return remote_port.value @property def action(self): """Action of the rule, values might be: * ``NET_FW_ACTION_.NET_FW_ACTION_BLOCK(0x0L)`` * ``NET_FW_ACTION_.NET_FW_ACTION_ALLOW(0x1L)`` subclass of :class:`long` """ action = gdef.NET_FW_ACTION() self.get_Action(action) return action.value @property def enabled(self): """``True`` if rule is enabled""" enabled = gdef.VARIANT_BOOL() self.get_Enabled(enabled) return enabled.value @property def grouping(self): """Grouping of the rule :type: :class:`unicode` """ grouping = gdef.BSTR() self.get_RemotePorts(grouping) return grouping.value @property def icmp_type_and_code(self): icmp_type_and_code = gdef.BSTR() self.get_RemotePorts(icmp_type_and_code) return icmp_type_and_code.value def __repr__(self): return u'<{0} "{1}">'.format(type(self).__name__, self.name).encode("ascii", errors='backslashreplace')
[docs]class Network(object): NetFwPolicy2 = windows.com.IID.from_string("E2B3C97F-6AE1-41AC-817A-F6F92166D7DD") @property def firewall(self): """The firewall of the system :type: :class:`Firewall` """ windows.com.init() firewall = Firewall() windows.com.create_instance(self.NetFwPolicy2, firewall) return firewall @staticmethod def _get_tcp_ipv4_sockets(): size = ctypes.c_uint(0) try: winproxy.GetExtendedTcpTable(None, ctypes.byref(size), ulAf=AF_INET) except winproxy.WinproxyError: pass # Allow us to set size to the needed value buffer = (ctypes.c_char * size.value)() winproxy.GetExtendedTcpTable(buffer, ctypes.byref(size), ulAf=AF_INET) t = get_MIB_TCPTABLE_OWNER_PID_from_buffer(buffer) return list(t.table) @staticmethod def _get_tcp_ipv6_sockets(): size = ctypes.c_uint(0) try: winproxy.GetExtendedTcpTable(None, ctypes.byref(size), ulAf=AF_INET6) except winproxy.WinproxyError: pass # Allow us to set size to the needed value buffer = (ctypes.c_char * size.value)() winproxy.GetExtendedTcpTable(buffer, ctypes.byref(size), ulAf=AF_INET6) t = get_MIB_TCP6TABLE_OWNER_PID_from_buffer(buffer) return list(t.table) ipv4 = property(lambda self: self._get_tcp_ipv4_sockets()) """List of TCP IPv4 socket (connection and listening) :type: [:class:`TCP4Connection`]""" ipv6 = property(lambda self: self._get_tcp_ipv6_sockets()) """List of TCP IPv6 socket (connection and listening) :type: [:class:`TCP6Connection`] """