CSI 存储插件深度解析

CSI 存储插件深度解析

CSI 概述

CSI (Container Storage Interface) 是容器编排系统(如 Kubernetes)与存储插件之间的标准接口规范,Kubernetes 通过它以 out-of-tree 方式接入各类存储后端。

CSI 架构

┌────────────────────────────────────────────────┐
│              Kubernetes Master                 │
│  ┌──────────────────────────────────────────┐  │
│  │   kube-controller-manager                │  │
│  │   - AttachDetach Controller              │  │
│  │   - PV Controller                        │  │
│  └────────────┬─────────────────────────────┘  │
└───────────────┼────────────────────────────────┘
                │ Kubernetes API (watch)
                ▼
┌────────────────────────────────────────────────┐
│         CSI External Components                │
│  ┌──────────────────────────────────────────┐  │
│  │  External Provisioner (创建/删除卷)      │  │
│  │  External Attacher (附加/分离卷)         │  │
│  │  External Resizer (扩容卷)               │  │
│  │  External Snapshotter (快照)             │  │
│  └────────────┬─────────────────────────────┘  │
└───────────────┼────────────────────────────────┘
                │ gRPC
                ▼
┌────────────────────────────────────────────────┐
│           CSI Driver (插件实现)                │
│  ┌──────────────────────────────────────────┐  │
│  │  Controller Service                      │  │
│  │  - CreateVolume                          │  │
│  │  - DeleteVolume                          │  │
│  │  - ControllerPublishVolume               │  │
│  │  - ControllerUnpublishVolume             │  │
│  └──────────────────────────────────────────┘  │
│  ┌──────────────────────────────────────────┐  │
│  │  Node Service                            │  │
│  │  - NodeStageVolume                       │  │
│  │  - NodeUnstageVolume                     │  │
│  │  - NodePublishVolume                     │  │
│  │  - NodeUnpublishVolume                   │  │
│  └──────────────────────────────────────────┘  │
└───────────────┼────────────────────────────────┘
                │ 存储操作
                ▼
┌────────────────────────────────────────────────┐
│          存储后端 (块存储/文件存储)             │
│  - AWS EBS                                     │
│  - Ceph RBD                                    │
│  - NFS                                         │
│  - 本地磁盘                                     │
└────────────────────────────────────────────────┘

CSI 接口定义

Identity Service

// csi.proto
service Identity {
  // Returns the plugin's identifying information.
  rpc GetPluginInfo(GetPluginInfoRequest)
    returns (GetPluginInfoResponse) {}
  
  // Returns the capabilities supported by the plugin.
  rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
    returns (GetPluginCapabilitiesResponse) {}
  
  // Probes the health status of the plugin.
  rpc Probe(ProbeRequest)
    returns (ProbeResponse) {}
}

// Response of Identity.GetPluginInfo.
message GetPluginInfoResponse {
  string name = 1;          // Plugin name
  string vendor_version = 2; // Version string
}

// A single capability advertised by the plugin.
//
// The service capability is a nested message carrying a Type enum, wrapped
// in a oneof. This is the shape the generated Go code exposes as
// PluginCapability_Service_ / PluginCapability_Service{Type: ...}.
message PluginCapability {
  message Service {
    enum Type {
      UNKNOWN = 0;
      CONTROLLER_SERVICE = 1;  // The plugin provides the Controller service
      VOLUME_ACCESSIBILITY_CONSTRAINTS = 2;  // Volumes may not be accessible from every node
    }
    Type type = 1;
  }

  oneof type {
    Service service = 1;
  }
}

Controller Service

