Как получить ответный SSL-сертификат на запросы в python?

Попытка получить сертификат SSL из ответа в requests,

Какой хороший способ сделать это?

Спасибо большое!

10 ответов

Решение

requests намеренно оборачивает вещи низкого уровня как это. Как правило, единственное, что вы хотите сделать, это проверить, что сертификаты действительны. Для этого просто пройдите verify=True, Если вы хотите использовать нестандартный пакет cacert, вы также можете пропустить это. Например:

resp = requests.get('https://example.com', verify=True, cert=['/path/to/my/ca.crt'])

Также, requests это в первую очередь набор оберток вокруг других библиотек, в основном urllib3 и stdlib's http.client (или, для 2.x, httplib) а также ssl,

Иногда ответом является просто получить на объектах более низкого уровня (например, resp.raw это urllib3.response.HTTPResponse), но во многих случаях это невозможно.

И это один из таких случаев. Единственные объекты, которые когда-либо видели сертификаты, - это http.client.HTTPSConnection (или urllib3.connectionpool.VerifiedHTTPSConnection, но это только подкласс первого) и ssl.SSLSocket и ни один из них больше не существует к моменту возврата запроса. (Как следует из названия connectionpool подразумевает, что HTTPSConnection объект хранится в пуле и может быть использован повторно, как только это будет сделано; SSLSocket является членом HTTPSConnection.)

Итак, вам нужно что-то исправить, чтобы вы могли скопировать данные в цепочку. Это может быть так просто, как это:

HTTPResponse = requests.packages.urllib3.response.HTTPResponse
orig_HTTPResponse__init__ = HTTPResponse.__init__
def new_HTTPResponse__init__(self, *args, **kwargs):
    orig_HTTPResponse__init__(self, *args, **kwargs)
    try:
        self.peercert = self._connection.sock.getpeercert()
    except AttributeError:
        pass
HTTPResponse.__init__ = new_HTTPResponse__init__

HTTPAdapter = requests.adapters.HTTPAdapter
orig_HTTPAdapter_build_response = HTTPAdapter.build_response
def new_HTTPAdapter_build_response(self, request, resp):
    response = orig_HTTPAdapter_build_response(self, request, resp)
    try:
        response.peercert = resp.peercert
    except AttributeError:
        pass
    return response
HTTPAdapter.build_response = new_HTTPAdapter_build_response

Это не проверено, поэтому никаких гарантий; вам может понадобиться исправить это.

Кроме того, создание подклассов и переопределение будет, вероятно, чище, чем monkeypatching (особенно с HTTPAdapter был разработан, чтобы быть подклассом).

Или, что еще лучше, разветвление urllib3 а также requestsизменение вашего форка и (если вы считаете, что это законно полезно) отправка пул-запросов в апстрим.

Во всяком случае, теперь, из вашего кода, вы можете сделать это:

resp.peercert

Конечно, вы также можете передать всю информацию, необходимую для проверки сертификата, но это еще проще, поскольку он уже проходит через верхний уровень.

Для начала, ответ Абарнерта очень полный. Пока гонюсь за предложенным connection-close вопрос о Kalkran я на самом деле обнаружил, что peercert не содержал подробную информацию о сертификате SSL. Я копнул глубже информацию о соединении и сокете и извлек self.sock.connection.get_peer_certificate() функция, которая содержит отличные функции, такие как:

  • get_subject() для CN
  • get_notAfter() а также get_notBefore() на срок годности
  • get_serial_number() а также get_signature_algorithm() для крипто-связанных технических деталей
  • ...

Код теперь становится:

import requests

HTTPResponse = requests.packages.urllib3.response.HTTPResponse
orig_HTTPResponse__init__ = HTTPResponse.__init__
def new_HTTPResponse__init__(self, *args, **kwargs):
    orig_HTTPResponse__init__(self, *args, **kwargs)
    try:
        self.peer_certificate = self._connection.peer_certificate
    except AttributeError:
        pass
HTTPResponse.__init__ = new_HTTPResponse__init__

HTTPAdapter = requests.adapters.HTTPAdapter
orig_HTTPAdapter_build_response = HTTPAdapter.build_response
def new_HTTPAdapter_build_response(self, request, resp):
    response = orig_HTTPAdapter_build_response(self, request, resp)
    try:
        response.peer_certificate = resp.peer_certificate
    except AttributeError:
        pass
    return response
HTTPAdapter.build_response = new_HTTPAdapter_build_response

