项目7:GitOps 自动部署工具
项目7:GitOps 自动部署工具
项目背景
GitOps 将 Git 作为唯一的事实来源(Single Source of Truth),通过 Git 操作触发自动化部署,主要优势包括:
- 📝 配置即代码
- 🔄 自动同步
- 📜 完整的审计日志
- ⏮️ 轻松回滚
解决方案:
监听 Git 仓库变化,自动将 Kubernetes 资源同步到集群。
功能需求
核心功能
- ✅ Git 仓库监听(GitHub、GitLab、Bitbucket)
- ✅ 自动同步 YAML 到集群
- ✅ 差异检测(Diff)
- ✅ 自动回滚
- ✅ 部署历史记录
高级功能
- ✅ 多环境部署(dev、staging、prod)
- ✅ Kustomize/Helm 支持
- ✅ 健康检查
- ✅ 渐进式发布
- ✅ Webhook 通知
Go 完整实现
pkg/gitops/deployer.go
package gitops
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/go-git/go-git/v5"
	"github.com/go-git/go-git/v5/plumbing"
	"github.com/go-git/go-git/v5/plumbing/transport"
	"github.com/go-git/go-git/v5/plumbing/transport/http"
	"github.com/go-git/go-git/v5/plumbing/transport/ssh"
	"github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime" // NOTE(review): not referenced by the code shown in this file; confirm it is still needed
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/yaml"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"

	"gitops-deployer/pkg/config"
)
// GitOpsDeployer bundles the Kubernetes clients, configuration, logger and
// Git repository handle used to sync manifests from a Git checkout into a
// cluster.
type GitOpsDeployer struct {
	clientset     *kubernetes.Clientset // typed client supplied to NewGitOpsDeployer
	dynamicClient dynamic.Interface     // used by applyResource to get/create/update arbitrary resources
	config        *config.Config        // supplies GitURL, RepoPath, ManifestsPath, SyncInterval and Git credentials
	logger        *logrus.Logger
	repo          *git.Repository // set by cloneRepository; nil until then
	lastCommit    string          // hash of the last commit that checkAndSync synced successfully
}
// DeploymentRecord describes the outcome of one sync attempt.
//
// recordDeployment fills Timestamp, Commit, Success and Error; Message,
// Author and FilesChanged are not populated by the code in this file.
type DeploymentRecord struct {
	Timestamp    time.Time // when the record was created
	Commit       string    // commit hash the sync was run for
	Message      string
	Author       string
	FilesChanged int
	Success      bool   // true when the sync returned no error
	Error        string // error text for a failed sync; empty on success
}
// NewGitOpsDeployer returns a deployer wired with the given Kubernetes
// clients, configuration and logger. The Git repository handle and the
// last-synced commit start at their zero values.
func NewGitOpsDeployer(
	clientset *kubernetes.Clientset,
	dynamicClient dynamic.Interface,
	cfg *config.Config,
	logger *logrus.Logger,
) *GitOpsDeployer {
	deployer := new(GitOpsDeployer)
	deployer.clientset = clientset
	deployer.dynamicClient = dynamicClient
	deployer.config = cfg
	deployer.logger = logger
	return deployer
}
// Run clones (or opens) the repository, performs an initial sync and then
// polls for new commits every config.SyncInterval seconds until stopCh is
// closed or receives a value.
//
// It returns an error only when the repository cannot be prepared; sync
// failures are logged and retried on the next tick. On a clean stop it
// returns nil.
func (gd *GitOpsDeployer) Run(stopCh <-chan struct{}) error {
	// Fallback used when the configured interval is not positive.
	const fallbackInterval = 60 * time.Second

	gd.logger.Info("Starting GitOps deployer")

	if err := gd.cloneRepository(); err != nil {
		return err
	}

	// Initial sync of whatever is currently checked out. On success remember
	// HEAD so the first tick does not re-apply the same commit.
	if err := gd.syncResources(); err != nil {
		gd.logger.WithError(err).Error("Initial sync failed")
	} else if head, err := gd.repo.Head(); err != nil {
		gd.logger.WithError(err).Warn("Could not read HEAD after initial sync")
	} else {
		gd.lastCommit = head.Hash().String()
		gd.recordDeployment(gd.lastCommit, true, "")
	}

	// time.NewTicker panics on a non-positive duration, so guard against a
	// missing or invalid SyncInterval.
	interval := time.Duration(gd.config.SyncInterval) * time.Second
	if interval <= 0 {
		gd.logger.Warnf("Invalid sync interval %v, falling back to %v", interval, fallbackInterval)
		interval = fallbackInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := gd.checkAndSync(); err != nil {
				gd.logger.WithError(err).Error("Sync failed")
			}
		case <-stopCh:
			gd.logger.Info("Stopping GitOps deployer")
			return nil
		}
	}
}
// cloneRepository makes gd.repo point at a usable checkout in
// config.RepoPath.
//
// It first tries to open an existing repository there. If the path does not
// hold a repository (it is missing, or it exists but is empty, as a freshly
// mounted volume would be) the repository is cloned from config.GitURL.
// Any other open error, or a clone error, is returned wrapped.
func (gd *GitOpsDeployer) cloneRepository() error {
	repoPath := gd.config.RepoPath

	repo, err := git.PlainOpen(repoPath)
	if err == nil {
		gd.repo = repo
		gd.logger.Info("Opened existing repository")
		return nil
	}
	// Only "no repository here" falls through to cloning; other failures
	// (permissions, corruption, ...) must not be masked by a clone attempt.
	if !errors.Is(err, git.ErrRepositoryNotExists) {
		return fmt.Errorf("failed to open repository: %w", err)
	}

	gd.logger.Infof("Cloning repository: %s", gd.config.GitURL)
	repo, err = git.PlainClone(repoPath, false, &git.CloneOptions{
		URL:      gd.config.GitURL,
		Progress: os.Stdout,
		Auth:     gd.getGitAuth(),
	})
	if err != nil {
		return fmt.Errorf("failed to clone repository: %w", err)
	}

	gd.repo = repo
	gd.logger.Info("Repository cloned successfully")
	return nil
}
// getGitAuth builds the authentication method for Git operations.
//
// A configured GitToken takes precedence and is sent as the HTTP basic-auth
// password with user "git". Otherwise, if GitSSHKey is set, it is passed to
// ssh.NewPublicKeysFromFile as the key file (user "git", empty passphrase).
// It returns nil (anonymous access) when neither is configured or when the
// SSH key cannot be loaded; the load failure is logged rather than dropped.
func (gd *GitOpsDeployer) getGitAuth() transport.AuthMethod {
	if token := gd.config.GitToken; token != "" {
		return &http.BasicAuth{
			Username: "git",
			Password: token,
		}
	}

	keyFile := gd.config.GitSSHKey
	if keyFile == "" {
		return nil
	}

	sshAuth, err := ssh.NewPublicKeysFromFile("git", keyFile, "")
	if err != nil {
		gd.logger.WithError(err).Warnf("Failed to load SSH key %s; continuing without Git authentication", keyFile)
		// Return an untyped nil so callers see a nil interface value.
		return nil
	}
	return sshAuth
}
// checkAndSync pulls from the "origin" remote and, when HEAD differs from
// the last successfully synced commit, applies the manifests again.
//
// A failed sync is recorded and returned without advancing lastCommit, so
// the same commit is retried on the next call. A successful sync updates
// lastCommit and records the deployment.
func (gd *GitOpsDeployer) checkAndSync() error {
	gd.logger.Debug("Checking for updates...")

	worktree, err := gd.repo.Worktree()
	if err != nil {
		return fmt.Errorf("failed to open worktree: %w", err)
	}

	err = worktree.Pull(&git.PullOptions{
		RemoteName: "origin",
		Auth:       gd.getGitAuth(),
	})
	// "Already up to date" is reported as an error value but is not a failure.
	if err != nil && !errors.Is(err, git.NoErrAlreadyUpToDate) {
		return fmt.Errorf("failed to pull: %w", err)
	}

	ref, err := gd.repo.Head()
	if err != nil {
		return fmt.Errorf("failed to read HEAD: %w", err)
	}
	currentCommit := ref.Hash().String()

	if currentCommit == gd.lastCommit {
		gd.logger.Debug("No updates found")
		return nil
	}

	gd.logger.Infof("New commit detected: %s", currentCommit[:8])

	if err := gd.syncResources(); err != nil {
		gd.recordDeployment(currentCommit, false, err.Error())
		return err
	}

	gd.lastCommit = currentCommit
	gd.recordDeployment(currentCommit, true, "")
	return nil
}
// syncResources walks config.ManifestsPath inside the checkout and applies
// every .yaml / .yml file it finds.
//
// A failing manifest is logged and does not stop the walk, so the remaining
// files are still applied. After the walk an error is returned if any file
// failed, which lets callers report the sync as unsuccessful instead of
// recording a partial sync as a success. Errors from the walk itself are
// returned immediately.
func (gd *GitOpsDeployer) syncResources() error {
	gd.logger.Info("Syncing resources from Git to cluster")

	manifestsPath := filepath.Join(gd.config.RepoPath, gd.config.ManifestsPath)

	var applied, failed int
	walkErr := filepath.Walk(manifestsPath, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			// Never descend into Git metadata when the manifests path is
			// the repository root.
			if info.Name() == ".git" {
				return filepath.SkipDir
			}
			return nil
		}

		switch filepath.Ext(path) {
		case ".yaml", ".yml":
		default:
			return nil
		}

		if err := gd.applyManifest(path); err != nil {
			gd.logger.WithError(err).Warnf("Failed to apply manifest: %s", path)
			failed++
			return nil // keep going with the remaining files
		}
		applied++
		return nil
	})
	if walkErr != nil {
		return walkErr
	}

	gd.logger.Infof("Applied %d manifests", applied)
	if failed > 0 {
		return fmt.Errorf("%d of %d manifests failed to apply", failed, applied+failed)
	}
	return nil
}
// applyManifest reads the file at path, decodes each YAML/JSON document in
// it and applies the documents in order via applyResource.
//
// Documents that decode to an empty object are skipped instead of being
// sent to the API server. The first read, decode or apply error aborts the
// file and is returned.
func (gd *GitOpsDeployer) applyManifest(path string) error {
	gd.logger.Debugf("Applying manifest: %s", path)

	data, err := os.ReadFile(path)
	if err != nil {
		return err
	}

	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(data), 4096)
	for {
		var obj unstructured.Unstructured
		if err := decoder.Decode(&obj); err != nil {
			if errors.Is(err, io.EOF) {
				return nil // all documents consumed
			}
			return fmt.Errorf("decoding manifest: %w", err)
		}

		// Nothing to apply for an empty document.
		if len(obj.Object) == 0 {
			continue
		}

		if err := gd.applyResource(&obj); err != nil {
			return fmt.Errorf("applying %s %q: %w", obj.GetKind(), obj.GetName(), err)
		}
	}
}
// applyResource creates obj in the cluster, or replaces the existing object
// of the same name with it.
//
// The resource name is guessed from the kind with pluralize, and an empty
// namespace is treated as "default". Only a NotFound response from the
// lookup leads to a create; any other lookup error (permissions, network,
// unknown resource, ...) is returned instead of being mistaken for "object
// does not exist".
//
// NOTE(review): because the namespace is always set, cluster-scoped kinds
// are requested under a namespace as well; confirm whether those need to be
// supported.
func (gd *GitOpsDeployer) applyResource(obj *unstructured.Unstructured) error {
	gvk := obj.GroupVersionKind()
	name := obj.GetName()
	if gvk.Kind == "" || name == "" {
		return fmt.Errorf("manifest is missing kind or metadata.name (kind=%q, name=%q)", gvk.Kind, name)
	}

	gd.logger.Debugf("Applying %s/%s: %s/%s",
		gvk.Group, gvk.Version, obj.GetNamespace(), name)

	gvr := schema.GroupVersionResource{
		Group:    gvk.Group,
		Version:  gvk.Version,
		Resource: gd.pluralize(gvk.Kind),
	}

	namespace := obj.GetNamespace()
	if namespace == "" {
		namespace = "default"
	}

	ctx := context.TODO()
	resources := gd.dynamicClient.Resource(gvr).Namespace(namespace)

	existing, err := resources.Get(ctx, name, metav1.GetOptions{})
	switch {
	case err == nil:
		// Update requires the current resourceVersion of the live object.
		obj.SetResourceVersion(existing.GetResourceVersion())
		if _, err := resources.Update(ctx, obj, metav1.UpdateOptions{}); err != nil {
			return fmt.Errorf("failed to update resource: %w", err)
		}
		gd.logger.Infof("Updated %s: %s/%s", gvk.Kind, namespace, name)
	case isNotFound(err):
		if _, err := resources.Create(ctx, obj, metav1.CreateOptions{}); err != nil {
			return fmt.Errorf("failed to create resource: %w", err)
		}
		gd.logger.Infof("Created %s: %s/%s", gvk.Kind, namespace, name)
	default:
		return fmt.Errorf("failed to get resource: %w", err)
	}
	return nil
}

