Python Client 深度实战
kubernetes-python 介绍
kubernetes (Python client) 是 Kubernetes 官方的 Python 客户端库,提供了与 Kubernetes API Server 交互的完整功能。
特点
- ✅ 官方维护:由 Kubernetes 官方维护
- ✅ 自动生成:从 OpenAPI spec 自动生成
- ✅ 类型化 API:强类型化的 API
- ✅ Watch 支持:支持资源监听
- ✅ 完整功能:覆盖所有内置 Kubernetes 资源;自定义资源(CRD)可通过 CustomObjectsApi 访问
安装
# Install the latest release
pip install kubernetes
# Install a specific release (pin it to match your cluster's Kubernetes version)
pip install kubernetes==28.1.0
# Print the installed client version
python -c "import kubernetes; print(kubernetes.__version__)"
版本对应关系
| Kubernetes | Python Client |
|---|---|
| 1.28 | 28.x.x |
| 1.27 | 27.x.x |
| 1.26 | 26.x.x |
| 1.25 | 25.x.x |
初始化客户端
方式 1:从 kubeconfig 初始化(集群外)
from kubernetes import client, config

# Load the kubeconfig (defaults to ~/.kube/config).
config.load_kube_config()

# Client for the core "v1" API group.
v1 = client.CoreV1Api()

# Make a lightweight API call to verify the connection. The result is the list
# of core API resources (not a version), so it is named accordingly.
api_resources = v1.get_api_resources()
print("Connected to Kubernetes cluster!")
方式 2:指定 kubeconfig 路径
import os

from kubernetes import client, config

# Resolve "~" to the home directory ourselves, then hand the loader an
# explicit file path instead of relying on its default location.
kubeconfig_path = os.path.expanduser("~/.kube/config")
config.load_kube_config(config_file=kubeconfig_path)

v1 = client.CoreV1Api()
方式 3:从集群内初始化(Pod 内)
from kubernetes import client, config

# Running inside a Pod: load the credentials of the Pod's ServiceAccount.
config.load_incluster_config()

v1 = client.CoreV1Api()
print("Running inside Kubernetes cluster!")
方式 4:自适应初始化(推荐)
from kubernetes import client, config
from kubernetes.config.config_exception import ConfigException


def get_kubernetes_client():
    """Return a CoreV1Api, preferring in-cluster config over kubeconfig.

    In-cluster loading is attempted first; if it raises ConfigException
    (e.g. the process is not running in a Pod), the local kubeconfig is
    loaded instead. Any error from the kubeconfig fallback propagates.
    """
    try:
        config.load_incluster_config()
    except ConfigException:
        # Not in a cluster: fall back to the local kubeconfig.
        config.load_kube_config()
        print("Using kubeconfig")
    else:
        print("Using in-cluster config")
    return client.CoreV1Api()


# Usage
v1 = get_kubernetes_client()
方式 5:手动配置
from kubernetes import client

# Build a Configuration by hand (no kubeconfig / ServiceAccount involved).
configuration = client.Configuration()
configuration.host = "https://kubernetes.example.com:6443"
# The client joins api_key_prefix and api_key into the Authorization header,
# i.e. "Authorization: Bearer YOUR_TOKEN".
configuration.api_key = {"authorization": "YOUR_TOKEN"}
configuration.api_key_prefix = {"authorization": "Bearer"}
# Keep TLS verification on and trust the cluster CA explicitly rather than
# disabling verification (which exposes the token to MITM attacks).
configuration.verify_ssl = True
configuration.ssl_ca_cert = "/path/to/ca.crt"  # path to the cluster CA bundle

# Create API clients bound to this configuration.
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)
Pod 操作
列出 Pods
from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()


def list_pods(namespace="default"):
    """Print a summary of every Pod in *namespace*.

    For each Pod: name, phase, node, Pod IP, and each container's name and
    image. API errors are printed rather than raised; always returns None.
    """
    try:
        items = v1.list_namespaced_pod(namespace).items
    except client.exceptions.ApiException as e:
        print(f"Exception: {e}")
        return

    print(f"Found {len(items)} pods in namespace '{namespace}':")
    for pod in items:
        status = pod.status
        print(f"- {pod.metadata.name}")
        print(f" Status: {status.phase}")
        print(f" Node: {pod.spec.node_name}")
        print(f" IP: {status.pod_ip}")
        # One line per container declared in the Pod spec.
        for c in pod.spec.containers:
            print(f" Container: {c.name} (Image: {c.image})")
        print()


