网上找了零散的,然后看了k8s 的python api,里面很有方法,大致整理了一下操作的封装,很多操作返回都有to_dict()方法可以直接将返回的对象转为字典,之前没注意,后面懒得改了,写的比较乱,直接记录下来方便以后使用。
#!/usr/bin/python3 #coding:utf-8 __author__ = 'yang.su' from kubernetes import client, config from app.common import format_time class KubeApi(): def __init__(self, config_file): config.load_kube_config(config_file) self.core_api = client.CoreV1Api() # namespace,pod,service,pv,pvc self.apps_api = client.AppsV1Api() # deployment self.net_api = client.NetworkingV1Api() # ingress self.storage_api = client.StorageV1Api() # storage #转换标签 @staticmethod def conv_labels(labels): if labels: labels = ",".join(["{}={}".format(p, labels[p]) for p in labels if "kubernetes" not in p]) else: labels = "" return labels #获取命名空间列表 def get_namespace(self): namespaces = [] for p in self.core_api.list_namespace().items: namespaces.append(p.metadata.name) return namespaces #获取节点列表 def get_nodes(self): node_list = [] for p in self.core_api.list_node().items: node_list.append({ "name": p.metadata.name, "addresses": p.status.addresses[0].address, "status": p.status.conditions[-1].type, "kubelet_version": p.status.node_info.kubelet_version, "operating_system": p.status.node_info.operating_system, "kernel_version": p.status.node_info.kernel_version, "container_runtime_version": p.status.node_info.container_runtime_version, "os_image": p.status.node_info.os_image, "allocatable": p.status.allocatable, "capacity": p.status.capacity, "labels": self.conv_labels(p.metadata.labels), "unschedulable": p.spec.unschedulable, "creation_time": format_time(p.metadata.creation_timestamp) }) return node_list #获取node配置 def read_node(self, node_name): return self.core_api.read_node(node_name) #修改调度状态 def node_schedule(self, node_name, schedule_flag): body = { "spec": { "unschedulable": schedule_flag } } ret = self.core_api.patch_node(node_name, body) return ret #添加标签 def node_label_modify(self, node_name, labels): body = { "metadata": { "labels": labels } } ret = self.core_api.patch_node(node_name, body) return ret #获取POD列表 def get_pods(self, namespaced): pod_list = [] for pod in self.core_api.list_namespaced_pod(namespaced).items: if pod.status.container_statuses: restart_count = sum([p.restart_count for p in pod.status.container_statuses]) ready = "{}/{}".format([p.ready for p in pod.status.container_statuses].count(True), len([p.ready for p in pod.status.container_statuses])) else: restart_count = "" ready = "" pod_list.append({ "name": pod.metadata.name, "uid": pod.metadata.uid, "node_name": pod.spec.node_name, "phase": pod.status.phase, "host_ip": pod.status.host_ip, "pod_ip": pod.status.pod_ip, "node_selector": self.conv_labels(pod.spec.node_selector), "containers": [p.name for p in pod.spec.containers], "labels": self.conv_labels(pod.metadata.labels), "owner_kind": pod.metadata.owner_references[0].kind, "restart_count": restart_count, "ready": ready, "start_time": format_time(pod.status.start_time) }) return pod_list #读取pod配置 def read_pod(self, namespaced, pod_name): return self.core_api.read_namespaced_pod(pod_name, namespaced) #获取POD日志 def read_pod_log(self, namespaced, pod_name, container, tail_lines=1000): return self.core_api.read_namespaced_pod_log(pod_name, namespaced, container=container, tail_lines=tail_lines) #重启POD def delete_pod(self, namespaced, pod_name): return self.core_api.delete_namespaced_pod(pod_name, namespaced) #获取deploy列表 def get_deploy(self, namespaced): deploy_list = [] for dp in self.apps_api.list_namespaced_deployment(namespaced).items: deploy_list.append({ "name": dp.metadata.name, "node_selector": dp.spec.template.spec.node_selector, "replicas": dp.status.replicas, "ready_replicas": dp.status.ready_replicas, "available_replicas": dp.status.available_replicas, "unavailable_replicas": dp.status.unavailable_replicas, "labels": self.conv_labels(dp.metadata.labels), "rolling_update": self.conv_labels(dp.spec.strategy.rolling_update.to_dict()), "update_type": dp.spec.strategy.type, "uid": dp.metadata.uid, "creation_timestamp": format_time(dp.metadata.creation_timestamp) }) return deploy_list #读取deploy配置 def read_deploy(self, namespaced, deploy_name): return self.apps_api.read_namespaced_deployment(deploy_name, namespaced) #删除deploy资源 def delete_deploy(self, namespaced, deploy_name): return self.apps_api.delete_namespaced_deployment(deploy_name, namespaced) #修改deploy资源 def modify_deploy(self, namespaced, deploy_name, deploy_yaml): return self.apps_api.patch_namespaced_deployment(deploy_name, namespaced, deploy_yaml) #创建deploy资源 def create_deploy(self, namespaced, deploy_yaml): return self.apps_api.create_namespaced_deployment(namespaced, deploy_yaml) #获取services列表 def get_svc(self, namespaced): svc_list = [] for svc in self.core_api.list_namespaced_service(namespaced).items: svc_list.append({ "name": svc.metadata.name, "uid": svc.metadata.uid, "cluster_ip": svc.spec.cluster_ip, "type": svc.spec.type, "labels": self.conv_labels(svc.metadata.labels), "creation_timestamp": format_time(svc.metadata.creation_timestamp) }) return svc_list #读取svc配置 def read_svc(self, namespaced, svc_name): return self.core_api.read_namespaced_service(svc_name, namespaced) #删除svc资源 def delete_svc(self, namespaced, svc_name): return self.core_api.delete_namespaced_service(svc_name, namespaced) #修改svc资源 def modify_svc(self, namespaced, svc_name, svc_yaml): return self.core_api.patch_namespaced_service(svc_name, namespaced, svc_yaml) #获取damemon_set列表 def get_ds(self, namespaced): ds_list = [] for ds in self.apps_api.list_namespaced_daemon_set(namespaced).items: ds_list.append({ "name": ds.metadata.name, "uid": ds.metadata.uid, "node_selector": self.conv_labels(ds.spec.template.spec.node_selector), "labels": self.conv_labels(ds.metadata.labels), "number_ready": ds.status.number_ready, "desired_number_scheduled": ds.status.desired_number_scheduled, "current_number_scheduled": ds.status.current_number_scheduled, "number_available": ds.status.number_available, "number_misscheduled": ds.status.number_misscheduled, "number_unavailable": ds.status.number_unavailable, "creation_timestamp": format_time(ds.metadata.creation_timestamp) }) return ds_list #读取ds配置 def read_ds(self, namespaced, ds_name): return self.apps_api.read_namespaced_daemon_set(ds_name, namespaced) #删除ds资源 def delete_ds(self, namespaced, ds_name): return self.apps_api.delete_namespaced_daemon_set(ds_name, namespaced) #修改ds资源 def modify_ds(self, namespaced, ds_name, ds_yaml): return self.apps_api.patch_namespaced_daemon_set(ds_name, namespaced, ds_yaml) #获取stateful_set列表 def get_sts(self, namespaced): sts_list = [] for sts in self.apps_api.list_namespaced_stateful_set(namespaced).items: sts_list.append({ "name": sts.metadata.name, "node_selector": self.conv_labels(sts.spec.template.spec.node_selector), "labels": self.conv_labels(sts.metadata.labels), "replicas": sts.status.replicas, "ready_replicas": sts.status.ready_replicas, "current_replicas": sts.status.current_replicas, "uid": sts.metadata.uid, "creation_timestamp": format_time(sts.metadata.creation_timestamp) }) return sts_list #读取sts配置 def read_sts(self, namespaced, sts_name): return self.apps_api.read_namespaced_stateful_set(sts_name, namespaced) #删除sts资源 def delete_sts(self, namespaced, sts_name): return self.apps_api.delete_namespaced_stateful_set(sts_name, namespaced) #修改sts资源 def modify_sts(self, namespaced, sts_name, sts_yaml): return self.apps_api.patch_namespaced_stateful_set(sts_name, namespaced, sts_yaml) #获取configmap列表 def get_cm(self, namespaced): cm_list = [] for cm in self.core_api.list_namespaced_config_map(namespaced).items: cm_list.append({ "name": cm.metadata.name, "uid": cm.metadata.uid, "labels": self.conv_labels(cm.metadata.labels), "creation_timestamp": format_time(cm.metadata.creation_timestamp) }) return cm_list #读取cm配置 def read_cm(self, namespaced, cm_name): return self.core_api.read_namespaced_config_map(cm_name, namespaced) #删除cm资源 def delete_cm(self, namespaced, cm_name): return self.core_api.delete_namespaced_config_map(cm_name, namespaced) #获取service_account列表 def get_sa(self, namespaced): sa_list = [] for sa in self.core_api.list_namespaced_service_account(namespaced).items: sa_list.append({ "name": sa.metadata.name, "uid": sa.metadata.uid, "labels": self.conv_labels(sa.metadata.labels), "creation_timestamp": format_time(sa.metadata.creation_timestamp) }) return sa_list #读取service_account配置 def read_sa(self, namespaced, sa_name): return self.core_api.read_namespaced_service_account(sa_name, namespaced) #删除service_account资源 def delete_sa(self, namespaced, sa_name): return self.core_api.delete_namespaced_service_account(sa_name, namespaced) #获取secret列表 def get_secret(self, namespaced): sc_list = [] for sc in self.core_api.list_namespaced_secret(namespaced).items: sc_list.append({ "name": sc.metadata.name, "uid": sc.metadata.uid, "labels": self.conv_labels(sc.metadata.labels), "type": sc.type, "creation_timestamp": format_time(sc.metadata.creation_timestamp) }) return sc_list #读取secret配置 def read_secret(self, namespaced, secret_name): return self.core_api.read_namespaced_secret(secret_name, namespaced) #删除secret资源 def delete_secret(self, namespaced, secret_name): return self.core_api.delete_namespaced_secret(secret_name, namespaced) #获取pvc列表 def get_pvc(self, namespaced): pvc_list = [] for pvc in self.core_api.list_namespaced_persistent_volume_claim(namespaced).items: pvc_list.append({ "name": pvc.metadata.name, "uid": pvc.metadata.uid, "labels": self.conv_labels(pvc.metadata.labels), "access_modes": pvc.spec.access_modes, "storage_class_name": pvc.spec.storage_class_name, "volume_mode": pvc.spec.volume_mode, "volume_name": pvc.spec.volume_name, "capacity": self.conv_labels(pvc.status.to_dict()), "requests": self.conv_labels(pvc.spec.resources.to_dict()), "phase": pvc.status.phase, "creation_timestamp": format_time(pvc.metadata.creation_timestamp) }) return pvc_list #读取pvc配置 def read_pvc(self, namespaced, pvc_name): return self.core_api.read_namespaced_persistent_volume_claim(pvc_name, namespaced) #删除pvc资源 def delete_pvc(self, namespaced, pvc_name): return self.core_api.delete_namespaced_persistent_volume_claim(pvc_name, namespaced)
然后顺便附下时间转换的函数
def format_time(utc_datetime): dt = time.mktime(utc_datetime.timetuple()) dtime = time.localtime(int(dt)) return time.strftime("%Y-%m-%d %H:%M:%S", dtime)