使用 Python API 以编程方式克隆 Kubernetes 对象
Clone Kubernetes objects programmatically using the Python API
Python API 可用于从集群中读取对象。通过克隆我们可以说:
- 使用
kubectl get
获取现有 Kubernetes 对象的副本
- 更改对象的属性
- 应用新对象
直到最近,--export
api was deprecated in 1.14 的选项。我们如何使用 Python Kubernetes API 执行上述 1-3 的步骤?
关于如何提取 code from Python API to YAML 存在多个问题,但不清楚如何转换 Kubernetes API 对象。
看完需求后,我花了几个小时研究 Kubernetes Python API。 Issue 340 and others ask about how to transform the Kubernetes API object into a dict
, but the only workaround I found was to retrieve the raw data 然后转换为 JSON.
- 以下代码使用 Kubernetes API 从命名空间对象中获取
deployment
及其相关的 hpa
,但将它们的原始值检索为 JSON。
- 然后,将数据转换为dict后,您可以选择removing null references清理数据。
- 完成后,您可以将
dict
作为 YAML 负载转换为 save the YAML to the file system
- 最后,您可以使用
kubectl
或 Kubernetes Python API. 进行申请
注:
- 确保设置
KUBECONFIG=config
以便您可以指向一个集群
- 确保使用给定命名空间中要克隆的相应对象的名称来调整
origin_obj_name = "istio-ingressgateway"
和 origin_obj_namespace = "istio-system"
的值。
import os
import logging
import yaml
import json
logging.basicConfig(level = logging.INFO)
import crayons
from kubernetes import client, config
from kubernetes.client.rest import ApiException
LOGGER = logging.getLogger(" IngressGatewayCreator ")
class IngressGatewayCreator:
@staticmethod
def clone_default_ingress(clone_context):
# Clone the deployment
IngressGatewayCreator.clone_deployment_object(clone_context)
# Clone the deployment's HPA
IngressGatewayCreator.clone_hpa_object(clone_context)
@staticmethod
def clone_deployment_object(clone_context):
kubeconfig = os.getenv('KUBECONFIG')
config.load_kube_config(kubeconfig)
v1apps = client.AppsV1beta1Api()
deployment_name = clone_context.origin_obj_name
namespace = clone_context.origin_obj_namespace
try:
# gets an instance of the api without deserialization to model
# https://github.com/kubernetes-client/python/issues/574#issuecomment-405400414
deployment = v1apps.read_namespaced_deployment(deployment_name, namespace, _preload_content=False)
except ApiException as error:
if error.status == 404:
LOGGER.info("Deployment %s not found in namespace %s", deployment_name, namespace)
return
raise
# Clone the object deployment as a dic
cloned_dict = IngressGatewayCreator.clone_k8s_object(deployment, clone_context)
# Change additional objects
cloned_dict["spec"]["selector"]["matchLabels"]["istio"] = clone_context.name
cloned_dict["spec"]["template"]["metadata"]["labels"]["istio"] = clone_context.name
# Save the deployment template in the output dir
context.save_clone_as_yaml(cloned_dict, "deployment")
@staticmethod
def clone_hpa_object(clone_context):
kubeconfig = os.getenv('KUBECONFIG')
config.load_kube_config(kubeconfig)
hpas = client.AutoscalingV1Api()
hpa_name = clone_context.origin_obj_name
namespace = clone_context.origin_obj_namespace
try:
# gets an instance of the api without deserialization to model
# https://github.com/kubernetes-client/python/issues/574#issuecomment-405400414
hpa = hpas.read_namespaced_horizontal_pod_autoscaler(hpa_name, namespace, _preload_content=False)
except ApiException as error:
if error.status == 404:
LOGGER.info("HPA %s not found in namespace %s", hpa_name, namespace)
return
raise
# Clone the object deployment as a dic
cloned_dict = IngressGatewayCreator.clone_k8s_object(hpa, clone_context)
# Change additional objects
cloned_dict["spec"]["scaleTargetRef"]["name"] = clone_context.name
# Save the deployment template in the output dir
context.save_clone_as_yaml(cloned_dict, "hpa")
@staticmethod
def clone_k8s_object(k8s_object, clone_context):
# Manipilate in the dict level, not k8s api, but from the fetched raw object
# https://github.com/kubernetes-client/python/issues/574#issuecomment-405400414
cloned_obj = json.loads(k8s_object.data)
labels = cloned_obj['metadata']['labels']
labels['istio'] = clone_context.name
cloned_obj['status'] = None
# Scrub by removing the "null" and "None" values
cloned_obj = IngressGatewayCreator.scrub_dict(cloned_obj)
# Patch the metadata with the name and labels adjusted
cloned_obj['metadata'] = {
"name": clone_context.name,
"namespace": clone_context.origin_obj_namespace,
"labels": labels
}
return cloned_obj
#
@staticmethod
def scrub_dict(d):
new_dict = {}
for k, v in d.items():
if isinstance(v, dict):
v = IngressGatewayCreator.scrub_dict(v)
if isinstance(v, list):
v = IngressGatewayCreator.scrub_list(v)
if not v in (u'', None, {}):
new_dict[k] = v
return new_dict
#
@staticmethod
def scrub_list(d):
scrubbed_list = []
for i in d:
if isinstance(i, dict):
i = IngressGatewayCreator.scrub_dict(i)
scrubbed_list.append(i)
return scrubbed_list
class IngressGatewayContext:
def __init__(self, manifest_dir, name, hostname, nats, type):
self.manifest_dir = manifest_dir
self.name = name
self.hostname = hostname
self.nats = nats
self.ingress_type = type
self.origin_obj_name = "istio-ingressgateway"
self.origin_obj_namespace = "istio-system"
def save_clone_as_yaml(self, k8s_object, kind):
try:
# Just try to create if it doesn't exist
os.makedirs(self.manifest_dir)
except FileExistsError:
LOGGER.debug("Dir already exists %s", self.manifest_dir)
full_file_path = os.path.join(self.manifest_dir, self.name + '-' + kind + '.yaml')
# Store in the file-system with the name provided
#
with open(full_file_path, 'w') as yaml_file:
yaml.dump(k8s_object, yaml_file, default_flow_style=False)
LOGGER.info(crayons.yellow("Saved %s '%s' at %s: \n%s"), kind, self.name, full_file_path, k8s_object)
try:
k8s_clone_name = "http2-ingressgateway"
hostname = "my-nlb-awesome.a.company.com"
nats = ["123.345.678.11", "333.444.222.111", "33.221.444.23"]
manifest_dir = "out/clones"
context = IngressGatewayContext(manifest_dir, k8s_clone_name, hostname, nats, "nlb")
IngressGatewayCreator.clone_default_ingress(context)
except Exception as err:
print("ERROR: {}".format(err))
不是 python,但我过去曾使用 jq
来快速克隆某些内容,其中包含每个用例所需的小定制(通常将秘密克隆到新的命名空间中)。
kc get pod whatever-85pmk -o json \
| jq 'del(.status, .metadata ) | .metadata.name="newname"' \
| kc apply -f - -o yaml --dry-run
使用 Hikaru 这真的很容易做到。
这里是an example from my own open source project:
def duplicate_without_fields(obj: HikaruBase, omitted_fields: List[str]):
"""
Duplicate a hikaru object, omitting the specified fields
This is useful when you want to compare two versions of an object and first "cleanup" fields that shouldn't be
compared.
:param HikaruBase obj: A kubernetes object
:param List[str] omitted_fields: List of fields to be omitted. Field name format should be '.' separated
For example: ["status", "metadata.generation"]
"""
if obj is None:
return None
duplication = obj.dup()
for field_name in omitted_fields:
field_parts = field_name.split(".")
try:
if len(field_parts) > 1:
parent_obj = duplication.object_at_path(field_parts[:-1])
else:
parent_obj = duplication
setattr(parent_obj, field_parts[-1], None)
except Exception:
pass # in case the field doesn't exist on this object
return duplication
之后将对象转储到 yaml 或将其重新应用到集群对于 Hikaru 来说是微不足道的
我们正在使用它来清理对象,以便在对象发生变化时向用户显示 github 风格的差异,而不会像 generation
那样经常发生变化的垃圾字段
只需使用 Kubernetes Client 对象现在提供的 to_dict()
。请注意,它创建了一个部分深拷贝。所以为了安全起见:
copied_obj = copy.deepcopy(obj.to_dict())
可以将字典传递给 create*
和 patch*
方法。
为了方便,你也可以将dict包裹在Prodict
.
copied_obj = Prodict.from_dict(copy.deepcopy(obj.to_dict()))
最后一个问题是删除多余的字段。 (不幸的是,Kubernetes 将它们散布在整个对象中。)我使用 kopf
的内部工具来获取对象的“本质”。 (它负责深层复制。)
copied_obj = kopf.AnnotationsDiffBaseStorage().build(body=kopf.Body(obj.to_dict()))
copied_obj = Prodic.from_dict(copied_obj)
Python API 可用于从集群中读取对象。通过克隆我们可以说:
- 使用
kubectl get
获取现有 Kubernetes 对象的副本
- 更改对象的属性
- 应用新对象
直到最近,--export
api was deprecated in 1.14 的选项。我们如何使用 Python Kubernetes API 执行上述 1-3 的步骤?
关于如何提取 code from Python API to YAML 存在多个问题,但不清楚如何转换 Kubernetes API 对象。
看完需求后,我花了几个小时研究 Kubernetes Python API。 Issue 340 and others ask about how to transform the Kubernetes API object into a dict
, but the only workaround I found was to retrieve the raw data 然后转换为 JSON.
- 以下代码使用 Kubernetes API 从命名空间对象中获取
deployment
及其相关的hpa
,但将它们的原始值检索为 JSON。 - 然后,将数据转换为dict后,您可以选择removing null references清理数据。
- 完成后,您可以将
dict
作为 YAML 负载转换为 save the YAML to the file system - 最后,您可以使用
kubectl
或 Kubernetes Python API. 进行申请
注:
- 确保设置
KUBECONFIG=config
以便您可以指向一个集群 - 确保使用给定命名空间中要克隆的相应对象的名称来调整
origin_obj_name = "istio-ingressgateway"
和origin_obj_namespace = "istio-system"
的值。
import os
import logging
import yaml
import json
logging.basicConfig(level = logging.INFO)
import crayons
from kubernetes import client, config
from kubernetes.client.rest import ApiException
LOGGER = logging.getLogger(" IngressGatewayCreator ")
class IngressGatewayCreator:
@staticmethod
def clone_default_ingress(clone_context):
# Clone the deployment
IngressGatewayCreator.clone_deployment_object(clone_context)
# Clone the deployment's HPA
IngressGatewayCreator.clone_hpa_object(clone_context)
@staticmethod
def clone_deployment_object(clone_context):
kubeconfig = os.getenv('KUBECONFIG')
config.load_kube_config(kubeconfig)
v1apps = client.AppsV1beta1Api()
deployment_name = clone_context.origin_obj_name
namespace = clone_context.origin_obj_namespace
try:
# gets an instance of the api without deserialization to model
# https://github.com/kubernetes-client/python/issues/574#issuecomment-405400414
deployment = v1apps.read_namespaced_deployment(deployment_name, namespace, _preload_content=False)
except ApiException as error:
if error.status == 404:
LOGGER.info("Deployment %s not found in namespace %s", deployment_name, namespace)
return
raise
# Clone the object deployment as a dic
cloned_dict = IngressGatewayCreator.clone_k8s_object(deployment, clone_context)
# Change additional objects
cloned_dict["spec"]["selector"]["matchLabels"]["istio"] = clone_context.name
cloned_dict["spec"]["template"]["metadata"]["labels"]["istio"] = clone_context.name
# Save the deployment template in the output dir
context.save_clone_as_yaml(cloned_dict, "deployment")
@staticmethod
def clone_hpa_object(clone_context):
kubeconfig = os.getenv('KUBECONFIG')
config.load_kube_config(kubeconfig)
hpas = client.AutoscalingV1Api()
hpa_name = clone_context.origin_obj_name
namespace = clone_context.origin_obj_namespace
try:
# gets an instance of the api without deserialization to model
# https://github.com/kubernetes-client/python/issues/574#issuecomment-405400414
hpa = hpas.read_namespaced_horizontal_pod_autoscaler(hpa_name, namespace, _preload_content=False)
except ApiException as error:
if error.status == 404:
LOGGER.info("HPA %s not found in namespace %s", hpa_name, namespace)
return
raise
# Clone the object deployment as a dic
cloned_dict = IngressGatewayCreator.clone_k8s_object(hpa, clone_context)
# Change additional objects
cloned_dict["spec"]["scaleTargetRef"]["name"] = clone_context.name
# Save the deployment template in the output dir
context.save_clone_as_yaml(cloned_dict, "hpa")
@staticmethod
def clone_k8s_object(k8s_object, clone_context):
# Manipilate in the dict level, not k8s api, but from the fetched raw object
# https://github.com/kubernetes-client/python/issues/574#issuecomment-405400414
cloned_obj = json.loads(k8s_object.data)
labels = cloned_obj['metadata']['labels']
labels['istio'] = clone_context.name
cloned_obj['status'] = None
# Scrub by removing the "null" and "None" values
cloned_obj = IngressGatewayCreator.scrub_dict(cloned_obj)
# Patch the metadata with the name and labels adjusted
cloned_obj['metadata'] = {
"name": clone_context.name,
"namespace": clone_context.origin_obj_namespace,
"labels": labels
}
return cloned_obj
#
@staticmethod
def scrub_dict(d):
new_dict = {}
for k, v in d.items():
if isinstance(v, dict):
v = IngressGatewayCreator.scrub_dict(v)
if isinstance(v, list):
v = IngressGatewayCreator.scrub_list(v)
if not v in (u'', None, {}):
new_dict[k] = v
return new_dict
#
@staticmethod
def scrub_list(d):
scrubbed_list = []
for i in d:
if isinstance(i, dict):
i = IngressGatewayCreator.scrub_dict(i)
scrubbed_list.append(i)
return scrubbed_list
class IngressGatewayContext:
def __init__(self, manifest_dir, name, hostname, nats, type):
self.manifest_dir = manifest_dir
self.name = name
self.hostname = hostname
self.nats = nats
self.ingress_type = type
self.origin_obj_name = "istio-ingressgateway"
self.origin_obj_namespace = "istio-system"
def save_clone_as_yaml(self, k8s_object, kind):
try:
# Just try to create if it doesn't exist
os.makedirs(self.manifest_dir)
except FileExistsError:
LOGGER.debug("Dir already exists %s", self.manifest_dir)
full_file_path = os.path.join(self.manifest_dir, self.name + '-' + kind + '.yaml')
# Store in the file-system with the name provided
#
with open(full_file_path, 'w') as yaml_file:
yaml.dump(k8s_object, yaml_file, default_flow_style=False)
LOGGER.info(crayons.yellow("Saved %s '%s' at %s: \n%s"), kind, self.name, full_file_path, k8s_object)
try:
k8s_clone_name = "http2-ingressgateway"
hostname = "my-nlb-awesome.a.company.com"
nats = ["123.345.678.11", "333.444.222.111", "33.221.444.23"]
manifest_dir = "out/clones"
context = IngressGatewayContext(manifest_dir, k8s_clone_name, hostname, nats, "nlb")
IngressGatewayCreator.clone_default_ingress(context)
except Exception as err:
print("ERROR: {}".format(err))
不是 python,但我过去曾使用 jq
来快速克隆某些内容,其中包含每个用例所需的小定制(通常将秘密克隆到新的命名空间中)。
kc get pod whatever-85pmk -o json \
| jq 'del(.status, .metadata ) | .metadata.name="newname"' \
| kc apply -f - -o yaml --dry-run
使用 Hikaru 这真的很容易做到。
这里是an example from my own open source project:
def duplicate_without_fields(obj: HikaruBase, omitted_fields: List[str]):
"""
Duplicate a hikaru object, omitting the specified fields
This is useful when you want to compare two versions of an object and first "cleanup" fields that shouldn't be
compared.
:param HikaruBase obj: A kubernetes object
:param List[str] omitted_fields: List of fields to be omitted. Field name format should be '.' separated
For example: ["status", "metadata.generation"]
"""
if obj is None:
return None
duplication = obj.dup()
for field_name in omitted_fields:
field_parts = field_name.split(".")
try:
if len(field_parts) > 1:
parent_obj = duplication.object_at_path(field_parts[:-1])
else:
parent_obj = duplication
setattr(parent_obj, field_parts[-1], None)
except Exception:
pass # in case the field doesn't exist on this object
return duplication
之后将对象转储到 yaml 或将其重新应用到集群对于 Hikaru 来说是微不足道的
我们正在使用它来清理对象,以便在对象发生变化时向用户显示 github 风格的差异,而不会像 generation
只需使用 Kubernetes Client 对象现在提供的 to_dict()
。请注意,它创建了一个部分深拷贝。所以为了安全起见:
copied_obj = copy.deepcopy(obj.to_dict())
可以将字典传递给 create*
和 patch*
方法。
为了方便,你也可以将dict包裹在Prodict
.
copied_obj = Prodict.from_dict(copy.deepcopy(obj.to_dict()))
最后一个问题是删除多余的字段。 (不幸的是,Kubernetes 将它们散布在整个对象中。)我使用 kopf
的内部工具来获取对象的“本质”。 (它负责深层复制。)
copied_obj = kopf.AnnotationsDiffBaseStorage().build(body=kopf.Body(obj.to_dict()))
copied_obj = Prodic.from_dict(copied_obj)