# Usage
list_pods("default")
使用 Label Selector
def list_pods_with_selector(namespace="default", label_selector=""):
    """Print the Pods in *namespace* whose labels match *label_selector*.

    label_selector uses label-selector syntax, e.g. "app=nginx,env=production".
    Filtering is done by the API server; API errors propagate to the caller.
    """
    matched = v1.list_namespaced_pod(namespace, label_selector=label_selector).items
    print(f"Found {len(matched)} pods matching selector '{label_selector}':")
    for pod in matched:
        meta = pod.metadata
        print(f"- {meta.name}")
        print(f" Labels: {meta.labels}")


# Usage
list_pods_with_selector("default", "app=nginx")
使用 Field Selector
def list_running_pods(namespace="default"):
    """Print the Pods in *namespace* whose phase is Running.

    Filtering is done server-side with the field selector
    "status.phase=Running". API errors propagate to the caller.
    """
    running = v1.list_namespaced_pod(
        namespace, field_selector="status.phase=Running"
    ).items
    print(f"Found {len(running)} running pods:")
    for pod in running:
        print(f"- {pod.metadata.name} (Phase: {pod.status.phase})")


# Usage
list_running_pods("default")
获取单个 Pod
def get_pod(namespace, name):
    """Read one Pod and print its details.

    Prints name, namespace, phase, node, IP, creation time, labels,
    containers (name, image, container ports) and status conditions.

    Returns:
        The V1Pod, or None if the API call fails. A 404 is reported as
        "not found"; any other API error is printed verbatim.
    """
    try:
        pod = v1.read_namespaced_pod(name, namespace)
    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"Pod {name} not found in namespace {namespace}")
        else:
            print(f"Error: {e}")
        return None

    meta, spec, status = pod.metadata, pod.spec, pod.status
    header = (
        ("Pod Name", meta.name),
        ("Namespace", meta.namespace),
        ("Status", status.phase),
        ("Node", spec.node_name),
        ("IP", status.pod_ip),
        ("Created", meta.creation_timestamp),
    )
    for field, value in header:
        print(f"{field}: {value}")

    print("\nLabels:")
    for key, value in (meta.labels or {}).items():
        print(f" {key}: {value}")

    print("\nContainers:")
    for container in spec.containers:
        print(f" Name: {container.name}")
        print(f" Image: {container.image}")
        if container.ports:
            port_numbers = [p.container_port for p in container.ports]
            print(f" Ports: {port_numbers}")

    print("\nConditions:")
    for cond in status.conditions or []:
        print(f" {cond.type}: {cond.status} (Reason: {cond.reason})")

    return pod


# Usage
get_pod("default", "nginx-pod")
创建 Pod
def create_pod(namespace="default"):
    """Create the demo Pod "nginx-pod" in *namespace*.

    The Pod (labels app=nginx, env=test; restartPolicy Always) runs a single
    nginx:1.21 container exposing TCP/80 as "http", with CPU/memory requests
    and limits and the env var ENV=production.

    Returns the created V1Pod, or None if the API rejects the request (the
    error is printed).
    """
    http_port = client.V1ContainerPort(name="http", container_port=80, protocol="TCP")
    resources = client.V1ResourceRequirements(
        requests={"cpu": "100m", "memory": "128Mi"},
        limits={"cpu": "200m", "memory": "256Mi"},
    )
    nginx = client.V1Container(
        name="nginx",
        image="nginx:1.21",
        ports=[http_port],
        resources=resources,
        env=[client.V1EnvVar(name="ENV", value="production")],
    )
    pod = client.V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=client.V1ObjectMeta(
            name="nginx-pod",
            labels={"app": "nginx", "env": "test"},
        ),
        spec=client.V1PodSpec(containers=[nginx], restart_policy="Always"),
    )

    try:
        created = v1.create_namespaced_pod(namespace, pod)
    except client.exceptions.ApiException as e:
        print(f"Error creating pod: {e}")
        return None
    print(f"Pod created: {created.metadata.name}")
    return created


