Python Client 深度实战

Python Client 深度实战

kubernetes-python 介绍

kubernetes (Python client) 是 Kubernetes 官方的 Python 客户端库,提供了与 Kubernetes API Server 交互的完整功能。

特点

  • 官方维护:由 Kubernetes 官方维护
  • 自动生成:从 OpenAPI spec 自动生成
  • 类型化 API:强类型化的 API
  • Watch 支持:支持资源监听
  • 完整功能:覆盖所有 Kubernetes 资源

安装

# 安装最新版本
pip install kubernetes

# 安装特定版本
pip install kubernetes==28.1.0

# 查看版本
python -c "import kubernetes; print(kubernetes.__version__)"

版本对应关系

Kubernetes Python Client
1.28 28.x.x
1.27 27.x.x
1.26 26.x.x
1.25 25.x.x

初始化客户端

方式 1:从 kubeconfig 初始化(集群外)

from kubernetes import client, config

# 加载 kubeconfig(默认 ~/.kube/config)
config.load_kube_config()

# 创建 API 客户端
v1 = client.CoreV1Api()

# 验证连接
version = v1.get_api_resources()
print("Connected to Kubernetes cluster!")

方式 2:指定 kubeconfig 路径

from kubernetes import client, config
import os

# 指定 kubeconfig 文件路径
kubeconfig_path = os.path.expanduser("~/.kube/config")
config.load_kube_config(config_file=kubeconfig_path)

v1 = client.CoreV1Api()

方式 3:从集群内初始化(Pod 内)

from kubernetes import client, config

# 加载集群内配置(从 ServiceAccount)
config.load_incluster_config()

v1 = client.CoreV1Api()
print("Running inside Kubernetes cluster!")

方式 4:自适应初始化(推荐)

from kubernetes import client, config
from kubernetes.config.config_exception import ConfigException

def get_kubernetes_client():
    """自适应获取 Kubernetes 客户端"""
    try:
        # 尝试加载集群内配置
        config.load_incluster_config()
        print("Using in-cluster config")
    except ConfigException:
        # 如果失败,加载 kubeconfig
        config.load_kube_config()
        print("Using kubeconfig")
    
    return client.CoreV1Api()

# 使用
v1 = get_kubernetes_client()

方式 5:手动配置

from kubernetes import client

# 手动创建配置
configuration = client.Configuration()
configuration.host = "https://kubernetes.example.com:6443"
configuration.api_key = {"authorization": "Bearer YOUR_TOKEN"}
configuration.verify_ssl = False  # 生产环境应该设为 True

# 使用配置创建客户端
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)

Pod 操作

列出 Pods

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

def list_pods(namespace="default"):
    """列出指定命名空间的所有 Pods"""
    try:
        pods = v1.list_namespaced_pod(namespace)
        
        print(f"Found {len(pods.items)} pods in namespace '{namespace}':")
        
        for pod in pods.items:
            print(f"- {pod.metadata.name}")
            print(f"  Status: {pod.status.phase}")
            print(f"  Node: {pod.spec.node_name}")
            print(f"  IP: {pod.status.pod_ip}")
            
            # 列出容器
            for container in pod.spec.containers:
                print(f"  Container: {container.name} (Image: {container.image})")
            print()
    
    except client.exceptions.ApiException as e:
        print(f"Exception: {e}")

# 使用
list_pods("default")

使用 Label Selector

def list_pods_with_selector(namespace="default", label_selector=""):
    """使用 Label Selector 过滤 Pods"""
    pods = v1.list_namespaced_pod(
        namespace,
        label_selector=label_selector  # 例如:app=nginx,env=production
    )
    
    print(f"Found {len(pods.items)} pods matching selector '{label_selector}':")
    for pod in pods.items:
        print(f"- {pod.metadata.name}")
        print(f"  Labels: {pod.metadata.labels}")

# 使用
list_pods_with_selector("default", "app=nginx")

使用 Field Selector

