Python操作Kubernetes全指南:从入门到实战的kubernetes库使用教程

一、kubernetes库概述:用途、原理与特性

在云原生技术飞速发展的今天,Kubernetes已成为容器编排的事实标准。而Python作为一门广泛应用于自动化脚本、云服务开发的编程语言,两者的结合催生了kubernetes库——这是Python开发者与Kubernetes集群交互的核心工具。该库提供了完整的Kubernetes API客户端实现,让开发者能通过Python代码管理集群资源、监控状态、自动化运维流程。

其工作原理基于Kubernetes的RESTful API,通过封装API调用细节,将复杂的HTTP请求转换为直观的Python对象操作。开发者无需直接处理JSON数据和HTTP状态码,只需调用相应方法即可完成Pod创建、服务部署等操作。

优点:官方维护保障兼容性,API覆盖全面,支持所有Kubernetes资源操作;提供配置自动加载机制,简化集群连接流程。

缺点:部分高级功能需要深入理解Kubernetes概念;同步API调用在大规模操作时可能影响性能。该库采用Apache License 2.0许可,允许商业使用和修改,只需保留原版权声明。

二、kubernetes库安装与环境配置

2.1 安装kubernetes库

安装kubernetes库非常简单,通过Python的包管理工具pip即可完成。打开终端或命令提示符,执行以下命令:

pip install kubernetes

如果需要安装特定版本(推荐与集群版本匹配),可以指定版本号:

pip install kubernetes==26.1.0  # 安装26.1.0版本,适配Kubernetes 1.26.x

对于需要开发环境的用户,可安装包含开发依赖的版本:

pip install kubernetes[dev]  # 包含测试和开发所需依赖

安装完成后,可以通过以下代码验证安装是否成功:

try:
    import kubernetes
    print(f"kubernetes库安装成功,版本:{kubernetes.__version__}")
except ImportError:
    print("kubernetes库安装失败")

运行后如果输出类似kubernetes库安装成功,版本:26.1.0的信息,则表示安装成功。

2.2 集群连接配置

kubernetes库需要正确的配置才能连接到Kubernetes集群,它支持多种配置方式,适应不同的使用场景。

2.2.1 本地集群配置(kubectl配置)

当你的Python脚本在已配置好kubectl的环境中运行时(如开发机或集群节点),库会自动加载kubectl的配置文件。默认情况下,配置文件位于:

  • Linux/macOS:~/.kube/config
  • Windows:C:\Users\<用户名>\.kube\config

这种方式是最常用的配置方式,无需额外代码即可连接集群:

from kubernetes import client, config

# 加载默认配置(从~/.kube/config读取)
config.load_kube_config()

# 创建API客户端实例
v1 = client.CoreV1Api()

# 测试连接:获取集群版本信息
try:
    version_info = v1.get_code()
    print(f"成功连接到Kubernetes集群,版本:{version_info.git_version}")
except Exception as e:
    print(f"连接集群失败:{str(e)}")

2.2.2 手动指定配置文件

如果配置文件不在默认位置,可以手动指定配置文件路径:

from kubernetes import client, config

# 手动指定配置文件路径
config.load_kube_config(config_file="/path/to/your/kubeconfig")

# 验证连接
v1 = client.CoreV1Api()
print(f"集群服务器地址:{v1.api_client.configuration.host}")

2.2.3 集群内配置(In-Cluster配置)

当Python脚本在Kubernetes集群内部的Pod中运行时,推荐使用In-Cluster配置方式。这种方式不需要手动配置文件,而是通过集群内部的服务账户自动获取权限:

from kubernetes import client, config

# 加载集群内配置
config.load_incluster_config()

# 验证连接
v1 = client.CoreV1Api()
print(f"集群内连接成功,服务器地址:{v1.api_client.configuration.host}")

使用这种方式需要确保Pod的服务账户具有相应的RBAC权限,否则会出现权限不足的错误。

2.2.4 手动配置连接参数

在某些特殊场景下(如连接远程集群且没有配置文件),可以手动指定连接参数:

from kubernetes import client

# 手动配置连接参数
configuration = client.Configuration()
configuration.host = "https://your-kubernetes-api-server:6443"  # API服务器地址
configuration.verify_ssl = True  # 是否验证SSL证书
configuration.ca_cert = "/path/to/ca.crt"  # CA证书路径
configuration.api_key = {"authorization": "Bearer YOUR_TOKEN"}  # 认证Token

# 应用配置
client.Configuration.set_default(configuration)

# 验证连接
v1 = client.CoreV1Api()
try:
    version = v1.get_code()
    print(f"手动配置连接成功,集群版本:{version.git_version}")
except Exception as e:
    print(f"手动配置连接失败:{str(e)}")

三、核心API对象操作详解

3.1 客户端对象初始化

kubernetes库为Kubernetes的每个API组提供了对应的客户端类,最常用的包括:

  • CoreV1Api:核心API组,包含Pod、Service、Namespace等基础资源
  • AppsV1Api:应用API组,包含Deployment、StatefulSet、DaemonSet等
  • BatchV1Api:批处理API组,包含Job、CronJob等
  • NetworkingV1Api:网络API组,包含Ingress等资源

初始化客户端对象的方式非常简单:

from kubernetes import client, config

# 加载配置
config.load_kube_config()

# 初始化各种API客户端
core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
batch_v1 = client.BatchV1Api()
networking_v1 = client.NetworkingV1Api()

所有客户端对象都继承自基础的API客户端,拥有一致的操作风格。

3.2 Namespace管理

Namespace用于在集群中创建资源隔离的逻辑分区,以下是Namespace的常用操作:

3.2.1 列出所有Namespace

from kubernetes import client, config

config.load_kube_config()
core_v1 = client.CoreV1Api()

