高级模式:Watch、Informer、Controller

高级模式:Watch、Informer、Controller

Watch 模式深度解析

Watch 原理

Watch 是 Kubernetes 的核心机制,通过 HTTP 长连接实现资源变化的实时推送。

Client                          API Server                      etcd
  │                                  │                            │
  │ GET /api/v1/pods?watch=true      │                            │
  ├─────────────────────────────────>│                            │
  │                                  │                            │
  │ HTTP 200 OK (chunked)            │                            │
  │<─────────────────────────────────┤                            │
  │                                  │                            │
  │                                  │  Watch /registry/pods      │
  │                                  ├───────────────────────────>│
  │                                  │                            │
  │                                  │  ADDED: Pod A              │
  │                                  │<───────────────────────────┤
  │ ADDED: Pod A                     │                            │
  │<─────────────────────────────────┤                            │
  │                                  │                            │
  │                                  │  MODIFIED: Pod A           │
  │                                  │<───────────────────────────┤
  │ MODIFIED: Pod A                  │                            │
  │<─────────────────────────────────┤                            │

Go 实现 Watch

package main

import (
    "context"
    "fmt"
    
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
)

func WatchPods(clientset *kubernetes.Clientset, namespace string) {
    // 创建 Watch
    watcher, err := clientset.CoreV1().Pods(namespace).Watch(
        context.TODO(),
        metav1.ListOptions{
            LabelSelector: "app=nginx",
        },
    )
    if err != nil {
        panic(err)
    }
    defer watcher.Stop()
    
    fmt.Println("Started watching pods...")
    
    // 监听事件
    for event := range watcher.ResultChan() {
        pod := event.Object.(*corev1.Pod)
        
        switch event.Type {
        case watch.Added:
            fmt.Printf("Pod ADDED: %s (Phase: %s)\n", pod.Name, pod.Status.Phase)
        case watch.Modified:
            fmt.Printf("Pod MODIFIED: %s (Phase: %s)\n", pod.Name, pod.Status.Phase)
        case watch.Deleted:
            fmt.Printf("Pod DELETED: %s\n", pod.Name)
        case watch.Error:
            fmt.Printf("Watch ERROR: %v\n", event.Object)
        }
    }
}

Python 实现 Watch

from kubernetes import client, config, watch

config.load_kube_config()
v1 = client.CoreV1Api()

def watch_pods(namespace="default", timeout_seconds=60):
    """Watch Pods"""
    w = watch.Watch()
    
    print("Started watching pods...")
    
    for event in w.stream(
        v1.list_namespaced_pod,
        namespace=namespace,
        label_selector="app=nginx",
        timeout_seconds=timeout_seconds
    ):
        event_type = event['type']
        pod = event['object']
        
        print(f"{event_type}: {pod.metadata.name} (Phase: {pod.status.phase})")

# 使用
watch_pods("default")

Watch 断点续传

func WatchPodsWithResume(clientset *kubernetes.Clientset, namespace string) {
    var resourceVersion string
    
    for {
        // 1. 先 List 获取当前 resourceVersion
        podList, err := clientset.CoreV1().Pods(namespace).List(
            context.TODO(),
            metav1.ListOptions{},
        )
        if err != nil {
            log.Printf("Error listing pods: %v", err)
            time.Sleep(5 * time.Second)
            continue
        }
        resourceVersion = podList.ResourceVersion
        
        // 2. 从该版本开始 Watch
        watcher, err := clientset.CoreV1().Pods(namespace).Watch(
            context.TODO(),
            metav1.ListOptions{
                ResourceVersion: resourceVersion,
            },
        )
        if err != nil {
            log.Printf("Error watching pods: %v", err)
            time.Sleep(5 * time.Second)
            continue
        }
        
        // 3. 处理事件
        for event := range watcher.ResultChan() {
            if event.Type == watch.Error {
                log.Printf("Watch error, restarting...")
                break
            }
            
            pod := event.Object.(*corev1.Pod)
            handleEvent(event.Type, pod)
            
            // 更新 resourceVersion
            resourceVersion = pod.ResourceVersion
        }
        
        watcher.Stop()
        time.Sleep(1 * time.Second)
    }
}

Informer 模式

Informer 简介