HTTPSConnection = requests.packages.urllib3.connection.HTTPSConnection
orig_HTTPSConnection_connect = HTTPSConnection.connect
def new_HTTPSConnection_connect(self):
    orig_HTTPSConnection_connect(self)
    try:
        self.peer_certificate = self.sock.connection.get_peer_certificate()
    except AttributeError:
        pass
HTTPSConnection.connect = new_HTTPSConnection_connect

Вы сможете легко получить доступ к результату:

r = requests.get('https://yourdomain.tld', timeout=0.1)
print('Expires on: {}'.format(r.peer_certificate.get_notAfter()))
print(dir(r.peer_certificate))

Если, как и я, вы хотите игнорировать предупреждения SSL-сертификатов, просто добавьте следующее в начало файла и не проверяйте SSL:

from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

r = requests.get('https://yourdomain.tld', timeout=0.1, verify=False)
print(dir(r.peer_certificate))

Спасибо всем за потрясающие ответы.

Это помогло мне найти ответ на этот вопрос:

Как добавить пользовательский корневой сертификат CA в CA Store, используемый Python в Windows?

И соберите этот репозиторий:

https://github.com/neozenith/get-ca-py

#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
Get Certificates from a request and dump them.
"""

import argparse
import sys

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

"""
Inspired by the answers from this Stackru question:
https://stackru.com/questions/16903528/how-to-get-response-ssl-certificate-from-requests-in-python

