使用 cryptography 库实现 Python PKCS#7 X.509 信任链验证

Python pkcs#7 x509 chain of trust with cryptography

我正在开发一个 Python 库,用来把 Apple Pay 有效负载处理成可用的卡片详细信息。为此,我遵循的是 Apple 的官方文档。

除 2 个验证步骤外,一切正常:

步骤1.c.

Ensure that there is a valid X.509 chain of trust from the signature to the root CA. Specifically, ensure that the signature was created using the private key corresponding to the leaf certificate, that the leaf certificate is signed by the intermediate CA, and that the intermediate CA is signed by the Apple Root CA - G3.

在这里,我最终使用 cryptography 库,通过以下代码完成了后两项检查:

from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA

def verify_root_ca_chain_of_trust(
    trusted_root_ca: x509.Certificate,
    intermediate_cert: x509.Certificate,
    leaf_cert: x509.Certificate
) -> None:
    """Verify the root -> intermediate -> leaf certificate signatures.

    Checks that ``intermediate_cert`` was signed by the key of
    ``trusted_root_ca`` and that ``leaf_cert`` was signed by the key of
    ``intermediate_cert``. Only the signatures are checked here; validity
    dates, extensions and revocation are not.

    Raises:
        CustomError: if either signature does not verify, or if ``verify``
            rejects the arguments with a ``TypeError``.
    """
    # Imported locally so the function works even when the surrounding
    # module does not import the exception itself.
    from cryptography.exceptions import InvalidSignature

    try:
        # Verify that the intermediate CA is signed by the Apple Root CA - G3.
        # The hash comes from the certificate being checked rather than being
        # hard-coded, so it always matches the algorithm the issuer used.
        trusted_root_ca.public_key().verify(
            intermediate_cert.signature,
            intermediate_cert.tbs_certificate_bytes,
            ECDSA(intermediate_cert.signature_hash_algorithm),
        )
        # Verify that the leaf certificate is signed by the intermediate CA.
        intermediate_cert.public_key().verify(
            leaf_cert.signature,
            leaf_cert.tbs_certificate_bytes,
            ECDSA(leaf_cert.signature_hash_algorithm),
        )
    except (InvalidSignature, TypeError) as err:
        # verify() signals a bad signature with InvalidSignature, not
        # TypeError; both are mapped to the domain error.
        raise CustomError('error') from err

我的问题是:三项必需的检查我只完成了两项,剩下的这一项我不知道该怎么做:

Specifically, ensure that the signature was created using the private key corresponding to the leaf certificate

步骤 1.d。和 1.e.

d. For ECC (EC_v1), ensure that the signature is a valid ECDSA signature (ecdsa-with-SHA256 1.2.840.10045.4.3.2) of the concatenated values of the ephemeralPublicKey, data, transactionId, and applicationData keys.

e. Inspect the CMS signing time of the signature, as defined by section 11.3 of RFC 5652. If the time signature and the transaction time differ by more than a few minutes, it's possible that the token is a replay attack.

到目前为止我尝试了什么:

def validate_token_signature(
    trusted_cert: x509.Certificate,
    signature: str,
    payment_data: str,
    ephemeral_pub: str,
    transaction_id: str,
    application_data: str = None,
) -> None:
    """Verify ``signature`` over the concatenated token fields.

    The signed message is built as ``ephemeral_pub + payment_data +
    transaction_id [+ application_data]`` after decoding each field
    (base64, base64, hex, base64 respectively), and is checked as an
    ECDSA/SHA-256 signature with the public key of ``trusted_cert``.

    NOTE(review): the token signature appears to be a PKCS#7/CMS blob rather
    than a raw ECDSA signature; if so this direct check cannot succeed and
    the CMS SignerInfo has to be parsed instead -- confirm.

    Raises:
        CustomError: if the signature does not verify.
    """
    signed_parts = [
        base64.b64decode(ephemeral_pub),
        base64.b64decode(payment_data),
        bytes.fromhex(transaction_id),
    ]
    if application_data is not None:
        # NOTE(review): applicationData may be hex-encoded like
        # transactionId rather than base64 -- confirm against the token spec.
        signed_parts.append(base64.b64decode(application_data))
    data = b"".join(signed_parts)

    leaf_public_key = trusted_cert.public_key()
    try:
        leaf_public_key.verify(
            base64.b64decode(signature), data, ECDSA(hashes.SHA256())
        )
    except InvalidSignature as err:
        # A failed verification must not be swallowed: returning normally
        # would make an invalid token look valid to the caller.
        raise CustomError('error') from err