# Usage
create_pod("default")
更新 Pod
def update_pod(namespace, name):
    """Add an "updated" label and an "updated-at" annotation to a Pod.

    Only the metadata change is sent, as a small patch body. Existing labels
    and annotations are therefore kept. No spec or status fields are
    re-submitted (most Pod spec fields are immutable). Previously the whole
    object was read and sent back.

    Returns the patched V1Pod, or None if the API call fails (error printed).
    """
    from datetime import datetime, timezone

    patch = {
        "metadata": {
            "labels": {"updated": "true"},
            # Timezone-aware ISO-8601 timestamp, unambiguous across hosts.
            "annotations": {"updated-at": datetime.now(timezone.utc).isoformat()},
        }
    }
    try:
        response = v1.patch_namespaced_pod(name, namespace, patch)
    except client.exceptions.ApiException as e:
        print(f"Error updating pod: {e}")
        return None
    print(f"Pod updated: {response.metadata.name}")
    return response


# Usage
from datetime import datetime
update_pod("default", "nginx-pod")
删除 Pod
def delete_pod(namespace, name, grace_period_seconds=30):
    """Delete one Pod using the given grace period and Foreground propagation.

    Returns the API response, or None if the call fails; a 404 is reported as
    "not found", other API errors are printed verbatim.
    """
    options = client.V1DeleteOptions(
        grace_period_seconds=grace_period_seconds,
        propagation_policy="Foreground",
    )
    try:
        result = v1.delete_namespaced_pod(name, namespace, body=options)
    except client.exceptions.ApiException as e:
        print(f"Pod {name} not found" if e.status == 404 else f"Error deleting pod: {e}")
        return None
    print(f"Pod deleted: {name}")
    return result


# Usage
delete_pod("default", "nginx-pod")
批量删除 Pods
def delete_pods_with_selector(namespace, label_selector):
    """Delete every Pod in *namespace* matching *label_selector* in one call.

    Uses the delete-collection endpoint. Returns the API response, or None if
    the call fails (the error is printed).
    """
    try:
        result = v1.delete_collection_namespaced_pod(
            namespace, label_selector=label_selector
        )
    except client.exceptions.ApiException as e:
        print(f"Error deleting pods: {e}")
        return None
    print(f"Deleted pods matching selector: {label_selector}")
    return result


# Usage
delete_pods_with_selector("default", "app=nginx,env=test")
获取 Pod 日志
def get_pod_logs(namespace, pod_name, container_name=None, tail_lines=100):
    """Fetch and print the last *tail_lines* log lines of a Pod, with timestamps.

    container_name is passed through as the `container` query parameter.
    Returns the log text, or None on API error (the error is printed).
    """
    request = dict(
        name=pod_name,
        namespace=namespace,
        container=container_name,
        tail_lines=tail_lines,
        timestamps=True,
    )
    try:
        logs = v1.read_namespaced_pod_log(**request)
    except client.exceptions.ApiException as e:
        print(f"Error getting logs: {e}")
        return None
    print(f"Logs for pod {pod_name}:")
    print(logs)
    return logs


# Usage
get_pod_logs("default", "nginx-pod", tail_lines=50)
实时跟踪日志
from kubernetes.watch import Watch


def follow_pod_logs(namespace, pod_name, container_name=None):
    """Stream a Pod's log lines to stdout until interrupted.

    Watch.stream() drives read_namespaced_pod_log with follow=True and yields
    one line at a time. Ctrl+C stops cleanly and API errors are printed.

    On every exit path the Watch is stopped and the stream generator closed.
    Closing lets the generator run its own cleanup even when the loop is
    interrupted between lines.
    """
    w = Watch()
    # Creating the generator does no I/O; requests start on first iteration.
    lines = w.stream(
        v1.read_namespaced_pod_log,
        name=pod_name,
        namespace=namespace,
        container=container_name,
        follow=True,
        timestamps=True,
    )
    try:
        for line in lines:
            print(line)
    except KeyboardInterrupt:
        print("\nStopped following logs")
    except client.exceptions.ApiException as e:
        print(f"Error following logs: {e}")
    finally:
        w.stop()
        lines.close()


# Usage (press Ctrl+C to stop)
follow_pod_logs("default", "nginx-pod")
Pod Exec(执行命令)
from kubernetes.stream import stream


def exec_in_pod(namespace, pod_name, container_name, command):
    """Run *command* (an argv list) inside a container and print its output.

    Goes through kubernetes.stream.stream() over connect_get_namespaced_pod_exec
    with stdout and stderr captured, no stdin and no TTY. Returns the collected
    output, or None if the API call fails (the error is printed).
    """
    exec_options = {
        "stderr": True,
        "stdin": False,
        "stdout": True,
        "tty": False,
    }
    try:
        output = stream(
            v1.connect_get_namespaced_pod_exec,
            name=pod_name,
            namespace=namespace,
            container=container_name,
            command=command,
            **exec_options,
        )
    except client.exceptions.ApiException as e:
        print(f"Error executing command: {e}")
        return None
    print("Command output:")
    print(output)
    return output


