Python 以字符串形式请求 CA 证书
Python requests CA certificates as a string
目前我们正在使用将 CA 证书放在服务器上以访问第三方 API 的方法。
certificate_path = os.path.join(CERT_PATH, 'cacert.pem')
certificate_key_path = os.path.join(CERT_PATH, 'cacert.key')
response = requests.get(url, cert=(certificate_path, certificate_key_path))
这行得通,但我们正在寻找而不是将 CA 证书存储在服务器上,存储在数据库中的 Accounts
Table 出于安全目的(客户提出的安全原因)。
所以问题是:
有什么方法可以直接将 CA 证书的字符串直接传递给 requests
(而不是将内容写入临时文件)?
是否有任何其他 http
python 模块支持在 http
get/post 请求中传递 CA 证书的字符串?
除了将它们存储在数据库和服务器上,我们还应该使用其他方法吗?
有一种方法可以通过临时文件来完成,就像这样:
cert = tempfile.NamedTemporaryFile(delete=False)
cert.write(CERTIFICATE_AS_STRING)
cert.close()
requests.get(url, cert=cert.name, verify=True)
os.unlink(cert.name)
如果您想知道为什么这可能不安全,请在此处查看我的回答:
如果想在不使用临时文件的情况下执行此操作,可以通过覆盖请求 SSLContext 来实现。样本可见于.
您提供的示例正在传递 client-side 证书,如请求 documentation 中所示。
就目前而言,无法通过内存中的客户端证书和密钥(或作为字符串)。
猴子修补来拯救 - 通过猴子修补requests
,您可以添加从内存加载客户端证书和密钥的能力。以下补丁可以在不破坏现有功能的情况下以多种格式传递客户端证书和密钥。
import requests
from OpenSSL.crypto import PKCS12, X509, PKey
def _is_key_file_encrypted(keyfile):
'''In memory key is not encrypted'''
if isinstance(keyfile, PKey):
return False
return _is_key_file_encrypted.original(keyfile)
class PyOpenSSLContext(requests.packages.urllib3.contrib.pyopenssl.PyOpenSSLContext):
'''Support loading certs from memory'''
def load_cert_chain(self, certfile, keyfile=None, password=None):
if isinstance(certfile, X509) and isinstance(keyfile, PKey):
self._ctx.use_certificate(certfile)
self._ctx.use_privatekey(keyfile)
else:
super().load_cert_chain(certfile, keyfile=keyfile, password=password)
class HTTPAdapter(requests.adapters.HTTPAdapter):
'''Handle a variety of cert types'''
def cert_verify(self, conn, url, verify, cert):
if cert:
# PKCS12
if isinstance(cert, PKCS12):
conn.cert_file = cert.get_certificate()
conn.key_file = cert.get_privatekey()
cert = None
elif isinstance(cert, tuple) and len(cert) == 2:
# X509 and PKey
if isinstance(cert[0], X509) and hasattr(cert[1], PKey):
conn.cert_file = cert[0]
conn.key_file = cert[1]
cert = None
# cryptography objects
elif hasattr(cert[0], 'public_bytes') and hasattr(cert[1], 'private_bytes'):
conn.cert_file = X509.from_cryptography(cert[0])
conn.key_file = PKey.from_cryptography_key(cert[1])
cert = None
super().cert_verify(conn, url, verify, cert)
def patch_requests(adapter=True):
'''You can perform a full patch and use requests as usual:
>>> patch_requests()
>>> requests.get('https://httpbin.org/get')
or use the adapter explicitly:
>>> patch_requests(adapter=False)
>>> session = requests.Session()
>>> session.mount('https', HTTPAdapter())
>>> session.get('https://httpbin.org/get')
'''
if hasattr(requests.packages.urllib3.util.ssl_, '_is_key_file_encrypted'):
_is_key_file_encrypted.original = requests.packages.urllib3.util.ssl_._is_key_file_encrypted
requests.packages.urllib3.util.ssl_._is_key_file_encrypted = _is_key_file_encrypted
requests.packages.urllib3.util.ssl_.SSLContext = PyOpenSSLContext
if adapter:
requests.sessions.HTTPAdapter = HTTPAdapter
要使用补丁,您可以执行以下操作(假设以上代码位于名为 patch.py
的文件中)
import os
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from patch import patch_requests
CLIENT_CERT = serialization.load_pem_x509_certificate(
os.getenv('CLIENT_CERT'), default_backend())
CLIENT_KEY = serialization.load_pem_private_key(
os.getenv('CLIENT_KEY'), None, default_backend())
# monkey patch load_cert_chain to allow loading
# cryptography certs and keys from memory
patch_requests()
response = requests.get(url, cert=(CLIENT_CERT, CLIENT_KEY))
您现在能够以 pyopenssl
个对象或 cryptography
个对象的形式向内存中的请求提供客户端证书。
我采用了不同的方法并使用 init_poolmanager 来设置 ssl 上下文。我避免打补丁,所以它只适用于 Session 对象。
E.x.:
#pip install requests pyOpenSSL
import OpenSSL
import requests
import requests.hooks
from urllib3 import Retry
from urllib3.contrib.pyopenssl import PyOpenSSLContext
from urllib3.util.ssl_ import create_urllib3_context
class ClientSideCertificateHTTPAdapter(requests.adapters.HTTPAdapter):
DEFAULT_PROTOCOL = create_urllib3_context().protocol
def __init__(self, *args, cert, key, protocol=DEFAULT_PROTOCOL, **kwargs):
self._cert = cert
self._key = key
self._protocol = protocol
super().__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
ctx = PyOpenSSLContext(self._protocol)
kwargs["ssl_context"] = ctx
ctx._ctx.use_certificate(self._cert)
ctx._ctx.use_privatekey(self._key)
return super().init_poolmanager(*args, **kwargs)
def main():
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, "-----BEGIN CERTIFICATE----- MIIDnjC....cUkiz -----END CERTIFICATE-----")
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, "-----BEGIN CERTIFICATE----- MIIDnjC....cUkiz -----END CERTIFICATE-----", b"passphrase_goes_here")
adapter = ClientSideCertificateHTTPAdapter(cert=cert, key=key, max_retries=Retry(total=10, backoff_factor=0.5))
session = requests.Session()
session.mount("https://www.hotmail.com/", adapter)
session.get("https://www.hotmail.com/api/v2/mail")
if __name__ == "__main__":
main()
目前我们正在使用将 CA 证书放在服务器上以访问第三方 API 的方法。
certificate_path = os.path.join(CERT_PATH, 'cacert.pem')
certificate_key_path = os.path.join(CERT_PATH, 'cacert.key')
response = requests.get(url, cert=(certificate_path, certificate_key_path))
这行得通,但我们正在寻找而不是将 CA 证书存储在服务器上,存储在数据库中的 Accounts
Table 出于安全目的(客户提出的安全原因)。
所以问题是:
有什么方法可以直接将 CA 证书的字符串直接传递给
requests
(而不是将内容写入临时文件)?是否有任何其他
http
python 模块支持在http
get/post 请求中传递 CA 证书的字符串?除了将它们存储在数据库和服务器上,我们还应该使用其他方法吗?
有一种方法可以通过临时文件来完成,就像这样:
cert = tempfile.NamedTemporaryFile(delete=False)
cert.write(CERTIFICATE_AS_STRING)
cert.close()
requests.get(url, cert=cert.name, verify=True)
os.unlink(cert.name)
如果您想知道为什么这可能不安全,请在此处查看我的回答:
如果想在不使用临时文件的情况下执行此操作,可以通过覆盖请求 SSLContext 来实现。样本可见于
您提供的示例正在传递 client-side 证书,如请求 documentation 中所示。
就目前而言,无法通过内存中的客户端证书和密钥(或作为字符串)。
猴子修补来拯救 - 通过猴子修补requests
,您可以添加从内存加载客户端证书和密钥的能力。以下补丁可以在不破坏现有功能的情况下以多种格式传递客户端证书和密钥。
import requests
from OpenSSL.crypto import PKCS12, X509, PKey
def _is_key_file_encrypted(keyfile):
'''In memory key is not encrypted'''
if isinstance(keyfile, PKey):
return False
return _is_key_file_encrypted.original(keyfile)
class PyOpenSSLContext(requests.packages.urllib3.contrib.pyopenssl.PyOpenSSLContext):
'''Support loading certs from memory'''
def load_cert_chain(self, certfile, keyfile=None, password=None):
if isinstance(certfile, X509) and isinstance(keyfile, PKey):
self._ctx.use_certificate(certfile)
self._ctx.use_privatekey(keyfile)
else:
super().load_cert_chain(certfile, keyfile=keyfile, password=password)
class HTTPAdapter(requests.adapters.HTTPAdapter):
'''Handle a variety of cert types'''
def cert_verify(self, conn, url, verify, cert):
if cert:
# PKCS12
if isinstance(cert, PKCS12):
conn.cert_file = cert.get_certificate()
conn.key_file = cert.get_privatekey()
cert = None
elif isinstance(cert, tuple) and len(cert) == 2:
# X509 and PKey
if isinstance(cert[0], X509) and hasattr(cert[1], PKey):
conn.cert_file = cert[0]
conn.key_file = cert[1]
cert = None
# cryptography objects
elif hasattr(cert[0], 'public_bytes') and hasattr(cert[1], 'private_bytes'):
conn.cert_file = X509.from_cryptography(cert[0])
conn.key_file = PKey.from_cryptography_key(cert[1])
cert = None
super().cert_verify(conn, url, verify, cert)
def patch_requests(adapter=True):
'''You can perform a full patch and use requests as usual:
>>> patch_requests()
>>> requests.get('https://httpbin.org/get')
or use the adapter explicitly:
>>> patch_requests(adapter=False)
>>> session = requests.Session()
>>> session.mount('https', HTTPAdapter())
>>> session.get('https://httpbin.org/get')
'''
if hasattr(requests.packages.urllib3.util.ssl_, '_is_key_file_encrypted'):
_is_key_file_encrypted.original = requests.packages.urllib3.util.ssl_._is_key_file_encrypted
requests.packages.urllib3.util.ssl_._is_key_file_encrypted = _is_key_file_encrypted
requests.packages.urllib3.util.ssl_.SSLContext = PyOpenSSLContext
if adapter:
requests.sessions.HTTPAdapter = HTTPAdapter
要使用补丁,您可以执行以下操作(假设以上代码位于名为 patch.py
的文件中)
import os
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from patch import patch_requests
CLIENT_CERT = serialization.load_pem_x509_certificate(
os.getenv('CLIENT_CERT'), default_backend())
CLIENT_KEY = serialization.load_pem_private_key(
os.getenv('CLIENT_KEY'), None, default_backend())
# monkey patch load_cert_chain to allow loading
# cryptography certs and keys from memory
patch_requests()
response = requests.get(url, cert=(CLIENT_CERT, CLIENT_KEY))
您现在能够以 pyopenssl
个对象或 cryptography
个对象的形式向内存中的请求提供客户端证书。
我采用了不同的方法并使用 init_poolmanager 来设置 ssl 上下文。我避免打补丁,所以它只适用于 Session 对象。
E.x.:
#pip install requests pyOpenSSL
import OpenSSL
import requests
import requests.hooks
from urllib3 import Retry
from urllib3.contrib.pyopenssl import PyOpenSSLContext
from urllib3.util.ssl_ import create_urllib3_context
class ClientSideCertificateHTTPAdapter(requests.adapters.HTTPAdapter):
DEFAULT_PROTOCOL = create_urllib3_context().protocol
def __init__(self, *args, cert, key, protocol=DEFAULT_PROTOCOL, **kwargs):
self._cert = cert
self._key = key
self._protocol = protocol
super().__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
ctx = PyOpenSSLContext(self._protocol)
kwargs["ssl_context"] = ctx
ctx._ctx.use_certificate(self._cert)
ctx._ctx.use_privatekey(self._key)
return super().init_poolmanager(*args, **kwargs)
def main():
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, "-----BEGIN CERTIFICATE----- MIIDnjC....cUkiz -----END CERTIFICATE-----")
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, "-----BEGIN CERTIFICATE----- MIIDnjC....cUkiz -----END CERTIFICATE-----", b"passphrase_goes_here")
adapter = ClientSideCertificateHTTPAdapter(cert=cert, key=key, max_retries=Retry(total=10, backoff_factor=0.5))
session = requests.Session()
session.mount("https://www.hotmail.com/", adapter)
session.get("https://www.hotmail.com/api/v2/mail")
if __name__ == "__main__":
main()