项目1:Pod 自动清理工具

项目背景

在 Kubernetes 集群中,Job、CronJob 等任务完成后会留下 Completed 或 Failed 状态的 Pod。这些 Pod 会持续占用 etcd 存储空间和 API Server 资源。

问题

  • ❌ etcd 存储空间被浪费
  • ❌ kubectl get pods 输出混乱
  • ❌ 影响集群性能

解决方案
自动清理超过指定时间的已完成 Pod。

功能需求

核心功能

  • ✅ 监听 Pod 状态变化
  • ✅ 自动清理 Failed/Succeeded Pod
  • ✅ 支持 TTL 配置(可配置保留时间)
  • ✅ 支持白名单(排除特定 Namespace)
  • ✅ 支持 Dry Run 模式
  • ✅ 提供 Prometheus Metrics

高级功能

  • ✅ 按 Label 过滤
  • ✅ 支持清理策略(保留最新 N 个)
  • ✅ Webhook 通知
  • ✅ 审计日志

Go 完整实现

项目结构

pod-cleaner/
├── main.go              # 主程序
├── pkg/
│   ├── cleaner/
│   │   ├── cleaner.go   # 清理逻辑
│   │   └── metrics.go   # Prometheus 指标
│   └── config/
│       └── config.go    # 配置管理
├── deploy/
│   ├── deployment.yaml  # Kubernetes 部署
│   └── rbac.yaml        # RBAC 配置
├── Dockerfile
└── go.mod

main.go

package main

import (
    "flag"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    "github.com/sirupsen/logrus"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    
    "pod-cleaner/pkg/cleaner"
    "pod-cleaner/pkg/config"
)

var (
    // log is the process-wide structured logger; its level and JSON
    // formatter are configured in main after the config file is loaded.
    log = logrus.New()
)

// main wires together config loading, the Kubernetes client, the cleaner
// loop, and the metrics server, then blocks until SIGINT/SIGTERM.
func main() {
    // Command-line flags.
    kubeconfig := flag.String("kubeconfig", "", "path to kubeconfig file")
    configFile := flag.String("config", "", "path to config file")
    flag.Parse()
    
    // Load configuration (defaults apply when no file is given).
    cfg, err := config.LoadConfig(*configFile)
    if err != nil {
        log.Fatalf("Failed to load config: %v", err)
    }
    
    // Configure logging. The parse error must not be ignored: an invalid
    // level string would otherwise silently parse to PanicLevel (0) and
    // suppress almost all output. Fall back to info instead.
    level, err := logrus.ParseLevel(cfg.LogLevel)
    if err != nil {
        level = logrus.InfoLevel
        log.Warnf("Invalid log level %q, defaulting to info", cfg.LogLevel)
    }
    log.SetLevel(level)
    log.SetFormatter(&logrus.JSONFormatter{})
    
    // Build the Kubernetes client: explicit kubeconfig wins, otherwise
    // assume we are running inside the cluster.
    var k8sConfig *rest.Config
    if *kubeconfig != "" {
        k8sConfig, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
    } else {
        k8sConfig, err = rest.InClusterConfig()
    }
    if err != nil {
        log.Fatalf("Failed to create kubernetes config: %v", err)
    }
    
    clientset, err := kubernetes.NewForConfig(k8sConfig)
    if err != nil {
        log.Fatalf("Failed to create kubernetes client: %v", err)
    }
    
    // Create the cleaner and run its metrics server and main loop in the
    // background.
    podCleaner := cleaner.NewPodCleaner(clientset, cfg, log)
    
    go podCleaner.StartMetricsServer(cfg.MetricsPort)
    
    stopCh := make(chan struct{})
    go podCleaner.Run(stopCh)
    
    // Block until an interrupt/termination signal arrives.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    
    log.Info("Pod Cleaner started")
    <-sigCh
    
    log.Info("Shutting down...")
    close(stopCh)
    // Grace period so in-flight deletions/logs can finish before exit.
    time.Sleep(2 * time.Second)
}

pkg/config/config.go

package config

import (
    "io/ioutil"
    "time"
    
    "gopkg.in/yaml.v2"
)

