Project 2: Node Resource Monitoring and Alerting

Project Background

When Kubernetes cluster nodes run short of resources:

  • ❌ Pods cannot be scheduled (stuck in Pending)
  • ❌ Existing Pods are evicted (Evicted)
  • ❌ Overall cluster stability degrades

Solution
Continuously monitor node resource usage and alert early, before resources are exhausted.

Functional Requirements

Core Features

  • ✅ Monitor node CPU, memory, and disk usage
  • ✅ Configurable alert thresholds
  • ✅ Multiple alert channels (Slack, DingTalk, WeCom)
  • ✅ Node label and taint detection
  • ✅ Node NotReady detection

Advanced Features

  • ✅ Predictive alerting (trend analysis)
  • ✅ Intelligent alert noise reduction
  • ✅ Historical data storage
  • ✅ Dashboard visualization

Complete Go Implementation

main.go

package main

import (
    "flag"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/sirupsen/logrus"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned"

    "node-monitor/pkg/config"
    "node-monitor/pkg/monitor"
    "node-monitor/pkg/notifier"
)

var log = logrus.New()

func main() {
    kubeconfig := flag.String("kubeconfig", "", "path to kubeconfig")
    configFile := flag.String("config", "", "path to config file")
    flag.Parse()
    
    // Load configuration
    cfg, err := config.LoadConfig(*configFile)
    if err != nil {
        log.Fatalf("Failed to load config: %v", err)
    }
    
    // Configure logging; fall back to info on an invalid level
    level, err := logrus.ParseLevel(cfg.LogLevel)
    if err != nil {
        level = logrus.InfoLevel
    }
    log.SetLevel(level)
    log.SetFormatter(&logrus.JSONFormatter{})
    
    // Create Kubernetes clients
    var k8sConfig *rest.Config
    if *kubeconfig != "" {
        k8sConfig, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
    } else {
        k8sConfig, err = rest.InClusterConfig()
    }
    if err != nil {
        log.Fatalf("Failed to create k8s config: %v", err)
    }
    
    clientset, err := kubernetes.NewForConfig(k8sConfig)
    if err != nil {
        log.Fatalf("Failed to create clientset: %v", err)
    }
    
    metricsClient, err := metricsclientset.NewForConfig(k8sConfig)
    if err != nil {
        log.Fatalf("Failed to create metrics client: %v", err)
    }
    
    // Build notifiers
    notifiers := []notifier.Notifier{}
    
    if cfg.Slack.WebhookURL != "" {
        notifiers = append(notifiers, notifier.NewSlackNotifier(cfg.Slack.WebhookURL))
    }
    
    if cfg.DingTalk.WebhookURL != "" {
        notifiers = append(notifiers, notifier.NewDingTalkNotifier(
            cfg.DingTalk.WebhookURL,
            cfg.DingTalk.Secret,
        ))
    }
    
    // Create the node monitor
    nodeMonitor := monitor.NewNodeMonitor(clientset, metricsClient, cfg, notifiers, log)
    
    // Start the metrics server
    go nodeMonitor.StartMetricsServer(cfg.MetricsPort)
    
    // Start monitoring
    stopCh := make(chan struct{})
    go nodeMonitor.Run(stopCh)
    
    // Wait for a termination signal
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    
    log.Info("Node Monitor started")
    <-sigCh
    
    log.Info("Shutting down...")
    close(stopCh)
    time.Sleep(2 * time.Second)
}
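
main.go and the monitor both rely on a pkg/config package that is not part of the listings in this project. The sketch below is a minimal assumed implementation: the field names are taken from how the configuration is used elsewhere (and from config.yaml further down), while the YAML tags, default values, and the use of gopkg.in/yaml.v3 are assumptions rather than the original code.

pkg/config/config.go (assumed sketch)

package config

import (
    "os"

    "gopkg.in/yaml.v3"
)

type Thresholds struct {
    CPUWarning     int `yaml:"cpuWarning"`
    CPUCritical    int `yaml:"cpuCritical"`
    MemoryWarning  int `yaml:"memoryWarning"`
    MemoryCritical int `yaml:"memoryCritical"`
}

type SlackConfig struct {
    WebhookURL string `yaml:"webhookURL"`
}

type DingTalkConfig struct {
    WebhookURL string `yaml:"webhookURL"`
    Secret     string `yaml:"secret"`
}

type Config struct {
    CheckInterval int            `yaml:"checkInterval"` // seconds
    AlertCooldown int            `yaml:"alertCooldown"` // minutes
    LogLevel      string         `yaml:"logLevel"`
    MetricsPort   int            `yaml:"metricsPort"` // not in the sample config.yaml; defaults below
    Thresholds    Thresholds     `yaml:"thresholds"`
    Slack         SlackConfig    `yaml:"slack"`
    DingTalk      DingTalkConfig `yaml:"dingtalk"`
}

// LoadConfig overlays the YAML file (if given) on top of built-in defaults.
func LoadConfig(path string) (*Config, error) {
    cfg := &Config{
        CheckInterval: 60,
        AlertCooldown: 15,
        LogLevel:      "info",
        MetricsPort:   8080,
        Thresholds: Thresholds{
            CPUWarning:     70,
            CPUCritical:    90,
            MemoryWarning:  70,
            MemoryCritical: 90,
        },
    }
    if path == "" {
        return cfg, nil
    }
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    if err := yaml.Unmarshal(data, cfg); err != nil {
        return nil, err
    }
    return cfg, nil
}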

pkg/monitor/monitor.go

package monitor

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "time"

    "github.com/sirupsen/logrus"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
    metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned"

    "node-monitor/pkg/config"
    "node-monitor/pkg/notifier"
)

type NodeMonitor struct {
    clientset     *kubernetes.Clientset
    metricsClient *metricsclientset.Clientset
    config        *config.Config
    notifiers     []notifier.Notifier
    logger        *logrus.Logger
    alertMu       sync.Mutex           // guards alertHistory (alerts fire from informer callbacks and the ticker loop)
    alertHistory  map[string]time.Time // alert de-duplication
}

type NodeMetrics struct {
    NodeName       string
    CPUUsage       int64  // millicores
    CPUCapacity    int64
    CPUPercent     float64
    MemoryUsage    int64  // bytes
    MemoryCapacity int64
    MemoryPercent  float64
    DiskUsage      int64
    DiskCapacity   int64
    DiskPercent    float64
    PodCount       int
    PodCapacity    int
    Conditions     []corev1.NodeCondition
}

func NewNodeMonitor(
    clientset *kubernetes.Clientset,
    metricsClient *metricsclientset.Clientset,
    cfg *config.Config,
    notifiers []notifier.Notifier,
    logger *logrus.Logger,
) *NodeMonitor {
    return &NodeMonitor{
        clientset:     clientset,
        metricsClient: metricsClient,
        config:        cfg,
        notifiers:     notifiers,
        logger:        logger,
        alertHistory:  make(map[string]time.Time),
    }
}

func (nm *NodeMonitor) Run(stopCh <-chan struct{}) {
    // Create an informer to watch Node changes
    factory := informers.NewSharedInformerFactory(nm.clientset, 30*time.Second)
    nodeInformer := factory.Core().V1().Nodes()
    
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            node := obj.(*corev1.Node)
            nm.checkNode(node)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            node := newObj.(*corev1.Node)
            nm.checkNode(node)
        },
        DeleteFunc: func(obj interface{}) {
            node, ok := obj.(*corev1.Node)
            if !ok {
                return // deletes may deliver a DeletedFinalStateUnknown tombstone; skip it here
            }
            nm.sendAlert(fmt.Sprintf("node_%s_deleted", node.Name), fmt.Sprintf("Node %s has been deleted", node.Name), "critical")
        },
    })
    
    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, nodeInformer.Informer().HasSynced)
    
    nm.logger.Info("Informer synced, starting periodic check")
    
    // Periodic full check
    ticker := time.NewTicker(time.Duration(nm.config.CheckInterval) * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            nm.checkAllNodes()
        case <-stopCh:
            nm.logger.Info("Stopping node monitor")
            return
        }
    }
}

func (nm *NodeMonitor) checkNode(node *corev1.Node) {
    // Check node conditions
    for _, condition := range node.Status.Conditions {
        if condition.Type == corev1.NodeReady {
            if condition.Status != corev1.ConditionTrue {
                nm.sendAlert(
                    fmt.Sprintf("node_%s_not_ready", node.Name),
                    fmt.Sprintf("Node %s is not Ready: %s", node.Name, condition.Reason),
                    "critical",
                )
            }
        }
    }
    
    // Check node taints
    for _, taint := range node.Spec.Taints {
        if taint.Effect == corev1.TaintEffectNoSchedule {
            nm.logger.Warnf("Node %s has NoSchedule taint: %s=%s",
                node.Name, taint.Key, taint.Value)
        }
    }
}

func (nm *NodeMonitor) checkAllNodes() {
    nm.logger.Debug("Running periodic node check")
    
    // List all nodes
    nodes, err := nm.clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        nm.logger.WithError(err).Error("Failed to list nodes")
        return
    }
    
    // Fetch node usage from the metrics API (metrics-server)
    nodeMetricsList, err := nm.metricsClient.MetricsV1beta1().NodeMetricses().List(
        context.TODO(),
        metav1.ListOptions{},
    )
    if err != nil {
        nm.logger.WithError(err).Error("Failed to get node metrics")
        return
    }
    
    // Index metrics by node name (do not shadow the receiver nm here)
    metricsMap := make(map[string]*metricsv1beta1.NodeMetrics)
    for i := range nodeMetricsList.Items {
        item := &nodeMetricsList.Items[i]
        metricsMap[item.Name] = item
    }
    
    // Evaluate each node
    for _, node := range nodes.Items {
        metrics := nm.collectNodeMetrics(&node, metricsMap[node.Name])
        nm.evaluateMetrics(metrics)
    }
}

func (nm *NodeMonitor) collectNodeMetrics(
    node *corev1.Node,
    nodeMetrics *metricsv1beta1.NodeMetrics,
) *NodeMetrics {
    metrics := &NodeMetrics{
        NodeName:   node.Name,
        Conditions: node.Status.Conditions,
    }
    
    // CPU capacity
    if cpu, ok := node.Status.Capacity[corev1.ResourceCPU]; ok {
        metrics.CPUCapacity = cpu.MilliValue()
    }
    
    // Memory capacity
    if mem, ok := node.Status.Capacity[corev1.ResourceMemory]; ok {
        metrics.MemoryCapacity = mem.Value()
    }
    
    // Pod capacity
    if pods, ok := node.Status.Capacity[corev1.ResourcePods]; ok {
        metrics.PodCapacity = int(pods.Value())
    }
    
    // Current Pod count on this node
    podList, err := nm.clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
        FieldSelector: fmt.Sprintf("spec.nodeName=%s", node.Name),
    })
    if err != nil {
        nm.logger.WithError(err).Warn("Failed to list pods for node")
    } else {
        metrics.PodCount = len(podList.Items)
    }
    
    // CPU and memory usage reported by metrics-server
    if nodeMetrics != nil {
        metrics.CPUUsage = nodeMetrics.Usage.Cpu().MilliValue()
        metrics.MemoryUsage = nodeMetrics.Usage.Memory().Value()
    }
    
    // Usage percentages
    if metrics.CPUCapacity > 0 {
        metrics.CPUPercent = float64(metrics.CPUUsage) / float64(metrics.CPUCapacity) * 100
    }
    
    if metrics.MemoryCapacity > 0 {
        metrics.MemoryPercent = float64(metrics.MemoryUsage) / float64(metrics.MemoryCapacity) * 100
    }
    
    // Disk usage (approximated from node conditions): the metrics API does not
    // report filesystem usage, so DiskPressure is treated as roughly 90% full.
    for _, condition := range node.Status.Conditions {
        if condition.Type == corev1.NodeDiskPressure && condition.Status == corev1.ConditionTrue {
            metrics.DiskPercent = 90 // rough estimate
        }
    }
    
    return metrics
}
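
// Optional, illustrative helper (not part of the original code): the DiskPressure
// heuristic above is coarse, so disk usage could instead be read from the kubelet
// Summary API through the API server proxy. Calling this from collectNodeMetrics
// would require an extra RBAC rule (resource "nodes/proxy", verb "get") that the
// ClusterRole further down does not grant, and the fields decoded here follow the
// kubelet stats summary format (node.fs.usedBytes / node.fs.capacityBytes).
func (nm *NodeMonitor) collectDiskStats(nodeName string, metrics *NodeMetrics) {
    raw, err := nm.clientset.CoreV1().RESTClient().
        Get().
        Resource("nodes").
        Name(nodeName).
        SubResource("proxy").
        Suffix("stats/summary").
        DoRaw(context.TODO())
    if err != nil {
        nm.logger.WithError(err).Debug("Failed to read kubelet summary stats")
        return
    }

    var summary struct {
        Node struct {
            Fs struct {
                UsedBytes     uint64 `json:"usedBytes"`
                CapacityBytes uint64 `json:"capacityBytes"`
            } `json:"fs"`
        } `json:"node"`
    }
    if err := json.Unmarshal(raw, &summary); err != nil {
        nm.logger.WithError(err).Debug("Failed to decode kubelet summary stats")
        return
    }

    if summary.Node.Fs.CapacityBytes > 0 {
        metrics.DiskUsage = int64(summary.Node.Fs.UsedBytes)
        metrics.DiskCapacity = int64(summary.Node.Fs.CapacityBytes)
        metrics.DiskPercent = float64(metrics.DiskUsage) / float64(metrics.DiskCapacity) * 100
    }
}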

func (nm *NodeMonitor) evaluateMetrics(metrics *NodeMetrics) {
    // CPU usage
    if metrics.CPUPercent > float64(nm.config.Thresholds.CPUWarning) {
        severity := "warning"
        if metrics.CPUPercent > float64(nm.config.Thresholds.CPUCritical) {
            severity = "critical"
        }
        
        nm.sendAlert(
            fmt.Sprintf("node_%s_cpu_high", metrics.NodeName),
            fmt.Sprintf("Node %s CPU usage at %.2f%%", metrics.NodeName, metrics.CPUPercent),
            severity,
        )
    }
    
    // Memory usage
    if metrics.MemoryPercent > float64(nm.config.Thresholds.MemoryWarning) {
        severity := "warning"
        if metrics.MemoryPercent > float64(nm.config.Thresholds.MemoryCritical) {
            severity = "critical"
        }
        
        nm.sendAlert(
            fmt.Sprintf("node_%s_memory_high", metrics.NodeName),
            fmt.Sprintf("Node %s memory usage at %.2f%%", metrics.NodeName, metrics.MemoryPercent),
            severity,
        )
    }
    
    // Pod count
    if metrics.PodCapacity > 0 {
        podPercent := float64(metrics.PodCount) / float64(metrics.PodCapacity) * 100
        
        if podPercent > 80 {
            nm.sendAlert(
                fmt.Sprintf("node_%s_pod_high", metrics.NodeName),
                fmt.Sprintf("Node %s is running %d/%d pods (%.0f%%)",
                    metrics.NodeName, metrics.PodCount, metrics.PodCapacity, podPercent),
                "warning",
            )
        }
    }
    
    // Log the collected metrics
    nm.logger.WithFields(logrus.Fields{
        "node":          metrics.NodeName,
        "cpu_percent":   fmt.Sprintf("%.2f%%", metrics.CPUPercent),
        "mem_percent":   fmt.Sprintf("%.2f%%", metrics.MemoryPercent),
        "pod_count":     metrics.PodCount,
        "pod_capacity":  metrics.PodCapacity,
    }).Debug("Node metrics")
}

func (nm *NodeMonitor) sendAlert(alertKey, message, severity string) {
    // Alert de-duplication: skip if the same alert fired within the cooldown window.
    // The mutex is needed because alerts arrive from both the informer callbacks
    // and the periodic check loop.
    nm.alertMu.Lock()
    if lastAlert, exists := nm.alertHistory[alertKey]; exists {
        if time.Since(lastAlert) < time.Duration(nm.config.AlertCooldown)*time.Minute {
            nm.alertMu.Unlock()
            return // still within the cooldown window
        }
    }
    nm.alertHistory[alertKey] = time.Now()
    nm.alertMu.Unlock()
    
    // Fan out to every configured notification channel
    for _, n := range nm.notifiers {
        if err := n.Send(message, severity); err != nil {
            nm.logger.WithError(err).Error("Failed to send notification")
        }
    }
    
    nm.logger.WithFields(logrus.Fields{
        "alert_key": alertKey,
        "severity":  severity,
    }).Info(message)
}
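
main.go also calls nodeMonitor.StartMetricsServer, which is not shown in the original listings. The sketch below is a minimal assumed implementation; exposing Prometheus metrics through github.com/prometheus/client_golang and adding a /healthz endpoint are choices made here, not part of the original code.

pkg/monitor/metrics.go (assumed sketch)

package monitor

import (
    "fmt"
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// StartMetricsServer serves Prometheus metrics and a simple health check.
// It blocks, which is why main.go runs it in its own goroutine.
func (nm *NodeMonitor) StartMetricsServer(port int) {
    mux := http.NewServeMux()
    mux.Handle("/metrics", promhttp.Handler())
    mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        _, _ = w.Write([]byte("ok"))
    })

    addr := fmt.Sprintf(":%d", port)
    nm.logger.Infof("Serving metrics on %s", addr)
    if err := http.ListenAndServe(addr, mux); err != nil {
        nm.logger.WithError(err).Error("Metrics server stopped")
    }
}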

pkg/notifier/slack.go

package notifier

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type SlackNotifier struct {
    webhookURL string
}

func NewSlackNotifier(webhookURL string) *SlackNotifier {
    return &SlackNotifier{webhookURL: webhookURL}
}

func (s *SlackNotifier) Send(message, severity string) error {
    color := "warning"
    if severity == "critical" {
        color = "danger"
    }
    
    payload := map[string]interface{}{
        "attachments": []map[string]interface{}{
            {
                "color":       color,
                "title":       fmt.Sprintf("[%s] Kubernetes Node Alert", severity),
                "text":        message,
                "footer":      "Node Monitor",
                "footer_icon": "https://kubernetes.io/images/favicon.png",
                "ts":          time.Now().Unix(),
            },
        },
    }
    
    data, err := json.Marshal(payload)
    if err != nil {
        return err
    }
    
    resp, err := http.Post(s.webhookURL, "application/json", bytes.NewBuffer(data))
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("slack returned status %d", resp.StatusCode)
    }
    
    return nil
}

pkg/notifier/dingtalk.go

package notifier

import (
    "bytes"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
    "time"
)

type DingTalkNotifier struct {
    webhookURL string
    secret     string
}

func NewDingTalkNotifier(webhookURL, secret string) *DingTalkNotifier {
    return &DingTalkNotifier{
        webhookURL: webhookURL,
        secret:     secret,
    }
}

func (d *DingTalkNotifier) Send(message, severity string) error {
    // Sign the webhook URL (DingTalk "sign" security setting); skip when no secret is configured
    signedURL := d.webhookURL
    if d.secret != "" {
        signedURL = d.sign(time.Now().UnixMilli())
    }
    
    // Build the markdown message
    emoji := "⚠️"
    if severity == "critical" {
        emoji = "🚨"
    }
    
    payload := map[string]interface{}{
        "msgtype": "markdown",
        "markdown": map[string]string{
            "title": "Kubernetes Node Alert",
            "text": fmt.Sprintf("### %s %s\n\n%s\n\n> Time: %s",
                emoji, severity, message, time.Now().Format("2006-01-02 15:04:05")),
        },
    }
    
    data, err := json.Marshal(payload)
    if err != nil {
        return err
    }
    
    resp, err := http.Post(signedURL, "application/json", bytes.NewBuffer(data))
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("dingtalk returned status %d", resp.StatusCode)
    }
    
    return nil
}

func (d *DingTalkNotifier) sign(timestamp int64) string {
    stringToSign := fmt.Sprintf("%d\n%s", timestamp, d.secret)
    
    h := hmac.New(sha256.New, []byte(d.secret))
    h.Write([]byte(stringToSign))
    signature := base64.StdEncoding.EncodeToString(h.Sum(nil))
    
    return fmt.Sprintf("%s&timestamp=%d&sign=%s",
        d.webhookURL, timestamp, url.QueryEscape(signature))
}
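
Both notifiers are used through a notifier.Notifier interface that the monitor depends on but that is not shown in the original listings; a minimal definition consistent with how it is called would be:

pkg/notifier/notifier.go (assumed sketch)

package notifier

// Notifier is the single abstraction the monitor uses to fan out alerts.
type Notifier interface {
    // Send delivers a message with a severity such as "warning" or "critical".
    Send(message, severity string) error
}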

Python Implementation

node_monitor.py

#!/usr/bin/env python3

import time
import logging
import hmac
import hashlib
import base64
import urllib.parse
from datetime import datetime, timedelta
from kubernetes import client, config, watch
import requests
import yaml

class NodeMonitor:
    def __init__(self, config_file=None):
        # Load Kubernetes configuration: in-cluster first, then local kubeconfig
        try:
            config.load_incluster_config()
        except config.ConfigException:
            config.load_kube_config()
        
        self.v1 = client.CoreV1Api()
        self.metrics_api = client.CustomObjectsApi()
        
        # Load tool configuration
        self.load_config(config_file)
        
        # Alert history used for de-duplication
        self.alert_history = {}
        
        # Configure logging
        logging.basicConfig(
            level=getattr(logging, self.config['logLevel'].upper(), logging.INFO),
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def load_config(self, config_file):
        """Load configuration, overlaying the config file on built-in defaults."""
        default_config = {
            'checkInterval': 60,
            'logLevel': 'info',
            'thresholds': {
                'cpuWarning': 70,
                'cpuCritical': 90,
                'memoryWarning': 70,
                'memoryCritical': 90
            },
            'alertCooldown': 15,
            'slack': {'webhookURL': ''},
            'dingtalk': {'webhookURL': '', 'secret': ''}
        }
        
        if config_file:
            with open(config_file, 'r') as f:
                user_config = yaml.safe_load(f) or {}
                # Note: shallow merge; a partial nested block (e.g. thresholds)
                # replaces the corresponding defaults wholesale.
                default_config.update(user_config)
        
        self.config = default_config
    
    def run(self):
        """Run the watch/poll loop."""
        self.logger.info("Starting Node Monitor")
        
        # Start the Node watch
        w = watch.Watch()
        
        last_check = time.time()
        
        while True:
            try:
                # Stream Node events (blocks for up to checkInterval seconds)
                for event in w.stream(
                    self.v1.list_node,
                    timeout_seconds=self.config['checkInterval']
                ):
                    event_type = event['type']
                    node = event['object']
                    
                    if event_type in ['ADDED', 'MODIFIED']:
                        self.check_node(node)
                    elif event_type == 'DELETED':
                        self.send_alert(
                            f"node_{node.metadata.name}_deleted",
                            f"Node {node.metadata.name} has been deleted",
                            "critical"
                        )
                
                # Periodic full check once the stream times out
                if time.time() - last_check >= self.config['checkInterval']:
                    self.check_all_nodes()
                    last_check = time.time()
            
            except Exception as e:
                self.logger.error(f"Error in watch loop: {e}")
                time.sleep(5)
    
    def check_node(self, node):
        """Check a single node's conditions and taints."""
        # Check node conditions
        for condition in node.status.conditions or []:
            if condition.type == 'Ready':
                if condition.status != 'True':
                    self.send_alert(
                        f"node_{node.metadata.name}_not_ready",
                        f"Node {node.metadata.name} is not Ready: {condition.reason}",
                        "critical"
                    )
        
        # Check taints
        if node.spec.taints:
            for taint in node.spec.taints:
                if taint.effect == 'NoSchedule':
                    self.logger.warning(
                        f"Node {node.metadata.name} has NoSchedule taint: "
                        f"{taint.key}={taint.value}"
                    )
    
    def check_all_nodes(self):
        """Run a full check across all nodes."""
        self.logger.debug("Running periodic node check")
        
        try:
            # List all nodes
            nodes = self.v1.list_node()
            
            # Fetch node metrics
            node_metrics = self.get_node_metrics()
            
            # Evaluate each node
            for node in nodes.items:
                metrics = self.collect_node_metrics(node, node_metrics)
                self.evaluate_metrics(metrics)
        
        except Exception as e:
            self.logger.error(f"Error checking nodes: {e}")
    
    def get_node_metrics(self):
        """Fetch node metrics from the metrics.k8s.io API."""
        try:
            metrics = self.metrics_api.list_cluster_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                plural="nodes"
            )
            
            # Index metrics by node name
            metrics_map = {}
            for item in metrics.get('items', []):
                node_name = item['metadata']['name']
                metrics_map[node_name] = item
            
            return metrics_map
        
        except Exception as e:
            self.logger.error(f"Failed to get node metrics: {e}")
            return {}
    
    def collect_node_metrics(self, node, node_metrics_map):
        """Collect capacity, usage, and pod count for one node."""
        node_name = node.metadata.name
        
        metrics = {
            'node_name': node_name,
            'cpu_capacity': 0,
            'cpu_usage': 0,
            'cpu_percent': 0,
            'memory_capacity': 0,
            'memory_usage': 0,
            'memory_percent': 0,
            'pod_count': 0,
            'pod_capacity': 0
        }
        
        # Capacity
        if node.status.capacity:
            if 'cpu' in node.status.capacity:
                metrics['cpu_capacity'] = self.parse_cpu(node.status.capacity['cpu'])
            if 'memory' in node.status.capacity:
                metrics['memory_capacity'] = self.parse_memory(node.status.capacity['memory'])
            if 'pods' in node.status.capacity:
                metrics['pod_capacity'] = int(node.status.capacity['pods'])
        
        # Usage reported by metrics-server
        if node_name in node_metrics_map:
            node_metrics = node_metrics_map[node_name]
            usage = node_metrics.get('usage', {})
            
            if 'cpu' in usage:
                metrics['cpu_usage'] = self.parse_cpu(usage['cpu'])
            if 'memory' in usage:
                metrics['memory_usage'] = self.parse_memory(usage['memory'])
        
        # Pod count on this node
        pods = self.v1.list_pod_for_all_namespaces(
            field_selector=f"spec.nodeName={node_name}"
        )
        metrics['pod_count'] = len(pods.items)
        
        # Usage percentages
        if metrics['cpu_capacity'] > 0:
            metrics['cpu_percent'] = (metrics['cpu_usage'] / metrics['cpu_capacity']) * 100
        
        if metrics['memory_capacity'] > 0:
            metrics['memory_percent'] = (metrics['memory_usage'] / metrics['memory_capacity']) * 100
        
        return metrics
    
    def parse_cpu(self, cpu_str):
        """Parse a CPU quantity string into millicores."""
        if cpu_str.endswith('m'):
            return int(cpu_str[:-1])
        elif cpu_str.endswith('n'):
            return int(cpu_str[:-1]) // 1000000
        else:
            return int(float(cpu_str) * 1000)
    
    def parse_memory(self, mem_str):
        """Parse a memory quantity string into bytes."""
        units = {
            'Ki': 1024,
            'Mi': 1024 ** 2,
            'Gi': 1024 ** 3,
            'Ti': 1024 ** 4,
            'K': 1000,
            'M': 1000 ** 2,
            'G': 1000 ** 3,
            'T': 1000 ** 4
        }
        
        for unit, multiplier in units.items():
            if mem_str.endswith(unit):
                return int(mem_str[:-len(unit)]) * multiplier
        
        return int(mem_str)
    
    def evaluate_metrics(self, metrics):
        """Evaluate node metrics and raise alerts when thresholds are exceeded."""
        node_name = metrics['node_name']
        
        # CPU usage
        if metrics['cpu_percent'] > self.config['thresholds']['cpuWarning']:
            severity = 'warning'
            if metrics['cpu_percent'] > self.config['thresholds']['cpuCritical']:
                severity = 'critical'
            
            self.send_alert(
                f"node_{node_name}_cpu_high",
                f"Node {node_name} CPU usage at {metrics['cpu_percent']:.2f}%",
                severity
            )
        
        # Memory usage
        if metrics['memory_percent'] > self.config['thresholds']['memoryWarning']:
            severity = 'warning'
            if metrics['memory_percent'] > self.config['thresholds']['memoryCritical']:
                severity = 'critical'
            
            self.send_alert(
                f"node_{node_name}_memory_high",
                f"Node {node_name} memory usage at {metrics['memory_percent']:.2f}%",
                severity
            )
        
        # Pod count
        if metrics['pod_capacity'] > 0:
            pod_percent = (metrics['pod_count'] / metrics['pod_capacity']) * 100
            
            if pod_percent > 80:
                self.send_alert(
                    f"node_{node_name}_pod_high",
                    f"Node {node_name} is running {metrics['pod_count']}/{metrics['pod_capacity']} pods ({pod_percent:.0f}%)",
                    "warning"
                )
        
        self.logger.debug(
            f"Node {node_name}: "
            f"CPU {metrics['cpu_percent']:.2f}%, "
            f"Memory {metrics['memory_percent']:.2f}%, "
            f"Pods {metrics['pod_count']}/{metrics['pod_capacity']}"
        )
    
    def send_alert(self, alert_key, message, severity):
        """Send an alert through every configured channel."""
        # Alert de-duplication with a cooldown window
        if alert_key in self.alert_history:
            last_alert = self.alert_history[alert_key]
            cooldown = timedelta(minutes=self.config['alertCooldown'])
            
            if datetime.now() - last_alert < cooldown:
                return
        
        self.alert_history[alert_key] = datetime.now()
        
        # Send to Slack
        if self.config['slack']['webhookURL']:
            self.send_slack(message, severity)
        
        # Send to DingTalk
        if self.config['dingtalk']['webhookURL']:
            self.send_dingtalk(message, severity)
        
        self.logger.info(f"[{severity}] {message}")
    
    def send_slack(self, message, severity):
        """Send a Slack notification."""
        color = 'warning' if severity == 'warning' else 'danger'
        
        payload = {
            'attachments': [{
                'color': color,
                'title': f"[{severity}] Kubernetes Node Alert",
                'text': message,
                'footer': 'Node Monitor',
                'ts': int(time.time())
            }]
        }
        
        try:
            response = requests.post(
                self.config['slack']['webhookURL'],
                json=payload,
                timeout=10
            )
            if response.status_code != 200:
                self.logger.error(f"Slack returned status {response.status_code}")
        except Exception as e:
            self.logger.error(f"Failed to send Slack notification: {e}")
    
    def send_dingtalk(self, message, severity):
        """Send a DingTalk notification."""
        timestamp = str(int(time.time() * 1000))
        secret = self.config['dingtalk']['secret']
        
        # Generate the signature (DingTalk 'sign' security setting)
        string_to_sign = f"{timestamp}\n{secret}"
        hmac_code = hmac.new(
            secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        
        # Build the signed URL
        webhook_url = (
            f"{self.config['dingtalk']['webhookURL']}"
            f"&timestamp={timestamp}&sign={sign}"
        )
        
        # Build the message
        emoji = '⚠️' if severity == 'warning' else '🚨'
        
        payload = {
            'msgtype': 'markdown',
            'markdown': {
                'title': 'Kubernetes Node Alert',
                'text': (
                    f"### {emoji} {severity}\n\n"
                    f"{message}\n\n"
                    f"> Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                )
            }
        }
        
        try:
            response = requests.post(webhook_url, json=payload, timeout=10)
            if response.status_code != 200:
                self.logger.error(f"DingTalk returned status {response.status_code}")
        except Exception as e:
            self.logger.error(f"Failed to send DingTalk notification: {e}")

if __name__ == '__main__':
    import argparse
    
    parser = argparse.ArgumentParser(description='Kubernetes Node Monitor')
    parser.add_argument('--config', help='Path to config file')
    args = parser.parse_args()
    
    monitor = NodeMonitor(config_file=args.config)
    monitor.run()

Configuration Example

config.yaml

checkInterval: 60     # Check interval (seconds)
logLevel: info        # Log level
alertCooldown: 15     # Alert cooldown period (minutes)

# Threshold configuration
thresholds:
  cpuWarning: 70      # CPU warning threshold (%)
  cpuCritical: 90     # CPU critical threshold (%)
  memoryWarning: 70   # Memory warning threshold (%)
  memoryCritical: 90  # Memory critical threshold (%)

# Slack configuration
slack:
  webhookURL: "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

# DingTalk configuration
dingtalk:
  webhookURL: "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN"
  secret: "YOUR_SECRET"

Deployment

deploy/deployment.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: node-monitor
  namespace: kube-system

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: node-monitor
rules:
- apiGroups: [""]
  resources: ["nodes", "pods"]
  verbs: ["list", "get", "watch"]
- apiGroups: ["metrics.k8s.io"]
  resources: ["nodes"]
  verbs: ["list", "get"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: node-monitor
subjects:
- kind: ServiceAccount
  name: node-monitor
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: node-monitor
  apiGroup: rbac.authorization.k8s.io

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: node-monitor-config
  namespace: kube-system
data:
  config.yaml: |
    checkInterval: 60
    logLevel: info
    alertCooldown: 15
    thresholds:
      cpuWarning: 70
      cpuCritical: 90
      memoryWarning: 70
      memoryCritical: 90

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: node-monitor
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: node-monitor
  template:
    metadata:
      labels:
        app: node-monitor
    spec:
      serviceAccountName: node-monitor
      containers:
      - name: node-monitor
        image: your-registry/node-monitor:latest
        args:
        - --config=/etc/node-monitor/config.yaml
        volumeMounts:
        - name: config
          mountPath: /etc/node-monitor
        env:
        - name: SLACK_WEBHOOK_URL
          valueFrom:
            secretKeyRef:
              name: node-monitor-secrets
              key: slack-webhook-url
              optional: true
        - name: DINGTALK_WEBHOOK_URL
          valueFrom:
            secretKeyRef:
              name: node-monitor-secrets
              key: dingtalk-webhook-url
              optional: true
        - name: DINGTALK_SECRET
          valueFrom:
            secretKeyRef:
              name: node-monitor-secrets
              key: dingtalk-secret
              optional: true
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 200m
            memory: 256Mi
      volumes:
      - name: config
        configMap:
          name: node-monitor-config

Summary

Features

Real-time monitoring: event-driven via the Watch/Informer mechanism
Multi-dimensional metrics: CPU, memory, disk, and Pod count
Flexible alerting: configurable thresholds and cooldown period
Multi-channel notifications: Slack, DingTalk, WeCom
Alert de-duplication: prevents alert storms

Extension Directions

  1. Predictive alerting: forecast trends from historical data (a small sketch follows this list)
  2. Automatic scaling: integrate with Cluster Autoscaler
  3. Dashboard: visualize with Grafana
  4. Data persistence: store historical metrics in InfluxDB
  5. Intelligent noise reduction: use ML models to identify anomalous alerts
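
The first direction can be sketched with a simple approach: keep a short window of recent usage samples per node, fit a least-squares slope, and alert when usage is projected to cross the critical threshold within a lookahead window. This is not part of the project code above; the TrendPredictor type and its methods are hypothetical.

package monitor

import "time"

// sample is one usage observation for a node.
type sample struct {
    t     time.Time
    value float64 // e.g. CPU or memory percent
}

// TrendPredictor keeps a sliding window of samples per node (hypothetical helper).
type TrendPredictor struct {
    window  time.Duration
    samples map[string][]sample
}

func NewTrendPredictor(window time.Duration) *TrendPredictor {
    return &TrendPredictor{window: window, samples: make(map[string][]sample)}
}

// Observe records a new sample and drops samples older than the window.
func (tp *TrendPredictor) Observe(node string, value float64, now time.Time) {
    s := append(tp.samples[node], sample{t: now, value: value})
    cutoff := now.Add(-tp.window)
    for len(s) > 0 && s[0].t.Before(cutoff) {
        s = s[1:]
    }
    tp.samples[node] = s
}

// WillExceed reports whether a least-squares fit of the recent samples projects
// the value to exceed threshold within the lookahead duration.
func (tp *TrendPredictor) WillExceed(node string, threshold float64, lookahead time.Duration, now time.Time) bool {
    s := tp.samples[node]
    if len(s) < 3 {
        return false
    }
    // Least-squares slope and intercept with x = seconds since the first sample.
    var sumX, sumY, sumXY, sumXX float64
    for _, p := range s {
        x := p.t.Sub(s[0].t).Seconds()
        sumX += x
        sumY += p.value
        sumXY += x * p.value
        sumXX += x * x
    }
    n := float64(len(s))
    denom := n*sumXX - sumX*sumX
    if denom == 0 {
        return false
    }
    slope := (n*sumXY - sumX*sumY) / denom
    intercept := (sumY - slope*sumX) / n
    // Project usage at the end of the lookahead window.
    x := now.Add(lookahead).Sub(s[0].t).Seconds()
    return slope*x+intercept >= threshold
}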

The next project covers a PVC auto-expansion tool.