def list_running_pods(namespace="default"):
    """列出所有 Running 状态的 Pods"""
    pods = v1.list_namespaced_pod(
        namespace,
        field_selector="status.phase=Running"
    )
    
    print(f"Found {len(pods.items)} running pods:")
    for pod in pods.items:
        print(f"- {pod.metadata.name} (Phase: {pod.status.phase})")

# 使用
list_running_pods("default")

获取单个 Pod

def get_pod(namespace, name):
    """获取指定的 Pod"""
    try:
        pod = v1.read_namespaced_pod(name, namespace)
        
        print(f"Pod Name: {pod.metadata.name}")
        print(f"Namespace: {pod.metadata.namespace}")
        print(f"Status: {pod.status.phase}")
        print(f"Node: {pod.spec.node_name}")
        print(f"IP: {pod.status.pod_ip}")
        print(f"Created: {pod.metadata.creation_timestamp}")
        
        # Labels
        print("\nLabels:")
        for key, value in (pod.metadata.labels or {}).items():
            print(f"  {key}: {value}")
        
        # Containers
        print("\nContainers:")
        for container in pod.spec.containers:
            print(f"  Name: {container.name}")
            print(f"  Image: {container.image}")
            if container.ports:
                print(f"  Ports: {[p.container_port for p in container.ports]}")
        
        # Conditions
        print("\nConditions:")
        for condition in (pod.status.conditions or []):
            print(f"  {condition.type}: {condition.status} (Reason: {condition.reason})")
        
        return pod
    
    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"Pod {name} not found in namespace {namespace}")
        else:
            print(f"Error: {e}")
        return None

# 使用
get_pod("default", "nginx-pod")

创建 Pod

def create_pod(namespace="default"):
    """创建 Pod"""
    # 定义 Pod
    pod = client.V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=client.V1ObjectMeta(
            name="nginx-pod",
            labels={
                "app": "nginx",
                "env": "test"
            }
        ),
        spec=client.V1PodSpec(
            containers=[
                client.V1Container(
                    name="nginx",
                    image="nginx:1.21",
                    ports=[
                        client.V1ContainerPort(
                            name="http",
                            container_port=80,
                            protocol="TCP"
                        )
                    ],
                    resources=client.V1ResourceRequirements(
                        requests={
                            "cpu": "100m",
                            "memory": "128Mi"
                        },
                        limits={
                            "cpu": "200m",
                            "memory": "256Mi"
                        }
                    ),
                    env=[
                        client.V1EnvVar(
                            name="ENV",
                            value="production"
                        )
                    ]
                )
            ],
            restart_policy="Always"
        )
    )
    
    try:
        # 创建 Pod
        response = v1.create_namespaced_pod(namespace, pod)
        print(f"Pod created: {response.metadata.name}")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error creating pod: {e}")
        return None

# 使用
create_pod("default")

更新 Pod

def update_pod(namespace, name):
    """更新 Pod(只能修改部分字段)"""
    try:
        # 1. 获取现有 Pod
        pod = v1.read_namespaced_pod(name, namespace)
        
        # 2. 修改标签和注解
        if pod.metadata.labels is None:
            pod.metadata.labels = {}
        pod.metadata.labels["updated"] = "true"
        
        if pod.metadata.annotations is None:
            pod.metadata.annotations = {}
        pod.metadata.annotations["updated-at"] = str(datetime.now())
        
        # 3. 更新 Pod
        response = v1.patch_namespaced_pod(name, namespace, pod)
        print(f"Pod updated: {response.metadata.name}")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error updating pod: {e}")
        return None

# 使用
from datetime import datetime
update_pod("default", "nginx-pod")

删除 Pod

def delete_pod(namespace, name, grace_period_seconds=30):
    """删除 Pod"""
    try:
        # 删除选项
        delete_options = client.V1DeleteOptions(
            grace_period_seconds=grace_period_seconds,
            propagation_policy="Foreground"
        )
        
        response = v1.delete_namespaced_pod(
            name,
            namespace,
            body=delete_options
        )
        print(f"Pod deleted: {name}")
        return response
    
    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"Pod {name} not found")
        else:
            print(f"Error deleting pod: {e}")
        return None

# 使用
delete_pod("default", "nginx-pod")

批量删除 Pods