def list_namespaces():
    """列出集群中所有的Namespace"""
    try:
        # 调用list_namespace方法获取所有Namespace
        namespaces = core_v1.list_namespace()

        print("集群中的Namespace列表:")
        print("名称\t\t状态\t\t创建时间")
        print("-" * 60)

        for ns in namespaces.items:
            # 获取Namespace名称、状态和创建时间
            name = ns.metadata.name
            status = ns.status.phase
            create_time = ns.metadata.creation_timestamp.strftime("%Y-%m-%d %H:%M:%S")
            print(f"{name.ljust(16)}{status.ljust(16)}{create_time}")

    except Exception as e:
        print(f"获取Namespace列表失败:{str(e)}")

if __name__ == "__main__":
    list_namespaces()

运行这段代码会输出类似以下的结果:

集群中的Namespace列表:
名称        状态      创建时间
------------------------------------------------------------
default           Active           2023-01-15 10:30:00
kube-system       Active           2023-01-15 10:29:45
kube-public       Active           2023-01-15 10:29:46
kube-node-lease   Active           2023-01-15 10:29:47

3.2.2 创建Namespace

from kubernetes import client, config
from kubernetes.client.models import V1Namespace, V1ObjectMeta

def create_namespace(namespace_name, labels=None):
    """
    创建新的Namespace
    :param namespace_name: Namespace名称
    :param labels: 可选的标签字典
    :return: 创建的Namespace对象
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    # 定义Namespace元数据
    metadata = V1ObjectMeta(
        name=namespace_name,
        labels=labels or {}  # 如果没有提供标签则使用空字典
    )

    # 创建Namespace对象
    namespace = V1Namespace(metadata=metadata)

    try:
        # 调用API创建Namespace
        result = core_v1.create_namespace(body=namespace)
        print(f"Namespace '{namespace_name}' 创建成功")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 409:
            print(f"Namespace '{namespace_name}' 已存在")
        else:
            print(f"创建Namespace失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    create_namespace(
        namespace_name="python-k8s-demo",
        labels={"env": "demo", "creator": "python-script"}
    )

3.2.3 删除Namespace

from kubernetes import client, config

def delete_namespace(namespace_name):
    """
    删除指定的Namespace
    :param namespace_name: 要删除的Namespace名称
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    try:
        # 调用API删除Namespace
        # propagation_policy="Foreground" 表示Foreground级联删除
        result = core_v1.delete_namespace(
            name=namespace_name,
            propagation_policy="Foreground"
        )
        print(f"Namespace '{namespace_name}' 删除请求已提交")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"Namespace '{namespace_name}' 不存在")
        else:
            print(f"删除Namespace失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    delete_namespace("python-k8s-demo")

3.3 Pod资源操作

Pod是Kubernetes的最小部署单元,下面介绍如何通过Python代码操作Pod资源。

3.3.1 列出指定Namespace中的Pod

from kubernetes import client, config

def list_pods(namespace="default"):
    """
    列出指定Namespace中的所有Pod
    :param namespace: 命名空间,默认为default
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    try:
        # 列出Pod,watch=False表示不监听变化,只获取当前状态
        pods = core_v1.list_namespaced_pod(namespace=namespace, watch=False)

        print(f"Namespace '{namespace}' 中的Pod列表:")
        print("名称\t\t\t状态\t\t重启次数\tIP地址")
        print("-" * 70)

        for pod in pods.items:
            name = pod.metadata.name
            status = pod.status.phase
            restart_count = pod.status.container_statuses[0].restart_count if pod.status.container_statuses else 0
            ip = pod.status.pod_ip or "未分配"
            print(f"{name.ljust(24)}{status.ljust(16)}{str(restart_count).ljust(12)}{ip}")

    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"Namespace '{namespace}' 不存在")
        else:
            print(f"获取Pod列表失败:{e.reason}")

# 使用示例
if __name__ == "__main__":
    list_pods(namespace="kube-system")  # 查看kube-system命名空间的Pod
    list_pods(namespace="default")      # 查看default命名空间的Pod

3.3.2 创建Pod

创建Pod需要定义相对复杂的配置,包括容器镜像、资源限制、环境变量等:

from kubernetes import client, config
from kubernetes.client.models import (
    V1Pod, V1ObjectMeta, V1PodSpec, 
    V1Container, V1ResourceRequirements
)

def create_pod(namespace, pod_name, image, 
               command=None, args=None, 
               cpu_limit="500m", memory_limit="512Mi",
               cpu_request="200m", memory_request="256Mi",
               labels=None):
    """
    创建一个新的Pod
    :param namespace: 命名空间
    :param pod_name: Pod名称
    :param image: 容器镜像
    :param command: 容器启动命令
    :param args: 容器启动参数
    :param cpu_limit: CPU限制
    :param memory_limit: 内存限制
    :param cpu_request: CPU请求
    :param memory_request: 内存请求
    :param labels: Pod标签
    :return: 创建的Pod对象
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    # 定义资源需求和限制
    resources = V1ResourceRequirements(
        limits={
            "cpu": cpu_limit,
            "memory": memory_limit
        },
        requests={
            "cpu": cpu_request,
            "memory": memory_request
        }
    )

    # 定义容器
    container = V1Container(
        name="main-container",
        image=image,
        resources=resources,
        command=command,
        args=args
    )

    # 定义Pod规格
    spec = V1PodSpec(
        containers=[container],
        restart_policy="Always"  # 重启策略:Always、OnFailure、Never
    )

    # 定义Pod元数据
    metadata = V1ObjectMeta(
        name=pod_name,
        labels=labels or {"app": pod_name}
    )

    # 创建Pod对象
    pod = V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=metadata,
        spec=spec
    )

    try:
        # 调用API创建Pod
        result = core_v1.create_namespaced_pod(
            namespace=namespace,
            body=pod
        )
        print(f"Pod '{pod_name}' 在Namespace '{namespace}' 中创建成功")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 409:
            print(f"Pod '{pod_name}' 已存在")
        else:
            print(f"创建Pod失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    # 创建一个Nginx Pod
    create_pod(
        namespace="default",
        pod_name="nginx-demo",
        image="nginx:1.23",
        labels={"app": "nginx", "env": "demo"},
        cpu_limit="500m",
        memory_limit="512Mi"
    )

    # 创建一个运行Python的Pod
    create_pod(
        namespace="default",
        pod_name="python-demo",
        image="python:3.9-slim",
        command=["python"],
        args=["-c", "import time; while True: print('Hello from Python Pod'); time.sleep(5)"],
        labels={"app": "python", "env": "demo"}
    )

3.3.3 获取Pod详情

from kubernetes import client, config

def get_pod_details(namespace, pod_name):
    """
    获取指定Pod的详细信息
    :param namespace: 命名空间
    :param pod_name: Pod名称
    :return: Pod对象或None
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    try:
        # 获取Pod详情
        pod = core_v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        print(f"Pod '{pod_name}' 详情:")
        print(f"状态:{pod.status.phase}")
        print(f"IP地址:{pod.status.pod_ip}")
        print(f"节点:{pod.spec.node_name}")
        print(f"创建时间:{pod.metadata.creation_timestamp}")
        print("容器信息:")
        for container in pod.spec.containers:
            print(f"  - 名称:{container.name}")
            print(f"    镜像:{container.image}")
        return pod
    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"Pod '{pod_name}' 在Namespace '{namespace}' 中不存在")
        else:
            print(f"获取Pod详情失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    get_pod_details(namespace="default", pod_name="nginx-demo")

3.3.4 删除Pod

from kubernetes import client, config

def delete_pod(namespace, pod_name):
    """
    删除指定的Pod
    :param namespace: 命名空间
    :param pod_name: 要删除的Pod名称
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    try:
        result = core_v1.delete_namespaced_pod(
            name=pod_name,
            namespace=namespace,
            body=client.V1DeleteOptions(
                propagation_policy="Foreground",
                grace_period_seconds=30
            )
        )
        print(f"Pod '{pod_name}' 删除请求已提交")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"Pod '{pod_name}' 在Namespace '{namespace}' 中不存在")
        else:
            print(f"删除Pod失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    delete_pod(namespace="default", pod_name="nginx-demo")
    delete_pod(namespace="default", pod_name="python-demo")

3.4 Deployment资源操作

Deployment是Kubernetes中最常用的资源之一,用于管理Pod的副本和滚动更新。

3.4.1 创建Deployment

from kubernetes import client, config
from kubernetes.client.models import (
    V1Deployment, V1ObjectMeta, V1DeploymentSpec,
    V1LabelSelector, V1PodTemplateSpec, V1Container,
    V1ResourceRequirements
)

def create_deployment(namespace, deployment_name, image, replicas=3,
                     port=80, cpu_limit="500m", memory_limit="512Mi",
                     cpu_request="200m", memory_request="256Mi",
                     labels=None):
    """
    创建Deployment
    :param namespace: 命名空间
    :param deployment_name: Deployment名称
    :param image: 容器镜像
    :param replicas: 副本数量
    :param port: 容器端口
    :param cpu_limit: CPU限制
    :param memory_limit: 内存限制
    :param cpu_request: CPU请求
    :param memory_request: 内存请求
    :param labels: 标签
    :return: 创建的Deployment对象
    """
    config.load_kube_config()
    apps_v1 = client.AppsV1Api()

    # 定义资源需求
    resources = V1ResourceRequirements(
        limits={"cpu": cpu_limit, "memory": memory_limit},
        requests={"cpu": cpu_request, "memory": memory_request}
    )

    # 定义容器
    container = V1Container(
        name="main-container",
        image=image,
        ports=[client.V1ContainerPort(container_port=port)],
        resources=resources
    )

    # 定义Pod模板
    template = V1PodTemplateSpec(
        metadata=V1ObjectMeta(labels=labels or {"app": deployment_name}),
        spec=client.V1PodSpec(containers=[container])
    )

    # 定义选择器
    selector = V1LabelSelector(
        match_labels=labels or {"app": deployment_name}
    )

    # 定义Deployment规格
    spec = V1DeploymentSpec(
        replicas=replicas,
        template=template,
        selector=selector,
        strategy=client.V1DeploymentStrategy(
            type="RollingUpdate",
            rolling_update=client.V1RollingUpdateDeployment(
                max_surge="25%",
                max_unavailable="25%"
            )
        )
    )

    # 创建Deployment对象
    deployment = V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=V1ObjectMeta(name=deployment_name),
        spec=spec
    )

    try:
        # 创建Deployment
        result = apps_v1.create_namespaced_deployment(
            namespace=namespace,
            body=deployment
        )
        print(f"Deployment '{deployment_name}' 创建成功")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 409:
            print(f"Deployment '{deployment_name}' 已存在")
        else:
            print(f"创建Deployment失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    create_deployment(
        namespace="default",
        deployment_name="nginx-deployment",
        image="nginx:1.23",
        replicas=3,
        port=80,
        labels={"app": "nginx", "env": "demo"}
    )

