项目7:GitOps 自动部署工具

项目背景

GitOps 将 Git 作为唯一可信来源(single source of truth),通过 Git 操作(提交、合并)触发自动化部署:

  • 📝 代码即配置
  • 🔄 自动同步
  • 📜 完整的审计日志
  • ⏮️ 轻松回滚

解决方案
定期轮询 Git 仓库(也可由 Webhook 触发),检测到新提交后自动将 Kubernetes 资源同步到集群。

功能需求

核心功能

  • ✅ Git 仓库监听(GitHub、GitLab、Bitbucket)
  • ✅ 自动同步 YAML 到集群
  • ✅ 差异检测(Diff)
  • ✅ 自动回滚
  • ✅ 部署历史记录

高级功能

  • ✅ 多环境部署(dev、staging、prod)
  • ✅ Kustomize/Helm 支持
  • ✅ 健康检查
  • ✅ 渐进式发布
  • ✅ Webhook 通知

注:下文示例代码只实现了核心流程(轮询拉取、应用 YAML、手动回滚、记录部署日志);自动回滚、Kustomize/Helm 渲染、健康检查、渐进式发布和部署通知需要在此基础上自行扩展。

Go 完整实现

pkg/gitops/deployer.go

package gitops

import (
    "bytes"
    "context"
    "errors"
    "fmt"
    "io"
    "os"
    "path/filepath"
    "strings"
    "time"

    "github.com/go-git/go-git/v5"
    "github.com/go-git/go-git/v5/plumbing"
    "github.com/go-git/go-git/v5/plumbing/transport"
    "github.com/go-git/go-git/v5/plumbing/transport/http"
    "github.com/go-git/go-git/v5/plumbing/transport/ssh"
    "github.com/sirupsen/logrus"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/util/yaml"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/kubernetes"

    "gitops-deployer/pkg/config"
)

// GitOpsDeployer keeps a Kubernetes cluster in sync with the manifests stored
// in a Git repository.
type GitOpsDeployer struct {
    // Kubernetes API access: typed clientset plus a dynamic client used to
    // apply arbitrary resource kinds.
    clientset     *kubernetes.Clientset
    dynamicClient dynamic.Interface

    // Runtime configuration (repository URL/path, credentials, interval) and
    // the logger used for all output.
    config *config.Config
    logger *logrus.Logger

    // repo is the local checkout; lastCommit is the hash most recently
    // applied successfully by the polling loop ("" until that happens).
    repo       *git.Repository
    lastCommit string
}

// DeploymentRecord describes one sync attempt for a commit.
type DeploymentRecord struct {
    Timestamp    time.Time // when the attempt was recorded
    Commit       string    // commit hash that was synced
    Message      string    // commit message; not filled in by recordDeployment
    Author       string    // commit author; not filled in by recordDeployment
    FilesChanged int       // files changed; not filled in by recordDeployment
    Success      bool      // whether the sync succeeded
    Error        string    // error text when Success is false
}

// NewGitOpsDeployer wires the Kubernetes clients, configuration and logger
// into a deployer. The repository is not opened or cloned until Run is called.
func NewGitOpsDeployer(
    clientset *kubernetes.Clientset,
    dynamicClient dynamic.Interface,
    cfg *config.Config,
    logger *logrus.Logger,
) *GitOpsDeployer {
    gd := new(GitOpsDeployer)
    gd.clientset = clientset
    gd.dynamicClient = dynamicClient
    gd.config = cfg
    gd.logger = logger
    return gd
}

// Run prepares the local repository, performs an initial sync and then polls
// for new commits every SyncInterval seconds until stopCh is closed.
//
// It returns an error only when the repository cannot be cloned or opened.
// Sync failures are logged and retried on the next tick.
func (gd *GitOpsDeployer) Run(stopCh <-chan struct{}) error {
    gd.logger.Info("Starting GitOps deployer")

    // Clone (or open) the repository.
    if err := gd.cloneRepository(); err != nil {
        return err
    }

    // Initial sync. On success remember the synced commit so the first poll
    // does not re-apply (and re-record) the same revision. On failure
    // lastCommit stays empty, so the next poll retries.
    if err := gd.syncResources(); err != nil {
        gd.logger.WithError(err).Error("Initial sync failed")
    } else if ref, headErr := gd.repo.Head(); headErr != nil {
        gd.logger.WithError(headErr).Warn("Could not read HEAD after initial sync")
    } else {
        gd.lastCommit = ref.Hash().String()
    }

    // time.NewTicker panics on a non-positive duration, so fall back to a
    // default when SyncInterval is unset or invalid.
    const defaultSyncInterval = 60 * time.Second
    interval := time.Duration(gd.config.SyncInterval) * time.Second
    if interval <= 0 {
        gd.logger.Warnf("Invalid sync interval %v, using %v", interval, defaultSyncInterval)
        interval = defaultSyncInterval
    }

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            if err := gd.checkAndSync(); err != nil {
                gd.logger.WithError(err).Error("Sync failed")
            }
        case <-stopCh:
            gd.logger.Info("Stopping GitOps deployer")
            return nil
        }
    }
}

// cloneRepository points gd.repo at a local checkout of GitURL under RepoPath.
//
// An existing repository at RepoPath is reused. If the path does not exist,
// or exists but is not yet a Git repository (for example a freshly mounted
// empty volume), the remote is cloned into it.
func (gd *GitOpsDeployer) cloneRepository() error {
    repoPath := gd.config.RepoPath

    // Try to reuse an existing checkout first. ErrRepositoryNotExists must
    // fall through to the clone below instead of aborting startup.
    repo, err := git.PlainOpen(repoPath)
    if err == nil {
        gd.repo = repo
        gd.logger.Info("Opened existing repository")
        return nil
    }
    if !errors.Is(err, git.ErrRepositoryNotExists) {
        return fmt.Errorf("failed to open repository: %w", err)
    }

    gd.logger.Infof("Cloning repository: %s", gd.config.GitURL)

    repo, err = git.PlainClone(repoPath, false, &git.CloneOptions{
        URL:      gd.config.GitURL,
        Progress: os.Stdout,
        Auth:     gd.getGitAuth(),
    })
    if err != nil {
        return fmt.Errorf("failed to clone repository: %w", err)
    }

    gd.repo = repo
    gd.logger.Info("Repository cloned successfully")

    return nil
}

// getGitAuth returns the credentials used for clone and pull.
//
// A configured token takes precedence and is sent as HTTP basic auth. Otherwise
// the SSH private key file at GitSSHKey is used. It returns nil (anonymous
// access) when neither is configured or the SSH key cannot be loaded.
func (gd *GitOpsDeployer) getGitAuth() transport.AuthMethod {
    if gd.config.GitToken != "" {
        return &http.BasicAuth{
            Username: "git",
            Password: gd.config.GitToken,
        }
    }

    if gd.config.GitSSHKey != "" {
        sshAuth, err := ssh.NewPublicKeysFromFile("git", gd.config.GitSSHKey, "")
        if err != nil {
            // Keep the anonymous fallback, but say why. Otherwise the later
            // clone/pull failure looks like an unrelated "not found" error.
            gd.logger.WithError(err).Warnf(
                "Failed to load SSH key %s, falling back to anonymous access",
                gd.config.GitSSHKey)
            return nil
        }
        return sshAuth
    }

    return nil
}

// checkAndSync pulls origin and, if HEAD moved since the last successful
// sync, applies the manifests and records the outcome.
//
// lastCommit only advances on success, so a failed sync is retried on the
// next poll.
func (gd *GitOpsDeployer) checkAndSync() error {
    gd.logger.Debug("Checking for updates...")

    worktree, err := gd.repo.Worktree()
    if err != nil {
        return fmt.Errorf("failed to get worktree: %w", err)
    }

    // NoErrAlreadyUpToDate is go-git's "nothing new" result, not a failure.
    err = worktree.Pull(&git.PullOptions{
        RemoteName: "origin",
        Auth:       gd.getGitAuth(),
    })
    if err != nil && !errors.Is(err, git.NoErrAlreadyUpToDate) {
        return fmt.Errorf("failed to pull: %w", err)
    }

    ref, err := gd.repo.Head()
    if err != nil {
        return fmt.Errorf("failed to read HEAD: %w", err)
    }
    currentCommit := ref.Hash().String()

    if currentCommit == gd.lastCommit {
        gd.logger.Debug("No updates found")
        return nil
    }

    // Hash.String() is a full hex digest, so [:8] is always in range.
    gd.logger.Infof("New commit detected: %s", currentCommit[:8])

    if err := gd.syncResources(); err != nil {
        gd.recordDeployment(currentCommit, false, err.Error())
        return err
    }

    gd.lastCommit = currentCommit
    gd.recordDeployment(currentCommit, true, "")

    return nil
}

// syncResources applies every *.yaml / *.yml file under
// RepoPath/ManifestsPath.
//
// All files are attempted even if some fail. Afterwards the failures are
// returned as one aggregated error, so callers do not record a partially
// failed sync as a success. An error walking the directory (for example a
// missing manifests path) is returned immediately.
func (gd *GitOpsDeployer) syncResources() error {
    gd.logger.Info("Syncing resources from Git to cluster")

    manifestsPath := filepath.Join(gd.config.RepoPath, gd.config.ManifestsPath)

    var (
        appliedCount int
        failures     []error
    )

    err := filepath.Walk(manifestsPath, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if info.IsDir() {
            return nil
        }
        if ext := filepath.Ext(path); ext != ".yaml" && ext != ".yml" {
            return nil
        }

        if err := gd.applyManifest(path); err != nil {
            gd.logger.WithError(err).Warnf("Failed to apply manifest: %s", path)
            failures = append(failures, fmt.Errorf("%s: %w", path, err))
            return nil // keep applying the remaining files
        }

        appliedCount++
        return nil
    })
    if err != nil {
        return fmt.Errorf("failed to walk manifests in %s: %w", manifestsPath, err)
    }

    gd.logger.Infof("Applied %d manifests", appliedCount)

    if len(failures) > 0 {
        return fmt.Errorf("%d manifest(s) failed to apply: %w", len(failures), errors.Join(failures...))
    }

    return nil
}

// applyManifest applies every Kubernetes object in the YAML (or JSON) file at
// path, in document order.
//
// Empty and comment-only documents are skipped. It stops at the first
// document that fails to decode or apply and returns that error, annotated
// with the document's position.
func (gd *GitOpsDeployer) applyManifest(path string) error {
    gd.logger.Debugf("Applying manifest: %s", path)

    data, err := os.ReadFile(path)
    if err != nil {
        return err
    }

    decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(data), 4096)

    for doc := 1; ; doc++ {
        // Decode each document as raw JSON first. Decoding an empty or
        // comment-only document straight into Unstructured fails with
        // "Object 'Kind' is missing in 'null'", so such documents are
        // detected and skipped here.
        var raw runtime.RawExtension
        if err := decoder.Decode(&raw); err != nil {
            if errors.Is(err, io.EOF) {
                return nil
            }
            return fmt.Errorf("decoding document %d: %w", doc, err)
        }

        raw.Raw = bytes.TrimSpace(raw.Raw)
        if len(raw.Raw) == 0 || bytes.Equal(raw.Raw, []byte("null")) {
            continue
        }

        obj := &unstructured.Unstructured{}
        if err := obj.UnmarshalJSON(raw.Raw); err != nil {
            return fmt.Errorf("decoding document %d: %w", doc, err)
        }

        if err := gd.applyResource(obj); err != nil {
            return fmt.Errorf("applying document %d (%s %q): %w", doc, obj.GetKind(), obj.GetName(), err)
        }
    }
}

// applyResource creates obj in the cluster, or updates it when an object with
// the same name already exists.
//
// The update is a full Update call carrying the live resourceVersion, not a
// merge. Objects without a namespace are placed in "default".
//
// NOTE(review): the resource name is guessed from the Kind by pluralize, and a
// namespace is always set. Cluster-scoped kinds (Namespace, ClusterRole,
// CRDs, ...) and irregular plurals therefore need a discovery-backed
// RESTMapper to be handled correctly.
func (gd *GitOpsDeployer) applyResource(obj *unstructured.Unstructured) error {
    gvk := obj.GroupVersionKind()

    gd.logger.Debugf("Applying %s/%s: %s/%s",
        gvk.Group, gvk.Version, obj.GetNamespace(), obj.GetName())

    gvr := schema.GroupVersionResource{
        Group:    gvk.Group,
        Version:  gvk.Version,
        Resource: gd.pluralize(gvk.Kind),
    }

    namespace := obj.GetNamespace()
    if namespace == "" {
        namespace = "default"
    }

    resources := gd.dynamicClient.Resource(gvr).Namespace(namespace)
    ctx := context.TODO()

    existing, err := resources.Get(ctx, obj.GetName(), metav1.GetOptions{})
    switch {
    case err == nil:
        // The object exists: carry over the live resourceVersion, which
        // Update requires for optimistic concurrency.
        obj.SetResourceVersion(existing.GetResourceVersion())
        if _, err := resources.Update(ctx, obj, metav1.UpdateOptions{}); err != nil {
            return fmt.Errorf("failed to update resource: %w", err)
        }
        gd.logger.Infof("Updated %s: %s/%s", gvk.Kind, namespace, obj.GetName())

    case isAPINotFound(err):
        if _, err := resources.Create(ctx, obj, metav1.CreateOptions{}); err != nil {
            return fmt.Errorf("failed to create resource: %w", err)
        }
        gd.logger.Infof("Created %s: %s/%s", gvk.Kind, namespace, obj.GetName())

    default:
        // Forbidden, timeouts, etc. Attempting a Create here would only hide
        // the real cause behind a misleading create error.
        return fmt.Errorf("failed to get resource: %w", err)
    }

    return nil
}

// isAPINotFound reports whether err is a Kubernetes API status error with
// reason NotFound (or HTTP 404 without a reason). It matches on the Status()
// method that API status errors expose.
func isAPINotFound(err error) bool {
    var apiStatus interface{ Status() metav1.Status }
    if !errors.As(err, &apiStatus) {
        return false
    }
    st := apiStatus.Status()
    return st.Reason == metav1.StatusReasonNotFound || (st.Reason == "" && st.Code == 404)
}

// pluralize guesses the lower-case REST resource name for a Kind.
//
// It follows the usual Kubernetes conventions:
//   - "Endpoints" is already plural and is left unchanged.
//   - A trailing "s" gets "es" (Ingress -> ingresses).
//   - A consonant followed by "y" becomes "ies" (NetworkPolicy -> networkpolicies).
//   - Anything else gets "s".
//
// This is still a heuristic; a RESTMapper is authoritative.
func (gd *GitOpsDeployer) pluralize(kind string) string {
    lower := strings.ToLower(kind)
    n := len(lower)

    switch {
    case n == 0:
        return ""
    case strings.HasSuffix(lower, "endpoints"):
        return lower
    case strings.HasSuffix(lower, "s"):
        return lower + "es"
    case n > 1 && lower[n-1] == 'y' && !strings.ContainsRune("aeiou", rune(lower[n-2])):
        return lower[:n-1] + "ies"
    }

    return lower + "s"
}

// recordDeployment builds a DeploymentRecord for one sync attempt and logs it.
//
// NOTE(review): the record is only logged; persistence (database or file) is
// not implemented, and Message/Author/FilesChanged are not populated.
func (gd *GitOpsDeployer) recordDeployment(commit string, success bool, errorMsg string) {
    record := DeploymentRecord{
        Timestamp: time.Now(),
        Commit:    commit,
        Success:   success,
        Error:     errorMsg,
    }

    // TODO: persist record to a database or file.
    gd.logger.WithFields(logrus.Fields{
        "timestamp": record.Timestamp,
        "commit":    shortCommit(record.Commit),
        "success":   record.Success,
        "error":     record.Error,
    }).Info("Deployment recorded")
}

// shortCommit abbreviates a commit hash to 8 characters for logging, leaving
// shorter strings unchanged instead of panicking on the slice.
func shortCommit(hash string) string {
    if len(hash) > 8 {
        return hash[:8]
    }
    return hash
}

// Rollback checks out the given revision (detached HEAD) and re-applies the
// manifests from it.
//
// The revision is resolved with go-git's ResolveRevision, so malformed or
// unknown revisions are rejected before anything is changed. The outcome is
// recorded like a regular deployment.
//
// NOTE(review): the polling loop still pulls origin afterwards and may move
// HEAD back to the remote tip, re-applying it. A durable GitOps rollback is a
// `git revert` pushed to the tracked branch.
func (gd *GitOpsDeployer) Rollback(commitHash string) error {
    gd.logger.Infof("Rolling back to commit: %s", commitHash)

    // plumbing.NewHash silently turns malformed or abbreviated input into a
    // bogus hash, so resolve and validate the revision first.
    hash, err := gd.repo.ResolveRevision(plumbing.Revision(commitHash))
    if err != nil {
        return fmt.Errorf("failed to resolve commit %q: %w", commitHash, err)
    }

    worktree, err := gd.repo.Worktree()
    if err != nil {
        return fmt.Errorf("failed to get worktree: %w", err)
    }

    if err := worktree.Checkout(&git.CheckoutOptions{Hash: *hash}); err != nil {
        return fmt.Errorf("failed to check out %s: %w", hash.String(), err)
    }

    commit := hash.String()
    if err := gd.syncResources(); err != nil {
        gd.recordDeployment(commit, false, err.Error())
        return err
    }

    gd.lastCommit = commit
    gd.recordDeployment(commit, true, "")

    return nil
}

Python 实现

gitops_deployer.py

#!/usr/bin/env python3

import os
import time
import yaml
from datetime import datetime
from pathlib import Path
from git import Repo
from kubernetes import client, config, dynamic
from kubernetes.client import api_client

class GitOpsDeployer:
    """Poll a Git repository and apply the Kubernetes manifests it contains.

    After an initial sync, ``run`` pulls ``origin`` every ``sync_interval``
    seconds. When HEAD has moved since the last successful sync, every
    ``*.yaml``/``*.yml`` file under ``<repo_path>/<manifests_path>`` is
    applied. Deployments, Services, ConfigMaps and Secrets are created or
    patched through the typed clients. All other kinds are currently skipped.
    """

    # File suffixes treated as manifests (same set as the Go implementation).
    MANIFEST_SUFFIXES = ('.yaml', '.yml')

    def __init__(self, git_url, repo_path, manifests_path, sync_interval=60):
        # Prefer in-cluster credentials and fall back to the local kubeconfig.
        # Only a configuration error triggers the fallback, so e.g.
        # KeyboardInterrupt is not swallowed here.
        try:
            config.load_incluster_config()
        except config.ConfigException:
            config.load_kube_config()

        self.k8s_client = client.ApiClient()
        self.dynamic_client = dynamic.DynamicClient(self.k8s_client)
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()

        self.git_url = git_url
        self.repo_path = repo_path
        self.manifests_path = manifests_path
        self.sync_interval = sync_interval
        # Hex SHA of the last successfully synced commit (None until then).
        self.last_commit = None
        self.repo = None

    def run(self):
        """Clone/open the repository, do an initial sync, then poll forever.

        Returns when interrupted with Ctrl+C (KeyboardInterrupt). Errors from
        the initial sync and from individual polls are printed, and the loop
        continues.
        """
        print("Starting GitOps deployer...")

        self.clone_repository()

        # Initial sync. On success remember the synced commit so the first
        # poll does not re-apply the same revision. On failure leave
        # last_commit unset so the next poll retries.
        try:
            self.sync_resources()
            self.last_commit = self.repo.head.commit.hexsha
        except Exception as e:
            print(f"Initial sync failed: {e}")

        # The outer try also covers the sleep, so Ctrl+C during any wait stops
        # the deployer cleanly.
        try:
            while True:
                try:
                    self.check_and_sync()
                except Exception as e:
                    print(f"Error: {e}")
                time.sleep(self.sync_interval)
        except KeyboardInterrupt:
            print("Stopping GitOps deployer...")

    def clone_repository(self):
        """Open the checkout at repo_path, cloning git_url into it if needed.

        A directory without a ``.git`` entry (e.g. a freshly mounted, empty
        volume) is cloned into rather than opened, because opening it would
        fail with InvalidGitRepositoryError.
        """
        print(f"Cloning repository: {self.git_url}")

        if os.path.exists(os.path.join(self.repo_path, '.git')):
            self.repo = Repo(self.repo_path)
            print("Opened existing repository")
        else:
            self.repo = Repo.clone_from(self.git_url, self.repo_path)
            print("Repository cloned successfully")

    def check_and_sync(self):
        """Pull origin and sync if HEAD moved since the last successful sync.

        last_commit only advances on success, so a failed sync is retried on
        the next poll. Pull errors propagate to the caller.
        """
        print("Checking for updates...")

        origin = self.repo.remotes.origin
        origin.pull()

        current_commit = self.repo.head.commit.hexsha

        if current_commit == self.last_commit:
            print("No updates found")
            return

        print(f"New commit detected: {current_commit[:8]}")

        try:
            self.sync_resources()
            self.last_commit = current_commit
            self.record_deployment(current_commit, True, "")
        except Exception as e:
            print(f"Sync failed: {e}")
            self.record_deployment(current_commit, False, str(e))

    def sync_resources(self):
        """Apply every manifest file under the manifests directory.

        All files are attempted even if some fail. Afterwards a RuntimeError
        summarising the failures is raised, so callers do not record a
        partially failed sync as a success.

        Returns:
            int: number of manifest files applied successfully.
        """
        print("Syncing resources from Git to cluster")

        manifests_dir = Path(self.repo_path) / self.manifests_path
        applied_count = 0
        failures = []

        # sorted() gives a deterministic apply order across runs.
        manifest_files = sorted(
            p for p in manifests_dir.rglob('*')
            if p.is_file() and p.suffix in self.MANIFEST_SUFFIXES
        )

        for yaml_file in manifest_files:
            try:
                self.apply_manifest(str(yaml_file))
                applied_count += 1
            except Exception as e:
                print(f"Failed to apply {yaml_file}: {e}")
                failures.append(f"{yaml_file}: {e}")

        print(f"Applied {applied_count} manifests")

        if failures:
            raise RuntimeError(
                f"{len(failures)} manifest(s) failed to apply: " + "; ".join(failures)
            )
        return applied_count

    def apply_manifest(self, file_path):
        """Apply every document in a (possibly multi-document) YAML file.

        Empty documents are skipped. Raises ValueError for a document that is
        not a mapping, and propagates the first apply error.
        """
        print(f"Applying manifest: {file_path}")

        with open(file_path, 'r', encoding='utf-8') as f:
            # safe_load_all only builds plain YAML types (no arbitrary objects).
            for doc in yaml.safe_load_all(f):
                if doc is None:
                    continue
                if not isinstance(doc, dict):
                    raise ValueError(
                        f"{file_path}: expected a mapping, got {type(doc).__name__}"
                    )
                self.apply_resource(doc)

    def apply_resource(self, resource_dict):
        """Dispatch one manifest dict to the applier for its ``kind``.

        Objects without ``metadata.namespace`` go to ``default``.
        """
        kind = resource_dict.get('kind')
        # `or {}` also covers an explicit `metadata: null`.
        metadata = resource_dict.get('metadata') or {}
        namespace = metadata.get('namespace', 'default')
        name = metadata.get('name')

        print(f"Applying {kind}: {namespace}/{name}")

        typed_appliers = {
            'Deployment': self.apply_deployment,
            'Service': self.apply_service,
            'ConfigMap': self.apply_configmap,
            'Secret': self.apply_secret,
        }
        applier = typed_appliers.get(kind)
        if applier is None:
            # Any other kind goes through the dynamic client path.
            self.apply_with_dynamic_client(resource_dict)
        else:
            applier(resource_dict, namespace, name)

    def _apply_typed(self, kind, resource_dict, namespace, name, read, patch, create):
        """Create-or-patch helper shared by the typed apply_* methods.

        ``read``/``patch``/``create`` are the bound client methods for one kind
        (e.g. ``read_namespaced_service``). A 404 from ``read`` means the object
        is absent and it is created. Any other API error, and any error from
        ``patch``/``create``, propagates.
        """
        # Reuse the deployer's ApiClient instead of constructing a new one
        # (with its own connection pool) for every resource.
        body = self.k8s_client.sanitize_for_serialization(resource_dict)
        try:
            read(name, namespace)
        except client.exceptions.ApiException as e:
            if e.status != 404:
                raise
            create(namespace, body)
            print(f"Created {kind}: {namespace}/{name}")
            return

        patch(name, namespace, body)
        print(f"Updated {kind}: {namespace}/{name}")

    def apply_deployment(self, resource_dict, namespace, name):
        """Create or patch a Deployment."""
        self._apply_typed(
            'Deployment', resource_dict, namespace, name,
            self.apps_v1.read_namespaced_deployment,
            self.apps_v1.patch_namespaced_deployment,
            self.apps_v1.create_namespaced_deployment,
        )

    def apply_service(self, resource_dict, namespace, name):
        """Create or patch a Service."""
        self._apply_typed(
            'Service', resource_dict, namespace, name,
            self.v1.read_namespaced_service,
            self.v1.patch_namespaced_service,
            self.v1.create_namespaced_service,
        )

    def apply_configmap(self, resource_dict, namespace, name):
        """Create or patch a ConfigMap."""
        self._apply_typed(
            'ConfigMap', resource_dict, namespace, name,
            self.v1.read_namespaced_config_map,
            self.v1.patch_namespaced_config_map,
            self.v1.create_namespaced_config_map,
        )

    def apply_secret(self, resource_dict, namespace, name):
        """Create or patch a Secret."""
        self._apply_typed(
            'Secret', resource_dict, namespace, name,
            self.v1.read_namespaced_secret,
            self.v1.patch_namespaced_secret,
            self.v1.create_namespaced_secret,
        )

    def apply_with_dynamic_client(self, resource_dict):
        """Placeholder for kinds without a typed applier; currently skips them."""
        # TODO: implement via self.dynamic_client.resources.get(...).
        print(f"Skipping {resource_dict.get('kind')} (not implemented)")

    def record_deployment(self, commit, success, error):
        """Print a deployment-history record (persistence not implemented)."""
        record = {
            'timestamp': datetime.now().isoformat(),
            'commit': commit,
            'success': success,
            'error': error
        }

        print(f"Deployment recorded: {record}")

        # Persist to a file or database, e.g.:
        # with open('deployment_history.json', 'a') as f:
        #     json.dump(record, f)
        #     f.write('\n')

    def rollback(self, commit_hash):
        """Check out ``commit_hash`` and re-apply the manifests from it.

        NOTE(review): the checkout leaves a detached HEAD. The ``git pull`` in
        check_and_sync is then likely to fail until a branch is checked out
        again. A durable GitOps rollback is a ``git revert`` pushed to the
        tracked branch.
        """
        print(f"Rolling back to commit: {commit_hash}")

        self.repo.git.checkout(commit_hash)
        commit = self.repo.head.commit.hexsha

        try:
            self.sync_resources()
        except Exception as e:
            self.record_deployment(commit, False, str(e))
            raise

        self.last_commit = commit
        self.record_deployment(commit, True, "")

if __name__ == '__main__':
    import argparse

    # Each CLI option maps one-to-one onto a GitOpsDeployer constructor argument.
    cli = argparse.ArgumentParser(description='GitOps Deployer')
    for flag, options in (
        ('--git-url', dict(required=True, help='Git repository URL')),
        ('--repo-path', dict(default='/tmp/gitops-repo', help='Local repo path')),
        ('--manifests-path', dict(default='manifests', help='Manifests directory')),
        ('--sync-interval', dict(type=int, default=60, help='Sync interval (seconds)')),
    ):
        cli.add_argument(flag, **options)

    opts = cli.parse_args()

    deployer = GitOpsDeployer(
        git_url=opts.git_url,
        repo_path=opts.repo_path,
        manifests_path=opts.manifests_path,
        sync_interval=opts.sync_interval,
    )
    deployer.run()

仓库结构示例

my-k8s-config/
├── manifests/
│   ├── base/
│   │   ├── deployment.yaml
│   │   ├── service.yaml
│   │   └── configmap.yaml
│   ├── dev/
│   │   └── kustomization.yaml
│   ├── staging/
│   │   └── kustomization.yaml
│   └── production/
│       └── kustomization.yaml
└── README.md

deployment.yaml

# Example manifest. Files under the configured manifests path are applied
# as-is (*.yaml; the Go implementation also picks up *.yml).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
  # Explicit namespace; when omitted, both deployers fall back to "default".
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.21
        ports:
        - containerPort: 80

部署

Kubernetes Deployment

# ServiceAccount the deployer Pod runs as.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: gitops-deployer
  namespace: kube-system

---
# Grants every verb on every resource in every API group, cluster-wide
# (equivalent to cluster-admin). This lets the deployer apply arbitrary
# manifests from Git, but it also means anyone who can push to the repo
# effectively has cluster-admin.
# NOTE(review): narrow apiGroups/resources to what the repo actually
# contains where possible.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: gitops-deployer
rules:
- apiGroups: ["*"]
  resources: ["*"]
  verbs: ["*"]

---
# Binds the ClusterRole to the deployer's ServiceAccount.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: gitops-deployer
subjects:
- kind: ServiceAccount
  name: gitops-deployer
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: gitops-deployer
  apiGroup: rbac.authorization.k8s.io

---
# Placeholder only: never commit a real token to Git. Create this Secret
# out of band (e.g. `kubectl create secret generic git-credentials ...`).
apiVersion: v1
kind: Secret
metadata:
  name: git-credentials
  namespace: kube-system
type: Opaque
stringData:
  token: "your-github-token"

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gitops-deployer
  namespace: kube-system
spec:
  # Run a single instance; the deployer code shown has no leader election.
  replicas: 1
  selector:
    matchLabels:
      app: gitops-deployer
  template:
    metadata:
      labels:
        app: gitops-deployer
    spec:
      serviceAccountName: gitops-deployer
      containers:
      - name: deployer
        # NOTE(review): pin a tag or digest instead of :latest for
        # reproducible rollouts.
        image: your-registry/gitops-deployer:latest
        args:
        - --git-url=https://github.com/your-org/k8s-config.git
        - --repo-path=/repo
        # NOTE(review): in the example repo layout this directory holds only
        # kustomization.yaml, and the deployers shown apply raw YAML without
        # running kustomize. Confirm this path.
        - --manifests-path=manifests/production
        - --sync-interval=60
        # NOTE(review): the Python CLI shown has no token option, and the Go
        # code reads config.GitToken. Confirm GIT_TOKEN is actually wired
        # into the deployer's configuration.
        env:
        - name: GIT_TOKEN
          valueFrom:
            secretKeyRef:
              name: git-credentials
              key: token
        volumeMounts:
        - name: repo
          mountPath: /repo
      # emptyDir: the checkout is discarded with the Pod and re-cloned when a
      # new Pod starts.
      volumes:
      - name: repo
        emptyDir: {}

Webhook 集成

GitHub Webhook

from flask import Flask, request
import hmac
import hashlib

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def github_webhook():
    """Handle a GitHub webhook delivery.

    Responds 401 when the X-Hub-Signature-256 header does not verify. On a
    ``push`` event it runs ``check_and_sync`` on the module-level
    ``deployer`` (not defined in this snippet; assumed to be created
    elsewhere). Every other event is acknowledged with 200.
    """
    if not verify_signature(request.data, request.headers.get('X-Hub-Signature-256')):
        return 'Invalid signature', 401

    event = request.headers.get('X-GitHub-Event')
    payload = request.json  # parsed but not used yet

    if event != 'push':
        return 'OK', 200

    # NOTE(review): the sync runs inside the request handler; GitHub expects a
    # quick response (~10 s), so consider running it in the background.
    deployer.check_and_sync()
    return 'Sync triggered', 200

def verify_signature(payload, signature):
    """Check a GitHub ``X-Hub-Signature-256`` header against ``payload``.

    Args:
        payload: raw request body (bytes) exactly as received.
        signature: header value in the form ``sha256=<hexdigest>``, or None
            when the header is absent.

    Returns:
        True only if WEBHOOK_SECRET is set, a signature was supplied, and the
        HMAC-SHA256 of ``payload`` under the secret matches it. Otherwise
        False (fail closed).
    """
    import os  # this snippet has no module-level `os` import

    secret = os.environ.get('WEBHOOK_SECRET')
    # Without these guards a missing secret raises AttributeError and a
    # missing header raises TypeError in compare_digest (an HTTP 500 instead
    # of a 401).
    if not secret or not signature:
        return False

    expected = 'sha256=' + hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and
    # the header value is attacker-controlled.
    return hmac.compare_digest(expected.encode(), signature.encode())

if __name__ == '__main__':
    # Bind to all network interfaces on port 8080 (Flask development server).
    app.run(port=8080, host='0.0.0.0')

使用示例

1. 提交变更到 Git

# 修改 deployment.yaml
vi manifests/production/deployment.yaml

# 提交变更
git add manifests/production/deployment.yaml
git commit -m "Update nginx replicas to 5"
git push origin main

2. 自动部署

GitOps Deployer 会自动:

  1. 检测到新 commit
  2. 拉取最新代码
  3. 应用变更到集群
  4. 记录部署历史

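3. 回滚

# 查看历史
git log

# 回滚到指定 commit
# (部署器运行在 kube-system 命名空间;rollback 子命令需自行实现,
#  例如在 Go 入口中调用 Rollback 方法——上方 Python 版本的命令行并未提供该子命令)
kubectl exec -it -n kube-system gitops-deployer-xxx -- \
  /app/deployer rollback <commit-hash>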

监控

Prometheus Metrics

// syncTotal counts sync attempts, partitioned by the "status" label.
//
// NOTE(review): neither metric is registered in this snippet; they must be
// registered (e.g. prometheus.MustRegister) before they are exported.
var syncTotal = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "gitops_syncs_total",
        Help: "Total number of syncs",
    },
    []string{"status"},
)

// lastSyncTime holds the timestamp of the most recent successful sync.
var lastSyncTime = prometheus.NewGauge(prometheus.GaugeOpts{
    Name: "gitops_last_sync_timestamp",
    Help: "Timestamp of last successful sync",
})

总结

功能特性

Git 作为真实来源: 所有变更通过 Git
自动同步: 无需手动 kubectl apply
审计日志: 完整的 Git 历史
轻松回滚: Git revert/checkout
多环境支持: dev/staging/prod

最佳实践

  1. 分支策略: 不同环境使用不同分支
  2. PR 审查: 通过 PR 审查变更
  3. 测试环境: 先部署到测试环境
  4. 健康检查: 部署后验证健康状态
  5. 告警通知: 部署失败时通知

对比 ArgoCD/Flux

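| 特性   | 本项目 | ArgoCD | Flux |
|--------|--------|--------|------|
| 轻量级 | ✅     | ❌     | ✅   |
| 易定制 | ✅     | ❌     | ❌   |
| UI     | ❌     | ✅     | ❌   |
| 多集群 | ❌     | ✅     | ✅   |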

最后一个项目将介绍备份恢复工具。