网上找了一些零散的资料,然后看了 k8s 的 Python API,里面有很多方法,于是大致整理了一下常用操作的封装。很多操作的返回对象都有 to_dict() 方法,可以直接将返回的对象转为字典,之前没注意,后面也懒得改了,所以写得比较乱,直接记录下来方便以后使用。
#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from kubernetes import client, config
from app.common import format_time
class KubeApi:
    """Convenience wrapper around the official Kubernetes Python client.

    The ``get_*`` methods list resources and flatten each item into a plain
    dict of display-friendly values; the ``read_*``, ``delete_*``,
    ``modify_*`` and ``create_*`` methods forward directly to the matching
    client call and return the client's response object unchanged.
    """

    def __init__(self, config_file):
        """Load the kubeconfig at *config_file* and create the API clients."""
        config.load_kube_config(config_file)
        self.core_api = client.CoreV1Api()  # namespace, node, pod, service, cm, sa, secret, pvc
        self.apps_api = client.AppsV1Api()  # deployment, daemon set, stateful set
        self.net_api = client.NetworkingV1Api()  # ingress
        self.storage_api = client.StorageV1Api()  # storage

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    @staticmethod
    def conv_labels(labels):
        """Render a mapping as a ``"k1=v1,k2=v2"`` string.

        Keys containing the substring ``"kubernetes"`` are skipped. A falsy
        argument (``None`` or an empty mapping) yields ``""``.
        """
        if not labels:
            return ""
        return ",".join(
            "{}={}".format(key, value)
            for key, value in labels.items()
            if "kubernetes" not in key
        )

    @staticmethod
    def _format_timestamp(timestamp):
        """Format *timestamp* via ``format_time``; return ``""`` when unset.

        Some timestamps are legitimately missing (e.g. ``start_time`` of a
        pod that has not been scheduled yet), so guard before formatting.
        """
        return format_time(timestamp) if timestamp else ""

    @staticmethod
    def _node_status(conditions):
        """Summarize node conditions as ``Ready`` / ``NotReady`` / ``Unknown``.

        The summary is derived from the *status* of the ``Ready`` condition.
        Reporting only the type of the last condition would show "Ready"
        even for a node whose Ready condition is False. When no ``Ready``
        condition exists, the type of the last condition is returned; an
        empty or missing list yields ``""``.
        """
        if not conditions:
            return ""
        for cond in conditions:
            if cond.type == "Ready":
                if cond.status == "True":
                    return "Ready"
                if cond.status == "False":
                    return "NotReady"
                return "Unknown"
        return conditions[-1].type

    # ------------------------------------------------------------------
    # Namespaces and nodes
    # ------------------------------------------------------------------
    def get_namespace(self):
        """Return the names of all namespaces."""
        return [ns.metadata.name for ns in self.core_api.list_namespace().items]

    def get_nodes(self):
        """Return a list of dicts describing every node."""
        node_list = []
        for node in self.core_api.list_node().items:
            info = node.status.node_info
            addresses = node.status.addresses
            node_list.append({
                "name": node.metadata.name,
                # First reported address; "" if the node reports none.
                "addresses": addresses[0].address if addresses else "",
                "status": self._node_status(node.status.conditions),
                "kubelet_version": info.kubelet_version,
                "operating_system": info.operating_system,
                "kernel_version": info.kernel_version,
                "container_runtime_version": info.container_runtime_version,
                "os_image": info.os_image,
                "allocatable": node.status.allocatable,
                "capacity": node.status.capacity,
                "labels": self.conv_labels(node.metadata.labels),
                "unschedulable": node.spec.unschedulable,
                "creation_time": self._format_timestamp(node.metadata.creation_timestamp),
            })
        return node_list

    def read_node(self, node_name):
        """Return the full node object for *node_name*."""
        return self.core_api.read_node(node_name)

    def node_schedule(self, node_name, schedule_flag):
        """Patch ``spec.unschedulable`` of a node to *schedule_flag*."""
        body = {"spec": {"unschedulable": schedule_flag}}
        return self.core_api.patch_node(node_name, body)

    def node_label_modify(self, node_name, labels):
        """Patch the labels of a node with the *labels* mapping."""
        body = {"metadata": {"labels": labels}}
        return self.core_api.patch_node(node_name, body)

    # ------------------------------------------------------------------
    # Pods
    # ------------------------------------------------------------------
    def get_pods(self, namespaced):
        """Return a list of dicts describing the pods in a namespace.

        ``restart_count`` and ``ready`` are ``""`` while the pod has no
        container statuses; ``owner_kind`` is ``""`` for a pod without
        owner references; ``start_time`` is ``""`` until the pod has one.
        """
        pod_list = []
        for pod in self.core_api.list_namespaced_pod(namespaced).items:
            statuses = pod.status.container_statuses
            if statuses:
                restart_count = sum(s.restart_count for s in statuses)
                ready = "{}/{}".format(
                    sum(1 for s in statuses if s.ready), len(statuses)
                )
            else:
                restart_count = ""
                ready = ""
            # Pods created directly (not by a controller) have no owners.
            owners = pod.metadata.owner_references
            pod_list.append({
                "name": pod.metadata.name,
                "uid": pod.metadata.uid,
                "node_name": pod.spec.node_name,
                "phase": pod.status.phase,
                "host_ip": pod.status.host_ip,
                "pod_ip": pod.status.pod_ip,
                "node_selector": self.conv_labels(pod.spec.node_selector),
                "containers": [c.name for c in pod.spec.containers],
                "labels": self.conv_labels(pod.metadata.labels),
                "owner_kind": owners[0].kind if owners else "",
                "restart_count": restart_count,
                "ready": ready,
                "start_time": self._format_timestamp(pod.status.start_time),
            })
        return pod_list

    def read_pod(self, namespaced, pod_name):
        """Return the full pod object."""
        return self.core_api.read_namespaced_pod(pod_name, namespaced)

    def read_pod_log(self, namespaced, pod_name, container, tail_lines=1000):
        """Return the last *tail_lines* log lines of one container of a pod."""
        return self.core_api.read_namespaced_pod_log(
            pod_name, namespaced, container=container, tail_lines=tail_lines
        )

    def delete_pod(self, namespaced, pod_name):
        """Delete a pod (used as a "restart" in the original notes)."""
        return self.core_api.delete_namespaced_pod(pod_name, namespaced)

    # ------------------------------------------------------------------
    # Deployments
    # ------------------------------------------------------------------
    def get_deploy(self, namespaced):
        """Return a list of dicts describing the deployments in a namespace.

        ``rolling_update`` is ``""`` when the strategy carries no rolling
        update parameters (e.g. the ``Recreate`` strategy).
        """
        deploy_list = []
        for dp in self.apps_api.list_namespaced_deployment(namespaced).items:
            strategy = dp.spec.strategy
            rolling = strategy.rolling_update if strategy else None
            deploy_list.append({
                "name": dp.metadata.name,
                # NOTE(review): returned as the raw mapping, unlike the daemon
                # set / stateful set listings which stringify it; kept as-is
                # so existing callers see the same shape.
                "node_selector": dp.spec.template.spec.node_selector,
                "replicas": dp.status.replicas,
                "ready_replicas": dp.status.ready_replicas,
                "available_replicas": dp.status.available_replicas,
                "unavailable_replicas": dp.status.unavailable_replicas,
                "labels": self.conv_labels(dp.metadata.labels),
                "rolling_update": self.conv_labels(rolling.to_dict()) if rolling else "",
                "update_type": strategy.type if strategy else None,
                "uid": dp.metadata.uid,
                "creation_timestamp": self._format_timestamp(dp.metadata.creation_timestamp),
            })
        return deploy_list

    def read_deploy(self, namespaced, deploy_name):
        """Return the full deployment object."""
        return self.apps_api.read_namespaced_deployment(deploy_name, namespaced)

    def delete_deploy(self, namespaced, deploy_name):
        """Delete a deployment."""
        return self.apps_api.delete_namespaced_deployment(deploy_name, namespaced)

    def modify_deploy(self, namespaced, deploy_name, deploy_yaml):
        """Patch a deployment with the body *deploy_yaml*."""
        return self.apps_api.patch_namespaced_deployment(deploy_name, namespaced, deploy_yaml)

    def create_deploy(self, namespaced, deploy_yaml):
        """Create a deployment from the body *deploy_yaml*."""
        return self.apps_api.create_namespaced_deployment(namespaced, deploy_yaml)

    # ------------------------------------------------------------------
    # Services
    # ------------------------------------------------------------------
    def get_svc(self, namespaced):
        """Return a list of dicts describing the services in a namespace."""
        return [
            {
                "name": svc.metadata.name,
                "uid": svc.metadata.uid,
                "cluster_ip": svc.spec.cluster_ip,
                "type": svc.spec.type,
                "labels": self.conv_labels(svc.metadata.labels),
                "creation_timestamp": self._format_timestamp(svc.metadata.creation_timestamp),
            }
            for svc in self.core_api.list_namespaced_service(namespaced).items
        ]

    def read_svc(self, namespaced, svc_name):
        """Return the full service object."""
        return self.core_api.read_namespaced_service(svc_name, namespaced)

    def delete_svc(self, namespaced, svc_name):
        """Delete a service."""
        return self.core_api.delete_namespaced_service(svc_name, namespaced)

    def modify_svc(self, namespaced, svc_name, svc_yaml):
        """Patch a service with the body *svc_yaml*."""
        return self.core_api.patch_namespaced_service(svc_name, namespaced, svc_yaml)

    # ------------------------------------------------------------------
    # Daemon sets
    # ------------------------------------------------------------------
    def get_ds(self, namespaced):
        """Return a list of dicts describing the daemon sets in a namespace."""
        return [
            {
                "name": ds.metadata.name,
                "uid": ds.metadata.uid,
                "node_selector": self.conv_labels(ds.spec.template.spec.node_selector),
                "labels": self.conv_labels(ds.metadata.labels),
                "number_ready": ds.status.number_ready,
                "desired_number_scheduled": ds.status.desired_number_scheduled,
                "current_number_scheduled": ds.status.current_number_scheduled,
                "number_available": ds.status.number_available,
                "number_misscheduled": ds.status.number_misscheduled,
                "number_unavailable": ds.status.number_unavailable,
                "creation_timestamp": self._format_timestamp(ds.metadata.creation_timestamp),
            }
            for ds in self.apps_api.list_namespaced_daemon_set(namespaced).items
        ]

    def read_ds(self, namespaced, ds_name):
        """Return the full daemon set object."""
        return self.apps_api.read_namespaced_daemon_set(ds_name, namespaced)

    def delete_ds(self, namespaced, ds_name):
        """Delete a daemon set."""
        return self.apps_api.delete_namespaced_daemon_set(ds_name, namespaced)

    def modify_ds(self, namespaced, ds_name, ds_yaml):
        """Patch a daemon set with the body *ds_yaml*."""
        return self.apps_api.patch_namespaced_daemon_set(ds_name, namespaced, ds_yaml)

    # ------------------------------------------------------------------
    # Stateful sets
    # ------------------------------------------------------------------
    def get_sts(self, namespaced):
        """Return a list of dicts describing the stateful sets in a namespace."""
        return [
            {
                "name": sts.metadata.name,
                "node_selector": self.conv_labels(sts.spec.template.spec.node_selector),
                "labels": self.conv_labels(sts.metadata.labels),
                "replicas": sts.status.replicas,
                "ready_replicas": sts.status.ready_replicas,
                "current_replicas": sts.status.current_replicas,
                "uid": sts.metadata.uid,
                "creation_timestamp": self._format_timestamp(sts.metadata.creation_timestamp),
            }
            for sts in self.apps_api.list_namespaced_stateful_set(namespaced).items
        ]

    def read_sts(self, namespaced, sts_name):
        """Return the full stateful set object."""
        return self.apps_api.read_namespaced_stateful_set(sts_name, namespaced)

    def delete_sts(self, namespaced, sts_name):
        """Delete a stateful set."""
        return self.apps_api.delete_namespaced_stateful_set(sts_name, namespaced)

    def modify_sts(self, namespaced, sts_name, sts_yaml):
        """Patch a stateful set with the body *sts_yaml*."""
        return self.apps_api.patch_namespaced_stateful_set(sts_name, namespaced, sts_yaml)

    # ------------------------------------------------------------------
    # Config maps
    # ------------------------------------------------------------------
    def get_cm(self, namespaced):
        """Return a list of dicts describing the config maps in a namespace."""
        return [
            {
                "name": cm.metadata.name,
                "uid": cm.metadata.uid,
                "labels": self.conv_labels(cm.metadata.labels),
                "creation_timestamp": self._format_timestamp(cm.metadata.creation_timestamp),
            }
            for cm in self.core_api.list_namespaced_config_map(namespaced).items
        ]

    def read_cm(self, namespaced, cm_name):
        """Return the full config map object."""
        return self.core_api.read_namespaced_config_map(cm_name, namespaced)

    def delete_cm(self, namespaced, cm_name):
        """Delete a config map."""
        return self.core_api.delete_namespaced_config_map(cm_name, namespaced)

    # ------------------------------------------------------------------
    # Service accounts
    # ------------------------------------------------------------------
    def get_sa(self, namespaced):
        """Return a list of dicts describing the service accounts in a namespace."""
        return [
            {
                "name": sa.metadata.name,
                "uid": sa.metadata.uid,
                "labels": self.conv_labels(sa.metadata.labels),
                "creation_timestamp": self._format_timestamp(sa.metadata.creation_timestamp),
            }
            for sa in self.core_api.list_namespaced_service_account(namespaced).items
        ]

    def read_sa(self, namespaced, sa_name):
        """Return the full service account object."""
        return self.core_api.read_namespaced_service_account(sa_name, namespaced)

    def delete_sa(self, namespaced, sa_name):
        """Delete a service account."""
        return self.core_api.delete_namespaced_service_account(sa_name, namespaced)

    # ------------------------------------------------------------------
    # Secrets
    # ------------------------------------------------------------------
    def get_secret(self, namespaced):
        """Return a list of dicts describing the secrets in a namespace."""
        return [
            {
                "name": sc.metadata.name,
                "uid": sc.metadata.uid,
                "labels": self.conv_labels(sc.metadata.labels),
                "type": sc.type,
                "creation_timestamp": self._format_timestamp(sc.metadata.creation_timestamp),
            }
            for sc in self.core_api.list_namespaced_secret(namespaced).items
        ]

    def read_secret(self, namespaced, secret_name):
        """Return the full secret object."""
        return self.core_api.read_namespaced_secret(secret_name, namespaced)

    def delete_secret(self, namespaced, secret_name):
        """Delete a secret."""
        return self.core_api.delete_namespaced_secret(secret_name, namespaced)

    # ------------------------------------------------------------------
    # Persistent volume claims
    # ------------------------------------------------------------------
    def get_pvc(self, namespaced):
        """Return a list of dicts describing the PVCs in a namespace.

        ``capacity`` and ``requests`` are rendered from the claim's capacity
        and requested-resources maps (e.g. ``"storage=1Gi"``); either is
        ``""`` when the corresponding map is unset.
        """
        pvc_list = []
        for pvc in self.core_api.list_namespaced_persistent_volume_claim(namespaced).items:
            resources = pvc.spec.resources
            pvc_list.append({
                "name": pvc.metadata.name,
                "uid": pvc.metadata.uid,
                "labels": self.conv_labels(pvc.metadata.labels),
                "access_modes": pvc.spec.access_modes,
                "storage_class_name": pvc.spec.storage_class_name,
                "volume_mode": pvc.spec.volume_mode,
                "volume_name": pvc.spec.volume_name,
                # Use the capacity/requests maps themselves rather than the
                # whole status/resources objects, which would mix unrelated
                # fields into these values.
                "capacity": self.conv_labels(pvc.status.capacity),
                "requests": self.conv_labels(resources.requests if resources else None),
                "phase": pvc.status.phase,
                "creation_timestamp": self._format_timestamp(pvc.metadata.creation_timestamp),
            })
        return pvc_list

    def read_pvc(self, namespaced, pvc_name):
        """Return the full PVC object."""
        return self.core_api.read_namespaced_persistent_volume_claim(pvc_name, namespaced)

    def delete_pvc(self, namespaced, pvc_name):
        """Delete a PVC."""
        return self.core_api.delete_namespaced_persistent_volume_claim(pvc_name, namespaced)