def delete_pods_with_selector(namespace, label_selector):
    """批量删除符合条件的 Pods"""
    try:
        response = v1.delete_collection_namespaced_pod(
            namespace,
            label_selector=label_selector
        )
        print(f"Deleted pods matching selector: {label_selector}")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error deleting pods: {e}")
        return None

# 使用
delete_pods_with_selector("default", "app=nginx,env=test")

获取 Pod 日志

def get_pod_logs(namespace, pod_name, container_name=None, tail_lines=100):
    """获取 Pod 日志"""
    try:
        logs = v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            container=container_name,
            tail_lines=tail_lines,
            timestamps=True
        )
        
        print(f"Logs for pod {pod_name}:")
        print(logs)
        return logs
    
    except client.exceptions.ApiException as e:
        print(f"Error getting logs: {e}")
        return None

# 使用
get_pod_logs("default", "nginx-pod", tail_lines=50)

实时跟踪日志

from kubernetes.watch import Watch

def follow_pod_logs(namespace, pod_name, container_name=None):
    """实时跟踪 Pod 日志"""
    w = Watch()
    
    try:
        for line in w.stream(
            v1.read_namespaced_pod_log,
            name=pod_name,
            namespace=namespace,
            container=container_name,
            follow=True,
            timestamps=True
        ):
            print(line)
    
    except KeyboardInterrupt:
        print("\nStopped following logs")
    except client.exceptions.ApiException as e:
        print(f"Error following logs: {e}")

# 使用(Ctrl+C 停止)
follow_pod_logs("default", "nginx-pod")

Pod Exec(执行命令)

from kubernetes.stream import stream

def exec_in_pod(namespace, pod_name, container_name, command):
    """在 Pod 中执行命令"""
    try:
        response = stream(
            v1.connect_get_namespaced_pod_exec,
            name=pod_name,
            namespace=namespace,
            container=container_name,
            command=command,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False
        )
        
        print("Command output:")
        print(response)
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error executing command: {e}")
        return None

# 使用
exec_in_pod("default", "nginx-pod", "nginx", ["ls", "-la", "/etc/nginx"])

Deployment 操作

创建 Deployment

from kubernetes import client

apps_v1 = client.AppsV1Api()

def create_deployment(namespace="default"):
    """创建 Deployment"""
    # 定义容器
    container = client.V1Container(
        name="nginx",
        image="nginx:1.21",
        ports=[client.V1ContainerPort(container_port=80)]
    )
    
    # 定义 Pod 模板
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
        spec=client.V1PodSpec(containers=[container])
    )
    
    # 定义 Deployment Spec
    spec = client.V1DeploymentSpec(
        replicas=3,
        selector=client.V1LabelSelector(
            match_labels={"app": "nginx"}
        ),
        template=template
    )
    
    # 定义 Deployment
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name="nginx-deployment"),
        spec=spec
    )
    
    try:
        response = apps_v1.create_namespaced_deployment(
            namespace=namespace,
            body=deployment
        )
        print(f"Deployment created: {response.metadata.name}")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error creating deployment: {e}")
        return None

# 使用
create_deployment("default")

更新 Deployment(滚动更新)

def update_deployment_image(namespace, name, new_image):
    """更新 Deployment 镜像"""
    try:
        # 1. 获取现有 Deployment
        deployment = apps_v1.read_namespaced_deployment(name, namespace)
        
        # 2. 更新镜像
        deployment.spec.template.spec.containers[0].image = new_image
        
        # 3. 应用更新
        response = apps_v1.patch_namespaced_deployment(
            name=name,
            namespace=namespace,
            body=deployment
        )
        print(f"Deployment updated with new image: {new_image}")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error updating deployment: {e}")
        return None

# 使用
update_deployment_image("default", "nginx-deployment", "nginx:1.22")

扩缩容 Deployment

def scale_deployment(namespace, name, replicas):
    """扩缩容 Deployment"""
    try:
        # 方式 1:使用 patch(推荐)
        body = {
            "spec": {
                "replicas": replicas
            }
        }
        response = apps_v1.patch_namespaced_deployment_scale(
            name=name,
            namespace=namespace,
            body=body
        )
        
        print(f"Deployment scaled to {replicas} replicas")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error scaling deployment: {e}")
        return None