// isNotFound reports whether err carries a Kubernetes API status whose
// reason is NotFound.
func isNotFound(err error) bool {
	var apiStatus interface{ Status() metav1.Status }
	if !errors.As(err, &apiStatus) {
		return false
	}
	return apiStatus.Status().Reason == metav1.StatusReasonNotFound
}
// pluralize guesses the lower-case plural resource name for a kind.
//
// Rules, in order:
//   - an empty kind yields "";
//   - names ending in "endpoints" are already plural and returned as is;
//   - a trailing "s" gets "es" (Ingress -> ingresses);
//   - a consonant followed by "y" becomes "ies" (NetworkPolicy -> networkpolicies);
//   - everything else gets "s" (Deployment -> deployments, Gateway -> gateways).
//
// This is still a heuristic: kinds with irregular resource names need a
// discovery-based mapping.
func (gd *GitOpsDeployer) pluralize(kind string) string {
	lower := strings.ToLower(kind)
	if lower == "" {
		return ""
	}
	if strings.HasSuffix(lower, "endpoints") {
		return lower
	}

	switch last := lower[len(lower)-1]; {
	case last == 's':
		return lower + "es"
	case last == 'y' && len(lower) > 1 && !strings.ContainsRune("aeiou", rune(lower[len(lower)-2])):
		return lower[:len(lower)-1] + "ies"
	default:
		return lower + "s"
	}
}
// recordDeployment builds a DeploymentRecord for the given commit and
// outcome and writes it to the log. Nothing is persisted yet; storing the
// record in a database or file is still to be done.
//
// The commit is logged abbreviated to at most 8 characters; shorter values
// are logged unchanged.
func (gd *GitOpsDeployer) recordDeployment(commit string, success bool, errorMsg string) {
	record := DeploymentRecord{
		Timestamp: time.Now(),
		Commit:    commit,
		Success:   success,
		Error:     errorMsg,
	}

	shortCommit := record.Commit
	if len(shortCommit) > 8 {
		shortCommit = shortCommit[:8]
	}

	// TODO: persist record to a database or file.
	gd.logger.WithFields(logrus.Fields{
		"commit":  shortCommit,
		"success": record.Success,
		"error":   record.Error,
	}).Info("Deployment recorded")
}
// Rollback checks out the given commit and re-applies the manifests from it.
//
// commitHash is resolved against the repository first, so an unknown or
// malformed revision is rejected before the worktree is touched. The
// outcome is recorded, and on success lastCommit is set to the resolved
// hash. It returns an error when the repository has not been opened yet, or
// when resolving, checking out or syncing fails.
//
// NOTE(review): the periodic pull in checkAndSync keeps running after a
// rollback; confirm how it should interact with a checked-out older commit.
func (gd *GitOpsDeployer) Rollback(commitHash string) error {
	gd.logger.Infof("Rolling back to commit: %s", commitHash)

	if gd.repo == nil {
		return errors.New("rollback: repository is not initialized")
	}

	hash, err := gd.repo.ResolveRevision(plumbing.Revision(commitHash))
	if err != nil {
		return fmt.Errorf("resolving revision %q: %w", commitHash, err)
	}

	worktree, err := gd.repo.Worktree()
	if err != nil {
		return fmt.Errorf("opening worktree: %w", err)
	}

	if err := worktree.Checkout(&git.CheckoutOptions{Hash: *hash}); err != nil {
		return fmt.Errorf("checking out %s: %w", hash, err)
	}

	target := hash.String()
	if err := gd.syncResources(); err != nil {
		gd.recordDeployment(target, false, err.Error())
		return err
	}

	gd.lastCommit = target
	gd.recordDeployment(target, true, "")
	return nil
}
Python 实现
gitops_deployer.py
#!/usr/bin/env python3
import os
import time
import yaml
from datetime import datetime
from pathlib import Path
from git import Repo
from kubernetes import client, config, dynamic
from kubernetes.client import api_client
class GitOpsDeployer:
    """Poll a Git repository and apply its Kubernetes manifests to the cluster.

    Args:
        git_url: URL of the repository to clone.
        repo_path: Local directory holding the checkout.
        manifests_path: Directory inside the checkout that is scanned for
            manifests.
        sync_interval: Seconds to wait between polls.
    """

    def __init__(self, git_url, repo_path, manifests_path, sync_interval=60):
        # Prefer in-cluster credentials and fall back to the local kubeconfig.
        # Only the "not running in a cluster" error triggers the fallback.
        try:
            config.load_incluster_config()
        except config.ConfigException:
            config.load_kube_config()

        self.k8s_client = client.ApiClient()
        self.dynamic_client = dynamic.DynamicClient(self.k8s_client)
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()

        self.git_url = git_url
        self.repo_path = repo_path
        self.manifests_path = manifests_path
        self.sync_interval = sync_interval
        self.last_commit = None  # hexsha of the last successfully synced commit
        self.repo = None  # set by clone_repository()

    def run(self):
        """Clone the repository, sync once, then poll until interrupted.

        Errors during a sync or poll are printed and retried after
        ``sync_interval`` seconds. Ctrl-C stops the loop at any point,
        including while sleeping after an error.
        """
        print("Starting GitOps deployer...")

        self.clone_repository()

        try:
            # Initial sync of the current checkout. The commit is remembered
            # so the first poll does not immediately re-apply it.
            try:
                self._sync_and_record(self.repo.head.commit.hexsha)
            except Exception as e:
                print(f"Initial sync failed: {e}")

            while True:
                try:
                    self.check_and_sync()
                except Exception as e:
                    print(f"Error: {e}")
                time.sleep(self.sync_interval)
        except KeyboardInterrupt:
            print("Stopping GitOps deployer...")

    def clone_repository(self):
        """Open the checkout at ``repo_path``, cloning it first if needed.

        A directory that exists but holds no ``.git`` entry (for example a
        freshly mounted empty volume) is cloned into rather than opened.
        """
        print(f"Cloning repository: {self.git_url}")

        if os.path.exists(os.path.join(self.repo_path, '.git')):
            self.repo = Repo(self.repo_path)
            print("Opened existing repository")
        else:
            self.repo = Repo.clone_from(self.git_url, self.repo_path)
            print("Repository cloned successfully")

    def check_and_sync(self):
        """Pull from origin and sync when HEAD moved past ``last_commit``."""
        print("Checking for updates...")

        self.repo.remotes.origin.pull()

        current_commit = self.repo.head.commit.hexsha
        if current_commit == self.last_commit:
            print("No updates found")
            return

        print(f"New commit detected: {current_commit[:8]}")
        self._sync_and_record(current_commit)

    def _sync_and_record(self, commit):
        """Sync the checkout and record the outcome for ``commit``.

        Returns:
            True when the sync succeeded (``last_commit`` is updated),
            False when it failed (``last_commit`` is left unchanged so the
            commit is retried on the next poll).
        """
        try:
            self.sync_resources()
        except Exception as e:
            print(f"Sync failed: {e}")
            self.record_deployment(commit, False, str(e))
            return False

        self.last_commit = commit
        self.record_deployment(commit, True, "")
        return True

    def sync_resources(self):
        """Apply every ``.yaml`` / ``.yml`` file below the manifests directory.

        A failing file is reported and does not stop the remaining files
        from being applied.

        Raises:
            RuntimeError: after all files were processed, if any of them
                failed to apply.
        """
        print("Syncing resources from Git to cluster")

        manifests_dir = Path(self.repo_path) / self.manifests_path
        manifest_files = sorted(
            p for p in manifests_dir.rglob('*')
            if p.is_file() and p.suffix in ('.yaml', '.yml')
        )

        applied_count = 0
        failed = []
        for yaml_file in manifest_files:
            try:
                self.apply_manifest(str(yaml_file))
                applied_count += 1
            except Exception as e:
                print(f"Failed to apply {yaml_file}: {e}")
                failed.append(str(yaml_file))

        print(f"Applied {applied_count} manifests")
        if failed:
            raise RuntimeError(
                f"{len(failed)} manifest(s) failed to apply: {', '.join(failed)}"
            )

    def apply_manifest(self, file_path):
        """Apply every document of a (possibly multi-document) YAML file."""
        print(f"Applying manifest: {file_path}")

        with open(file_path, 'r', encoding='utf-8') as f:
            for doc in yaml.safe_load_all(f):
                if doc is None:
                    continue  # empty document between separators
                self.apply_resource(doc)

    def apply_resource(self, resource_dict):
        """Dispatch one resource to the handler for its kind.

        Deployment, Service, ConfigMap and Secret use the typed clients;
        every other kind goes to ``apply_with_dynamic_client``. A missing or
        null namespace is treated as ``default``.

        Raises:
            ValueError: if the resource has no ``kind`` or ``metadata.name``.
        """
        kind = resource_dict.get('kind')
        metadata = resource_dict.get('metadata') or {}
        namespace = metadata.get('namespace') or 'default'
        name = metadata.get('name')

        if not kind or not name:
            raise ValueError(
                f"manifest is missing kind or metadata.name (kind={kind!r}, name={name!r})"
            )

        print(f"Applying {kind}: {namespace}/{name}")

        handlers = {
            'Deployment': self.apply_deployment,
            'Service': self.apply_service,
            'ConfigMap': self.apply_configmap,
            'Secret': self.apply_secret,
        }
        handler = handlers.get(kind)
        if handler is None:
            self.apply_with_dynamic_client(resource_dict)
        else:
            handler(resource_dict, namespace, name)

    def _create_or_patch(self, kind, read, patch, create, resource_dict, namespace, name):
        """Patch the object if it exists, otherwise create it.

        ``read``, ``patch`` and ``create`` are the typed-client methods for
        ``kind``. Only a 404 from ``read`` leads to a create; any other API
        error is re-raised.
        """
        body = self.k8s_client.sanitize_for_serialization(resource_dict)
        try:
            read(name, namespace)
        except client.exceptions.ApiException as e:
            if e.status != 404:
                raise
            create(namespace, body)
            print(f"Created {kind}: {namespace}/{name}")
            return

        patch(name, namespace, body)
        print(f"Updated {kind}: {namespace}/{name}")

    def apply_deployment(self, resource_dict, namespace, name):
        """Create or patch a Deployment."""
        self._create_or_patch(
            'Deployment',
            self.apps_v1.read_namespaced_deployment,
            self.apps_v1.patch_namespaced_deployment,
            self.apps_v1.create_namespaced_deployment,
            resource_dict, namespace, name,
        )

    def apply_service(self, resource_dict, namespace, name):
        """Create or patch a Service."""
        self._create_or_patch(
            'Service',
            self.v1.read_namespaced_service,
            self.v1.patch_namespaced_service,
            self.v1.create_namespaced_service,
            resource_dict, namespace, name,
        )

    def apply_configmap(self, resource_dict, namespace, name):
        """Create or patch a ConfigMap."""
        self._create_or_patch(
            'ConfigMap',
            self.v1.read_namespaced_config_map,
            self.v1.patch_namespaced_config_map,
            self.v1.create_namespaced_config_map,
            resource_dict, namespace, name,
        )

    def apply_secret(self, resource_dict, namespace, name):
        """Create or patch a Secret."""
        self._create_or_patch(
            'Secret',
            self.v1.read_namespaced_secret,
            self.v1.patch_namespaced_secret,
            self.v1.create_namespaced_secret,
            resource_dict, namespace, name,
        )

    def apply_with_dynamic_client(self, resource_dict):
        """Placeholder for kinds without a typed handler; currently skips them."""
        # Simplified implementation: the resource is only reported, not applied.
        print(f"Skipping {resource_dict.get('kind')} (not implemented)")

    def record_deployment(self, commit, success, error):
        """Print a deployment history record for ``commit``."""
        record = {
            'timestamp': datetime.now().isoformat(),
            'commit': commit,
            'success': success,
            'error': error
        }

        print(f"Deployment recorded: {record}")

        # Persist to a file or database, for example:
        # with open('deployment_history.json', 'a') as f:
        #     json.dump(record, f)
        #     f.write('\n')

    def rollback(self, commit_hash):
        """Check out ``commit_hash`` and re-apply the manifests from it.

        The outcome is recorded and, on success, ``last_commit`` is set to
        the checked-out commit.

        Raises:
            RuntimeError: if the manifests of the target commit failed to
                apply.
        """
        print(f"Rolling back to commit: {commit_hash}")

        self.repo.git.checkout(commit_hash)

        target = self.repo.head.commit.hexsha
        if not self._sync_and_record(target):
            raise RuntimeError(f"Rollback to {commit_hash} failed to apply")