service Controller {
  // Create a volume.
  rpc CreateVolume(CreateVolumeRequest)
    returns (CreateVolumeResponse) {}
  
  // Delete a volume.
  rpc DeleteVolume(DeleteVolumeRequest)
    returns (DeleteVolumeResponse) {}
  
  // Publish (attach) a volume to a node.
  rpc ControllerPublishVolume(ControllerPublishVolumeRequest)
    returns (ControllerPublishVolumeResponse) {}
  
  // Unpublish (detach) a volume from a node.
  rpc ControllerUnpublishVolume(ControllerUnpublishVolumeRequest)
    returns (ControllerUnpublishVolumeResponse) {}
  
  // Expand a volume.
  rpc ControllerExpandVolume(ControllerExpandVolumeRequest)
    returns (ControllerExpandVolumeResponse) {}
  
  // Create a snapshot.
  rpc CreateSnapshot(CreateSnapshotRequest)
    returns (CreateSnapshotResponse) {}
  
  // Delete a snapshot.
  rpc DeleteSnapshot(DeleteSnapshotRequest)
    returns (DeleteSnapshotResponse) {}
}

// Request of Controller.CreateVolume (core fields only).
// Field names and numbers follow the CSI specification.
message CreateVolumeRequest {
  string name = 1;                                    // Volume name (idempotency key)
  CapacityRange capacity_range = 2;                   // Requested capacity range
  repeated VolumeCapability volume_capabilities = 3;  // Required volume capabilities
  map<string, string> parameters = 4;                 // Parameters (from the StorageClass)
  map<string, string> secrets = 5;                    // Credentials
}

// Response of Controller.CreateVolume.
message CreateVolumeResponse {
  Volume volume = 1;
}

// Description of a provisioned volume (core fields only).
// Field numbers follow the CSI specification: capacity_bytes is 1, volume_id is 2.
message Volume {
  int64 capacity_bytes = 1;          // Capacity in bytes
  string volume_id = 2;              // Volume ID
  map<string, string> volume_context = 3;
}

Node Service

service Node {
  // Stage: prepare the volume (format, mount to the global directory).
  rpc NodeStageVolume(NodeStageVolumeRequest)
    returns (NodeStageVolumeResponse) {}
  
  // Unstage: clean up the volume.
  rpc NodeUnstageVolume(NodeUnstageVolumeRequest)
    returns (NodeUnstageVolumeResponse) {}
  
  // Publish: mount the volume into the Pod directory.
  rpc NodePublishVolume(NodePublishVolumeRequest)
    returns (NodePublishVolumeResponse) {}
  
  // Unpublish: unmount from the Pod directory.
  rpc NodeUnpublishVolume(NodeUnpublishVolumeRequest)
    returns (NodeUnpublishVolumeResponse) {}
  
  // Expand the file system.
  rpc NodeExpandVolume(NodeExpandVolumeRequest)
    returns (NodeExpandVolumeResponse) {}
}

// Request of Node.NodeStageVolume.
message NodeStageVolumeRequest {
  string volume_id = 1;
  map<string, string> publish_context = 2;
  string staging_target_path = 3;  // Global mount path
  VolumeCapability volume_capability = 4;
  map<string, string> secrets = 5;
  map<string, string> volume_context = 6;
}

// Request of Node.NodePublishVolume.
message NodePublishVolumeRequest {
  string volume_id = 1;
  map<string, string> publish_context = 2;
  string staging_target_path = 3;  // Global mount path
  string target_path = 4;           // Pod mount path
  VolumeCapability volume_capability = 5;
  bool readonly = 6;
  map<string, string> secrets = 7;
  map<string, string> volume_context = 8;
}

CSI 驱动开发

基本框架

package main

import (
    "context"
    "errors"
    "fmt"
    "net"
    "os"
    "strings"

    "github.com/container-storage-interface/spec/lib/go/csi"
    "google.golang.org/grpc"
)

// CSI Driver 结构
// Driver is the top-level CSI driver object. It carries the identifying
// values passed to NewDriver and the three CSI service implementations that
// Run registers on the gRPC server.
type Driver struct {
    // Driver identity: plugin name, vendor version and the ID of the node
    // this instance runs on.
    name, version, nodeID string

    // CSI service implementations.
    ids *IdentityServer
    cs  *ControllerServer
    ns  *NodeServer
}

