高级模式:Watch、Informer、Controller
Watch 模式深度解析
Watch 原理
Watch 是 Kubernetes 的核心机制,通过 HTTP 长连接实现资源变化的实时推送。
Client API Server etcd
│ │ │
│ GET /api/v1/pods?watch=true │ │
├─────────────────────────────────>│ │
│ │ │
│ HTTP 200 OK (chunked) │ │
│<─────────────────────────────────┤ │
│ │ │
│ │ Watch /registry/pods │
│ ├───────────────────────────>│
│ │ │
│ │ ADDED: Pod A │
│ │<───────────────────────────┤
│ ADDED: Pod A │ │
│<─────────────────────────────────┤ │
│ │ │
│ │ MODIFIED: Pod A │
│ │<───────────────────────────┤
│ MODIFIED: Pod A │ │
│<─────────────────────────────────┤ │
Go 实现 Watch
package main
import (
	"context"
	"fmt"
	"log"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)
// WatchPods opens a watch on Pods labelled app=nginx in the given namespace
// and prints every ADDED/MODIFIED/DELETED event until the stream closes.
func WatchPods(clientset *kubernetes.Clientset, namespace string) {
	// Open the watch stream (HTTP chunked long-poll against the API server).
	watcher, err := clientset.CoreV1().Pods(namespace).Watch(
		context.TODO(),
		metav1.ListOptions{
			LabelSelector: "app=nginx",
		},
	)
	if err != nil {
		panic(err)
	}
	defer watcher.Stop()
	fmt.Println("Started watching pods...")

	// Consume events until the server closes the stream.
	for event := range watcher.ResultChan() {
		// Fix vs. the original: Error events carry a *metav1.Status, not a
		// *corev1.Pod, so the unconditional type assertion below would panic.
		// Handle the error case first.
		if event.Type == watch.Error {
			fmt.Printf("Watch ERROR: %v\n", event.Object)
			continue
		}
		pod, ok := event.Object.(*corev1.Pod)
		if !ok {
			// Unexpected object type (e.g. a Bookmark event); skip it.
			continue
		}
		switch event.Type {
		case watch.Added:
			fmt.Printf("Pod ADDED: %s (Phase: %s)\n", pod.Name, pod.Status.Phase)
		case watch.Modified:
			fmt.Printf("Pod MODIFIED: %s (Phase: %s)\n", pod.Name, pod.Status.Phase)
		case watch.Deleted:
			fmt.Printf("Pod DELETED: %s\n", pod.Name)
		}
	}
}
Python 实现 Watch
from kubernetes import client, config, watch

# Load credentials from ~/.kube/config and build a CoreV1 API client.
config.load_kube_config()
v1 = client.CoreV1Api()


def watch_pods(namespace="default", timeout_seconds=60):
    """Stream Pod events for app=nginx Pods and print each one.

    The stream ends on its own after `timeout_seconds`.
    """
    watcher = watch.Watch()
    print("Started watching pods...")
    stream = watcher.stream(
        v1.list_namespaced_pod,
        namespace=namespace,
        label_selector="app=nginx",
        timeout_seconds=timeout_seconds,
    )
    for event in stream:
        event_type, pod = event['type'], event['object']
        print(f"{event_type}: {pod.metadata.name} (Phase: {pod.status.phase})")


