Hands-On Project Examples
Project 1: Pod Auto-Cleanup Tool
Requirements
Automatically clean up Pods that have been in the Failed or Succeeded phase for longer than a specified time.
Go Implementation
package main
import (
"context"
"flag"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
type PodCleaner struct {
clientset *kubernetes.Clientset
ttl time.Duration
dryRun bool
}
func NewPodCleaner(clientset *kubernetes.Clientset, ttl time.Duration, dryRun bool) *PodCleaner {
return &PodCleaner{
clientset: clientset,
ttl: ttl,
dryRun: dryRun,
}
}
func (pc *PodCleaner) Run(stopCh <-chan struct{}) {
// Create an informer to watch Pod changes
factory := informers.NewSharedInformerFactory(pc.clientset, 30*time.Second)
podInformer := factory.Core().V1().Pods()
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
pc.checkAndCleanPod(pod)
},
UpdateFunc: func(oldObj, newObj interface{}) {
pod := newObj.(*corev1.Pod)
pc.checkAndCleanPod(pod)
},
})
// Start the informer and wait for the initial cache sync
factory.Start(stopCh)
cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)
// Periodic full scan as a safety net for missed events
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pc.cleanAllExpiredPods()
case <-stopCh:
fmt.Println("Stopping pod cleaner")
return
}
}
}
func (pc *PodCleaner) checkAndCleanPod(pod *corev1.Pod) {
// Only handle Pods in the Failed or Succeeded phase
if pod.Status.Phase != corev1.PodFailed && pod.Status.Phase != corev1.PodSucceeded {
return
}
// Skip Pods that have not yet exceeded the TTL
if !pc.isExpired(pod) {
return
}
// Compute the Pod age for logging, guarding against a nil StartTime
age := time.Duration(0)
if pod.Status.StartTime != nil {
age = time.Since(pod.Status.StartTime.Time)
}
// Delete the Pod (or only log it in dry-run mode)
if pc.dryRun {
fmt.Printf("[DRY RUN] Would delete pod %s/%s (Phase: %s, Age: %s)\n",
pod.Namespace, pod.Name, pod.Status.Phase, age)
return
}
err := pc.clientset.CoreV1().Pods(pod.Namespace).Delete(
context.TODO(),
pod.Name,
metav1.DeleteOptions{},
)
if err != nil {
fmt.Printf("Error deleting pod %s/%s: %v\n", pod.Namespace, pod.Name, err)
return
}
fmt.Printf("Deleted pod %s/%s (Phase: %s, Age: %s)\n",
pod.Namespace, pod.Name, pod.Status.Phase,
time.Since(pod.Status.StartTime.Time))
}
func (pc *PodCleaner) isExpired(pod *corev1.Pod) bool {
// Determine when the Pod finished
var finishTime time.Time
// Use the latest termination time across the container statuses
for _, cs := range pod.Status.ContainerStatuses {
if cs.State.Terminated != nil {
if finishTime.IsZero() || cs.State.Terminated.FinishedAt.Time.After(finishTime) {
finishTime = cs.State.Terminated.FinishedAt.Time
}
}
}
if finishTime.IsZero() {
return false
}
return time.Since(finishTime) > pc.ttl
}
func (pc *PodCleaner) cleanAllExpiredPods() {
fmt.Println("Running periodic cleanup...")
pods, err := pc.clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
fmt.Printf("Error listing pods: %v\n", err)
return
}
for _, pod := range pods.Items {
pc.checkAndCleanPod(&pod)
}
}
func main() {
kubeconfig := flag.String("kubeconfig", "", "path to kubeconfig")
ttl := flag.Duration("ttl", 1*time.Hour, "time to live for completed pods")
dryRun := flag.Bool("dry-run", false, "dry run mode")
flag.Parse()
// Build the Kubernetes client; with an empty kubeconfig path BuildConfigFromFlags falls back to the in-cluster config
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// Create and run the Pod cleaner
cleaner := NewPodCleaner(clientset, *ttl, *dryRun)
stopCh := make(chan struct{})
defer close(stopCh)
fmt.Printf("Starting pod cleaner (TTL: %s, Dry Run: %v)\n", *ttl, *dryRun)
cleaner.Run(stopCh)
}
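The expiry check is easy to unit-test without touching a cluster. Below is a minimal sketch of such a test, assuming it lives in the same package as the code above; the Pod fixture and the test name are invented for illustration. For exercising the delete path, client-go also offers a fake clientset (k8s.io/client-go/kubernetes/fake), though using it would require PodCleaner to accept the kubernetes.Interface type instead of the concrete *kubernetes.Clientset.
package main

import (
    "testing"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestIsExpired(t *testing.T) {
    // A cleaner with a 1h TTL; the clientset is not needed for this check.
    pc := &PodCleaner{ttl: time.Hour}

    // A Pod whose only container terminated two hours ago.
    finished := metav1.NewTime(time.Now().Add(-2 * time.Hour))
    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "done", Namespace: "default"},
        Status: corev1.PodStatus{
            Phase: corev1.PodSucceeded,
            ContainerStatuses: []corev1.ContainerStatus{{
                State: corev1.ContainerState{
                    Terminated: &corev1.ContainerStateTerminated{FinishedAt: finished},
                },
            }},
        },
    }

    if !pc.isExpired(pod) {
        t.Fatalf("expected a pod finished 2h ago to be expired with a 1h TTL")
    }
}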
Python Implementation
#!/usr/bin/env python3
import argparse
import time
from datetime import datetime, timedelta
from kubernetes import client, config, watch
class PodCleaner:
def __init__(self, ttl_seconds=3600, dry_run=False):
# Prefer in-cluster config when running as a Pod; fall back to kubeconfig
try:
config.load_incluster_config()
except config.ConfigException:
config.load_kube_config()
self.v1 = client.CoreV1Api()
self.ttl = timedelta(seconds=ttl_seconds)
self.dry_run = dry_run
def run(self):
"""运行 Pod 清理器"""
print(f"Starting pod cleaner (TTL: {self.ttl}, Dry Run: {self.dry_run})")
# Start watching Pod events
w = watch.Watch()
while True:
try:
# Stream Pod events; the stream returns after timeout_seconds
for event in w.stream(self.v1.list_pod_for_all_namespaces, timeout_seconds=300):
event_type = event['type']
pod = event['object']
if event_type in ['ADDED', 'MODIFIED']:
self.check_and_clean_pod(pod)
# Periodic full scan after each watch timeout
self.clean_all_expired_pods()
except Exception as e:
print(f"Error in watch loop: {e}")
time.sleep(5)
def check_and_clean_pod(self, pod):
"""检查并清理单个 Pod"""
# Only handle Failed or Succeeded Pods
if pod.status.phase not in ['Failed', 'Succeeded']:
return
# Skip Pods that have not yet exceeded the TTL
if not self.is_expired(pod):
return
# Delete the Pod (or only log it in dry-run mode)
if self.dry_run:
age = self.get_pod_age(pod)
print(f"[DRY RUN] Would delete pod {pod.metadata.namespace}/{pod.metadata.name} "
f"(Phase: {pod.status.phase}, Age: {age})")
return
try:
self.v1.delete_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
body=client.V1DeleteOptions()
)
age = self.get_pod_age(pod)
print(f"Deleted pod {pod.metadata.namespace}/{pod.metadata.name} "
f"(Phase: {pod.status.phase}, Age: {age})")
except Exception as e:
print(f"Error deleting pod {pod.metadata.namespace}/{pod.metadata.name}: {e}")
def is_expired(self, pod):
"""检查 Pod 是否过期"""
finish_time = None
# Use the latest termination time across the container statuses
if pod.status.container_statuses:
for cs in pod.status.container_statuses:
if cs.state.terminated:
terminated_time = cs.state.terminated.finished_at
if finish_time is None or terminated_time > finish_time:
finish_time = terminated_time
if finish_time is None:
return False
# Drop the timezone info so the naive datetimes can be compared
finish_time = finish_time.replace(tzinfo=None)
age = datetime.utcnow() - finish_time
return age > self.ttl
def get_pod_age(self, pod):
"""获取 Pod 年龄"""
finish_time = None
if pod.status.container_statuses:
for cs in pod.status.container_statuses:
if cs.state.terminated:
if finish_time is None or cs.state.terminated.finished_at > finish_time:
finish_time = cs.state.terminated.finished_at
if finish_time:
finish_time = finish_time.replace(tzinfo=None)
return datetime.utcnow() - finish_time
return timedelta(0)
def clean_all_expired_pods(self):
"""清理所有过期 Pods"""
print("Running periodic cleanup...")
try:
pods = self.v1.list_pod_for_all_namespaces()
for pod in pods.items:
self.check_and_clean_pod(pod)
except Exception as e:
print(f"Error in periodic cleanup: {e}")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Kubernetes Pod Cleaner')
parser.add_argument('--ttl', type=int, default=3600,
help='Time to live in seconds (default: 3600)')
parser.add_argument('--dry-run', action='store_true',
help='Dry run mode')
args = parser.parse_args()
cleaner = PodCleaner(ttl_seconds=args.ttl, dry_run=args.dry_run)
cleaner.run()
Deploying to Kubernetes
# pod-cleaner-deployment.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: pod-cleaner
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: pod-cleaner
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: pod-cleaner
subjects:
- kind: ServiceAccount
name: pod-cleaner
namespace: kube-system
roleRef:
kind: ClusterRole
name: pod-cleaner
apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: pod-cleaner
namespace: kube-system
spec:
replicas: 1
selector:
matchLabels:
app: pod-cleaner
template:
metadata:
labels:
app: pod-cleaner
spec:
serviceAccountName: pod-cleaner
containers:
- name: pod-cleaner
image: your-registry/pod-cleaner:latest
args:
- --ttl=3600
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 200m
memory: 256Mi
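When the cleaner runs inside the cluster as in the Deployment above, it should authenticate with the Pod's ServiceAccount rather than a kubeconfig file. Below is a minimal sketch of that client bootstrap, assuming the standard client-go rest and clientcmd packages; the fallback order shown is a common convention and not part of the original code.
package main

import (
    "flag"
    "fmt"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

// buildClient prefers the in-cluster ServiceAccount credentials and falls
// back to a kubeconfig path, which is convenient for local development.
func buildClient(kubeconfig string) (*kubernetes.Clientset, error) {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        // Not running inside a cluster: fall back to the kubeconfig file.
        cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
        if err != nil {
            return nil, err
        }
    }
    return kubernetes.NewForConfig(cfg)
}

func main() {
    kubeconfig := flag.String("kubeconfig", "", "path to kubeconfig (optional in-cluster)")
    flag.Parse()
    if _, err := buildClient(*kubeconfig); err != nil {
        panic(err)
    }
    fmt.Println("client initialized")
}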
Project 2: Auto-Scaling Monitor
Requirements
Monitor the resource usage of Deployments and adjust their replica counts automatically.
Go Implementation
package main
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/informers"
metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
)
type AutoScaler struct {
clientset *kubernetes.Clientset
metricsClient *metricsv.Clientset
minReplicas int32
maxReplicas int32
targetCPU int64 // target average CPU per Pod in millicores (simplified; a real HPA compares usage against the Pod's requests)
}
func NewAutoScaler(clientset *kubernetes.Clientset, metricsClient *metricsv.Clientset) *AutoScaler {
return &AutoScaler{
clientset: clientset,
metricsClient: metricsClient,
minReplicas: 2,
maxReplicas: 10,
targetCPU: 70,
}
}
func (as *AutoScaler) Run(stopCh <-chan struct{}) {
factory := informers.NewSharedInformerFactory(as.clientset, 30*time.Second)
deploymentInformer := factory.Apps().V1().Deployments()
// Only react to Deployments that carry the auto-scale annotation
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
deployment := obj.(*appsv1.Deployment)
if deployment.Annotations["auto-scale"] == "true" {
as.scaleDeployment(deployment)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
deployment := newObj.(*appsv1.Deployment)
if deployment.Annotations["auto-scale"] == "true" {
as.scaleDeployment(deployment)
}
},
})
factory.Start(stopCh)
cache.WaitForCacheSync(stopCh, deploymentInformer.Informer().HasSynced)
// Periodically re-evaluate all annotated Deployments
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
as.checkAllDeployments()
case <-stopCh:
return
}
}
}
func (as *AutoScaler) scaleDeployment(deployment *appsv1.Deployment) {
// Fetch CPU usage for the Deployment's Pods from the metrics API
podMetrics, err := as.metricsClient.MetricsV1beta1().PodMetricses(deployment.Namespace).List(
context.TODO(),
metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(deployment.Spec.Selector),
},
)
if err != nil {
fmt.Printf("Error getting metrics for %s/%s: %v\n",
deployment.Namespace, deployment.Name, err)
return
}
if len(podMetrics.Items) == 0 {
return
}
// Compute the average CPU usage per Pod (in millicores)
var totalCPU int64
for _, pm := range podMetrics.Items {
for _, container := range pm.Containers {
totalCPU += container.Usage.Cpu().MilliValue()
}
}
avgCPU := totalCPU / int64(len(podMetrics.Items))
// Compute the desired replica count
currentReplicas := *deployment.Spec.Replicas
desiredReplicas := as.calculateDesiredReplicas(currentReplicas, avgCPU)
if desiredReplicas == currentReplicas {
return
}
// Apply the scale change via the scale subresource
fmt.Printf("Scaling %s/%s from %d to %d replicas (CPU: %dm)\n",
deployment.Namespace, deployment.Name, currentReplicas, desiredReplicas, avgCPU)
scale := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: deployment.Name,
Namespace: deployment.Namespace,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: desiredReplicas,
},
}
_, err = as.clientset.AppsV1().Deployments(deployment.Namespace).UpdateScale(
context.TODO(),
deployment.Name,
scale,
metav1.UpdateOptions{},
)
if err != nil {
fmt.Printf("Error scaling deployment: %v\n", err)
}
}
func (as *AutoScaler) calculateDesiredReplicas(current int32, avgCPU int64) int32 {
// Simple proportional rule: desiredReplicas = currentReplicas * (avgCPU / targetCPU)
ratio := float64(avgCPU) / float64(as.targetCPU)
desired := int32(float64(current) * ratio)
// Clamp to the [minReplicas, maxReplicas] range
if desired < as.minReplicas {
desired = as.minReplicas
}
if desired > as.maxReplicas {
desired = as.maxReplicas
}
return desired
}
func (as *AutoScaler) checkAllDeployments() {
deployments, err := as.clientset.AppsV1().Deployments("").List(
context.TODO(),
metav1.ListOptions{},
)
if err != nil {
fmt.Printf("Error listing deployments: %v\n", err)
return
}
for _, deployment := range deployments.Items {
if deployment.Annotations["auto-scale"] == "true" {
as.scaleDeployment(&deployment)
}
}
}
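The listing above omits program startup. Below is a minimal sketch of a main function for it, assuming the flag and clientcmd imports from Project 1 are added to this file; the metrics clientset is built from the same rest.Config via the k8s.io/metrics versioned client.
func main() {
    kubeconfig := flag.String("kubeconfig", "", "path to kubeconfig")
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    // The metrics API client (metrics.k8s.io) shares the same rest.Config.
    metricsClient, err := metricsv.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    scaler := NewAutoScaler(clientset, metricsClient)
    stopCh := make(chan struct{})
    defer close(stopCh)
    fmt.Println("Starting auto scaler")
    scaler.Run(stopCh)
}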
Project 3: Resource Quota Monitoring and Alerting
Requirements
Monitor ResourceQuota usage in each Namespace and send an alert when usage exceeds a threshold.
Python Implementation
#!/usr/bin/env python3
import json
import requests
from kubernetes import client, config, watch
class QuotaMonitor:
def __init__(self, webhook_url, threshold=0.8):
config.load_kube_config()
self.v1 = client.CoreV1Api()
self.webhook_url = webhook_url
self.threshold = threshold  # alert threshold, e.g. 0.8 = 80%
def run(self):
"""运行配额监控"""
print(f"Starting quota monitor (Threshold: {self.threshold * 100}%)")
w = watch.Watch()
# Watch ResourceQuota changes across all namespaces
for event in w.stream(self.v1.list_resource_quota_for_all_namespaces):
quota = event['object']
self.check_quota(quota)
def check_quota(self, quota):
"""检查配额使用情况"""
if not quota.status or not quota.status.used:
return
namespace = quota.metadata.namespace
alerts = []
# Check every resource tracked by the quota
for resource, hard_limit in (quota.spec.hard or {}).items():
used = quota.status.used.get(resource)
if not used:
continue
# Convert to numbers (handles Ki, Mi, Gi and similar suffixes)
used_value = self.parse_quantity(str(used))
hard_value = self.parse_quantity(str(hard_limit))
if hard_value == 0:
continue
usage_percent = used_value / hard_value
if usage_percent >= self.threshold:
alerts.append({
'resource': resource,
'used': str(used),
'limit': str(hard_limit),
'percent': f"{usage_percent * 100:.1f}%"
})
# Send an alert if anything exceeded the threshold
if alerts:
self.send_alert(namespace, quota.metadata.name, alerts)
def parse_quantity(self, quantity_str):
"""解析 Kubernetes 资源数量"""
import re
# Binary (Ki/Mi/Gi/Ti), decimal (k/M/G/T) and milli (m) suffixes
multipliers = {
'Ki': 1024,
'Mi': 1024 ** 2,
'Gi': 1024 ** 3,
'Ti': 1024 ** 4,
'k': 1000,
'M': 1000 ** 2,
'G': 1000 ** 3,
'T': 1000 ** 4,
'm': 0.001, # millicores
}
match = re.match(r'^(\d+(?:\.\d+)?)([A-Za-z]*)$', quantity_str)
if not match:
return 0
value = float(match.group(1))
unit = match.group(2)
return value * multipliers.get(unit, 1)
def send_alert(self, namespace, quota_name, alerts):
"""发送告警"""
message = f"⚠️ **ResourceQuota Alert**\n\n"
message += f"**Namespace**: {namespace}\n"
message += f"**Quota**: {quota_name}\n\n"
message += "**Resources exceeding threshold**:\n"
for alert in alerts:
message += f"- {alert['resource']}: {alert['used']}/{alert['limit']} ({alert['percent']})\n"
# Post to a webhook (Slack, Teams, DingTalk, etc.)
payload = {
'text': message,
'username': 'Kubernetes Quota Monitor'
}
try:
response = requests.post(
self.webhook_url,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
if response.status_code == 200:
print(f"Alert sent for {namespace}/{quota_name}")
else:
print(f"Failed to send alert: {response.status_code}")
except Exception as e:
print(f"Error sending alert: {e}")
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Kubernetes Quota Monitor')
parser.add_argument('--webhook-url', required=True,
help='Webhook URL for alerts')
parser.add_argument('--threshold', type=float, default=0.8,
help='Alert threshold (default: 0.8 = 80%%)')
args = parser.parse_args()
monitor = QuotaMonitor(webhook_url=args.webhook_url, threshold=args.threshold)
monitor.run()
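The hand-rolled parse_quantity above mirrors parsing that the Kubernetes API machinery already provides (recent versions of the Python client also ship a similar helper, kubernetes.utils.parse_quantity). In Go, the canonical way is the resource package; below is a minimal sketch with illustrative values.
package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    // resource.MustParse understands the same suffixes the quota monitor
    // parses by hand: binary (Ki, Mi, Gi), decimal (k, M, G) and milli (m).
    mem := resource.MustParse("512Mi")
    cpu := resource.MustParse("250m")

    fmt.Println(mem.Value())      // 536870912 (bytes)
    fmt.Println(cpu.MilliValue()) // 250 (millicores)
}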
Project 4: Cross-Cluster Resource Synchronization
Requirements
Synchronize ConfigMaps and Secrets from one cluster to another.
Go Implementation
package main
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type ResourceSyncer struct {
sourceClient *kubernetes.Clientset
targetClient *kubernetes.Clientset
configMapLister corev1listers.ConfigMapLister
secretLister corev1listers.SecretLister
workqueue workqueue.RateLimitingInterface
}
type SyncItem struct {
ResourceType string // "configmap" or "secret"
Namespace string
Name string
}
func NewResourceSyncer(sourceClient, targetClient *kubernetes.Clientset) *ResourceSyncer {
factory := informers.NewSharedInformerFactory(sourceClient, 30*time.Second)
configMapInformer := factory.Core().V1().ConfigMaps()
secretInformer := factory.Core().V1().Secrets()
syncer := &ResourceSyncer{
sourceClient: sourceClient,
targetClient: targetClient,
configMapLister: configMapInformer.Lister(),
secretLister: secretInformer.Lister(),
workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}
// Watch ConfigMap changes in the source cluster
configMapInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cm := obj.(*corev1.ConfigMap)
if cm.Annotations["sync"] == "true" {
syncer.enqueueConfigMap(cm)
}
},
UpdateFunc: func(old, new interface{}) {
cm := new.(*corev1.ConfigMap)
if cm.Annotations["sync"] == "true" {
syncer.enqueueConfigMap(cm)
}
},
DeleteFunc: func(obj interface{}) {
// The deleted object may be a cache.DeletedFinalStateUnknown tombstone; skip it in this example
cm, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}
if cm.Annotations["sync"] == "true" {
syncer.deleteConfigMap(cm)
}
},
})
// Watch Secret changes in the source cluster
secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
secret := obj.(*corev1.Secret)
if secret.Annotations["sync"] == "true" {
syncer.enqueueSecret(secret)
}
},
UpdateFunc: func(old, new interface{}) {
secret := new.(*corev1.Secret)
if secret.Annotations["sync"] == "true" {
syncer.enqueueSecret(secret)
}
},
DeleteFunc: func(obj interface{}) {
// The deleted object may be a cache.DeletedFinalStateUnknown tombstone; skip it in this example
secret, ok := obj.(*corev1.Secret)
if !ok {
return
}
if secret.Annotations["sync"] == "true" {
syncer.deleteSecret(secret)
}
},
})
go factory.Start(wait.NeverStop)
cache.WaitForCacheSync(wait.NeverStop,
configMapInformer.Informer().HasSynced,
secretInformer.Informer().HasSynced,
)
return syncer
}
func (rs *ResourceSyncer) enqueueConfigMap(cm *corev1.ConfigMap) {
rs.workqueue.Add(SyncItem{
ResourceType: "configmap",
Namespace: cm.Namespace,
Name: cm.Name,
})
}
func (rs *ResourceSyncer) enqueueSecret(secret *corev1.Secret) {
rs.workqueue.Add(SyncItem{
ResourceType: "secret",
Namespace: secret.Namespace,
Name: secret.Name,
})
}
func (rs *ResourceSyncer) Run(workers int, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer rs.workqueue.ShutDown()
fmt.Println("Starting resource syncer")
for i := 0; i < workers; i++ {
go wait.Until(rs.runWorker, time.Second, stopCh)
}
<-stopCh
fmt.Println("Shutting down resource syncer")
}
func (rs *ResourceSyncer) runWorker() {
for rs.processNextItem() {
}
}
func (rs *ResourceSyncer) processNextItem() bool {
obj, shutdown := rs.workqueue.Get()
if shutdown {
return false
}
defer rs.workqueue.Done(obj)
item := obj.(SyncItem)
err := rs.syncResource(item)
if err == nil {
rs.workqueue.Forget(obj)
return true
}
rs.workqueue.AddRateLimited(obj)
runtime.HandleError(fmt.Errorf("error syncing '%v': %s", item, err.Error()))
return true
}
func (rs *ResourceSyncer) syncResource(item SyncItem) error {
switch item.ResourceType {
case "configmap":
return rs.syncConfigMap(item.Namespace, item.Name)
case "secret":
return rs.syncSecret(item.Namespace, item.Name)
default:
return fmt.Errorf("unknown resource type: %s", item.ResourceType)
}
}
func (rs *ResourceSyncer) syncConfigMap(namespace, name string) error {
// 1. Get the ConfigMap from the source cluster's informer cache
cm, err := rs.configMapLister.ConfigMaps(namespace).Get(name)
if err != nil {
return err
}
// 2. Copy the ConfigMap, dropping cluster-specific metadata such as the resourceVersion and UID
targetCM := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cm.Name,
Namespace: cm.Namespace,
Labels: cm.Labels,
Annotations: cm.Annotations,
},
Data: cm.Data,
BinaryData: cm.BinaryData,
}
// 3. Create or update it in the target cluster
_, err = rs.targetClient.CoreV1().ConfigMaps(namespace).Get(
context.TODO(),
name,
metav1.GetOptions{},
)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if err != nil {
// Not found in the target cluster, so create it
_, err = rs.targetClient.CoreV1().ConfigMaps(namespace).Create(
context.TODO(),
targetCM,
metav1.CreateOptions{},
)
if err != nil {
return err
}
fmt.Printf("Created ConfigMap %s/%s in target cluster\n", namespace, name)
} else {
// Already exists: update it
_, err = rs.targetClient.CoreV1().ConfigMaps(namespace).Update(
context.TODO(),
targetCM,
metav1.UpdateOptions{},
)
if err != nil {
return err
}
fmt.Printf("Updated ConfigMap %s/%s in target cluster\n", namespace, name)
}
return nil
}
func (rs *ResourceSyncer) syncSecret(namespace, name string) error {
// Same logic as for ConfigMaps
secret, err := rs.secretLister.Secrets(namespace).Get(name)
if err != nil {
return err
}
targetSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secret.Name,
Namespace: secret.Namespace,
Labels: secret.Labels,
Annotations: secret.Annotations,
},
Type: secret.Type,
Data: secret.Data,
StringData: secret.StringData,
}
_, err = rs.targetClient.CoreV1().Secrets(namespace).Get(
context.TODO(),
name,
metav1.GetOptions{},
)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if err != nil {
// Not found in the target cluster, so create it
_, err = rs.targetClient.CoreV1().Secrets(namespace).Create(
context.TODO(),
targetSecret,
metav1.CreateOptions{},
)
if err != nil {
return err
}
fmt.Printf("Created Secret %s/%s in target cluster\n", namespace, name)
} else {
_, err = rs.targetClient.CoreV1().Secrets(namespace).Update(
context.TODO(),
targetSecret,
metav1.UpdateOptions{},
)
if err != nil {
return err
}
fmt.Printf("Updated Secret %s/%s in target cluster\n", namespace, name)
}
return nil
}
func (rs *ResourceSyncer) deleteConfigMap(cm *corev1.ConfigMap) {
err := rs.targetClient.CoreV1().ConfigMaps(cm.Namespace).Delete(
context.TODO(),
cm.Name,
metav1.DeleteOptions{},
)
if err != nil {
fmt.Printf("Error deleting ConfigMap %s/%s: %v\n", cm.Namespace, cm.Name, err)
return
}
fmt.Printf("Deleted ConfigMap %s/%s from target cluster\n", cm.Namespace, cm.Name)
}
func (rs *ResourceSyncer) deleteSecret(secret *corev1.Secret) {
err := rs.targetClient.CoreV1().Secrets(secret.Namespace).Delete(
context.TODO(),
secret.Name,
metav1.DeleteOptions{},
)
if err != nil {
fmt.Printf("Error deleting Secret %s/%s: %v\n", secret.Namespace, secret.Name, err)
return
}
fmt.Printf("Deleted Secret %s/%s from target cluster\n", secret.Namespace, secret.Name)
}
func main() {
// Initialize clients for both clusters (errors are ignored for brevity; handle them in real code)
sourceConfig, _ := clientcmd.BuildConfigFromFlags("", "/path/to/source-kubeconfig")
targetConfig, _ := clientcmd.BuildConfigFromFlags("", "/path/to/target-kubeconfig")
sourceClient, _ := kubernetes.NewForConfig(sourceConfig)
targetClient, _ := kubernetes.NewForConfig(targetConfig)
// 创建并运行同步器
syncer := NewResourceSyncer(sourceClient, targetClient)
stopCh := make(chan struct{})
defer close(stopCh)
syncer.Run(2, stopCh)
}
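By default the shared informer factory above watches ConfigMaps and Secrets in every namespace of the source cluster. If only one namespace needs to be synchronized, the factory can be scoped down, which reduces load on the source API server and shrinks the required RBAC. A minimal sketch of that variant, intended as a drop-in replacement for the factory line in NewResourceSyncer; the namespace name here is only an example:
// Restrict the informer factory to a single namespace.
factory := informers.NewSharedInformerFactoryWithOptions(
    sourceClient,
    30*time.Second,
    informers.WithNamespace("platform-config"),
)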
Summary
Project Selection Guide
| Project | Use Case | Difficulty | Value |
|---|---|---|---|
| Pod cleaner | Automatically clean up completed Pods | ⭐⭐ | High (saves resources) |
| Auto-scaler | Scale automatically based on metrics | ⭐⭐⭐ | High (improves availability) |
| Quota monitor | Resource quota alerting | ⭐⭐ | Medium (cost control) |
| Resource syncer | Cross-cluster synchronization | ⭐⭐⭐⭐ | Medium (multi-cluster management) |
Best Practices
✅ Use Informers: watch resources through informers instead of polling the API
✅ Decouple with a WorkQueue: improves reliability
✅ Retry on errors: use a RateLimitingQueue
✅ Least-privilege RBAC: grant only the permissions that are needed
✅ Graceful shutdown: handle OS signals correctly (see the sketch after this list)
✅ Observability: add metrics and logging
✅ Externalize configuration: use environment variables or ConfigMaps
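The examples above create a stopCh but never close it when the process receives SIGINT or SIGTERM. Below is a minimal sketch of signal-driven shutdown, assuming the PodCleaner from Project 1; the wiring is illustrative rather than part of the original code.
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

// setupSignalHandler returns a channel that is closed on SIGINT or SIGTERM,
// suitable to pass as the stopCh of the controllers above.
func setupSignalHandler() <-chan struct{} {
    stopCh := make(chan struct{})
    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        sig := <-c
        fmt.Printf("Received signal %s, shutting down\n", sig)
        close(stopCh)
    }()
    return stopCh
}

func main() {
    stopCh := setupSignalHandler()
    // cleaner.Run(stopCh) // e.g. the PodCleaner from Project 1
    <-stopCh
}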
Deployment Recommendations
- Containerize: package the tool as a Docker image
- Deployment: run it as a Kubernetes Deployment
- ServiceAccount: create a dedicated ServiceAccount
- RBAC: configure least-privilege permissions
- Monitoring: expose Prometheus metrics
- Logging: emit structured logs
- High availability: multiple replicas plus leader election (see the sketch below)
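client-go ships a leader-election helper so that, with multiple replicas, only one instance actively reconciles. Below is a minimal sketch using a Lease lock, assuming in-cluster configuration; the lock name, namespace, and durations are illustrative values, not part of the original projects.
package main

import (
    "context"
    "fmt"
    "os"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(cfg)

    // Use the hostname (the Pod name inside a cluster) as the lock identity.
    id, _ := os.Hostname()

    lock := &resourcelock.LeaseLock{
        LeaseMeta:  metav1.ObjectMeta{Name: "pod-cleaner", Namespace: "kube-system"},
        Client:     clientset.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{Identity: id},
    }

    leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
        Lock:            lock,
        ReleaseOnCancel: true,
        LeaseDuration:   15 * time.Second,
        RenewDeadline:   10 * time.Second,
        RetryPeriod:     2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                fmt.Println("became leader, starting the controller")
                // e.g. cleaner.Run(ctx.Done())
            },
            OnStoppedLeading: func() {
                fmt.Println("lost leadership, exiting")
                os.Exit(0)
            },
        },
    })
}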
These hands-on projects show how to apply Kubernetes API programming to real-world scenarios, and they can serve as a starting point for building your own Kubernetes tooling.