3.4.2 更新Deployment(滚动更新)

from kubernetes import client, config

def update_deployment_image(namespace, deployment_name, new_image):
    """
    更新Deployment的镜像
    :param namespace: 命名空间
    :param deployment_name: Deployment名称
    :param new_image: 新镜像
    :return: 更新后的Deployment对象
    """
    config.load_kube_config()
    apps_v1 = client.AppsV1Api()

    try:
        # 获取当前Deployment
        deployment = apps_v1.read_namespaced_deployment(
            name=deployment_name,
            namespace=namespace
        )

        # 更新镜像
        deployment.spec.template.spec.containers[0].image = new_image

        # 执行更新
        result = apps_v1.patch_namespaced_deployment(
            name=deployment_name,
            namespace=namespace,
            body=deployment
        )

        print(f"Deployment '{deployment_name}' 镜像已更新为: {new_image}")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"Deployment '{deployment_name}' 不存在")
        else:
            print(f"更新Deployment失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    update_deployment_image(
        namespace="default",
        deployment_name="nginx-deployment",
        new_image="nginx:1.24"
    )

3.4.3 扩缩容Deployment

from kubernetes import client, config

def scale_deployment(namespace, deployment_name, replicas):
    """
    扩缩容Deployment
    :param namespace: 命名空间
    :param deployment_name: Deployment名称
    :param replicas: 新的副本数
    :return: 更新后的Deployment对象
    """
    config.load_kube_config()
    apps_v1 = client.AppsV1Api()

    try:
        # 获取当前Deployment
        deployment = apps_v1.read_namespaced_deployment(
            name=deployment_name,
            namespace=namespace
        )

        # 更新副本数
        deployment.spec.replicas = replicas

        # 执行更新
        result = apps_v1.patch_namespaced_deployment(
            name=deployment_name,
            namespace=namespace,
            body=deployment
        )

        print(f"Deployment '{deployment_name}' 已调整为 {replicas} 个副本")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"Deployment '{deployment_name}' 不存在")
        else:
            print(f"调整Deployment副本数失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    # 扩容到5个副本
    scale_deployment(
        namespace="default",
        deployment_name="nginx-deployment",
        replicas=5
    )

    # 缩容到2个副本
    scale_deployment(
        namespace="default",
        deployment_name="nginx-deployment",
        replicas=2
    )

3.5 Service资源操作

Service用于暴露Pod,使其可以被访问。

3.5.1 创建Service

from kubernetes import client, config
from kubernetes.client.models import (
    V1Service, V1ObjectMeta, V1ServiceSpec,
    V1ServicePort
)

def create_service(namespace, service_name, selector_labels, 
                  port=80, target_port=80, service_type="ClusterIP"):
    """
    创建Service
    :param namespace: 命名空间
    :param service_name: Service名称
    :param selector_labels: 选择器标签
    :param port: 服务端口
    :param target_port: 目标端口
    :param service_type: 服务类型(ClusterIP, NodePort, LoadBalancer)
    :return: 创建的Service对象
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    # 定义Service端口
    service_port = V1ServicePort(
        port=port,
        target_port=target_port,
        protocol="TCP"
    )

    # 定义Service规格
    spec = V1ServiceSpec(
        selector=selector_labels,
        ports=[service_port],
        type=service_type
    )

    # 创建Service对象
    service = V1Service(
        api_version="v1",
        kind="Service",
        metadata=V1ObjectMeta(name=service_name),
        spec=spec
    )

    try:
        # 创建Service
        result = core_v1.create_namespaced_service(
            namespace=namespace,
            body=service
        )
        print(f"Service '{service_name}' 创建成功")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 409:
            print(f"Service '{service_name}' 已存在")
        else:
            print(f"创建Service失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    # 为nginx-deployment创建ClusterIP类型的Service
    create_service(
        namespace="default",
        service_name="nginx-service",
        selector_labels={"app": "nginx"},
        port=80,
        target_port=80,
        service_type="ClusterIP"
    )

    # 为nginx-deployment创建NodePort类型的Service
    create_service(
        namespace="default",
        service_name="nginx-nodeport-service",
        selector_labels={"app": "nginx"},
        port=80,
        target_port=80,
        service_type="NodePort"
    )

3.5.2 列出Services

from kubernetes import client, config

def list_services(namespace="default"):
    """
    列出指定Namespace中的所有Service
    :param namespace: 命名空间
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    try:
        # 获取Services列表
        services = core_v1.list_namespaced_service(namespace=namespace)

        print(f"Namespace '{namespace}' 中的Service列表:")
        print("名称\t\t\t类型\t\tCluster-IP\t\t端口")
        print("-" * 80)

        for service in services.items:
            name = service.metadata.name
            service_type = service.spec.type
            cluster_ip = service.spec.cluster_ip
            ports = ", ".join([f"{port.port}:{port.target_port}" for port in service.spec.ports])

            print(f"{name.ljust(24)}{service_type.ljust(16)}{cluster_ip.ljust(24)}{ports}")

    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"Namespace '{namespace}' 不存在")
        else:
            print(f"获取Service列表失败:{e.reason}")