# Usage
exec_in_pod("default", "nginx-pod", "nginx", ["ls", "-la", "/etc/nginx"])
Deployment 操作
创建 Deployment
from kubernetes import client

apps_v1 = client.AppsV1Api()


def create_deployment(namespace="default"):
    """Create the 3-replica Deployment "nginx-deployment" in *namespace*.

    The Pod template is labelled app=nginx, the same label the Deployment
    selects on, and runs one nginx:1.21 container exposing port 80.
    Returns the created V1Deployment, or None on API error (printed).
    """
    pod_labels = {"app": "nginx"}
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=pod_labels),
        spec=client.V1PodSpec(
            containers=[
                client.V1Container(
                    name="nginx",
                    image="nginx:1.21",
                    ports=[client.V1ContainerPort(container_port=80)],
                )
            ]
        ),
    )
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name="nginx-deployment"),
        spec=client.V1DeploymentSpec(
            replicas=3,
            selector=client.V1LabelSelector(match_labels=pod_labels),
            template=pod_template,
        ),
    )

    try:
        created = apps_v1.create_namespaced_deployment(namespace=namespace, body=deployment)
    except client.exceptions.ApiException as e:
        print(f"Error creating deployment: {e}")
        return None
    print(f"Deployment created: {created.metadata.name}")
    return created


# Usage
create_deployment("default")
更新 Deployment(滚动更新)
def update_deployment_image(namespace, name, new_image, container_name=None):
    """Roll a Deployment to a new container image.

    Args:
        namespace: Namespace of the Deployment.
        name: Deployment name.
        new_image: Image reference to set, e.g. "nginx:1.22".
        container_name: Container whose image is replaced. Defaults to the
            first container of the Pod template (the previous behaviour).

    The patch contains only {name, image} for the target container, so the
    rest of the Deployment is not re-submitted. Under strategic merge patch,
    containers are matched by name. Changing the Pod template is what
    triggers the rolling update.

    Returns the patched V1Deployment, or None on API error or when
    *container_name* does not exist. The existence check matters because
    patching a missing name would add a new container.
    """
    try:
        deployment = apps_v1.read_namespaced_deployment(name, namespace)
        names = [c.name for c in deployment.spec.template.spec.containers]
        if container_name is None:
            container_name = names[0]
        elif container_name not in names:
            print(f"Container {container_name} not found in deployment {name}")
            return None

        patch = {
            "spec": {
                "template": {
                    "spec": {
                        "containers": [{"name": container_name, "image": new_image}]
                    }
                }
            }
        }
        response = apps_v1.patch_namespaced_deployment(
            name=name, namespace=namespace, body=patch
        )
    except client.exceptions.ApiException as e:
        print(f"Error updating deployment: {e}")
        return None
    print(f"Deployment updated with new image: {new_image}")
    return response


# Usage
update_deployment_image("default", "nginx-deployment", "nginx:1.22")
扩缩容 Deployment
def scale_deployment(namespace, name, replicas):
    """Set a Deployment's replica count via patch_namespaced_deployment_scale.

    The patch body contains only spec.replicas. Returns the API response, or
    None on API error (the error is printed).
    """
    scale_patch = {"spec": {"replicas": replicas}}
    try:
        result = apps_v1.patch_namespaced_deployment_scale(
            name=name, namespace=namespace, body=scale_patch
        )
    except client.exceptions.ApiException as e:
        print(f"Error scaling deployment: {e}")
        return None
    print(f"Deployment scaled to {replicas} replicas")
    return result


# Usage
scale_deployment("default", "nginx-deployment", 5)
查看 Deployment 状态
def get_deployment_status(namespace, name):
    """Print a Deployment's replica counters and status conditions.

    Returns the V1Deployment, or None on API error (the error is printed).
    """
    try:
        deployment = apps_v1.read_namespaced_deployment(name, namespace)
    except client.exceptions.ApiException as e:
        print(f"Error getting deployment status: {e}")
        return None

    spec, status = deployment.spec, deployment.status
    print(f"Deployment: {deployment.metadata.name}")
    counters = (
        ("Desired Replicas", spec.replicas),
        ("Current Replicas", status.replicas),
        ("Ready Replicas", status.ready_replicas),
        ("Available Replicas", status.available_replicas),
        ("Updated Replicas", status.updated_replicas),
    )
    for label, value in counters:
        print(f"{label}: {value}")

    print("\nConditions:")
    for cond in status.conditions or []:
        for field, value in (
            ("Type", cond.type),
            ("Status", cond.status),
            ("Reason", cond.reason),
            ("Message", cond.message),
        ):
            print(f" {field}: {value}")
        print()
    return deployment


