CRD and Operator Development
CRD Overview
A CRD (Custom Resource Definition) lets users extend the Kubernetes API with their own resource types.
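Why CRDs
```text
Built-in resources (Pod, Deployment, Service, ...)
        ↓
Not enough for the use case
        ↓
Custom resources (CRD)
        ↓
┌────────────────────────────────────────┐
│ Database cluster (MySQLCluster)        │
│ Message queue (KafkaCluster)           │
│ Application config (Application)       │
│ Autoscaling policy (ScalingRule)       │
└────────────────────────────────────────┘
```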
Creating a CRD
Basic CRD definition
```yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # Must be <plural>.<group>
  name: mysqls.database.example.com
spec:
  # API group
  group: database.example.com
  # Served versions
  versions:
    - name: v1
      # Whether this version is served by the API
      served: true
      # Whether this is the storage version (exactly one version must set this)
      storage: true
      # Schema definition
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                # Number of replicas
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 10
                # MySQL version
                version:
                  type: string
                  enum: ["5.7", "8.0"]
                # Storage size
                storageSize:
                  type: string
                  pattern: '^[0-9]+Gi$'
                # Resource requirements
                resources:
                  type: object
                  properties:
                    requests:
                      type: object
                      properties:
                        memory:
                          type: string
                        cpu:
                          type: string
              required: ["replicas", "version"]
            status:
              type: object
              properties:
                # Current phase
                phase:
                  type: string
                  enum: ["Pending", "Running", "Failed"]
                # Number of ready replicas
                readyReplicas:
                  type: integer
                # Human-readable message
                message:
                  type: string
      # Extra columns shown by kubectl get
      additionalPrinterColumns:
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Version
          type: string
          jsonPath: .spec.version
        - name: Status
          type: string
          jsonPath: .status.phase
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
      # Subresources
      subresources:
        # Enables the /status subresource
        status: {}
        # Enables the /scale subresource
        scale:
          specReplicasPath: .spec.replicas
          # Must point to a field defined in the status schema (status has readyReplicas, not replicas)
          statusReplicasPath: .status.readyReplicas
  # Resource scope
  scope: Namespaced
  # Resource names
  names:
    # Plural name, used in the URL path and in metadata.name above
    plural: mysqls
    # Singular name
    singular: mysql
    # Kind name
    kind: MySQL
    # Short names
    shortNames:
      - my
```
Applying the CRD
```shell
# Create the CRD
kubectl apply -f mysql-crd.yaml

# Inspect the CRD
kubectl get crd
kubectl describe crd mysqls.database.example.com

# Check that the new API resource is registered
kubectl api-resources | grep mysql
```
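The API server follows two naming conventions for CRDs:

- It only accepts a CRD whose `metadata.name` is `<plural>.<group>`.
- Once the CRD is established, namespaced instances are served under `/apis/<group>/<version>/namespaces/<namespace>/<plural>`.

The two helpers below spell out both conventions for the values used above. `crdName` and `resourcePath` are names made up for this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// crdName returns the metadata.name a CRD must carry: "<plural>.<group>".
func crdName(plural, group string) string {
	return plural + "." + group
}

// resourcePath builds the REST path for a namespaced custom resource.
// An empty name yields the collection path (what `kubectl get mysql` lists).
func resourcePath(group, version, namespace, plural, name string) string {
	parts := []string{"", "apis", group, version, "namespaces", namespace, plural}
	if name != "" {
		parts = append(parts, name)
	}
	return strings.Join(parts, "/")
}

func main() {
	fmt.Println(crdName("mysqls", "database.example.com"))
	fmt.Println(resourcePath("database.example.com", "v1", "default", "mysqls", "my-database"))
}
```

For this CRD, the collection path is `/apis/database.example.com/v1/namespaces/default/mysqls`. Once the CRD is installed, you can query that path directly with `kubectl get --raw`.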
Creating a custom resource instance
```yaml
apiVersion: database.example.com/v1
kind: MySQL
metadata:
  name: my-database
  namespace: default
spec:
  replicas: 3
  version: "8.0"
  storageSize: 10Gi
  resources:
    requests:
      memory: 1Gi
      cpu: 500m
```
```shell
# Create the instance
kubectl apply -f mysql-instance.yaml

# View instances
kubectl get mysql
kubectl get my   # using the short name
kubectl describe mysql my-database
```
The Operator pattern
Operator architecture
```text
┌────────────────────────────────────────┐
│         Kubernetes API Server          │
└───────────────────┬────────────────────┘
                    │ Watch
                    ▼
┌────────────────────────────────────────┐
│          Operator Controller           │
│ ┌────────────────────────────────────┐ │
│ │ Informer (watches for changes)     │ │
│ │ - List/Watch custom resources      │ │
│ │ - Caches resource state            │ │
│ └─────────────────┬──────────────────┘ │
│                   ▼                    │
│ ┌────────────────────────────────────┐ │
│ │ Control loop                       │ │
│ │ 1. Read desired state (Spec)       │ │
│ │ 2. Observe current state           │ │
│ │ 3. Compute the difference          │ │
│ │ 4. Act to reconcile                │ │
│ │ 5. Update Status                   │ │
│ └─────────────────┬──────────────────┘ │
└───────────────────┼────────────────────┘
                    │ Manage resources
                    ▼
┌────────────────────────────────────────┐
│          Kubernetes Resources          │
│ - Pod                                  │
│ - Service                              │
│ - ConfigMap                            │
│ - PVC                                  │
└────────────────────────────────────────┘
```
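The control loop in the diagram can be reduced to a few lines of plain Go. In this sketch, the `Cluster` type is a hypothetical in-memory stand-in for the API server, and only the replica count is reconciled.

The point is the shape of the loop:

1. Observe the actual state.
2. Compare it with the spec.
3. Act only on a difference.
4. Record the status.

Running it twice without a spec change does nothing the second time. A real `Reconcile` needs the same idempotency.

```go
package main

import "fmt"

// Cluster is a hypothetical in-memory stand-in for the API server that only
// tracks the replica count of each StatefulSet, keyed by name.
type Cluster struct {
	StatefulSetReplicas map[string]int32
}

// MySQL is a trimmed-down custom resource holding just what the loop needs.
type MySQL struct {
	Name           string
	SpecReplicas   int32 // desired state, written by the user
	StatusReplicas int32 // observed state, written by the controller
}

// reconcile runs one pass of the control loop and returns the action it took.
func reconcile(c *Cluster, m *MySQL) string {
	// Steps 1-2: read the desired state and observe the actual state
	actual, exists := c.StatefulSetReplicas[m.Name]

	// Steps 3-4: compare, and act only when they differ
	var action string
	switch {
	case !exists:
		c.StatefulSetReplicas[m.Name] = m.SpecReplicas
		action = "created"
	case actual != m.SpecReplicas:
		c.StatefulSetReplicas[m.Name] = m.SpecReplicas
		action = fmt.Sprintf("scaled %d->%d", actual, m.SpecReplicas)
	default:
		action = "noop"
	}

	// Step 5: record what is observed after acting
	m.StatusReplicas = c.StatefulSetReplicas[m.Name]
	return action
}

func main() {
	cluster := &Cluster{StatefulSetReplicas: map[string]int32{}}
	db := &MySQL{Name: "my-database", SpecReplicas: 3}

	fmt.Println(reconcile(cluster, db)) // created
	fmt.Println(reconcile(cluster, db)) // noop: nothing changed
	db.SpecReplicas = 5
	fmt.Println(reconcile(cluster, db)) // scaled 3->5
}
```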
Developing an Operator with controller-runtime (Kubebuilder)
Project structure
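```text
mysql-operator/
├── api/
│   └── v1/
│       ├── groupversion_info.go      # GroupVersion, SchemeBuilder, AddToScheme
│       ├── mysql_types.go            # Resource type definitions
│       └── zz_generated.deepcopy.go  # Generated DeepCopy methods
├── config/
│   ├── crd/                          # CRD YAML
│   └── rbac/                         # RBAC manifests
├── controllers/
│   └── mysql_controller.go           # Controller logic
├── Dockerfile
├── main.go
├── go.mod
└── go.sum
```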
Defining the resource types
```go
// api/v1/mysql_types.go
package v1

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// MySQLSpec defines the desired state
type MySQLSpec struct {
	// Number of replicas
	Replicas int32 `json:"replicas"`
	// MySQL version
	Version string `json:"version"`
	// Storage size
	StorageSize string `json:"storageSize"`
	// Resource requirements
	Resources corev1.ResourceRequirements `json:"resources,omitempty"`
	// Configuration options
	Config map[string]string `json:"config,omitempty"`
}

// MySQLStatus defines the observed state
type MySQLStatus struct {
	// Current phase
	Phase string `json:"phase,omitempty"`
	// Number of ready replicas
	ReadyReplicas int32 `json:"readyReplicas,omitempty"`
	// Conditions
	Conditions []metav1.Condition `json:"conditions,omitempty"`
	// Human-readable message
	Message string `json:"message,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.readyReplicas
// +kubebuilder:printcolumn:name="Replicas",type=integer,JSONPath=`.spec.replicas`
// +kubebuilder:printcolumn:name="Version",type=string,JSONPath=`.spec.version`
// +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp`
// MySQL is the Schema for the mysqls API
type MySQL struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   MySQLSpec   `json:"spec,omitempty"`
	Status MySQLStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true
// MySQLList contains a list of MySQL
type MySQLList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []MySQL `json:"items"`
}

func init() {
	// SchemeBuilder is declared in groupversion_info.go (scaffolded by Kubebuilder)
	SchemeBuilder.Register(&MySQL{}, &MySQLList{})
}
```
Implementing the controller
```go
// controllers/mysql_controller.go
package controllers

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/intstr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"

	databasev1 "mysql-operator/api/v1"
)

// MySQLReconciler reconciles MySQL custom resources
type MySQLReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=database.example.com,resources=mysqls,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=database.example.com,resources=mysqls/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
// Reconcile is the main reconciliation logic. It receives only the object's
// namespace/name and must be idempotent, because it runs again on every change
// to the MySQL object or to a StatefulSet/Service it owns.
func (r *MySQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// 1. Fetch the MySQL instance
	mysql := &databasev1.MySQL{}
	if err := r.Get(ctx, req.NamespacedName, mysql); err != nil {
		if errors.IsNotFound(err) {
			// Deleted: the Service and StatefulSet are garbage-collected via their owner references
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, err
	}

	// 2. Reconcile the Service
	if err := r.reconcileService(ctx, mysql); err != nil {
		logger.Error(err, "Failed to reconcile Service")
		return ctrl.Result{}, err
	}

	// 3. Reconcile the StatefulSet
	if err := r.reconcileStatefulSet(ctx, mysql); err != nil {
		logger.Error(err, "Failed to reconcile StatefulSet")
		return ctrl.Result{}, err
	}

	// 4. Update the status
	if err := r.updateStatus(ctx, mysql); err != nil {
		logger.Error(err, "Failed to update status")
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

func (r *MySQLReconciler) reconcileService(ctx context.Context, mysql *databasev1.MySQL) error {
	// Desired Service
	desired := r.desiredService(mysql)

	// Look up the existing Service
	existing := &corev1.Service{}
	err := r.Get(ctx, client.ObjectKeyFromObject(desired), existing)
	if err != nil && errors.IsNotFound(err) {
		// The Service does not exist: set the owner reference and create it
		if err := controllerutil.SetControllerReference(mysql, desired, r.Scheme); err != nil {
			return err
		}
		return r.Create(ctx, desired)
	} else if err != nil {
		return err
	}

	// The Service exists: update only the fields this operator manages instead of
	// replacing the whole Spec, which also holds values defaulted by the API server
	if !serviceEqual(desired, existing) {
		existing.Spec.Selector = desired.Spec.Selector
		existing.Spec.Ports = desired.Spec.Ports
		return r.Update(ctx, existing)
	}
	return nil
}

// serviceEqual compares only the Service fields this operator manages
func serviceEqual(desired, existing *corev1.Service) bool {
	return equality.Semantic.DeepEqual(desired.Spec.Selector, existing.Spec.Selector) &&
		equality.Semantic.DeepEqual(desired.Spec.Ports, existing.Spec.Ports)
}

func (r *MySQLReconciler) reconcileStatefulSet(ctx context.Context, mysql *databasev1.MySQL) error {
	// Desired StatefulSet
	desired, err := r.desiredStatefulSet(mysql)
	if err != nil {
		return err
	}

	// Look up the existing StatefulSet
	existing := &appsv1.StatefulSet{}
	err = r.Get(ctx, client.ObjectKeyFromObject(desired), existing)
	if err != nil && errors.IsNotFound(err) {
		// The StatefulSet does not exist: set the owner reference and create it
		if err := controllerutil.SetControllerReference(mysql, desired, r.Scheme); err != nil {
			return err
		}
		return r.Create(ctx, desired)
	} else if err != nil {
		return err
	}

	// The StatefulSet exists: sync the replica count (other fields, such as the image, are not synced here)
	if existing.Spec.Replicas == nil || *existing.Spec.Replicas != mysql.Spec.Replicas {
		replicas := mysql.Spec.Replicas
		existing.Spec.Replicas = &replicas
		return r.Update(ctx, existing)
	}
	return nil
}

func (r *MySQLReconciler) desiredService(mysql *databasev1.MySQL) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      mysql.Name,
			Namespace: mysql.Namespace,
			Labels:    labelsForMySQL(mysql),
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: "None", // Headless Service
			Selector:  labelsForMySQL(mysql),
			Ports: []corev1.ServicePort{
				{
					Name:     "mysql",
					Port:     3306,
					Protocol: corev1.ProtocolTCP,
					// Set explicitly so the comparison in serviceEqual matches the defaulted value
					TargetPort: intstr.FromInt(3306),
				},
			},
		},
	}
}

func (r *MySQLReconciler) desiredStatefulSet(mysql *databasev1.MySQL) (*appsv1.StatefulSet, error) {
	labels := labelsForMySQL(mysql)
	replicas := mysql.Spec.Replicas

	// ParseQuantity instead of MustParse: storageSize is not in the schema's required
	// list, so an empty value must return an error rather than panic the operator
	storage, err := resource.ParseQuantity(mysql.Spec.StorageSize)
	if err != nil {
		return nil, fmt.Errorf("invalid spec.storageSize %q: %w", mysql.Spec.StorageSize, err)
	}

	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      mysql.Name,
			Namespace: mysql.Namespace,
			Labels:    labels,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas:    &replicas,
			ServiceName: mysql.Name,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "mysql",
							Image: fmt.Sprintf("mysql:%s", mysql.Spec.Version),
							Ports: []corev1.ContainerPort{
								{
									ContainerPort: 3306,
									Name:          "mysql",
								},
							},
							Env: []corev1.EnvVar{
								{
									Name:  "MYSQL_ROOT_PASSWORD",
									Value: "changeme", // Use a Secret in practice
								},
							},
							Resources: mysql.Spec.Resources,
							VolumeMounts: []corev1.VolumeMount{
								{
									Name:      "data",
									MountPath: "/var/lib/mysql",
								},
							},
						},
					},
				},
			},
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "data",
					},
					Spec: corev1.PersistentVolumeClaimSpec{
						AccessModes: []corev1.PersistentVolumeAccessMode{
							corev1.ReadWriteOnce,
						},
						// k8s.io/api v0.29+ uses corev1.VolumeResourceRequirements here
						Resources: corev1.ResourceRequirements{
							Requests: corev1.ResourceList{
								corev1.ResourceStorage: storage,
							},
						},
					},
				},
			},
		},
	}, nil
}

func (r *MySQLReconciler) updateStatus(ctx context.Context, mysql *databasev1.MySQL) error {
	// Fetch the StatefulSet. Right after creation it may not be in the informer cache
	// yet, so NotFound is treated as "0 ready replicas" instead of an error.
	sts := &appsv1.StatefulSet{}
	var ready int32
	err := r.Get(ctx, client.ObjectKey{Name: mysql.Name, Namespace: mysql.Namespace}, sts)
	if err == nil {
		ready = sts.Status.ReadyReplicas
	} else if !errors.IsNotFound(err) {
		return err
	}

	// Update the status fields
	mysql.Status.ReadyReplicas = ready
	if ready == mysql.Spec.Replicas {
		mysql.Status.Phase = "Running"
		mysql.Status.Message = "All replicas are ready"
	} else {
		mysql.Status.Phase = "Pending"
		mysql.Status.Message = fmt.Sprintf("Waiting for replicas: %d/%d", ready, mysql.Spec.Replicas)
	}

	// Writes through the /status subresource, so the spec is left untouched
	return r.Status().Update(ctx, mysql)
}

func labelsForMySQL(mysql *databasev1.MySQL) map[string]string {
	return map[string]string{
		"app":                           "mysql",
		"mysql.database.example.com/cr": mysql.Name,
	}
}

// SetupWithManager registers the controller: it watches MySQL objects and
// re-reconciles the owner whenever an owned StatefulSet or Service changes
func (r *MySQLReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&databasev1.MySQL{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.Service{}).
		Complete(r)
}
```
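The branch that picks `Phase` and `Message` inside `updateStatus` is pure logic. It can be pulled into a helper that needs no cluster to test. `computeStatus` is a hypothetical name for that refactor; it keeps the same strings and the same `ready == desired` rule as the controller above.

```go
package main

import "fmt"

// Phase values, matching the enum in the CRD's status schema.
const (
	PhasePending = "Pending"
	PhaseRunning = "Running"
)

// computeStatus reproduces the phase/message decision from updateStatus as a pure
// function (a hypothetical refactor), so it can be unit-tested without a cluster.
func computeStatus(desired, ready int32) (phase, message string) {
	if ready == desired {
		return PhaseRunning, "All replicas are ready"
	}
	return PhasePending, fmt.Sprintf("Waiting for replicas: %d/%d", ready, desired)
}

func main() {
	for _, ready := range []int32{0, 2, 3} {
		phase, message := computeStatus(3, ready)
		fmt.Printf("ready=%d phase=%s message=%q\n", ready, phase, message)
	}
}
```

With this helper, `updateStatus` would call `computeStatus(mysql.Spec.Replicas, ready)` and assign both results before `Status().Update`. The `Failed` value in the enum is never produced by this logic.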
Main program
```go
// main.go
package main

import (
	"flag"
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"

	databasev1 "mysql-operator/api/v1"
	"mysql-operator/controllers"
)

var (
	scheme   = runtime.NewScheme()
	setupLog = ctrl.Log.WithName("setup")
)

func init() {
	// Register the built-in types (Pod, StatefulSet, ...) and the MySQL types
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(databasev1.AddToScheme(scheme))
}

func main() {
	var metricsAddr string
	var enableLeaderElection bool
	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
	flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager.")
	flag.Parse()

	ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

	// MetricsBindAddress and Port are manager options in controller-runtime v0.15 and earlier;
	// later releases configure them through Metrics (metricsserver.Options) and WebhookServer
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:             scheme,
		MetricsBindAddress: metricsAddr,
		Port:               9443,
		LeaderElection:     enableLeaderElection,
		LeaderElectionID:   "mysql-operator.database.example.com",
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// Register the MySQL reconciler with the manager
	if err = (&controllers.MySQLReconciler{
		Client: mgr.GetClient(),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "MySQL")
		os.Exit(1)
	}

	setupLog.Info("starting manager")
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}
```
Deploying the Operator
Building the image
```dockerfile
# Dockerfile
FROM golang:1.20 AS builder
WORKDIR /workspace
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o manager main.go

FROM alpine:3.18
WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532
ENTRYPOINT ["/manager"]
```
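```shell
# Tag the image with the registry name so that docker push can find it
docker build -t registry.example.com/mysql-operator:v1.0.0 .
docker push registry.example.com/mysql-operator:v1.0.0
```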
Deployment manifests
```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: mysql-operator-system
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: mysql-operator
  namespace: mysql-operator-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: mysql-operator-role
rules:
  - apiGroups: ["database.example.com"]
    resources: ["mysqls"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["database.example.com"]
    resources: ["mysqls/status"]
    verbs: ["get", "update", "patch"]
  - apiGroups: ["apps"]
    resources: ["statefulsets"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["services", "persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  # Needed by --leader-elect (the permissions Kubebuilder scaffolds for leader election):
  # Leases hold the lock (older controller-runtime versions also use ConfigMaps),
  # and Events record leadership changes
  - apiGroups: ["coordination.k8s.io"]
    resources: ["leases"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: mysql-operator-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: mysql-operator-role
subjects:
  - kind: ServiceAccount
    name: mysql-operator
    namespace: mysql-operator-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql-operator
  namespace: mysql-operator-system
spec:
  replicas: 1
  selector:
    matchLabels:
      control-plane: controller-manager
  template:
    metadata:
      labels:
        control-plane: controller-manager
    spec:
      serviceAccountName: mysql-operator
      containers:
        - name: manager
          image: registry.example.com/mysql-operator:v1.0.0
          args:
            - --leader-elect
          resources:
            limits:
              cpu: 500m
              memory: 128Mi
            requests:
              cpu: 10m
              memory: 64Mi
```
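The Deployment starts the manager with `args: [--leader-elect]`, and the flag definitions in `main.go` parse those arguments. The sketch below registers the same two flags on a separate `flag.FlagSet` so the mapping can be checked with an explicit argument list. `parseManagerFlags` is a hypothetical helper. Go's `flag` package treats `-leader-elect` and `--leader-elect` the same way.

```go
package main

import (
	"flag"
	"fmt"
)

// parseManagerFlags registers the same flags and defaults as main.go on a
// dedicated FlagSet, so parsing can be exercised without touching os.Args.
func parseManagerFlags(args []string) (metricsAddr string, leaderElect bool, err error) {
	fs := flag.NewFlagSet("manager", flag.ContinueOnError)
	fs.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
	fs.BoolVar(&leaderElect, "leader-elect", false, "Enable leader election for controller manager.")
	err = fs.Parse(args)
	return metricsAddr, leaderElect, err
}

func main() {
	// The args from the Deployment manifest
	addr, leaderElect, err := parseManagerFlags([]string{"--leader-elect"})
	fmt.Println(addr, leaderElect, err)
}
```

With only `--leader-elect` passed, the metrics address keeps its `:8080` default and leader election is enabled.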
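Summary
CRDs and Operators are the core mechanisms for extending Kubernetes:
- CRD: defines a custom resource type
- Operator: automates the management of custom resources
- Control loop: continuously reconciles the actual state toward the desired state
- Declarative API: users declare only the desired state

Core workflow:
- Define the CRD (schema)
- Implement the controller (reconciliation logic)
- Deploy the Operator (which runs the controller)
- Create custom resource instances
- The Operator manages the resources' lifecycle automatically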