if __name__ == '__main__':
    # Command-line entry point: parse the flags, build the deployer, run it.
    import argparse

    cli = argparse.ArgumentParser(description='GitOps Deployer')
    flag_specs = (
        ('--git-url', {'required': True, 'help': 'Git repository URL'}),
        ('--repo-path', {'default': '/tmp/gitops-repo', 'help': 'Local repo path'}),
        ('--manifests-path', {'default': 'manifests', 'help': 'Manifests directory'}),
        ('--sync-interval', {'type': int, 'default': 60, 'help': 'Sync interval (seconds)'}),
    )
    for flag, spec in flag_specs:
        cli.add_argument(flag, **spec)
    options = cli.parse_args()

    deployer = GitOpsDeployer(
        options.git_url,
        options.repo_path,
        options.manifests_path,
        options.sync_interval,
    )
    deployer.run()
仓库结构示例
my-k8s-config/
├── manifests/
│ ├── base/
│ │ ├── deployment.yaml
│ │ ├── service.yaml
│ │ └── configmap.yaml
│ ├── dev/
│ │ └── kustomization.yaml
│ ├── staging/
│ │ └── kustomization.yaml
│ └── production/
│ └── kustomization.yaml
└── README.md
deployment.yaml
# Example workload manifest kept in the Git repository: an nginx Deployment
# with three replicas in the "default" namespace.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        # Same label as spec.selector.matchLabels above.
        app: nginx
    spec:
      containers:
        - name: nginx
          image: nginx:1.21
          ports:
            - containerPort: 80