# Usage
get_deployment_status("default", "nginx-deployment")
删除 Deployment
def delete_deployment(namespace, name):
    """Delete a Deployment with Foreground propagation and a 30 s grace period.

    propagation_policy="Foreground" asks the API server to remove dependent
    objects before the Deployment itself. Returns the API response, or None
    on API error (the error is printed).
    """
    options = client.V1DeleteOptions(
        propagation_policy="Foreground",
        grace_period_seconds=30,
    )
    try:
        result = apps_v1.delete_namespaced_deployment(
            name=name, namespace=namespace, body=options
        )
    except client.exceptions.ApiException as e:
        print(f"Error deleting deployment: {e}")
        return None
    print(f"Deployment deleted: {name}")
    return result


# Usage
delete_deployment("default", "nginx-deployment")
Service 操作
创建 Service
def create_service(namespace="default"):
    """Create the ClusterIP Service "nginx-service" for Pods labelled app=nginx.

    Exposes TCP port 80 (named "http") and forwards to port 80 on the Pods.
    Returns the created V1Service, or None on API error (printed).
    """
    http = client.V1ServicePort(name="http", protocol="TCP", port=80, target_port=80)
    spec = client.V1ServiceSpec(selector={"app": "nginx"}, type="ClusterIP", ports=[http])
    service = client.V1Service(
        api_version="v1",
        kind="Service",
        metadata=client.V1ObjectMeta(name="nginx-service"),
        spec=spec,
    )
    try:
        created = v1.create_namespaced_service(namespace, service)
    except client.exceptions.ApiException as e:
        print(f"Error creating service: {e}")
        return None
    print(f"Service created: {created.metadata.name}")
    print(f"ClusterIP: {created.spec.cluster_ip}")
    return created


# Usage
create_service("default")
创建 NodePort Service
def create_nodeport_service(namespace="default"):
    """Create the NodePort Service "nginx-nodeport" for Pods labelled app=nginx.

    Service port 80 forwards to Pod port 80 and explicitly requests node port
    30080 (node_port is optional). Returns the created V1Service, or None on
    API error (printed).
    """
    port = client.V1ServicePort(port=80, target_port=80, node_port=30080)
    spec = client.V1ServiceSpec(selector={"app": "nginx"}, type="NodePort", ports=[port])
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name="nginx-nodeport"),
        spec=spec,
    )
    try:
        created = v1.create_namespaced_service(namespace, service)
    except client.exceptions.ApiException as e:
        print(f"Error creating service: {e}")
        return None
    print(f"NodePort Service created: {created.metadata.name}")
    print(f"NodePort: {created.spec.ports[0].node_port}")
    return created


# Usage
create_nodeport_service("default")
创建 LoadBalancer Service
def create_loadbalancer_service(namespace="default"):
    """Create the LoadBalancer Service "nginx-lb" for Pods labelled app=nginx.

    The external address may need to be assigned by the cloud provider, so the
    create response often has no ingress entry yet. Read the Service again
    later to obtain it.

    Returns the created V1Service, or None on API error (printed).
    """
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name="nginx-lb"),
        spec=client.V1ServiceSpec(
            selector={"app": "nginx"},
            type="LoadBalancer",
            ports=[client.V1ServicePort(port=80, target_port=80)],
        ),
    )
    try:
        response = v1.create_namespaced_service(namespace, service)
    except client.exceptions.ApiException as e:
        print(f"Error creating service: {e}")
        return None

    print(f"LoadBalancer Service created: {response.metadata.name}")
    # status, status.load_balancer and its ingress list may each be unset here.
    lb_status = response.status.load_balancer if response.status else None
    ingress = (lb_status.ingress or []) if lb_status else []
    if ingress:
        first = ingress[0]
        # An ingress entry may carry a DNS hostname instead of an IP.
        if first.ip:
            print(f"External IP: {first.ip}")
        else:
            print(f"External hostname: {first.hostname}")
    return response