Informer 是 client-go 提供的高级 Watch 封装,提供了:

  • 本地缓存:减少 API Server 压力
  • 事件处理器:AddFunc、UpdateFunc、DeleteFunc
  • 断点续传:自动重连和恢复
  • 索引器:快速查询缓存

Informer 架构

┌────────────────────────────────────────────┐
│              Application                   │
│  ┌────────────────────────────────────┐   │
│  │  Event Handlers                    │   │
│  │  - OnAdd(obj)                      │   │
│  │  - OnUpdate(old, new)              │   │
│  │  - OnDelete(obj)                   │   │
│  └───────────┬────────────────────────┘   │
└──────────────┼────────────────────────────┘
               │
┌──────────────▼────────────────────────────┐
│          Shared Informer                  │
│  ┌────────────────────────────────────┐   │
│  │  Reflector (Watch + List)          │   │
│  └───────────┬────────────────────────┘   │
│              │                             │
│  ┌───────────▼────────────────────────┐   │
│  │  DeltaFIFO Queue                   │   │
│  └───────────┬────────────────────────┘   │
│              │                             │
│  ┌───────────▼────────────────────────┐   │
│  │  Indexer (Local Cache)             │   │
│  └────────────────────────────────────┘   │
└───────────────┬────────────────────────────┘
                │ Watch
                ▼
         ┌─────────────┐
         │ API Server  │
         └─────────────┘

Go Informer 示例

package main

import (
    "fmt"
    "time"
    
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

func main() {
    clientset := GetKubernetesClient()
    
    // 1. 创建 SharedInformerFactory
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    
    // 2. 获取 Pod Informer
    podInformer := factory.Core().V1().Pods()
    
    // 3. 注册事件处理器
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            fmt.Printf("Pod ADDED: %s/%s\n", pod.Namespace, pod.Name)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            oldPod := oldObj.(*corev1.Pod)
            newPod := newObj.(*corev1.Pod)
            
            // 只处理 Phase 变化
            if oldPod.Status.Phase != newPod.Status.Phase {
                fmt.Printf("Pod UPDATED: %s/%s (%s -> %s)\n",
                    newPod.Namespace, newPod.Name,
                    oldPod.Status.Phase, newPod.Status.Phase,
                )
            }
        },
        DeleteFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            fmt.Printf("Pod DELETED: %s/%s\n", pod.Namespace, pod.Name)
        },
    })
    
    // 4. 启动 Informer
    stopCh := make(chan struct{})
    defer close(stopCh)
    
    factory.Start(stopCh)
    
    // 5. 等待缓存同步
    if !cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced) {
        panic("Failed to sync cache")
    }
    
    fmt.Println("Informer started and synced!")
    
    // 6. 使用 Lister 查询缓存
    lister := podInformer.Lister()
    
    // 列出所有 Pods
    pods, err := lister.List(labels.Everything())
    if err != nil {
        panic(err)
    }
    fmt.Printf("Total pods in cache: %d\n", len(pods))
    
    // 列出特定命名空间的 Pods
    namespacedPods, err := lister.Pods("default").List(labels.Everything())
    if err != nil {
        panic(err)
    }
    fmt.Printf("Pods in 'default' namespace: %d\n", len(namespacedPods))
    
    // 获取特定 Pod
    pod, err := lister.Pods("default").Get("nginx-pod")
    if err == nil {
        fmt.Printf("Found pod: %s (Phase: %s)\n", pod.Name, pod.Status.Phase)
    }
    
    // 阻塞主线程
    <-stopCh
}

使用 Label Selector 过滤

func InformerWithSelector(clientset *kubernetes.Clientset) {
    // 创建带过滤的 Informer
    factory := informers.NewSharedInformerFactoryWithOptions(
        clientset,
        30*time.Second,
        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
            options.LabelSelector = "app=nginx,env=production"
        }),
    )
    
    podInformer := factory.Core().V1().Pods()
    
    // 只会收到匹配 label 的 Pod 事件
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            fmt.Printf("Matched Pod ADDED: %s\n", pod.Name)
        },
    })
    
    stopCh := make(chan struct{})
    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)
    
    <-stopCh
}

监听多种资源