// Config holds all tunables for the pod cleaner. It is populated from a YAML
// file by LoadConfig, with defaults applied first.
//
// NOTE(review): yaml.v2 decodes a bare integer into time.Duration as
// NANOseconds, so the sample config's `ttl: 3600` would mean 3.6µs here,
// while the Python implementation reads the same value as seconds — confirm
// and either use custom unmarshalling or document the expected format.
type Config struct {
    // Basic settings.
    TTL          time.Duration `yaml:"ttl"`          // how long a finished pod is retained
    ScanInterval time.Duration `yaml:"scanInterval"` // period of the full-cluster scan
    DryRun       bool          `yaml:"dryRun"`       // log instead of deleting
    LogLevel     string        `yaml:"logLevel"`     // logrus level name, e.g. "info"
    MetricsPort  int           `yaml:"metricsPort"`  // Prometheus /metrics port
    
    // Namespace/label filtering.
    IncludeNamespaces []string          `yaml:"includeNamespaces"` // empty = all namespaces
    ExcludeNamespaces []string          `yaml:"excludeNamespaces"` // always skipped
    LabelSelectors    map[string]string `yaml:"labelSelectors"`    // all pairs must match
    
    // Retention policy: keep the newest N completed pods per namespace.
    KeepLastN int `yaml:"keepLastN"`
    
    // Optional webhook called after each successful deletion.
    WebhookURL string `yaml:"webhookURL"`
}

// LoadConfig returns the default configuration, optionally overridden by the
// YAML file at path. An empty path yields the defaults unchanged.
func LoadConfig(path string) (*Config, error) {
    // Start from the built-in defaults; the YAML file only overrides the
    // keys it actually sets.
    cfg := &Config{
        TTL:          1 * time.Hour,
        ScanInterval: 5 * time.Minute,
        DryRun:       false,
        LogLevel:     "info",
        MetricsPort:  9090,
        KeepLastN:    0,
    }
    
    if path == "" {
        return cfg, nil
    }
    
    raw, err := ioutil.ReadFile(path)
    if err != nil {
        return nil, err
    }
    if err = yaml.Unmarshal(raw, cfg); err != nil {
        return nil, err
    }
    return cfg, nil
}

pkg/cleaner/cleaner.go

package cleaner

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "sort"
    "time"
    
    "github.com/sirupsen/logrus"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    
    "pod-cleaner/pkg/config"
)

// PodCleaner deletes completed (Failed/Succeeded) pods once they exceed the
// configured TTL, driven by both informer events and a periodic full scan.
type PodCleaner struct {
    clientset *kubernetes.Clientset // API access for list/get/delete
    config    *config.Config        // behavior tunables (TTL, filters, dry-run)
    logger    *logrus.Logger        // structured logger shared with main
    metrics   *Metrics              // Prometheus counters
}

// PodInfo is the minimal record kept per deletion candidate during a scan;
// the full Pod object is re-fetched just before deletion.
type PodInfo struct {
    Namespace  string
    Name       string
    FinishTime time.Time // latest container termination time
}

// NewPodCleaner builds a PodCleaner and registers its Prometheus metrics on
// the default registry (so it must only be called once per process).
func NewPodCleaner(clientset *kubernetes.Clientset, cfg *config.Config, logger *logrus.Logger) *PodCleaner {
    return &PodCleaner{
        clientset: clientset,
        config:    cfg,
        logger:    logger,
        metrics:   NewMetrics(),
    }
}

// Run starts the informer-driven event path plus a periodic full scan and
// blocks until stopCh is closed.
func (pc *PodCleaner) Run(stopCh <-chan struct{}) {
    // Shared informer factory with a 30s resync period.
    factory := informers.NewSharedInformerFactory(pc.clientset, 30*time.Second)
    podInformer := factory.Core().V1().Pods()
    
    // React to pod add/update events as they happen; the periodic scan
    // below covers anything the event path misses (and enforces KeepLastN).
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            pc.checkAndCleanPod(pod)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            pod := newObj.(*corev1.Pod)
            pc.checkAndCleanPod(pod)
        },
    })
    
    // Start the informer and wait for its cache to be populated before
    // trusting scan results.
    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)
    
    pc.logger.Info("Informer synced, starting periodic scan")
    
    // Periodic full scan at the configured interval.
    ticker := time.NewTicker(pc.config.ScanInterval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            pc.scanAndCleanPods()
        case <-stopCh:
            pc.logger.Info("Stopping pod cleaner")
            return
        }
    }
}