# 使用示例
if __name__ == "__main__":
    list_services(namespace="default")

3.6 Ingress资源操作

Ingress用于提供集群外部对内部服务的HTTP/HTTPS访问。

3.6.1 创建Ingress

from kubernetes import client, config
from kubernetes.client.models import (
    V1Ingress, V1ObjectMeta, V1IngressSpec,
    V1IngressRule, V1HTTPIngressRuleValue,
    V1HTTPIngressPath, V1IngressBackend,
    V1IngressServiceBackend
)

def create_ingress(namespace, ingress_name, host, service_name, service_port=80):
    """
    创建Ingress
    :param namespace: 命名空间
    :param ingress_name: Ingress名称
    :param host: 域名
    :param service_name: 后端服务名称
    :param service_port: 后端服务端口
    :return: 创建的Ingress对象
    """
    config.load_kube_config()
    networking_v1 = client.NetworkingV1Api()

    # 定义Ingress路径
    path = V1HTTPIngressPath(
        path="/",
        path_type="Prefix",
        backend=V1IngressBackend(
            service=V1IngressServiceBackend(
                name=service_name,
                port=client.V1ServiceBackendPort(number=service_port)
            )
        )
    )

    # 定义Ingress规则
    rule = V1IngressRule(
        host=host,
        http=V1HTTPIngressRuleValue(paths=[path])
    )

    # 定义Ingress规格
    spec = V1IngressSpec(rules=[rule])

    # 创建Ingress对象
    ingress = V1Ingress(
        api_version="networking.k8s.io/v1",
        kind="Ingress",
        metadata=V1ObjectMeta(
            name=ingress_name,
            annotations={
                "kubernetes.io/ingress.class": "nginx"  # 使用nginx ingress控制器
            }
        ),
        spec=spec
    )

    try:
        # 创建Ingress
        result = networking_v1.create_namespaced_ingress(
            namespace=namespace,
            body=ingress
        )
        print(f"Ingress '{ingress_name}' 创建成功")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 409:
            print(f"Ingress '{ingress_name}' 已存在")
        else:
            print(f"创建Ingress失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    create_ingress(
        namespace="default",
        ingress_name="nginx-ingress",
        host="nginx.example.com",
        service_name="nginx-service",
        service_port=80
    )