func MultiResourceInformer(clientset *kubernetes.Clientset) {
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    
    // Pod Informer
    podInformer := factory.Core().V1().Pods()
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            fmt.Printf("[POD] ADDED: %s\n", pod.Name)
        },
    })
    
    // Service Informer
    serviceInformer := factory.Core().V1().Services()
    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            svc := obj.(*corev1.Service)
            fmt.Printf("[SERVICE] ADDED: %s\n", svc.Name)
        },
    })
    
    // Deployment Informer
    deploymentInformer := factory.Apps().V1().Deployments()
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            deploy := obj.(*appsv1.Deployment)
            fmt.Printf("[DEPLOYMENT] ADDED: %s\n", deploy.Name)
        },
    })
    
    // 启动所有 Informer
    stopCh := make(chan struct{})
    factory.Start(stopCh)
    
    // 等待所有缓存同步
    cache.WaitForCacheSync(stopCh,
        podInformer.Informer().HasSynced,
        serviceInformer.Informer().HasSynced,
        deploymentInformer.Informer().HasSynced,
    )
    
    <-stopCh
}

WorkQueue 模式

WorkQueue 简介

WorkQueue 用于解耦事件处理和业务逻辑,提供:

  • 去重:相同 Key 只会排队一次
  • 延迟重试:失败后延迟重新入队
  • 速率限制:防止过载
  • 并发处理:多 Worker 并发消费

WorkQueue 架构

Informer Event
      ↓
┌─────────────┐
│  AddFunc    │
│  UpdateFunc │  → queue.Add(key)
│  DeleteFunc │
└──────┬──────┘
       │
       ▼
┌─────────────────────────┐
│     WorkQueue           │
│  ┌──────────────────┐   │
│  │  Item1 (dedupe)  │   │
│  │  Item2           │   │
│  │  Item3           │   │
│  └──────────────────┘   │
└──────┬──────────────────┘
       │
  ┌────┴────┐
  │         │
  ▼         ▼
Worker1   Worker2
  │         │
  └────┬────┘
       ▼
   syncHandler()

完整示例

package main

import (
    "fmt"
    "time"
    
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

type Controller struct {
    clientset  *kubernetes.Clientset
    podLister  corev1listers.PodLister
    podsSynced cache.InformerSynced
    workqueue  workqueue.RateLimitingInterface
}

func NewController(clientset *kubernetes.Clientset) *Controller {
    // 创建 Informer Factory
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    podInformer := factory.Core().V1().Pods()
    
    // 创建 WorkQueue
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    
    controller := &Controller{
        clientset:  clientset,
        podLister:  podInformer.Lister(),
        podsSynced: podInformer.Informer().HasSynced,
        workqueue:  queue,
    }
    
    // 注册事件处理器
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(old, new interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(new)
            if err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    })
    
    // 启动 Informer
    go factory.Start(wait.NeverStop)
    
    return controller
}

func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
    defer runtime.HandleCrash()
    defer c.workqueue.ShutDown()
    
    fmt.Println("Starting controller")
    
    // 等待缓存同步
    if !cache.WaitForCacheSync(stopCh, c.podsSynced) {
        return fmt.Errorf("failed to sync cache")
    }
    
    fmt.Println("Controller synced and ready")
    
    // 启动 Worker
    for i := 0; i < workers; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }
    
    <-stopCh
    fmt.Println("Shutting down controller")
    
    return nil
}

func (c *Controller) runWorker() {
    for c.processNextWorkItem() {
    }
}

func (c *Controller) processNextWorkItem() bool {
    // 1. 从队列获取 Key
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    defer c.workqueue.Done(obj)
    
    // 2. 处理 Key
    key := obj.(string)
    err := c.syncHandler(key)
    
    // 3. 处理结果
    if err == nil {
        // 成功,移出队列
        c.workqueue.Forget(obj)
        fmt.Printf("Successfully synced '%s'\n", key)
        return true
    }
    
    // 失败,重新入队
    c.workqueue.AddRateLimited(obj)
    runtime.HandleError(fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()))
    
    return true
}