// checkAndCleanPod deletes a single pod if it is terminal, permitted by the
// namespace/label filters, and older than the TTL. Called from the informer
// event handlers.
func (pc *PodCleaner) checkAndCleanPod(pod *corev1.Pod) {
    // Only terminal pods in an allowed namespace with matching labels are
    // candidates; the checks short-circuit in the same order as before.
    if !pc.isCompletedPod(pod) ||
        !pc.shouldProcessNamespace(pod.Namespace) ||
        !pc.matchesLabelSelector(pod.Labels) {
        return
    }
    
    // Skip pods whose finish time is unknown or still within the TTL.
    finished := pc.getPodFinishTime(pod)
    if finished.IsZero() {
        return
    }
    if time.Since(finished) < pc.config.TTL {
        return
    }
    
    pc.deletePod(pod)
}

// isCompletedPod reports whether the pod has reached a terminal phase.
func (pc *PodCleaner) isCompletedPod(pod *corev1.Pod) bool {
    switch pod.Status.Phase {
    case corev1.PodSucceeded, corev1.PodFailed:
        return true
    default:
        return false
    }
}

// shouldProcessNamespace applies the namespace filters: the exclude list
// always wins; an empty include list means "everything not excluded".
func (pc *PodCleaner) shouldProcessNamespace(namespace string) bool {
    for _, excluded := range pc.config.ExcludeNamespaces {
        if excluded == namespace {
            return false
        }
    }
    
    if len(pc.config.IncludeNamespaces) == 0 {
        return true
    }
    for _, included := range pc.config.IncludeNamespaces {
        if included == namespace {
            return true
        }
    }
    return false
}

// matchesLabelSelector reports whether the pod's labels carry every
// configured key/value pair. An empty selector matches all pods (the range
// below simply runs zero times).
func (pc *PodCleaner) matchesLabelSelector(labels map[string]string) bool {
    for key, want := range pc.config.LabelSelectors {
        if labels[key] != want {
            return false
        }
    }
    return true
}

// getPodFinishTime returns the latest termination timestamp across the pod's
// containers, or the zero time when no container has terminated (e.g. the
// status list is empty).
func (pc *PodCleaner) getPodFinishTime(pod *corev1.Pod) time.Time {
    var latest time.Time
    for _, status := range pod.Status.ContainerStatuses {
        term := status.State.Terminated
        if term == nil {
            continue
        }
        if finished := term.FinishedAt.Time; finished.After(latest) {
            latest = finished
        }
    }
    return latest
}

// deletePod deletes (or, in dry-run mode, only logs) a single pod, updates
// the Prometheus counters, and fires the webhook notification on success.
func (pc *PodCleaner) deletePod(pod *corev1.Pod) {
    // Attach identifying fields so every log line below is attributable.
    logger := pc.logger.WithFields(logrus.Fields{
        "namespace": pod.Namespace,
        "pod":       pod.Name,
        "phase":     pod.Status.Phase,
        "age":       time.Since(pc.getPodFinishTime(pod)),
    })
    
    if pc.config.DryRun {
        logger.Info("[DRY RUN] Would delete pod")
        // NOTE(review): PodsScanned is only incremented on this dry-run
        // path, so the "scanned" metric stays at zero in normal operation —
        // confirm whether this is intended.
        pc.metrics.PodsScanned.Inc()
        return
    }
    
    err := pc.clientset.CoreV1().Pods(pod.Namespace).Delete(
        context.TODO(),
        pod.Name,
        metav1.DeleteOptions{},
    )
    
    if err != nil {
        logger.WithError(err).Error("Failed to delete pod")
        pc.metrics.DeletionErrors.Inc()
        return
    }
    
    logger.Info("Deleted pod")
    pc.metrics.PodsDeleted.Inc()
    
    // Best-effort notification; errors are logged inside.
    pc.sendWebhookNotification(pod)
}