3.7 Job和CronJob资源操作

3.7.1 创建Job

from kubernetes import client, config
from kubernetes.client.models import (
    V1Job, V1ObjectMeta, V1JobSpec, V1PodTemplateSpec,
    V1PodSpec, V1Container
)

def create_job(namespace, job_name, image, command=None, args=None):
    """
    创建Job
    :param namespace: 命名空间
    :param job_name: Job名称
    :param image: 容器镜像
    :param command: 命令
    :param args: 参数
    :return: 创建的Job对象
    """
    config.load_kube_config()
    batch_v1 = client.BatchV1Api()

    # 定义容器
    container = V1Container(
        name="job-container",
        image=image,
        command=command,
        args=args
    )

    # 定义Pod模板
    template = V1PodTemplateSpec(
        spec=V1PodSpec(
            containers=[container],
            restart_policy="Never"  # Job通常使用Never或OnFailure重启策略
        )
    )

    # 定义Job规格
    spec = V1JobSpec(
        template=template,
        backoff_limit=4  # 重试次数
    )

    # 创建Job对象
    job = V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=V1ObjectMeta(name=job_name),
        spec=spec
    )

    try:
        # 创建Job
        result = batch_v1.create_namespaced_job(
            namespace=namespace,
            body=job
        )
        print(f"Job '{job_name}' 创建成功")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 409:
            print(f"Job '{job_name}' 已存在")
        else:
            print(f"创建Job失败:{e.reason}")
        return None

# 使用示例:创建一个简单的计算Pi的Job
if __name__ == "__main__":
    create_job(
        namespace="default",
        job_name="pi-calculation",
        image="perl",
        command=["perl"],
        args=["-Mbignum=bpi", "-wle", "print bpi(2000)"]
    )

3.7.2 创建CronJob

from kubernetes import client, config
from kubernetes.client.models import (
    V1CronJob, V1ObjectMeta, V1CronJobSpec,
    V1JobTemplateSpec, V1PodTemplateSpec,
    V1PodSpec, V1Container
)

def create_cron_job(namespace, cron_job_name, schedule, image, command=None, args=None):
    """
    创建CronJob
    :param namespace: 命名空间
    :param cron_job_name: CronJob名称
    :param schedule: Cron表达式
    :param image: 容器镜像
    :param command: 命令
    :param args: 参数
    :return: 创建的CronJob对象
    """
    config.load_kube_config()
    batch_v1 = client.BatchV1Api()

    # 定义容器
    container = V1Container(
        name="cron-job-container",
        image=image,
        command=command,
        args=args
    )

    # 定义Pod模板
    template = V1PodTemplateSpec(
        spec=V1PodSpec(
            containers=[container],
            restart_policy="Never"
        )
    )

    # 定义Job模板
    job_template = V1JobTemplateSpec(
        spec=V1JobSpec(
            template=template,
            backoff_limit=4
        )
    )

    # 定义CronJob规格
    spec = V1CronJobSpec(
        schedule=schedule,
        job_template=job_template,
        starting_deadline_seconds=100,  # 启动截止时间
        concurrency_policy="Forbid",  # 并发策略:禁止并发执行
        successful_jobs_history_limit=3,  # 保留成功Job历史记录数
        failed_jobs_history_limit=1  # 保留失败Job历史记录数
    )

    # 创建CronJob对象
    cron_job = V1CronJob(
        api_version="batch/v1",
        kind="CronJob",
        metadata=V1ObjectMeta(name=cron_job_name),
        spec=spec
    )

    try:
        # 创建CronJob
        result = batch_v1.create_namespaced_cron_job(
            namespace=namespace,
            body=cron_job
        )
        print(f"CronJob '{cron_job_name}' 创建成功")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 409:
            print(f"CronJob '{cron_job_name}' 已存在")
        else:
            print(f"创建CronJob失败:{e.reason}")
        return None

# 使用示例:创建一个每分钟输出当前时间的CronJob
if __name__ == "__main__":
    create_cron_job(
        namespace="default",
        cron_job_name="time-printer",
        schedule="* * * * *",  # 每分钟执行一次
        image="busybox",
        command=["/bin/sh"],
        args=["-c", "date; echo 'Hello from CronJob'"]
    )

四、高级应用场景

4.1 监控Kubernetes资源变化(Watch API)

kubernetes库提供了Watch API,可以实时监控资源的变化。

from kubernetes import client, config
from kubernetes.client.rest import ApiException
from kubernetes.watch import Watch
import time

