
一、kubernetes库概述:用途、原理与特性
在云原生技术飞速发展的今天,Kubernetes已成为容器编排的事实标准。而Python作为一门广泛应用于自动化脚本、云服务开发的编程语言,两者的结合催生了kubernetes
库——这是Python开发者与Kubernetes集群交互的核心工具。该库提供了完整的Kubernetes API客户端实现,让开发者能通过Python代码管理集群资源、监控状态、自动化运维流程。
其工作原理基于Kubernetes的RESTful API,通过封装API调用细节,将复杂的HTTP请求转换为直观的Python对象操作。开发者无需直接处理JSON数据和HTTP状态码,只需调用相应方法即可完成Pod创建、服务部署等操作。
优点:官方维护保障兼容性,API覆盖全面,支持所有Kubernetes资源操作;提供配置自动加载机制,简化集群连接流程。
缺点:部分高级功能需要深入理解Kubernetes概念;同步API调用在大规模操作时可能影响性能。该库采用Apache License 2.0许可,允许商业使用和修改,只需保留原版权声明。
二、kubernetes库安装与环境配置
2.1 安装kubernetes库
安装kubernetes
库非常简单,通过Python的包管理工具pip即可完成。打开终端或命令提示符,执行以下命令:
pip install kubernetes
如果需要安装特定版本(推荐与集群版本匹配),可以指定版本号:
pip install kubernetes==26.1.0 # 安装26.1.0版本,适配Kubernetes 1.26.x
对于需要开发环境的用户,可安装包含开发依赖的版本:
pip install kubernetes[dev] # 包含测试和开发所需依赖
安装完成后,可以通过以下代码验证安装是否成功:
try:
import kubernetes
print(f"kubernetes库安装成功,版本:{kubernetes.__version__}")
except ImportError:
print("kubernetes库安装失败")
运行后如果输出类似kubernetes库安装成功,版本:26.1.0
的信息,则表示安装成功。
2.2 集群连接配置
kubernetes
库需要正确的配置才能连接到Kubernetes集群,它支持多种配置方式,适应不同的使用场景。
2.2.1 本地集群配置(kubectl配置)
当你的Python脚本在已配置好kubectl
的环境中运行时(如开发机或集群节点),库会自动加载kubectl
的配置文件。默认情况下,配置文件位于:
- Linux/macOS:
~/.kube/config
- Windows:
C:\Users\<用户名>\.kube\config
这种方式是最常用的配置方式,无需额外代码即可连接集群:
from kubernetes import client, config
# 加载默认配置(从~/.kube/config读取)
config.load_kube_config()
# 创建API客户端实例
v1 = client.CoreV1Api()
# 测试连接:获取集群版本信息
try:
version_info = v1.get_code()
print(f"成功连接到Kubernetes集群,版本:{version_info.git_version}")
except Exception as e:
print(f"连接集群失败:{str(e)}")
2.2.2 手动指定配置文件
如果配置文件不在默认位置,可以手动指定配置文件路径:
from kubernetes import client, config
# 手动指定配置文件路径
config.load_kube_config(config_file="/path/to/your/kubeconfig")
# 验证连接
v1 = client.CoreV1Api()
print(f"集群服务器地址:{v1.api_client.configuration.host}")
2.2.3 集群内配置(In-Cluster配置)
当Python脚本在Kubernetes集群内部的Pod中运行时,推荐使用In-Cluster配置方式。这种方式不需要手动配置文件,而是通过集群内部的服务账户自动获取权限:
from kubernetes import client, config
# 加载集群内配置
config.load_incluster_config()
# 验证连接
v1 = client.CoreV1Api()
print(f"集群内连接成功,服务器地址:{v1.api_client.configuration.host}")
使用这种方式需要确保Pod的服务账户具有相应的RBAC权限,否则会出现权限不足的错误。
2.2.4 手动配置连接参数
在某些特殊场景下(如连接远程集群且没有配置文件),可以手动指定连接参数:
from kubernetes import client
# 手动配置连接参数
configuration = client.Configuration()
configuration.host = "https://your-kubernetes-api-server:6443" # API服务器地址
configuration.verify_ssl = True # 是否验证SSL证书
configuration.ca_cert = "/path/to/ca.crt" # CA证书路径
configuration.api_key = {"authorization": "Bearer YOUR_TOKEN"} # 认证Token
# 应用配置
client.Configuration.set_default(configuration)
# 验证连接
v1 = client.CoreV1Api()
try:
version = v1.get_code()
print(f"手动配置连接成功,集群版本:{version.git_version}")
except Exception as e:
print(f"手动配置连接失败:{str(e)}")
三、核心API对象操作详解
3.1 客户端对象初始化
kubernetes
库为Kubernetes的每个API组提供了对应的客户端类,最常用的包括:
CoreV1Api
:核心API组,包含Pod、Service、Namespace等基础资源AppsV1Api
:应用API组,包含Deployment、StatefulSet、DaemonSet等BatchV1Api
:批处理API组,包含Job、CronJob等NetworkingV1Api
:网络API组,包含Ingress等资源
初始化客户端对象的方式非常简单:
from kubernetes import client, config
# 加载配置
config.load_kube_config()
# 初始化各种API客户端
core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
batch_v1 = client.BatchV1Api()
networking_v1 = client.NetworkingV1Api()
所有客户端对象都继承自基础的API客户端,拥有一致的操作风格。
3.2 Namespace管理
Namespace用于在集群中创建资源隔离的逻辑分区,以下是Namespace的常用操作:
3.2.1 列出所有Namespace
from kubernetes import client, config
config.load_kube_config()
core_v1 = client.CoreV1Api()
def list_namespaces():
"""列出集群中所有的Namespace"""
try:
# 调用list_namespace方法获取所有Namespace
namespaces = core_v1.list_namespace()
print("集群中的Namespace列表:")
print("名称\t\t状态\t\t创建时间")
print("-" * 60)
for ns in namespaces.items:
# 获取Namespace名称、状态和创建时间
name = ns.metadata.name
status = ns.status.phase
create_time = ns.metadata.creation_timestamp.strftime("%Y-%m-%d %H:%M:%S")
print(f"{name.ljust(16)}{status.ljust(16)}{create_time}")
except Exception as e:
print(f"获取Namespace列表失败:{str(e)}")
if __name__ == "__main__":
list_namespaces()
运行这段代码会输出类似以下的结果:
集群中的Namespace列表:
名称 状态 创建时间
------------------------------------------------------------
default Active 2023-01-15 10:30:00
kube-system Active 2023-01-15 10:29:45
kube-public Active 2023-01-15 10:29:46
kube-node-lease Active 2023-01-15 10:29:47
3.2.2 创建Namespace
from kubernetes import client, config
from kubernetes.client.models import V1Namespace, V1ObjectMeta
def create_namespace(namespace_name, labels=None):
"""
创建新的Namespace
:param namespace_name: Namespace名称
:param labels: 可选的标签字典
:return: 创建的Namespace对象
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
# 定义Namespace元数据
metadata = V1ObjectMeta(
name=namespace_name,
labels=labels or {} # 如果没有提供标签则使用空字典
)
# 创建Namespace对象
namespace = V1Namespace(metadata=metadata)
try:
# 调用API创建Namespace
result = core_v1.create_namespace(body=namespace)
print(f"Namespace '{namespace_name}' 创建成功")
return result
except client.exceptions.ApiException as e:
if e.status == 409:
print(f"Namespace '{namespace_name}' 已存在")
else:
print(f"创建Namespace失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
create_namespace(
namespace_name="python-k8s-demo",
labels={"env": "demo", "creator": "python-script"}
)
3.2.3 删除Namespace
from kubernetes import client, config
def delete_namespace(namespace_name):
"""
删除指定的Namespace
:param namespace_name: 要删除的Namespace名称
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
try:
# 调用API删除Namespace
# propagation_policy="Foreground" 表示Foreground级联删除
result = core_v1.delete_namespace(
name=namespace_name,
propagation_policy="Foreground"
)
print(f"Namespace '{namespace_name}' 删除请求已提交")
return result
except client.exceptions.ApiException as e:
if e.status == 404:
print(f"Namespace '{namespace_name}' 不存在")
else:
print(f"删除Namespace失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
delete_namespace("python-k8s-demo")
3.3 Pod资源操作
Pod是Kubernetes的最小部署单元,下面介绍如何通过Python代码操作Pod资源。
3.3.1 列出指定Namespace中的Pod
from kubernetes import client, config
def list_pods(namespace="default"):
"""
列出指定Namespace中的所有Pod
:param namespace: 命名空间,默认为default
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
try:
# 列出Pod,watch=False表示不监听变化,只获取当前状态
pods = core_v1.list_namespaced_pod(namespace=namespace, watch=False)
print(f"Namespace '{namespace}' 中的Pod列表:")
print("名称\t\t\t状态\t\t重启次数\tIP地址")
print("-" * 70)
for pod in pods.items:
name = pod.metadata.name
status = pod.status.phase
restart_count = pod.status.container_statuses[0].restart_count if pod.status.container_statuses else 0
ip = pod.status.pod_ip or "未分配"
print(f"{name.ljust(24)}{status.ljust(16)}{str(restart_count).ljust(12)}{ip}")
except client.exceptions.ApiException as e:
if e.status == 404:
print(f"Namespace '{namespace}' 不存在")
else:
print(f"获取Pod列表失败:{e.reason}")
# 使用示例
if __name__ == "__main__":
list_pods(namespace="kube-system") # 查看kube-system命名空间的Pod
list_pods(namespace="default") # 查看default命名空间的Pod
3.3.2 创建Pod
创建Pod需要定义相对复杂的配置,包括容器镜像、资源限制、环境变量等:
from kubernetes import client, config
from kubernetes.client.models import (
V1Pod, V1ObjectMeta, V1PodSpec,
V1Container, V1ResourceRequirements
)
def create_pod(namespace, pod_name, image,
command=None, args=None,
cpu_limit="500m", memory_limit="512Mi",
cpu_request="200m", memory_request="256Mi",
labels=None):
"""
创建一个新的Pod
:param namespace: 命名空间
:param pod_name: Pod名称
:param image: 容器镜像
:param command: 容器启动命令
:param args: 容器启动参数
:param cpu_limit: CPU限制
:param memory_limit: 内存限制
:param cpu_request: CPU请求
:param memory_request: 内存请求
:param labels: Pod标签
:return: 创建的Pod对象
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
# 定义资源需求和限制
resources = V1ResourceRequirements(
limits={
"cpu": cpu_limit,
"memory": memory_limit
},
requests={
"cpu": cpu_request,
"memory": memory_request
}
)
# 定义容器
container = V1Container(
name="main-container",
image=image,
resources=resources,
command=command,
args=args
)
# 定义Pod规格
spec = V1PodSpec(
containers=[container],
restart_policy="Always" # 重启策略:Always、OnFailure、Never
)
# 定义Pod元数据
metadata = V1ObjectMeta(
name=pod_name,
labels=labels or {"app": pod_name}
)
# 创建Pod对象
pod = V1Pod(
api_version="v1",
kind="Pod",
metadata=metadata,
spec=spec
)
try:
# 调用API创建Pod
result = core_v1.create_namespaced_pod(
namespace=namespace,
body=pod
)
print(f"Pod '{pod_name}' 在Namespace '{namespace}' 中创建成功")
return result
except client.exceptions.ApiException as e:
if e.status == 409:
print(f"Pod '{pod_name}' 已存在")
else:
print(f"创建Pod失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
# 创建一个Nginx Pod
create_pod(
namespace="default",
pod_name="nginx-demo",
image="nginx:1.23",
labels={"app": "nginx", "env": "demo"},
cpu_limit="500m",
memory_limit="512Mi"
)
# 创建一个运行Python的Pod
create_pod(
namespace="default",
pod_name="python-demo",
image="python:3.9-slim",
command=["python"],
args=["-c", "import time; while True: print('Hello from Python Pod'); time.sleep(5)"],
labels={"app": "python", "env": "demo"}
)
3.3.3 获取Pod详情
from kubernetes import client, config
def get_pod_details(namespace, pod_name):
"""
获取指定Pod的详细信息
:param namespace: 命名空间
:param pod_name: Pod名称
:return: Pod对象或None
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
try:
# 获取Pod详情
pod = core_v1.read_namespaced_pod(name=pod_name, namespace=namespace)
print(f"Pod '{pod_name}' 详情:")
print(f"状态:{pod.status.phase}")
print(f"IP地址:{pod.status.pod_ip}")
print(f"节点:{pod.spec.node_name}")
print(f"创建时间:{pod.metadata.creation_timestamp}")
print("容器信息:")
for container in pod.spec.containers:
print(f" - 名称:{container.name}")
print(f" 镜像:{container.image}")
return pod
except client.exceptions.ApiException as e:
if e.status == 404:
print(f"Pod '{pod_name}' 在Namespace '{namespace}' 中不存在")
else:
print(f"获取Pod详情失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
get_pod_details(namespace="default", pod_name="nginx-demo")
3.3.4 删除Pod
from kubernetes import client, config
def delete_pod(namespace, pod_name):
"""
删除指定的Pod
:param namespace: 命名空间
:param pod_name: 要删除的Pod名称
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
try:
result = core_v1.delete_namespaced_pod(
name=pod_name,
namespace=namespace,
body=client.V1DeleteOptions(
propagation_policy="Foreground",
grace_period_seconds=30
)
)
print(f"Pod '{pod_name}' 删除请求已提交")
return result
except client.exceptions.ApiException as e:
if e.status == 404:
print(f"Pod '{pod_name}' 在Namespace '{namespace}' 中不存在")
else:
print(f"删除Pod失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
delete_pod(namespace="default", pod_name="nginx-demo")
delete_pod(namespace="default", pod_name="python-demo")
3.4 Deployment资源操作
Deployment是Kubernetes中最常用的资源之一,用于管理Pod的副本和滚动更新。
3.4.1 创建Deployment
from kubernetes import client, config
from kubernetes.client.models import (
V1Deployment, V1ObjectMeta, V1DeploymentSpec,
V1LabelSelector, V1PodTemplateSpec, V1Container,
V1ResourceRequirements
)
def create_deployment(namespace, deployment_name, image, replicas=3,
port=80, cpu_limit="500m", memory_limit="512Mi",
cpu_request="200m", memory_request="256Mi",
labels=None):
"""
创建Deployment
:param namespace: 命名空间
:param deployment_name: Deployment名称
:param image: 容器镜像
:param replicas: 副本数量
:param port: 容器端口
:param cpu_limit: CPU限制
:param memory_limit: 内存限制
:param cpu_request: CPU请求
:param memory_request: 内存请求
:param labels: 标签
:return: 创建的Deployment对象
"""
config.load_kube_config()
apps_v1 = client.AppsV1Api()
# 定义资源需求
resources = V1ResourceRequirements(
limits={"cpu": cpu_limit, "memory": memory_limit},
requests={"cpu": cpu_request, "memory": memory_request}
)
# 定义容器
container = V1Container(
name="main-container",
image=image,
ports=[client.V1ContainerPort(container_port=port)],
resources=resources
)
# 定义Pod模板
template = V1PodTemplateSpec(
metadata=V1ObjectMeta(labels=labels or {"app": deployment_name}),
spec=client.V1PodSpec(containers=[container])
)
# 定义选择器
selector = V1LabelSelector(
match_labels=labels or {"app": deployment_name}
)
# 定义Deployment规格
spec = V1DeploymentSpec(
replicas=replicas,
template=template,
selector=selector,
strategy=client.V1DeploymentStrategy(
type="RollingUpdate",
rolling_update=client.V1RollingUpdateDeployment(
max_surge="25%",
max_unavailable="25%"
)
)
)
# 创建Deployment对象
deployment = V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=V1ObjectMeta(name=deployment_name),
spec=spec
)
try:
# 创建Deployment
result = apps_v1.create_namespaced_deployment(
namespace=namespace,
body=deployment
)
print(f"Deployment '{deployment_name}' 创建成功")
return result
except client.exceptions.ApiException as e:
if e.status == 409:
print(f"Deployment '{deployment_name}' 已存在")
else:
print(f"创建Deployment失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
create_deployment(
namespace="default",
deployment_name="nginx-deployment",
image="nginx:1.23",
replicas=3,
port=80,
labels={"app": "nginx", "env": "demo"}
)
3.4.2 更新Deployment(滚动更新)
from kubernetes import client, config
def update_deployment_image(namespace, deployment_name, new_image):
"""
更新Deployment的镜像
:param namespace: 命名空间
:param deployment_name: Deployment名称
:param new_image: 新镜像
:return: 更新后的Deployment对象
"""
config.load_kube_config()
apps_v1 = client.AppsV1Api()
try:
# 获取当前Deployment
deployment = apps_v1.read_namespaced_deployment(
name=deployment_name,
namespace=namespace
)
# 更新镜像
deployment.spec.template.spec.containers[0].image = new_image
# 执行更新
result = apps_v1.patch_namespaced_deployment(
name=deployment_name,
namespace=namespace,
body=deployment
)
print(f"Deployment '{deployment_name}' 镜像已更新为: {new_image}")
return result
except client.exceptions.ApiException as e:
if e.status == 404:
print(f"Deployment '{deployment_name}' 不存在")
else:
print(f"更新Deployment失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
update_deployment_image(
namespace="default",
deployment_name="nginx-deployment",
new_image="nginx:1.24"
)
3.4.3 扩缩容Deployment
from kubernetes import client, config
def scale_deployment(namespace, deployment_name, replicas):
"""
扩缩容Deployment
:param namespace: 命名空间
:param deployment_name: Deployment名称
:param replicas: 新的副本数
:return: 更新后的Deployment对象
"""
config.load_kube_config()
apps_v1 = client.AppsV1Api()
try:
# 获取当前Deployment
deployment = apps_v1.read_namespaced_deployment(
name=deployment_name,
namespace=namespace
)
# 更新副本数
deployment.spec.replicas = replicas
# 执行更新
result = apps_v1.patch_namespaced_deployment(
name=deployment_name,
namespace=namespace,
body=deployment
)
print(f"Deployment '{deployment_name}' 已调整为 {replicas} 个副本")
return result
except client.exceptions.ApiException as e:
if e.status == 404:
print(f"Deployment '{deployment_name}' 不存在")
else:
print(f"调整Deployment副本数失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
# 扩容到5个副本
scale_deployment(
namespace="default",
deployment_name="nginx-deployment",
replicas=5
)
# 缩容到2个副本
scale_deployment(
namespace="default",
deployment_name="nginx-deployment",
replicas=2
)
3.5 Service资源操作
Service用于暴露Pod,使其可以被访问。
3.5.1 创建Service
from kubernetes import client, config
from kubernetes.client.models import (
V1Service, V1ObjectMeta, V1ServiceSpec,
V1ServicePort
)
def create_service(namespace, service_name, selector_labels,
port=80, target_port=80, service_type="ClusterIP"):
"""
创建Service
:param namespace: 命名空间
:param service_name: Service名称
:param selector_labels: 选择器标签
:param port: 服务端口
:param target_port: 目标端口
:param service_type: 服务类型(ClusterIP, NodePort, LoadBalancer)
:return: 创建的Service对象
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
# 定义Service端口
service_port = V1ServicePort(
port=port,
target_port=target_port,
protocol="TCP"
)
# 定义Service规格
spec = V1ServiceSpec(
selector=selector_labels,
ports=[service_port],
type=service_type
)
# 创建Service对象
service = V1Service(
api_version="v1",
kind="Service",
metadata=V1ObjectMeta(name=service_name),
spec=spec
)
try:
# 创建Service
result = core_v1.create_namespaced_service(
namespace=namespace,
body=service
)
print(f"Service '{service_name}' 创建成功")
return result
except client.exceptions.ApiException as e:
if e.status == 409:
print(f"Service '{service_name}' 已存在")
else:
print(f"创建Service失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
# 为nginx-deployment创建ClusterIP类型的Service
create_service(
namespace="default",
service_name="nginx-service",
selector_labels={"app": "nginx"},
port=80,
target_port=80,
service_type="ClusterIP"
)
# 为nginx-deployment创建NodePort类型的Service
create_service(
namespace="default",
service_name="nginx-nodeport-service",
selector_labels={"app": "nginx"},
port=80,
target_port=80,
service_type="NodePort"
)
3.5.2 列出Services
from kubernetes import client, config
def list_services(namespace="default"):
"""
列出指定Namespace中的所有Service
:param namespace: 命名空间
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
try:
# 获取Services列表
services = core_v1.list_namespaced_service(namespace=namespace)
print(f"Namespace '{namespace}' 中的Service列表:")
print("名称\t\t\t类型\t\tCluster-IP\t\t端口")
print("-" * 80)
for service in services.items:
name = service.metadata.name
service_type = service.spec.type
cluster_ip = service.spec.cluster_ip
ports = ", ".join([f"{port.port}:{port.target_port}" for port in service.spec.ports])
print(f"{name.ljust(24)}{service_type.ljust(16)}{cluster_ip.ljust(24)}{ports}")
except client.exceptions.ApiException as e:
if e.status == 404:
print(f"Namespace '{namespace}' 不存在")
else:
print(f"获取Service列表失败:{e.reason}")
# 使用示例
if __name__ == "__main__":
list_services(namespace="default")
3.6 Ingress资源操作
Ingress用于提供集群外部对内部服务的HTTP/HTTPS访问。
3.6.1 创建Ingress
from kubernetes import client, config
from kubernetes.client.models import (
V1Ingress, V1ObjectMeta, V1IngressSpec,
V1IngressRule, V1HTTPIngressRuleValue,
V1HTTPIngressPath, V1IngressBackend,
V1IngressServiceBackend
)
def create_ingress(namespace, ingress_name, host, service_name, service_port=80):
"""
创建Ingress
:param namespace: 命名空间
:param ingress_name: Ingress名称
:param host: 域名
:param service_name: 后端服务名称
:param service_port: 后端服务端口
:return: 创建的Ingress对象
"""
config.load_kube_config()
networking_v1 = client.NetworkingV1Api()
# 定义Ingress路径
path = V1HTTPIngressPath(
path="/",
path_type="Prefix",
backend=V1IngressBackend(
service=V1IngressServiceBackend(
name=service_name,
port=client.V1ServiceBackendPort(number=service_port)
)
)
)
# 定义Ingress规则
rule = V1IngressRule(
host=host,
http=V1HTTPIngressRuleValue(paths=[path])
)
# 定义Ingress规格
spec = V1IngressSpec(rules=[rule])
# 创建Ingress对象
ingress = V1Ingress(
api_version="networking.k8s.io/v1",
kind="Ingress",
metadata=V1ObjectMeta(
name=ingress_name,
annotations={
"kubernetes.io/ingress.class": "nginx" # 使用nginx ingress控制器
}
),
spec=spec
)
try:
# 创建Ingress
result = networking_v1.create_namespaced_ingress(
namespace=namespace,
body=ingress
)
print(f"Ingress '{ingress_name}' 创建成功")
return result
except client.exceptions.ApiException as e:
if e.status == 409:
print(f"Ingress '{ingress_name}' 已存在")
else:
print(f"创建Ingress失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
create_ingress(
namespace="default",
ingress_name="nginx-ingress",
host="nginx.example.com",
service_name="nginx-service",
service_port=80
)
3.7 Job和CronJob资源操作
3.7.1 创建Job
from kubernetes import client, config
from kubernetes.client.models import (
V1Job, V1ObjectMeta, V1JobSpec, V1PodTemplateSpec,
V1PodSpec, V1Container
)
def create_job(namespace, job_name, image, command=None, args=None):
"""
创建Job
:param namespace: 命名空间
:param job_name: Job名称
:param image: 容器镜像
:param command: 命令
:param args: 参数
:return: 创建的Job对象
"""
config.load_kube_config()
batch_v1 = client.BatchV1Api()
# 定义容器
container = V1Container(
name="job-container",
image=image,
command=command,
args=args
)
# 定义Pod模板
template = V1PodTemplateSpec(
spec=V1PodSpec(
containers=[container],
restart_policy="Never" # Job通常使用Never或OnFailure重启策略
)
)
# 定义Job规格
spec = V1JobSpec(
template=template,
backoff_limit=4 # 重试次数
)
# 创建Job对象
job = V1Job(
api_version="batch/v1",
kind="Job",
metadata=V1ObjectMeta(name=job_name),
spec=spec
)
try:
# 创建Job
result = batch_v1.create_namespaced_job(
namespace=namespace,
body=job
)
print(f"Job '{job_name}' 创建成功")
return result
except client.exceptions.ApiException as e:
if e.status == 409:
print(f"Job '{job_name}' 已存在")
else:
print(f"创建Job失败:{e.reason}")
return None
# 使用示例:创建一个简单的计算Pi的Job
if __name__ == "__main__":
create_job(
namespace="default",
job_name="pi-calculation",
image="perl",
command=["perl"],
args=["-Mbignum=bpi", "-wle", "print bpi(2000)"]
)
3.7.2 创建CronJob
from kubernetes import client, config
from kubernetes.client.models import (
V1CronJob, V1ObjectMeta, V1CronJobSpec,
V1JobTemplateSpec, V1PodTemplateSpec,
V1PodSpec, V1Container
)
def create_cron_job(namespace, cron_job_name, schedule, image, command=None, args=None):
"""
创建CronJob
:param namespace: 命名空间
:param cron_job_name: CronJob名称
:param schedule: Cron表达式
:param image: 容器镜像
:param command: 命令
:param args: 参数
:return: 创建的CronJob对象
"""
config.load_kube_config()
batch_v1 = client.BatchV1Api()
# 定义容器
container = V1Container(
name="cron-job-container",
image=image,
command=command,
args=args
)
# 定义Pod模板
template = V1PodTemplateSpec(
spec=V1PodSpec(
containers=[container],
restart_policy="Never"
)
)
# 定义Job模板
job_template = V1JobTemplateSpec(
spec=V1JobSpec(
template=template,
backoff_limit=4
)
)
# 定义CronJob规格
spec = V1CronJobSpec(
schedule=schedule,
job_template=job_template,
starting_deadline_seconds=100, # 启动截止时间
concurrency_policy="Forbid", # 并发策略:禁止并发执行
successful_jobs_history_limit=3, # 保留成功Job历史记录数
failed_jobs_history_limit=1 # 保留失败Job历史记录数
)
# 创建CronJob对象
cron_job = V1CronJob(
api_version="batch/v1",
kind="CronJob",
metadata=V1ObjectMeta(name=cron_job_name),
spec=spec
)
try:
# 创建CronJob
result = batch_v1.create_namespaced_cron_job(
namespace=namespace,
body=cron_job
)
print(f"CronJob '{cron_job_name}' 创建成功")
return result
except client.exceptions.ApiException as e:
if e.status == 409:
print(f"CronJob '{cron_job_name}' 已存在")
else:
print(f"创建CronJob失败:{e.reason}")
return None
# 使用示例:创建一个每分钟输出当前时间的CronJob
if __name__ == "__main__":
create_cron_job(
namespace="default",
cron_job_name="time-printer",
schedule="* * * * *", # 每分钟执行一次
image="busybox",
command=["/bin/sh"],
args=["-c", "date; echo 'Hello from CronJob'"]
)
四、高级应用场景
4.1 监控Kubernetes资源变化(Watch API)
kubernetes
库提供了Watch API,可以实时监控资源的变化。
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from kubernetes.watch import Watch
import time
def watch_pods(namespace="default", timeout_seconds=60):
"""
监控指定Namespace中Pod的变化
:param namespace: 命名空间
:param timeout_seconds: 超时时间(秒)
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
print(f"开始监控Namespace '{namespace}' 中的Pod变化,超时时间:{timeout_seconds}秒")
print("-" * 60)
# 创建Watch对象
w = Watch()
try:
# 使用watch参数启动监控
for event in w.stream(
func=core_v1.list_namespaced_pod,
namespace=namespace,
timeout_seconds=timeout_seconds
):
pod = event['object']
event_type = event['type']
# 获取Pod信息
pod_name = pod.metadata.name
pod_status = pod.status.phase
print(f"事件类型: {event_type}, Pod: {pod_name}, 状态: {pod_status}")
except ApiException as e:
print(f"监控过程中发生API异常: {e.reason}")
except Exception as e:
print(f"监控过程中发生未知异常: {str(e)}")
finally:
# 停止监控
w.stop()
print("监控已停止")
# 使用示例
if __name__ == "__main__":
# 监控default命名空间中的Pod变化,持续60秒
watch_pods(namespace="default", timeout_seconds=60)
4.2 执行容器命令
可以通过Python代码在运行中的容器内执行命令。
from kubernetes import client, config
from kubernetes.stream import stream
def execute_command_in_container(namespace, pod_name, container_name, command):
"""
在运行中的容器内执行命令
:param namespace: 命名空间
:param pod_name: Pod名称
:param container_name: 容器名称
:param command: 要执行的命令,可以是字符串或列表
:return: 命令执行结果
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
# 如果命令是字符串,转换为列表
if isinstance(command, str):
command = command.split()
try:
# 使用stream模块执行命令
result = stream(
core_v1.connect_get_namespaced_pod_exec,
name=pod_name,
namespace=namespace,
command=command,
container=container_name,
stderr=True,
stdin=False,
stdout=True,
tty=False
)
return result
except Exception as e:
print(f"执行命令失败: {str(e)}")
return None
# 使用示例
if __name__ == "__main__":
# 在nginx容器中执行ls命令
result = execute_command_in_container(
namespace="default",
pod_name="nginx-demo", # 确保这个Pod存在
container_name="main-container",
command="ls -l /"
)
if result:
print("命令执行结果:")
print(result)
4.3 日志收集
可以获取容器的日志输出。
from kubernetes import client, config
def get_container_logs(namespace, pod_name, container_name=None, tail_lines=100):
"""
获取容器日志
:param namespace: 命名空间
:param pod_name: Pod名称
:param container_name: 容器名称(如果Pod中有多个容器)
:param tail_lines: 获取的最后行数
:return: 日志内容
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
try:
# 获取容器日志
logs = core_v1.read_namespaced_pod_log(
name=pod_name,
namespace=namespace,
container=container_name,
tail_lines=tail_lines
)
return logs
except Exception as e:
print(f"获取日志失败: {str(e)}")
return None
# 使用示例
if __name__ == "__main__":
# 获取nginx容器的日志
logs = get_container_logs(
namespace="default",
pod_name="nginx-demo", # 确保这个Pod存在
container_name="main-container",
tail_lines=20
)
if logs:
print("容器日志内容:")
print(logs)
4.4 动态配置管理(ConfigMap和Secret)
4.4.1 ConfigMap管理
from kubernetes import client, config
from kubernetes.client.models import V1ConfigMap, V1ObjectMeta
def create_config_map(namespace, config_map_name, data):
"""
创建ConfigMap
:param namespace: 命名空间
:param config_map_name: ConfigMap名称
:param data: 配置数据,字典类型
:return: 创建的ConfigMap对象
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
# 创建ConfigMap对象
config_map = V1ConfigMap(
api_version="v1",
kind="ConfigMap",
metadata=V1ObjectMeta(name=config_map_name),
data=data
)
try:
# 创建ConfigMap
result = core_v1.create_namespaced_config_map(
namespace=namespace,
body=config_map
)
print(f"ConfigMap '{config_map_name}' 创建成功")
return result
except client.exceptions.ApiException as e:
if e.status == 409:
print(f"ConfigMap '{config_map_name}' 已存在")
else:
print(f"创建ConfigMap失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
# 创建一个包含数据库配置的ConfigMap
create_config_map(
namespace="default",
config_map_name="db-config",
data={
"db_host": "localhost",
"db_port": "5432",
"db_name": "mydatabase",
"db_user": "user"
}
)
4.4.2 Secret管理
from kubernetes import client, config
from kubernetes.client.models import V1Secret, V1ObjectMeta
import base64
def create_secret(namespace, secret_name, data, secret_type="Opaque"):
"""
创建Secret
:param namespace: 命名空间
:param secret_name: Secret名称
:param data: 秘密数据,字典类型,值需要是base64编码
:param secret_type: Secret类型
:return: 创建的Secret对象
"""
config.load_kube_config()
core_v1 = client.CoreV1Api()
# 确保数据是base64编码
encoded_data = {}
for key, value in data.items():
if not isinstance(value, bytes):
value = value.encode('utf-8')
encoded_data[key] = base64.b64encode(value).decode('utf-8')
# 创建Secret对象
secret = V1Secret(
api_version="v1",
kind="Secret",
metadata=V1ObjectMeta(name=secret_name),
type=secret_type,
data=encoded_data
)
try:
# 创建Secret
result = core_v1.create_namespaced_secret(
namespace=namespace,
body=secret
)
print(f"Secret '{secret_name}' 创建成功")
return result
except client.exceptions.ApiException as e:
if e.status == 409:
print(f"Secret '{secret_name}' 已存在")
else:
print(f"创建Secret失败:{e.reason}")
return None
# 使用示例
if __name__ == "__main__":
# 创建一个包含数据库密码的Secret
create_secret(
namespace="default",
secret_name="db-secret",
data={
"db_password": "mysecretpassword123"
}
)
五、实际案例:自动化部署Web应用
5.1 案例概述
我们将使用kubernetes
库创建一个自动化部署脚本,该脚本能够:
- 创建Namespace
- 部署PostgreSQL数据库
- 部署Flask Web应用
- 配置Service和Ingress
- 监控部署状态
5.2 完整脚本实现
import time
from kubernetes import client, config
from kubernetes.client.models import (
V1Namespace, V1ObjectMeta, V1PersistentVolumeClaim,
V1PersistentVolumeClaimSpec, V1ResourceRequirements,
V1Deployment, V1DeploymentSpec, V1PodTemplateSpec,
V1PodSpec, V1Container, V1Service, V1ServiceSpec,
V1ServicePort, V1Ingress, V1IngressSpec, V1IngressRule,
V1HTTPIngressRuleValue, V1HTTPIngressPath, V1IngressBackend,
V1IngressServiceBackend, V1ConfigMap, V1Secret
)
class K8sDeployer:
def __init__(self, namespace="web-app-demo"):
"""初始化Kubernetes客户端"""
config.load_kube_config()
self.namespace = namespace
# 初始化API客户端
self.core_v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
self.networking_v1 = client.NetworkingV1Api()
def create_namespace(self):
"""创建命名空间"""
print(f"创建命名空间: {self.namespace}")
namespace = V1Namespace(
metadata=V1ObjectMeta(name=self.namespace)
)
try:
self.core_v1.create_namespace(body=namespace)
print(f"命名空间 '{self.namespace}' 创建成功")
except client.exceptions.ApiException as e:
if e.status == 409:
print(f"命名空间 '{self.namespace}' 已存在")
else:
raise
def create_postgresql(self):
"""部署PostgreSQL数据库"""
print("部署PostgreSQL数据库...")
# 创建PersistentVolumeClaim
pvc = V1PersistentVolumeClaim(
api_version="v1",
kind="PersistentVolumeClaim",
metadata=V1ObjectMeta(name="postgres-pvc"),
spec=V1PersistentVolumeClaimSpec(
access_modes=["ReadWriteOnce"],
resources=V1ResourceRequirements(
requests={"storage": "1Gi"}
)
)
)
self.core_v1.create_namespaced_persistent_volume_claim(
namespace=self.namespace,
body=pvc
)
# 创建ConfigMap
config_map = V1ConfigMap(
api_version="v1",
kind="ConfigMap",
metadata=V1ObjectMeta(name="postgres-config"),
data={
"POSTGRES_DB": "webapp",
"POSTGRES_USER": "webapp"
}
)
self.core_v1.create_namespaced_config_map(
namespace=self.namespace,
body=config_map
)
# 创建Secret
import base64
password = base64.b64encode("webapp123".encode()).decode()
secret = V1Secret(
api_version="v1",
kind="Secret",
metadata=V1ObjectMeta(name="postgres-secret"),
type="Opaque",
data={"POSTGRES_PASSWORD": password}
)
self.core_v1.create_namespaced_secret(
namespace=self.namespace,
body=secret
)
# 创建Deployment
deployment = V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=V1ObjectMeta(name="postgres"),
spec=V1DeploymentSpec(
replicas=1,
selector={"matchLabels": {"app": "postgres"}},
template=V1PodTemplateSpec(
metadata=V1ObjectMeta(labels={"app": "postgres"}),
spec=V1PodSpec(
containers=[
V1Container(
name="postgres",
image="postgres:14",
ports=[V1ContainerPort(container_port=5432)],
env=[
client.V1EnvVar(
name="POSTGRES_DB",
value_from=client.V1EnvVarSource(
config_map_key_ref=client.V1ConfigMapKeySelector(
name="postgres-config",
key="POSTGRES_DB"
)
)
),
client.V1EnvVar(
name="POSTGRES_USER",
value_from=client.V1EnvVarSource(
config_map_key_ref=client.V1ConfigMapKeySelector(
name="postgres-config",
key="POSTGRES_USER"
)
)
),
client.V1EnvVar(
name="POSTGRES_PASSWORD",
value_from=client.V1EnvVarSource(
secret_key_ref=client.V1SecretKeySelector(
name="postgres-secret",
key="POSTGRES_PASSWORD"
)
)
)
],
volume_mounts=[
client.V1VolumeMount(
name="postgres-data",
mount_path="/var/lib/postgresql/data"
)
]
)
],
volumes=[
client.V1Volume(
name="postgres-data",
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
claim_name="postgres-pvc"
)
)
]
)
)
)
)
self.apps_v1.create_namespaced_deployment(
namespace=self.namespace,
body=deployment
)
# 创建Service
service = V1Service(
api_version="v1",
kind="Service",
metadata=V1ObjectMeta(name="postgres"),
spec=V1ServiceSpec(
selector={"app": "postgres"},
ports=[V1ServicePort(port=5432, target_port=5432)]
)
)
self.core_v1.create_namespaced_service(
namespace=self.namespace,
body=service
)
print("PostgreSQL部署完成")
def create_web_app(self):
"""部署Web应用"""
print("部署Web应用...")
# 创建ConfigMap
config_map = V1ConfigMap(
api_version="v1",
kind="ConfigMap",
metadata=V1ObjectMeta(name="webapp-config"),
data={
"DB_HOST": "postgres",
"DB_PORT": "5432",
"DB_NAME": "webapp",
"DB_USER": "webapp"
}
)
self.core_v1.create_namespaced_config_map(
namespace=self.namespace,
body=config_map
)
# 创建Deployment
deployment = V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=V1ObjectMeta(name="webapp"),
spec=V1DeploymentSpec(
replicas=3,
selector={"matchLabels": {"app": "webapp"}},
template=V1PodTemplateSpec(
metadata=V1ObjectMeta(labels={"app": "webapp"}),
spec=V1PodSpec(
containers=[
V1Container(
name="webapp",
image="python:3.9-slim",
command=["python", "-m", "http.server", "8080"],
ports=[V1ContainerPort(container_port=8080)],
env=[
client.V1EnvVar(
name="DB_HOST",
value_from=client.V1EnvVarSource(
config_map_key_ref=client.V1ConfigMapKeySelector(
name="webapp-config",
key="DB_HOST"
)
)
),
client.V1EnvVar(
name="DB_PORT",
value_from=client.V1EnvVarSource(
config_map_key_ref=client.V1ConfigMapKeySelector(
name="webapp-config",
key="DB_PORT"
)
)
),
client.V1EnvVar(
name="DB_NAME",
value_from=client.V1EnvVarSource(
config_map_key_ref=client.V1ConfigMapKeySelector(
name="webapp-config",
key="DB_NAME"
)
)
),
client.V1EnvVar(
name="DB_USER",
value_from=client.V1EnvVarSource(
config_map_key_ref=client.V1ConfigMapKeySelector(
name="webapp-config",
key="DB_USER"
)
)
),
client.V1EnvVar(
name="DB_PASSWORD",
value_from=client.V1EnvVarSource(
secret_key_ref=client.V1SecretKeySelector(
name="postgres-secret",
key="POSTGRES_PASSWORD"
)
)
)
]
)
]
)
)
)
)
self.apps_v1.create_namespaced_deployment(
namespace=self.namespace,
body=deployment
)
# 创建Service
service = V1Service(
api_version="v1",
kind="Service",
metadata=V1ObjectMeta(name="webapp"),
spec=V1ServiceSpec(
type="ClusterIP",
selector={"app": "webapp"},
ports=[V1ServicePort(port=80, target_port=8080)]
)
)
self.core_v1.create_namespaced_service(
namespace=self.namespace,
body=service
)
print("Web应用部署完成")
def create_ingress(self, host="webapp.example.com"):
"""创建Ingress"""
print(f"创建Ingress: {host}")
ingress = V1Ingress(
api_version="networking.k8s.io/v1",
kind="Ingress",
metadata=V1ObjectMeta(
name="webapp-ingress",
annotations={
"kubernetes.io/ingress.class": "nginx"
}
),
spec=V1IngressSpec(
rules=[
V1IngressRule(
host=host,
http=V1HTTPIngressRuleValue(
paths=[
V1HTTPIngressPath(
path="/",
path_type="Prefix",
backend=V1IngressBackend(
service=V1IngressServiceBackend(
name="webapp",
port=client.V1ServiceBackendPort(number=80)
)
)
)
]
)
)
]
)
)
self.networking_v1.create_namespaced_ingress(
namespace=self.namespace,
body=ingress
)
print(f"Ingress已创建,访问地址: http://{host}")
def wait_for_deployment(self, deployment_name, timeout=300):
"""等待Deployment就绪"""
print(f"等待Deployment '{deployment_name}' 就绪...")
start_time = time.time()
while time.time() - start_time < timeout:
try:
deployment = self.apps_v1.read_namespaced_deployment(
name=deployment_name,
namespace=self.namespace
)
available_replicas = deployment.status.available_replicas or 0
desired_replicas = deployment.spec.replicas or 0
if available_replicas == desired_replicas:
print(f"Deployment '{deployment_name}' 已就绪")
return True
print(f"等待中: {available_replicas}/{desired_replicas} 个副本可用")
time.sleep(5)
except Exception as e:
print(f"检查Deployment状态时出错: {str(e)}")
time.sleep(5)
print(f"等待超时,Deployment '{deployment_name}' 未就绪")
return False
def deploy_web_application(self, domain="webapp.example.com"):
"""部署完整的Web应用"""
print("开始部署Web应用...")
try:
# 创建命名空间
self.create_namespace()
# 部署PostgreSQL
self.create_postgresql()
# 等待PostgreSQL就绪
if not self.wait_for_deployment("postgres"):
print("PostgreSQL部署失败,退出")
return
# 部署Web应用
self.create_web_app()
# 等待Web应用就绪
if not self.wait_for_deployment("webapp"):
print("Web应用部署失败,退出")
return
# 创建Ingress
self.create_ingress(host=domain)
print("=" * 60)
print(f"Web应用部署完成! 可以通过 http://{domain} 访问")
print("=" * 60)
except Exception as e:
print(f"部署过程中发生错误: {str(e)}")
# 使用示例
if __name__ == "__main__":
deployer = K8sDeployer(namespace="web-app-demo")
deployer.deploy_web_application(domain="webapp.example.com")
六、总结与参考资源
6.1 总结
通过kubernetes
库,Python开发者可以方便地与Kubernetes集群进行交互,实现资源管理、自动化部署、监控等功能。本文详细介绍了该库的基本概念、安装配置、核心API操作以及实际案例应用。
在实际项目中,可以基于这些基础知识开发更复杂的自动化工具,如CI/CD流水线、资源调度系统等。掌握kubernetes
库的使用,将大大提升Python开发者在云原生领域的开发效率。
6.2 参考资源
- Pypi地址:https://pypi.org/project/kubernetes/
- Github地址:https://github.com/kubernetes-client/python
- 官方文档地址:https://kubernetes.io/docs/reference/generated/kubernetes-client/python/
通过这些资源,你可以获取更多关于kubernetes
库的详细信息和最新动态。建议在实际项目中参考官方文档,以获取最准确和最新的API使用方法。
关注我,每天分享一个实用的Python自动化工具。
