import itertools
import ctypes
import windows
from windows import winproxy
import windows.generated_def as gdef
from windows.crypto import DEFAULT_ENCODING
from windows.pycompat import urepr_encode
import windows.crypto.cryptmsg
CRYPT_OBJECT_FORMAT_TYPE = [
gdef.CERT_QUERY_OBJECT_FILE,
gdef.CERT_QUERY_OBJECT_BLOB,
gdef.CERT_QUERY_CONTENT_CERT,
gdef.CERT_QUERY_CONTENT_CTL,
gdef.CERT_QUERY_CONTENT_CRL,
gdef.CERT_QUERY_CONTENT_SERIALIZED_STORE,
gdef.CERT_QUERY_CONTENT_SERIALIZED_CERT,
gdef.CERT_QUERY_CONTENT_SERIALIZED_CTL,
gdef.CERT_QUERY_CONTENT_SERIALIZED_CRL,
gdef.CERT_QUERY_CONTENT_PKCS7_SIGNED,
gdef.CERT_QUERY_CONTENT_PKCS7_UNSIGNED,
gdef.CERT_QUERY_CONTENT_PKCS7_SIGNED_EMBED,
gdef.CERT_QUERY_CONTENT_PKCS10,
gdef.CERT_QUERY_CONTENT_PFX,
gdef.CERT_QUERY_CONTENT_CERT_PAIR,
gdef.CERT_QUERY_CONTENT_PFX_AND_LOAD
]
CRYPT_OBJECT_FORMAT_TYPE_DICT = gdef.FlagMapper(*CRYPT_OBJECT_FORMAT_TYPE)
## Move CryptObject to new .py ?
[docs]
class CryptObject(object):
"""Extract information from an CryptoAPI object.
(see `CryptQueryObject <https://msdn.microsoft.com/en-us/library/windows/desktop/aa380264(v=vs.85).aspx>`_)
Current main use is extracting the signers certificates from a PE file.
"""
MSG_PARAM_KNOW_TYPES = {gdef.CMSG_SIGNER_INFO_PARAM: gdef.CMSG_SIGNER_INFO,
gdef.CMSG_SIGNER_COUNT_PARAM: gdef.DWORD,
gdef.CMSG_CERT_COUNT_PARAM: gdef.DWORD}
def __init__(self, filename, content_type=gdef.CERT_QUERY_CONTENT_FLAG_ALL):
# No other API than filename for now..
self.filename = filename
dwEncoding = gdef.DWORD()
dwContentType = gdef.DWORD()
dwFormatType = gdef.DWORD()
hStore = CertificateStore()
hMsg = windows.crypto.CryptMessage()
winproxy.CryptQueryObject(gdef.CERT_QUERY_OBJECT_FILE,
gdef.LPWSTR(filename),
# filename,
content_type,
gdef.CERT_QUERY_FORMAT_FLAG_BINARY,
0,
dwEncoding,
dwContentType,
dwFormatType,
hStore,
hMsg,
None)
self.cert_store = hStore if hStore else None
"""The :class:`CertificateStore` that includes all of the certificates, CRLs, and CTLs in the object"""
self.crypt_msg = hMsg if hMsg else None #: yolo
"""The :class:`CryptMessage` for any ``PKCS7`` content in the object"""
self.encoding = dwEncoding
self.content_type = CRYPT_OBJECT_FORMAT_TYPE_DICT[dwContentType.value]
"""The type of the opened message"""
def _signers_and_certs_generator(self):
if self.crypt_msg is None:
return
for signer in self.crypt_msg.signers:
# We could directly extract the certificates from the 'crypt_msg' (I guess)
# But 'CryptQueryObject' had the sympathy of already opening a CertificateStore
# for us. So we use it.
# I am open to counter-argument on this methodology.
cert = self.cert_store.find(signer.Issuer, signer.SerialNumber)
yield signer, cert
@property
def signers_and_certs(self):
"""The list of signer info and certificates signing the object.
:rtype: [(:class:`~windows.generated_def.winstructs.CMSG_SIGNER_INFO`, :class:`Certificate`)]
.. note::
:class:`~windows.generated_def.winstructs.CMSG_SIGNER_INFO` might be changed to a wrapping-subclass.
"""
return list(self._signers_and_certs_generator())
def __repr__(self):
return urepr_encode(u'<{0} "{1}" content_type={2!r}>'.format(type(self).__name__, self.filename, self.content_type))
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa382037(v=vs.85).aspx
[docs]
class CertificateStore(gdef.HCERTSTORE):
"""A certificate store"""
@property
def certs(self):
"""The list of certificates in the store
:type: [:class:`Certificate`] -- A list of certificate
"""
res = []
last = None
while True:
try:
cert = winproxy.CertEnumCertificatesInStore(self, last)
except winproxy.WinproxyError as e:
if (e.winerror & 0xffffffff) in (gdef.CRYPT_E_NOT_FOUND,):
return tuple(res)
raise
# Need to duplicate as CertEnumCertificatesInStore will free the context 'last'
ecert = windows.crypto.Certificate.from_pointer(cert)
res.append(ecert.duplicate())
last = ecert
raise RuntimeError("Out of infinit loop")
[docs]
def add_certificate(self, certificate):
"""Add a certificate to the store"""
winproxy.CertAddCertificateContextToStore(self, certificate, gdef.CERT_STORE_ADD_NEW, None)
[docs]
@classmethod
def from_file(cls, filename):
"""Create a new :class:`CertificateStore` from ``filename``"""
res = winproxy.CertOpenStore(gdef.CERT_STORE_PROV_FILENAME_A, DEFAULT_ENCODING, None, gdef.CERT_STORE_OPEN_EXISTING_FLAG, filename)
return ctypes.cast(res, cls)
def _yolo(self):
x = winproxy.CertEnumCTLsInStore(self, None)
title = None
windows.winproxy.CryptUIDlgViewContext(gdef.CERT_STORE_CTL_CONTEXT, x, None, title, 0, None)
return x
# See https://msdn.microsoft.com/en-us/library/windows/desktop/aa388136(v=vs.85).aspx
[docs]
@classmethod
def from_system_store(cls, store_name):
"""Create a new :class:`CertificateStore` from system store ``store_name``
(see `System Store Locations <https://msdn.microsoft.com/en-us/library/windows/desktop/aa388136(v=vs.85).aspx>`_)
"""
res = winproxy.CertOpenStore(gdef.CERT_STORE_PROV_SYSTEM_A, DEFAULT_ENCODING, None, gdef.CERT_SYSTEM_STORE_LOCAL_MACHINE | gdef.CERT_STORE_READONLY_FLAG, store_name)
return ctypes.cast(res, cls)
[docs]
@classmethod
def new_in_memory(cls):
"""Create a new temporary :class:`CertificateStore` in memory"""
res = winproxy.CertOpenStore(gdef.CERT_STORE_PROV_MEMORY, DEFAULT_ENCODING, None, 0, None)
return ctypes.cast(res, cls)
# TODO: a more complete search API ?
[docs]
def find(self, issuer, serialnumber):
"""Return the certificate that match `issuer` and `serialnumber`
:return: :class:`Certificate` -- ``None`` if certificate is not found
"""
# data = self.get_signer_data(index)
cert_info = gdef.CERT_INFO()
cert_info.Issuer = issuer
cert_info.SerialNumber = serialnumber
try:
rawcertcontext = winproxy.CertFindCertificateInStore(self, DEFAULT_ENCODING, 0, gdef.CERT_FIND_SUBJECT_CERT, ctypes.byref(cert_info), None)
except WindowsError as e:
if not e.winerror & 0xffffffff == gdef.CRYPT_E_NOT_FOUND:
raise
return None
return Certificate.from_pointer(rawcertcontext)
def __del__(self):
return winproxy.CertCloseStore(self, 0)
# PKCS12_NO_PERSIST_KEY -> do not save it in a key container on disk
# Without it, a key container is created at 'C:\Users\USERNAME\AppData\Roaming\Microsoft\Crypto\RSA\S-1-5-21-3241049326-165485355-1070449050-1001'
# More about this:
# If you use 'PKCS12_NO_PERSIST_KEY' the key are indeed NOT STORED but there is a problem
# If you use an algo like 'szOID_NIST_AES256_CBC' the function 'CryptDecryptMessage' won't be able to decrypt the message
# Unless you also specify the 'PKCS12_ALWAYS_CNG_KSP' flags.
# My guess: somewhere 'CryptDecryptMessage' ask for each (CNG_KSP | CSP ?) to try to decrypt with the keys
# BUT: as we DID NOT EXPORT the keys, they are not able to get the key from memory and expect them on disk.
# By forcing PKCS12_ALWAYS_CNG_KSP we remove this as the key are directly linked to the correct CNG_KSP in the CertStore
# Look like it's based on this part of the PFX:
# Microsoft CSP Name: Microsoft Enhanced Cryptographic Provider v1.0
# BUT this will not allow to decrypt RSA_RC4 ?
[docs]
def import_pfx(pfx, password=None, flags=gdef.CRYPT_USER_KEYSET | gdef.PKCS12_NO_PERSIST_KEY | gdef.PKCS12_ALWAYS_CNG_KSP):
"""Import the file ``pfx`` with the ``password``.
``default flags = PKCS12_NO_PERSIST_KEY | CRYPT_USER_KEYSET``.
``PKCS12_NO_PERSIST_KEY`` tells ``CryptoAPI`` to NOT save the keys in a on-disk container.
:return: :class:`CertificateStore`
"""
if isinstance(pfx, windows.pycompat.anybuff) or isinstance(pfx, bytearray):
pfx = gdef.CRYPT_DATA_BLOB.from_string(pfx)
cert_store = winproxy.PFXImportCertStore(pfx, password, flags)
return CertificateStore(cert_store)
[docs]
class Certificate(gdef.CERT_CONTEXT):
"""Represent a Certificate """
@property
def raw_serial(self):
"""The raw serial number of the certificate.
:type: [:class:`int`]: A list of int ``0 <= x <= 255``"""
serial_number = self.pCertInfo[0].SerialNumber
return [(c & 0xff) for c in serial_number.pbData[:serial_number.cbData][::-1]]
@property
def serial(self):
"""The string representation of the certificate's serial.
:type: :class:`str`
"""
serial_bytes = self.raw_serial
return " ".join("{:02x}".format(x) for x in serial_bytes)
[docs]
def get_name(self, nametype=gdef.CERT_NAME_SIMPLE_DISPLAY_TYPE, param_type=0, flags=0):
"""Retrieve the subject or issuer name of the certificate.
See `CertGetNameStringA <https://msdn.microsoft.com/en-us/library/windows/desktop/aa376086(v=vs.85).aspx>`_
:returns: :class:`str`
"""
if nametype == gdef.CERT_NAME_RDN_TYPE:
param_type = gdef.DWORD(param_type)
param_type = gdef.LPDWORD(param_type)
size = winproxy.CertGetNameStringW(self, nametype, flags, param_type, None, 0)
namebuff = ctypes.create_unicode_buffer(size)
size = winproxy.CertGetNameStringW(self, nametype, flags, param_type, namebuff, size)
return namebuff[:-1]
name = property(get_name)
"""The name of the certificate.
:type: :class:`str`"""
def raw_hash(self):
size = gdef.DWORD(100)
buffer = ctypes.c_buffer(size.value)
winproxy.CryptHashCertificate(None, 0, 0, self.pbCertEncoded, self.cbCertEncoded, ctypes.cast(buffer, gdef.LPBYTE), size)
return buffer[:size.value]
@property
def thumbprint(self):
"""The thumbprint of the certificate (which is the sha1 of the encoded cert).
Example:
>>> x
<Certificate "YOLO2" serial="6f 1d 3e 7d d9 77 59 a9 4c 1c 53 dc 80 db 0c fe">
>>> x.thumbprint
'E2 A2 DB 76 A1 DD 8E 70 0D C6 9F CB 71 CF 29 12 C6 D9 78 97'
:type: :class:`str`
"""
return " ".join("{:02X}".format(x) for x in bytearray(self.raw_hash()))
@property
def distinguished_name(self):
"""The distinguished name (DN) of the certificate.
Example:
>>> x
<Certificate "Microsoft Windows Production PCA 2011" serial="61 07 76 56 00 00 00 00 00 08">
>>> x.distinguished_name
'C=US, S=Washington, L=Redmond, O=Microsoft Corporation, CN=Microsoft Windows Production PCA 2011'
:type: :class:`str`
"""
return self.get_name(gdef.CERT_NAME_RDN_TYPE, gdef.CERT_X500_NAME_STR)
@property
def issuer(self):
"""The name of the certificate's issuer.
:type: :class:`str`"""
return self.get_name(flags=gdef.CERT_NAME_ISSUER_FLAG)
@property
def store(self):
"""The certificate store that contains the certificate
:type: :class:`CertificateStore`
"""
return CertificateStore(self.hCertStore)
def get_raw_certificate_chains(self): # Rename to all_chains ?
chain_context = EPCCERT_CHAIN_CONTEXT()
enhkey_usage = gdef.CERT_ENHKEY_USAGE()
enhkey_usage.cUsageIdentifier = 0
enhkey_usage.rgpszUsageIdentifier = None
cert_usage = gdef.CERT_USAGE_MATCH()
cert_usage.dwType = gdef.USAGE_MATCH_TYPE_AND
cert_usage.Usage = enhkey_usage
chain_para = gdef.CERT_CHAIN_PARA()
chain_para.cbSize = ctypes.sizeof(chain_para)
chain_para.RequestedUsage = cert_usage
winproxy.CertGetCertificateChain(None, self, None, self.hCertStore, ctypes.byref(chain_para), 0, None, ctypes.byref(chain_context))
# Lower chains ?
# winproxy.CertGetCertificateChain(None, self, None, self[0].hCertStore, ctypes.byref(chain_para), 0x80, None, ctypes.byref(chain_context))
#return CertficateChain(chain_context)
return chain_context
@property # fixedproperty ?
def chains(self):
"""The list of chain context available for this certificate. Each elements of this list is a list of ``Certificate`` that should
go from the ``self`` certificate to a trusted certificate.
:type: [[:class:`Certificate`]] -- A list of chain (list) of :class:`Certificate`
"""
chain_context = self.get_raw_certificate_chains()
res = []
for chain in chain_context.chains:
chain_res = [elt.cert for elt in chain.elements]
res.append(chain_res)
return res
# API Arround CertSelectCertificateChains ?
# https://msdn.microsoft.com/en-us/library/windows/desktop/dd433797(v=vs.85).aspx
[docs]
def duplicate(self):
"""Duplicate the certificate by incrementing the internal refcount. (see `CertDuplicateCertificateContext <https://msdn.microsoft.com/en-us/library/windows/desktop/aa376045(v=vs.85).aspx>`_)
note: The object returned is ``self``
:return: :class:`Certificate`
"""
res = winproxy.CertDuplicateCertificateContext(self)
# Check what the doc says: the pointer returned is actually the PCERT in parameter
# Only the refcount is incremented
# This postulate allow us to return 'self' directly
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa376045(v=vs.85).aspx
if not ctypes.addressof(res[0]) == ctypes.addressof(self):
raise ValueError("CertDuplicateCertificateContext did not returned the argument (check doc)")
return self
def view(self, title=None):
return windows.winproxy.CryptUIDlgViewContext(gdef.CERT_STORE_CERTIFICATE_CONTEXT, ctypes.byref(self), None, title, 0, None)
KNOWN_PROPERTIES_VALUES = gdef.FlagMapper(
gdef.CERT_KEY_PROV_HANDLE_PROP_ID,
gdef.CERT_KEY_PROV_INFO_PROP_ID,
gdef.CERT_SHA1_HASH_PROP_ID,
gdef.CERT_MD5_HASH_PROP_ID,
gdef.CERT_HASH_PROP_ID,
gdef.CERT_KEY_CONTEXT_PROP_ID,
gdef.CERT_KEY_SPEC_PROP_ID,
gdef.CERT_IE30_RESERVED_PROP_ID,
gdef.CERT_PUBKEY_HASH_RESERVED_PROP_ID,
gdef.CERT_ENHKEY_USAGE_PROP_ID,
gdef.CERT_CTL_USAGE_PROP_ID,
gdef.CERT_NEXT_UPDATE_LOCATION_PROP_ID,
gdef.CERT_FRIENDLY_NAME_PROP_ID,
gdef.CERT_PVK_FILE_PROP_ID,
gdef.CERT_DESCRIPTION_PROP_ID,
gdef.CERT_ACCESS_STATE_PROP_ID,
gdef.CERT_SIGNATURE_HASH_PROP_ID,
gdef.CERT_SMART_CARD_DATA_PROP_ID,
gdef.CERT_EFS_PROP_ID,
gdef.CERT_FORTEZZA_DATA_PROP_ID,
gdef.CERT_ARCHIVED_PROP_ID,
gdef.CERT_KEY_IDENTIFIER_PROP_ID,
gdef.CERT_AUTO_ENROLL_PROP_ID,
gdef.CERT_PUBKEY_ALG_PARA_PROP_ID,
gdef.CERT_CROSS_CERT_DIST_POINTS_PROP_ID,
gdef.CERT_ISSUER_PUBLIC_KEY_MD5_HASH_PROP_ID,
gdef.CERT_SUBJECT_PUBLIC_KEY_MD5_HASH_PROP_ID,
gdef.CERT_ENROLLMENT_PROP_ID,
gdef.CERT_DATE_STAMP_PROP_ID,
gdef.CERT_ISSUER_SERIAL_NUMBER_MD5_HASH_PROP_ID,
gdef.CERT_SUBJECT_NAME_MD5_HASH_PROP_ID,
gdef.CERT_EXTENDED_ERROR_INFO_PROP_ID,
gdef.CERT_RENEWAL_PROP_ID,
gdef.CERT_ARCHIVED_KEY_HASH_PROP_ID,
gdef.CERT_AUTO_ENROLL_RETRY_PROP_ID,
gdef.CERT_AIA_URL_RETRIEVED_PROP_ID,
gdef.CERT_AUTHORITY_INFO_ACCESS_PROP_ID,
gdef.CERT_BACKED_UP_PROP_ID,
gdef.CERT_OCSP_RESPONSE_PROP_ID,
gdef.CERT_REQUEST_ORIGINATOR_PROP_ID,
gdef.CERT_SOURCE_LOCATION_PROP_ID)
def enum_properties(self):
prop = 0
res = []
while True:
prop = winproxy.CertEnumCertificateContextProperties(self, prop)
if not prop:
return res
res.append(self.KNOWN_PROPERTIES_VALUES[prop])
raise RuntimeError("Unreachable code")
properties = property(enum_properties)
"""The properties of the certificate
:type: [:class:`int` or :class:`~windows.generated_def.Flag`] -- A list of property ID
"""
#def get_property(self):
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa376079(v=vs.85).aspx
# - Usefull:
# CERT_SHA1_HASH_PROP_ID
[docs]
def get_property(self, prop):
"TODO: DOC :D + auto-type ?"
datasize = gdef.DWORD()
windows.winproxy.CertGetCertificateContextProperty(self, prop, None, datasize)
buf = (gdef.BYTE * datasize.value)()
windows.winproxy.CertGetCertificateContextProperty(self, prop, buf, datasize)
return bytearray(buf)
@property
def encoded(self):
"""The encoded certificate.
:type: :class:`bytearray`"""
return bytearray(self.pbCertEncoded[:self.cbCertEncoded])
@property
def version(self):
"""The version number of the certificate
:type: :class:`int`
"""
return self.pCertInfo[0].dwVersion
[docs]
@classmethod
def from_file(cls, filename):
"""Create a :class:`Certificate` from the file ``filename``
:return: :class:`Certificate`
"""
with open(filename, "rb") as f:
data = f.read()
buf = (ctypes.c_ubyte * len(data))(*bytearray(data))
pcert = windows.winproxy.CertCreateCertificateContext(windows.crypto.DEFAULT_ENCODING, buf, len(data))
return cls.from_pointer(pcert)
[docs]
@classmethod
def from_buffer(cls, data):
"""Create a :class:`Certificate` from the buffer ``data``
:return: :class:`Certificate`
"""
buf = (ctypes.c_ubyte * len(data))(*bytearray(data))
pcert = windows.winproxy.CertCreateCertificateContext(windows.crypto.DEFAULT_ENCODING, buf, len(data))
return cls.from_pointer(pcert)
@classmethod
def from_pointer(self, ptr):
return ctypes.cast(ptr, ctypes.POINTER(Certificate))[0]
def __eq__(self, other):
if not isinstance(other, Certificate):
return NotImplemented
return windows.winproxy.CertCompareCertificate(DEFAULT_ENCODING, self.pCertInfo, other.pCertInfo)
def __repr__(self):
return urepr_encode(u'<{0} "{1}" serial="{2}">'.format(type(self).__name__, self.name, self.serial))
# class CertficateChain(object):
# def __init__(self, pc_chain_context):
# self.chain = pc_chain_context[0]
#
# def to_list(self):
# res = []
# for i in range(self.chain.rgpChain[0][0].cElement):
# res.append(CertificateContext(self.chain.rgpChain[0][0].rgpElement[i][0].pCertContext[0]))
# return res
# Those classes are more of a POC than anything else
# Should be the struct itself (like Certificate ?)
class EPCCERT_CHAIN_CONTEXT(gdef.PCCERT_CHAIN_CONTEXT):
_type_ = gdef.PCCERT_CHAIN_CONTEXT._type_
@property
def chains(self):
res = []
# if (self[0].cLowerQualityChainContext):
# print("LOL")
# import pdb;pdb.set_trace()
# if self[0].cChain > 1:
# print("HAAAAA")
# import pdb;pdb.set_trace()
for i in range(self[0].cChain):
simple_chain = ctypes.cast(self[0].rgpChain[i], EPCCERT_SIMPLE_CHAIN)
res.append(simple_chain)
return res
@property
def all_cert(self):
res = []
for chain in self.chains:
ch = []
res.append(ch)
for element in chain.elements:
ch.append(element.cert)
return res
class EPCCERT_SIMPLE_CHAIN(gdef.PCCERT_SIMPLE_CHAIN):
_type_ = gdef.PCCERT_SIMPLE_CHAIN._type_
@property
def elements(self):
res = []
for i in range(self[0].cElement):
element = ctypes.cast(self[0].rgpElement[i], EPCERT_CHAIN_ELEMENT)
res.append(element)
return res
class EPCERT_CHAIN_ELEMENT(gdef.PCERT_CHAIN_ELEMENT):
_type_ = gdef.PCERT_CHAIN_ELEMENT._type_
@property
def cert(self):
return Certificate.from_pointer(self[0].pCertContext)
# Move this in another .py ?
[docs]
class CryptContext(gdef.HCRYPTPROV):
""" A context manager arround ``CryptAcquireContextW`` & ``CryptReleaseContext``
.. note::
see usage in sample :ref:`sample_crypto_encryption` (function ``genkeys``)
"""
_type_ = gdef.HCRYPTPROV._type_
def __init__(self, pszContainer=None, pszProvider=None, dwProvType=0, dwFlags=0, retrycreate=False):
self.pszContainer = pszContainer
self.pszProvider = pszProvider
self.dwProvType = dwProvType
self.dwFlags = dwFlags
self.retrycreate = True
#self.value = HCRYPTPROV()
pass
def __enter__(self):
self.acquire()
return self
def __exit__(self, *args):
self.release()
def acquire(self):
try:
return winproxy.CryptAcquireContextW(self, self.pszContainer, self.pszProvider, self.dwProvType, self.dwFlags)
except WindowsError as e:
if not self.retrycreate:
raise
return winproxy.CryptAcquireContextW(self, self.pszContainer, self.pszProvider, self.dwProvType, self.dwFlags | gdef.CRYPT_NEWKEYSET)
def release(self):
return winproxy.CryptReleaseContext(self, False)