Practical Project Examples

Project 1: Pod Auto-Cleanup Tool

Requirements

Automatically clean up Pods that have been in the Failed or Succeeded phase for longer than a specified time.

Go Implementation

package main

import (
    "context"
    "flag"
    "fmt"
    "time"
    
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

type PodCleaner struct {
    clientset *kubernetes.Clientset
    ttl       time.Duration
    dryRun    bool
}

func NewPodCleaner(clientset *kubernetes.Clientset, ttl time.Duration, dryRun bool) *PodCleaner {
    return &PodCleaner{
        clientset: clientset,
        ttl:       ttl,
        dryRun:    dryRun,
    }
}

func (pc *PodCleaner) Run(stopCh <-chan struct{}) {
    // Create an informer that watches Pod changes
    factory := informers.NewSharedInformerFactory(pc.clientset, 30*time.Second)
    podInformer := factory.Core().V1().Pods()
    
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            pc.checkAndCleanPod(pod)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            pod := newObj.(*corev1.Pod)
            pc.checkAndCleanPod(pod)
        },
    })
    
    // Start the informer
    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)
    
    // Periodic full scan (catches anything the watch missed)
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            pc.cleanAllExpiredPods()
        case <-stopCh:
            fmt.Println("Stopping pod cleaner")
            return
        }
    }
}

func (pc *PodCleaner) checkAndCleanPod(pod *corev1.Pod) {
    // Only handle Pods in the Failed or Succeeded phase
    if pod.Status.Phase != corev1.PodFailed && pod.Status.Phase != corev1.PodSucceeded {
        return
    }
    
    // Skip Pods that have not yet exceeded the TTL
    if !pc.isExpired(pod) {
        return
    }
    
    // Compute the Pod's age for logging; StartTime can be nil for
    // Pods that failed before ever being scheduled
    age := "unknown"
    if pod.Status.StartTime != nil {
        age = time.Since(pod.Status.StartTime.Time).String()
    }
    
    // Delete the Pod (or only log in dry-run mode)
    if pc.dryRun {
        fmt.Printf("[DRY RUN] Would delete pod %s/%s (Phase: %s, Age: %s)\n",
            pod.Namespace, pod.Name, pod.Status.Phase, age)
        return
    }
    
    err := pc.clientset.CoreV1().Pods(pod.Namespace).Delete(
        context.TODO(),
        pod.Name,
        metav1.DeleteOptions{},
    )
    if err != nil {
        fmt.Printf("Error deleting pod %s/%s: %v\n", pod.Namespace, pod.Name, err)
        return
    }
    
    fmt.Printf("Deleted pod %s/%s (Phase: %s, Age: %s)\n",
        pod.Namespace, pod.Name, pod.Status.Phase, age)
}

func (pc *PodCleaner) isExpired(pod *corev1.Pod) bool {
    // Determine when the Pod finished
    var finishTime time.Time
    
    // Take the latest termination time across all containers
    for _, cs := range pod.Status.ContainerStatuses {
        if cs.State.Terminated != nil {
            if finishTime.IsZero() || cs.State.Terminated.FinishedAt.Time.After(finishTime) {
                finishTime = cs.State.Terminated.FinishedAt.Time
            }
        }
    }
    
    if finishTime.IsZero() {
        return false
    }
    
    return time.Since(finishTime) > pc.ttl
}

func (pc *PodCleaner) cleanAllExpiredPods() {
    fmt.Println("Running periodic cleanup...")
    
    pods, err := pc.clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        fmt.Printf("Error listing pods: %v\n", err)
        return
    }
    
    for _, pod := range pods.Items {
        pc.checkAndCleanPod(&pod)
    }
}

func main() {
    kubeconfig := flag.String("kubeconfig", "", "path to kubeconfig")
    ttl := flag.Duration("ttl", 1*time.Hour, "time to live for completed pods")
    dryRun := flag.Bool("dry-run", false, "dry run mode")
    flag.Parse()
    
    // Create the Kubernetes client
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }
    
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    
    // 创建并运行 Pod Cleaner
    cleaner := NewPodCleaner(clientset, *ttl, *dryRun)
    
    stopCh := make(chan struct{})
    defer close(stopCh)
    
    fmt.Printf("Starting pod cleaner (TTL: %s, Dry Run: %v)\n", *ttl, *dryRun)
    cleaner.Run(stopCh)
}
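
To try the cleaner safely against the current kubeconfig context, a dry run such as go run . --kubeconfig=$HOME/.kube/config --ttl=2h --dry-run only prints the Pods that would be deleted.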

Python Implementation

#!/usr/bin/env python3

import argparse
import time
from datetime import datetime, timedelta
from kubernetes import client, config, watch

class PodCleaner:
    def __init__(self, ttl_seconds=3600, dry_run=False):
        config.load_kube_config()
        self.v1 = client.CoreV1Api()
        self.ttl = timedelta(seconds=ttl_seconds)
        self.dry_run = dry_run
    
    def run(self):
        """运行 Pod 清理器"""
        print(f"Starting pod cleaner (TTL: {self.ttl}, Dry Run: {self.dry_run})")
        
        # Start the watch
        w = watch.Watch()
        
        while True:
            try:
                # Stream Pod events; the stream ends after timeout_seconds
                for event in w.stream(self.v1.list_pod_for_all_namespaces, timeout_seconds=300):
                    event_type = event['type']
                    pod = event['object']
                    
                    if event_type in ['ADDED', 'MODIFIED']:
                        self.check_and_clean_pod(pod)
                
                # Periodic full scan after each watch window
                self.clean_all_expired_pods()
                
            except Exception as e:
                print(f"Error in watch loop: {e}")
                time.sleep(5)
    
    def check_and_clean_pod(self, pod):
        """检查并清理单个 Pod"""
        # 只处理 Failed 或 Succeeded 状态
        if pod.status.phase not in ['Failed', 'Succeeded']:
            return
        
        # Skip Pods that have not exceeded the TTL
        if not self.is_expired(pod):
            return
        
        # Delete the Pod (or only log in dry-run mode)
        if self.dry_run:
            age = self.get_pod_age(pod)
            print(f"[DRY RUN] Would delete pod {pod.metadata.namespace}/{pod.metadata.name} "
                  f"(Phase: {pod.status.phase}, Age: {age})")
            return
        
        try:
            self.v1.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                body=client.V1DeleteOptions()
            )
            age = self.get_pod_age(pod)
            print(f"Deleted pod {pod.metadata.namespace}/{pod.metadata.name} "
                  f"(Phase: {pod.status.phase}, Age: {age})")
        except Exception as e:
            print(f"Error deleting pod {pod.metadata.namespace}/{pod.metadata.name}: {e}")
    
    def is_expired(self, pod):
        """检查 Pod 是否过期"""
        finish_time = None
        
        # Take the latest termination time across all containers
        if pod.status.container_statuses:
            for cs in pod.status.container_statuses:
                if cs.state.terminated:
                    terminated_time = cs.state.terminated.finished_at
                    if finish_time is None or terminated_time > finish_time:
                        finish_time = terminated_time
        
        if finish_time is None:
            return False
        
        # Strip timezone info so we can compare against utcnow()
        finish_time = finish_time.replace(tzinfo=None)
        age = datetime.utcnow() - finish_time
        
        return age > self.ttl
    
    def get_pod_age(self, pod):
        """获取 Pod 年龄"""
        finish_time = None
        
        if pod.status.container_statuses:
            for cs in pod.status.container_statuses:
                if cs.state.terminated:
                    if finish_time is None or cs.state.terminated.finished_at > finish_time:
                        finish_time = cs.state.terminated.finished_at
        
        if finish_time:
            finish_time = finish_time.replace(tzinfo=None)
            return datetime.utcnow() - finish_time
        
        return timedelta(0)
    
    def clean_all_expired_pods(self):
        """清理所有过期 Pods"""
        print("Running periodic cleanup...")
        
        try:
            pods = self.v1.list_pod_for_all_namespaces()
            for pod in pods.items:
                self.check_and_clean_pod(pod)
        except Exception as e:
            print(f"Error in periodic cleanup: {e}")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Kubernetes Pod Cleaner')
    parser.add_argument('--ttl', type=int, default=3600,
                        help='Time to live in seconds (default: 3600)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Dry run mode')
    
    args = parser.parse_args()
    
    cleaner = PodCleaner(ttl_seconds=args.ttl, dry_run=args.dry_run)
    cleaner.run()

Deploying to Kubernetes

# pod-cleaner-deployment.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: pod-cleaner
  namespace: kube-system

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: pod-cleaner
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["list", "watch", "delete"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: pod-cleaner
subjects:
- kind: ServiceAccount
  name: pod-cleaner
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: pod-cleaner
  apiGroup: rbac.authorization.k8s.io

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pod-cleaner
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: pod-cleaner
  template:
    metadata:
      labels:
        app: pod-cleaner
    spec:
      serviceAccountName: pod-cleaner
      containers:
      - name: pod-cleaner
        image: your-registry/pod-cleaner:latest
        args:
        - --ttl=3600
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 200m
            memory: 256Mi
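
When the cleaner runs in-cluster as in this Deployment, there is no kubeconfig file to point at. clientcmd.BuildConfigFromFlags("", "") already falls back to the mounted ServiceAccount credentials, but the fallback can also be made explicit. A minimal sketch, where buildConfig is a hypothetical helper (not part of the code above) and rest is k8s.io/client-go/rest:

import (
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

// buildConfig prefers an explicit kubeconfig path and falls back to the
// ServiceAccount credentials mounted into the Pod when running in-cluster.
func buildConfig(kubeconfig string) (*rest.Config, error) {
    if kubeconfig != "" {
        return clientcmd.BuildConfigFromFlags("", kubeconfig)
    }
    return rest.InClusterConfig()
}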

Project 2: Auto-Scaling Monitor

Requirements

Monitor the resource usage of Deployments and adjust their replica counts automatically.

Go Implementation

package main

import (
    "context"
    "fmt"
    "time"
    
    appsv1 "k8s.io/api/apps/v1"
    autoscalingv1 "k8s.io/api/autoscaling/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
)

type AutoScaler struct {
    clientset     *kubernetes.Clientset
    metricsClient *metricsv.Clientset
    minReplicas   int32
    maxReplicas   int32
    targetCPU     int64 // target average CPU per Pod, in millicores (a true percentage target would also require the Pod's CPU request)
}

func NewAutoScaler(clientset *kubernetes.Clientset, metricsClient *metricsv.Clientset) *AutoScaler {
    return &AutoScaler{
        clientset:     clientset,
        metricsClient: metricsClient,
        minReplicas:   2,
        maxReplicas:   10,
        targetCPU:     70,
    }
}

func (as *AutoScaler) Run(stopCh <-chan struct{}) {
    factory := informers.NewSharedInformerFactory(as.clientset, 30*time.Second)
    deploymentInformer := factory.Apps().V1().Deployments()
    
    // Only act on Deployments carrying the auto-scale annotation
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            deployment := obj.(*appsv1.Deployment)
            if deployment.Annotations["auto-scale"] == "true" {
                as.scaleDeployment(deployment)
            }
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            deployment := newObj.(*appsv1.Deployment)
            if deployment.Annotations["auto-scale"] == "true" {
                as.scaleDeployment(deployment)
            }
        },
    })
    
    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, deploymentInformer.Informer().HasSynced)
    
    // Periodic re-check
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            as.checkAllDeployments()
        case <-stopCh:
            return
        }
    }
}

func (as *AutoScaler) scaleDeployment(deployment *appsv1.Deployment) {
    // Fetch CPU usage for the Deployment's Pods from the metrics API
    podMetrics, err := as.metricsClient.MetricsV1beta1().PodMetricses(deployment.Namespace).List(
        context.TODO(),
        metav1.ListOptions{
            LabelSelector: metav1.FormatLabelSelector(deployment.Spec.Selector),
        },
    )
    if err != nil {
        fmt.Printf("Error getting metrics for %s/%s: %v\n", 
            deployment.Namespace, deployment.Name, err)
        return
    }
    
    if len(podMetrics.Items) == 0 {
        return
    }
    
    // Compute the average CPU usage per Pod (in millicores)
    var totalCPU int64
    for _, pm := range podMetrics.Items {
        for _, container := range pm.Containers {
            totalCPU += container.Usage.Cpu().MilliValue()
        }
    }
    avgCPU := totalCPU / int64(len(podMetrics.Items))
    
    // Compute the desired replica count; Replicas can be nil before defaulting
    if deployment.Spec.Replicas == nil {
        return
    }
    currentReplicas := *deployment.Spec.Replicas
    desiredReplicas := as.calculateDesiredReplicas(currentReplicas, avgCPU)
    
    if desiredReplicas == currentReplicas {
        return
    }
    
    // Apply the scale change
    fmt.Printf("Scaling %s/%s from %d to %d replicas (CPU: %dm)\n",
        deployment.Namespace, deployment.Name, currentReplicas, desiredReplicas, avgCPU)
    
    scale := &autoscalingv1.Scale{
        ObjectMeta: metav1.ObjectMeta{
            Name:      deployment.Name,
            Namespace: deployment.Namespace,
        },
        Spec: autoscalingv1.ScaleSpec{
            Replicas: desiredReplicas,
        },
    }
    
    _, err = as.clientset.AppsV1().Deployments(deployment.Namespace).UpdateScale(
        context.TODO(),
        deployment.Name,
        scale,
        metav1.UpdateOptions{},
    )
    if err != nil {
        fmt.Printf("Error scaling deployment: %v\n", err)
    }
}

func (as *AutoScaler) calculateDesiredReplicas(current int32, avgCPU int64) int32 {
    // Simple proportional rule (the same idea the HPA uses):
    // desiredReplicas = currentReplicas * (currentCPU / targetCPU)
    // e.g. current=3, avgCPU=140m, targetCPU=70m -> desired=6
    
    ratio := float64(avgCPU) / float64(as.targetCPU)
    desired := int32(float64(current) * ratio)
    
    // Clamp to the [min, max] range
    if desired < as.minReplicas {
        desired = as.minReplicas
    }
    if desired > as.maxReplicas {
        desired = as.maxReplicas
    }
    
    return desired
}

func (as *AutoScaler) checkAllDeployments() {
    deployments, err := as.clientset.AppsV1().Deployments("").List(
        context.TODO(),
        metav1.ListOptions{},
    )
    if err != nil {
        fmt.Printf("Error listing deployments: %v\n", err)
        return
    }
    
    for _, deployment := range deployments.Items {
        if deployment.Annotations["auto-scale"] == "true" {
            as.scaleDeployment(&deployment)
        }
    }
}
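
The listing above omits a main; wiring it up mostly means building a second clientset for the metrics API. A minimal sketch, assuming the flag and clientcmd imports from project 1 are added (metricsv.NewForConfig is the constructor generated for k8s.io/metrics):

func main() {
    kubeconfig := flag.String("kubeconfig", "", "path to kubeconfig")
    flag.Parse()
    
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }
    
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    
    // The metrics API is served by its own aggregated API server and
    // has its own generated clientset
    metricsClient, err := metricsv.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    
    scaler := NewAutoScaler(clientset, metricsClient)
    
    stopCh := make(chan struct{})
    defer close(stopCh)
    
    scaler.Run(stopCh)
}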

Project 3: Resource Quota Monitoring and Alerting

Requirements

Monitor each Namespace's resource quota usage and send an alert when usage exceeds a threshold.

Python Implementation

#!/usr/bin/env python3

import json
import re
import requests
from kubernetes import client, config, watch

class QuotaMonitor:
    def __init__(self, webhook_url, threshold=0.8):
        config.load_kube_config()
        self.v1 = client.CoreV1Api()
        self.webhook_url = webhook_url
        self.threshold = threshold  # alert threshold (0.8 = 80%)
    
    def run(self):
        """运行配额监控"""
        print(f"Starting quota monitor (Threshold: {self.threshold * 100}%)")
        
        w = watch.Watch()
        
        # Watch ResourceQuota changes across all namespaces
        for event in w.stream(self.v1.list_resource_quota_for_all_namespaces):
            quota = event['object']
            self.check_quota(quota)
    
    def check_quota(self, quota):
        """检查配额使用情况"""
        if not quota.status or not quota.status.used:
            return
        
        namespace = quota.metadata.namespace
        alerts = []
        
        # Check each resource covered by the quota
        for resource, hard_limit in (quota.spec.hard or {}).items():
            used = quota.status.used.get(resource)
            if not used:
                continue
            
            # Convert to numbers (handles Ki, Mi, Gi and similar suffixes)
            used_value = self.parse_quantity(str(used))
            hard_value = self.parse_quantity(str(hard_limit))
            
            if hard_value == 0:
                continue
            
            usage_percent = used_value / hard_value
            
            if usage_percent >= self.threshold:
                alerts.append({
                    'resource': resource,
                    'used': str(used),
                    'limit': str(hard_limit),
                    'percent': f"{usage_percent * 100:.1f}%"
                })
        
        # Send an alert if anything crossed the threshold
        if alerts:
            self.send_alert(namespace, quota.metadata.name, alerts)
    
    def parse_quantity(self, quantity_str):
        """Parse a Kubernetes quantity string into a float."""
        # Binary (Ki/Mi/Gi/Ti), decimal (k/M/G/T), and millicore suffixes
        multipliers = {
            'Ki': 1024,
            'Mi': 1024 ** 2,
            'Gi': 1024 ** 3,
            'Ti': 1024 ** 4,
            'k': 1000,
            'M': 1000 ** 2,
            'G': 1000 ** 3,
            'T': 1000 ** 4,
            'm': 0.001,  # millicores
        }
        
        match = re.match(r'^(\d+(?:\.\d+)?)([A-Za-z]*)$', quantity_str)
        if not match:
            return 0
        
        value = float(match.group(1))
        unit = match.group(2)
        
        return value * multipliers.get(unit, 1)
    
    def send_alert(self, namespace, quota_name, alerts):
        """Send an alert to the configured webhook."""
        message = "⚠️ **ResourceQuota Alert**\n\n"
        message += f"**Namespace**: {namespace}\n"
        message += f"**Quota**: {quota_name}\n\n"
        message += "**Resources exceeding threshold**:\n"
        
        for alert in alerts:
            message += f"- {alert['resource']}: {alert['used']}/{alert['limit']} ({alert['percent']})\n"
        
        # Post to a webhook (Slack, Teams, DingTalk, etc.)
        payload = {
            'text': message,
            'username': 'Kubernetes Quota Monitor'
        }
        
        try:
            response = requests.post(
                self.webhook_url,
                data=json.dumps(payload),
                headers={'Content-Type': 'application/json'}
            )
            if response.status_code == 200:
                print(f"Alert sent for {namespace}/{quota_name}")
            else:
                print(f"Failed to send alert: {response.status_code}")
        except Exception as e:
            print(f"Error sending alert: {e}")

if __name__ == '__main__':
    import argparse
    
    parser = argparse.ArgumentParser(description='Kubernetes Quota Monitor')
    parser.add_argument('--webhook-url', required=True,
                        help='Webhook URL for alerts')
    parser.add_argument('--threshold', type=float, default=0.8,
                        help='Alert threshold (default: 0.8 = 80%%)')
    
    args = parser.parse_args()
    
    monitor = QuotaMonitor(webhook_url=args.webhook_url, threshold=args.threshold)
    monitor.run()

Project 4: Cross-Cluster Resource Sync

Requirements

Sync ConfigMaps and Secrets from one cluster to another.

Go Implementation

package main

import (
    "context"
    "fmt"
    "time"
    
    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    corev1listers "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

type ResourceSyncer struct {
    sourceClient    *kubernetes.Clientset
    targetClient    *kubernetes.Clientset
    configMapLister corev1listers.ConfigMapLister
    secretLister    corev1listers.SecretLister
    workqueue       workqueue.RateLimitingInterface
}

type SyncItem struct {
    ResourceType string // "configmap" or "secret"
    Namespace    string
    Name         string
}

func NewResourceSyncer(sourceClient, targetClient *kubernetes.Clientset) *ResourceSyncer {
    factory := informers.NewSharedInformerFactory(sourceClient, 30*time.Second)
    
    configMapInformer := factory.Core().V1().ConfigMaps()
    secretInformer := factory.Core().V1().Secrets()
    
    syncer := &ResourceSyncer{
        sourceClient:    sourceClient,
        targetClient:    targetClient,
        configMapLister: configMapInformer.Lister(),
        secretLister:    secretInformer.Lister(),
        workqueue:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
    }
    
    // Watch ConfigMap changes in the source cluster
    configMapInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            cm := obj.(*corev1.ConfigMap)
            if cm.Annotations["sync"] == "true" {
                syncer.enqueueConfigMap(cm)
            }
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            cm := newObj.(*corev1.ConfigMap)
            if cm.Annotations["sync"] == "true" {
                syncer.enqueueConfigMap(cm)
            }
        },
        DeleteFunc: func(obj interface{}) {
            // On deletion the object may be a DeletedFinalStateUnknown tombstone
            cm, ok := obj.(*corev1.ConfigMap)
            if !ok {
                return
            }
            if cm.Annotations["sync"] == "true" {
                syncer.deleteConfigMap(cm)
            }
        },
    })
    
    // Watch Secret changes in the source cluster
    secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            secret := obj.(*corev1.Secret)
            if secret.Annotations["sync"] == "true" {
                syncer.enqueueSecret(secret)
            }
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            secret := newObj.(*corev1.Secret)
            if secret.Annotations["sync"] == "true" {
                syncer.enqueueSecret(secret)
            }
        },
        DeleteFunc: func(obj interface{}) {
            // On deletion the object may be a DeletedFinalStateUnknown tombstone
            secret, ok := obj.(*corev1.Secret)
            if !ok {
                return
            }
            if secret.Annotations["sync"] == "true" {
                syncer.deleteSecret(secret)
            }
        },
    })
    
    go factory.Start(wait.NeverStop)
    cache.WaitForCacheSync(wait.NeverStop,
        configMapInformer.Informer().HasSynced,
        secretInformer.Informer().HasSynced,
    )
    
    return syncer
}

func (rs *ResourceSyncer) enqueueConfigMap(cm *corev1.ConfigMap) {
    rs.workqueue.Add(SyncItem{
        ResourceType: "configmap",
        Namespace:    cm.Namespace,
        Name:         cm.Name,
    })
}

func (rs *ResourceSyncer) enqueueSecret(secret *corev1.Secret) {
    rs.workqueue.Add(SyncItem{
        ResourceType: "secret",
        Namespace:    secret.Namespace,
        Name:         secret.Name,
    })
}

func (rs *ResourceSyncer) Run(workers int, stopCh <-chan struct{}) {
    defer runtime.HandleCrash()
    defer rs.workqueue.ShutDown()
    
    fmt.Println("Starting resource syncer")
    
    for i := 0; i < workers; i++ {
        go wait.Until(rs.runWorker, time.Second, stopCh)
    }
    
    <-stopCh
    fmt.Println("Shutting down resource syncer")
}

func (rs *ResourceSyncer) runWorker() {
    for rs.processNextItem() {
    }
}

func (rs *ResourceSyncer) processNextItem() bool {
    obj, shutdown := rs.workqueue.Get()
    if shutdown {
        return false
    }
    defer rs.workqueue.Done(obj)
    
    item := obj.(SyncItem)
    err := rs.syncResource(item)
    
    if err == nil {
        rs.workqueue.Forget(obj)
        return true
    }
    
    rs.workqueue.AddRateLimited(obj)
    runtime.HandleError(fmt.Errorf("error syncing '%v': %s", item, err.Error()))
    
    return true
}

func (rs *ResourceSyncer) syncResource(item SyncItem) error {
    switch item.ResourceType {
    case "configmap":
        return rs.syncConfigMap(item.Namespace, item.Name)
    case "secret":
        return rs.syncSecret(item.Namespace, item.Name)
    default:
        return fmt.Errorf("unknown resource type: %s", item.ResourceType)
    }
}

func (rs *ResourceSyncer) syncConfigMap(namespace, name string) error {
    // 1. Get the ConfigMap from the source cluster (via the lister's cache)
    cm, err := rs.configMapLister.ConfigMaps(namespace).Get(name)
    if err != nil {
        return err
    }
    
    // 2. Copy the ConfigMap, dropping server-populated fields
    targetCM := &corev1.ConfigMap{
        ObjectMeta: metav1.ObjectMeta{
            Name:        cm.Name,
            Namespace:   cm.Namespace,
            Labels:      cm.Labels,
            Annotations: cm.Annotations,
        },
        Data:       cm.Data,
        BinaryData: cm.BinaryData,
    }
    
    // 3. Create or update it in the target cluster
    _, err = rs.targetClient.CoreV1().ConfigMaps(namespace).Get(
        context.TODO(),
        name,
        metav1.GetOptions{},
    )
    
    if apierrors.IsNotFound(err) {
        // Not found: create it
        _, err = rs.targetClient.CoreV1().ConfigMaps(namespace).Create(
            context.TODO(),
            targetCM,
            metav1.CreateOptions{},
        )
        if err != nil {
            return err
        }
        fmt.Printf("Created ConfigMap %s/%s in target cluster\n", namespace, name)
    } else if err != nil {
        // Transient error: return it so the workqueue retries
        return err
    } else {
        // Already exists: update it
        _, err = rs.targetClient.CoreV1().ConfigMaps(namespace).Update(
            context.TODO(),
            targetCM,
            metav1.UpdateOptions{},
        )
        if err != nil {
            return err
        }
        fmt.Printf("Updated ConfigMap %s/%s in target cluster\n", namespace, name)
    }
    
    return nil
}

func (rs *ResourceSyncer) syncSecret(namespace, name string) error {
    // Same logic as for ConfigMaps
    secret, err := rs.secretLister.Secrets(namespace).Get(name)
    if err != nil {
        return err
    }
    
    targetSecret := &corev1.Secret{
        ObjectMeta: metav1.ObjectMeta{
            Name:        secret.Name,
            Namespace:   secret.Namespace,
            Labels:      secret.Labels,
            Annotations: secret.Annotations,
        },
        Type:       secret.Type,
        Data:       secret.Data,
        StringData: secret.StringData,
    }
    
    _, err = rs.targetClient.CoreV1().Secrets(namespace).Get(
        context.TODO(),
        name,
        metav1.GetOptions{},
    )
    
    if apierrors.IsNotFound(err) {
        _, err = rs.targetClient.CoreV1().Secrets(namespace).Create(
            context.TODO(),
            targetSecret,
            metav1.CreateOptions{},
        )
        if err != nil {
            return err
        }
        fmt.Printf("Created Secret %s/%s in target cluster\n", namespace, name)
    } else if err != nil {
        return err
    } else {
        _, err = rs.targetClient.CoreV1().Secrets(namespace).Update(
            context.TODO(),
            targetSecret,
            metav1.UpdateOptions{},
        )
        if err != nil {
            return err
        }
        fmt.Printf("Updated Secret %s/%s in target cluster\n", namespace, name)
    }
    
    return nil
}

func (rs *ResourceSyncer) deleteConfigMap(cm *corev1.ConfigMap) {
    err := rs.targetClient.CoreV1().ConfigMaps(cm.Namespace).Delete(
        context.TODO(),
        cm.Name,
        metav1.DeleteOptions{},
    )
    if err != nil {
        fmt.Printf("Error deleting ConfigMap %s/%s: %v\n", cm.Namespace, cm.Name, err)
        return
    }
    fmt.Printf("Deleted ConfigMap %s/%s from target cluster\n", cm.Namespace, cm.Name)
}

func (rs *ResourceSyncer) deleteSecret(secret *corev1.Secret) {
    err := rs.targetClient.CoreV1().Secrets(secret.Namespace).Delete(
        context.TODO(),
        secret.Name,
        metav1.DeleteOptions{},
    )
    if err != nil {
        fmt.Printf("Error deleting Secret %s/%s: %v\n", secret.Namespace, secret.Name, err)
        return
    }
    fmt.Printf("Deleted Secret %s/%s from target cluster\n", secret.Namespace, secret.Name)
}

func main() {
    // Build clients for both clusters
    sourceConfig, err := clientcmd.BuildConfigFromFlags("", "/path/to/source-kubeconfig")
    if err != nil {
        panic(err)
    }
    targetConfig, err := clientcmd.BuildConfigFromFlags("", "/path/to/target-kubeconfig")
    if err != nil {
        panic(err)
    }
    
    sourceClient, err := kubernetes.NewForConfig(sourceConfig)
    if err != nil {
        panic(err)
    }
    targetClient, err := kubernetes.NewForConfig(targetConfig)
    if err != nil {
        panic(err)
    }
    
    // Create and run the syncer
    syncer := NewResourceSyncer(sourceClient, targetClient)
    
    stopCh := make(chan struct{})
    defer close(stopCh)
    
    syncer.Run(2, stopCh)
}

Summary

Project Selection Guide

| Project | Use Case | Difficulty | Value |
| --- | --- | --- | --- |
| Pod cleaner | Automatically clean up completed Pods | ⭐⭐ | High (saves resources) |
| Auto-scaler | Scale automatically based on metrics | ⭐⭐⭐ | High (improves availability) |
| Quota monitor | Resource quota alerts | ⭐⭐ | Medium (cost control) |
| Resource syncer | Cross-cluster sync | ⭐⭐⭐⭐ | Medium (multi-cluster management) |

Best Practices

Use Informers: watch resources instead of polling the API
Decouple with a WorkQueue: improves reliability
Retry on errors: use a RateLimitingQueue
Least-privilege RBAC: grant only the permissions that are needed
Graceful shutdown: handle signals correctly (see the sketch after this list)
Observability: add metrics and logging
Externalize configuration: use environment variables or ConfigMaps
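
For the graceful-shutdown point above, the stop channel used throughout these projects can be tied to SIGINT/SIGTERM with nothing beyond the standard library. A minimal sketch:

import (
    "os"
    "os/signal"
    "syscall"
)

// setupSignalHandler returns a channel that is closed on the first
// SIGINT or SIGTERM, letting informers and workers drain cleanly.
func setupSignalHandler() <-chan struct{} {
    stopCh := make(chan struct{})
    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-c
        close(stopCh)
    }()
    return stopCh
}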

Deployment Recommendations

  1. Containerize: package the tool as a Docker image
  2. Deployment: run it as a Kubernetes Deployment
  3. ServiceAccount: create a dedicated ServiceAccount
  4. RBAC: configure least-privilege permissions
  5. Monitoring: integrate Prometheus metrics
  6. Logging: emit structured logs
  7. High availability: multiple replicas + leader election (see the sketch after this list)
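
For the high-availability point, client-go ships a leader-election helper so that only one replica does the work at a time while the others stand by. A minimal sketch, assuming a clientset built as in the projects above; the lease name, namespace, and run callback are placeholders:

import (
    "context"
    "os"
    "time"
    
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func runWithLeaderElection(clientset *kubernetes.Clientset, run func(ctx context.Context)) {
    hostname, _ := os.Hostname()
    lock := &resourcelock.LeaseLock{
        LeaseMeta:  metav1.ObjectMeta{Name: "pod-cleaner", Namespace: "kube-system"},
        Client:     clientset.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{Identity: hostname},
    }
    leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
        Lock:            lock,
        ReleaseOnCancel: true,
        LeaseDuration:   15 * time.Second,
        RenewDeadline:   10 * time.Second,
        RetryPeriod:     2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: run,                   // do the real work only while leading
            OnStoppedLeading: func() { os.Exit(0) }, // exit so the Pod restarts and rejoins the election
        },
    })
}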

These hands-on projects show how Kubernetes API programming applies to real-world scenarios, and they can serve as a starting point for building your own Kubernetes tools.