// scanAndCleanPods lists all pods in the cluster, groups the deletion
// candidates per namespace, and hands each group to cleanPodsInNamespace so
// that KeepLastN can be applied per namespace.
func (pc *PodCleaner) scanAndCleanPods() {
    pc.logger.Info("Running periodic scan")
    
    pods, err := pc.clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        pc.logger.WithError(err).Error("Failed to list pods")
        return
    }
    
    podsByNamespace := make(map[string][]*PodInfo)
    
    // Index the slice rather than ranging by value: corev1.Pod is a large
    // struct and a value range would copy every element per iteration.
    for i := range pods.Items {
        pod := &pods.Items[i]
        
        // Apply the same filters as the event path. The label-selector
        // check was previously missing here, so the periodic scan could
        // delete pods the configured labelSelectors should have excluded.
        if !pc.isCompletedPod(pod) ||
            !pc.shouldProcessNamespace(pod.Namespace) ||
            !pc.matchesLabelSelector(pod.Labels) {
            continue
        }
        
        finishTime := pc.getPodFinishTime(pod)
        if finishTime.IsZero() {
            continue
        }
        
        podsByNamespace[pod.Namespace] = append(podsByNamespace[pod.Namespace], &PodInfo{
            Namespace:  pod.Namespace,
            Name:       pod.Name,
            FinishTime: finishTime,
        })
    }
    
    for namespace, infos := range podsByNamespace {
        pc.cleanPodsInNamespace(namespace, infos)
    }
}

// cleanPodsInNamespace deletes the expired pods of one namespace, always
// retaining the KeepLastN most recently finished ones regardless of TTL.
func (pc *PodCleaner) cleanPodsInNamespace(namespace string, pods []*PodInfo) {
    // Newest first, so the retained prefix is the most recent pods.
    sort.Slice(pods, func(i, j int) bool {
        return pods[i].FinishTime.After(pods[j].FinishTime)
    })
    
    start := pc.config.KeepLastN
    if start >= len(pods) {
        return
    }
    
    for _, info := range pods[start:] {
        if time.Since(info.FinishTime) < pc.config.TTL {
            continue
        }
        
        // Re-fetch the full object so deletePod logs current status; the
        // pod may have disappeared since the scan, which is fine to skip.
        pod, err := pc.clientset.CoreV1().Pods(namespace).Get(
            context.TODO(),
            info.Name,
            metav1.GetOptions{},
        )
        if err != nil {
            // Previously this error was swallowed silently; surface it so
            // persistent API failures are visible in the logs.
            pc.logger.WithError(err).
                WithField("pod", namespace+"/"+info.Name).
                Debug("Skipping pod: fetch before deletion failed")
            continue
        }
        
        pc.deletePod(pod)
    }
}

