使用 Python API 以编程方式克隆 Kubernetes 对象

Clone Kubernetes objects programmatically using the Python API

Python API 可用于从集群中读取对象。这里所说的“克隆”是指:

  1. 使用 kubectl get
  2. 获取现有 Kubernetes 对象的副本
  3. 更改对象的属性
  4. 应用新对象

直到最近还可以使用 --export 选项,但它已在 1.14 中被弃用。我们如何使用 Python Kubernetes API 执行上述 1-3 的步骤?

关于如何将 Python API 返回的对象导出为 YAML 已有多个问题,但如何转换 Kubernetes API 对象仍不清楚。

看完需求后,我花了几个小时研究 Kubernetes Python API。Issue 340 等问题询问如何将 Kubernetes API 对象转换为 dict,但我找到的唯一解决方法是获取原始数据,然后转换为 JSON。

  • 以下代码使用 Kubernetes API 从命名空间对象中获取 deployment 及其相关的 hpa,但将它们的原始值检索为 JSON。
  • 然后,将数据转换为 dict 后,您可以选择通过移除空(null)引用来清理数据。
  • 完成后,您可以将 dict 转换为 YAML 负载,并将 YAML 保存到文件系统。
  • 最后,您可以使用 kubectl 或 Kubernetes Python API 来应用(apply)它。

注:

  • 确保设置 KUBECONFIG=config 以便您可以指向一个集群
  • 确保根据给定命名空间中要克隆的对象的名称,调整 origin_obj_name = "istio-ingressgateway" 和 origin_obj_namespace = "istio-system" 的值。
import os
import logging
import yaml  # third-party (PyYAML); used to write the cloned manifests
import json
# Configure root logging at INFO level before the remaining imports run.
logging.basicConfig(level = logging.INFO)

import crayons  # third-party; colours the "Saved ..." log message
from kubernetes import client, config
from kubernetes.client.rest import ApiException

# Module-wide logger used by the classes defined in this file.
LOGGER = logging.getLogger(" IngressGatewayCreator ")

