Project 5: Application Health Check Self-Healing

Background

Kubernetes' built-in livenessProbe and readinessProbe are sometimes insufficient for detecting complex application failures:

  • ❌ They cannot detect application-logic errors
  • ❌ They cannot detect failures in dependent services
  • ❌ They cannot perform custom remediation actions

Solution

Implement an intelligent health checker that supports custom check logic and automated healing actions.

Functional Requirements

Core Features

  • ✅ Custom health check rules
  • ✅ HTTP/TCP/EXEC check types
  • ✅ Dependency service checks
  • ✅ Automatic Pod restarts
  • ✅ Automatic scaling
  • ✅ Alert notifications

Advanced Features

  • ✅ Cascading failure detection
  • ✅ Intelligent healing strategies
  • ✅ Failure history tracking
  • ✅ Healing success-rate statistics

Complete Go Implementation

pkg/checker/checker.go

package checker

import (
    "context"
    "fmt"
    "net"
    "net/http"
    "sync"
    "time"

    "github.com/sirupsen/logrus"
    autoscalingv1 "k8s.io/api/autoscaling/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"

    "health-checker/pkg/config"
)

type HealthChecker struct {
    clientset *kubernetes.Clientset
    config    *config.Config
    logger    *logrus.Logger
    mu        sync.Mutex
    history   map[string]*FailureHistory // keyed by "namespace/name"; guarded by mu
}

type CheckRule struct {
    Name           string
    Namespace      string
    LabelSelector  string
    CheckType      string  // http, tcp, exec, dependency
    CheckInterval  time.Duration
    Threshold      int
    
    // HTTP check
    HTTPEndpoint   string
    ExpectedStatus int
    
    // TCP check
    TCPPort        int
    
    // EXEC check
    ExecCommand    []string
    
    // Dependency check
    Dependencies   []string
    
    // Healing action
    HealingAction  string  // restart, scale, nothing
    ScaleReplicas  int32
}

type FailureHistory struct {
    PodName        string
    FailureCount   int
    LastFailure    time.Time
    LastHealing    time.Time
    HealingAttempts int
}

func NewHealthChecker(
    clientset *kubernetes.Clientset,
    cfg *config.Config,
    logger *logrus.Logger,
) *HealthChecker {
    return &HealthChecker{
        clientset: clientset,
        config:    cfg,
        logger:    logger,
        history:   make(map[string]*FailureHistory),
    }
}

func (hc *HealthChecker) Run(stopCh <-chan struct{}) {
    hc.logger.Info("Starting health checker")
    
    // Load check rules
    rules := hc.loadCheckRules()
    
    // Start a check goroutine for each rule
    for _, rule := range rules {
        go hc.runCheckRule(rule, stopCh)
    }
    
    <-stopCh
    hc.logger.Info("Stopping health checker")
}

func (hc *HealthChecker) loadCheckRules() []CheckRule {
    // Load rules from a config file or CRD.
    // Simplified example: return hardcoded rules.
    return []CheckRule{
        {
            Name:           "nginx-http-check",
            Namespace:      "default",
            LabelSelector:  "app=nginx",
            CheckType:      "http",
            CheckInterval:  30 * time.Second,
            Threshold:      3,
            HTTPEndpoint:   "/healthz",
            ExpectedStatus: 200,
            HealingAction:  "restart",
        },
        {
            Name:           "backend-dependency-check",
            Namespace:      "default",
            LabelSelector:  "app=backend",
            CheckType:      "dependency",
            CheckInterval:  60 * time.Second,
            Threshold:      2,
            Dependencies:   []string{"database", "redis"},
            HealingAction:  "restart",
        },
    }
}

func (hc *HealthChecker) runCheckRule(rule CheckRule, stopCh <-chan struct{}) {
    ticker := time.NewTicker(rule.CheckInterval)
    defer ticker.Stop()
    
    hc.logger.Infof("Starting check rule: %s", rule.Name)
    
    for {
        select {
        case <-ticker.C:
            hc.checkPods(rule)
        case <-stopCh:
            return
        }
    }
}

func (hc *HealthChecker) checkPods(rule CheckRule) {
    // List Pods matching the rule's label selector
    pods, err := hc.clientset.CoreV1().Pods(rule.Namespace).List(
        context.TODO(),
        metav1.ListOptions{
            LabelSelector: rule.LabelSelector,
        },
    )
    if err != nil {
        hc.logger.WithError(err).Error("Failed to list pods")
        return
    }
    
    for _, pod := range pods.Items {
        if pod.Status.Phase != corev1.PodRunning {
            continue
        }
        
        healthy := hc.performCheck(&pod, rule)
        
        if !healthy {
            hc.handleUnhealthyPod(&pod, rule)
        } else {
            // Reset the failure count on success
            podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
            hc.mu.Lock()
            delete(hc.history, podKey)
            hc.mu.Unlock()
        }
    }
}

func (hc *HealthChecker) performCheck(pod *corev1.Pod, rule CheckRule) bool {
    switch rule.CheckType {
    case "http":
        return hc.checkHTTP(pod, rule)
    case "tcp":
        return hc.checkTCP(pod, rule)
    case "exec":
        return hc.checkExec(pod, rule)
    case "dependency":
        return hc.checkDependency(pod, rule)
    default:
        return true
    }
}

func (hc *HealthChecker) checkHTTP(pod *corev1.Pod, rule CheckRule) bool {
    // Build the check URL from the Pod IP
    url := fmt.Sprintf("http://%s%s", pod.Status.PodIP, rule.HTTPEndpoint)
    
    client := &http.Client{
        Timeout: 5 * time.Second,
    }
    
    resp, err := client.Get(url)
    if err != nil {
        hc.logger.WithError(err).Warnf("HTTP check failed for pod %s", pod.Name)
        return false
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != rule.ExpectedStatus {
        hc.logger.Warnf("HTTP check failed: expected %d, got %d for pod %s",
            rule.ExpectedStatus, resp.StatusCode, pod.Name)
        return false
    }
    
    return true
}

func (hc *HealthChecker) checkTCP(pod *corev1.Pod, rule CheckRule) bool {
    // TCP connection check
    addr := fmt.Sprintf("%s:%d", pod.Status.PodIP, rule.TCPPort)
    
    conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
    if err != nil {
        hc.logger.WithError(err).Warnf("TCP check failed for pod %s", pod.Name)
        return false
    }
    conn.Close()
    
    return true
}

func (hc *HealthChecker) checkExec(pod *corev1.Pod, rule CheckRule) bool {
    // Run rule.ExecCommand inside the Pod.
    // Simplified: a real implementation should use the pod exec
    // subresource (see the sketch after this code block).
    return true
}

func (hc *HealthChecker) checkDependency(pod *corev1.Pod, rule CheckRule) bool {
    // Check dependent services
    for _, dep := range rule.Dependencies {
        if !hc.isDependencyHealthy(dep, pod.Namespace) {
            hc.logger.Warnf("Dependency %s is unhealthy for pod %s", dep, pod.Name)
            return false
        }
    }
    
    return true
}

func (hc *HealthChecker) isDependencyHealthy(serviceName, namespace string) bool {
    // Look up the Service's Endpoints
    endpoints, err := hc.clientset.CoreV1().Endpoints(namespace).Get(
        context.TODO(),
        serviceName,
        metav1.GetOptions{},
    )
    if err != nil {
        return false
    }
    
    // Healthy if at least one address is ready
    for _, subset := range endpoints.Subsets {
        if len(subset.Addresses) > 0 {
            return true
        }
    }
    
    return false
}

func (hc *HealthChecker) handleUnhealthyPod(pod *corev1.Pod, rule CheckRule) {
    podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
    
    // The history map is shared by all rule goroutines.
    hc.mu.Lock()
    defer hc.mu.Unlock()
    
    // Update the failure history
    if _, exists := hc.history[podKey]; !exists {
        hc.history[podKey] = &FailureHistory{
            PodName: pod.Name,
        }
    }
    
    history := hc.history[podKey]
    history.FailureCount++
    history.LastFailure = time.Now()
    
    // Check whether the failure threshold has been reached
    if history.FailureCount < rule.Threshold {
        hc.logger.Warnf("Pod %s unhealthy (%d/%d)", pod.Name, history.FailureCount, rule.Threshold)
        return
    }
    
    // Enforce the healing cooldown
    if time.Since(history.LastHealing) < 5*time.Minute {
        hc.logger.Warnf("Pod %s in healing cooldown", pod.Name)
        return
    }
    
    // Perform the healing action
    hc.logger.Infof("Healing pod %s with action: %s", pod.Name, rule.HealingAction)
    
    switch rule.HealingAction {
    case "restart":
        hc.restartPod(pod)
    case "scale":
        hc.scaleDeployment(pod, rule.ScaleReplicas)
    }
    
    history.LastHealing = time.Now()
    history.HealingAttempts++
}

func (hc *HealthChecker) restartPod(pod *corev1.Pod) {
    hc.logger.Infof("Restarting pod %s/%s", pod.Namespace, pod.Name)
    
    err := hc.clientset.CoreV1().Pods(pod.Namespace).Delete(
        context.TODO(),
        pod.Name,
        metav1.DeleteOptions{},
    )
    
    if err != nil {
        hc.logger.WithError(err).Error("Failed to delete pod")
        return
    }
    
    hc.logger.Infof("Pod %s deleted successfully", pod.Name)
}

func (hc *HealthChecker) scaleDeployment(pod *corev1.Pod, replicas int32) {
    // Find the Deployment that owns this Pod
    for _, owner := range pod.OwnerReferences {
        if owner.Kind == "ReplicaSet" {
            // Resolve the Deployment through the owning ReplicaSet
            rs, err := hc.clientset.AppsV1().ReplicaSets(pod.Namespace).Get(
                context.TODO(),
                owner.Name,
                metav1.GetOptions{},
            )
            if err != nil {
                continue
            }
            
            for _, rsOwner := range rs.OwnerReferences {
                if rsOwner.Kind == "Deployment" {
                    hc.logger.Infof("Scaling deployment %s to %d replicas",
                        rsOwner.Name, replicas)
                    
                    scale := &autoscalingv1.Scale{
                        ObjectMeta: metav1.ObjectMeta{
                            Name:      rsOwner.Name,
                            Namespace: pod.Namespace,
                        },
                        Spec: autoscalingv1.ScaleSpec{
                            Replicas: replicas,
                        },
                    }
                    
                    _, err := hc.clientset.AppsV1().Deployments(pod.Namespace).UpdateScale(
                        context.TODO(),
                        rsOwner.Name,
                        scale,
                        metav1.UpdateOptions{},
                    )
                    
                    if err != nil {
                        hc.logger.WithError(err).Error("Failed to scale deployment")
                    }
                    
                    return
                }
            }
        }
    }
}
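
The checkExec stub above can be completed with the pod exec subresource. A minimal sketch using client-go's remotecommand package, assuming the checker is also given the *rest.Config that was used to build the clientset (this sketch treats the command as healthy when it exits with status 0):

package checker

import (
    "bytes"
    "context"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/remotecommand"
)

// execInPod runs command in the Pod's first container and reports
// whether it exited with status 0. restCfg is the *rest.Config used
// to build the clientset (an assumption of this sketch).
func execInPod(restCfg *rest.Config, clientset *kubernetes.Clientset, pod *corev1.Pod, command []string) bool {
    req := clientset.CoreV1().RESTClient().Post().
        Resource("pods").
        Name(pod.Name).
        Namespace(pod.Namespace).
        SubResource("exec").
        VersionedParams(&corev1.PodExecOptions{
            Container: pod.Spec.Containers[0].Name,
            Command:   command,
            Stdout:    true,
            Stderr:    true,
        }, scheme.ParameterCodec)

    executor, err := remotecommand.NewSPDYExecutor(restCfg, "POST", req.URL())
    if err != nil {
        return false
    }

    var stdout, stderr bytes.Buffer
    // StreamWithContext returns a non-nil error when the command exits non-zero.
    err = executor.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
        Stdout: &stdout,
        Stderr: &stderr,
    })
    return err == nil
}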

Python Implementation

health_checker.py

#!/usr/bin/env python3

import threading
import time
import requests
import socket
from datetime import datetime, timedelta
from kubernetes import client, config
from dataclasses import dataclass, field
from typing import List, Dict, Optional

@dataclass
class CheckRule:
    name: str
    namespace: str
    label_selector: str
    check_type: str
    check_interval: int = 30
    threshold: int = 3
    http_endpoint: str = ""
    expected_status: int = 200
    tcp_port: int = 0
    exec_command: List[str] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)
    healing_action: str = "restart"
    scale_replicas: int = 3

@dataclass
class FailureHistory:
    pod_name: str
    failure_count: int = 0
    last_failure: Optional[datetime] = None
    last_healing: Optional[datetime] = None
    healing_attempts: int = 0

class HealthChecker:
    def __init__(self):
        try:
            config.load_incluster_config()
        except config.ConfigException:
            config.load_kube_config()
        
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.history: Dict[str, FailureHistory] = {}
    
    def run(self):
        """运行健康检查器"""
        print("Starting health checker...")
        
        # 加载检查规则
        rules = self.load_check_rules()
        
        # 启动检查循环
        import threading
        
        for rule in rules:
            thread = threading.Thread(
                target=self.run_check_rule,
                args=(rule,),
                daemon=True
            )
            thread.start()
        
        # Keep the main thread alive
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Stopping health checker...")
    
    def load_check_rules(self) -> List[CheckRule]:
        """加载检查规则"""
        return [
            CheckRule(
                name="nginx-http-check",
                namespace="default",
                label_selector="app=nginx",
                check_type="http",
                check_interval=30,
                threshold=3,
                http_endpoint="/healthz",
                expected_status=200,
                healing_action="restart"
            ),
            CheckRule(
                name="backend-dependency-check",
                namespace="default",
                label_selector="app=backend",
                check_type="dependency",
                check_interval=60,
                threshold=2,
                dependencies=["database", "redis"],
                healing_action="restart"
            ),
        ]
    
    def run_check_rule(self, rule: CheckRule):
        """运行检查规则"""
        print(f"Starting check rule: {rule.name}")
        
        while True:
            self.check_pods(rule)
            time.sleep(rule.check_interval)
    
    def check_pods(self, rule: CheckRule):
        """检查 Pods"""
        try:
            pods = self.v1.list_namespaced_pod(
                namespace=rule.namespace,
                label_selector=rule.label_selector
            )
            
            for pod in pods.items:
                if pod.status.phase != 'Running':
                    continue
                
                healthy = self.perform_check(pod, rule)
                
                if not healthy:
                    self.handle_unhealthy_pod(pod, rule)
                else:
                    # Reset the failure count on success
                    pod_key = f"{pod.metadata.namespace}/{pod.metadata.name}"
                    if pod_key in self.history:
                        del self.history[pod_key]
        
        except Exception as e:
            print(f"Error checking pods: {e}")
    
    def perform_check(self, pod, rule: CheckRule) -> bool:
        """执行健康检查"""
        if rule.check_type == "http":
            return self.check_http(pod, rule)
        elif rule.check_type == "tcp":
            return self.check_tcp(pod, rule)
        elif rule.check_type == "exec":
            return self.check_exec(pod, rule)
        elif rule.check_type == "dependency":
            return self.check_dependency(pod, rule)
        
        return True
    
    def check_http(self, pod, rule: CheckRule) -> bool:
        """HTTP 健康检查"""
        url = f"http://{pod.status.pod_ip}{rule.http_endpoint}"
        
        try:
            response = requests.get(url, timeout=5)
            
            if response.status_code != rule.expected_status:
                print(f"HTTP check failed: expected {rule.expected_status}, "
                      f"got {response.status_code} for pod {pod.metadata.name}")
                return False
            
            return True
        
        except Exception as e:
            print(f"HTTP check failed for pod {pod.metadata.name}: {e}")
            return False
    
    def check_tcp(self, pod, rule: CheckRule) -> bool:
        """TCP 健康检查"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(5)
            result = sock.connect_ex((pod.status.pod_ip, rule.tcp_port))
            sock.close()
            
            if result != 0:
                print(f"TCP check failed for pod {pod.metadata.name}")
                return False
            
            return True
        
        except Exception as e:
            print(f"TCP check failed for pod {pod.metadata.name}: {e}")
            return False
    
    def check_exec(self, pod, rule: CheckRule) -> bool:
        """EXEC 健康检查"""
        # 简化实现
        return True
    
    def check_dependency(self, pod, rule: CheckRule) -> bool:
        """依赖服务检查"""
        for dep in rule.dependencies:
            if not self.is_dependency_healthy(dep, pod.metadata.namespace):
                print(f"Dependency {dep} is unhealthy for pod {pod.metadata.name}")
                return False
        
        return True
    
    def is_dependency_healthy(self, service_name: str, namespace: str) -> bool:
        """检查依赖服务是否健康"""
        try:
            endpoints = self.v1.read_namespaced_endpoints(
                name=service_name,
                namespace=namespace
            )
            
            # Healthy if at least one address is ready
            if endpoints.subsets:
                for subset in endpoints.subsets:
                    if subset.addresses:
                        return True
            
            return False
        
        except Exception as e:
            print(f"Failed to check dependency {service_name}: {e}")
            return False
    
    def handle_unhealthy_pod(self, pod, rule: CheckRule):
        """处理不健康的 Pod"""
        pod_key = f"{pod.metadata.namespace}/{pod.metadata.name}"
        
        # Update the failure history
        if pod_key not in self.history:
            self.history[pod_key] = FailureHistory(pod_name=pod.metadata.name)
        
        history = self.history[pod_key]
        history.failure_count += 1
        history.last_failure = datetime.now()
        
        # Check whether the failure threshold has been reached
        if history.failure_count < rule.threshold:
            print(f"Pod {pod.metadata.name} unhealthy ({history.failure_count}/{rule.threshold})")
            return
        
        # Enforce the healing cooldown
        if history.last_healing:
            cooldown = timedelta(minutes=5)
            if datetime.now() - history.last_healing < cooldown:
                print(f"Pod {pod.metadata.name} in healing cooldown")
                return
        
        # Perform the healing action
        print(f"Healing pod {pod.metadata.name} with action: {rule.healing_action}")
        
        if rule.healing_action == "restart":
            self.restart_pod(pod)
        elif rule.healing_action == "scale":
            self.scale_deployment(pod, rule.scale_replicas)
        
        history.last_healing = datetime.now()
        history.healing_attempts += 1
    
    def restart_pod(self, pod):
        """重启 Pod"""
        print(f"Restarting pod {pod.metadata.namespace}/{pod.metadata.name}")
        
        try:
            self.v1.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                body=client.V1DeleteOptions()
            )
            print(f"Pod {pod.metadata.name} deleted successfully")
        except Exception as e:
            print(f"Failed to delete pod: {e}")
    
    def scale_deployment(self, pod, replicas: int):
        """扩缩容 Deployment"""
        # 获取 Pod 所属的 Deployment
        for owner in pod.metadata.owner_references or []:
            if owner.kind == "ReplicaSet":
                try:
                    rs = self.apps_v1.read_namespaced_replica_set(
                        name=owner.name,
                        namespace=pod.metadata.namespace
                    )
                    
                    for rs_owner in rs.metadata.owner_references or []:
                        if rs_owner.kind == "Deployment":
                            print(f"Scaling deployment {rs_owner.name} to {replicas} replicas")
                            
                            # Patch the scale subresource directly; reading
                            # the Deployment first is unnecessary.
                            self.apps_v1.patch_namespaced_deployment_scale(
                                name=rs_owner.name,
                                namespace=pod.metadata.namespace,
                                body={'spec': {'replicas': replicas}}
                            )
                            
                            return
                
                except Exception as e:
                    print(f"Failed to scale deployment: {e}")

if __name__ == '__main__':
    checker = HealthChecker()
    checker.run()

Configuration Example

check-rules.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: health-check-rules
  namespace: kube-system
data:
  rules.yaml: |
    rules:
    - name: nginx-http-check
      namespace: default
      labelSelector: app=nginx
      checkType: http
      checkInterval: 30s
      threshold: 3
      httpEndpoint: /healthz
      expectedStatus: 200
      healingAction: restart
    
    - name: backend-dependency-check
      namespace: default
      labelSelector: app=backend
      checkType: dependency
      checkInterval: 60s
      threshold: 2
      dependencies:
        - database
        - redis
      healingAction: restart
    
    - name: database-tcp-check
      namespace: default
      labelSelector: app=database
      checkType: tcp
      checkInterval: 30s
      threshold: 3
      tcpPort: 3306
      healingAction: scale
      scaleReplicas: 5
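
The Go loadCheckRules function above returns hardcoded rules; in production the checker would instead parse rules.yaml from this ConfigMap, mounted into its container. A minimal parsing sketch, assuming gopkg.in/yaml.v3 and a mount path of /etc/health-checker/rules.yaml (both are assumptions of this sketch):

package checker

import (
    "os"
    "time"

    "gopkg.in/yaml.v3"
)

// yamlRule mirrors one entry of rules.yaml above.
type yamlRule struct {
    Name           string   `yaml:"name"`
    Namespace      string   `yaml:"namespace"`
    LabelSelector  string   `yaml:"labelSelector"`
    CheckType      string   `yaml:"checkType"`
    CheckInterval  string   `yaml:"checkInterval"` // e.g. "30s"
    Threshold      int      `yaml:"threshold"`
    HTTPEndpoint   string   `yaml:"httpEndpoint"`
    ExpectedStatus int      `yaml:"expectedStatus"`
    TCPPort        int      `yaml:"tcpPort"`
    Dependencies   []string `yaml:"dependencies"`
    HealingAction  string   `yaml:"healingAction"`
    ScaleReplicas  int32    `yaml:"scaleReplicas"`
}

// loadRulesFromFile reads and converts rules from the mounted ConfigMap.
func loadRulesFromFile(path string) ([]CheckRule, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    var f struct {
        Rules []yamlRule `yaml:"rules"`
    }
    if err := yaml.Unmarshal(data, &f); err != nil {
        return nil, err
    }
    rules := make([]CheckRule, 0, len(f.Rules))
    for _, r := range f.Rules {
        // Durations arrive as strings like "30s" and must be parsed.
        interval, err := time.ParseDuration(r.CheckInterval)
        if err != nil {
            return nil, err
        }
        rules = append(rules, CheckRule{
            Name:           r.Name,
            Namespace:      r.Namespace,
            LabelSelector:  r.LabelSelector,
            CheckType:      r.CheckType,
            CheckInterval:  interval,
            Threshold:      r.Threshold,
            HTTPEndpoint:   r.HTTPEndpoint,
            ExpectedStatus: r.ExpectedStatus,
            TCPPort:        r.TCPPort,
            Dependencies:   r.Dependencies,
            HealingAction:  r.HealingAction,
            ScaleReplicas:  r.ScaleReplicas,
        })
    }
    return rules, nil
}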

Deployment

deploy/deployment.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: health-checker
  namespace: kube-system

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: health-checker
rules:
- apiGroups: [""]
  resources: ["pods", "endpoints"]
  verbs: ["list", "get", "watch", "delete"]
- apiGroups: ["apps"]
  resources: ["deployments", "replicasets", "deployments/scale"]
  verbs: ["list", "get", "patch", "update"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: health-checker
subjects:
- kind: ServiceAccount
  name: health-checker
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: health-checker
  apiGroup: rbac.authorization.k8s.io

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: health-checker
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: health-checker
  template:
    metadata:
      labels:
        app: health-checker
    spec:
      serviceAccountName: health-checker
      containers:
      - name: health-checker
        image: your-registry/health-checker:latest
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 200m
            memory: 256Mi
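
If the checker loads rules from the ConfigMap above, the Deployment also needs to mount it into the container. A sketch of the additional fields under the Pod template spec (the mount path matches the parsing sketch earlier):

        volumeMounts:
        - name: rules
          mountPath: /etc/health-checker
          readOnly: true
      volumes:
      - name: rules
        configMap:
          name: health-check-rules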

Usage Examples

1. Enable health checking for an application

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
        health-check: enabled  # mark this app for health checking
    spec:
      containers:
      - name: nginx
        image: nginx:1.21
        ports:
        - containerPort: 80

2. Provide a health check endpoint

The application must expose a health check endpoint:

// Go example
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
    // Check application health
    if isHealthy() {
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("OK"))
    } else {
        w.WriteHeader(http.StatusServiceUnavailable)
        w.Write([]byte("Unhealthy"))
    }
})
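
The isHealthy function is application-specific. A minimal sketch that treats the application as healthy only when its database responds quickly (db is a hypothetical package-level *sql.DB):

// isHealthy reports whether the application's dependencies respond.
// db is a hypothetical package-level *sql.DB.
func isHealthy() bool {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    return db.PingContext(ctx) == nil
}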

Monitoring and Alerting

Prometheus Metrics

var (
    checksTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "health_checks_total",
            Help: "Total number of health checks",
        },
        []string{"rule", "result"},
    )
    
    healingTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "healing_actions_total",
            Help: "Total number of healing actions",
        },
        []string{"action", "result"},
    )
)
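
These metric vectors still need to be registered and incremented from the check path. A sketch of the wiring (recordCheck is a hypothetical helper that would be called after each performCheck):

func init() {
    // Register the metrics with the default Prometheus registry.
    prometheus.MustRegister(checksTotal, healingTotal)
}

// recordCheck records one check result for a rule.
func recordCheck(ruleName string, healthy bool) {
    result := "success"
    if !healthy {
        result = "failure"
    }
    checksTotal.WithLabelValues(ruleName, result).Inc()
}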

Alerting Rules

groups:
- name: health-checker
  rules:
  - alert: HighHealingRate
    expr: rate(healing_actions_total[5m]) > 1
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High rate of self-healing actions"
      description: "Self-healing action rate over the last 5 minutes: {{ $value }}"
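
A companion alert on healing effectiveness could build on the result label of healing_actions_total (the label values and the 50% threshold are assumptions to tune):

  - alert: LowHealingSuccessRate
    expr: |
      sum(rate(healing_actions_total{result="success"}[30m]))
        / sum(rate(healing_actions_total[30m])) < 0.5
    for: 10m
    labels:
      severity: critical
    annotations:
      summary: "Low self-healing success rate"
      description: "Fewer than half of healing actions succeeded in the last 30 minutes"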

Summary

Features

Intelligent checks: HTTP, TCP, EXEC, and dependency checks
Automated healing: Pod restarts and scaling
Failure thresholds: avoid acting on transient failures
Cooldown periods: prevent healing loops
History tracking: measure healing effectiveness

Best Practices

  1. Set thresholds sensibly: avoid overly sensitive triggers
  2. Configure cooldown periods: prevent frequent restarts
  3. Monitor and alert: detect anomalies early
  4. Heal progressively: try lightweight fixes first
  5. Log everything: make troubleshooting easier

Future Extensions

  1. ML prediction: predict failures from historical data
  2. Cascading healing: handle dependency failures automatically
  3. More healing strategies: traffic shifting, graceful degradation, and more
  4. A/B testing: validate healing effectiveness

The next project covers a multi-cluster resource aggregation tool.