部署
Kubernetes Deployment
# Service account the deployer pod runs as (referenced by the Deployment's
# serviceAccountName and by the ClusterRoleBinding subject).
apiVersion: v1
kind: ServiceAccount
metadata:
  name: gitops-deployer
  namespace: kube-system
---
# Cluster-wide permissions for the deployer.
# NOTE(review): the wildcard rule below allows every verb on every resource
# in every API group; narrow it to the kinds the repository actually manages
# before using this outside a demo.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: gitops-deployer
rules:
  - apiGroups: ["*"]
    resources: ["*"]
    verbs: ["*"]
---
# Grants the gitops-deployer ClusterRole to the gitops-deployer service
# account in kube-system.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: gitops-deployer
subjects:
  - kind: ServiceAccount
    name: gitops-deployer
    namespace: kube-system
roleRef:
  kind: ClusterRole
  name: gitops-deployer
  apiGroup: rbac.authorization.k8s.io
---
# Git access token consumed by the deployer through the GIT_TOKEN
# environment variable (see the Deployment's secretKeyRef).
apiVersion: v1
kind: Secret
metadata:
  name: git-credentials
  namespace: kube-system
type: Opaque
stringData:
  # Placeholder value; replace it at deploy time and do not commit a real token.
  token: "your-github-token"
