Twisted:获取所使用的加密套件(cipher)名称

twisted get cipher name

from twisted.internet.protocol import ClientFactory
from twisted.internet.protocol import Protocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import returnValue
from twisted.internet.ssl import CertificateOptions
from twisted.internet.ssl import AcceptableCiphers
from ssl import PROTOCOL_SSLv23
from ssl import DER_cert_to_PEM_cert
from OpenSSL.crypto import FILETYPE_PEM
from OpenSSL.crypto import load_certificate
import time
import json

# Preferred cipher suites: forward-secret and AEAD suites first, no anonymous,
# NULL or MD5-based suites.
normalCyphers = AcceptableCiphers.fromOpenSSLCipherString(
    'ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+HIGH:'
    'DH+HIGH:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+HIGH:RSA+3DES:!aNULL:'
    '!eNULL:!MD5'
)
# NOTE: no ``method=`` argument on purpose.  CertificateOptions expects a
# pyOpenSSL ``*_METHOD`` constant; the stdlib ``ssl.PROTOCOL_*`` constants use
# a different numbering and are not interchangeable with them, so passing
# ``ssl.PROTOCOL_SSLv23`` would select the wrong protocol.  Twisted's default
# negotiates the best protocol version both sides support.
normalCtxFac = CertificateOptions(acceptableCiphers=normalCyphers)

# Fallback used when the handshake with the preferred suites fails: accept
# everything except anonymous and NULL-encryption suites.
weakCiphers = AcceptableCiphers.fromOpenSSLCipherString('ALL:!aNULL:!eNULL')
weakCtxFac = CertificateOptions(acceptableCiphers=weakCiphers)


def asn1DateToTimestamp(asn1Date):
    """Convert an ASN.1 time string into a Unix timestamp.

    :param asn1Date: text or bytes starting with ``YYYYMMDD``, optionally
        followed by ``HHMMSS`` (for example ``'20250131120000Z'``).  Anything
        after the seconds field (such as a trailing ``Z``) is ignored.
    :return: integer seconds since the epoch, interpreting the value as UTC.
    :raises ValueError: if the leading date/time digits cannot be parsed.
    """
    import calendar

    if isinstance(asn1Date, bytes):
        asn1Date = asn1Date.decode('ascii')

    timeOfDay = asn1Date[8:14]
    if len(timeOfDay) == 6 and timeOfDay.isdigit():
        parsed = time.strptime(asn1Date[:14], '%Y%m%d%H%M%S')
    else:
        # Date-only input: fall back to midnight of that day.
        parsed = time.strptime(asn1Date[:8], '%Y%m%d')

    # timegm() treats the struct_time as UTC; mktime() would apply the local
    # timezone and shift the result by the machine's UTC offset.
    return int(calendar.timegm(parsed))


class CertCheckProtocol(Protocol):
    """Collect the peer certificate and negotiated cipher of a TLS connection.

    The protocol polls the transport until a peer certificate is available,
    then fires ``dfd`` with a dict describing the certificate and closes the
    connection.  If the connection goes away first, ``dfd`` fails instead.
    """

    def __init__(self, dfd, isWeakSsl):
        """
        :param dfd: Deferred fired with the result dict or failed on error.
        :param isWeakSsl: flag copied into the result as ``'weakCipher'``.
        """
        self.dfd = dfd
        self.isWeakSsl = isWeakSsl

    def connectionMade(self):
        # The certificate may not be available yet at this point, so poll for
        # it shortly afterwards (up to 20 retries).
        reactor.callLater(0.01, self.getCert, 20)

    def getCert(self, depth):
        """Try to read the certificate; retry up to ``depth`` more times."""
        if self.dfd.called:
            # The connection was lost while this poll was pending; the result
            # has already been delivered and must not be fired twice.
            return

        cert = self.transport.getPeerCertificate()
        transportHandle = self.transport.getHandle()
        if cert is None or transportHandle is None:
            if depth <= 0:
                # Out of retries: report the failure (same error as before)
                # and drop the connection.
                self.dfd.errback(Exception('Connection lost'))
                self.transport.loseConnection()
                return
            reactor.callLater(0.01, self.getCert, depth - 1)
            return

        cipherName = transportHandle.get_cipher_name()

        # NOTE(review): Twisted's TLS transport appears to return a pyOpenSSL
        # X509 object here, in which case no DER/PEM conversion is needed.
        # Raw DER bytes are still converted for backward compatibility.
        if hasattr(cert, 'get_notAfter'):
            targetCert = cert
        else:
            key = DER_cert_to_PEM_cert(cert)
            targetCert = load_certificate(FILETYPE_PEM, key)

        timestamp = asn1DateToTimestamp(targetCert.get_notAfter())
        expiresIn = timestamp - time.time()

        # Only join when the cipher is reported as a sequence of parts;
        # joining a plain string would put spaces between its characters.
        if isinstance(cipherName, (tuple, list)):
            usedCipher = '  '.join(map(str, cipherName))
        else:
            usedCipher = str(cipherName)

        self.dfd.callback({
            'name': 'certificate',
            'expiresIn': expiresIn,
            'sha1Digest': targetCert.digest('sha1'),
            'signatureAlgorithm': targetCert.get_signature_algorithm(),
            'issuer': targetCert.get_issuer().CN,
            'notAfter': timestamp,
            'notBefore': asn1DateToTimestamp(targetCert.get_notBefore()),
            'serialNumber': targetCert.get_serial_number(),
            'subject': targetCert.get_subject().CN,
            # This is the certificate's version field, despite the key name.
            'sslVersion': targetCert.get_version(),
            'usedCipher': usedCipher,
            'weakCipher': self.isWeakSsl
        })
        # The result has been delivered; do not leave the socket open.
        self.transport.loseConnection()

    def connectionLost(self, reason):
        if not self.dfd.called:
            # Pass the real failure on so callers can see why the connection
            # dropped (for example a TLS handshake error).
            self.dfd.errback(reason)


