为 DNS 更新生成 TSIG 密钥环(作为编码字节串)

Generate TSIG keyring (as encoded byte string) for DNS Update

我正在尝试使用 python DNS 模块 (dnspython) 创建 (add) 新 DNS 记录。

文档指定了如何创建更新 http://www.dnspython.org/examples.html :

import dns.tsigkeyring
import dns.update
import sys

keyring = dns.tsigkeyring.from_text({
    'host-example.' : 'XXXXXXXXXXXXXXXXXXXXXX=='
})

update = dns.update.Update('dyn.test.example', keyring=keyring)
update.replace('host', 300, 'a', sys.argv[1])

但它并不精确,首先如何实际生成可以传递给 dns.tsigkeyring.from_text() 方法的密钥环字符串。

生成密钥的正确方法是什么?我在我的组织中使用 krb5。

Server is running on Microsoft AD DNS with GSS-TSIG.

TSIG 和 GSS-TSIG 是不同的野兽 – 前者使用可以简单地从服务器复制的静态预共享密钥,但后者使用 Kerberos (GSSAPI) 为每个事务协商会话密钥。

最初发布此主题时,dnspython 1.x 不支持任何 GSS-TSIG。

(握手不会产生可以转换为常规 TSIG 密钥环的静态密钥;相反,必须调用 GSSAPI 库本身来构建身份验证器——dnspython 1.x 无法做到这一点,尽管dnspython 2.1 终于可以了。)

如果您尝试更新 Active Directory DNS 服务器,BIND 的 nsupdate 命令行工具支持 GSS-TSIG(有时它甚至可以工作)。您应该能够通过 subprocess 运行 它并通过标准输入简单地提供必要的更新。

cmds = [f'zone {dyn_zone}\n',
        f'del {fqdn}\n',
        f'add {fqdn} 60 TXT "{challenge}"\n',
        f'send\n']
subprocess.run(["nsupdate", "-g"],
               input="".join(cmds).encode(),
               check=True)

与大多数 Kerberos 客户端应用程序一样,nsupdate 期望凭据已经存在于环境中(也就是说,您需要事先使用 kinit 获得 TGT;或者,如果最新版本使用 MIT Krb5,您可以将 $KRB5_CLIENT_KTNAME 指向包含客户端凭据的密钥表。

更新:dnspython 2.1 终于有了 GSS-TSIG 的必要部分,但创建密钥环目前是一个 非常 手动过程 –您必须自己调用 GSSAPI 库并处理 TKEY 协商。这样做的代码包含在底部。

(下面的 Python 代码可以传递一个自定义 gssapi.Credentials 对象,但除此之外它会像 nsupdate 一样在环境中查找凭据。)

import dns.rdtypes.ANY.TKEY
import dns.resolver
import dns.update
import gssapi
import socket
import time
import uuid

def _build_tkey_query(token, key_ring, key_name):
    inception_time = int(time.time())
    tkey = dns.rdtypes.ANY.TKEY.TKEY(dns.rdataclass.ANY,
                                     dns.rdatatype.TKEY,
                                     dns.tsig.GSS_TSIG,
                                     inception_time,
                                     inception_time,
                                     3,
                                     dns.rcode.NOERROR,
                                     token,
                                     b"")

    query = dns.message.make_query(key_name,
                                   dns.rdatatype.TKEY,
                                   dns.rdataclass.ANY)
    query.keyring = key_ring
    query.find_rrset(dns.message.ADDITIONAL,
                     key_name,
                     dns.rdataclass.ANY,
                     dns.rdatatype.TKEY,
                     create=True).add(tkey)
    return query

def _probe_server(server_name, zone):
    gai = socket.getaddrinfo(str(server_name),
                             "domain",
                             socket.AF_UNSPEC,
                             socket.SOCK_DGRAM)
    for af, sf, pt, cname, sa in gai:
        query = dns.message.make_query(zone, "SOA")
        res = dns.query.udp(query, sa[0], timeout=2)
        return sa[0]

def gss_tsig_negotiate(server_name, server_addr, creds=None):
    # Acquire GSSAPI credentials
    gss_name = gssapi.Name(f"DNS@{server_name}",
                           gssapi.NameType.hostbased_service)
    gss_ctx = gssapi.SecurityContext(name=gss_name,
                                     creds=creds,
                                     usage="initiate")

    # Name generation tips: https://tools.ietf.org/html/rfc2930#section-2.1
    key_name = dns.name.from_text(f"{uuid.uuid4()}.{server_name}")
    tsig_key = dns.tsig.Key(key_name, gss_ctx, dns.tsig.GSS_TSIG)

    key_ring = {key_name: tsig_key}
    key_ring = dns.tsig.GSSTSigAdapter(key_ring)

    token = gss_ctx.step()
    while not gss_ctx.complete:
        tkey_query = _build_tkey_query(token, key_ring, key_name)
        response = dns.query.tcp(tkey_query, server_addr, timeout=5)
        if not gss_ctx.complete:
            # Original comment:
            # https://github.com/rthalley/dnspython/pull/530#issuecomment-658959755
            # "this if statement is a bit redundant, but if the final token comes
            # back with TSIG attached the patch to message.py will automatically step
            # the security context. We dont want to excessively step the context."
            token = gss_ctx.step(response.answer[0][0].key)

    return key_name, key_ring

def gss_tsig_update(zone, update_msg, creds=None):
    # Find the SOA of our zone
    answer = dns.resolver.resolve(zone, "SOA")
    soa_server = answer.rrset[0].mname
    server_addr = _probe_server(soa_server, zone)

    # Get the GSS-TSIG key
    key_name, key_ring = gss_tsig_negotiate(soa_server, server_addr, creds)

    # Dispatch the update
    update_msg.use_tsig(keyring=key_ring,
                        keyname=key_name,
                        algorithm=dns.tsig.GSS_TSIG)
    response = dns.query.tcp(update_msg, server_addr)
    return response