---
# Runs a single deployer pod in kube-system under the gitops-deployer
# service account.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gitops-deployer
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gitops-deployer
  template:
    metadata:
      labels:
        app: gitops-deployer
    spec:
      serviceAccountName: gitops-deployer
      containers:
        - name: deployer
          image: your-registry/gitops-deployer:latest
          # Flags match the options of the Python entry point
          # (--git-url, --repo-path, --manifests-path, --sync-interval).
          args:
            - --git-url=https://github.com/your-org/k8s-config.git
            - --repo-path=/repo
            - --manifests-path=manifests/production
            - --sync-interval=60
          env:
            # Token from the git-credentials Secret.
            # NOTE(review): the Python entry point shown in this document does
            # not read GIT_TOKEN; confirm how the token reaches the Git client.
            - name: GIT_TOKEN
              valueFrom:
                secretKeyRef:
                  name: git-credentials
                  key: token
          volumeMounts:
            - name: repo
              mountPath: /repo
      volumes:
        # Scratch volume for the checkout, mounted at --repo-path.
        - name: repo
          emptyDir: {}
Webhook 集成
GitHub Webhook
from flask import Flask, request
import hmac
import hashlib
app = Flask(__name__)


@app.route('/webhook', methods=['POST'])
def github_webhook():
    """Handle a GitHub webhook delivery.

    The request is rejected with 401 unless its ``X-Hub-Signature-256``
    header matches the raw request body. A ``push`` event triggers a sync;
    every other event is acknowledged with 200 and ignored.
    """
    # Verify the signature over the raw bytes. get_data() returns the
    # unparsed body regardless of the request's content type.
    signature = request.headers.get('X-Hub-Signature-256')
    if not verify_signature(request.get_data(), signature):
        return 'Invalid signature', 401

    # The payload itself is not needed, so it is not parsed: only the event
    # type decides what happens.
    event = request.headers.get('X-GitHub-Event')
    if event == 'push':
        # NOTE(review): `deployer` is not defined in this snippet; it is
        # presumably a GitOpsDeployer instance created elsewhere — confirm.
        deployer.check_and_sync()
        return 'Sync triggered', 200

    return 'OK', 200