// NewDriver builds a Driver with the given plugin name, version and node ID
// and wires up its Identity, Controller and Node servers. Each server
// constructor receives the driver being built.
func NewDriver(name, version, nodeID string) *Driver {
    d := new(Driver)
    d.name, d.version, d.nodeID = name, version, nodeID

    d.ids = NewIdentityServer(d)
    d.cs = NewControllerServer(d)
    d.ns = NewNodeServer(d)

    return d
}

// Run serves the driver's CSI services over gRPC on a Unix domain socket and
// blocks until the server stops.
//
// endpoint is either a bare socket path or a "unix://" URL such as
// "unix:///csi/csi.sock". A socket file left over from a previous run is
// removed before listening. It returns an error if the endpoint is empty,
// the stale socket cannot be removed, listening fails, or serving fails.
func (d *Driver) Run(endpoint string) error {
    // net.Listen expects a filesystem path, not a URL.
    socketPath := strings.TrimPrefix(endpoint, "unix://")
    if socketPath == "" {
        return errors.New("csi endpoint is empty")
    }

    // A leftover socket file makes bind fail with "address already in use".
    if err := os.Remove(socketPath); err != nil && !errors.Is(err, os.ErrNotExist) {
        return fmt.Errorf("removing stale socket %q: %w", socketPath, err)
    }

    listener, err := net.Listen("unix", socketPath)
    if err != nil {
        return fmt.Errorf("listening on %q: %w", socketPath, err)
    }

    // Create the gRPC server and register the CSI services.
    server := grpc.NewServer()
    csi.RegisterIdentityServer(server, d.ids)
    csi.RegisterControllerServer(server, d.cs)
    csi.RegisterNodeServer(server, d.ns)

    return server.Serve(listener)
}

Identity Server 实现

// IdentityServer implements the CSI Identity service using the name and
// version stored on its Driver.
type IdentityServer struct {
    driver *Driver
}

// GetPluginInfo reports the driver's name and vendor version. It never
// returns an error.
func (ids *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
    drv := ids.driver

    info := &csi.GetPluginInfoResponse{}
    info.Name = drv.name
    info.VendorVersion = drv.version

    return info, nil
}

// GetPluginCapabilities advertises two service capabilities, in this order:
// CONTROLLER_SERVICE and VOLUME_ACCESSIBILITY_CONSTRAINTS. It never returns
// an error.
func (ids *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
    // serviceCap wraps a service type in the oneof envelope the response expects.
    serviceCap := func(t csi.PluginCapability_Service_Type) *csi.PluginCapability {
        svc := &csi.PluginCapability_Service{Type: t}
        return &csi.PluginCapability{
            Type: &csi.PluginCapability_Service_{Service: svc},
        }
    }

    resp := &csi.GetPluginCapabilitiesResponse{
        Capabilities: []*csi.PluginCapability{
            serviceCap(csi.PluginCapability_Service_CONTROLLER_SERVICE),
            serviceCap(csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS),
        },
    }
    return resp, nil
}

// Probe always reports the plugin as ready; no backend health check is
// performed.
func (ids *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
    ready := &wrappers.BoolValue{Value: true}
    resp := &csi.ProbeResponse{Ready: ready}
    return resp, nil
}

Controller Server 实现

// ControllerServer implements the CSI Controller service (volume create,
// delete and publish) on top of backend helper methods.
type ControllerServer struct {
    driver *Driver
    // caps is not populated or read by the methods shown here.
    // NOTE(review): presumably the capabilities to advertise via
    // ControllerGetCapabilities; confirm against the constructor.
    caps   []*csi.ControllerServiceCapability
}

// CreateVolume provisions a new volume on the storage backend.
//
// Requests without a name or without volume capabilities are rejected with
// InvalidArgument; the CSI specification marks both fields as required.
// Backend failures are returned as Internal. The request parameters are
// echoed back as the volume context, and the reported capacity is the
// requested required_bytes (0 when no capacity range was given).
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
    // 1. Validate the request.
    if req.Name == "" {
        return nil, status.Error(codes.InvalidArgument, "Volume name missing")
    }
    if len(req.VolumeCapabilities) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing")
    }

    // 2. Read the parameters. GetRequiredBytes is safe on a nil CapacityRange.
    parameters := req.Parameters
    capacity := req.CapacityRange.GetRequiredBytes()

    // 3. Create the volume on the storage backend.
    volumeID, err := cs.createVolumeOnBackend(req.Name, capacity, parameters)
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    // 4. Return the result.
    return &csi.CreateVolumeResponse{
        Volume: &csi.Volume{
            VolumeId:      volumeID,
            CapacityBytes: capacity,
            VolumeContext: parameters,
        },
    }, nil
}