What follows is a series of patching the low level libraries in requests.
"""

"""
https://stackru.com/a/47931103/622276
"""

sock_requests = requests.packages.urllib3.contrib.pyopenssl.WrappedSocket


def new_getpeercertchain(self, *args, **kwargs):
    x509 = self.connection.get_peer_cert_chain()
    return x509


sock_requests.getpeercertchain = new_getpeercertchain

"""
https://stackru.com/a/16904808/622276
"""

HTTPResponse = requests.packages.urllib3.response.HTTPResponse
orig_HTTPResponse__init__ = HTTPResponse.__init__


def new_HTTPResponse__init__(self, *args, **kwargs):
    orig_HTTPResponse__init__(self, *args, **kwargs)
    try:
        self.peercertchain = self._connection.sock.getpeercertchain()
    except AttributeError:
        pass


HTTPResponse.__init__ = new_HTTPResponse__init__

HTTPAdapter = requests.adapters.HTTPAdapter
orig_HTTPAdapter_build_response = HTTPAdapter.build_response


def new_HTTPAdapter_build_response(self, request, resp):
    response = orig_HTTPAdapter_build_response(self, request, resp)
    try:
        response.peercertchain = resp.peercertchain
    except AttributeError:
        pass
    return response


HTTPAdapter.build_response = new_HTTPAdapter_build_response

"""
Attempt to wrap in a somewhat usable CLI
"""


def cli(args):
    parser = argparse.ArgumentParser(description="Request any URL and dump the certificate chain")
    parser.add_argument("url", metavar="URL", type=str, nargs=1, help="Valid https URL to be handled by requests")

    verify_parser = parser.add_mutually_exclusive_group(required=False)
    verify_parser.add_argument("--verify", dest="verify", action="store_true", help="Explicitly set SSL verification")
    verify_parser.add_argument(
        "--no-verify", dest="verify", action="store_false", help="Explicitly disable SSL verification"
    )
    parser.set_defaults(verify=True)

    return vars(parser.parse_args(args))


def dump_pem(cert, outfile="ca-chain.crt"):
    """Use the CN to dump certificate to PEM format"""
    PyOpenSSL = requests.packages.urllib3.contrib.pyopenssl
    pem_data = PyOpenSSL.OpenSSL.crypto.dump_certificate(PyOpenSSL.OpenSSL.crypto.FILETYPE_PEM, cert)
    issuer = cert.get_issuer().get_components()

    print(pem_data.decode("utf-8"))

    with open(outfile, "a") as output:
        for part in issuer:
            output.write(part[0].decode("utf-8"))
            output.write("=")
            output.write(part[1].decode("utf-8"))
            output.write(",\t")
        output.write("\n")
        output.write(pem_data.decode("utf-8"))


if __name__ == "__main__":
    cli_args = cli(sys.argv[1:])

    url = cli_args["url"][0]
    req = requests.get(url, verify=cli_args["verify"])
    for cert in req.peercertchain:
        dump_pem(cert)

Это, хотя и не красиво, работает:

import requests

req = requests.get('https://httpbin.org')
pool = req.connection.poolmanager.connection_from_url('https://httpbin.org')
conn = pool.pool.get()
# get() removes it from the pool, so put it back in
pool.pool.put(conn)
print(conn.sock.getpeercert())
      import requests
import json
import ssl

with requests.get("https://www.google.com", stream=True) as response:
   certificate_info_raw = response.raw.connection.sock.getpeercert(True)
   pem_cert = ssl.DER_cert_to_PEM_cert(certificate_info_raw)
   print(pem_cert)
   certificate_info = response.raw.connection.sock.getpeercert()
   print(json.dumps(certificate_info, indent=4))

Выход:

      -----BEGIN CERTIFICATE-----
MIIEhjCCA26gAwIBAgIQApqj0oLcEXwKFFjW6rz/AjANBgkqhkiG9w0BAQsFADBG
MQswCQYDVQQGEwJVUzEiMCAGA1UEChMZR29vZ2xlIFRydXN0IFNlcnZpY2VzIExM
QzETMBEGA1UEAxMKR1RTIENBIDFDMzAeFw0yMzA0MDMwODI1MDdaFw0yMzA2MjYw
ODI1MDZaMBkxFzAVBgNVBAMTDnd3dy5nb29nbGUuY29tMFkwEwYHKoZIzj0CAQYI
KoZIzj0DAQcDQgAEWsob6/KiuiRzjGTzkJLpOPM3ESkXWFkNmqM9WWyWgb+EmnXq
ITDSQWy4NjIVuk9srxOw4OOW2QqpPdgG21e6PqOCAmYwggJiMA4GA1UdDwEB/wQE
AwIHgDATBgNVHSUEDDAKBggrBgEFBQcDATAMBgNVHRMBAf8EAjAAMB0GA1UdDgQW
BBSKd+L1AHQBG1iw8oWwIivemR4I1zAfBgNVHSMEGDAWgBSKdH+vhc3ulc09nNDi
RhTzcTUdJzBqBggrBgEFBQcBAQReMFwwJwYIKwYBBQUHMAGGG2h0dHA6Ly9vY3Nw
LnBraS5nb29nL2d0czFjMzAxBggrBgEFBQcwAoYlaHR0cDovL3BraS5nb29nL3Jl
cG8vY2VydHMvZ3RzMWMzLmRlcjAZBgNVHREEEjAQgg53d3cuZ29vZ2xlLmNvbTAh
BgNVHSAEGjAYMAgGBmeBDAECATAMBgorBgEEAdZ5AgUDMDwGA1UdHwQ1MDMwMaAv
oC2GK2h0dHA6Ly9jcmxzLnBraS5nb29nL2d0czFjMy9RcUZ4Ymk5TTQ4Yy5jcmww
ggEDBgorBgEEAdZ5AgQCBIH0BIHxAO8AdQC3Pvsk35xNunXyOcW6WPRsXfxCz3qf
NcSeHQmBJe20mQAAAYdGbz2aAAAEAwBGMEQCIG7nw2oMgD6XqIecHM8dwFz2h4j9
uhJe9pKnZkdEFvj9AiADhJz3LXHohaTPi1GdLUORvdGquHrNQ6EpGvaxWRDoVQB2
AK33vvp8/xDIi509nB4+GGq0Zyldz7EMJMqFhjTr3IKKAAABh0ZvPbwAAAQDAEcw
RQIgRJcXnT5QCU9tYSqK+r407UgoS7k3E0AFXHmHDJOQGJYCIQCEnHxWxFqh/heK
OqvYhHy1v7cLZ5mywQ9hGKJt1sBJRTANBgkqhkiG9w0BAQsFAAOCAQEAAbKQe92N
mhqCTUgL8ssqz7wa2jHYTotcCq7CwFu3Iy/IeAKomzowFpYHtDqdbJPDh8qTMxnp
f0Z3cLNYRzlIl6rSOMRG7Ij3xv8E0jGO+US6QIpdoNSKEMUwVAXEyD2/gkQYDcFv
q1p2GgikEN6dL7ohXPr5MxB211tPwEoC6uI0zwSfZa0m/ZsvRESbtN88975GnrRz
YPPLSJB+nV7d9BuG2Xt9BPGydEyQeDslgm8QX4kPyGYKkNJMM3I7ZdgIbpe7EJpZ
eHaEpdY8TzSadL2jCokCS0hmwzhuqqsycDIkDyIKQKxTexemB8pf2Sw3UAe7cU9Q
lW5FOc9ifDnEMw==
-----END CERTIFICATE-----

{
    "subject": [
        [
            [
                "commonName",
                "www.google.com"
            ]
        ]
    ],
    "issuer": [
        [
            [
                "countryName",
                "US"
            ]
        ],
        [
            [
                "organizationName",
                "Google Trust Services LLC"
            ]
        ],
        [
            [
                "commonName",
                "GTS CA 1C3"
            ]
        ]
    ],
    "version": 3,
    "serialNumber": "029AA3D282DC117C0A1458D6EABCFF02",
    "notBefore": "Apr  3 08:25:07 2023 GMT",
    "notAfter": "Jun 26 08:25:06 2023 GMT",
    "subjectAltName": [
        [
            "DNS",
            "www.google.com"
        ]
    ],
    "OCSP": [
        "http://ocsp.pki.goog/gts1c3"
    ],
    "caIssuers": [
        "http:/../Playground/"
    ],
    "crlDistributionPoints": [
        "http:/../Playground/"
    ]
}

Для начала ответ Абарнерта очень полный

Но я хотел бы добавить, что в случае, если вы ищете цепочку сертификатов сверстников, вам нужно будет исправить еще один кусок кода

import requests
sock_requests = requests.packages.urllib3.contrib.pyopenssl.WrappedSocket
def new_getpeercertchain(self,*args, **kwargs):
    x509 = self.connection.get_peer_cert_chain()
    return x509
sock_requests.getpeercertchain = new_getpeercertchain

после этого вы можете назвать это очень похоже на принятый ответ

HTTPResponse = requests.packages.urllib3.response.HTTPResponse
orig_HTTPResponse__init__ = HTTPResponse.__init__
def new_HTTPResponse__init__(self, *args, **kwargs):
    orig_HTTPResponse__init__(self, *args, **kwargs)
    try:
        self.peercertchain = self._connection.sock.getpeercertchain()
    except AttributeError:
        pass
HTTPResponse.__init__ = new_HTTPResponse__init__

HTTPAdapter = requests.adapters.HTTPAdapter
orig_HTTPAdapter_build_response = HTTPAdapter.build_response
def new_HTTPAdapter_build_response(self, request, resp):
    response = orig_HTTPAdapter_build_response(self, request, resp)
    try:
        response.peercertchain = resp.peercertchain
    except AttributeError:
        pass
    return response
HTTPAdapter.build_response = new_HTTPAdapter_build_response

ты получишь resp.peercertchain который содержит tuple из OpenSSL.crypto.X509 объекты

Я многому научился из приведенных выше ответов.

Чтобы решить проблему подключения: закрыть, я объединил ответы Марка Аменри и Инь Лея, чтобы найти следующее решение.

Пусть это поможет.

      # from Connection get SSL certificate patch
HTTPSConnection = requests.packages.urllib3.connection.HTTPSConnection
orig_HTTPSConnection_connect = HTTPSConnection.connect
def new_HTTPSConnection_connect(self):
    orig_HTTPSConnection_connect(self)
    try:
        self.peer_certificate = self.sock.getpeercert(binary_form=True)
    except AttributeError as e:
        pass
HTTPSConnection.connect = new_HTTPSConnection_connect

А потом

      with requests.get(url, stream=True) as r:
    certificate_info = r.raw.connection.peer_certificate
    print(certificate_info)

Просто сделайте это:

      import requests

with requests.get("https://www.bilibili.com", stream=True) as response:
    certificate_info = response.raw.connection.sock.getpeercert()
    subject = dict(x[0] for x in certificate_info['subject'])
    issuer = dict(x[0] for x in certificate_info['issuer'])

    print("commonName:", subject['commonName'])
    print("issuer:", issuer['commonName'])

Тогда вывод:

      commonName: *.bilibili.com
issuer: GlobalSign RSA OV SSL CA 2018

Желаю вам помочь.

Более чистое (-иш) решение, основанное на предыдущих очень хороших ответах!

  1. необходимо исправить запросы. Исходный файл адаптера перед переопределением класса HTTPResponse ( ожидающий запрос на вытягивание: https://github.com/psf/requests/pull/6039):
    • добавить переменную статического класса в класс HTTPAdapter(BaseAdapter) : _clsHTTPResponse = HTTPResponse
    • измените метод send() , чтобы использовать _clsHTTPResponse вместо прямого создания объекта HTTPResponse: resp = _clsHTTPResponse.from_httplib(...
  2. используйте этот код:
      """