def verify_signature(payload, signature):
    """Check a GitHub ``X-Hub-Signature-256`` header against the request body.

    Args:
        payload: Raw request body as bytes.
        signature: Header value (``sha256=<hex digest>``), or None when the
            header is missing.

    Returns:
        True when the signature matches the HMAC-SHA256 of ``payload`` keyed
        with the ``WEBHOOK_SECRET`` environment variable. False when it does
        not match, when the header is missing, or when no secret is
        configured.
    """
    import os  # local import: this snippet's import header does not include os

    secret = os.environ.get('WEBHOOK_SECRET')
    if not secret or not signature:
        # Without a secret or a signature nothing can be verified: reject
        # instead of raising.
        return False

    expected = 'sha256=' + hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    # Compare as bytes: compare_digest rejects str values that contain
    # non-ASCII characters, which a hostile header could carry.
    return hmac.compare_digest(expected.encode(), signature.encode())
if __name__ == '__main__':
    # Serve the webhook endpoint on every interface, port 8080.
    listen_on = {'host': '0.0.0.0', 'port': 8080}
    app.run(**listen_on)
使用示例
1. 提交变更到 Git
# Edit deployment.yaml
vi manifests/production/deployment.yaml
# Commit the change and push it to the main branch
git add manifests/production/deployment.yaml
git commit -m "Update nginx replicas to 5"
git push origin main
2. 自动部署
GitOps Deployer 会自动:
- 检测到新 commit
- 拉取最新代码
- 应用变更到集群
- 记录部署历史
3. 回滚
# Inspect the history to pick the commit to return to
git log
# Roll back to that commit. The deployer pod runs in the kube-system
# namespace, so the namespace has to be given explicitly.
kubectl exec -it -n kube-system gitops-deployer-xxx -- \
  /app/deployer rollback <commit-hash>