# Usage
create_loadbalancer_service("default")
ConfigMap & Secret 操作
ConfigMap
def create_configmap(namespace="default"):
    """Create the demo ConfigMap "app-config" in *namespace*.

    ``data`` holds plain-text values. ``binary_data`` values must be
    base64-encoded *strings* (binaryData is base64 on the wire). The request
    body is JSON-serialized, and raw ``bytes`` are not JSON-serializable, so
    the previous ``b"Hello"`` value could not be sent.

    Returns the created V1ConfigMap, or None on API error (printed).
    """
    import base64

    configmap = client.V1ConfigMap(
        metadata=client.V1ObjectMeta(name="app-config"),
        data={
            "app.properties": "key1=value1\nkey2=value2",
            "config.json": '{"setting": "value"}',
        },
        binary_data={
            # Encode bytes to a base64 text value explicitly.
            "binary-file": base64.b64encode(b"Hello").decode("ascii"),
        },
    )
    try:
        response = v1.create_namespaced_config_map(namespace, configmap)
    except client.exceptions.ApiException as e:
        print(f"Error creating configmap: {e}")
        return None
    print(f"ConfigMap created: {response.metadata.name}")
    return response


# Usage
create_configmap("default")
Secret
def create_secret(namespace="default"):
    """Create the demo Opaque Secret "app-secret" in *namespace*.

    Values are supplied through ``string_data`` as plain text, and the
    base64 encoding is handled for you. To fill ``data`` directly instead,
    encode each value yourself, e.g.
    ``data={"username": base64.b64encode(b"admin").decode(), ...}``.

    Returns the created V1Secret, or None on API error (printed).
    """
    import base64  # only needed for the manual ``data`` alternative above

    credentials = {
        "username": "admin",
        "password": "secret123",
    }
    secret = client.V1Secret(
        metadata=client.V1ObjectMeta(name="app-secret"),
        type="Opaque",
        string_data=credentials,
    )
    try:
        created = v1.create_namespaced_secret(namespace, secret)
    except client.exceptions.ApiException as e:
        print(f"Error creating secret: {e}")
        return None
    print(f"Secret created: {created.metadata.name}")
    return created


# Usage
create_secret("default")
Watch 机制
监听 Pod 变化
from kubernetes.watch import Watch


def watch_pods(namespace="default", timeout_seconds=60):
    """Print Pod events (ADDED / MODIFIED / DELETED) in *namespace*.

    The watch ends after *timeout_seconds* (passed to the API call), or
    earlier on Ctrl+C. API errors raised by the stream are printed. On every
    exit path the Watch is stopped and the event generator closed, so it can
    run its own cleanup.
    """
    w = Watch()
    # Creating the generator does no I/O; the request starts on first iteration.
    events = w.stream(
        v1.list_namespaced_pod,
        namespace=namespace,
        timeout_seconds=timeout_seconds,
    )
    print(f"Watching pods in namespace '{namespace}'...")
    try:
        for event in events:
            event_type = event['type']  # ADDED, MODIFIED, DELETED
            pod = event['object']
            print(f"Event: {event_type}")
            print(f" Pod: {pod.metadata.name}")
            print(f" Phase: {pod.status.phase}")
            print()
    except KeyboardInterrupt:
        print("Stopped watching")
    except client.exceptions.ApiException as e:
        print(f"Error watching pods: {e}")
    finally:
        w.stop()
        events.close()


# Usage (stops after 60 s, or press Ctrl+C)
watch_pods("default", timeout_seconds=60)
从特定 ResourceVersion 开始 Watch
def watch_pods_from_version(namespace="default", timeout_seconds=60):
    """List Pods, then watch changes starting from the list's resourceVersion.

    Starting at the snapshot's resourceVersion means changes made after the
    list call are not missed. A 410 Gone error means that version is no
    longer available; re-list and start over. Other API errors are printed.
    The Watch is stopped and its generator closed on every exit path.

    Args:
        namespace: Namespace to list/watch.
        timeout_seconds: Server-side watch timeout (previously fixed at 60).
    """
    # 1. List once to obtain the current resourceVersion.
    pods = v1.list_namespaced_pod(namespace)
    resource_version = pods.metadata.resource_version
    print(f"Starting watch from resource version: {resource_version}")

    # 2. Watch from that version.
    w = Watch()
    events = w.stream(
        v1.list_namespaced_pod,
        namespace=namespace,
        resource_version=resource_version,
        timeout_seconds=timeout_seconds,
    )
    try:
        for event in events:
            pod = event['object']
            print(f"{event['type']}: {pod.metadata.name} (Phase: {pod.status.phase})")
    except client.exceptions.ApiException as e:
        if e.status == 410:
            print(f"Resource version {resource_version} expired (410 Gone); re-list and watch again")
        else:
            print(f"Error watching pods: {e}")
    finally:
        w.stop()
        events.close()