def watch_pods(namespace="default", timeout_seconds=60):
    """
    监控指定Namespace中Pod的变化
    :param namespace: 命名空间
    :param timeout_seconds: 超时时间(秒)
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    print(f"开始监控Namespace '{namespace}' 中的Pod变化,超时时间:{timeout_seconds}秒")
    print("-" * 60)

    # 创建Watch对象
    w = Watch()

    try:
        # 使用watch参数启动监控
        for event in w.stream(
            func=core_v1.list_namespaced_pod,
            namespace=namespace,
            timeout_seconds=timeout_seconds
        ):
            pod = event['object']
            event_type = event['type']

            # 获取Pod信息
            pod_name = pod.metadata.name
            pod_status = pod.status.phase

            print(f"事件类型: {event_type}, Pod: {pod_name}, 状态: {pod_status}")

    except ApiException as e:
        print(f"监控过程中发生API异常: {e.reason}")
    except Exception as e:
        print(f"监控过程中发生未知异常: {str(e)}")
    finally:
        # 停止监控
        w.stop()
        print("监控已停止")

# 使用示例
if __name__ == "__main__":
    # 监控default命名空间中的Pod变化,持续60秒
    watch_pods(namespace="default", timeout_seconds=60)

4.2 执行容器命令

可以通过Python代码在运行中的容器内执行命令。

from kubernetes import client, config
from kubernetes.stream import stream

def execute_command_in_container(namespace, pod_name, container_name, command):
    """
    在运行中的容器内执行命令
    :param namespace: 命名空间
    :param pod_name: Pod名称
    :param container_name: 容器名称
    :param command: 要执行的命令,可以是字符串或列表
    :return: 命令执行结果
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    # 如果命令是字符串,转换为列表
    if isinstance(command, str):
        command = command.split()

    try:
        # 使用stream模块执行命令
        result = stream(
            core_v1.connect_get_namespaced_pod_exec,
            name=pod_name,
            namespace=namespace,
            command=command,
            container=container_name,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False
        )

        return result
    except Exception as e:
        print(f"执行命令失败: {str(e)}")
        return None

# 使用示例
if __name__ == "__main__":
    # 在nginx容器中执行ls命令
    result = execute_command_in_container(
        namespace="default",
        pod_name="nginx-demo",  # 确保这个Pod存在
        container_name="main-container",
        command="ls -l /"
    )

    if result:
        print("命令执行结果:")
        print(result)

4.3 日志收集

可以获取容器的日志输出。

from kubernetes import client, config