class CertCheckFactory(ClientFactory):
    """Client factory that wires a result Deferred into CertCheckProtocol."""

    def __init__(self, dfd, isWeakSsl):
        """Remember the result Deferred and the weak-cipher flag."""
        self.isWeakSsl = isWeakSsl
        self.dfd = dfd

    def buildProtocol(self, addr):
        """Create the protocol that will report through the stored Deferred."""
        protocol = CertCheckProtocol(self.dfd, self.isWeakSsl)
        return protocol

    def clientConnectionFailed(self, connector, reason):
        """Fail the result Deferred with the connection failure."""
        resultDeferred = self.dfd
        resultDeferred.errback(reason)


def _isHandshakeFailure(ex):
    """Return True if ``ex`` looks like a TLS handshake failure."""
    reason = getattr(ex, 'reason', None)
    if reason is not None:
        try:
            if 'HANDSHAKE_FAILURE' in reason:
                return True
        except TypeError:
            # ``reason`` is not a string/container; use the text check below.
            pass
    # Some TLS errors carry no ``reason`` attribute and only mention the
    # failure in their text, e.g. "sslv3 alert handshake failure".
    text = str(ex).replace('_', ' ').lower()
    return 'handshake failure' in text


@inlineCallbacks
def getCertificateInfo(ip, port=443):
    """Fetch certificate and cipher details from a TLS server.

    Connects with the preferred cipher list first; if that attempt fails with
    a handshake failure, retries once with the permissive cipher list.

    :param ip: host name or address to connect to.
    :param port: TCP port (converted with ``int``), default 443.
    :return: Deferred firing with the dict produced by CertCheckProtocol; its
        ``'weakCipher'`` entry is True when the permissive retry was needed.
    :raises Exception: whatever failure the connection attempt reported, if
        it was not a handshake failure or if the retry failed as well.
    """
    dfd = Deferred()
    factory = CertCheckFactory(dfd, isWeakSsl=False)
    reactor.connectSSL(ip, int(port), factory, contextFactory=normalCtxFac)
    try:
        res = yield dfd
    except Exception as ex:
        if not _isHandshakeFailure(ex):
            raise
        dfd = Deferred()
        factory = CertCheckFactory(dfd, isWeakSsl=True)
        reactor.connectSSL(ip, int(port), factory, contextFactory=weakCtxFac)
        res = yield dfd
    returnValue(res)


@inlineCallbacks
def testit(ip):
    """Look up the certificate info for ``ip``, print it as JSON, then stop
    the reactor.

    The reactor is stopped even when the lookup fails, so the script exits
    instead of hanging; the failure is left on the returned Deferred.
    """
    try:
        res = yield getCertificateInfo(ip)
        # Parenthesised form behaves the same on Python 2 and Python 3.
        print(json.dumps(res))
    finally:
        reactor.stop()

# Script entry point: schedule the lookup, then run the reactor.
# 'x.x.x.x' is a placeholder; replace it with the host to check.
if __name__ == '__main__':
    testit('x.x.x.x')
    reactor.run()

我不确定这样捕获握手失败(handshake failure)是否正确。这一部分仍需要用一台只支持弱加密套件的服务器来测试。