这里文档没有说明要验证的签名是什么格式,我认为它是 PKCS#7,但 cryptography 库只把 PKCS#7 当作 X.509 证书列表来处理,正如其文档所述:

Deserialize a PEM encoded PKCS7 blob to a list of certificates. PKCS7 can contain many other types of data, including CRLs, but this function will ignore everything except certificates.

有没有办法用 Python 的 cryptography 库完成这些检查,还是必须使用其他库,比如 pyOpenSSL?

要读取 CMS 签名时间并验证 PKCS#7 签名,我需要访问 CMS 内容和签名者信息(SignerInfo)。这无法通过 M2Crypto 或 cryptography 的 pkcs7 模块实现。我最终使用的是 asn1crypto。

步骤 1.c 和 1.d 属于同一项检查,所以我保留了上面的信任链检查:

def verify_root_ca_chain_of_trust(
    trusted_root_ca: x509.Certificate,
    intermediate_cert: x509.Certificate,
    leaf_cert: x509.Certificate
) -> None:
    """Check the certificate signatures from the trusted root down to the leaf.

    First confirms that the root CA key signed ``intermediate_cert``, then
    that the intermediate key signed ``leaf_cert``. This covers the
    signatures only, not validity periods, extensions or revocation.

    Raises:
        CustomError: when a signature fails to verify, or when ``verify``
            raises ``TypeError`` for the supplied arguments.
    """
    # Function-scope import: the exception type is needed below and may not
    # be imported at module level.
    from cryptography.exceptions import InvalidSignature

    # (issuer, subject) pairs, checked in order from the root downwards.
    links = (
        (trusted_root_ca, intermediate_cert),  # Apple Root CA - G3 -> intermediate
        (intermediate_cert, leaf_cert),        # intermediate CA -> leaf
    )
    try:
        for issuer, subject in links:
            issuer.public_key().verify(
                subject.signature,
                subject.tbs_certificate_bytes,
                # Use the hash named in the certificate itself instead of
                # assuming SHA-256.
                ECDSA(subject.signature_hash_algorithm),
            )
    except (InvalidSignature, TypeError) as err:
        # A bad signature surfaces as InvalidSignature; catching TypeError
        # alone would let it escape unwrapped.
        raise CustomError('error') from err

