Python zeep soap 请求签名失败:无法加载证书

Python zeep soap request signature fails: cannot load cert

我正在尝试使用 Python zeep 发出 SOAP 请求,该请求需要使用 RSA 2048 密钥进行 sha256 签名。由于某种原因,我无法加载签名,并且在尝试发送请求时出现 'signature fails: cannot load cert' 错误。

示例代码如下:

from zeep import Client as cl
from requests import Session
from zeep.transports import Transport
from zeep.plugins import HistoryPlugin
from lxml import etree
import zeep
from zeep.wsse.signature import Signature
import uuid
import OpenSSL
import base64
from zeep.wsse import utils
from datetime import datetime, timedelta


# Helper Class
class SingnatureOverride(Signature):
    """Signature plugin that adds a Timestamp to the security header.

    ``apply`` appends a ``Timestamp`` element (built with ``utils.WSU``) to
    the envelope's security header and then delegates to
    ``Signature.apply``. ``verify`` is overridden to return the envelope
    untouched.
    """

    def apply(self, envelope, headers):
        """Add a Created/Expires timestamp, then run the parent ``apply``.

        The timestamp covers a 60 second window starting at the current
        UTC time. Returns the ``(envelope, headers)`` pair that was passed
        in.
        """
        def _as_utc_text(moment):
            # Whole seconds in ISO format, suffixed with 'Z'.
            return moment.replace(microsecond=0).isoformat()+'Z'

        header = utils.get_security_header(envelope)

        issued_at = datetime.utcnow()
        valid_until = issued_at + timedelta(seconds=1 * 60)

        stamp = utils.WSU('Timestamp')
        for tag, moment in (('Created', issued_at), ('Expires', valid_until)):
            stamp.append(utils.WSU(tag, _as_utc_text(moment)))
        header.append(stamp)

        super().apply(envelope, headers)
        return envelope, headers

    def verify(self, envelope):
        """Skip response verification and hand the envelope back as-is.

        Per the original author's note, zeep does not support signature
        verification with a certificate different from the signing one.
        Ref. https://github.com/mvantellingen/python-zeep/pull/822/
        "Add support for different signing and verification certificates #822"
        """
        return envelope


# Function to generate a self-signed certificate and its key pair
def create_csr(common_name, country=None, state=None, city=None,
               organization=None, organizational_unit=None,
               email_address=None, serial_number=1000,
               validity_seconds=10 * 365 * 24 * 60 * 60):
    """Generate an RSA 2048 key pair and a self-signed X509 certificate.

    Despite the name, this builds an ``OpenSSL.crypto.X509`` object (not a
    certificate signing request) and signs it with its own key using
    sha256.

    The serial number, validity window and issuer are set before signing.
    Without them the dumped PEM was rejected with ``cannot load cert`` when
    it was used for request signing.

    Args:
        common_name: Subject CN (always set).
        country: Subject C, set only when truthy.
        state: Subject ST, set only when truthy.
        city: Subject L, set only when truthy.
        organization: Subject O, set only when truthy.
        organizational_unit: Subject OU, set only when truthy.
        email_address: Subject emailAddress, set only when truthy.
        serial_number: Certificate serial number.
        validity_seconds: Lifetime of the certificate, counted from now.

    Returns:
        dict with the PEM-encoded ``"private key"``, ``"public key"`` and
        ``"certificate"`` (bytes), plus the X509 object itself under
        ``"reqest"`` (key spelling kept for backward compatibility).
    """
    key = OpenSSL.crypto.PKey()
    key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)

    req = OpenSSL.crypto.X509()
    subject = req.get_subject()
    subject.CN = common_name
    # Same assignment order as before, so the subject entries keep their order.
    optional_fields = (
        ("OU", organizational_unit),
        ("O", organization),
        ("L", city),
        ("ST", state),
        ("C", country),
        ("emailAddress", email_address),
    )
    for attribute, value in optional_fields:
        if value:
            setattr(subject, attribute, value)

    # These fields must be set before signing so the certificate is loadable.
    req.set_serial_number(serial_number)
    req.gmtime_adj_notBefore(0)
    req.gmtime_adj_notAfter(validity_seconds)
    # Self-signed: the issuer is the subject.
    req.set_issuer(subject)

    req.set_pubkey(key)
    req.sign(key, 'sha256')

    private_key = OpenSSL.crypto.dump_privatekey(
        OpenSSL.crypto.FILETYPE_PEM, key)
    public_key = OpenSSL.crypto.dump_publickey(
        OpenSSL.crypto.FILETYPE_PEM, key)
    certificate_pem = OpenSSL.crypto.dump_certificate(
        OpenSSL.crypto.FILETYPE_PEM, req)

    return {
        "private key": private_key,
        "public key": public_key,
        "certificate": certificate_pem,
        "reqest": req,
    }


# Generate the key pair and certificate used to sign requests
certificate = create_csr(
    common_name="somename",
    organizational_unit="unit",
    organization="unit",
    city="test",
    state="test",
    country="GB",
)

# Write certificate and keys to files (file name -> key in `certificate`)
for pem_path, entry in (
    ('cert.pem', 'certificate'),
    ('public.pem', 'public key'),
    ('private.pem', 'private key'),
):
    with open(pem_path, "wb") as pem_file:
        pem_file.write(certificate[entry])



# url
# Must be assigned before the client is built: the constructor below reads it.
url = "https://stest.bankconnect.dk/2019/04/04/services/CorporateService?wsdl"

# Create Client
history = HistoryPlugin()
session = Session()
# WARNING: TLS certificate validation is disabled here.
# For some reason SSL validation fails and I had to ignore it. This is
# unacceptable and needs to be addressed, but it is outside of the scope of
# this question.
session.verify = False
transport = Transport(session=session)
client = cl(url, transport=transport, plugins=[history],
            wsse=SingnatureOverride("private.pem", "cert.pem"))

# build headers
now = datetime.now()
unique_id = uuid.uuid4().hex
headers = {
    'serviceHeader': {
        'organisationIdentification': {
            'mainRegistrationNumber': "randint",
            'isoCountryCode': 'GB',
        },
        'format': 'ISO20022',
        'functionIdentification': "randint",
        'erpInformation': 'randstring',
        'endToEndMessageId': unique_id,
        'createDateTime': now.strftime("%Y-%m-%dT%H:%M:%S"),
    },
}

# Send request
# The reported failure, InternalError: (-1, 'cannot load cert'), was raised here
client.service.getCustomerStatement(_soapheaders=headers, )

# debug: pretty-print the last envelope sent and the last one received
sent = etree.tostring(history.last_sent["envelope"], encoding="unicode", pretty_print=True)
received = etree.tostring(history.last_received["envelope"], encoding="unicode", pretty_print=True)

print(sent)
print(received)

我不得不将一些机密信息匿名化,但如果它对解决问题至关重要,请随时询问详情。

编辑:添加了一个测试 url

Edit2:添加了正确的 headers 以便于复现问题

我想我明白了。在对证书签名之前,将以下几行添加到 'create_csr' 函数中似乎可以解决问题。它解决了这个特定的 'load cert' 问题,请求也能发送出去,但是服务器仍然认为签名无效,所以我不确定是辅助函数生成了无效的证书,还是服务器端的问题。

def create_csr(*args):
....
    req.set_serial_number(1000)
    req.gmtime_adj_notBefore(0)
    req.gmtime_adj_notAfter(10*365*24*60*60)
    req.set_issuer(req.get_subject())

    req.sign(key, 'sha256')
....