CRD and Operator Development

CRD Overview

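A CRD (CustomResourceDefinition) lets you extend the Kubernetes API with your own resource types. Once a CRD is registered, the API server stores and serves objects of the new type like built-in resources, and you manage them with kubectl.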

Why CRDs?

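Built-in resources (Pod, Deployment, Service...)
         ↓
    Not enough for your needs
         ↓
    Custom resources (CRD)
         ↓
┌──────────────────────────────────────┐
│  Database cluster (MySQLCluster)     │
│  Message queue (KafkaCluster)        │
│  Application config (Application)    │
│  Autoscaling policy (ScalingRule)    │
└──────────────────────────────────────┘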

Creating a CRD

Basic CRD Definition

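apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # Must be <plural>.<group>
  name: mysqls.database.example.com
spec:
  # API group
  group: database.example.com

  # Supported versions
  versions:
  - name: v1
    # Serve this version through the REST API
    served: true
    # Persist objects in etcd using this version (exactly one version must set this)
    storage: true
    # Structural schema used for validation and field pruning
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              # Number of replicas
              replicas:
                type: integer
                minimum: 1
                maximum: 10
              # MySQL version
              version:
                type: string
                enum: ["5.7", "8.0"]
              # Storage size per replica; defaulted so the controller can always parse it
              storageSize:
                type: string
                pattern: '^[0-9]+Gi$'
                default: 10Gi
              # Resource requests for the mysql container
              resources:
                type: object
                properties:
                  requests:
                    type: object
                    properties:
                      memory:
                        type: string
                      cpu:
                        type: string
            required: ["replicas", "version"]
          status:
            type: object
            properties:
              # Current phase
              phase:
                type: string
                enum: ["Pending", "Running", "Failed"]
              # Number of ready replicas
              readyReplicas:
                type: integer
              # Human-readable message
              message:
                type: string
    # Extra columns shown by kubectl get
    additionalPrinterColumns:
    - name: Replicas
      type: integer
      jsonPath: .spec.replicas
    - name: Version
      type: string
      jsonPath: .spec.version
    - name: Status
      type: string
      jsonPath: .status.phase
    - name: Age
      type: date
      jsonPath: .metadata.creationTimestamp

    # Subresources
    subresources:
      # Enable the /status subresource (status is written separately from spec)
      status: {}
      # Enable the /scale subresource (kubectl scale, HorizontalPodAutoscaler)
      scale:
        specReplicasPath: .spec.replicas
        # Must point at a field that exists in the status schema
        statusReplicasPath: .status.readyReplicas

  # Resource scope: Namespaced or Cluster
  scope: Namespaced

  # Resource names
  names:
    # Plural name, used in the REST path and in metadata.name above
    plural: mysqls
    # Singular name
    singular: mysql
    # Kind name
    kind: MySQL
    # Short names for kubectl
    shortNames:
    - my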
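
The openAPIV3Schema rules are enforced by the API server whenever an object is created or updated, before any controller sees it. As a rough illustration, the Go sketch below re-implements only the spec rules of this particular schema: the range for replicas, the enum for version, the pattern for storageSize, and the required list. The names `spec` and `validateSpec` and the error strings are hypothetical. They do not reproduce the API server's code or its messages.

```go
package main

import (
	"fmt"
	"regexp"
)

// spec is a hypothetical, simplified mirror of the CRD's .spec fields.
// Pointers let the sketch tell "field omitted" apart from "field set to zero".
type spec struct {
	Replicas    *int64
	Version     *string
	StorageSize *string
}

// storageSizePattern mirrors `pattern: '^[0-9]+Gi$'` from the schema.
var storageSizePattern = regexp.MustCompile(`^[0-9]+Gi$`)

// ptr returns a pointer to v (helper for building example values).
func ptr[T any](v T) *T { return &v }

// validateSpec returns one message per schema rule that s violates.
func validateSpec(s spec) []string {
	var errs []string

	// required: ["replicas", ...]; minimum: 1; maximum: 10
	if s.Replicas == nil {
		errs = append(errs, "spec.replicas: Required value")
	} else if *s.Replicas < 1 || *s.Replicas > 10 {
		errs = append(errs, fmt.Sprintf("spec.replicas: %d is outside [1, 10]", *s.Replicas))
	}

	// required: [..., "version"]; enum: ["5.7", "8.0"]
	if s.Version == nil {
		errs = append(errs, "spec.version: Required value")
	} else if *s.Version != "5.7" && *s.Version != "8.0" {
		errs = append(errs, fmt.Sprintf("spec.version: unsupported value %q", *s.Version))
	}

	// storageSize is not in the required list, but must match the pattern when set.
	if s.StorageSize != nil && !storageSizePattern.MatchString(*s.StorageSize) {
		errs = append(errs, fmt.Sprintf("spec.storageSize: %q does not match ^[0-9]+Gi$", *s.StorageSize))
	}
	return errs
}

func main() {
	ok := spec{Replicas: ptr[int64](3), Version: ptr("8.0"), StorageSize: ptr("10Gi")}
	bad := spec{Replicas: ptr[int64](0), Version: ptr("9.0"), StorageSize: ptr("10G")}
	fmt.Println(validateSpec(ok))  // no violations
	fmt.Println(validateSpec(bad)) // one violation per field
}
```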

Applying the CRD

# Create the CRD
kubectl apply -f mysql-crd.yaml

# List CRDs and inspect this one
kubectl get crd
kubectl describe crd mysqls.database.example.com

# Check that the new resource type is served
kubectl api-resources | grep mysql
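
After the CRD is established, the API server exposes the new type under /apis/<group>/<version>/, and kubectl calls those REST paths. The sketch below builds the paths from the CRD's group, version name, names.plural and `scope: Namespaced`. `resourcePath` is a hypothetical helper for illustration. It is not a client-go function.

```go
package main

import (
	"fmt"
	"strings"
)

// resourcePath builds the REST path for a custom resource (hypothetical helper).
// An empty namespace gives the cross-namespace path; an empty name gives the
// collection path; a non-empty subresource (e.g. "status", "scale") is appended
// after the name.
func resourcePath(group, version, plural, namespace, name, subresource string) string {
	parts := []string{"/apis", group, version}
	if namespace != "" {
		parts = append(parts, "namespaces", namespace)
	}
	parts = append(parts, plural)
	if name != "" {
		parts = append(parts, name)
		if subresource != "" {
			parts = append(parts, subresource)
		}
	}
	return strings.Join(parts, "/")
}

func main() {
	const g, v, p = "database.example.com", "v1", "mysqls"
	fmt.Println(resourcePath(g, v, p, "default", "", ""))                 // list in one namespace
	fmt.Println(resourcePath(g, v, p, "", "", ""))                        // list across namespaces
	fmt.Println(resourcePath(g, v, p, "default", "my-database", ""))      // a single object
	fmt.Println(resourcePath(g, v, p, "default", "my-database", "scale")) // its /scale subresource
}
```

To compare against a live cluster, run `kubectl get --raw /apis/database.example.com/v1/namespaces/default/mysqls`. It returns the raw list from the first path.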

Creating a Custom Resource Instance

apiVersion: database.example.com/v1
kind: MySQL
metadata:
  name: my-database
  namespace: default
spec:
  replicas: 3
  version: "8.0"
  storageSize: 10Gi
  resources:
    requests:
      memory: 1Gi
      cpu: 500m

# Create the instance
kubectl apply -f mysql-instance.yaml

# View instances
kubectl get mysql
kubectl get my  # using the short name
kubectl describe mysql my-database
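
additionalPrinterColumns and the /scale subresource both locate fields with simple JSON paths such as .spec.replicas. The sketch below resolves that kind of dotted path against the instance above, after decoding the instance into a generic map. That map is roughly the shape of an unstructured object. `instanceJSON`, `decodeInstance` and `lookup` are hypothetical names. `lookup` handles only plain dotted paths, not full JSONPath (no filters or array indexes). Until the Operator writes a status, .status.phase is absent, so the Status column has no value.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// instanceJSON is the MySQL instance above, written as JSON (no status yet).
const instanceJSON = `{
  "apiVersion": "database.example.com/v1",
  "kind": "MySQL",
  "metadata": {"name": "my-database", "namespace": "default"},
  "spec": {"replicas": 3, "version": "8.0", "storageSize": "10Gi"}
}`

// decodeInstance parses instanceJSON into a generic map.
// Note that encoding/json decodes JSON numbers as float64.
func decodeInstance() (map[string]interface{}, error) {
	var obj map[string]interface{}
	err := json.Unmarshal([]byte(instanceJSON), &obj)
	return obj, err
}

// lookup resolves a dotted path like ".spec.replicas". It returns false if a
// segment is missing or an intermediate value is not an object.
func lookup(obj map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = obj
	for _, key := range strings.Split(strings.TrimPrefix(path, "."), ".") {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

func main() {
	obj, err := decodeInstance()
	if err != nil {
		panic(err)
	}
	for _, path := range []string{".spec.replicas", ".spec.version", ".status.phase"} {
		v, ok := lookup(obj, path)
		fmt.Printf("%-16s found=%-5v value=%v\n", path, ok, v)
	}
}
```

With the scale subresource enabled, `kubectl scale mysql my-database --replicas=5` writes the new count into the field named by specReplicasPath.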

The Operator Pattern

Operator Architecture

┌────────────────────────────────────────┐
│          Kubernetes API Server         │
└────────────┬───────────────────────────┘
             │ Watch
             ▼
┌────────────────────────────────────────┐
│           Operator Controller          │
│  ┌──────────────────────────────────┐  │
│  │  Informer (watches resources)    │  │
│  │  - List/Watch Custom Resources   │  │
│  │  - Caches resource state         │  │
│  └────────────┬─────────────────────┘  │
│               ▼                        │
│  ┌──────────────────────────────────┐  │
│  │  Control Loop (reconcile)        │  │
│  │  1. Read desired state (Spec)    │  │
│  │  2. Observe actual state         │  │
│  │  3. Compute the difference       │  │
│  │  4. Take reconciling actions     │  │
│  │  5. Update Status                │  │
│  └────────────┬─────────────────────┘  │
└───────────────┼────────────────────────┘
                │ Create/update resources
                ▼
┌────────────────────────────────────────┐
│     Kubernetes Resources               │
│  - Pod                                 │
│  - Service                             │
│  - ConfigMap                           │
│  - PVC                                 │
└────────────────────────────────────────┘
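
The five steps of the control loop can be shown without any Kubernetes dependencies. In the simplified sketch below, all names (`Spec`, `Status`, `Cluster`, `reconcile`) are hypothetical. The desired state is a replica count, the actual state is a slice of pod names, and "ready" just means "exists". `reconcile` compares the two states rather than replaying events, so calling it again after convergence changes nothing. That idempotence is what a real controller needs, because it may be triggered many times for the same object.

```go
package main

import "fmt"

// Spec is the desired state (mirrors .spec.replicas).
type Spec struct{ Replicas int }

// Status is what the loop reports back (mirrors .status).
type Status struct {
	ReadyReplicas int
	Phase         string
}

// Cluster is a toy stand-in for the actual state: names of running pods.
type Cluster struct{ Pods []string }

// reconcile drives cluster toward spec and returns the new status plus the
// number of changes it made (0 when already converged).
func reconcile(spec Spec, cluster *Cluster, name string) (Status, int) {
	changes := 0
	// 1-3. Read desired state, observe actual state, compute the difference.
	diff := spec.Replicas - len(cluster.Pods)
	// 4. Act: add pods with the next ordinal, or remove the highest ordinals
	//    first (the order a StatefulSet uses when scaling down).
	for ; diff > 0; diff-- {
		cluster.Pods = append(cluster.Pods, fmt.Sprintf("%s-%d", name, len(cluster.Pods)))
		changes++
	}
	for ; diff < 0; diff++ {
		cluster.Pods = cluster.Pods[:len(cluster.Pods)-1]
		changes++
	}
	// 5. Update status from the observed actual state.
	st := Status{ReadyReplicas: len(cluster.Pods), Phase: "Pending"}
	if st.ReadyReplicas == spec.Replicas {
		st.Phase = "Running"
	}
	return st, changes
}

func main() {
	cluster := &Cluster{}
	for _, replicas := range []int{3, 3, 1} {
		st, n := reconcile(Spec{Replicas: replicas}, cluster, "my-database")
		fmt.Printf("replicas=%d changes=%d pods=%v status=%+v\n", replicas, n, cluster.Pods, st)
	}
}
```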

Developing an Operator with controller-runtime (Kubebuilder)

Project Layout

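mysql-operator/
├── api/
│   └── v1/
│       ├── groupversion_info.go      # GroupVersion, SchemeBuilder, AddToScheme
│       ├── mysql_types.go            # Resource type definitions
│       └── zz_generated.deepcopy.go  # Generated by controller-gen
├── config/
│   ├── crd/                          # CRD YAML
│   └── rbac/                         # RBAC manifests
├── controllers/
│   └── mysql_controller.go           # Controller logic
├── Dockerfile
├── main.go
├── go.mod
└── go.sum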

Defining the Resource Types

// api/v1/mysql_types.go
package v1

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// MySQLSpec defines the desired state of a MySQL cluster
type MySQLSpec struct {
    // Number of replicas
    Replicas int32 `json:"replicas"`

    // MySQL version, e.g. "8.0"
    Version string `json:"version"`

    // Storage size per replica, e.g. "10Gi"
    StorageSize string `json:"storageSize"`

    // CPU and memory requests/limits for the mysql container
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`

    // Additional configuration options
    Config map[string]string `json:"config,omitempty"`
}

// MySQLStatus defines the observed state of a MySQL cluster
type MySQLStatus struct {
    // Current phase: Pending, Running or Failed
    Phase string `json:"phase,omitempty"`

    // Number of ready replicas
    ReadyReplicas int32 `json:"readyReplicas,omitempty"`

    // Standard Kubernetes conditions
    Conditions []metav1.Condition `json:"conditions,omitempty"`

    // Human-readable status message
    Message string `json:"message,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:resource:shortName=my
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.readyReplicas
// +kubebuilder:printcolumn:name="Replicas",type=integer,JSONPath=`.spec.replicas`
// +kubebuilder:printcolumn:name="Version",type=string,JSONPath=`.spec.version`
// +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp`

// MySQL is the Schema for the mysqls API
type MySQL struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   MySQLSpec   `json:"spec,omitempty"`
    Status MySQLStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// MySQLList contains a list of MySQL
type MySQLList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []MySQL `json:"items"`
}

func init() {
    // SchemeBuilder is defined in groupversion_info.go
    SchemeBuilder.Register(&MySQL{}, &MySQLList{})
}
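
The json struct tags decide what the API server actually stores. One consequence: with omitempty, a zero value such as `ReadyReplicas: 0` is dropped from the serialized status, so clients see the field as absent rather than 0. The sketch below shows this with encoding/json, using trimmed, dependency-free copies of the types (`statusSketch` and `specSketch` are hypothetical names, and Conditions is left out).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statusSketch is a trimmed copy of MySQLStatus (Conditions omitted).
type statusSketch struct {
	Phase         string `json:"phase,omitempty"`
	ReadyReplicas int32  `json:"readyReplicas,omitempty"`
	Message       string `json:"message,omitempty"`
}

// specSketch is a trimmed copy of MySQLSpec; replicas has no omitempty.
type specSketch struct {
	Replicas int32  `json:"replicas"`
	Version  string `json:"version"`
}

// toJSON marshals v and panics on error (acceptable for these fixed demo types).
func toJSON(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(toJSON(statusSketch{Phase: "Pending"})) // {"phase":"Pending"}
	fmt.Println(toJSON(specSketch{Version: "8.0"}))     // {"replicas":0,"version":"8.0"}
}
```

Code that reads the stored status should therefore treat a missing readyReplicas as 0. Fields without omitempty, such as replicas, are always serialized, including an explicit 0.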

Implementing the Controller

// controllers/mysql_controller.go
package controllers

import (
    "context"
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/equality"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/intstr"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/log"

    databasev1 "mysql-operator/api/v1"
)

// MySQLReconciler reconciles MySQL custom resources
type MySQLReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=database.example.com,resources=mysqls,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=database.example.com,resources=mysqls/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete

// Reconcile is the main reconcile logic. It runs whenever a MySQL object (or an
// object it owns) changes, so every step must be idempotent.
func (r *MySQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    // 1. Fetch the MySQL instance
    mysql := &databasev1.MySQL{}
    err := r.Get(ctx, req.NamespacedName, mysql)
    if err != nil {
        if errors.IsNotFound(err) {
            // The resource was deleted; owned objects are garbage-collected
            // through their owner references, so there is nothing to do
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }

    // 2. Reconcile the Service
    if err := r.reconcileService(ctx, mysql); err != nil {
        logger.Error(err, "Failed to reconcile Service")
        return ctrl.Result{}, err
    }

    // 3. Reconcile the StatefulSet
    if err := r.reconcileStatefulSet(ctx, mysql); err != nil {
        logger.Error(err, "Failed to reconcile StatefulSet")
        return ctrl.Result{}, err
    }

    // 4. Update the status
    if err := r.updateStatus(ctx, mysql); err != nil {
        logger.Error(err, "Failed to update status")
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

func (r *MySQLReconciler) reconcileService(ctx context.Context, mysql *databasev1.MySQL) error {
    // Desired Service
    desired := r.desiredService(mysql)

    // Look up the existing Service
    existing := &corev1.Service{}
    err := r.Get(ctx, client.ObjectKeyFromObject(desired), existing)

    if err != nil && errors.IsNotFound(err) {
        // Service does not exist: set the owner reference and create it
        if err := controllerutil.SetControllerReference(mysql, desired, r.Scheme); err != nil {
            return err
        }
        return r.Create(ctx, desired)
    } else if err != nil {
        return err
    }

    // Service exists: update only the fields this operator manages. Replacing
    // the whole Spec would wipe fields the API server fills in (e.g. clusterIPs).
    if !equality.Semantic.DeepEqual(existing.Spec.Selector, desired.Spec.Selector) ||
        !equality.Semantic.DeepEqual(existing.Spec.Ports, desired.Spec.Ports) {
        existing.Spec.Selector = desired.Spec.Selector
        existing.Spec.Ports = desired.Spec.Ports
        return r.Update(ctx, existing)
    }

    return nil
}

func (r *MySQLReconciler) reconcileStatefulSet(ctx context.Context, mysql *databasev1.MySQL) error {
    // Desired StatefulSet
    desired := r.desiredStatefulSet(mysql)

    // Look up the existing StatefulSet
    existing := &appsv1.StatefulSet{}
    err := r.Get(ctx, client.ObjectKeyFromObject(desired), existing)

    if err != nil && errors.IsNotFound(err) {
        // StatefulSet does not exist: set the owner reference and create it
        if err := controllerutil.SetControllerReference(mysql, desired, r.Scheme); err != nil {
            return err
        }
        return r.Create(ctx, desired)
    } else if err != nil {
        return err
    }

    // StatefulSet exists: sync the replica count (nil-safe)
    if existing.Spec.Replicas == nil || *existing.Spec.Replicas != *desired.Spec.Replicas {
        existing.Spec.Replicas = desired.Spec.Replicas
        return r.Update(ctx, existing)
    }

    return nil
}

func (r *MySQLReconciler) desiredService(mysql *databasev1.MySQL) *corev1.Service {
    return &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      mysql.Name,
            Namespace: mysql.Namespace,
            Labels:    labelsForMySQL(mysql),
        },
        Spec: corev1.ServiceSpec{
            ClusterIP: "None", // Headless Service, required by the StatefulSet's serviceName
            Selector:  labelsForMySQL(mysql),
            Ports: []corev1.ServicePort{
                {
                    Name:     "mysql",
                    Port:     3306,
                    Protocol: corev1.ProtocolTCP,
                    // Set explicitly so the desired ports equal what the API
                    // server stores after defaulting (avoids needless updates)
                    TargetPort: intstr.FromString("mysql"),
                },
            },
        },
    }
}

func (r *MySQLReconciler) desiredStatefulSet(mysql *databasev1.MySQL) *appsv1.StatefulSet {
    labels := labelsForMySQL(mysql)
    replicas := mysql.Spec.Replicas
    
    return &appsv1.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:      mysql.Name,
            Namespace: mysql.Namespace,
            Labels:    labels,
        },
        Spec: appsv1.StatefulSetSpec{
            Replicas:    &replicas,
            ServiceName: mysql.Name,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  "mysql",
                            Image: fmt.Sprintf("mysql:%s", mysql.Spec.Version),
                            Ports: []corev1.ContainerPort{
                                {
                                    ContainerPort: 3306,
                                    Name:          "mysql",
                                },
                            },
                            Env: []corev1.EnvVar{
                                {
                                    Name:  "MYSQL_ROOT_PASSWORD",
                                    Value: "changeme", // demo only; use a Secret (valueFrom.secretKeyRef) in practice
                                },
                            },
                            Resources: mysql.Spec.Resources,
                            VolumeMounts: []corev1.VolumeMount{
                                {
                                    Name:      "data",
                                    MountPath: "/var/lib/mysql",
                                },
                            },
                        },
                    },
                },
            },
            VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
                {
                    ObjectMeta: metav1.ObjectMeta{
                        Name: "data",
                    },
                    Spec: corev1.PersistentVolumeClaimSpec{
                        AccessModes: []corev1.PersistentVolumeAccessMode{
                            corev1.ReadWriteOnce,
                        },
                        Resources: corev1.ResourceRequirements{
                            Requests: corev1.ResourceList{
                                corev1.ResourceStorage: resource.MustParse(mysql.Spec.StorageSize),
                            },
                        },
                    },
                },
            },
        },
    }
}

func (r *MySQLReconciler) updateStatus(ctx context.Context, mysql *databasev1.MySQL) error {
    // Fetch the StatefulSet; right after creation it may not be in the cache
    // yet, in which case it is treated as having zero ready replicas
    sts := &appsv1.StatefulSet{}
    err := r.Get(ctx, client.ObjectKey{Name: mysql.Name, Namespace: mysql.Namespace}, sts)
    if err != nil && !errors.IsNotFound(err) {
        return err
    }

    // Derive the status from the StatefulSet
    mysql.Status.ReadyReplicas = sts.Status.ReadyReplicas

    if sts.Status.ReadyReplicas == mysql.Spec.Replicas {
        mysql.Status.Phase = "Running"
        mysql.Status.Message = "All replicas are ready"
    } else {
        mysql.Status.Phase = "Pending"
        mysql.Status.Message = fmt.Sprintf("Waiting for replicas: %d/%d", sts.Status.ReadyReplicas, mysql.Spec.Replicas)
    }

    // Write through the /status subresource
    return r.Status().Update(ctx, mysql)
}

func labelsForMySQL(mysql *databasev1.MySQL) map[string]string {
    return map[string]string{
        "app":                            "mysql",
        "mysql.database.example.com/cr": mysql.Name,
    }
}

// SetupWithManager registers the controller with the manager: it watches MySQL
// objects and the StatefulSets and Services they own
func (r *MySQLReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&databasev1.MySQL{}).
        Owns(&appsv1.StatefulSet{}).
        Owns(&corev1.Service{}).
        Complete(r)
}
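
The map returned by labelsForMySQL is used three times: as the Service selector, as the StatefulSet selector, and on the pod template. matchLabels is a subset test. An object matches when it carries every selector key with the same value, and extra labels are ignored. The API server also rejects a StatefulSet whose selector does not match its template labels. The second label, keyed by the CR name, keeps two MySQL instances in the same namespace from selecting each other's pods. The sketch below implements only matchLabels, not matchExpressions. `matches` is a hypothetical helper, not the k8s.io/apimachinery/pkg/labels API.

```go
package main

import "fmt"

// matches reports whether labels satisfy a matchLabels selector: every
// selector key must be present with an equal value; extra labels are ignored.
func matches(selector, labels map[string]string) bool {
	for k, v := range selector {
		if got, ok := labels[k]; !ok || got != v {
			return false
		}
	}
	return true
}

func main() {
	selector := map[string]string{
		"app":                           "mysql",
		"mysql.database.example.com/cr": "my-database",
	}
	// A pod from the template, plus a label the StatefulSet controller adds.
	pod := map[string]string{
		"app":                                "mysql",
		"mysql.database.example.com/cr":      "my-database",
		"statefulset.kubernetes.io/pod-name": "my-database-0",
	}
	// A pod belonging to a different MySQL instance.
	other := map[string]string{"app": "mysql", "mysql.database.example.com/cr": "other-db"}
	fmt.Println(matches(selector, pod), matches(selector, other)) // true false
}
```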

Main Program

// main.go
package main

import (
    "flag"
    "os"

    "k8s.io/apimachinery/pkg/runtime"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"
    metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

    databasev1 "mysql-operator/api/v1"
    "mysql-operator/controllers"
)

var (
    scheme   = runtime.NewScheme()
    setupLog = ctrl.Log.WithName("setup")
)

func init() {
    // Register the built-in Kubernetes types and the MySQL types
    utilruntime.Must(clientgoscheme.AddToScheme(scheme))
    utilruntime.Must(databasev1.AddToScheme(scheme))
}

func main() {
    var metricsAddr string
    var enableLeaderElection bool

    flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
    flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager.")
    flag.Parse()

    ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

    // controller-runtime v0.16+ configures the metrics endpoint via Metrics
    // (earlier releases used MetricsBindAddress). No webhook port is set
    // because this operator serves no webhooks.
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:           scheme,
        Metrics:          metricsserver.Options{BindAddress: metricsAddr},
        LeaderElection:   enableLeaderElection,
        LeaderElectionID: "mysql-operator.database.example.com",
    })
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    }
    
    if err = (&controllers.MySQLReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create controller", "controller", "MySQL")
        os.Exit(1)
    }
    
    setupLog.Info("starting manager")
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }
}
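
When Reconcile returns a non-nil error, as each `return ctrl.Result{}, err` in the controller does, the manager requeues the request with rate limiting. A persistently failing object is therefore retried less and less often. In client-go's default controller rate limiter, the per-item part is assumed here to double from 5ms up to a 1000s cap, combined with an overall token bucket. Check these constants against the client-go version you use. Returning `ctrl.Result{RequeueAfter: d}` with a nil error instead schedules a retry after a fixed delay. The sketch models only the per-item exponential part, and `backoff`, `baseDelay` and `maxDelay` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed defaults, modeled on client-go's per-item exponential limiter.
const (
	baseDelay = 5 * time.Millisecond
	maxDelay  = 1000 * time.Second
)

// backoff returns the delay before the next retry after `failures` consecutive
// failures of the same request: baseDelay * 2^failures, capped at maxDelay.
func backoff(failures int) time.Duration {
	d := baseDelay
	for i := 0; i < failures; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

func main() {
	for _, n := range []int{0, 1, 2, 10, 20, 30} {
		fmt.Printf("failures=%2d next retry in %v\n", n, backoff(n))
	}
}
```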

Deploying the Operator

Building the Image

# Dockerfile
FROM golang:1.20 as builder
WORKDIR /workspace
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o manager main.go

FROM alpine:3.18
WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532
ENTRYPOINT ["/manager"]
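
# Build and push the image (tag it with the registry name so docker push can find it)
docker build -t registry.example.com/mysql-operator:v1.0.0 .
docker push registry.example.com/mysql-operator:v1.0.0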

Deployment Manifests

apiVersion: v1
kind: Namespace
metadata:
  name: mysql-operator-system

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: mysql-operator
  namespace: mysql-operator-system

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: mysql-operator-role
rules:
- apiGroups: ["database.example.com"]
  resources: ["mysqls"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["database.example.com"]
  resources: ["mysqls/status"]
  verbs: ["get", "update", "patch"]
- apiGroups: ["apps"]
  resources: ["statefulsets"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
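- apiGroups: [""]
  resources: ["services", "persistentvolumeclaims"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# Required by --leader-elect (Lease-based lock) and for recording events
- apiGroups: ["coordination.k8s.io"]
  resources: ["leases"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["events"]
  verbs: ["create", "patch"]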

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: mysql-operator-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: mysql-operator-role
subjects:
- kind: ServiceAccount
  name: mysql-operator
  namespace: mysql-operator-system

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql-operator
  namespace: mysql-operator-system
spec:
  replicas: 1
  selector:
    matchLabels:
      control-plane: controller-manager
  template:
    metadata:
      labels:
        control-plane: controller-manager
    spec:
      serviceAccountName: mysql-operator
      containers:
      - name: manager
        image: registry.example.com/mysql-operator:v1.0.0
        args:
        - --leader-elect
        resources:
          limits:
            cpu: 500m
            memory: 128Mi
          requests:
            cpu: 10m
            memory: 64Mi

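Summary

CRDs and Operators are the core mechanisms for extending Kubernetes:

  • CRD: defines a custom resource type
  • Operator: automates the management of custom resources
  • Control loop: continuously reconciles the desired state with the actual state
  • Declarative API: users declare only the desired state

The core workflow:

  1. Define the CRD (schema)
  2. Implement the controller (reconcile logic)
  3. Deploy the Operator (run the controller)
  4. Create custom resource instances
  5. The Operator manages the resource lifecycle automatically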