# 使用
scale_deployment("default", "nginx-deployment", 5)

查看 Deployment 状态

def get_deployment_status(namespace, name):
    """获取 Deployment 状态"""
    try:
        deployment = apps_v1.read_namespaced_deployment(name, namespace)
        
        print(f"Deployment: {deployment.metadata.name}")
        print(f"Desired Replicas: {deployment.spec.replicas}")
        print(f"Current Replicas: {deployment.status.replicas}")
        print(f"Ready Replicas: {deployment.status.ready_replicas}")
        print(f"Available Replicas: {deployment.status.available_replicas}")
        print(f"Updated Replicas: {deployment.status.updated_replicas}")
        
        # Conditions
        print("\nConditions:")
        for condition in (deployment.status.conditions or []):
            print(f"  Type: {condition.type}")
            print(f"  Status: {condition.status}")
            print(f"  Reason: {condition.reason}")
            print(f"  Message: {condition.message}")
            print()
        
        return deployment
    
    except client.exceptions.ApiException as e:
        print(f"Error getting deployment status: {e}")
        return None

# 使用
get_deployment_status("default", "nginx-deployment")

删除 Deployment

def delete_deployment(namespace, name):
    """删除 Deployment"""
    try:
        response = apps_v1.delete_namespaced_deployment(
            name=name,
            namespace=namespace,
            body=client.V1DeleteOptions(
                propagation_policy="Foreground",
                grace_period_seconds=30
            )
        )
        print(f"Deployment deleted: {name}")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error deleting deployment: {e}")
        return None

# 使用
delete_deployment("default", "nginx-deployment")

Service 操作

创建 Service

def create_service(namespace="default"):
    """创建 ClusterIP Service"""
    service = client.V1Service(
        api_version="v1",
        kind="Service",
        metadata=client.V1ObjectMeta(name="nginx-service"),
        spec=client.V1ServiceSpec(
            selector={"app": "nginx"},
            type="ClusterIP",
            ports=[
                client.V1ServicePort(
                    name="http",
                    protocol="TCP",
                    port=80,
                    target_port=80
                )
            ]
        )
    )
    
    try:
        response = v1.create_namespaced_service(namespace, service)
        print(f"Service created: {response.metadata.name}")
        print(f"ClusterIP: {response.spec.cluster_ip}")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error creating service: {e}")
        return None

# 使用
create_service("default")

创建 NodePort Service

def create_nodeport_service(namespace="default"):
    """创建 NodePort Service"""
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name="nginx-nodeport"),
        spec=client.V1ServiceSpec(
            selector={"app": "nginx"},
            type="NodePort",
            ports=[
                client.V1ServicePort(
                    port=80,
                    target_port=80,
                    node_port=30080  # 可选:指定 NodePort
                )
            ]
        )
    )
    
    try:
        response = v1.create_namespaced_service(namespace, service)
        print(f"NodePort Service created: {response.metadata.name}")
        print(f"NodePort: {response.spec.ports[0].node_port}")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error creating service: {e}")
        return None

# 使用
create_nodeport_service("default")

创建 LoadBalancer Service

def create_loadbalancer_service(namespace="default"):
    """创建 LoadBalancer Service"""
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name="nginx-lb"),
        spec=client.V1ServiceSpec(
            selector={"app": "nginx"},
            type="LoadBalancer",
            ports=[
                client.V1ServicePort(
                    port=80,
                    target_port=80
                )
            ]
        )
    )
    
    try:
        response = v1.create_namespaced_service(namespace, service)
        print(f"LoadBalancer Service created: {response.metadata.name}")
        
        # LoadBalancer IP(可能需要等待云提供商分配)
        if response.status.load_balancer.ingress:
            print(f"External IP: {response.status.load_balancer.ingress[0].ip}")
        
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error creating service: {e}")
        return None

# 使用
create_loadbalancer_service("default")

ConfigMap & Secret 操作

ConfigMap

