Project 5: Application Health-Check Self-Healing
Project Background
Kubernetes' built-in livenessProbe and readinessProbe are sometimes not enough to detect complex application failures:
- ❌ They cannot detect application-logic errors
- ❌ They cannot detect failures in dependent services
- ❌ They cannot run custom remediation actions
Solution:
Build an intelligent health checker that supports custom check logic and automated remediation.
Requirements
Core Features
- ✅ Custom health-check rules
- ✅ HTTP/TCP/EXEC check types
- ✅ Dependency-service checks
- ✅ Automatic Pod restarts
- ✅ Automatic scaling
- ✅ Alert notifications
Advanced Features
- ✅ Cascading-failure detection
- ✅ Smart healing strategies
- ✅ Failure history
- ✅ Healing success-rate statistics
Full Go Implementation
pkg/checker/checker.go
package checker
import (
"context"
"fmt"
"net"
"net/http"
"sync"
"time"
"github.com/sirupsen/logrus"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"health-checker/pkg/config"
)
type HealthChecker struct {
clientset *kubernetes.Clientset
config *config.Config
logger *logrus.Logger
mu sync.Mutex // protects history, which is shared by the per-rule goroutines
history map[string]*FailureHistory
}
type CheckRule struct {
Name string
Namespace string
LabelSelector string
CheckType string // http, tcp, exec, dependency
CheckInterval time.Duration
Threshold int
// HTTP check
HTTPEndpoint string
ExpectedStatus int
// TCP check
TCPPort int
// EXEC check
ExecCommand []string
// Dependency check
Dependencies []string
// Healing action
HealingAction string // restart, scale, nothing
ScaleReplicas int32
}
type FailureHistory struct {
PodName string
FailureCount int
LastFailure time.Time
LastHealing time.Time
HealingAttempts int
}
func NewHealthChecker(
clientset *kubernetes.Clientset,
cfg *config.Config,
logger *logrus.Logger,
) *HealthChecker {
return &HealthChecker{
clientset: clientset,
config: cfg,
logger: logger,
history: make(map[string]*FailureHistory),
}
}
func (hc *HealthChecker) Run(stopCh <-chan struct{}) {
hc.logger.Info("Starting health checker")
// Load the check rules
rules := hc.loadCheckRules()
// Start a check goroutine for each rule
for _, rule := range rules {
go hc.runCheckRule(rule, stopCh)
}
<-stopCh
hc.logger.Info("Stopping health checker")
}
func (hc *HealthChecker) loadCheckRules() []CheckRule {
// Load rules from a config file or CRD.
// Simplified example: return hardcoded rules (see the loader sketch after the ConfigMap example below).
return []CheckRule{
{
Name: "nginx-http-check",
Namespace: "default",
LabelSelector: "app=nginx",
CheckType: "http",
CheckInterval: 30 * time.Second,
Threshold: 3,
HTTPEndpoint: "/healthz",
ExpectedStatus: 200,
HealingAction: "restart",
},
{
Name: "backend-dependency-check",
Namespace: "default",
LabelSelector: "app=backend",
CheckType: "dependency",
CheckInterval: 60 * time.Second,
Threshold: 2,
Dependencies: []string{"database", "redis"},
HealingAction: "restart",
},
}
}
func (hc *HealthChecker) runCheckRule(rule CheckRule, stopCh <-chan struct{}) {
ticker := time.NewTicker(rule.CheckInterval)
defer ticker.Stop()
hc.logger.Infof("Starting check rule: %s", rule.Name)
for {
select {
case <-ticker.C:
hc.checkPods(rule)
case <-stopCh:
return
}
}
}
func (hc *HealthChecker) checkPods(rule CheckRule) {
// List the Pods matching the rule's label selector
pods, err := hc.clientset.CoreV1().Pods(rule.Namespace).List(
context.TODO(),
metav1.ListOptions{
LabelSelector: rule.LabelSelector,
},
)
if err != nil {
hc.logger.WithError(err).Error("Failed to list pods")
return
}
for _, pod := range pods.Items {
if pod.Status.Phase != corev1.PodRunning {
continue
}
healthy := hc.performCheck(&pod, rule)
if !healthy {
hc.handleUnhealthyPod(&pod, rule)
} else {
// Healthy again: reset the failure history for this pod
podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
hc.mu.Lock()
delete(hc.history, podKey)
hc.mu.Unlock()
}
}
}
func (hc *HealthChecker) performCheck(pod *corev1.Pod, rule CheckRule) bool {
switch rule.CheckType {
case "http":
return hc.checkHTTP(pod, rule)
case "tcp":
return hc.checkTCP(pod, rule)
case "exec":
return hc.checkExec(pod, rule)
case "dependency":
return hc.checkDependency(pod, rule)
default:
return true
}
}
func (hc *HealthChecker) checkHTTP(pod *corev1.Pod, rule CheckRule) bool {
// Build the check URL from the Pod IP
url := fmt.Sprintf("http://%s%s", pod.Status.PodIP, rule.HTTPEndpoint)
client := &http.Client{
Timeout: 5 * time.Second,
}
resp, err := client.Get(url)
if err != nil {
hc.logger.WithError(err).Warnf("HTTP check failed for pod %s", pod.Name)
return false
}
defer resp.Body.Close()
if resp.StatusCode != rule.ExpectedStatus {
hc.logger.Warnf("HTTP check failed: expected %d, got %d for pod %s",
rule.ExpectedStatus, resp.StatusCode, pod.Name)
return false
}
return true
}
func (hc *HealthChecker) checkTCP(pod *corev1.Pod, rule CheckRule) bool {
// TCP connection check
addr := fmt.Sprintf("%s:%d", pod.Status.PodIP, rule.TCPPort)
conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
if err != nil {
hc.logger.WithError(err).Warnf("TCP check failed for pod %s", pod.Name)
return false
}
conn.Close()
return true
}
func (hc *HealthChecker) checkExec(pod *corev1.Pod, rule CheckRule) bool {
// Run a command inside the Pod.
// Simplified placeholder: a real implementation should use the pods/exec subresource (see the sketch below).
return true
}
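// A minimal sketch of a real exec-based check, assuming the HealthChecker is
// extended with a restConfig *rest.Config field and that the extra imports
// "bytes", "k8s.io/client-go/kubernetes/scheme", "k8s.io/client-go/rest" and
// "k8s.io/client-go/tools/remotecommand" are added. It runs rule.ExecCommand
// in the Pod's first container and treats any error (including a non-zero
// exit code) as unhealthy.
func (hc *HealthChecker) checkExecSketch(pod *corev1.Pod, rule CheckRule) bool {
	req := hc.clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Namespace(pod.Namespace).
		Name(pod.Name).
		SubResource("exec").
		VersionedParams(&corev1.PodExecOptions{
			Container: pod.Spec.Containers[0].Name,
			Command:   rule.ExecCommand,
			Stdout:    true,
			Stderr:    true,
		}, scheme.ParameterCodec)

	exec, err := remotecommand.NewSPDYExecutor(hc.restConfig, "POST", req.URL())
	if err != nil {
		hc.logger.WithError(err).Warnf("Exec check failed for pod %s", pod.Name)
		return false
	}

	var stdout, stderr bytes.Buffer
	// Stream returns an error when the command cannot run or exits non-zero.
	if err := exec.Stream(remotecommand.StreamOptions{Stdout: &stdout, Stderr: &stderr}); err != nil {
		hc.logger.WithError(err).Warnf("Exec check failed for pod %s: %s", pod.Name, stderr.String())
		return false
	}
	return true
}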
func (hc *HealthChecker) checkDependency(pod *corev1.Pod, rule CheckRule) bool {
// Check each dependent service
for _, dep := range rule.Dependencies {
if !hc.isDependencyHealthy(dep, pod.Namespace) {
hc.logger.Warnf("Dependency %s is unhealthy for pod %s", dep, pod.Name)
return false
}
}
return true
}
func (hc *HealthChecker) isDependencyHealthy(serviceName, namespace string) bool {
// Look up the Service's Endpoints object
endpoints, err := hc.clientset.CoreV1().Endpoints(namespace).Get(
context.TODO(),
serviceName,
metav1.GetOptions{},
)
if err != nil {
return false
}
// Healthy if any subset has ready addresses
for _, subset := range endpoints.Subsets {
if len(subset.Addresses) > 0 {
return true
}
}
return false
}
func (hc *HealthChecker) handleUnhealthyPod(pod *corev1.Pod, rule CheckRule) {
podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
hc.mu.Lock()
defer hc.mu.Unlock()
// Update the failure history
if _, exists := hc.history[podKey]; !exists {
hc.history[podKey] = &FailureHistory{
PodName: pod.Name,
}
}
history := hc.history[podKey]
history.FailureCount++
history.LastFailure = time.Now()
// Check whether the failure threshold has been reached
if history.FailureCount < rule.Threshold {
hc.logger.Warnf("Pod %s unhealthy (%d/%d)", pod.Name, history.FailureCount, rule.Threshold)
return
}
// Respect the healing cooldown window
if time.Since(history.LastHealing) < 5*time.Minute {
hc.logger.Warnf("Pod %s in healing cooldown", pod.Name)
return
}
// Perform the healing action
hc.logger.Infof("Healing pod %s with action: %s", pod.Name, rule.HealingAction)
switch rule.HealingAction {
case "restart":
hc.restartPod(pod)
case "scale":
hc.scaleDeployment(pod, rule.ScaleReplicas)
}
history.LastHealing = time.Now()
history.HealingAttempts++
}
func (hc *HealthChecker) restartPod(pod *corev1.Pod) {
hc.logger.Infof("Restarting pod %s/%s", pod.Namespace, pod.Name)
err := hc.clientset.CoreV1().Pods(pod.Namespace).Delete(
context.TODO(),
pod.Name,
metav1.DeleteOptions{},
)
if err != nil {
hc.logger.WithError(err).Error("Failed to delete pod")
return
}
hc.logger.Infof("Pod %s deleted successfully", pod.Name)
}
func (hc *HealthChecker) scaleDeployment(pod *corev1.Pod, replicas int32) {
// Walk the owner chain: Pod -> ReplicaSet -> Deployment
for _, owner := range pod.OwnerReferences {
if owner.Kind == "ReplicaSet" {
// Resolve the owning Deployment from the ReplicaSet
rs, err := hc.clientset.AppsV1().ReplicaSets(pod.Namespace).Get(
context.TODO(),
owner.Name,
metav1.GetOptions{},
)
if err != nil {
continue
}
for _, rsOwner := range rs.OwnerReferences {
if rsOwner.Kind == "Deployment" {
hc.logger.Infof("Scaling deployment %s to %d replicas",
rsOwner.Name, replicas)
scale := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: rsOwner.Name,
Namespace: pod.Namespace,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: replicas,
},
}
_, err := hc.clientset.AppsV1().Deployments(pod.Namespace).UpdateScale(
context.TODO(),
rsOwner.Name,
scale,
metav1.UpdateOptions{},
)
if err != nil {
hc.logger.WithError(err).Error("Failed to scale deployment")
}
return
}
}
}
}
}
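The package above only implements the checker itself. Below is a minimal sketch of an entry point that wires everything together; the cmd/main.go path is hypothetical, and the empty config.Config value stands in for whatever the project's real config loader returns.
package main

import (
	"os"
	"os/signal"
	"syscall"

	"github.com/sirupsen/logrus"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	"health-checker/pkg/checker"
	"health-checker/pkg/config"
)

func main() {
	logger := logrus.New()

	// In-cluster credentials come from the health-checker ServiceAccount.
	restCfg, err := rest.InClusterConfig()
	if err != nil {
		logger.WithError(err).Fatal("Failed to load in-cluster config")
	}
	clientset, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		logger.WithError(err).Fatal("Failed to create clientset")
	}

	// Assumed placeholder; replace with the project's actual config loading.
	cfg := &config.Config{}

	hc := checker.NewHealthChecker(clientset, cfg, logger)

	// Translate SIGINT/SIGTERM into the stop channel the checker expects.
	stopCh := make(chan struct{})
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		close(stopCh)
	}()

	hc.Run(stopCh)
}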
Python Implementation
health_checker.py
#!/usr/bin/env python3
import time
import requests
import socket
from datetime import datetime, timedelta
from kubernetes import client, config
from dataclasses import dataclass, field
from typing import Dict, List, Optional
@dataclass
class CheckRule:
name: str
namespace: str
label_selector: str
check_type: str
check_interval: int = 30
threshold: int = 3
http_endpoint: str = ""
expected_status: int = 200
tcp_port: int = 0
exec_command: List[str] = field(default_factory=list)
dependencies: List[str] = field(default_factory=list)
healing_action: str = "restart"
scale_replicas: int = 3
@dataclass
class FailureHistory:
pod_name: str
failure_count: int = 0
last_failure: Optional[datetime] = None
last_healing: Optional[datetime] = None
healing_attempts: int = 0
class HealthChecker:
def __init__(self):
try:
config.load_incluster_config()
except config.ConfigException:
config.load_kube_config()
self.v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
self.history: Dict[str, FailureHistory] = {}
def run(self):
"""运行健康检查器"""
print("Starting health checker...")
# Load the check rules
rules = self.load_check_rules()
# Start one check loop per rule
import threading
for rule in rules:
thread = threading.Thread(
target=self.run_check_rule,
args=(rule,),
daemon=True
)
thread.start()
# Keep the main thread alive
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping health checker...")
def load_check_rules(self) -> List[CheckRule]:
"""加载检查规则"""
return [
CheckRule(
name="nginx-http-check",
namespace="default",
label_selector="app=nginx",
check_type="http",
check_interval=30,
threshold=3,
http_endpoint="/healthz",
expected_status=200,
healing_action="restart"
),
CheckRule(
name="backend-dependency-check",
namespace="default",
label_selector="app=backend",
check_type="dependency",
check_interval=60,
threshold=2,
dependencies=["database", "redis"],
healing_action="restart"
),
]
def run_check_rule(self, rule: CheckRule):
"""运行检查规则"""
print(f"Starting check rule: {rule.name}")
while True:
self.check_pods(rule)
time.sleep(rule.check_interval)
def check_pods(self, rule: CheckRule):
"""检查 Pods"""
try:
pods = self.v1.list_namespaced_pod(
namespace=rule.namespace,
label_selector=rule.label_selector
)
for pod in pods.items:
if pod.status.phase != 'Running':
continue
healthy = self.perform_check(pod, rule)
if not healthy:
self.handle_unhealthy_pod(pod, rule)
else:
# Healthy again: reset the failure history
pod_key = f"{pod.metadata.namespace}/{pod.metadata.name}"
if pod_key in self.history:
del self.history[pod_key]
except Exception as e:
print(f"Error checking pods: {e}")
def perform_check(self, pod, rule: CheckRule) -> bool:
"""执行健康检查"""
if rule.check_type == "http":
return self.check_http(pod, rule)
elif rule.check_type == "tcp":
return self.check_tcp(pod, rule)
elif rule.check_type == "exec":
return self.check_exec(pod, rule)
elif rule.check_type == "dependency":
return self.check_dependency(pod, rule)
return True
def check_http(self, pod, rule: CheckRule) -> bool:
"""HTTP 健康检查"""
url = f"http://{pod.status.pod_ip}{rule.http_endpoint}"
try:
response = requests.get(url, timeout=5)
if response.status_code != rule.expected_status:
print(f"HTTP check failed: expected {rule.expected_status}, "
f"got {response.status_code} for pod {pod.metadata.name}")
return False
return True
except Exception as e:
print(f"HTTP check failed for pod {pod.metadata.name}: {e}")
return False
def check_tcp(self, pod, rule: CheckRule) -> bool:
"""TCP 健康检查"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((pod.status.pod_ip, rule.tcp_port))
sock.close()
if result != 0:
print(f"TCP check failed for pod {pod.metadata.name}")
return False
return True
except Exception as e:
print(f"TCP check failed for pod {pod.metadata.name}: {e}")
return False
def check_exec(self, pod, rule: CheckRule) -> bool:
"""EXEC 健康检查"""
# 简化实现
return True
def check_dependency(self, pod, rule: CheckRule) -> bool:
"""依赖服务检查"""
for dep in rule.dependencies:
if not self.is_dependency_healthy(dep, pod.metadata.namespace):
print(f"Dependency {dep} is unhealthy for pod {pod.metadata.name}")
return False
return True
def is_dependency_healthy(self, service_name: str, namespace: str) -> bool:
"""检查依赖服务是否健康"""
try:
endpoints = self.v1.read_namespaced_endpoints(
name=service_name,
namespace=namespace
)
# Healthy if any subset has addresses
if endpoints.subsets:
for subset in endpoints.subsets:
if subset.addresses:
return True
return False
except Exception as e:
print(f"Failed to check dependency {service_name}: {e}")
return False
def handle_unhealthy_pod(self, pod, rule: CheckRule):
"""处理不健康的 Pod"""
pod_key = f"{pod.metadata.namespace}/{pod.metadata.name}"
# Update the failure history
if pod_key not in self.history:
self.history[pod_key] = FailureHistory(pod_name=pod.metadata.name)
history = self.history[pod_key]
history.failure_count += 1
history.last_failure = datetime.now()
# Check whether the failure threshold has been reached
if history.failure_count < rule.threshold:
print(f"Pod {pod.metadata.name} unhealthy ({history.failure_count}/{rule.threshold})")
return
# Respect the healing cooldown window
if history.last_healing:
cooldown = timedelta(minutes=5)
if datetime.now() - history.last_healing < cooldown:
print(f"Pod {pod.metadata.name} in healing cooldown")
return
# Perform the healing action
print(f"Healing pod {pod.metadata.name} with action: {rule.healing_action}")
if rule.healing_action == "restart":
self.restart_pod(pod)
elif rule.healing_action == "scale":
self.scale_deployment(pod, rule.scale_replicas)
history.last_healing = datetime.now()
history.healing_attempts += 1
def restart_pod(self, pod):
"""重启 Pod"""
print(f"Restarting pod {pod.metadata.namespace}/{pod.metadata.name}")
try:
self.v1.delete_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
body=client.V1DeleteOptions()
)
print(f"Pod {pod.metadata.name} deleted successfully")
except Exception as e:
print(f"Failed to delete pod: {e}")
def scale_deployment(self, pod, replicas: int):
"""扩缩容 Deployment"""
# 获取 Pod 所属的 Deployment
for owner in pod.metadata.owner_references or []:
if owner.kind == "ReplicaSet":
try:
rs = self.apps_v1.read_namespaced_replica_set(
name=owner.name,
namespace=pod.metadata.namespace
)
for rs_owner in rs.metadata.owner_references or []:
if rs_owner.kind == "Deployment":
print(f"Scaling deployment {rs_owner.name} to {replicas} replicas")
# Patch the Deployment's scale subresource to the desired replica count
self.apps_v1.patch_namespaced_deployment_scale(
name=rs_owner.name,
namespace=pod.metadata.namespace,
body={'spec': {'replicas': replicas}}
)
return
except Exception as e:
print(f"Failed to scale deployment: {e}")
if __name__ == '__main__':
checker = HealthChecker()
checker.run()
Configuration Example
check-rules.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: health-check-rules
namespace: kube-system
data:
rules.yaml: |
rules:
- name: nginx-http-check
namespace: default
labelSelector: app=nginx
checkType: http
checkInterval: 30s
threshold: 3
httpEndpoint: /healthz
expectedStatus: 200
healingAction: restart
- name: backend-dependency-check
namespace: default
labelSelector: app=backend
checkType: dependency
checkInterval: 60s
threshold: 2
dependencies:
- database
- redis
healingAction: restart
- name: database-tcp-check
namespace: default
labelSelector: app=database
checkType: tcp
checkInterval: 30s
threshold: 3
tcpPort: 3306
healingAction: scale
scaleReplicas: 5
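The loadCheckRules function in the Go implementation returns hardcoded rules; the sketch below shows one way to parse the rules.yaml content above into []CheckRule instead, assuming the ConfigMap is mounted into the Pod (the /etc/health-checker/rules.yaml path is hypothetical) and that the imports os, time, and sigs.k8s.io/yaml are available.
// ruleFile mirrors the ConfigMap's YAML schema; interval strings such as "30s"
// are converted to time.Duration before CheckRule values are built.
type ruleFile struct {
	Rules []struct {
		Name           string   `json:"name"`
		Namespace      string   `json:"namespace"`
		LabelSelector  string   `json:"labelSelector"`
		CheckType      string   `json:"checkType"`
		CheckInterval  string   `json:"checkInterval"`
		Threshold      int      `json:"threshold"`
		HTTPEndpoint   string   `json:"httpEndpoint"`
		ExpectedStatus int      `json:"expectedStatus"`
		TCPPort        int      `json:"tcpPort"`
		Dependencies   []string `json:"dependencies"`
		HealingAction  string   `json:"healingAction"`
		ScaleReplicas  int32    `json:"scaleReplicas"`
	} `json:"rules"`
}

func loadCheckRulesFromFile(path string) ([]CheckRule, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var rf ruleFile
	// sigs.k8s.io/yaml converts YAML to JSON internally, so the json tags apply.
	if err := yaml.Unmarshal(data, &rf); err != nil {
		return nil, err
	}

	rules := make([]CheckRule, 0, len(rf.Rules))
	for _, r := range rf.Rules {
		interval, err := time.ParseDuration(r.CheckInterval)
		if err != nil {
			interval = 30 * time.Second // fall back to a sane default
		}
		rules = append(rules, CheckRule{
			Name:           r.Name,
			Namespace:      r.Namespace,
			LabelSelector:  r.LabelSelector,
			CheckType:      r.CheckType,
			CheckInterval:  interval,
			Threshold:      r.Threshold,
			HTTPEndpoint:   r.HTTPEndpoint,
			ExpectedStatus: r.ExpectedStatus,
			TCPPort:        r.TCPPort,
			Dependencies:   r.Dependencies,
			HealingAction:  r.HealingAction,
			ScaleReplicas:  r.ScaleReplicas,
		})
	}
	return rules, nil
}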
Deployment
deploy/deployment.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: health-checker
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: health-checker
rules:
- apiGroups: [""]
resources: ["pods", "endpoints"]
verbs: ["list", "get", "watch", "delete"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets", "deployments/scale"]
verbs: ["list", "get", "patch", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: health-checker
subjects:
- kind: ServiceAccount
name: health-checker
namespace: kube-system
roleRef:
kind: ClusterRole
name: health-checker
apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: health-checker
namespace: kube-system
spec:
replicas: 1
selector:
matchLabels:
app: health-checker
template:
metadata:
labels:
app: health-checker
spec:
serviceAccountName: health-checker
containers:
- name: health-checker
image: your-registry/health-checker:latest
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 200m
memory: 256Mi
Usage Examples
1. Enable health checks for an application
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
spec:
replicas: 3
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
health-check: enabled # mark the workload for health checking
spec:
containers:
- name: nginx
image: nginx:1.21
ports:
- containerPort: 80
2. Provide a health-check endpoint
The application needs to expose a health-check endpoint:
// Go example
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
// Report the application's health status
if isHealthy() {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("Unhealthy"))
}
})
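isHealthy() is left undefined above. A minimal sketch, assuming the application's health depends on reaching its database (the package-level db handle and its driver are assumptions, and the imports context, database/sql, and time are needed):
// db is assumed to be opened at startup, e.g. with sql.Open(driverName, dsn).
var db *sql.DB

// isHealthy reports whether the application's critical dependencies respond;
// real applications would add whatever checks make sense for them.
func isHealthy() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// Consider the app healthy only if the database answers a ping in time.
	if err := db.PingContext(ctx); err != nil {
		return false
	}
	return true
}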
Monitoring and Alerting
Prometheus Metrics
var (
checksTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "health_checks_total",
Help: "Total number of health checks",
},
[]string{"rule", "result"},
)
healingTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "healing_actions_total",
Help: "Total number of healing actions",
},
[]string{"action", "result"},
)
)
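The counters above still need to be registered, incremented, and exposed for scraping. A hedged sketch of that wiring, assuming the github.com/prometheus/client_golang/prometheus and .../prometheus/promhttp packages plus net/http are imported, and using :8080 as an example metrics port:
func init() {
	// Register the counters with the default registry so promhttp can expose them.
	prometheus.MustRegister(checksTotal, healingTotal)
}

// serveMetrics exposes /metrics for Prometheus to scrape.
func serveMetrics() {
	http.Handle("/metrics", promhttp.Handler())
	go http.ListenAndServe(":8080", nil)
}

// Example of recording a check result inside checkPods:
//   result := "success"
//   if !healthy {
//       result = "failure"
//   }
//   checksTotal.WithLabelValues(rule.Name, result).Inc()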
Alerting Rules
groups:
- name: health-checker
rules:
- alert: HighHealingRate
expr: rate(healing_actions_total[5m]) > 1
for: 5m
labels:
severity: warning
annotations:
summary: "High rate of self-healing actions"
description: "Healing-action rate over the last 5 minutes: {{ $value }}"
Summary
Features
✅ Smart checks: HTTP, TCP, EXEC, and dependency checks
✅ Automatic healing: restart and scale actions
✅ Failure thresholds: avoid reacting to transient errors
✅ Cooldown period: prevent repeated healing of the same pod
✅ Failure history: track how well healing works
Best Practices
- Set thresholds sensibly: avoid being overly sensitive
- Configure a cooldown: prevent frequent restarts
- Monitor and alert: notice anomalies early
- Heal progressively: try lightweight fixes first (see the sketch after this list)
- Keep logs: make troubleshooting easier
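One way to act on the progressive-healing practice, as a hedged sketch: escalate the action based on how many healing attempts a pod has already had. chooseHealingAction is a hypothetical helper, not part of the implementations above, and reuses the FailureHistory and CheckRule types from the Go code.
// chooseHealingAction escalates from the cheapest remediation to heavier ones
// as repeated healing attempts fail to bring the pod back to health.
func chooseHealingAction(history *FailureHistory, rule CheckRule) string {
	switch {
	case history.HealingAttempts == 0:
		// First attempt: use the rule's configured (lightweight) action.
		return rule.HealingAction
	case history.HealingAttempts < 3:
		// Still failing: add capacity by scaling the owning Deployment.
		return "scale"
	default:
		// Keeps failing: stop automatic actions and rely on alerting instead.
		return "nothing"
	}
}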
Future Directions
- ML-based prediction: predict failures from historical data
- Cascading healing: automatically handle failures in dependencies
- More healing strategies: traffic shifting, graceful degradation, and so on
- A/B testing: validate the effectiveness of healing actions
The next project covers a multi-cluster resource aggregation tool.