// DeleteVolume removes a volume from the storage backend. An empty volume ID
// yields InvalidArgument; a backend failure yields Internal.
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
    id := req.VolumeId
    if len(id) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing")
    }

    backendErr := cs.deleteVolumeOnBackend(id)
    if backendErr != nil {
        return nil, status.Error(codes.Internal, backendErr.Error())
    }

    resp := &csi.DeleteVolumeResponse{}
    return resp, nil
}

// ControllerPublishVolume attaches a volume to a node (block storage case)
// and returns the resulting device path in the publish context under the
// key "devicePath".
//
// An empty volume ID or node ID yields InvalidArgument; an attach failure
// yields Internal.
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
    // 1. Validate the request.
    volumeID := req.VolumeId
    if volumeID == "" {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing")
    }
    nodeID := req.NodeId
    if nodeID == "" {
        return nil, status.Error(codes.InvalidArgument, "Node ID missing")
    }

    // 2. Attach the volume to the node.
    devicePath, err := cs.attachVolumeToNode(volumeID, nodeID)
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    // 3. Return the publish context consumed later by NodeStageVolume.
    return &csi.ControllerPublishVolumeResponse{
        PublishContext: map[string]string{
            "devicePath": devicePath,
        },
    }, nil
}

Node Server 实现

// NodeServer implements the CSI Node service. Mount and unmount operations
// go through mounter.
type NodeServer struct {
    driver  *Driver
    mounter mount.Interface
}

// NodeStageVolume formats the attached device if needed and mounts it at the
// global staging path.
//
// The device path is read from the publish context key "devicePath". The
// file system type comes from the mount capability and defaults to "ext4".
// Missing volume ID, staging path, volume capability or device path yield
// InvalidArgument; format and mount failures yield Internal.
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
    // 1. Validate the request.
    if req.VolumeId == "" {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing")
    }
    stagingPath := req.StagingTargetPath
    if stagingPath == "" {
        return nil, status.Error(codes.InvalidArgument, "Staging target path missing")
    }
    if req.VolumeCapability == nil {
        return nil, status.Error(codes.InvalidArgument, "Volume capability missing")
    }

    // 2. Get the device path from the publish context.
    devicePath := req.PublishContext["devicePath"]
    if devicePath == "" {
        return nil, status.Error(codes.InvalidArgument, "devicePath missing from publish context")
    }

    // 3. Resolve the file system type up front: both the format and the mount
    // step need it. The getters are nil-safe, so a non-mount capability simply
    // falls back to the default.
    fsType := req.VolumeCapability.GetMount().GetFsType()
    if fsType == "" {
        fsType = "ext4"
    }

    // 4. Format the device if it has no file system yet.
    if !ns.isFormatted(devicePath) {
        if err := ns.formatDevice(devicePath, fsType); err != nil {
            return nil, status.Error(codes.Internal, err.Error())
        }
    }

    // 5. Mount to the global staging directory.
    if err := ns.mounter.Mount(devicePath, stagingPath, fsType, nil); err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    return &csi.NodeStageVolumeResponse{}, nil
}

// NodePublishVolume bind-mounts the staged volume into the Pod's target
// path, read-only when the request asks for it.
//
// The target directory is created with mode 0750 if it does not exist.
// An empty staging or target path yields InvalidArgument; directory
// creation and mount failures yield Internal.
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
    // 1. Validate the request.
    stagingPath := req.StagingTargetPath
    if stagingPath == "" {
        return nil, status.Error(codes.InvalidArgument, "Staging target path missing")
    }
    targetPath := req.TargetPath
    if targetPath == "" {
        return nil, status.Error(codes.InvalidArgument, "Target path missing")
    }

    // 2. Create the target directory.
    if err := os.MkdirAll(targetPath, 0750); err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    // 3. Bind-mount the staging directory into the Pod directory.
    options := []string{"bind"}
    if req.Readonly {
        options = append(options, "ro")
    }

    if err := ns.mounter.Mount(stagingPath, targetPath, "", options); err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    return &csi.NodePublishVolumeResponse{}, nil
}