# Usage
watch_pods_from_version("default")
监听多种资源
def watch_multiple_resources(namespace="default", timeout_seconds=60):
    """Watch Pods and Services in *namespace* concurrently, one thread each.

    Each watcher prints events tagged [POD] / [SERVICE] until the
    server-side timeout elapses. The call blocks until both threads finish.
    API errors inside a watcher are printed with its tag. Each Watch is
    stopped and its generator closed when the watcher ends.

    Args:
        namespace: Namespace to watch.
        timeout_seconds: Server-side watch timeout (previously fixed at 60).
    """
    import threading

    # Shared watcher loop; named distinctly so it does not shadow the
    # module-level watch_pods().
    def _watch(tag, list_func):
        w = Watch()
        events = w.stream(list_func, namespace, timeout_seconds=timeout_seconds)
        try:
            for event in events:
                print(f"[{tag}] {event['type']}: {event['object'].metadata.name}")
        except client.exceptions.ApiException as e:
            print(f"[{tag}] watch error: {e}")
        finally:
            w.stop()
            events.close()

    threads = [
        threading.Thread(target=_watch, args=("POD", v1.list_namespaced_pod)),
        threading.Thread(target=_watch, args=("SERVICE", v1.list_namespaced_service)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


# Usage
watch_multiple_resources("default")
错误处理
异常处理
from kubernetes.client.rest import ApiException
def handle_api_exception(namespace, pod_name):
    """Read a Pod, translating common API error statuses into readable messages.

    Known statuses (401/403/404/409/422) get a dedicated message; 422 also
    prints the response body. Any other ApiException is printed verbatim, and
    non-API exceptions are reported as unexpected.

    Returns the V1Pod, or None on any failure.
    """
    status_messages = {
        404: f"Pod {pod_name} not found in namespace {namespace}",
        403: "Forbidden: insufficient permissions",
        401: "Unauthorized: authentication required",
        409: "Conflict: resource version mismatch",
        422: "Invalid: validation failed",
    }
    try:
        return v1.read_namespaced_pod(pod_name, namespace)
    except ApiException as e:
        message = status_messages.get(e.status)
        if message is None:
            print(f"API Exception: {e}")
        else:
            print(message)
            if e.status == 422:
                print(f"Details: {e.body}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None


# Usage
handle_api_exception("default", "non-existent-pod")
重试机制
import time
from kubernetes.client.rest import ApiException
def retry_on_error(func=None, max_retries=3, delay=1, backoff=1,
                   retry_statuses=(429, 500, 502, 503, 504)):
    """Retry a Kubernetes API call on transient server-side errors.

    Usable bare (``@retry_on_error``) or with options, e.g.
    ``@retry_on_error(max_retries=5, delay=0.5, backoff=2)``.

    Args:
        func: Function to wrap (supplied automatically in bare decorator use).
        max_retries: Total attempts including the first call; must be >= 1.
        delay: Seconds to wait before the first retry.
        backoff: Factor applied to the wait after each retry (1 = fixed delay).
        retry_statuses: HTTP statuses treated as transient; 429 (throttled)
            and 502 are retried in addition to 500/503/504.

    Raises:
        ValueError: If max_retries < 1. Such a value would otherwise never
            call *func* and silently return None.
        ApiException: Re-raised immediately for non-retryable statuses, or
            after the last attempt for retryable ones.
    """
    import functools
    import time

    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")

    def decorator(fn):
        @functools.wraps(fn)  # keep __name__/__doc__ of the wrapped function
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except ApiException as e:
                    if e.status not in retry_statuses or attempt == max_retries:
                        raise
                    print(f"Retry {attempt}/{max_retries} after {wait}s...")
                    time.sleep(wait)
                    wait *= backoff
        return wrapper

    # Bare use: @retry_on_error -> func is the decorated function.
    if func is None:
        return decorator
    return decorator(func)
# Usage: the decorated function retries transient API failures automatically.
@retry_on_error
def get_pod_with_retry(namespace, name):
    return v1.read_namespaced_pod(name=name, namespace=namespace)


# Call it exactly like the undecorated function.
pod = get_pod_with_retry("default", "nginx-pod")
实用工具函数
等待 Pod 就绪
import time
def wait_for_pod_ready(namespace, pod_name, timeout=300, poll_interval=5):
    """Poll a Pod until its Ready condition is "True".

    Args:
        namespace: Pod namespace.
        pod_name: Pod name.
        timeout: Maximum seconds to wait.
        poll_interval: Seconds between polls (previously fixed at 5).

    Returns:
        True once the Pod reports Ready=True. False if *timeout* elapses, or
        if the Pod reaches a terminal phase (Succeeded/Failed); in that case
        there is no point waiting out the timeout.

    API errors (e.g. the Pod not existing yet) are printed and polling continues.
    """
    import time

    # Monotonic clock: immune to wall-clock jumps during the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            pod = v1.read_namespaced_pod(pod_name, namespace)
        except ApiException as e:
            print(f"Error checking pod status: {e}")
        else:
            conditions = pod.status.conditions or []
            if any(c.type == "Ready" and c.status == "True" for c in conditions):
                print(f"Pod {pod_name} is ready!")
                return True
            if pod.status.phase in ("Succeeded", "Failed"):
                print(f"Pod {pod_name} reached terminal phase {pod.status.phase}; it will not become ready")
                return False
            print(f"Waiting for pod {pod_name} to be ready... (Phase: {pod.status.phase})")
        # Never sleep past the deadline.
        time.sleep(max(0.0, min(poll_interval, deadline - time.monotonic())))
    print(f"Timeout waiting for pod {pod_name} to be ready")
    return False
# Usage: block for up to 5 minutes until nginx-pod reports Ready.
wait_for_pod_ready(namespace="default", pod_name="nginx-pod", timeout=300)
批量操作
def batch_create_pods(namespace, count=10, name_prefix="nginx-pod",
                      image="nginx:1.21", labels=None):
    """Create *count* single-container Pods named "<name_prefix>-<i>".

    Args:
        namespace: Target namespace.
        count: Number of Pods to create (0 creates none).
        name_prefix: Pod name prefix; the index i is appended.
        image: Image for the container (named "nginx").
        labels: Optional labels applied to every Pod, e.g.
            {"app": "nginx", "env": "test"}. These let the batch be removed
            later with a single label-selector delete.

    A failure for one Pod is printed and skipped, so the rest of the batch
    still proceeds.

    Returns:
        List of the Pod names that were created successfully.
    """
    created = []
    for i in range(count):
        pod_name = f"{name_prefix}-{i}"
        pod = client.V1Pod(
            metadata=client.V1ObjectMeta(
                name=pod_name,
                labels=dict(labels) if labels else None,
            ),
            spec=client.V1PodSpec(
                containers=[client.V1Container(name="nginx", image=image)]
            ),
        )
        try:
            v1.create_namespaced_pod(namespace, pod)
        except ApiException as e:
            print(f"Error creating pod {i}: {e}")
            continue
        print(f"Created pod: {pod_name}")
        created.append(pod_name)
    return created


# Usage
batch_create_pods("default", count=5)
总结
Python Client 核心 API
# Most frequently used typed API clients and the resources they manage.
v1 = client.CoreV1Api()                     # Pod, Service, ConfigMap, Secret, Namespace...
apps_v1 = client.AppsV1Api()                # Deployment, StatefulSet, DaemonSet
batch_v1 = client.BatchV1Api()              # Job, CronJob
networking_v1 = client.NetworkingV1Api()    # Ingress, NetworkPolicy
rbac_v1 = client.RbacAuthorizationV1Api()   # Role, RoleBinding, ClusterRole
storage_v1 = client.StorageV1Api()          # StorageClass, VolumeAttachment
custom_api = client.CustomObjectsApi()      # Custom resources (CRDs), by group/version/plural
最佳实践
✅ 异常处理: 使用 try-except 处理 ApiException
✅ 资源清理: 使用 try-finally 确保资源释放
✅ Watch 超时: 设置 timeout_seconds 避免无限等待
✅ Label Selector: 高效过滤资源
✅ 批量操作: 使用 delete_collection 批量删除
✅ 重试机制: 对临时错误实现重试
常见陷阱
❌ 忘记指定 namespace: 很多 API 需要明确指定
❌ ResourceVersion 冲突: 用 replace 更新时,若携带的 resourceVersion 已过期会返回 409 Conflict;应重新读取后重试,或改用只包含变更字段的 patch
❌ Watch 不设超时: 可能导致连接泄漏
❌ 忘记关闭 Watch: 使用完要 stop()
下一节将介绍 Watch、Informer、Controller 等高级模式。