class IngressGatewayCreator:
    """Clone an existing Deployment and its HPA into new YAML manifests.

    Every method is static. The ``clone_context`` argument is expected to
    provide ``name`` (the name of the clone), ``origin_obj_name`` and
    ``origin_obj_namespace`` (the object to copy) and a
    ``save_clone_as_yaml(obj_dict, kind)`` method.
    """

    @staticmethod
    def clone_default_ingress(clone_context):
        """Clone the origin Deployment and then its HorizontalPodAutoscaler."""
        # Clone the deployment
        IngressGatewayCreator.clone_deployment_object(clone_context)

        # Clone the deployment's HPA
        IngressGatewayCreator.clone_hpa_object(clone_context)

    @staticmethod
    def clone_deployment_object(clone_context):
        """Read the origin Deployment, retarget it to the clone and save it.

        Returns ``None`` without saving anything when the Deployment does not
        exist (HTTP 404). Any other ``ApiException`` is re-raised.
        """
        kubeconfig = os.getenv('KUBECONFIG')
        config.load_kube_config(kubeconfig)
        # apps/v1 is the stable Deployment API; apps/v1beta1 is no longer
        # served by Kubernetes 1.16+ and its client class has been removed
        # from recent releases of the Python client.
        v1apps = client.AppsV1Api()

        deployment_name = clone_context.origin_obj_name
        namespace = clone_context.origin_obj_namespace

        try:
            # _preload_content=False returns the raw HTTP response instead of
            # a deserialized model object.
            # https://github.com/kubernetes-client/python/issues/574#issuecomment-405400414
            deployment = v1apps.read_namespaced_deployment(
                deployment_name, namespace, _preload_content=False)

        except ApiException as error:
            if error.status == 404:
                LOGGER.info("Deployment %s not found in namespace %s", deployment_name, namespace)
                return
            raise

        # Clone the deployment as a plain dict
        cloned_dict = IngressGatewayCreator.clone_k8s_object(deployment, clone_context)

        # The selector and the pod template labels must both carry the
        # clone's name.
        cloned_dict["spec"]["selector"]["matchLabels"]["istio"] = clone_context.name
        cloned_dict["spec"]["template"]["metadata"]["labels"]["istio"] = clone_context.name

        # Save through the context that was passed in, not through a
        # module-level global that may not exist.
        clone_context.save_clone_as_yaml(cloned_dict, "deployment")

    @staticmethod
    def clone_hpa_object(clone_context):
        """Read the origin HPA, point it at the cloned Deployment and save it.

        Returns ``None`` without saving anything when the HPA does not exist
        (HTTP 404). Any other ``ApiException`` is re-raised.
        """
        kubeconfig = os.getenv('KUBECONFIG')
        config.load_kube_config(kubeconfig)
        hpas = client.AutoscalingV1Api()

        hpa_name = clone_context.origin_obj_name
        namespace = clone_context.origin_obj_namespace

        try:
            # _preload_content=False returns the raw HTTP response instead of
            # a deserialized model object.
            # https://github.com/kubernetes-client/python/issues/574#issuecomment-405400414
            hpa = hpas.read_namespaced_horizontal_pod_autoscaler(
                hpa_name, namespace, _preload_content=False)

        except ApiException as error:
            if error.status == 404:
                LOGGER.info("HPA %s not found in namespace %s", hpa_name, namespace)
                return
            raise

        # Clone the HPA as a plain dict
        cloned_dict = IngressGatewayCreator.clone_k8s_object(hpa, clone_context)

        # The cloned HPA must scale the cloned deployment, not the original.
        cloned_dict["spec"]["scaleTargetRef"]["name"] = clone_context.name

        # Save through the context that was passed in, not through a
        # module-level global that may not exist.
        clone_context.save_clone_as_yaml(cloned_dict, "hpa")

    @staticmethod
    def clone_k8s_object(k8s_object, clone_context):
        """Turn a raw API response into a cleaned dict describing the clone.

        :param k8s_object: object whose ``data`` attribute holds the JSON
            document of the Kubernetes object.
        :param clone_context: provides ``name`` and ``origin_obj_namespace``.
        :return: dict without ``status`` and without empty values, whose
            ``metadata`` holds only the clone's name, the origin namespace
            and the original labels with ``istio`` set to the clone's name.
        """
        # Manipulate at the dict level, working from the fetched raw object
        # https://github.com/kubernetes-client/python/issues/574#issuecomment-405400414
        cloned_obj = json.loads(k8s_object.data)

        # An object without labels (or without metadata) must not abort the
        # clone; start from an empty label set in that case.
        metadata = cloned_obj.get('metadata') or {}
        labels = metadata.get('labels') or {}
        labels['istio'] = clone_context.name

        # The status belongs to the live object and has no place in a manifest.
        cloned_obj.pop('status', None)

        # Scrub by removing null, empty-string and empty-dict values
        cloned_obj = IngressGatewayCreator.scrub_dict(cloned_obj)

        # Replace the metadata wholesale, dropping every other metadata field
        # of the original object.
        cloned_obj['metadata'] = {
            "name": clone_context.name,
            "namespace": clone_context.origin_obj_namespace,
            "labels": labels
        }

        return cloned_obj

    @staticmethod
    def scrub_dict(d):
        """Return a copy of ``d`` without ``None``, ``''`` or ``{}`` values.

        Nested dicts and lists are scrubbed recursively. A nested dict that
        becomes empty is dropped as well. Falsy values such as ``0``,
        ``False`` and ``[]`` are kept.
        """
        new_dict = {}
        for k, v in d.items():
            if isinstance(v, dict):
                v = IngressGatewayCreator.scrub_dict(v)
            elif isinstance(v, list):
                v = IngressGatewayCreator.scrub_list(v)
            if v not in ('', None, {}):
                new_dict[k] = v
        return new_dict

    @staticmethod
    def scrub_list(d):
        """Return a copy of list ``d`` with nested dicts and lists scrubbed.

        No element is removed, so positions are preserved; only the contents
        of dict and list elements are cleaned.
        """
        scrubbed_list = []
        for i in d:
            if isinstance(i, dict):
                i = IngressGatewayCreator.scrub_dict(i)
            elif isinstance(i, list):
                i = IngressGatewayCreator.scrub_list(i)
            scrubbed_list.append(i)
        return scrubbed_list


class IngressGatewayContext:
    """Settings for one clone, plus a writer for its YAML manifests."""

    def __init__(self, manifest_dir, name, hostname, nats, type):
        """Store the clone settings and the fixed identity of the origin object.

        :param manifest_dir: directory the YAML manifests are written to.
        :param name: name given to the cloned objects.
        :param hostname: stored as ``self.hostname``.
        :param nats: stored as ``self.nats``.
        :param type: stored as ``self.ingress_type``.
        """
        # The object every clone is copied from.
        self.origin_obj_namespace = "istio-system"
        self.origin_obj_name = "istio-ingressgateway"

        # Per-clone settings supplied by the caller.
        self.ingress_type = type
        self.nats = nats
        self.hostname = hostname
        self.name = name
        self.manifest_dir = manifest_dir

    def save_clone_as_yaml(self, k8s_object, kind):
        """Write ``k8s_object`` to ``<manifest_dir>/<name>-<kind>.yaml``.

        The manifest directory is created when missing. The saved path and
        the object itself are logged at INFO level afterwards.
        """
        target_dir = self.manifest_dir
        try:
            os.makedirs(target_dir)
        except FileExistsError:
            # An already existing directory is fine; only note it.
            LOGGER.debug("Dir already exists %s", target_dir)

        file_name = self.name + '-' + kind + '.yaml'
        full_file_path = os.path.join(target_dir, file_name)

        # Block style (default_flow_style=False) instead of inline flow style.
        with open(full_file_path, 'w') as yaml_file:
            yaml.dump(k8s_object, yaml_file, default_flow_style=False)

        message = crayons.yellow("Saved %s '%s' at %s: \n%s")
        LOGGER.info(message, kind, self.name, full_file_path, k8s_object)