下面是堆栈跟踪,它表明传输句柄(transport handle)的 self._socket 为 None:

  File "C:\Python27\lib\site-packages\twisted\internet\base.py", line 825, in runUntilCurrent
    call.func(*call.args, **call.kw)
  File "C:\Users\sjuul\workspace\meuk\soCertQuestion.py", line 50, in getCert
    cipherName = transportHandle.get_cipher_name()
  File "C:\Python27\lib\site-packages\OpenSSL\SSL.py", line 838, in __getattr__
    return getattr(self._socket, name)
exceptions.AttributeError: 'NoneType' object has no attribute 'get_cipher_name'

Twisted 没有直接公开这一信息(欢迎为此向 Twisted 提交 bug),但你可以通过 pyOpenSSL API 这个“逃生通道”(escape hatch)获取它:self.transport.getHandle().get_cipher_name()。

我修改了你的示例,删除了其中从标准库 ssl 模块和 pyOpenSSL 的 OpenSSL 模块引入的多余导入,之后它工作正常,并告诉我 google.com 使用的是 ECDHE-RSA-AES128-GCM-SHA256:

from twisted.internet.protocol import ClientFactory
from twisted.internet.protocol import Protocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import returnValue
from twisted.internet.ssl import CertificateOptions
from twisted.internet.ssl import AcceptableCiphers

import time
import json

# Preferred cipher suites, in priority order; anonymous, NULL and MD5-based
# suites are excluded by the trailing "!" entries.
normalCyphers = AcceptableCiphers.fromOpenSSLCipherString(':'.join([
    'ECDH+AESGCM', 'DH+AESGCM',
    'ECDH+AES256', 'DH+AES256',
    'ECDH+AES128', 'DH+AES',
    'ECDH+HIGH', 'DH+HIGH',
    'ECDH+3DES', 'DH+3DES',
    'RSA+AESGCM', 'RSA+AES', 'RSA+HIGH', 'RSA+3DES',
    '!aNULL', '!eNULL', '!MD5',
]))
normalCtxFac = CertificateOptions(acceptableCiphers=normalCyphers)

# Permissive fallback: everything except anonymous and NULL-encryption suites.
weakCiphers = AcceptableCiphers.fromOpenSSLCipherString(
    ':'.join(['ALL', '!aNULL', '!eNULL'])
)
weakCtxFac = CertificateOptions(acceptableCiphers=weakCiphers)


def asn1DateToTimestamp(asn1Date):
    """Convert an ASN.1 time string into a Unix timestamp.

    :param asn1Date: text or bytes starting with ``YYYYMMDD``, optionally
        followed by ``HHMMSS`` (for example ``'20250131120000Z'``).  Anything
        after the seconds field (such as a trailing ``Z``) is ignored.
    :return: integer seconds since the epoch, interpreting the value as UTC.
    :raises ValueError: if the leading date/time digits cannot be parsed.
    """
    import calendar

    if isinstance(asn1Date, bytes):
        asn1Date = asn1Date.decode('ascii')

    timeOfDay = asn1Date[8:14]
    if len(timeOfDay) == 6 and timeOfDay.isdigit():
        parsed = time.strptime(asn1Date[:14], '%Y%m%d%H%M%S')
    else:
        # Date-only input: fall back to midnight of that day.
        parsed = time.strptime(asn1Date[:8], '%Y%m%d')

    # timegm() treats the struct_time as UTC; mktime() would apply the local
    # timezone and shift the result by the machine's UTC offset.
    return int(calendar.timegm(parsed))