// NodeUnpublishVolume unmounts the volume from the Pod's target path.
// An empty target path yields InvalidArgument; an unmount failure yields
// Internal.
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
    targetPath := req.TargetPath
    if targetPath == "" {
        return nil, status.Error(codes.InvalidArgument, "Target path missing")
    }

    // Unmount the Pod directory.
    if err := ns.mounter.Unmount(targetPath); err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    return &csi.NodeUnpublishVolumeResponse{}, nil
}

// NodeUnstageVolume unmounts the volume from the global staging path.
// An empty staging path yields InvalidArgument; an unmount failure yields
// Internal.
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
    stagingPath := req.StagingTargetPath
    if stagingPath == "" {
        return nil, status.Error(codes.InvalidArgument, "Staging target path missing")
    }

    // Unmount the global staging directory.
    if err := ns.mounter.Unmount(stagingPath); err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    return &csi.NodeUnstageVolumeResponse{}, nil
}

CSI 部署配置

CSIDriver 对象

apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
  name: csi.example.com
spec:
  # Whether volumes need an attach step (ControllerPublishVolume, driven by
  # the external-attacher) before they can be mounted on a node.
  attachRequired: true

  # Pass Pod information to the CSI driver in NodePublishVolume.
  podInfoOnMount: true

  # Volume lifecycle modes. Only Persistent is listed: ephemeral inline
  # volumes skip attach and stage, so add Ephemeral only once the driver's
  # NodePublishVolume can work without a staged volume.
  volumeLifecycleModes:
  - Persistent

  # Storage capacity tracking. Enable it only when the external-provisioner
  # publishes CSIStorageCapacity objects (--enable-capacity); otherwise the
  # scheduler finds no capacity information and Pods stay Pending.
  storageCapacity: false

  # FSGroup policy.
  fsGroupPolicy: File

Controller Plugin 部署

# Controller plugin: the CSI driver in controller mode plus the sidecars that
# talk to it over a socket shared through an emptyDir volume.
# Sidecar images are pulled from registry.k8s.io; k8s.gcr.io is frozen.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: csi-controller
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: csi-controller
  template:
    metadata:
      labels:
        app: csi-controller
    spec:
      serviceAccountName: csi-controller-sa
      containers:
      # CSI driver
      - name: csi-driver
        image: registry.example.com/csi-driver:v1.0.0
        args:
        - --endpoint=unix:///csi/csi.sock
        - --mode=controller
        volumeMounts:
        - name: socket-dir
          mountPath: /csi

      # External provisioner
      - name: csi-provisioner
        image: registry.k8s.io/sig-storage/csi-provisioner:v3.0.0
        args:
        - --csi-address=/csi/csi.sock
        - --v=5
        volumeMounts:
        - name: socket-dir
          mountPath: /csi

      # External attacher
      - name: csi-attacher
        image: registry.k8s.io/sig-storage/csi-attacher:v3.3.0
        args:
        - --csi-address=/csi/csi.sock
        - --v=5
        volumeMounts:
        - name: socket-dir
          mountPath: /csi

      # External resizer
      - name: csi-resizer
        image: registry.k8s.io/sig-storage/csi-resizer:v1.3.0
        args:
        - --csi-address=/csi/csi.sock
        - --v=5
        volumeMounts:
        - name: socket-dir
          mountPath: /csi

      volumes:
      - name: socket-dir
        emptyDir: {}

Node Plugin 部署