# For reference, the time-conversion function (format_time) is attached below.
def format_time(utc_datetime):
    """Format a UTC datetime as a local-time ``YYYY-MM-DD HH:MM:SS`` string.

    Args:
        utc_datetime: A ``datetime`` expressed in UTC. A timezone-aware value
            is converted from its own zone; a naive value is taken to be UTC
            (as the parameter name indicates). ``None`` is accepted.

    Returns:
        The timestamp rendered in the system's local timezone, or ``""``
        when *utc_datetime* is ``None``.
    """
    # Local import keeps this helper self-contained in whichever module it
    # is pasted into.
    from datetime import timezone

    if utc_datetime is None:
        return ""
    if utc_datetime.tzinfo is None:
        utc_datetime = utc_datetime.replace(tzinfo=timezone.utc)
    # astimezone() with no argument converts to the system local timezone.
    # Round-tripping the UTC fields through time.mktime()/time.localtime()
    # would treat them as local time and therefore perform no conversion
    # (and can shift by an hour while DST is in effect).
    return utc_datetime.astimezone().strftime("%Y-%m-%d %H:%M:%S")
# Content copyright notice: unless otherwise noted, all articles on this site are original.
转载注明出处:https://sulao.cn/post/906
相关阅读
- k8s集群部署gpu-operator支持gpu节点自动发现和gpu上报
- k8s节点多网卡下指定某一个ip为节点INTERNAL-IP
- flask实现站点地图(sitemap.xml)功能
- flask开发中几种接口参数验证的技巧
- k8s使用SA和Secret配置私有仓库镜像拉取凭证
- python读取大文件的几种方法
- k8s使用flannel作为CNI网络插件
- k8s中harbor-database-0日志报Permissions should be u=rwx (0700)的处理方法
- k8s使用helm部署harbor镜像仓库并使用nodeport方式暴露
- k8s集群部署prometheus/node-exporter/dcgm-exporter
评论列表