def create_configmap(namespace="default"):
    """创建 ConfigMap"""
    configmap = client.V1ConfigMap(
        metadata=client.V1ObjectMeta(name="app-config"),
        data={
            "app.properties": "key1=value1\nkey2=value2",
            "config.json": '{"setting": "value"}'
        },
        binary_data={
            "binary-file": b"Hello"  # bytes 类型会自动 base64 编码
        }
    )
    
    try:
        response = v1.create_namespaced_config_map(namespace, configmap)
        print(f"ConfigMap created: {response.metadata.name}")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error creating configmap: {e}")
        return None

# 使用
create_configmap("default")

Secret

def create_secret(namespace="default"):
    """创建 Secret"""
    import base64
    
    secret = client.V1Secret(
        metadata=client.V1ObjectMeta(name="app-secret"),
        type="Opaque",
        # 方式 1:使用 string_data(自动 base64 编码)
        string_data={
            "username": "admin",
            "password": "secret123"
        }
        # 方式 2:使用 data(需要手动 base64 编码)
        # data={
        #     "username": base64.b64encode(b"admin").decode(),
        #     "password": base64.b64encode(b"secret123").decode()
        # }
    )
    
    try:
        response = v1.create_namespaced_secret(namespace, secret)
        print(f"Secret created: {response.metadata.name}")
        return response
    
    except client.exceptions.ApiException as e:
        print(f"Error creating secret: {e}")
        return None

# 使用
create_secret("default")

Watch 机制

监听 Pod 变化

from kubernetes.watch import Watch

def watch_pods(namespace="default", timeout_seconds=60):
    """监听 Pod 变化"""
    w = Watch()
    
    print(f"Watching pods in namespace '{namespace}'...")
    
    try:
        for event in w.stream(
            v1.list_namespaced_pod,
            namespace=namespace,
            timeout_seconds=timeout_seconds
        ):
            event_type = event['type']  # ADDED, MODIFIED, DELETED
            pod = event['object']
            
            print(f"Event: {event_type}")
            print(f"  Pod: {pod.metadata.name}")
            print(f"  Phase: {pod.status.phase}")
            print()
    
    except KeyboardInterrupt:
        print("Stopped watching")

# 使用(60秒后自动停止,或 Ctrl+C 手动停止)
watch_pods("default", timeout_seconds=60)

从特定 ResourceVersion 开始 Watch

def watch_pods_from_version(namespace="default"):
    """从特定版本开始 Watch"""
    # 1. 先列出所有 Pods 获取当前 resourceVersion
    pods = v1.list_namespaced_pod(namespace)
    resource_version = pods.metadata.resource_version
    
    print(f"Starting watch from resource version: {resource_version}")
    
    # 2. 从该版本开始 Watch
    w = Watch()
    for event in w.stream(
        v1.list_namespaced_pod,
        namespace=namespace,
        resource_version=resource_version,
        timeout_seconds=60
    ):
        event_type = event['type']
        pod = event['object']
        print(f"{event_type}: {pod.metadata.name} (Phase: {pod.status.phase})")

# 使用
watch_pods_from_version("default")

监听多种资源

def watch_multiple_resources(namespace="default"):
    """同时监听多种资源"""
    import threading
    
    def watch_pods():
        w = Watch()
        for event in w.stream(v1.list_namespaced_pod, namespace, timeout_seconds=60):
            print(f"[POD] {event['type']}: {event['object'].metadata.name}")
    
    def watch_services():
        w = Watch()
        for event in w.stream(v1.list_namespaced_service, namespace, timeout_seconds=60):
            print(f"[SERVICE] {event['type']}: {event['object'].metadata.name}")
    
    # 启动多个线程监听
    threads = [
        threading.Thread(target=watch_pods),
        threading.Thread(target=watch_services)
    ]
    
    for t in threads:
        t.start()
    
    for t in threads:
        t.join()

# 使用
watch_multiple_resources("default")

错误处理

异常处理

from kubernetes.client.rest import ApiException