# Node plugin: the CSI driver in node mode plus the node-driver-registrar,
# one Pod per node.
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: csi-node
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: csi-node
  template:
    metadata:
      labels:
        app: csi-node
    spec:
      serviceAccountName: csi-node-sa
      hostNetwork: true
      containers:
      # CSI driver
      - name: csi-driver
        image: registry.example.com/csi-driver:v1.0.0
        args:
        - --endpoint=unix:///csi/csi.sock
        - --mode=node
        - --node-id=$(NODE_ID)
        env:
        - name: NODE_ID
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        securityContext:
          privileged: true
        volumeMounts:
        - name: socket-dir
          mountPath: /csi
        # The whole kubelet directory is mounted with Bidirectional
        # propagation: staging paths live under /var/lib/kubelet/plugins and
        # Pod mount paths under /var/lib/kubelet/pods, and mounts made in the
        # container on either must be visible on the host.
        - name: kubelet-dir
          mountPath: /var/lib/kubelet
          mountPropagation: Bidirectional
        - name: device-dir
          mountPath: /dev

      # CSI node driver registrar
      - name: node-driver-registrar
        image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.3.0
        args:
        - --csi-address=/csi/csi.sock
        - --kubelet-registration-path=/var/lib/kubelet/plugins/csi.example.com/csi.sock
        volumeMounts:
        - name: socket-dir
          mountPath: /csi
        - name: registration-dir
          mountPath: /registration

      volumes:
      - name: socket-dir
        hostPath:
          path: /var/lib/kubelet/plugins/csi.example.com/
          type: DirectoryOrCreate
      - name: registration-dir
        hostPath:
          path: /var/lib/kubelet/plugins_registry/
          type: Directory
      - name: kubelet-dir
        hostPath:
          path: /var/lib/kubelet
          type: Directory
      - name: device-dir
        hostPath:
          path: /dev
          type: Directory

使用 CSI 驱动

StorageClass 定义

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-sc
provisioner: csi.example.com
parameters:
  # Driver-specific parameters, passed to CreateVolume as-is.
  type: ssd
  replication: "3"
  # Reserved key consumed by the external-provisioner: it sets the file
  # system type in the volume capability that the node plugin reads.
  csi.storage.k8s.io/fstype: ext4
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true

使用 PVC

# Claim for a 10Gi ReadWriteOnce volume from the csi-sc StorageClass.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-pvc
spec:
  storageClassName: csi-sc
  accessModes: [ReadWriteOnce]
  resources:
    requests:
      storage: 10Gi

CSI 故障排查

检查 CSI 组件

# List CSIDriver objects
kubectl get csidrivers

# Inspect the controller plugin
kubectl get pods --namespace kube-system --selector app=csi-controller
kubectl logs --namespace kube-system <csi-controller-pod> --container csi-driver

# Inspect the node plugin
kubectl get pods --namespace kube-system --selector app=csi-node
kubectl logs --namespace kube-system <csi-node-pod> --container csi-driver

# Inspect CSI node registration
kubectl get csinodes
kubectl describe csinode <node-name>

常见问题

# 1. PVC stays Pending
kubectl describe pvc <pvc-name>
# Check that the StorageClass exists
# Check the CSI controller logs

# 2. Pod cannot mount the volume
kubectl describe pod <pod-name>
# Check whether the attach step succeeded
kubectl get volumeattachment
kubectl describe volumeattachment <attachment-name>
# Check the node plugin logs
# Check that the device is mounted correctly

# 3. Volume expansion fails
kubectl describe pvc <pvc-name>
kubectl logs -n kube-system <csi-controller-pod> -c csi-resizer
# Check that the StorageClass sets allowVolumeExpansion: true

总结

CSI 是 Kubernetes 存储的标准接口,理解其原理有助于:

  • 存储集成:对接各种存储后端
  • 插件开发:开发自定义存储驱动
  • 故障排查:快速定位存储问题
  • 性能优化:优化存储性能

核心要点:

  1. CSI 定义了三个服务:Identity、Controller、Node
  2. 卷生命周期:Create → Attach → Stage → Publish
  3. 部署模式:Controller Plugin (Deployment) + Node Plugin (DaemonSet)
  4. 扩展能力:快照、扩容、克隆