// sendWebhookNotification POSTs a JSON "pod_deleted" event to the configured
// webhook URL. It is best-effort: failures are logged, never propagated.
func (pc *PodCleaner) sendWebhookNotification(pod *corev1.Pod) {
    if pc.config.WebhookURL == "" {
        return
    }
    
    payload := map[string]interface{}{
        "event": "pod_deleted",
        "pod": map[string]string{
            "namespace": pod.Namespace,
            "name":      pod.Name,
            "phase":     string(pod.Status.Phase),
        },
        "timestamp": time.Now().Format(time.RFC3339),
    }
    
    data, err := json.Marshal(payload)
    if err != nil {
        // Marshal of this map cannot realistically fail, but don't ignore it.
        pc.logger.WithError(err).Error("Failed to marshal webhook payload")
        return
    }
    
    // Use a client with an explicit timeout: http.Post goes through
    // http.DefaultClient, which has none and could block the cleanup path
    // indefinitely on a slow webhook endpoint.
    httpClient := &http.Client{Timeout: 10 * time.Second}
    resp, err := httpClient.Post(pc.config.WebhookURL, "application/json", bytes.NewReader(data))
    if err != nil {
        pc.logger.WithError(err).Error("Failed to send webhook notification")
        return
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusOK {
        pc.logger.Warnf("Webhook returned status %d", resp.StatusCode)
    }
}

pkg/cleaner/metrics.go

package cleaner

import (
    "fmt"
    "net/http"
    
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// Metrics holds the Prometheus counters exported by the cleaner.
type Metrics struct {
    PodsScanned    prometheus.Counter // pods evaluated for deletion (see deletePod for caveat)
    PodsDeleted    prometheus.Counter // successful deletions
    DeletionErrors prometheus.Counter // failed delete API calls
}

// NewMetrics constructs the cleaner's counters and registers them on the
// default Prometheus registry. MustRegister panics on duplicate
// registration, so call this at most once per process.
func NewMetrics() *Metrics {
    newCounter := func(name, help string) prometheus.Counter {
        return prometheus.NewCounter(prometheus.CounterOpts{
            Name: name,
            Help: help,
        })
    }
    
    m := &Metrics{
        PodsScanned:    newCounter("pod_cleaner_pods_scanned_total", "Total number of pods scanned"),
        PodsDeleted:    newCounter("pod_cleaner_pods_deleted_total", "Total number of pods deleted"),
        DeletionErrors: newCounter("pod_cleaner_deletion_errors_total", "Total number of pod deletion errors"),
    }
    
    prometheus.MustRegister(m.PodsScanned, m.PodsDeleted, m.DeletionErrors)
    
    return m
}

// StartMetricsServer serves Prometheus metrics on /metrics at the given
// port. It blocks, so callers run it in its own goroutine.
func (pc *PodCleaner) StartMetricsServer(port int) {
    // Use a dedicated mux instead of mutating the process-global
    // http.DefaultServeMux, so this server cannot collide with handlers
    // registered elsewhere (e.g. pprof or a second cleaner instance).
    mux := http.NewServeMux()
    mux.Handle("/metrics", promhttp.Handler())
    
    addr := fmt.Sprintf(":%d", port)
    pc.logger.Infof("Starting metrics server on %s", addr)
    
    if err := http.ListenAndServe(addr, mux); err != nil {
        pc.logger.WithError(err).Fatal("Failed to start metrics server")
    }
}

Python 实现

pod_cleaner.py

#!/usr/bin/env python3

import time
import logging
import argparse
import yaml
from datetime import datetime, timedelta
from kubernetes import client, config, watch
from prometheus_client import Counter, start_http_server
import requests

# Prometheus metrics, registered on the default registry at import time.
# These names intentionally match the Go implementation's metrics.
PODS_SCANNED = Counter('pod_cleaner_pods_scanned_total', 'Total pods scanned')
PODS_DELETED = Counter('pod_cleaner_pods_deleted_total', 'Total pods deleted')
DELETION_ERRORS = Counter('pod_cleaner_deletion_errors_total', 'Total deletion errors')

class PodCleaner:
    """Deletes Failed/Succeeded pods older than the configured TTL.

    Combines a watch stream (event-driven cleanup) with a periodic full scan
    (which also enforces the keepLastN retention policy). Unlike the Go
    implementation, ttl and scanInterval are plain integers in seconds.
    """

    def __init__(self, config_file=None):
        # Load Kubernetes client configuration: prefer in-cluster, fall back
        # to the local kubeconfig for development.
        # NOTE(review): the bare `except` also swallows KeyboardInterrupt and
        # SystemExit — narrowing to the client's ConfigException would be safer.
        try:
            config.load_incluster_config()
            logging.info("Using in-cluster config")
        except:
            config.load_kube_config()
            logging.info("Using kubeconfig")
        
        self.v1 = client.CoreV1Api()
        
        # Merge defaults with the optional YAML config file.
        self.load_config(config_file)
        
        # Configure logging.
        # NOTE(review): the logging.info calls above run before basicConfig,
        # so they use the root logger's default handler/level.
        logging.basicConfig(
            level=getattr(logging, self.config['logLevel'].upper()),
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def load_config(self, config_file):
        """Populate self.config from defaults plus an optional YAML file."""
        default_config = {
            'ttl': 3600,
            'scanInterval': 300,
            'dryRun': False,
            'logLevel': 'info',
            'metricsPort': 9090,
            'includeNamespaces': [],
            'excludeNamespaces': [],
            'labelSelectors': {},
            'keepLastN': 0,
            'webhookURL': ''
        }
        
        if config_file:
            with open(config_file, 'r') as f:
                user_config = yaml.safe_load(f)
                default_config.update(user_config)
        
        self.config = default_config
        # Pre-compute the TTL as a timedelta for age comparisons.
        self.ttl = timedelta(seconds=self.config['ttl'])
    
    def run(self):
        """Run forever: watch pod events and do a periodic full scan."""
        self.logger.info(f"Starting Pod Cleaner (TTL: {self.ttl}, Dry Run: {self.config['dryRun']})")
        
        # Expose Prometheus metrics.
        start_http_server(self.config['metricsPort'])
        self.logger.info(f"Metrics server started on port {self.config['metricsPort']}")
        
        w = watch.Watch()
        
        last_scan = time.time()
        
        while True:
            try:
                # Consume pod events; the stream ends after scanInterval
                # seconds, which is what lets the scan check below run.
                for event in w.stream(
                    self.v1.list_pod_for_all_namespaces,
                    timeout_seconds=self.config['scanInterval']
                ):
                    if event['type'] in ['ADDED', 'MODIFIED']:
                        pod = event['object']
                        self.check_and_clean_pod(pod)
                
                # Periodic full scan (also applies keepLastN retention).
                if time.time() - last_scan >= self.config['scanInterval']:
                    self.scan_and_clean_pods()
                    last_scan = time.time()
                
            except Exception as e:
                # Back off briefly on watch errors (e.g. API hiccups),
                # then reconnect.
                self.logger.error(f"Error in watch loop: {e}")
                time.sleep(5)
    
    def check_and_clean_pod(self, pod):
        """Delete one pod if it is terminal, allowed by filters, and expired."""
        # Only terminal pods are candidates.
        if pod.status.phase not in ['Failed', 'Succeeded']:
            return
        
        # Namespace filter (exclude list wins; empty include list = all).
        if not self.should_process_namespace(pod.metadata.namespace):
            return
        
        # Label filter (all configured pairs must match).
        if not self.matches_label_selector(pod.metadata.labels or {}):
            return
        
        # TTL check; pods with no known finish time are skipped.
        finish_time = self.get_pod_finish_time(pod)
        if not finish_time or datetime.utcnow() - finish_time < self.ttl:
            return
        
        self.delete_pod(pod)
    
    def should_process_namespace(self, namespace):
        """Apply the namespace filters: exclude wins, empty include = all."""
        if namespace in self.config['excludeNamespaces']:
            return False
        
        if self.config['includeNamespaces']:
            return namespace in self.config['includeNamespaces']
        
        return True
    
    def matches_label_selector(self, labels):
        """Return True when labels carry every configured key/value pair."""
        if not self.config['labelSelectors']:
            return True
        
        for key, value in self.config['labelSelectors'].items():
            if labels.get(key) != value:
                return False
        
        return True
    
    def get_pod_finish_time(self, pod):
        """Return the latest container termination time, or None.

        NOTE(review): `.replace(tzinfo=None)` assumes finished_at is UTC so
        it can be compared against datetime.utcnow() — confirm the client
        always returns UTC timestamps.
        """
        finish_time = None
        
        if pod.status.container_statuses:
            for cs in pod.status.container_statuses:
                if cs.state.terminated:
                    t = cs.state.terminated.finished_at.replace(tzinfo=None)
                    if not finish_time or t > finish_time:
                        finish_time = t
        
        return finish_time
    
    def delete_pod(self, pod):
        """Delete the pod (or only log in dry-run mode) and update metrics.

        NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        datetime.now(timezone.utc) is the recommended replacement.
        """
        age = datetime.utcnow() - self.get_pod_finish_time(pod)
        
        self.logger.info(
            f"{'[DRY RUN] Would delete' if self.config['dryRun'] else 'Deleting'} pod "
            f"{pod.metadata.namespace}/{pod.metadata.name} "
            f"(Phase: {pod.status.phase}, Age: {age})"
        )
        
        # Counted per deletion attempt, including dry-run.
        PODS_SCANNED.inc()
        
        if self.config['dryRun']:
            return
        
        try:
            self.v1.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                body=client.V1DeleteOptions()
            )
            PODS_DELETED.inc()
            self.send_webhook_notification(pod)
        except Exception as e:
            self.logger.error(f"Failed to delete pod: {e}")
            DELETION_ERRORS.inc()
    
    def scan_and_clean_pods(self):
        """Full scan: group candidates per namespace, then clean each group.

        NOTE(review): unlike check_and_clean_pod, this path does not apply
        matches_label_selector — confirm whether the label filter should
        also constrain the periodic scan.
        """
        self.logger.info("Running periodic scan")
        
        try:
            pods = self.v1.list_pod_for_all_namespaces()
            
            # Group deletion candidates by namespace so keepLastN can be
            # applied per namespace.
            pods_by_namespace = {}
            
            for pod in pods.items:
                if pod.status.phase not in ['Failed', 'Succeeded']:
                    continue
                
                if not self.should_process_namespace(pod.metadata.namespace):
                    continue
                
                finish_time = self.get_pod_finish_time(pod)
                if not finish_time:
                    continue
                
                ns = pod.metadata.namespace
                if ns not in pods_by_namespace:
                    pods_by_namespace[ns] = []
                
                pods_by_namespace[ns].append({
                    'pod': pod,
                    'finish_time': finish_time
                })
            
            for namespace, pod_list in pods_by_namespace.items():
                self.clean_pods_in_namespace(namespace, pod_list)
        
        except Exception as e:
            self.logger.error(f"Error in periodic scan: {e}")
    
    def clean_pods_in_namespace(self, namespace, pod_list):
        """Delete expired pods of one namespace, keeping the newest N."""
        # Newest first, so the retained prefix is the most recent pods.
        pod_list.sort(key=lambda x: x['finish_time'], reverse=True)
        
        # keepLastN pods are always retained, regardless of TTL.
        start_index = self.config['keepLastN']
        
        for i in range(start_index, len(pod_list)):
            item = pod_list[i]
            
            age = datetime.utcnow() - item['finish_time']
            if age < self.ttl:
                continue
            
            self.delete_pod(item['pod'])
    
    def send_webhook_notification(self, pod):
        """POST a pod_deleted event to the configured webhook (best-effort).

        NOTE(review): no request timeout is set; a slow endpoint can stall
        the cleanup path — consider requests.post(..., timeout=10).
        """
        if not self.config['webhookURL']:
            return
        
        payload = {
            'event': 'pod_deleted',
            'pod': {
                'namespace': pod.metadata.namespace,
                'name': pod.metadata.name,
                'phase': pod.status.phase
            },
            'timestamp': datetime.utcnow().isoformat()
        }
        
        try:
            response = requests.post(
                self.config['webhookURL'],
                json=payload,
                headers={'Content-Type': 'application/json'}
            )
            if response.status_code != 200:
                self.logger.warning(f"Webhook returned status {response.status_code}")
        except Exception as e:
            self.logger.error(f"Failed to send webhook: {e}")

if __name__ == '__main__':
    # CLI entry point: the optional --config flag points at a YAML file;
    # all other settings fall back to the defaults in PodCleaner.load_config.
    parser = argparse.ArgumentParser(description='Kubernetes Pod Cleaner')
    parser.add_argument('--config', help='Path to config file')
    args = parser.parse_args()
    
    cleaner = PodCleaner(config_file=args.config)
    cleaner.run()

配置示例

config.yaml

# Basic settings
ttl: 3600              # Retention after completion. The Python implementation
                       # reads this as SECONDS; NOTE(review): the Go version
                       # decodes it into time.Duration via yaml.v2, which
                       # treats a bare integer as nanoseconds — confirm units
                       # before deploying the Go binary.
scanInterval: 300      # Full-scan period (same units caveat as ttl)
dryRun: false          # true = log only, never delete
logLevel: info         # debug / info / warn / error
metricsPort: 9090      # Prometheus /metrics port

# Namespace filtering
includeNamespaces: []  # Empty = all namespaces (minus the exclude list)
excludeNamespaces:     # Never clean pods in these namespaces
  - kube-system
  - kube-public
  - kube-node-lease

# Label filtering
labelSelectors:        # Only pods carrying all of these labels are cleaned
  cleanup: "true"

# Retention policy
keepLastN: 5           # Keep the 5 most recently finished pods per namespace

# Webhook notification
webhookURL: "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

Kubernetes 部署

deploy/rbac.yaml

# Identity the cleaner Deployment runs as.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: pod-cleaner
  namespace: kube-system

---
# Cluster-wide pod read + delete access — the minimum the cleaner needs
# (list/watch for the informer, get before deletion, delete for cleanup).
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: pod-cleaner
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["list", "watch", "get", "delete"]

---
# Grant the ClusterRole above to the cleaner's ServiceAccount.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: pod-cleaner
subjects:
- kind: ServiceAccount
  name: pod-cleaner
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: pod-cleaner
  apiGroup: rbac.authorization.k8s.io

deploy/deployment.yaml

# Runtime configuration, mounted into the container at /etc/pod-cleaner.
apiVersion: v1
kind: ConfigMap
metadata:
  name: pod-cleaner-config
  namespace: kube-system
data:
  config.yaml: |
    ttl: 3600
    scanInterval: 300
    dryRun: false
    logLevel: info
    metricsPort: 9090
    excludeNamespaces:
      - kube-system
      - kube-public
    keepLastN: 5

---
# The cleaner itself. Single replica: the implementation has no leader
# election, so scaling up would cause duplicate deletions/notifications.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pod-cleaner
  namespace: kube-system
  labels:
    app: pod-cleaner
spec:
  replicas: 1
  selector:
    matchLabels:
      app: pod-cleaner
  template:
    metadata:
      labels:
        app: pod-cleaner
    spec:
      serviceAccountName: pod-cleaner
      containers:
      - name: pod-cleaner
        image: your-registry/pod-cleaner:latest
        args:
        - --config=/etc/pod-cleaner/config.yaml
        volumeMounts:
        - name: config
          mountPath: /etc/pod-cleaner
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 200m
            memory: 256Mi
        ports:
        - name: metrics
          containerPort: 9090
        # /metrics doubles as the health endpoint: it responds as soon as
        # the metrics server is listening.
        livenessProbe:
          httpGet:
            path: /metrics
            port: metrics
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /metrics
            port: metrics
          initialDelaySeconds: 5
          periodSeconds: 5
      volumes:
      - name: config
        configMap:
          name: pod-cleaner-config

---
# Exposes the metrics port so Prometheus (via the ServiceMonitor) can scrape.
apiVersion: v1
kind: Service
metadata:
  name: pod-cleaner
  namespace: kube-system
  labels:
    app: pod-cleaner
spec:
  selector:
    app: pod-cleaner
  ports:
  - name: metrics
    port: 9090
    targetPort: metrics

deploy/servicemonitor.yaml

# Prometheus Operator scrape config; requires the operator's CRDs installed.
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: pod-cleaner
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: pod-cleaner
  endpoints:
  - port: metrics
    interval: 30s

测试

创建测试 Pod

# 创建几个会快速完成的 Pod
for i in {1..5}; do
  kubectl run test-pod-$i --image=busybox --restart=Never -- sh -c "sleep 5"
done

# 等待完成
sleep 10

# 查看 Pod 状态
kubectl get pods | grep test-pod

验证清理

# 等待超过 TTL 时间
# 查看是否被清理
kubectl get pods | grep test-pod

# 查看 Pod Cleaner 日志
kubectl logs -n kube-system deployment/pod-cleaner

# 查看 Metrics
kubectl port-forward -n kube-system service/pod-cleaner 9090:9090
curl http://localhost:9090/metrics | grep pod_cleaner

监控告警

Prometheus 告警规则

groups:
- name: pod-cleaner
  interval: 30s
  rules:
  # Fires when deletions fail faster than 0.1/s over 5 minutes — usually an
  # RBAC or API-server problem.
  - alert: PodCleanerHighDeletionErrors
    expr: rate(pod_cleaner_deletion_errors_total[5m]) > 0.1
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Pod Cleaner 删除失败率过高"
      description: "过去5分钟删除失败率 {{ $value }}"
  
  # Fires when the scrape target disappears (pod crashed or metrics server
  # down).
  - alert: PodCleanerDown
    expr: up{job="pod-cleaner"} == 0
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Pod Cleaner 停止运行"

Grafana Dashboard

{
  "dashboard": {
    "title": "Pod Cleaner",
    "panels": [
      {
        "title": "Pods Deleted",
        "targets": [
          {
            "expr": "rate(pod_cleaner_pods_deleted_total[5m])"
          }
        ]
      },
      {
        "title": "Deletion Errors",
        "targets": [
          {
            "expr": "rate(pod_cleaner_deletion_errors_total[5m])"
          }
        ]
      }
    ]
  }
}

总结

功能特性

自动化: 无需手动清理
可配置: 灵活的 TTL 和过滤规则
安全: Dry Run 模式,白名单机制
可观测: Prometheus Metrics,结构化日志
通知: Webhook 集成
高可用: 当前实现为单副本部署;如需运行多副本,必须先引入 Leader Election(本文代码未包含),否则会出现重复删除与重复通知

生产建议

  1. 从 Dry Run 开始: 先观察再实际删除
  2. 配置白名单: 排除重要命名空间
  3. 合理的 TTL: 不要设置太短(建议 >= 1小时)
  4. 监控告警: 及时发现异常
  5. 定期审查: 检查配置是否合理

下一个项目将介绍节点资源监控告警工具。