# Watch the default namespace.
watch_pods("default")
Watch 断点续传
// WatchPodsWithResume watches Pods and reconnects on failure, resuming from
// the last observed resourceVersion.
//
// Bug fixed vs. the original: the List call ran at the top of EVERY loop
// iteration, resetting resourceVersion and discarding the resume point the
// function exists to preserve. Now we only List when no resourceVersion is
// known (first run, or after the server reported it too old).
func WatchPodsWithResume(clientset *kubernetes.Clientset, namespace string) {
	var resourceVersion string
	for {
		// 1. Bootstrap: List once to obtain a starting resourceVersion.
		if resourceVersion == "" {
			podList, err := clientset.CoreV1().Pods(namespace).List(
				context.TODO(),
				metav1.ListOptions{},
			)
			if err != nil {
				log.Printf("Error listing pods: %v", err)
				time.Sleep(5 * time.Second)
				continue
			}
			resourceVersion = podList.ResourceVersion
		}
		// 2. Watch starting from the recorded version.
		watcher, err := clientset.CoreV1().Pods(namespace).Watch(
			context.TODO(),
			metav1.ListOptions{
				ResourceVersion: resourceVersion,
			},
		)
		if err != nil {
			log.Printf("Error watching pods: %v", err)
			time.Sleep(5 * time.Second)
			continue
		}
		// 3. Drain events until the stream breaks.
		for event := range watcher.ResultChan() {
			if event.Type == watch.Error {
				// Typically "410 Gone": the resourceVersion expired.
				// Clear it so the next iteration re-lists from scratch.
				log.Printf("Watch error, restarting...")
				resourceVersion = ""
				break
			}
			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				continue // e.g. Bookmark events carry a different type
			}
			handleEvent(event.Type, pod)
			// Remember the newest version seen so we can resume from it.
			resourceVersion = pod.ResourceVersion
		}
		watcher.Stop()
		time.Sleep(1 * time.Second)
	}
}
Informer 模式
Informer 简介
Informer 是 client-go 提供的高级 Watch 封装,提供了:
- ✅ 本地缓存:减少 API Server 压力
- ✅ 事件处理器:AddFunc、UpdateFunc、DeleteFunc
- ✅ 断点续传:自动重连和恢复
- ✅ 索引器:快速查询缓存
Informer 架构
┌────────────────────────────────────────────┐
│ Application │
│ ┌────────────────────────────────────┐ │
│ │ Event Handlers │ │
│ │ - OnAdd(obj) │ │
│ │ - OnUpdate(old, new) │ │
│ │ - OnDelete(obj) │ │
│ └───────────┬────────────────────────┘ │
└──────────────┼────────────────────────────┘
│
┌──────────────▼────────────────────────────┐
│ Shared Informer │
│ ┌────────────────────────────────────┐ │
│ │ Reflector (Watch + List) │ │
│ └───────────┬────────────────────────┘ │
│ │ │
│ ┌───────────▼────────────────────────┐ │
│ │ DeltaFIFO Queue │ │
│ └───────────┬────────────────────────┘ │
│ │ │
│ ┌───────────▼────────────────────────┐ │
│ │ Indexer (Local Cache) │ │
│ └────────────────────────────────────┘ │
└───────────────┬────────────────────────────┘
│ Watch
▼
┌─────────────┐
│ API Server │
└─────────────┘
Go Informer 示例
package main
import (
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
// main demonstrates the full SharedInformer lifecycle: build the factory,
// register event handlers, start the informer, wait for the initial cache
// sync, then query the local cache through a Lister.
//
// NOTE(review): GetKubernetesClient is defined elsewhere in this guide.
func main() {
	clientset := GetKubernetesClient()
	// 1. Create the SharedInformerFactory (30s resync period: handlers are
	//    periodically re-invoked with the cached state).
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	// 2. Obtain the Pod informer.
	podInformer := factory.Core().V1().Pods()
	// 3. Register the event handlers.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Printf("Pod ADDED: %s/%s\n", pod.Namespace, pod.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPod := oldObj.(*corev1.Pod)
			newPod := newObj.(*corev1.Pod)
			// Only react to Phase transitions; ignore other updates
			// (including periodic resync events).
			if oldPod.Status.Phase != newPod.Status.Phase {
				fmt.Printf("Pod UPDATED: %s/%s (%s -> %s)\n",
					newPod.Namespace, newPod.Name,
					oldPod.Status.Phase, newPod.Status.Phase,
				)
			}
		},
		DeleteFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Printf("Pod DELETED: %s/%s\n", pod.Namespace, pod.Name)
		},
	})
	// 4. Start the informer (non-blocking; spawns its own goroutines).
	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	// 5. Block until the initial List has been loaded into the cache.
	if !cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced) {
		panic("Failed to sync cache")
	}
	fmt.Println("Informer started and synced!")
	// 6. Query the local cache via the Lister (no API-server round trips).
	lister := podInformer.Lister()
	// List all pods across namespaces.
	pods, err := lister.List(labels.Everything())
	if err != nil {
		panic(err)
	}
	fmt.Printf("Total pods in cache: %d\n", len(pods))
	// List pods in one namespace.
	namespacedPods, err := lister.Pods("default").List(labels.Everything())
	if err != nil {
		panic(err)
	}
	fmt.Printf("Pods in 'default' namespace: %d\n", len(namespacedPods))
	// Fetch a single pod by name.
	pod, err := lister.Pods("default").Get("nginx-pod")
	if err == nil {
		fmt.Printf("Found pod: %s (Phase: %s)\n", pod.Name, pod.Status.Phase)
	}
	// Block the main goroutine forever (stopCh only closes on exit via defer).
	<-stopCh
}
使用 Label Selector 过滤
// InformerWithSelector builds an informer that only receives events for Pods
// matching "app=nginx,env=production". The filter is applied server-side, so
// unmatched Pods never reach the local cache.
func InformerWithSelector(clientset *kubernetes.Clientset) {
	// WithTweakListOptions mutates the ListOptions used by the underlying
	// List/Watch calls, injecting the label selector.
	factory := informers.NewSharedInformerFactoryWithOptions(
		clientset,
		30*time.Second,
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.LabelSelector = "app=nginx,env=production"
		}),
	)
	podInformer := factory.Core().V1().Pods()
	// Only matching Pods generate events.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Printf("Matched Pod ADDED: %s\n", pod.Name)
		},
	})
	stopCh := make(chan struct{})
	factory.Start(stopCh)
	// Fix vs. the original: the sync result was ignored, so a failed sync
	// would silently fall through to the blocking receive below.
	if !cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced) {
		panic("Failed to sync cache")
	}
	<-stopCh
}
监听多种资源
// MultiResourceInformer starts Pod, Service, and Deployment informers from a
// single SharedInformerFactory, so consumers of the same resource share one
// watch connection and one cache.
//
// NOTE(review): this snippet references appsv1 (k8s.io/api/apps/v1), which is
// not in the import block shown earlier in this section.
func MultiResourceInformer(clientset *kubernetes.Clientset) {
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	// Pod informer.
	podInformer := factory.Core().V1().Pods()
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Printf("[POD] ADDED: %s\n", pod.Name)
		},
	})
	// Service informer.
	serviceInformer := factory.Core().V1().Services()
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			svc := obj.(*corev1.Service)
			fmt.Printf("[SERVICE] ADDED: %s\n", svc.Name)
		},
	})
	// Deployment informer.
	deploymentInformer := factory.Apps().V1().Deployments()
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			deploy := obj.(*appsv1.Deployment)
			fmt.Printf("[DEPLOYMENT] ADDED: %s\n", deploy.Name)
		},
	})
	// Start every informer registered with the factory.
	stopCh := make(chan struct{})
	factory.Start(stopCh)
	// Fix vs. the original: check the sync result instead of ignoring it.
	if !cache.WaitForCacheSync(stopCh,
		podInformer.Informer().HasSynced,
		serviceInformer.Informer().HasSynced,
		deploymentInformer.Informer().HasSynced,
	) {
		panic("Failed to sync caches")
	}
	<-stopCh
}
WorkQueue 模式
WorkQueue 简介
WorkQueue 用于解耦事件处理和业务逻辑,提供:
- ✅ 去重:相同 Key 只会排队一次
- ✅ 延迟重试:失败后延迟重新入队
- ✅ 速率限制:防止过载
- ✅ 并发处理:多 Worker 并发消费
WorkQueue 架构
Informer Event
↓
┌─────────────┐
│ AddFunc │
│ UpdateFunc │ → queue.Add(key)
│ DeleteFunc │
└──────┬──────┘
│
▼
┌─────────────────────────┐
│ WorkQueue │
│ ┌──────────────────┐ │
│ │ Item1 (dedupe) │ │
│ │ Item2 │ │
│ │ Item3 │ │
│ └──────────────────┘ │
└──────┬──────────────────┘
│
┌────┴────┐
│ │
▼ ▼
Worker1 Worker2
│ │
└────┬────┘
▼
syncHandler()
完整示例
package main
import (
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// Controller wires a Pod informer to a rate-limited work queue: informer
// events enqueue namespace/name keys, and workers drain the queue through
// syncHandler.
//
// NOTE(review): corev1listers (k8s.io/client-go/listers/core/v1) is not in
// the import block shown above — it must be added for this to compile.
type Controller struct {
	clientset  *kubernetes.Clientset           // API client used for writes
	podLister  corev1listers.PodLister         // read-only view of the informer cache
	podsSynced cache.InformerSynced            // reports whether the initial sync finished
	workqueue  workqueue.RateLimitingInterface // deduplicating, rate-limited key queue
}
// NewController builds a Controller: it creates the shared informer factory
// and the rate-limited work queue, and registers event handlers that turn
// informer events into namespace/name keys on the queue.
func NewController(clientset *kubernetes.Clientset) *Controller {
	// Informer factory with a 30s resync period.
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	podInformer := factory.Core().V1().Pods()
	// Rate-limited queue: per-item exponential backoff plus an overall limiter.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	controller := &Controller{
		clientset:  clientset,
		podLister:  podInformer.Lister(),
		podsSynced: podInformer.Informer().HasSynced,
		workqueue:  queue,
	}
	// Handlers only enqueue keys; all real work happens in syncHandler so
	// that failures can be retried through the queue.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// DeletionHandlingMetaNamespaceKeyFunc also copes with
			// DeletedFinalStateUnknown tombstones.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	})
	// Start the informer. Fix vs. the original: factory.Start is already
	// non-blocking (it spawns its own goroutines), so wrapping it in `go`
	// was redundant.
	factory.Start(wait.NeverStop)
	return controller
}
// Run starts the controller: it waits for the informer cache to sync, then
// launches the requested number of workers and blocks until stopCh closes.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
	defer runtime.HandleCrash()
	defer c.workqueue.ShutDown()

	fmt.Println("Starting controller")
	// Do not process anything until the local cache reflects cluster state.
	if ok := cache.WaitForCacheSync(stopCh, c.podsSynced); !ok {
		return fmt.Errorf("failed to sync cache")
	}
	fmt.Println("Controller synced and ready")

	// Each worker loops on the queue; wait.Until restarts it every second
	// if it ever returns before stopCh closes.
	for w := 0; w < workers; w++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	fmt.Println("Shutting down controller")
	return nil
}
// runWorker drains the work queue until it is shut down.
func (c *Controller) runWorker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
// processNextWorkItem pops one key off the queue, runs syncHandler on it,
// and decides whether to retry. It returns false only when the queue has
// been shut down.
func (c *Controller) processNextWorkItem() bool {
	// 1. Block until a key is available (or the queue shuts down).
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	// Done must be called so the queue knows processing finished.
	defer c.workqueue.Done(obj)
	// 2. Fix vs. the original: use a comma-ok assertion so an unexpected
	// item type is dropped instead of panicking the worker goroutine.
	key, ok := obj.(string)
	if !ok {
		c.workqueue.Forget(obj)
		runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
		return true
	}
	err := c.syncHandler(key)
	// 3. On success, Forget resets the item's rate-limit backoff history.
	if err == nil {
		c.workqueue.Forget(obj)
		fmt.Printf("Successfully synced '%s'\n", key)
		return true
	}
	// On failure, requeue with rate-limited backoff.
	c.workqueue.AddRateLimited(obj)
	runtime.HandleError(fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()))
	return true
}
// syncHandler reconciles a single Pod identified by its "namespace/name" key.
// It reads from the informer cache (never the API server directly) and runs
// the business logic.
//
// NOTE(review): `errors` here must be k8s.io/apimachinery/pkg/api/errors,
// which is missing from the import block shown above.
func (c *Controller) syncHandler(key string) error {
	// 1. Split the key back into namespace and name.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid resource key: %s", key)
	}
	// 2. Fetch the object from the Lister (local cache).
	pod, err := c.podLister.Pods(namespace).Get(name)
	if err != nil {
		// NotFound after dequeue means the Pod was deleted in the meantime;
		// nothing left to reconcile.
		if errors.IsNotFound(err) {
			fmt.Printf("Pod %s/%s has been deleted\n", namespace, name)
			return nil
		}
		return err
	}
	// 3. Business logic.
	fmt.Printf("Processing Pod: %s/%s (Phase: %s)\n",
		pod.Namespace, pod.Name, pod.Status.Phase)
	// Example: react to failed Pods.
	if pod.Status.Phase == corev1.PodFailed {
		fmt.Printf("Pod %s/%s is in Failed state, taking action...\n",
			pod.Namespace, pod.Name)
		// Remediation logic would go here.
	}
	return nil
}
// Entry point: build the controller and run two workers until interrupted.
func main() {
	clientset := GetKubernetesClient()
	controller := NewController(clientset)

	stopCh := make(chan struct{})
	defer close(stopCh)

	// Two concurrent workers drain the queue.
	err := controller.Run(2, stopCh)
	if err != nil {
		panic(err)
	}
}
Controller 模式
Controller 核心组件
┌──────────────────────────────────────────┐
│ Controller │
│ ┌────────────────────────────────────┐ │
│ │ Informer (Watch Resources) │ │
│ └───────────┬────────────────────────┘ │
│ │ Events │
│ ▼ │
│ ┌────────────────────────────────────┐ │
│ │ WorkQueue (Dedup & Rate Limit) │ │
│ └───────────┬────────────────────────┘ │
│ │ Keys │
│ ▼ │
│ ┌────────────────────────────────────┐ │
│ │ Workers (Process Keys) │ │
│ │ ├─ Get Object from Lister │ │
│ │ ├─ Compare Desired vs Actual │ │
│ │ └─ Reconcile (Create/Update/Del) │ │
│ └────────────────────────────────────┘ │
└──────────────────────────────────────────┘
完整 Controller 示例
package main
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// DeploymentController watches Deployments and automatically creates a
// matching Service for any Deployment annotated with auto-service=true.
//
// NOTE(review): appsv1listers and corev1listers (k8s.io/client-go/listers/...)
// are not in the import block above and must be added for this to compile.
type DeploymentController struct {
	clientset        *kubernetes.Clientset           // API client used to create Services
	deploymentLister appsv1listers.DeploymentLister  // cached read access to Deployments
	deploymentSynced cache.InformerSynced            // initial Deployment cache sync signal
	serviceLister    corev1listers.ServiceLister     // cached read access to Services
	serviceSynced    cache.InformerSynced            // initial Service cache sync signal
	workqueue        workqueue.RateLimitingInterface // deduplicating retry queue of keys
}
// NewDeploymentController wires up Deployment and Service informers: any
// Deployment event enqueues the Deployment's key, and a Service deletion
// re-enqueues its owning Deployment so the Service gets recreated.
func NewDeploymentController(clientset *kubernetes.Clientset) *DeploymentController {
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	deploymentInformer := factory.Apps().V1().Deployments()
	serviceInformer := factory.Core().V1().Services()
	controller := &DeploymentController{
		clientset:        clientset,
		deploymentLister: deploymentInformer.Lister(),
		deploymentSynced: deploymentInformer.Informer().HasSynced,
		serviceLister:    serviceInformer.Lister(),
		serviceSynced:    serviceInformer.Informer().HasSynced,
		workqueue:        workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	}
	// Deployment events drive reconciliation.
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueDeployment,
		UpdateFunc: func(old, new interface{}) {
			controller.enqueueDeployment(new)
		},
		DeleteFunc: controller.enqueueDeployment,
	})
	// Service deletions (e.g. a manual kubectl delete) trigger re-creation.
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: controller.handleServiceDelete,
	})
	// Fix vs. the original: factory.Start is non-blocking, so the extra
	// `go` wrapper was redundant.
	factory.Start(wait.NeverStop)
	return controller
}
// enqueueDeployment converts an informer object into a namespace/name key
// and adds it to the work queue.
func (c *DeploymentController) enqueueDeployment(obj interface{}) {
	if key, err := cache.MetaNamespaceKeyFunc(obj); err != nil {
		runtime.HandleError(err)
	} else {
		c.workqueue.Add(key)
	}
}
// handleServiceDelete re-enqueues the owning Deployment when a managed
// Service is deleted, so syncHandler can recreate it.
//
// Fix vs. the original: on delete the informer may hand us a
// cache.DeletedFinalStateUnknown tombstone instead of a *corev1.Service
// (e.g. after a missed delete during a disconnect); the bare type assertion
// would panic in that case.
func (c *DeploymentController) handleServiceDelete(obj interface{}) {
	svc, ok := obj.(*corev1.Service)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			runtime.HandleError(fmt.Errorf("unexpected object in delete handler: %#v", obj))
			return
		}
		svc, ok = tombstone.Obj.(*corev1.Service)
		if !ok {
			runtime.HandleError(fmt.Errorf("tombstone contained unexpected object: %#v", tombstone.Obj))
			return
		}
	}
	// If the Service is owned by a Deployment, requeue that Deployment.
	for _, owner := range svc.OwnerReferences {
		if owner.Kind == "Deployment" {
			key := fmt.Sprintf("%s/%s", svc.Namespace, owner.Name)
			c.workqueue.Add(key)
		}
	}
}
// Run waits for both caches to sync, then launches workers and blocks until
// stopCh is closed.
func (c *DeploymentController) Run(workers int, stopCh <-chan struct{}) error {
	defer runtime.HandleCrash()
	defer c.workqueue.ShutDown()

	fmt.Println("Starting Deployment controller")
	// Both listers must be populated before reconciling anything.
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentSynced, c.serviceSynced); !ok {
		return fmt.Errorf("failed to sync caches")
	}

	fmt.Println("Starting workers")
	for w := 0; w < workers; w++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	fmt.Println("Started workers")

	<-stopCh
	fmt.Println("Shutting down workers")
	return nil
}
// runWorker processes queue items until the queue shuts down.
func (c *DeploymentController) runWorker() {
	for {
		if !c.processNextWorkItem() {
			break
		}
	}
}
// processNextWorkItem pops one key, reconciles it, and requeues on failure.
// It returns false only when the queue has been shut down.
func (c *DeploymentController) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	defer c.workqueue.Done(obj)
	// Fix vs. the original: comma-ok assertion so a bad item cannot panic
	// the worker goroutine.
	key, ok := obj.(string)
	if !ok {
		c.workqueue.Forget(obj)
		runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
		return true
	}
	err := c.syncHandler(key)
	if err == nil {
		// Success: reset this item's rate-limit history.
		c.workqueue.Forget(obj)
		return true
	}
	// Failure: requeue with backoff.
	c.workqueue.AddRateLimited(obj)
	runtime.HandleError(fmt.Errorf("error syncing '%s': %s", key, err.Error()))
	return true
}
// syncHandler reconciles one Deployment key: if the Deployment carries the
// annotation auto-service=true and no companion Service exists yet, a
// ClusterIP Service named "<deployment>-service" is created.
//
// NOTE(review): `errors` must be k8s.io/apimachinery/pkg/api/errors, which is
// missing from the import block shown above.
func (c *DeploymentController) syncHandler(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid resource key: %s", key)
	}
	// 1. Load the Deployment from the informer cache.
	deployment, err := c.deploymentLister.Deployments(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			// Deleted after enqueue; the owned Service is garbage-collected
			// via its OwnerReference, so there is nothing to do here.
			fmt.Printf("Deployment %s/%s deleted, cleaning up...\n", namespace, name)
			return nil
		}
		return err
	}
	// 2. Opt-in gate: only act when the annotation is set.
	//    (Reading a nil Annotations map is safe in Go and yields "".)
	if deployment.Annotations["auto-service"] != "true" {
		return nil
	}
	// 3. Skip when the Service already exists.
	serviceName := fmt.Sprintf("%s-service", deployment.Name)
	_, err = c.serviceLister.Services(namespace).Get(serviceName)
	if err == nil {
		// Service already exists.
		return nil
	}
	if !errors.IsNotFound(err) {
		return err
	}
	// 4. Create the missing Service.
	service := c.createServiceForDeployment(deployment)
	_, err = c.clientset.CoreV1().Services(namespace).Create(
		context.TODO(),
		service,
		metav1.CreateOptions{},
	)
	if err != nil {
		return fmt.Errorf("failed to create service: %v", err)
	}
	fmt.Printf("Created service %s/%s for deployment %s\n",
		namespace, serviceName, deployment.Name)
	return nil
}
// createServiceForDeployment builds (but does not create) a ClusterIP Service
// exposing port 80 for the given Deployment, reusing the Deployment's label
// selector as the Service selector.
//
// NOTE(review): intstr (k8s.io/apimachinery/pkg/util/intstr) is missing from
// the import block shown above.
func (c *DeploymentController) createServiceForDeployment(deployment *appsv1.Deployment) *corev1.Service {
	// The controller OwnerReference makes Kubernetes garbage-collect the
	// Service when its owning Deployment is deleted (cascading delete).
	ownerRef := metav1.NewControllerRef(deployment, appsv1.SchemeGroupVersion.WithKind("Deployment"))
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:            fmt.Sprintf("%s-service", deployment.Name),
			Namespace:       deployment.Namespace,
			OwnerReferences: []metav1.OwnerReference{*ownerRef},
			Labels: map[string]string{
				"app":        deployment.Name,
				"managed-by": "deployment-controller",
			},
		},
		Spec: corev1.ServiceSpec{
			// Route to the same Pods the Deployment manages.
			Selector: deployment.Spec.Selector.MatchLabels,
			Type:     corev1.ServiceTypeClusterIP,
			Ports: []corev1.ServicePort{
				{
					Name:       "http",
					Protocol:   corev1.ProtocolTCP,
					Port:       80,
					TargetPort: intstr.FromInt(80),
				},
			},
		},
	}
}
func main() {
clientset := GetKubernetesClient()
controller := NewDeploymentController(clientset)
stopCh := make(chan struct{})
defer close(stopCh)
if err := controller.Run(2, stopCh); err != nil {
panic(err)
}
}
总结
三种模式对比
| 模式 | 使用场景 | 复杂度 | 性能 |
|---|---|---|---|
| Watch | 简单监听 | ⭐ | 低(频繁 API 调用) |
| Informer | 需要本地缓存 | ⭐⭐ | 高(本地缓存) |
| Controller | 复杂业务逻辑 | ⭐⭐⭐ | 高(解耦 + 重试) |
最佳实践
✅ 使用 Informer: 而非直接 Watch
✅ WorkQueue 解耦: 事件处理和业务逻辑分离
✅ 错误重试: 使用 RateLimitingQueue
✅ OwnerReference: 实现级联删除
✅ 并发 Workers: 提高处理吞吐量
✅ 优雅关闭: 正确处理 stopCh
Controller 设计模式
1. 监听资源变化(Informer)
↓
2. 入队 Key(WorkQueue)
↓
3. 从缓存获取对象(Lister)
↓
4. 计算期望状态 vs 实际状态
↓
5. 调谐(Reconcile)
- 创建缺失的资源
- 更新不一致的资源
- 删除多余的资源
↓
6. 更新状态(Status)
下一节将通过实战项目示例展示如何应用这些模式。