为了完成最后一部分检查 'Specifically, ensure that the signature was created using the private key corresponding to the leaf certificate',我使用了 asn1crypto:

    from asn1crypto import cms
    from asn1crypto import core

    def validate_token_signature(
        trusted_cert: x509.Certificate,
        signature: bytes,
        payment_data: bytes,
        ephemeral_pub: bytes,
        transaction_id: bytes,
        application_data: bytes = b"",
    ) -> None:
        """Verify the CMS signature of a payment token against the leaf cert.

        ``signature`` is parsed as a CMS ``ContentInfo``. The first
        ``SignerInfo`` must reference ``trusted_cert`` (issuer common name
        and serial number), the certificate must currently be within its
        validity window, the ECDSA/SHA-256 signature must verify with the
        certificate's public key, and the digest of
        ``ephemeral_pub + payment_data + transaction_id + application_data``
        must equal the signed ``message_digest`` attribute (when signed
        attributes are present).

        All byte arguments are expected to be already decoded; pass
        ``application_data`` as ``b""`` (or omit it) when the token has none.

        Raises:
            CustomError: if any of the checks above fails.
        """
        # Function-scope imports for names the surrounding module may not
        # import itself.
        from datetime import timezone

        from cryptography.x509.oid import NameOID

        signed_data = cms.ContentInfo.load(signature)['content']
        digest_name = signed_data['digest_algorithms'][0]['algorithm'].native
        signer_infos = signed_data['signer_infos']
        if len(signer_infos) == 0:
            raise CustomError('error')
        signer_info = signer_infos[0]
        raw_signature = signer_info['signature'].native
        signed_attrs = signer_info['signed_attrs']

        # Ensure the signer identifier points at the certificate we trust.
        # The common name is looked up by OID instead of splitting the
        # RFC 4514 string of the first RDN, which depends on RDN order and
        # breaks on '=' or escaped characters in the value.
        sid = signer_info['sid'].native
        try:
            signer_issuer_cn = sid['issuer']['common_name']
            signer_serial = sid['serial_number']
        except (KeyError, TypeError) as err:
            raise CustomError('error') from err
        cert_cn_attrs = trusted_cert.issuer.get_attributes_for_oid(
            NameOID.COMMON_NAME
        )
        if not cert_cn_attrs or cert_cn_attrs[0].value != signer_issuer_cn:
            raise CustomError('error')
        # An issuer can sign many certificates; the serial number is what
        # ties the signer identifier to this particular one.
        if trusted_cert.serial_number != signer_serial:
            raise CustomError('error')

        # Verify that the certificate is valid right now. The validity
        # bounds are naive UTC datetimes, so compare against naive UTC
        # rather than local time.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        if not trusted_cert.not_valid_before < now_utc < trusted_cert.not_valid_after:
            raise CustomError('error')

        # User data covered by the signature.
        signed_content: bytes = (
            ephemeral_pub + payment_data + transaction_id + (application_data or b"")
        )
        try:
            content_digest = hashlib.new(digest_name, signed_content).digest()
        except (TypeError, ValueError) as err:
            # Digest algorithm from the blob is not usable with hashlib.
            raise CustomError('error') from err

        if signed_attrs is not None and not isinstance(signed_attrs, core.Void):
            # With signed attributes, the signature covers the attributes and
            # the content digest is carried in the message_digest attribute.
            signed_digest = None
            for attr in signed_attrs:
                if attr['type'].native == 'message_digest':
                    signed_digest = attr['values'].native[0]
            # The attributes are stored with an implicit [0] tag but signed
            # as a plain SET OF, so the leading tag byte is replaced by 0x31.
            signed_bytes = b'\x31' + signed_attrs.dump()[1:]
        else:
            # Without signed attributes the signature covers the content.
            signed_digest = content_digest
            signed_bytes = signed_content

        # 1- the signature itself must verify with the leaf public key
        try:
            trusted_cert.public_key().verify(
                raw_signature, signed_bytes, ECDSA(hashes.SHA256())
            )
        except InvalidSignature as err:
            raise CustomError('error') from err

        # 2- the digest of the user data must match the signed digest
        if content_digest != signed_digest:
            raise CustomError('error')

对于 CMS 签名时间验证(1.e),我同样使用了从 ASN.1 对象中获取的数据:

    from asn1crypto import cms

    def cms_compare(p7: bytes, max_skew_seconds: int = 30) -> None:
        """Reject a CMS blob whose signing time is too far from the current time.

        Parses ``p7`` as a CMS ``ContentInfo``, reads the ``signing_time``
        signed attribute of the first signer and compares it with the
        current clock.

        Args:
            p7: DER-encoded CMS/PKCS#7 signature blob.
            max_skew_seconds: largest accepted difference, in seconds,
                between the signing time and now (in either direction).

        Raises:
            CustomError: if the signing time is missing or unreadable, or
                differs from the current time by more than
                ``max_skew_seconds``.
        """
        content_info = cms.ContentInfo.load(p7)
        try:
            signed_attrs = content_info.native['content']['signer_infos'][0]['signed_attrs']
            # Look the attribute up by type: its position inside the signed
            # attributes is not guaranteed.
            signing_time = None
            for attr in signed_attrs:
                if attr['type'] == 'signing_time':
                    signing_time = attr['values'][0]
                    break
        except (TypeError, LookupError) as err:
            raise CustomError('error') from err

        if not isinstance(signing_time, datetime):
            raise CustomError('error')

        # timestamp() honours the value's tzinfo; strftime('%s') is a
        # platform extension that ignores it. abs() also rejects signing
        # times that lie in the future.
        if abs(time.time() - signing_time.timestamp()) > max_skew_seconds:
            raise CustomError('error')