def handle_api_exception(namespace, pod_name):
    """处理 API 异常"""
    try:
        pod = v1.read_namespaced_pod(pod_name, namespace)
        return pod
    
    except ApiException as e:
        if e.status == 404:
            print(f"Pod {pod_name} not found in namespace {namespace}")
        elif e.status == 403:
            print(f"Forbidden: insufficient permissions")
        elif e.status == 401:
            print(f"Unauthorized: authentication required")
        elif e.status == 409:
            print(f"Conflict: resource version mismatch")
        elif e.status == 422:
            print(f"Invalid: validation failed")
            print(f"Details: {e.body}")
        else:
            print(f"API Exception: {e}")
        
        return None
    
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

# 使用
handle_api_exception("default", "non-existent-pod")

重试机制

import time
from kubernetes.client.rest import ApiException

def retry_on_error(func, max_retries=3, delay=1):
    """重试装饰器"""
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except ApiException as e:
                if e.status in [500, 503, 504]:  # 服务器错误可重试
                    if attempt < max_retries - 1:
                        print(f"Retry {attempt + 1}/{max_retries} after {delay}s...")
                        time.sleep(delay)
                    else:
                        raise
                else:
                    raise  # 其他错误直接抛出
    return wrapper

# 使用
@retry_on_error
def get_pod_with_retry(namespace, name):
    return v1.read_namespaced_pod(name, namespace)

# 调用
pod = get_pod_with_retry("default", "nginx-pod")

实用工具函数

等待 Pod 就绪

import time

def wait_for_pod_ready(namespace, pod_name, timeout=300):
    """等待 Pod 进入 Ready 状态"""
    start_time = time.time()
    
    while time.time() - start_time < timeout:
        try:
            pod = v1.read_namespaced_pod(pod_name, namespace)
            
            # 检查 Pod 是否 Ready
            if pod.status.conditions:
                for condition in pod.status.conditions:
                    if condition.type == "Ready" and condition.status == "True":
                        print(f"Pod {pod_name} is ready!")
                        return True
            
            print(f"Waiting for pod {pod_name} to be ready... (Phase: {pod.status.phase})")
            time.sleep(5)
        
        except ApiException as e:
            print(f"Error checking pod status: {e}")
            time.sleep(5)
    
    print(f"Timeout waiting for pod {pod_name} to be ready")
    return False

# 使用
wait_for_pod_ready("default", "nginx-pod", timeout=300)

批量操作

def batch_create_pods(namespace, count=10):
    """批量创建 Pods"""
    for i in range(count):
        pod = client.V1Pod(
            metadata=client.V1ObjectMeta(name=f"nginx-pod-{i}"),
            spec=client.V1PodSpec(
                containers=[
                    client.V1Container(
                        name="nginx",
                        image="nginx:1.21"
                    )
                ]
            )
        )
        
        try:
            v1.create_namespaced_pod(namespace, pod)
            print(f"Created pod: nginx-pod-{i}")
        except ApiException as e:
            print(f"Error creating pod {i}: {e}")

# 使用
batch_create_pods("default", count=5)

总结

Python Client 核心 API

v1 = client.CoreV1Api()              # Pod, Service, ConfigMap, Secret, Namespace...
apps_v1 = client.AppsV1Api()         # Deployment, StatefulSet, DaemonSet
batch_v1 = client.BatchV1Api()       # Job, CronJob
networking_v1 = client.NetworkingV1Api()  # Ingress, NetworkPolicy
rbac_v1 = client.RbacAuthorizationV1Api()  # Role, RoleBinding, ClusterRole
storage_v1 = client.StorageV1Api()   # StorageClass, VolumeAttachment

最佳实践

异常处理: 使用 try-except 处理 ApiException
资源清理: 使用 try-finally 确保资源释放
Watch 超时: 设置 timeout_seconds 避免无限等待
Label Selector: 高效过滤资源
批量操作: 使用 delete_collection 批量删除
重试机制: 对临时错误实现重试

常见陷阱

忘记指定 namespace: 很多 API 需要明确指定
ResourceVersion 冲突: 更新资源时要注意版本
Watch 不设超时: 可能导致连接泄漏
忘记关闭 Watch: 使用完要 stop()

下一节将介绍 Watch、Informer、Controller 等高级模式。