Subclassing HTTP / requests to get peer_certificate back from lower levels
"""
from typing import Optional, Mapping, Any
from http.client import HTTPSConnection
from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
from urllib3.poolmanager import PoolManager,key_fn_by_scheme
from urllib3.connectionpool import HTTPSConnectionPool,HTTPConnectionPool
from urllib3.connection import HTTPSConnection,HTTPConnection
from urllib3.response import HTTPResponse as URLLIB3_HTTPResponse

#force urllib3 to use pyopenssl
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()  

class HTTPSConnection_withcert(HTTPSConnection):
    def __init__(self, *args, **kw):
        self.peer_certificate = None
        super().__init__(*args, **kw)
    def connect(self):
        res = super().connect() 
        self.peer_certificate = self.sock.connection.get_peer_certificate()
        return res

class HTTPResponse_withcert(URLLIB3_HTTPResponse):
    def __init__(self, *args, **kwargs):
        self.peer_certificate = None
        res = super().__init__( *args, **kwargs)
        self.peer_certificate = self._connection.peer_certificate
        return res
       
class HTTPSConnectionPool_withcert(HTTPSConnectionPool):
    ConnectionCls   = HTTPSConnection_withcert
    ResponseCls     = HTTPResponse_withcert
    
class PoolManager_withcert(PoolManager): 
    def __init__(
        self,
        num_pools: int = 10,
        headers: Optional[Mapping[str, str]] = None,
        **connection_pool_kw: Any,
    ) -> None:   
        super().__init__(num_pools,headers,**connection_pool_kw)
        self.pool_classes_by_scheme = {"http": HTTPConnectionPool, "https": HTTPSConnectionPool_withcert}
        self.key_fn_by_scheme = key_fn_by_scheme.copy()
                
class HTTPAdapter_withcert(HTTPAdapter):
    _clsHTTPResponse = HTTPResponse_withcert
    def build_response(self, request, resp):
        response = super().build_response( request, resp)
        response.peer_certificate = resp.peer_certificate
        return response

    def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs):
        #do not call super() to not initialize PoolManager twice
        # save these values for pickling
        self._pool_connections  = connections
        self._pool_maxsize      = maxsize
        self._pool_block        = block

        self.poolmanager        = PoolManager_withcert(num_pools=connections, 
                                                   maxsize=maxsize,
                                                   block=block, 
                                                   strict=True, 
                                                   **pool_kwargs)
class Session_withcert(Session):
    def __init__(self):
        super().__init__()
        self.mount('https://', HTTPAdapter_withcert())
  1. И это все ! Теперь вы можете использовать новую сессию Session_withcert() как базовую, но вы также можете:
      ss= Session_withcert()
resp=ss.get("https://www.google.fr")
resp.peer_certificate.get_subject()
print(resp.peer_certificate.get_subject())

который выведет:

      <X509Name object '/CN=*.google.fr'>

Для получения деталей сертификата, таких как CN и дата истечения срока действия, хорошо работает следующий сценарий, адаптированный из этого примера. Это также позволяет избежать некоторых ошибок, которые я получил, которые, как я полагаю, были вызваны неправильными / несовместимыми версиями запросов и urllib3: "AttributeError: объект 'SSLSocket' не имеет атрибута 'connection'" и "AttributeError: объект 'VerifiedHTTPSConnection' не имеет атрибута 'peer_certificate'"

from OpenSSL.SSL import Connection, Context, SSLv3_METHOD, TLSv1_2_METHOD
from datetime import datetime, time
import socket
host = 'www.google.com'
try:
    try:
        ssl_connection_setting = Context(SSLv3_METHOD)
    except ValueError:
        ssl_connection_setting = Context(TLSv1_2_METHOD)
    ssl_connection_setting.set_timeout(5)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, 443))
        c = Connection(ssl_connection_setting, s)
        c.set_tlsext_host_name(str.encode(host))
        c.set_connect_state()
        c.do_handshake()
        cert = c.get_peer_certificate()
        print("Is Expired: ", cert.has_expired())
        print("Issuer: ", cert.get_issuer())
        subject_list = cert.get_subject().get_components()
        cert_byte_arr_decoded = {}
        for item in subject_list:
            cert_byte_arr_decoded.update({item[0].decode('utf-8'): item[1].decode('utf-8')})
        print(cert_byte_arr_decoded)
        if len(cert_byte_arr_decoded) > 0:
            print("Subject: ", cert_byte_arr_decoded)
        if cert_byte_arr_decoded["CN"]:
            print("Common Name: ", cert_byte_arr_decoded["CN"])
        end_date = datetime.strptime(str(cert.get_notAfter().decode('utf-8')), "%Y%m%d%H%M%SZ")
        print("Not After (UTC Time): ", end_date)
        diff = end_date - datetime.now()
        print('Summary: "{}" SSL certificate expires on {} i.e. {} days.'.format(host, end_date, diff.days))
        c.shutdown()
        s.close()
except:
    print("Connection to {} failed.".format(host))  

Этот скрипт требует Python 3 и pyOpenSSL.

Другие вопросы по тегам