class CertCheckProtocol(Protocol):
    """Collect the peer certificate and negotiated cipher of a TLS connection.

    The protocol polls the transport until a peer certificate is available,
    then fires ``dfd`` with a dict describing the certificate and closes the
    connection.  If the connection goes away first, ``dfd`` fails instead.
    """

    def __init__(self, dfd, isWeakSsl):
        """
        :param dfd: Deferred fired with the result dict or failed on error.
        :param isWeakSsl: flag copied into the result as ``'weakCipher'``.
        """
        self.dfd = dfd
        self.isWeakSsl = isWeakSsl

    def connectionMade(self):
        # The certificate may not be available yet at this point, so poll for
        # it shortly afterwards (up to 20 retries).
        reactor.callLater(0.01, self.getCert, 20)

    def getCert(self, depth):
        """Try to read the certificate; retry up to ``depth`` more times."""
        if self.dfd.called:
            # The connection was lost while this poll was pending; the result
            # has already been delivered and must not be fired twice.
            return

        cert = self.transport.getPeerCertificate()
        transportHandle = self.transport.getHandle()
        if cert is None or transportHandle is None:
            if depth <= 0:
                # Out of retries: report the failure (same error as before)
                # and drop the connection.
                self.dfd.errback(Exception('Connection lost'))
                self.transport.loseConnection()
                return
            reactor.callLater(0.01, self.getCert, depth - 1)
            return

        cipherName = transportHandle.get_cipher_name()
        timestamp = asn1DateToTimestamp(cert.get_notAfter())
        expiresIn = timestamp - time.time()

        self.dfd.callback({
            'name': 'certificate',
            'expiresIn': expiresIn,
            'sha1Digest': cert.digest('sha1'),
            'signatureAlgorithm': cert.get_signature_algorithm(),
            'issuer': cert.get_issuer().CN,
            'notAfter': timestamp,
            'notBefore': asn1DateToTimestamp(cert.get_notBefore()),
            'serialNumber': cert.get_serial_number(),
            'subject': cert.get_subject().CN,
            # This is the certificate's version field, despite the key name.
            'sslVersion': cert.get_version(),
            'usedCipher': cipherName,
            'weakCipher': self.isWeakSsl
        })
        # The result has been delivered; do not leave the socket open.
        self.transport.loseConnection()

    def connectionLost(self, reason):
        if not self.dfd.called:
            # Pass the real failure on so callers can see why the connection
            # dropped (for example a TLS handshake error).
            self.dfd.errback(reason)


class CertCheckFactory(ClientFactory):
    """Factory producing CertCheckProtocol instances bound to one Deferred."""

    def __init__(self, dfd, isWeakSsl):
        """Store the Deferred to report through and the weak-cipher flag."""
        self.isWeakSsl = isWeakSsl
        self.dfd = dfd

    def buildProtocol(self, addr):
        """Return a protocol sharing this factory's Deferred and flag."""
        newProtocol = CertCheckProtocol(self.dfd, self.isWeakSsl)
        return newProtocol

    def clientConnectionFailed(self, connector, reason):
        """Propagate a failed connection attempt to the stored Deferred."""
        pending = self.dfd
        pending.errback(reason)


def _isHandshakeFailure(ex):
    """Return True if ``ex`` looks like a TLS handshake failure."""
    reason = getattr(ex, 'reason', None)
    if reason is not None:
        try:
            if 'HANDSHAKE_FAILURE' in reason:
                return True
        except TypeError:
            # ``reason`` is not a string/container; use the text check below.
            pass
    # Some TLS errors carry no ``reason`` attribute and only mention the
    # failure in their text, e.g. "sslv3 alert handshake failure".
    text = str(ex).replace('_', ' ').lower()
    return 'handshake failure' in text


@inlineCallbacks
def getCertificateInfo(ip, port=443):
    """Fetch certificate and cipher details from a TLS server.

    Connects with the preferred cipher list first; if that attempt fails with
    a handshake failure, retries once with the permissive cipher list.

    :param ip: host name or address to connect to.
    :param port: TCP port (converted with ``int``), default 443.
    :return: Deferred firing with the dict produced by CertCheckProtocol; its
        ``'weakCipher'`` entry is True when the permissive retry was needed.
    :raises Exception: whatever failure the connection attempt reported, if
        it was not a handshake failure or if the retry failed as well.
    """
    dfd = Deferred()
    factory = CertCheckFactory(dfd, isWeakSsl=False)
    reactor.connectSSL(ip, int(port), factory, contextFactory=normalCtxFac)
    try:
        res = yield dfd
    except Exception as ex:
        if not _isHandshakeFailure(ex):
            raise
        dfd = Deferred()
        factory = CertCheckFactory(dfd, isWeakSsl=True)
        reactor.connectSSL(ip, int(port), factory,
                           contextFactory=weakCtxFac)
        res = yield dfd
    returnValue(res)


@inlineCallbacks
def testit(ip):
    """Look up the certificate info for ``ip``, print it as JSON, then stop
    the reactor.

    The reactor is stopped even when the lookup fails, so the script exits
    instead of hanging; the failure is left on the returned Deferred.
    """
    try:
        res = yield getCertificateInfo(ip)
        # Parenthesised form behaves the same on Python 2 and Python 3.
        print(json.dumps(res))
    finally:
        reactor.stop()

# Script entry point: schedule the lookup against google.com, then run the
# reactor until testit() stops it.
if __name__ == '__main__':
    testit('google.com')
    reactor.run()