项目2:节点资源监控告警
项目背景
Kubernetes 集群节点资源不足会导致:
- ❌ Pod 无法调度(Pending)
- ❌ 现有 Pod 被驱逐(Evicted)
- ❌ 集群稳定性下降
解决方案:
实时监控节点资源使用情况,提前告警,防止资源耗尽。
功能需求
核心功能
- ✅ 监控节点 CPU、内存、磁盘使用率
- ✅ 可配置的告警阈值
- ✅ 多种告警渠道(Slack、钉钉;企业微信可按同一接口扩展,见通知器部分的示意)
- ✅ 节点污点(NoSchedule)检测
- ✅ 节点 NotReady 检测
高级功能(规划中,详见文末「扩展方向」)
- 🔲 预测性告警(趋势分析)
- 🔲 智能告警降噪
- 🔲 历史数据存储
- 🔲 Dashboard 可视化
Go 完整实现
main.go
package main
import (
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/sirupsen/logrus"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned"

	"node-monitor/pkg/config"
	"node-monitor/pkg/monitor"
	"node-monitor/pkg/notifier"
)
var log = logrus.New()
func main() {
kubeconfig := flag.String("kubeconfig", "", "path to kubeconfig")
configFile := flag.String("config", "", "path to config file")
flag.Parse()
// 加载配置
cfg, err := config.LoadConfig(*configFile)
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// 设置日志(无法解析的级别回退为 info)
level, err := logrus.ParseLevel(cfg.LogLevel)
if err != nil {
	level = logrus.InfoLevel
}
log.SetLevel(level)
log.SetFormatter(&logrus.JSONFormatter{})
// 创建 Kubernetes 客户端
var k8sConfig *rest.Config
if *kubeconfig != "" {
k8sConfig, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
} else {
k8sConfig, err = rest.InClusterConfig()
}
if err != nil {
log.Fatalf("Failed to create k8s config: %v", err)
}
clientset, err := kubernetes.NewForConfig(k8sConfig)
if err != nil {
log.Fatalf("Failed to create clientset: %v", err)
}
metricsClient, err := metricsclientset.NewForConfig(k8sConfig)
if err != nil {
log.Fatalf("Failed to create metrics client: %v", err)
}
// 创建通知器
notifiers := []notifier.Notifier{}
if cfg.Slack.WebhookURL != "" {
notifiers = append(notifiers, notifier.NewSlackNotifier(cfg.Slack.WebhookURL))
}
if cfg.DingTalk.WebhookURL != "" {
notifiers = append(notifiers, notifier.NewDingTalkNotifier(
cfg.DingTalk.WebhookURL,
cfg.DingTalk.Secret,
))
}
// 创建监控器
nodeMonitor := monitor.NewNodeMonitor(clientset, metricsClient, cfg, notifiers, log)
// 启动自身的 /metrics 端点(Prometheus 格式,供抓取监控器本身的指标)
go nodeMonitor.StartMetricsServer(cfg.MetricsPort)
// 启动监控
stopCh := make(chan struct{})
go nodeMonitor.Run(stopCh)
// 等待信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
log.Info("Node Monitor started")
<-sigCh
log.Info("Shutting down...")
close(stopCh)
time.Sleep(2 * time.Second)
}
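main.go 引用了 node-monitor/pkg/config,但正文未给出该包。下面是一份与上文用到的字段(LogLevel、CheckInterval、AlertCooldown、MetricsPort、Thresholds、Slack、DingTalk)对应的最小示意;默认值与环境变量覆盖逻辑(与后文 Deployment 注入的 SLACK_WEBHOOK_URL 等 env 对应)均为假设,并非正文原实现。
pkg/config/config.go(示意)
package config

import (
	"os"

	"gopkg.in/yaml.v3"
)

type Thresholds struct {
	CPUWarning     int `yaml:"cpuWarning"`
	CPUCritical    int `yaml:"cpuCritical"`
	MemoryWarning  int `yaml:"memoryWarning"`
	MemoryCritical int `yaml:"memoryCritical"`
}

type SlackConfig struct {
	WebhookURL string `yaml:"webhookURL"`
}

type DingTalkConfig struct {
	WebhookURL string `yaml:"webhookURL"`
	Secret     string `yaml:"secret"`
}

type Config struct {
	CheckInterval int            `yaml:"checkInterval"` // 秒
	LogLevel      string         `yaml:"logLevel"`
	AlertCooldown int            `yaml:"alertCooldown"` // 分钟
	MetricsPort   int            `yaml:"metricsPort"`
	Thresholds    Thresholds     `yaml:"thresholds"`
	Slack         SlackConfig    `yaml:"slack"`
	DingTalk      DingTalkConfig `yaml:"dingtalk"`
}

// LoadConfig 读取 YAML 配置并填充默认值;敏感项允许用环境变量覆盖,
// 与 Deployment 中通过 Secret 注入的 env 对应(此处均为假设)
func LoadConfig(path string) (*Config, error) {
	cfg := &Config{
		CheckInterval: 60,
		LogLevel:      "info",
		AlertCooldown: 15,
		MetricsPort:   8080,
		Thresholds: Thresholds{
			CPUWarning: 70, CPUCritical: 90,
			MemoryWarning: 70, MemoryCritical: 90,
		},
	}
	if path != "" {
		data, err := os.ReadFile(path)
		if err != nil {
			return nil, err
		}
		if err := yaml.Unmarshal(data, cfg); err != nil {
			return nil, err
		}
	}
	// 环境变量优先于配置文件
	if v := os.Getenv("SLACK_WEBHOOK_URL"); v != "" {
		cfg.Slack.WebhookURL = v
	}
	if v := os.Getenv("DINGTALK_WEBHOOK_URL"); v != "" {
		cfg.DingTalk.WebhookURL = v
	}
	if v := os.Getenv("DINGTALK_SECRET"); v != "" {
		cfg.DingTalk.Secret = v
	}
	return cfg, nil
}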
pkg/monitor/monitor.go
package monitor
import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	metricsapi "k8s.io/metrics/pkg/apis/metrics/v1beta1" // NodeMetrics 类型在 API 包,而非 clientset 包
	metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned"

	"node-monitor/pkg/config"
	"node-monitor/pkg/notifier"
)
type NodeMonitor struct {
	clientset     *kubernetes.Clientset
	metricsClient *metricsclientset.Clientset
	config        *config.Config
	notifiers     []notifier.Notifier
	logger        *logrus.Logger

	mu           sync.Mutex           // 保护 alertHistory:informer 回调与定时检查会并发调用 sendAlert
	alertHistory map[string]time.Time // 告警去重
}
type NodeMetrics struct {
NodeName string
CPUUsage int64 // millicores
CPUCapacity int64
CPUPercent float64
MemoryUsage int64 // bytes
MemoryCapacity int64
MemoryPercent float64
DiskUsage int64
DiskCapacity int64
DiskPercent float64
PodCount int
PodCapacity int
Conditions []corev1.NodeCondition
}
func NewNodeMonitor(
	clientset *kubernetes.Clientset,
	metricsClient *metricsclientset.Clientset,
	cfg *config.Config,
	notifiers []notifier.Notifier,
	logger *logrus.Logger,
) *NodeMonitor {
return &NodeMonitor{
clientset: clientset,
metricsClient: metricsClient,
config: cfg,
notifiers: notifiers,
logger: logger,
alertHistory: make(map[string]time.Time),
}
}
func (nm *NodeMonitor) Run(stopCh <-chan struct{}) {
// 创建 Informer 监听 Node 变化
factory := informers.NewSharedInformerFactory(nm.clientset, 30*time.Second)
nodeInformer := factory.Core().V1().Nodes()
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*corev1.Node)
nm.checkNode(node)
},
UpdateFunc: func(oldObj, newObj interface{}) {
node := newObj.(*corev1.Node)
nm.checkNode(node)
},
DeleteFunc: func(obj interface{}) {
	// 删除事件可能携带 DeletedFinalStateUnknown 墓碑,需安全断言
	if tomb, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = tomb.Obj
	}
	node, ok := obj.(*corev1.Node)
	if !ok {
		return
	}
	// 告警 key 按节点区分,避免不同节点的删除互相压制
	nm.sendAlert(fmt.Sprintf("node_%s_deleted", node.Name),
		fmt.Sprintf("节点 %s 已删除", node.Name), "critical")
},
})
factory.Start(stopCh)
if !cache.WaitForCacheSync(stopCh, nodeInformer.Informer().HasSynced) {
	nm.logger.Error("Failed to sync informer cache")
	return
}
nm.logger.Info("Informer synced, starting periodic check")
// 定期检查
ticker := time.NewTicker(time.Duration(nm.config.CheckInterval) * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
nm.checkAllNodes()
case <-stopCh:
nm.logger.Info("Stopping node monitor")
return
}
}
}
func (nm *NodeMonitor) checkNode(node *corev1.Node) {
// 检查节点状态
for _, condition := range node.Status.Conditions {
if condition.Type == corev1.NodeReady {
if condition.Status != corev1.ConditionTrue {
nm.sendAlert(
"node_not_ready",
fmt.Sprintf("节点 %s 状态异常: %s", node.Name, condition.Reason),
"critical",
)
}
}
}
// 检查节点污点
if len(node.Spec.Taints) > 0 {
for _, taint := range node.Spec.Taints {
if taint.Effect == corev1.TaintEffectNoSchedule {
nm.logger.Warnf("Node %s has NoSchedule taint: %s=%s",
node.Name, taint.Key, taint.Value)
}
}
}
}
func (nm *NodeMonitor) checkAllNodes() {
nm.logger.Debug("Running periodic node check")
// 获取所有节点
nodes, err := nm.clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
nm.logger.WithError(err).Error("Failed to list nodes")
return
}
// 获取节点指标
nodeMetricsList, err := nm.metricsClient.MetricsV1beta1().NodeMetricses().List(
context.TODO(),
metav1.ListOptions{},
)
if err != nil {
nm.logger.WithError(err).Error("Failed to get node metrics")
return
}
// 构建指标映射
metricsMap := make(map[string]*metricsapi.NodeMetrics)
for i := range nodeMetricsList.Items {
	m := &nodeMetricsList.Items[i] // 取切片元素地址;命名避免遮蔽接收者 nm
	metricsMap[m.Name] = m
}
// 检查每个节点
for _, node := range nodes.Items {
metrics := nm.collectNodeMetrics(&node, metricsMap[node.Name])
nm.evaluateMetrics(metrics)
}
}
func (nm *NodeMonitor) collectNodeMetrics(
	node *corev1.Node,
	nodeMetrics *metricsapi.NodeMetrics,
) *NodeMetrics {
metrics := &NodeMetrics{
NodeName: node.Name,
Conditions: node.Status.Conditions,
}
// CPU 容量
if cpu, ok := node.Status.Capacity[corev1.ResourceCPU]; ok {
metrics.CPUCapacity = cpu.MilliValue()
}
// 内存容量
if mem, ok := node.Status.Capacity[corev1.ResourceMemory]; ok {
metrics.MemoryCapacity = mem.Value()
}
// Pod 容量
if pods, ok := node.Status.Capacity[corev1.ResourcePods]; ok {
metrics.PodCapacity = int(pods.Value())
}
// 当前 Pod 数量(按 nodeName 字段过滤;出错时跳过计数,避免对 nil 列表解引用)
podList, err := nm.clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
	FieldSelector: fmt.Sprintf("spec.nodeName=%s", node.Name),
})
if err != nil {
	nm.logger.WithError(err).Warnf("Failed to list pods on node %s", node.Name)
} else {
	metrics.PodCount = len(podList.Items)
}
// CPU 和内存使用量
if nodeMetrics != nil {
metrics.CPUUsage = nodeMetrics.Usage.Cpu().MilliValue()
metrics.MemoryUsage = nodeMetrics.Usage.Memory().Value()
}
// 计算百分比
if metrics.CPUCapacity > 0 {
metrics.CPUPercent = float64(metrics.CPUUsage) / float64(metrics.CPUCapacity) * 100
}
if metrics.MemoryCapacity > 0 {
metrics.MemoryPercent = float64(metrics.MemoryUsage) / float64(metrics.MemoryCapacity) * 100
}
// 磁盘使用:metrics-server 不提供磁盘用量,这里仅根据 DiskPressure condition 粗略推断;
// 精确数值需要 kubelet summary API 或 node-exporter
for _, condition := range node.Status.Conditions {
	if condition.Type == corev1.NodeDiskPressure && condition.Status == corev1.ConditionTrue {
		metrics.DiskPercent = 90 // 估算值
	}
}
return metrics
}
func (nm *NodeMonitor) evaluateMetrics(metrics *NodeMetrics) {
// 检查 CPU 使用率
if metrics.CPUPercent > float64(nm.config.Thresholds.CPUWarning) {
severity := "warning"
if metrics.CPUPercent > float64(nm.config.Thresholds.CPUCritical) {
severity = "critical"
}
nm.sendAlert(
fmt.Sprintf("node_%s_cpu_high", metrics.NodeName),
fmt.Sprintf("节点 %s CPU 使用率 %.2f%%", metrics.NodeName, metrics.CPUPercent),
severity,
)
}
// 检查内存使用率
if metrics.MemoryPercent > float64(nm.config.Thresholds.MemoryWarning) {
severity := "warning"
if metrics.MemoryPercent > float64(nm.config.Thresholds.MemoryCritical) {
severity = "critical"
}
nm.sendAlert(
fmt.Sprintf("node_%s_memory_high", metrics.NodeName),
fmt.Sprintf("节点 %s 内存使用率 %.2f%%", metrics.NodeName, metrics.MemoryPercent),
severity,
)
}
// 检查 Pod 数量
if metrics.PodCapacity > 0 {
podPercent := float64(metrics.PodCount) / float64(metrics.PodCapacity) * 100
if podPercent > 80 {
nm.sendAlert(
fmt.Sprintf("node_%s_pod_high", metrics.NodeName),
fmt.Sprintf("节点 %s Pod 数量 %d/%d (%.0f%%)",
metrics.NodeName, metrics.PodCount, metrics.PodCapacity, podPercent),
"warning",
)
}
}
// 记录指标
nm.logger.WithFields(logrus.Fields{
"node": metrics.NodeName,
"cpu_percent": fmt.Sprintf("%.2f%%", metrics.CPUPercent),
"mem_percent": fmt.Sprintf("%.2f%%", metrics.MemoryPercent),
"pod_count": metrics.PodCount,
"pod_capacity": metrics.PodCapacity,
}).Debug("Node metrics")
}
func (nm *NodeMonitor) sendAlert(alertKey, message, severity string) {
	// 告警去重(加锁:informer 回调与定时检查会并发进入)
	nm.mu.Lock()
	if lastAlert, exists := nm.alertHistory[alertKey]; exists {
		if time.Since(lastAlert) < time.Duration(nm.config.AlertCooldown)*time.Minute {
			nm.mu.Unlock()
			return // 冷却期内,跳过
		}
	}
	nm.alertHistory[alertKey] = time.Now()
	nm.mu.Unlock()
// 发送到所有通知渠道
for _, n := range nm.notifiers {
if err := n.Send(message, severity); err != nil {
nm.logger.WithError(err).Error("Failed to send notification")
}
}
nm.logger.WithFields(logrus.Fields{
"alert_key": alertKey,
"severity": severity,
}).Info(message)
}
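main.go 调用了 nodeMonitor.StartMetricsServer(cfg.MetricsPort),但该方法未在正文出现。下面是一份基于 prometheus/client_golang 的最小示意,指标名 node_monitor_cpu_percent 与文件名均为假设:
pkg/monitor/metrics.go(示意)
package monitor

import (
	"fmt"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// 暴露给 Prometheus 抓取的示例指标(名称为假设)
var nodeCPUPercent = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "node_monitor_cpu_percent",
	Help: "Node CPU usage percent as observed by node-monitor",
}, []string{"node"})

// StartMetricsServer 在指定端口暴露 /metrics 端点
func (nm *NodeMonitor) StartMetricsServer(port int) {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	addr := fmt.Sprintf(":%d", port)
	nm.logger.Infof("Metrics server listening on %s", addr)
	if err := http.ListenAndServe(addr, mux); err != nil {
		nm.logger.WithError(err).Error("Metrics server exited")
	}
}
若采用此示意,可在 evaluateMetrics 末尾调用 nodeCPUPercent.WithLabelValues(metrics.NodeName).Set(metrics.CPUPercent) 更新指标。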
pkg/notifier/slack.go
package notifier
import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)
type SlackNotifier struct {
webhookURL string
}
func NewSlackNotifier(webhookURL string) *SlackNotifier {
return &SlackNotifier{webhookURL: webhookURL}
}
func (s *SlackNotifier) Send(message, severity string) error {
color := "warning"
if severity == "critical" {
color = "danger"
}
payload := map[string]interface{}{
"attachments": []map[string]interface{}{
{
"color": color,
"title": fmt.Sprintf("[%s] Kubernetes 节点告警", severity),
"text": message,
"footer": "Node Monitor",
"footer_icon": "https://kubernetes.io/images/favicon.png",
"ts": time.Now().Unix(),
},
},
}
data, _ := json.Marshal(payload)
resp, err := http.Post(s.webhookURL, "application/json", bytes.NewBuffer(data))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("slack returned status %d", resp.StatusCode)
}
return nil
}
pkg/notifier/dingtalk.go
package notifier
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"
)
type DingTalkNotifier struct {
webhookURL string
secret string
}
func NewDingTalkNotifier(webhookURL, secret string) *DingTalkNotifier {
return &DingTalkNotifier{
webhookURL: webhookURL,
secret: secret,
}
}
func (d *DingTalkNotifier) Send(message, severity string) error {
// 生成签名
timestamp := time.Now().UnixMilli()
signedURL := d.sign(timestamp)
// 构建消息
emoji := "⚠️"
if severity == "critical" {
emoji = "🚨"
}
payload := map[string]interface{}{
"msgtype": "markdown",
"markdown": map[string]string{
"title": "Kubernetes 节点告警",
"text": fmt.Sprintf("### %s %s\n\n%s\n\n> 时间: %s",
emoji, severity, message, time.Now().Format("2006-01-02 15:04:05")),
},
}
data, _ := json.Marshal(payload)
resp, err := http.Post(signedURL, "application/json", bytes.NewBuffer(data))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("dingtalk returned status %d", resp.StatusCode)
}
return nil
}
func (d *DingTalkNotifier) sign(timestamp int64) string {
stringToSign := fmt.Sprintf("%d\n%s", timestamp, d.secret)
h := hmac.New(sha256.New, []byte(d.secret))
h.Write([]byte(stringToSign))
signature := base64.StdEncoding.EncodeToString(h.Sum(nil))
return fmt.Sprintf("%s×tamp=%d&sign=%s",
d.webhookURL, timestamp, url.QueryEscape(signature))
}
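功能列表中提到企业微信,但正文只实现了 Slack 和钉钉;monitor 依赖的 Notifier 接口也未在正文展示。下面按同样模式补一份示意:接口定义依据上文 Send(message, severity string) error 的用法推出,企业微信部分基于其群机器人 webhook(https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=KEY),其余命名均为假设。
pkg/notifier/notifier.go(示意)
package notifier

// Notifier 是 monitor 依赖的统一通知接口
type Notifier interface {
	Send(message, severity string) error
}
pkg/notifier/wechat.go(示意)
package notifier

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// WeChatNotifier 通过企业微信群机器人 webhook 发送告警
type WeChatNotifier struct {
	webhookURL string
}

func NewWeChatNotifier(webhookURL string) *WeChatNotifier {
	return &WeChatNotifier{webhookURL: webhookURL}
}

func (w *WeChatNotifier) Send(message, severity string) error {
	payload := map[string]interface{}{
		"msgtype": "markdown",
		"markdown": map[string]string{
			"content": fmt.Sprintf("**[%s] Kubernetes 节点告警**\n%s", severity, message),
		},
	}
	data, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	resp, err := http.Post(w.webhookURL, "application/json", bytes.NewBuffer(data))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("wechat returned status %d", resp.StatusCode)
	}
	return nil
}
接入时还需在 config 与 main.go 中注册对应的 webhook 配置,此处从略。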
Python 实现
node_monitor.py
#!/usr/bin/env python3
import time
import logging
import hmac
import hashlib
import base64
import urllib.parse
from datetime import datetime, timedelta
from kubernetes import client, config, watch
import requests
import yaml
class NodeMonitor:
def __init__(self, config_file=None):
# 加载 Kubernetes 配置:集群内优先,失败则回退到本地 kubeconfig
try:
    config.load_incluster_config()
except config.ConfigException:
    config.load_kube_config()
self.v1 = client.CoreV1Api()
self.metrics_api = client.CustomObjectsApi()
# 加载配置
self.load_config(config_file)
# 告警历史
self.alert_history = {}
# 设置日志
logging.basicConfig(
level=getattr(logging, self.config['logLevel'].upper()),
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def load_config(self, config_file):
"""加载配置"""
default_config = {
'checkInterval': 60,
'logLevel': 'info',
'thresholds': {
'cpuWarning': 70,
'cpuCritical': 90,
'memoryWarning': 70,
'memoryCritical': 90
},
'alertCooldown': 15,
'slack': {'webhookURL': ''},
'dingtalk': {'webhookURL': '', 'secret': ''}
}
if config_file:
    with open(config_file, 'r') as f:
        user_config = yaml.safe_load(f) or {}
    # 递归合并,避免嵌套字典(如 thresholds)被用户配置整体覆盖
    def merge(base, override):
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(base.get(key), dict):
                merge(base[key], value)
            else:
                base[key] = value
    merge(default_config, user_config)
self.config = default_config
def run(self):
    """运行监控"""
    self.logger.info("Starting Node Monitor")
    w = watch.Watch()
    # 启动时先做一次全量检查
    self.check_all_nodes()
    while True:
        try:
            # Watch Node 事件;stream 以 checkInterval 为超时,
            # 超时返回后做一次全量检查,再进入下一轮 watch
            for event in w.stream(
                self.v1.list_node,
                timeout_seconds=self.config['checkInterval']
            ):
                event_type = event['type']
                node = event['object']
                if event_type in ('ADDED', 'MODIFIED'):
                    self.check_node(node)
                elif event_type == 'DELETED':
                    self.send_alert(
                        f"node_{node.metadata.name}_deleted",
                        f"节点 {node.metadata.name} 已删除",
                        "critical"
                    )
            # 定期全量检查(不依赖事件到达,stream 超时后必定执行)
            self.check_all_nodes()
        except Exception as e:
            self.logger.error(f"Error in watch loop: {e}")
            time.sleep(5)
def check_node(self, node):
"""检查单个节点"""
# 检查节点状态
for condition in node.status.conditions or []:
if condition.type == 'Ready':
if condition.status != 'True':
self.send_alert(
f"node_{node.metadata.name}_not_ready",
f"节点 {node.metadata.name} 状态异常: {condition.reason}",
"critical"
)
# 检查污点
if node.spec.taints:
for taint in node.spec.taints:
if taint.effect == 'NoSchedule':
self.logger.warning(
f"Node {node.metadata.name} has NoSchedule taint: "
f"{taint.key}={taint.value}"
)
def check_all_nodes(self):
"""检查所有节点"""
self.logger.debug("Running periodic node check")
try:
# 获取所有节点
nodes = self.v1.list_node()
# 获取节点指标
node_metrics = self.get_node_metrics()
# 检查每个节点
for node in nodes.items:
metrics = self.collect_node_metrics(node, node_metrics)
self.evaluate_metrics(metrics)
except Exception as e:
self.logger.error(f"Error checking nodes: {e}")
def get_node_metrics(self):
"""获取节点指标"""
try:
metrics = self.metrics_api.list_cluster_custom_object(
group="metrics.k8s.io",
version="v1beta1",
plural="nodes"
)
# 构建指标映射
metrics_map = {}
for item in metrics.get('items', []):
node_name = item['metadata']['name']
metrics_map[node_name] = item
return metrics_map
except Exception as e:
self.logger.error(f"Failed to get node metrics: {e}")
return {}
def collect_node_metrics(self, node, node_metrics_map):
"""收集节点指标"""
node_name = node.metadata.name
metrics = {
'node_name': node_name,
'cpu_capacity': 0,
'cpu_usage': 0,
'cpu_percent': 0,
'memory_capacity': 0,
'memory_usage': 0,
'memory_percent': 0,
'pod_count': 0,
'pod_capacity': 0
}
# 容量
if node.status.capacity:
if 'cpu' in node.status.capacity:
metrics['cpu_capacity'] = self.parse_cpu(node.status.capacity['cpu'])
if 'memory' in node.status.capacity:
metrics['memory_capacity'] = self.parse_memory(node.status.capacity['memory'])
if 'pods' in node.status.capacity:
metrics['pod_capacity'] = int(node.status.capacity['pods'])
# 使用量
if node_name in node_metrics_map:
node_metrics = node_metrics_map[node_name]
usage = node_metrics.get('usage', {})
if 'cpu' in usage:
metrics['cpu_usage'] = self.parse_cpu(usage['cpu'])
if 'memory' in usage:
metrics['memory_usage'] = self.parse_memory(usage['memory'])
# Pod 数量
pods = self.v1.list_pod_for_all_namespaces(
field_selector=f"spec.nodeName={node_name}"
)
metrics['pod_count'] = len(pods.items)
# 计算百分比
if metrics['cpu_capacity'] > 0:
metrics['cpu_percent'] = (metrics['cpu_usage'] / metrics['cpu_capacity']) * 100
if metrics['memory_capacity'] > 0:
metrics['memory_percent'] = (metrics['memory_usage'] / metrics['memory_capacity']) * 100
return metrics
def parse_cpu(self, cpu_str):
"""解析 CPU 数量(转换为 millicores)"""
if cpu_str.endswith('m'):
return int(cpu_str[:-1])
elif cpu_str.endswith('n'):
return int(cpu_str[:-1]) // 1000000
else:
return int(float(cpu_str) * 1000)
def parse_memory(self, mem_str):
"""解析内存数量(转换为 bytes)"""
units = {
'Ki': 1024,
'Mi': 1024 ** 2,
'Gi': 1024 ** 3,
'Ti': 1024 ** 4,
'K': 1000,
'M': 1000 ** 2,
'G': 1000 ** 3,
'T': 1000 ** 4
}
for unit, multiplier in units.items():
if mem_str.endswith(unit):
return int(mem_str[:-len(unit)]) * multiplier
return int(mem_str)
def evaluate_metrics(self, metrics):
"""评估指标并发送告警"""
node_name = metrics['node_name']
# 检查 CPU
if metrics['cpu_percent'] > self.config['thresholds']['cpuWarning']:
severity = 'warning'
if metrics['cpu_percent'] > self.config['thresholds']['cpuCritical']:
severity = 'critical'
self.send_alert(
f"node_{node_name}_cpu_high",
f"节点 {node_name} CPU 使用率 {metrics['cpu_percent']:.2f}%",
severity
)
# 检查内存
if metrics['memory_percent'] > self.config['thresholds']['memoryWarning']:
severity = 'warning'
if metrics['memory_percent'] > self.config['thresholds']['memoryCritical']:
severity = 'critical'
self.send_alert(
f"node_{node_name}_memory_high",
f"节点 {node_name} 内存使用率 {metrics['memory_percent']:.2f}%",
severity
)
# 检查 Pod 数量
if metrics['pod_capacity'] > 0:
pod_percent = (metrics['pod_count'] / metrics['pod_capacity']) * 100
if pod_percent > 80:
self.send_alert(
f"node_{node_name}_pod_high",
f"节点 {node_name} Pod 数量 {metrics['pod_count']}/{metrics['pod_capacity']} ({pod_percent:.0f}%)",
"warning"
)
self.logger.debug(
f"Node {node_name}: "
f"CPU {metrics['cpu_percent']:.2f}%, "
f"Memory {metrics['memory_percent']:.2f}%, "
f"Pods {metrics['pod_count']}/{metrics['pod_capacity']}"
)
def send_alert(self, alert_key, message, severity):
"""发送告警"""
# 告警去重
if alert_key in self.alert_history:
last_alert = self.alert_history[alert_key]
cooldown = timedelta(minutes=self.config['alertCooldown'])
if datetime.now() - last_alert < cooldown:
return
self.alert_history[alert_key] = datetime.now()
# 发送到 Slack
if self.config['slack']['webhookURL']:
self.send_slack(message, severity)
# 发送到钉钉
if self.config['dingtalk']['webhookURL']:
self.send_dingtalk(message, severity)
self.logger.info(f"[{severity}] {message}")
def send_slack(self, message, severity):
"""发送 Slack 通知"""
color = 'warning' if severity == 'warning' else 'danger'
payload = {
'attachments': [{
'color': color,
'title': f"[{severity}] Kubernetes 节点告警",
'text': message,
'footer': 'Node Monitor',
'ts': int(time.time())
}]
}
try:
response = requests.post(
self.config['slack']['webhookURL'],
json=payload
)
if response.status_code != 200:
self.logger.error(f"Slack returned status {response.status_code}")
except Exception as e:
self.logger.error(f"Failed to send Slack notification: {e}")
def send_dingtalk(self, message, severity):
"""发送钉钉通知"""
timestamp = str(int(time.time() * 1000))
secret = self.config['dingtalk']['secret']
# 生成签名
string_to_sign = f"{timestamp}\n{secret}"
hmac_code = hmac.new(
secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
# 构建 URL
webhook_url = (
f"{self.config['dingtalk']['webhookURL']}"
f"×tamp={timestamp}&sign={sign}"
)
# 构建消息
emoji = '⚠️' if severity == 'warning' else '🚨'
payload = {
'msgtype': 'markdown',
'markdown': {
'title': 'Kubernetes 节点告警',
'text': (
f"### {emoji} {severity}\n\n"
f"{message}\n\n"
f"> 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
}
}
try:
response = requests.post(webhook_url, json=payload)
if response.status_code != 200:
self.logger.error(f"DingTalk returned status {response.status_code}")
except Exception as e:
self.logger.error(f"Failed to send DingTalk notification: {e}")
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Kubernetes Node Monitor')
parser.add_argument('--config', help='Path to config file')
args = parser.parse_args()
monitor = NodeMonitor(config_file=args.config)
monitor.run()
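parse_cpu 与 parse_memory 分别把 Kubernetes 的资源数量字符串归一化为 millicores 和 bytes。下面是一段快速验证示意(绕过 __init__ 以避免连接集群,仅作演示):
解析函数快速验证(示意)
# 构造未初始化的实例:parse_cpu / parse_memory 不依赖实例状态
m = NodeMonitor.__new__(NodeMonitor)

assert m.parse_cpu('250m') == 250           # 250 millicores
assert m.parse_cpu('2') == 2000             # 2 核 = 2000 millicores
assert m.parse_memory('512Mi') == 512 * 1024 ** 2
assert m.parse_memory('2Gi') == 2 * 1024 ** 3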
配置示例
config.yaml
checkInterval: 60 # 检查间隔(秒)
logLevel: info # 日志级别
alertCooldown: 15 # 告警冷却期(分钟)
# 阈值配置
thresholds:
cpuWarning: 70 # CPU 警告阈值(%)
cpuCritical: 90 # CPU 严重阈值(%)
memoryWarning: 70 # 内存警告阈值(%)
memoryCritical: 90 # 内存严重阈值(%)
# Slack 配置
slack:
webhookURL: "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
# 钉钉配置
dingtalk:
webhookURL: "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN"
secret: "YOUR_SECRET"
部署
deploy/deployment.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: node-monitor
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: node-monitor
rules:
- apiGroups: [""]
resources: ["nodes", "pods"]
verbs: ["list", "get", "watch"]
- apiGroups: ["metrics.k8s.io"]
resources: ["nodes"]
verbs: ["list", "get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: node-monitor
subjects:
- kind: ServiceAccount
name: node-monitor
namespace: kube-system
roleRef:
kind: ClusterRole
name: node-monitor
apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: ConfigMap
metadata:
name: node-monitor-config
namespace: kube-system
data:
config.yaml: |
checkInterval: 60
logLevel: info
alertCooldown: 15
thresholds:
cpuWarning: 70
cpuCritical: 90
memoryWarning: 70
memoryCritical: 90
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: node-monitor
namespace: kube-system
spec:
replicas: 1
selector:
matchLabels:
app: node-monitor
template:
metadata:
labels:
app: node-monitor
spec:
serviceAccountName: node-monitor
containers:
- name: node-monitor
image: your-registry/node-monitor:latest
args:
- --config=/etc/node-monitor/config.yaml
volumeMounts:
- name: config
mountPath: /etc/node-monitor
env:
- name: SLACK_WEBHOOK_URL
valueFrom:
secretKeyRef:
name: node-monitor-secrets
key: slack-webhook-url
optional: true
- name: DINGTALK_WEBHOOK_URL
valueFrom:
secretKeyRef:
name: node-monitor-secrets
key: dingtalk-webhook-url
optional: true
- name: DINGTALK_SECRET
valueFrom:
secretKeyRef:
name: node-monitor-secrets
key: dingtalk-secret
optional: true
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 200m
memory: 256Mi
volumes:
- name: config
configMap:
name: node-monitor-config
总结
功能特性
- ✅ 实时监控: Watch 机制实时监听
- ✅ 多维度指标: CPU、内存、Pod 数量(磁盘仅通过 DiskPressure 粗略判断)
- ✅ 灵活告警: 可配置阈值和冷却期
- ✅ 多渠道通知: Slack、钉钉(企业微信可按同一接口扩展)
- ✅ 告警去重: 防止告警风暴
扩展方向
- 预测性告警: 基于历史数据预测趋势(见本节末的趋势外推示意)
- 自动扩容: 集成 Cluster Autoscaler
- Dashboard: Grafana 可视化
- 数据持久化: 存储历史指标到 InfluxDB
- 智能降噪: ML 模型识别异常告警
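「预测性告警」可以用最简单的线性外推落地:对最近若干个使用率采样做最小二乘拟合,估算按当前趋势达到 critical 阈值还需多久。下面是一份独立的 Go 示意(包名 trend、函数 TimeToThreshold 均为假设,并非正文实现):
pkg/trend/trend.go(示意)
package trend

import "time"

// Sample 是一次使用率采样
type Sample struct {
	T     time.Time
	Value float64 // 使用率百分比
}

// TimeToThreshold 对采样做最小二乘线性拟合,
// 返回按当前趋势达到 threshold 还需多久;趋势非增或数据不足时返回 false
func TimeToThreshold(samples []Sample, threshold float64) (time.Duration, bool) {
	n := float64(len(samples))
	if n < 2 {
		return 0, false
	}
	t0 := samples[0].T
	var sx, sy, sxx, sxy float64
	for _, s := range samples {
		x := s.T.Sub(t0).Seconds()
		sx += x
		sy += s.Value
		sxx += x * x
		sxy += x * s.Value
	}
	// slope = (n*Σxy - Σx*Σy) / (n*Σxx - (Σx)^2)
	den := n*sxx - sx*sx
	if den == 0 {
		return 0, false
	}
	slope := (n*sxy - sx*sy) / den
	intercept := (sy - slope*sx) / n
	if slope <= 0 {
		return 0, false // 使用率没有上升趋势
	}
	last := samples[len(samples)-1].T.Sub(t0).Seconds()
	hitAt := (threshold - intercept) / slope // 拟合线触达阈值的时刻(秒)
	if hitAt <= last {
		return 0, true // 已达到或超过阈值
	}
	return time.Duration((hitAt - last) * float64(time.Second)), true
}
监控器可在每轮 checkAllNodes 时为各节点追加采样,当预计短时间内(例如 2 小时)触达阈值时提前告警。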
下一个项目将介绍 PVC 自动扩容工具。