func (c *Controller) syncHandler(key string) error {
    // 1. 解析 Key(格式:namespace/name)
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return fmt.Errorf("invalid resource key: %s", key)
    }
    
    // 2. 从 Lister 获取对象(从缓存)
    pod, err := c.podLister.Pods(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            fmt.Printf("Pod %s/%s has been deleted\n", namespace, name)
            return nil
        }
        return err
    }
    
    // 3. 业务逻辑
    fmt.Printf("Processing Pod: %s/%s (Phase: %s)\n", 
        pod.Namespace, pod.Name, pod.Status.Phase)
    
    // 示例:检查 Pod 是否需要处理
    if pod.Status.Phase == corev1.PodFailed {
        fmt.Printf("Pod %s/%s is in Failed state, taking action...\n", 
            pod.Namespace, pod.Name)
        // 执行修复逻辑
    }
    
    return nil
}

func main() {
    clientset := GetKubernetesClient()
    
    controller := NewController(clientset)
    
    stopCh := make(chan struct{})
    defer close(stopCh)
    
    // 启动 2 个 Worker
    if err := controller.Run(2, stopCh); err != nil {
        panic(err)
    }
}

Controller 模式

Controller 核心组件

┌──────────────────────────────────────────┐
│           Controller                     │
│  ┌────────────────────────────────────┐  │
│  │  Informer (Watch Resources)        │  │
│  └───────────┬────────────────────────┘  │
│              │ Events                    │
│              ▼                            │
│  ┌────────────────────────────────────┐  │
│  │  WorkQueue (Dedup & Rate Limit)    │  │
│  └───────────┬────────────────────────┘  │
│              │ Keys                      │
│              ▼                            │
│  ┌────────────────────────────────────┐  │
│  │  Workers (Process Keys)            │  │
│  │   ├─ Get Object from Lister        │  │
│  │   ├─ Compare Desired vs Actual     │  │
│  │   └─ Reconcile (Create/Update/Del) │  │
│  └────────────────────────────────────┘  │
└──────────────────────────────────────────┘

完整 Controller 示例

package main

import (
    "context"
    "fmt"
    "time"
    
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// DeploymentController 监控 Deployment 并自动创建 Service
type DeploymentController struct {
    clientset        *kubernetes.Clientset
    deploymentLister appsv1listers.DeploymentLister
    deploymentSynced cache.InformerSynced
    serviceLister    corev1listers.ServiceLister
    serviceSynced    cache.InformerSynced
    workqueue        workqueue.RateLimitingInterface
}

func NewDeploymentController(clientset *kubernetes.Clientset) *DeploymentController {
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    
    deploymentInformer := factory.Apps().V1().Deployments()
    serviceInformer := factory.Core().V1().Services()
    
    controller := &DeploymentController{
        clientset:        clientset,
        deploymentLister: deploymentInformer.Lister(),
        deploymentSynced: deploymentInformer.Informer().HasSynced,
        serviceLister:    serviceInformer.Lister(),
        serviceSynced:    serviceInformer.Informer().HasSynced,
        workqueue:        workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
    }
    
    // 监听 Deployment 事件
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueDeployment,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueDeployment(new)
        },
        DeleteFunc: controller.enqueueDeployment,
    })
    
    // 监听 Service 事件(检测手动删除)
    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: controller.handleServiceDelete,
    })
    
    go factory.Start(wait.NeverStop)
    
    return controller
}

func (c *DeploymentController) enqueueDeployment(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        runtime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}

func (c *DeploymentController) handleServiceDelete(obj interface{}) {
    svc := obj.(*corev1.Service)
    
    // 如果 Service 有 owner reference,将对应的 Deployment 重新入队
    for _, owner := range svc.OwnerReferences {
        if owner.Kind == "Deployment" {
            key := fmt.Sprintf("%s/%s", svc.Namespace, owner.Name)
            c.workqueue.Add(key)
        }
    }
}

func (c *DeploymentController) Run(workers int, stopCh <-chan struct{}) error {
    defer runtime.HandleCrash()
    defer c.workqueue.ShutDown()
    
    fmt.Println("Starting Deployment controller")
    
    if !cache.WaitForCacheSync(stopCh, c.deploymentSynced, c.serviceSynced) {
        return fmt.Errorf("failed to sync caches")
    }
    
    fmt.Println("Starting workers")
    for i := 0; i < workers; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }
    
    fmt.Println("Started workers")
    <-stopCh
    fmt.Println("Shutting down workers")
    
    return nil
}