# Example inputs for a single clone. The addresses in `nats` are placeholders
# (several octets exceed 255, so they are not valid IPv4 addresses).
k8s_clone_name = "http2-ingressgateway"
hostname = "my-nlb-awesome.a.company.com"
nats = ["123.345.678.11", "333.444.222.111", "33.221.444.23"]
manifest_dir = "out/clones"

try:
    context = IngressGatewayContext(
        manifest_dir=manifest_dir,
        name=k8s_clone_name,
        hostname=hostname,
        nats=nats,
        type="nlb",
    )
    IngressGatewayCreator.clone_default_ingress(context)
except Exception as err:
    # Top-level boundary: report the failure message instead of a traceback.
    print("ERROR: {}".format(err))

不是 Python,但我过去曾使用 jq 来快速克隆对象,并按每个用例做所需的小定制(通常是将 Secret 克隆到新的命名空间中)。

# Dump the pod as JSON, delete its status and metadata, set a new
# metadata.name, and feed the result to a dry-run apply printed as YAML.
# NOTE(review): `kc` is presumably a shell alias for kubectl; confirm.
kc get pod whatever-85pmk -o json \
 | jq 'del(.status, .metadata ) | .metadata.name="newname"' \
 | kc apply -f - -o yaml --dry-run 

使用 Hikaru 这真的很容易做到。

下面是我自己的开源项目中的一个示例:

def duplicate_without_fields(obj: HikaruBase, omitted_fields: List[str]):
    """
    Return a copy of a hikaru object with the given fields set to None.

    Handy for comparing two versions of an object after blanking the fields
    that should not take part in the comparison. The input object itself is
    not modified; all changes are made on the result of ``obj.dup()``.

    :param HikaruBase obj: A kubernetes object, or None
    :param List[str] omitted_fields: Fields to blank, as '.' separated paths.
                                     For example: ["status", "metadata.generation"]
    :return: the modified duplicate, or None when ``obj`` is None
    """
    if obj is None:
        return None

    clone = obj.dup()

    for dotted_path in omitted_fields:
        # Everything before the last segment addresses the parent object;
        # the last segment is the attribute to blank.
        *parent_path, leaf = dotted_path.split(".")
        try:
            target = clone.object_at_path(parent_path) if parent_path else clone
            setattr(target, leaf, None)
        except Exception:
            # Best effort: the field may not exist on this object.
            continue

    return clone

之后将对象转储到 yaml 或将其重新应用到集群对于 Hikaru 来说是微不足道的

我们用它来清理对象,以便在对象发生变化时向用户显示 GitHub 风格的差异,而不包含像 generation 那样经常变化的垃圾字段。

只需使用 Kubernetes Client 对象现在提供的 to_dict()。请注意,它创建了一个部分深拷贝。所以为了安全起见:

# Deep-copy the dict form so that edits to the copy cannot reach nested data
# that to_dict() may still share with obj.
copied_obj = copy.deepcopy(obj.to_dict())

可以将字典传递给 create* 和 patch* 方法。

为了方便,你也可以将 dict 包装在 Prodict 中。

# Same deep copy of the dict form, wrapped in a Prodict for convenience.
copied_obj = Prodict.from_dict(copy.deepcopy(obj.to_dict()))

最后一个问题是删除多余的字段。 (不幸的是,Kubernetes 将它们散布在整个对象中。)我使用 kopf 的内部工具来获取对象的“本质”。 (它负责深层复制。)

# Build the object's "essence" with kopf's diff-base storage, starting from
# the dict form of the client object.
copied_obj = kopf.AnnotationsDiffBaseStorage().build(body=kopf.Body(obj.to_dict()))
# Wrap the resulting dict in a Prodict. The class is spelled `Prodict`;
# `Prodic` is undefined and would raise NameError.
copied_obj = Prodict.from_dict(copied_obj)