监控
Prometheus Metrics
// Prometheus collectors describing sync activity.
// NOTE(review): registering these collectors and updating them from the
// sync loop is not shown in this snippet.
var (
	// syncTotal counts syncs, partitioned by the "status" label.
	syncTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "gitops_syncs_total",
			Help: "Total number of syncs",
		},
		[]string{"status"},
	)
	// lastSyncTime holds the timestamp of the last successful sync.
	lastSyncTime = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "gitops_last_sync_timestamp",
			Help: "Timestamp of last successful sync",
		},
	)
)
总结
功能特性
✅ Git 作为真实来源: 所有变更通过 Git
✅ 自动同步: 无需手动 kubectl apply
✅ 审计日志: 完整的 Git 历史
✅ 轻松回滚: Git revert/checkout
✅ 多环境支持: dev/staging/prod
最佳实践
- 分支策略: 不同环境使用不同分支
- PR 审查: 通过 PR 审查变更
- 测试环境: 先部署到测试环境
- 健康检查: 部署后验证健康状态
- 告警通知: 部署失败时通知
对比 ArgoCD/Flux
| 特性 | 本项目 | ArgoCD | Flux |
|---|---|---|---|
| 轻量级 | ✅ | ❌ | ✅ |
| 易定制 | ✅ | ❌ | ✅ |
| UI | ❌ | ✅ | ❌ |
| 多集群 | ❌ | ✅ | ✅ |
最后一个项目将介绍备份恢复工具。