func (c *DeploymentController) runWorker() {
    for c.processNextWorkItem() {
    }
}

func (c *DeploymentController) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    defer c.workqueue.Done(obj)
    
    key := obj.(string)
    err := c.syncHandler(key)
    
    if err == nil {
        c.workqueue.Forget(obj)
        return true
    }
    
    c.workqueue.AddRateLimited(obj)
    runtime.HandleError(fmt.Errorf("error syncing '%s': %s", key, err.Error()))
    
    return true
}

func (c *DeploymentController) syncHandler(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return fmt.Errorf("invalid resource key: %s", key)
    }
    
    // 1. 获取 Deployment
    deployment, err := c.deploymentLister.Deployments(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            fmt.Printf("Deployment %s/%s deleted, cleaning up...\n", namespace, name)
            return nil
        }
        return err
    }
    
    // 2. 检查是否需要创建 Service(通过 annotation 控制)
    if deployment.Annotations["auto-service"] != "true" {
        return nil
    }
    
    // 3. 检查 Service 是否已存在
    serviceName := fmt.Sprintf("%s-service", deployment.Name)
    _, err = c.serviceLister.Services(namespace).Get(serviceName)
    if err == nil {
        // Service 已存在
        return nil
    }
    
    if !errors.IsNotFound(err) {
        return err
    }
    
    // 4. 创建 Service
    service := c.createServiceForDeployment(deployment)
    _, err = c.clientset.CoreV1().Services(namespace).Create(
        context.TODO(),
        service,
        metav1.CreateOptions{},
    )
    if err != nil {
        return fmt.Errorf("failed to create service: %v", err)
    }
    
    fmt.Printf("Created service %s/%s for deployment %s\n", 
        namespace, serviceName, deployment.Name)
    
    return nil
}

func (c *DeploymentController) createServiceForDeployment(deployment *appsv1.Deployment) *corev1.Service {
    // 设置 OwnerReference(实现级联删除)
    ownerRef := metav1.NewControllerRef(deployment, appsv1.SchemeGroupVersion.WithKind("Deployment"))
    
    return &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:            fmt.Sprintf("%s-service", deployment.Name),
            Namespace:       deployment.Namespace,
            OwnerReferences: []metav1.OwnerReference{*ownerRef},
            Labels: map[string]string{
                "app":        deployment.Name,
                "managed-by": "deployment-controller",
            },
        },
        Spec: corev1.ServiceSpec{
            Selector: deployment.Spec.Selector.MatchLabels,
            Type:     corev1.ServiceTypeClusterIP,
            Ports: []corev1.ServicePort{
                {
                    Name:       "http",
                    Protocol:   corev1.ProtocolTCP,
                    Port:       80,
                    TargetPort: intstr.FromInt(80),
                },
            },
        },
    }
}

func main() {
    clientset := GetKubernetesClient()
    
    controller := NewDeploymentController(clientset)
    
    stopCh := make(chan struct{})
    defer close(stopCh)
    
    if err := controller.Run(2, stopCh); err != nil {
        panic(err)
    }
}

总结

三种模式对比

模式 使用场景 复杂度 性能
Watch 简单监听 低(频繁 API 调用)
Informer 需要本地缓存 ⭐⭐ 高(本地缓存)
Controller 复杂业务逻辑 ⭐⭐⭐ 高(解耦 + 重试)

最佳实践

使用 Informer: 而非直接 Watch
WorkQueue 解耦: 事件处理和业务逻辑分离
错误重试: 使用 RateLimitingQueue
OwnerReference: 实现级联删除
并发 Workers: 提高处理吞吐量
优雅关闭: 正确处理 stopCh

Controller 设计模式

1. 监听资源变化(Informer)
   ↓
2. 入队 Key(WorkQueue)
   ↓
3. 从缓存获取对象(Lister)
   ↓
4. 计算期望状态 vs 实际状态
   ↓
5. 调谐(Reconcile)
   - 创建缺失的资源
   - 更新不一致的资源
   - 删除多余的资源
   ↓
6. 更新状态(Status)

下一节将通过实战项目示例展示如何应用这些模式。