def get_container_logs(namespace, pod_name, container_name=None, tail_lines=100):
    """
    获取容器日志
    :param namespace: 命名空间
    :param pod_name: Pod名称
    :param container_name: 容器名称(如果Pod中有多个容器)
    :param tail_lines: 获取的最后行数
    :return: 日志内容
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    try:
        # 获取容器日志
        logs = core_v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            container=container_name,
            tail_lines=tail_lines
        )

        return logs
    except Exception as e:
        print(f"获取日志失败: {str(e)}")
        return None

# 使用示例
if __name__ == "__main__":
    # 获取nginx容器的日志
    logs = get_container_logs(
        namespace="default",
        pod_name="nginx-demo",  # 确保这个Pod存在
        container_name="main-container",
        tail_lines=20
    )

    if logs:
        print("容器日志内容:")
        print(logs)

4.4 动态配置管理(ConfigMap和Secret)

4.4.1 ConfigMap管理

from kubernetes import client, config
from kubernetes.client.models import V1ConfigMap, V1ObjectMeta

def create_config_map(namespace, config_map_name, data):
    """
    创建ConfigMap
    :param namespace: 命名空间
    :param config_map_name: ConfigMap名称
    :param data: 配置数据,字典类型
    :return: 创建的ConfigMap对象
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    # 创建ConfigMap对象
    config_map = V1ConfigMap(
        api_version="v1",
        kind="ConfigMap",
        metadata=V1ObjectMeta(name=config_map_name),
        data=data
    )

    try:
        # 创建ConfigMap
        result = core_v1.create_namespaced_config_map(
            namespace=namespace,
            body=config_map
        )
        print(f"ConfigMap '{config_map_name}' 创建成功")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 409:
            print(f"ConfigMap '{config_map_name}' 已存在")
        else:
            print(f"创建ConfigMap失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    # 创建一个包含数据库配置的ConfigMap
    create_config_map(
        namespace="default",
        config_map_name="db-config",
        data={
            "db_host": "localhost",
            "db_port": "5432",
            "db_name": "mydatabase",
            "db_user": "user"
        }
    )

4.4.2 Secret管理

from kubernetes import client, config
from kubernetes.client.models import V1Secret, V1ObjectMeta
import base64

def create_secret(namespace, secret_name, data, secret_type="Opaque"):
    """
    创建Secret
    :param namespace: 命名空间
    :param secret_name: Secret名称
    :param data: 秘密数据,字典类型,值需要是base64编码
    :param secret_type: Secret类型
    :return: 创建的Secret对象
    """
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    # 确保数据是base64编码
    encoded_data = {}
    for key, value in data.items():
        if not isinstance(value, bytes):
            value = value.encode('utf-8')
        encoded_data[key] = base64.b64encode(value).decode('utf-8')

    # 创建Secret对象
    secret = V1Secret(
        api_version="v1",
        kind="Secret",
        metadata=V1ObjectMeta(name=secret_name),
        type=secret_type,
        data=encoded_data
    )

    try:
        # 创建Secret
        result = core_v1.create_namespaced_secret(
            namespace=namespace,
            body=secret
        )
        print(f"Secret '{secret_name}' 创建成功")
        return result
    except client.exceptions.ApiException as e:
        if e.status == 409:
            print(f"Secret '{secret_name}' 已存在")
        else:
            print(f"创建Secret失败:{e.reason}")
        return None

# 使用示例
if __name__ == "__main__":
    # 创建一个包含数据库密码的Secret
    create_secret(
        namespace="default",
        secret_name="db-secret",
        data={
            "db_password": "mysecretpassword123"
        }
    )

五、实际案例:自动化部署Web应用

5.1 案例概述

我们将使用kubernetes库创建一个自动化部署脚本,该脚本能够:

  1. 创建Namespace
  2. 部署PostgreSQL数据库
  3. 部署Flask Web应用
  4. 配置Service和Ingress
  5. 监控部署状态

5.2 完整脚本实现

import time
from kubernetes import client, config
from kubernetes.client.models import (
    V1Namespace, V1ObjectMeta, V1PersistentVolumeClaim,
    V1PersistentVolumeClaimSpec, V1ResourceRequirements,
    V1Deployment, V1DeploymentSpec, V1PodTemplateSpec,
    V1PodSpec, V1Container, V1Service, V1ServiceSpec,
    V1ServicePort, V1Ingress, V1IngressSpec, V1IngressRule,
    V1HTTPIngressRuleValue, V1HTTPIngressPath, V1IngressBackend,
    V1IngressServiceBackend, V1ConfigMap, V1Secret
)

class K8sDeployer:
    def __init__(self, namespace="web-app-demo"):
        """初始化Kubernetes客户端"""
        config.load_kube_config()
        self.namespace = namespace

        # 初始化API客户端
        self.core_v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.networking_v1 = client.NetworkingV1Api()

    def create_namespace(self):
        """创建命名空间"""
        print(f"创建命名空间: {self.namespace}")

        namespace = V1Namespace(
            metadata=V1ObjectMeta(name=self.namespace)
        )

        try:
            self.core_v1.create_namespace(body=namespace)
            print(f"命名空间 '{self.namespace}' 创建成功")
        except client.exceptions.ApiException as e:
            if e.status == 409:
                print(f"命名空间 '{self.namespace}' 已存在")
            else:
                raise

    def create_postgresql(self):
        """部署PostgreSQL数据库"""
        print("部署PostgreSQL数据库...")

        # 创建PersistentVolumeClaim
        pvc = V1PersistentVolumeClaim(
            api_version="v1",
            kind="PersistentVolumeClaim",
            metadata=V1ObjectMeta(name="postgres-pvc"),
            spec=V1PersistentVolumeClaimSpec(
                access_modes=["ReadWriteOnce"],
                resources=V1ResourceRequirements(
                    requests={"storage": "1Gi"}
                )
            )
        )

        self.core_v1.create_namespaced_persistent_volume_claim(
            namespace=self.namespace,
            body=pvc
        )

        # 创建ConfigMap
        config_map = V1ConfigMap(
            api_version="v1",
            kind="ConfigMap",
            metadata=V1ObjectMeta(name="postgres-config"),
            data={
                "POSTGRES_DB": "webapp",
                "POSTGRES_USER": "webapp"
            }
        )

        self.core_v1.create_namespaced_config_map(
            namespace=self.namespace,
            body=config_map
        )

        # 创建Secret
        import base64
        password = base64.b64encode("webapp123".encode()).decode()

        secret = V1Secret(
            api_version="v1",
            kind="Secret",
            metadata=V1ObjectMeta(name="postgres-secret"),
            type="Opaque",
            data={"POSTGRES_PASSWORD": password}
        )

        self.core_v1.create_namespaced_secret(
            namespace=self.namespace,
            body=secret
        )

        # 创建Deployment
        deployment = V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=V1ObjectMeta(name="postgres"),
            spec=V1DeploymentSpec(
                replicas=1,
                selector={"matchLabels": {"app": "postgres"}},
                template=V1PodTemplateSpec(
                    metadata=V1ObjectMeta(labels={"app": "postgres"}),
                    spec=V1PodSpec(
                        containers=[
                            V1Container(
                                name="postgres",
                                image="postgres:14",
                                ports=[V1ContainerPort(container_port=5432)],
                                env=[
                                    client.V1EnvVar(
                                        name="POSTGRES_DB",
                                        value_from=client.V1EnvVarSource(
                                            config_map_key_ref=client.V1ConfigMapKeySelector(
                                                name="postgres-config",
                                                key="POSTGRES_DB"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="POSTGRES_USER",
                                        value_from=client.V1EnvVarSource(
                                            config_map_key_ref=client.V1ConfigMapKeySelector(
                                                name="postgres-config",
                                                key="POSTGRES_USER"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="POSTGRES_PASSWORD",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                name="postgres-secret",
                                                key="POSTGRES_PASSWORD"
                                            )
                                        )
                                    )
                                ],
                                volume_mounts=[
                                    client.V1VolumeMount(
                                        name="postgres-data",
                                        mount_path="/var/lib/postgresql/data"
                                    )
                                ]
                            )
                        ],
                        volumes=[
                            client.V1Volume(
                                name="postgres-data",
                                persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
                                    claim_name="postgres-pvc"
                                )
                            )
                        ]
                    )
                )
            )
        )

        self.apps_v1.create_namespaced_deployment(
            namespace=self.namespace,
            body=deployment
        )

        # 创建Service
        service = V1Service(
            api_version="v1",
            kind="Service",
            metadata=V1ObjectMeta(name="postgres"),
            spec=V1ServiceSpec(
                selector={"app": "postgres"},
                ports=[V1ServicePort(port=5432, target_port=5432)]
            )
        )

        self.core_v1.create_namespaced_service(
            namespace=self.namespace,
            body=service
        )

        print("PostgreSQL部署完成")

    def create_web_app(self):
        """部署Web应用"""
        print("部署Web应用...")

        # 创建ConfigMap
        config_map = V1ConfigMap(
            api_version="v1",
            kind="ConfigMap",
            metadata=V1ObjectMeta(name="webapp-config"),
            data={
                "DB_HOST": "postgres",
                "DB_PORT": "5432",
                "DB_NAME": "webapp",
                "DB_USER": "webapp"
            }
        )

        self.core_v1.create_namespaced_config_map(
            namespace=self.namespace,
            body=config_map
        )

        # 创建Deployment
        deployment = V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=V1ObjectMeta(name="webapp"),
            spec=V1DeploymentSpec(
                replicas=3,
                selector={"matchLabels": {"app": "webapp"}},
                template=V1PodTemplateSpec(
                    metadata=V1ObjectMeta(labels={"app": "webapp"}),
                    spec=V1PodSpec(
                        containers=[
                            V1Container(
                                name="webapp",
                                image="python:3.9-slim",
                                command=["python", "-m", "http.server", "8080"],
                                ports=[V1ContainerPort(container_port=8080)],
                                env=[
                                    client.V1EnvVar(
                                        name="DB_HOST",
                                        value_from=client.V1EnvVarSource(
                                            config_map_key_ref=client.V1ConfigMapKeySelector(
                                                name="webapp-config",
                                                key="DB_HOST"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="DB_PORT",
                                        value_from=client.V1EnvVarSource(
                                            config_map_key_ref=client.V1ConfigMapKeySelector(
                                                name="webapp-config",
                                                key="DB_PORT"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="DB_NAME",
                                        value_from=client.V1EnvVarSource(
                                            config_map_key_ref=client.V1ConfigMapKeySelector(
                                                name="webapp-config",
                                                key="DB_NAME"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="DB_USER",
                                        value_from=client.V1EnvVarSource(
                                            config_map_key_ref=client.V1ConfigMapKeySelector(
                                                name="webapp-config",
                                                key="DB_USER"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="DB_PASSWORD",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                name="postgres-secret",
                                                key="POSTGRES_PASSWORD"
                                            )
                                        )
                                    )
                                ]
                            )
                        ]
                    )
                )
            )
        )

        self.apps_v1.create_namespaced_deployment(
            namespace=self.namespace,
            body=deployment
        )

        # 创建Service
        service = V1Service(
            api_version="v1",
            kind="Service",
            metadata=V1ObjectMeta(name="webapp"),
            spec=V1ServiceSpec(
                type="ClusterIP",
                selector={"app": "webapp"},
                ports=[V1ServicePort(port=80, target_port=8080)]
            )
        )

        self.core_v1.create_namespaced_service(
            namespace=self.namespace,
            body=service
        )

        print("Web应用部署完成")

    def create_ingress(self, host="webapp.example.com"):
        """创建Ingress"""
        print(f"创建Ingress: {host}")

        ingress = V1Ingress(
            api_version="networking.k8s.io/v1",
            kind="Ingress",
            metadata=V1ObjectMeta(
                name="webapp-ingress",
                annotations={
                    "kubernetes.io/ingress.class": "nginx"
                }
            ),
            spec=V1IngressSpec(
                rules=[
                    V1IngressRule(
                        host=host,
                        http=V1HTTPIngressRuleValue(
                            paths=[
                                V1HTTPIngressPath(
                                    path="/",
                                    path_type="Prefix",
                                    backend=V1IngressBackend(
                                        service=V1IngressServiceBackend(
                                            name="webapp",
                                            port=client.V1ServiceBackendPort(number=80)
                                        )
                                    )
                                )
                            ]
                        )
                    )
                ]
            )
        )

        self.networking_v1.create_namespaced_ingress(
            namespace=self.namespace,
            body=ingress
        )

        print(f"Ingress已创建,访问地址: http://{host}")

    def wait_for_deployment(self, deployment_name, timeout=300):
        """等待Deployment就绪"""
        print(f"等待Deployment '{deployment_name}' 就绪...")

        start_time = time.time()

        while time.time() - start_time &lt; timeout:
            try:
                deployment = self.apps_v1.read_namespaced_deployment(
                    name=deployment_name,
                    namespace=self.namespace
                )

                available_replicas = deployment.status.available_replicas or 0
                desired_replicas = deployment.spec.replicas or 0

                if available_replicas == desired_replicas:
                    print(f"Deployment '{deployment_name}' 已就绪")
                    return True

                print(f"等待中: {available_replicas}/{desired_replicas} 个副本可用")
                time.sleep(5)

            except Exception as e:
                print(f"检查Deployment状态时出错: {str(e)}")
                time.sleep(5)

        print(f"等待超时,Deployment '{deployment_name}' 未就绪")
        return False

    def deploy_web_application(self, domain="webapp.example.com"):
        """部署完整的Web应用"""
        print("开始部署Web应用...")

        try:
            # 创建命名空间
            self.create_namespace()

            # 部署PostgreSQL
            self.create_postgresql()

            # 等待PostgreSQL就绪
            if not self.wait_for_deployment("postgres"):
                print("PostgreSQL部署失败,退出")
                return

            # 部署Web应用
            self.create_web_app()

            # 等待Web应用就绪
            if not self.wait_for_deployment("webapp"):
                print("Web应用部署失败,退出")
                return

            # 创建Ingress
            self.create_ingress(host=domain)

            print("=" * 60)
            print(f"Web应用部署完成! 可以通过 http://{domain} 访问")
            print("=" * 60)

        except Exception as e:
            print(f"部署过程中发生错误: {str(e)}")

# 使用示例
if __name__ == "__main__":
    deployer = K8sDeployer(namespace="web-app-demo")
    deployer.deploy_web_application(domain="webapp.example.com")

六、总结与参考资源

6.1 总结

通过kubernetes库,Python开发者可以方便地与Kubernetes集群进行交互,实现资源管理、自动化部署、监控等功能。本文详细介绍了该库的基本概念、安装配置、核心API操作以及实际案例应用。

在实际项目中,可以基于这些基础知识开发更复杂的自动化工具,如CI/CD流水线、资源调度系统等。掌握kubernetes库的使用,将大大提升Python开发者在云原生领域的开发效率。

6.2 参考资源

通过这些资源,你可以获取更多关于kubernetes库的详细信息和最新动态。建议在实际项目中参考官方文档,以获取最准确和最新的API使用方法。

关注我,每天分享一个实用的Python自动化工具。