python k8s api操作类

网上找了一些零散的资料,然后看了k8s的python api,里面有很多方法,大致整理了一下常用操作的封装。很多操作的返回对象都有to_dict()方法,可以直接将返回的对象转为字典,之前没注意,后面懒得改了,写得比较乱,直接记录下来方便以后使用。

#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from kubernetes import client, config
from app.common import format_time

class KubeApi():
    """Convenience wrapper around the official Kubernetes Python client.

    The ``get_*`` methods flatten the client's response objects into plain
    dictionaries, the ``read_*`` methods return the client object unchanged,
    and the ``delete_*`` / ``modify_*`` / ``create_*`` methods forward to the
    matching client call and return whatever it returns.

    Fields that the API server left unset are reported as ``""`` in the
    flattened dictionaries instead of raising.
    """

    def __init__(self, config_file):
        """Load a kubeconfig file and create the API clients used below.

        Args:
            config_file: Value handed to ``config.load_kube_config``.
        """
        config.load_kube_config(config_file)
        self.core_api = client.CoreV1Api()  # namespace, pod, service, pv, pvc
        self.apps_api = client.AppsV1Api()  # deployment
        self.net_api = client.NetworkingV1Api()  # ingress
        self.storage_api = client.StorageV1Api()  # storage

    # Convert labels
    @staticmethod
    def conv_labels(labels):
        """Render a mapping as a ``key=value,key=value`` string.

        Keys containing the substring ``"kubernetes"`` are left out.

        Args:
            labels: A mapping, or any falsy value (``None``, ``{}``).

        Returns:
            The joined string, or ``""`` when ``labels`` is falsy.
        """
        if not labels:
            return ""
        return ",".join(
            "{}={}".format(key, value)
            for key, value in labels.items()
            if "kubernetes" not in key
        )

    @staticmethod
    def _fmt_time(timestamp):
        """Format a timestamp with ``format_time``; return ``""`` when it is None."""
        if timestamp is None:
            return ""
        return format_time(timestamp)

    @staticmethod
    def _node_status(conditions):
        """Summarise node conditions as ``Ready``, ``NotReady`` or ``Unknown``.

        Only the condition whose type is ``"Ready"`` is consulted, so the
        result does not depend on the order of the conditions list.
        """
        for condition in conditions or []:
            if condition.type == "Ready":
                return "Ready" if condition.status == "True" else "NotReady"
        return "Unknown"

    # List namespaces
    def get_namespace(self):
        """Return the names of all namespaces as a list of strings."""
        return [ns.metadata.name for ns in self.core_api.list_namespace().items]

    # List nodes
    def get_nodes(self):
        """Return one summary dictionary per node.

        ``addresses`` is the first reported address (``""`` if there is none)
        and ``status`` is derived from the node's ``Ready`` condition.
        """
        node_list = []
        for node in self.core_api.list_node().items:
            status = node.status
            info = status.node_info
            addresses = status.addresses
            node_list.append({
                "name": node.metadata.name,
                "addresses": addresses[0].address if addresses else "",
                "status": self._node_status(status.conditions),
                "kubelet_version": info.kubelet_version,
                "operating_system": info.operating_system,
                "kernel_version": info.kernel_version,
                "container_runtime_version": info.container_runtime_version,
                "os_image": info.os_image,
                "allocatable": status.allocatable,
                "capacity": status.capacity,
                "labels": self.conv_labels(node.metadata.labels),
                "unschedulable": node.spec.unschedulable,
                "creation_time": self._fmt_time(node.metadata.creation_timestamp),
            })
        return node_list

    # Read node configuration
    def read_node(self, node_name):
        """Return the client object for the node called ``node_name``."""
        return self.core_api.read_node(node_name)

    # Change scheduling state
    def node_schedule(self, node_name, schedule_flag):
        """Patch ``spec.unschedulable`` of a node.

        Args:
            node_name: Name of the node to patch.
            schedule_flag: Written verbatim to ``spec.unschedulable``, so a
                true value marks the node as unschedulable.

        Returns:
            The result of ``patch_node``.
        """
        body = {"spec": {"unschedulable": schedule_flag}}
        return self.core_api.patch_node(node_name, body)

    # Add labels
    def node_label_modify(self, node_name, labels):
        """Patch ``metadata.labels`` of a node with the given mapping.

        Returns:
            The result of ``patch_node``.
        """
        body = {"metadata": {"labels": labels}}
        return self.core_api.patch_node(node_name, body)

    # List pods
    def get_pods(self, namespaced):
        """Return one summary dictionary per pod in namespace ``namespaced``.

        ``restart_count`` and ``ready`` are ``""`` while the pod reports no
        container statuses; ``owner_kind`` is ``""`` for pods without owner
        references and ``start_time`` is ``""`` until the pod has one.
        """
        pod_list = []
        for pod in self.core_api.list_namespaced_pod(namespaced).items:
            statuses = pod.status.container_statuses
            if statuses:
                restart_count = sum(cs.restart_count for cs in statuses)
                ready_flags = [cs.ready for cs in statuses]
                ready = "{}/{}".format(ready_flags.count(True), len(ready_flags))
            else:
                restart_count = ""
                ready = ""
            # Pods created directly (not through a controller) have no owner.
            owners = pod.metadata.owner_references
            pod_list.append({
                "name": pod.metadata.name,
                "uid": pod.metadata.uid,
                "node_name": pod.spec.node_name,
                "phase": pod.status.phase,
                "host_ip": pod.status.host_ip,
                "pod_ip": pod.status.pod_ip,
                "node_selector": self.conv_labels(pod.spec.node_selector),
                "containers": [c.name for c in pod.spec.containers],
                "labels": self.conv_labels(pod.metadata.labels),
                "owner_kind": owners[0].kind if owners else "",
                "restart_count": restart_count,
                "ready": ready,
                "start_time": self._fmt_time(pod.status.start_time),
            })
        return pod_list

    # Read pod configuration
    def read_pod(self, namespaced, pod_name):
        """Return the client object for pod ``pod_name`` in ``namespaced``."""
        return self.core_api.read_namespaced_pod(pod_name, namespaced)

    # Fetch pod logs
    def read_pod_log(self, namespaced, pod_name, container, tail_lines=1000):
        """Return the log of one container of a pod.

        Args:
            namespaced: Namespace of the pod.
            pod_name: Name of the pod.
            container: Container whose log is requested.
            tail_lines: Forwarded as ``tail_lines`` to the client call.
        """
        return self.core_api.read_namespaced_pod_log(
            pod_name, namespaced, container=container, tail_lines=tail_lines
        )

    # Restart pod
    def delete_pod(self, namespaced, pod_name):
        """Delete a pod and return the client's response.

        NOTE(review): the original comment calls this "restart"; that only
        holds if a controller recreates the pod after deletion.
        """
        return self.core_api.delete_namespaced_pod(pod_name, namespaced)

    # List deployments
    def get_deploy(self, namespaced):
        """Return one summary dictionary per deployment in ``namespaced``.

        ``rolling_update`` is ``""`` when the strategy carries no rolling
        update parameters (e.g. the ``Recreate`` strategy). ``node_selector``
        is returned as the raw mapping, not as a string.
        """
        deploy_list = []
        for dp in self.apps_api.list_namespaced_deployment(namespaced).items:
            strategy = dp.spec.strategy
            rolling = strategy.rolling_update if strategy is not None else None
            deploy_list.append({
                "name": dp.metadata.name,
                "node_selector": dp.spec.template.spec.node_selector,
                "replicas": dp.status.replicas,
                "ready_replicas": dp.status.ready_replicas,
                "available_replicas": dp.status.available_replicas,
                "unavailable_replicas": dp.status.unavailable_replicas,
                "labels": self.conv_labels(dp.metadata.labels),
                "rolling_update": self.conv_labels(
                    rolling.to_dict() if rolling is not None else None
                ),
                "update_type": strategy.type if strategy is not None else None,
                "uid": dp.metadata.uid,
                "creation_timestamp": self._fmt_time(dp.metadata.creation_timestamp),
            })
        return deploy_list

    # Read deployment configuration
    def read_deploy(self, namespaced, deploy_name):
        """Return the client object for deployment ``deploy_name``."""
        return self.apps_api.read_namespaced_deployment(deploy_name, namespaced)

    # Delete deployment
    def delete_deploy(self, namespaced, deploy_name):
        """Delete deployment ``deploy_name`` and return the client's response."""
        return self.apps_api.delete_namespaced_deployment(deploy_name, namespaced)

    # Modify deployment
    def modify_deploy(self, namespaced, deploy_name, deploy_yaml):
        """Patch deployment ``deploy_name`` with the body ``deploy_yaml``."""
        return self.apps_api.patch_namespaced_deployment(deploy_name, namespaced, deploy_yaml)

    # Create deployment
    def create_deploy(self, namespaced, deploy_yaml):
        """Create a deployment in ``namespaced`` from the body ``deploy_yaml``."""
        return self.apps_api.create_namespaced_deployment(namespaced, deploy_yaml)

    # List services
    def get_svc(self, namespaced):
        """Return one summary dictionary per service in ``namespaced``."""
        svc_list = []
        for svc in self.core_api.list_namespaced_service(namespaced).items:
            svc_list.append({
                "name": svc.metadata.name,
                "uid": svc.metadata.uid,
                "cluster_ip": svc.spec.cluster_ip,
                "type": svc.spec.type,
                "labels": self.conv_labels(svc.metadata.labels),
                "creation_timestamp": self._fmt_time(svc.metadata.creation_timestamp),
            })
        return svc_list

    # Read service configuration
    def read_svc(self, namespaced, svc_name):
        """Return the client object for service ``svc_name``."""
        return self.core_api.read_namespaced_service(svc_name, namespaced)

    # Delete service
    def delete_svc(self, namespaced, svc_name):
        """Delete service ``svc_name`` and return the client's response."""
        return self.core_api.delete_namespaced_service(svc_name, namespaced)

    # Modify service
    def modify_svc(self, namespaced, svc_name, svc_yaml):
        """Patch service ``svc_name`` with the body ``svc_yaml``."""
        return self.core_api.patch_namespaced_service(svc_name, namespaced, svc_yaml)

    # List daemon sets
    def get_ds(self, namespaced):
        """Return one summary dictionary per daemon set in ``namespaced``."""
        ds_list = []
        for ds in self.apps_api.list_namespaced_daemon_set(namespaced).items:
            ds_list.append({
                "name": ds.metadata.name,
                "uid": ds.metadata.uid,
                "node_selector": self.conv_labels(ds.spec.template.spec.node_selector),
                "labels": self.conv_labels(ds.metadata.labels),
                "number_ready": ds.status.number_ready,
                "desired_number_scheduled": ds.status.desired_number_scheduled,
                "current_number_scheduled": ds.status.current_number_scheduled,
                "number_available": ds.status.number_available,
                "number_misscheduled": ds.status.number_misscheduled,
                "number_unavailable": ds.status.number_unavailable,
                "creation_timestamp": self._fmt_time(ds.metadata.creation_timestamp),
            })
        return ds_list

    # Read daemon set configuration
    def read_ds(self, namespaced, ds_name):
        """Return the client object for daemon set ``ds_name``."""
        return self.apps_api.read_namespaced_daemon_set(ds_name, namespaced)

    # Delete daemon set
    def delete_ds(self, namespaced, ds_name):
        """Delete daemon set ``ds_name`` and return the client's response."""
        return self.apps_api.delete_namespaced_daemon_set(ds_name, namespaced)

    # Modify daemon set
    def modify_ds(self, namespaced, ds_name, ds_yaml):
        """Patch daemon set ``ds_name`` with the body ``ds_yaml``."""
        return self.apps_api.patch_namespaced_daemon_set(ds_name, namespaced, ds_yaml)

    # List stateful sets
    def get_sts(self, namespaced):
        """Return one summary dictionary per stateful set in ``namespaced``."""
        sts_list = []
        for sts in self.apps_api.list_namespaced_stateful_set(namespaced).items:
            sts_list.append({
                "name": sts.metadata.name,
                "node_selector": self.conv_labels(sts.spec.template.spec.node_selector),
                "labels": self.conv_labels(sts.metadata.labels),
                "replicas": sts.status.replicas,
                "ready_replicas": sts.status.ready_replicas,
                "current_replicas": sts.status.current_replicas,
                "uid": sts.metadata.uid,
                "creation_timestamp": self._fmt_time(sts.metadata.creation_timestamp),
            })
        return sts_list

    # Read stateful set configuration
    def read_sts(self, namespaced, sts_name):
        """Return the client object for stateful set ``sts_name``."""
        return self.apps_api.read_namespaced_stateful_set(sts_name, namespaced)

    # Delete stateful set
    def delete_sts(self, namespaced, sts_name):
        """Delete stateful set ``sts_name`` and return the client's response."""
        return self.apps_api.delete_namespaced_stateful_set(sts_name, namespaced)

    # Modify stateful set
    def modify_sts(self, namespaced, sts_name, sts_yaml):
        """Patch stateful set ``sts_name`` with the body ``sts_yaml``."""
        return self.apps_api.patch_namespaced_stateful_set(sts_name, namespaced, sts_yaml)

    # List config maps
    def get_cm(self, namespaced):
        """Return one summary dictionary per config map in ``namespaced``."""
        cm_list = []
        for cm in self.core_api.list_namespaced_config_map(namespaced).items:
            cm_list.append({
                "name": cm.metadata.name,
                "uid": cm.metadata.uid,
                "labels": self.conv_labels(cm.metadata.labels),
                "creation_timestamp": self._fmt_time(cm.metadata.creation_timestamp),
            })
        return cm_list

    # Read config map configuration
    def read_cm(self, namespaced, cm_name):
        """Return the client object for config map ``cm_name``."""
        return self.core_api.read_namespaced_config_map(cm_name, namespaced)

    # Delete config map
    def delete_cm(self, namespaced, cm_name):
        """Delete config map ``cm_name`` and return the client's response."""
        return self.core_api.delete_namespaced_config_map(cm_name, namespaced)

    # List service accounts
    def get_sa(self, namespaced):
        """Return one summary dictionary per service account in ``namespaced``."""
        sa_list = []
        for sa in self.core_api.list_namespaced_service_account(namespaced).items:
            sa_list.append({
                "name": sa.metadata.name,
                "uid": sa.metadata.uid,
                "labels": self.conv_labels(sa.metadata.labels),
                "creation_timestamp": self._fmt_time(sa.metadata.creation_timestamp),
            })
        return sa_list

    # Read service account configuration
    def read_sa(self, namespaced, sa_name):
        """Return the client object for service account ``sa_name``."""
        return self.core_api.read_namespaced_service_account(sa_name, namespaced)

    # Delete service account
    def delete_sa(self, namespaced, sa_name):
        """Delete service account ``sa_name`` and return the client's response."""
        return self.core_api.delete_namespaced_service_account(sa_name, namespaced)

    # List secrets
    def get_secret(self, namespaced):
        """Return one summary dictionary per secret in ``namespaced``.

        Only metadata and the secret type are included, not the secret data.
        """
        sc_list = []
        for sc in self.core_api.list_namespaced_secret(namespaced).items:
            sc_list.append({
                "name": sc.metadata.name,
                "uid": sc.metadata.uid,
                "labels": self.conv_labels(sc.metadata.labels),
                "type": sc.type,
                "creation_timestamp": self._fmt_time(sc.metadata.creation_timestamp),
            })
        return sc_list

    # Read secret configuration
    def read_secret(self, namespaced, secret_name):
        """Return the client object for secret ``secret_name``."""
        return self.core_api.read_namespaced_secret(secret_name, namespaced)

    # Delete secret
    def delete_secret(self, namespaced, secret_name):
        """Delete secret ``secret_name`` and return the client's response."""
        return self.core_api.delete_namespaced_secret(secret_name, namespaced)

    # List persistent volume claims
    def get_pvc(self, namespaced):
        """Return one summary dictionary per PVC in ``namespaced``.

        ``capacity`` is rendered from ``status.capacity`` and ``requests``
        from ``spec.resources.requests`` (e.g. ``"storage=1Gi"``); either is
        ``""`` when the corresponding mapping is absent.
        """
        pvc_list = []
        for pvc in self.core_api.list_namespaced_persistent_volume_claim(namespaced).items:
            resources = pvc.spec.resources
            requests = resources.requests if resources is not None else None
            pvc_list.append({
                "name": pvc.metadata.name,
                "uid": pvc.metadata.uid,
                "labels": self.conv_labels(pvc.metadata.labels),
                "access_modes": pvc.spec.access_modes,
                "storage_class_name": pvc.spec.storage_class_name,
                "volume_mode": pvc.spec.volume_mode,
                "volume_name": pvc.spec.volume_name,
                # Use the capacity/requests maps themselves, not the whole
                # status/resources objects, so only quantities are listed.
                "capacity": self.conv_labels(pvc.status.capacity),
                "requests": self.conv_labels(requests),
                "phase": pvc.status.phase,
                "creation_timestamp": self._fmt_time(pvc.metadata.creation_timestamp),
            })
        return pvc_list

    # Read PVC configuration
    def read_pvc(self, namespaced, pvc_name):
        """Return the client object for PVC ``pvc_name``."""
        return self.core_api.read_namespaced_persistent_volume_claim(pvc_name, namespaced)

    # Delete PVC
    def delete_pvc(self, namespaced, pvc_name):
        """Delete PVC ``pvc_name`` and return the client's response."""
        return self.core_api.delete_namespaced_persistent_volume_claim(pvc_name, namespaced)
然后顺便附上时间转换的函数(即上面代码中从 app.common 导入的 format_time):

def format_time(utc_datetime):
    """Format a UTC datetime as a local-time ``YYYY-MM-DD HH:MM:SS`` string.

    The previous implementation fed the UTC time tuple to ``time.mktime``,
    which interprets it as local time, so no UTC-to-local conversion took
    place and the result could be off by an hour while DST is in effect.

    Args:
        utc_datetime: A ``datetime`` object, or ``None``. A timezone-aware
            value is converted from its own zone; a naive value is assumed
            to be in UTC.

    Returns:
        The formatted time in the system's local timezone, or ``""`` when
        ``utc_datetime`` is ``None``.
    """
    # Imported locally so the helper works wherever it is pasted.
    from datetime import timezone

    if utc_datetime is None:
        return ""
    if utc_datetime.tzinfo is None or utc_datetime.utcoffset() is None:
        # Naive input: treat it as UTC, as the parameter name promises.
        utc_datetime = utc_datetime.replace(tzinfo=timezone.utc)
    # astimezone() without arguments converts to the system local timezone.
    return utc_datetime.astimezone().strftime("%Y-%m-%d %H:%M:%S")


内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://sulao.cn/post/909.html

我要评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。