API Server 运行机制深度解析

API Server 运行机制深度解析

API Server 概述

kube-apiserver 是 Kubernetes 控制平面的核心组件,作为集群的入口,处理所有 REST API 请求。

核心职责

  1. API 接口服务:提供 RESTful API
  2. 认证与授权:验证用户身份和权限
  3. 准入控制:请求验证和修改
  4. 数据持久化:与 etcd 交互
  5. 集群状态管理:维护资源对象状态
  6. Watch 机制:支持资源变更通知

架构图

┌─────────────────────────────────────────────────────┐
│                    API Server                       │
├─────────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐  ┌───────────┐ │
│  │ HTTP Handler │  │  API Routes  │  │  Swagger  │ │
│  └──────┬───────┘  └──────┬───────┘  └─────┬─────┘ │
│         │                  │                 │       │
│  ┌──────▼──────────────────▼─────────────────▼────┐ │
│  │            Authentication Chain                  │ │
│  │  X509 | Token | OIDC | Webhook | Bootstrap     │ │
│  └──────────────────────┬──────────────────────────┘ │
│  ┌──────────────────────▼──────────────────────────┐ │
│  │            Authorization Chain                   │ │
│  │  RBAC | ABAC | Webhook | Node                  │ │
│  └──────────────────────┬──────────────────────────┘ │
│  ┌──────────────────────▼──────────────────────────┐ │
│  │         Admission Control Chain                  │ │
│  │  Mutating Webhooks → Validating Webhooks        │ │
│  └──────────────────────┬──────────────────────────┘ │
│  ┌──────────────────────▼──────────────────────────┐ │
│  │            Resource Registry                     │ │
│  │  Pod | Deployment | Service | ...               │ │
│  └──────────────────────┬──────────────────────────┘ │
│  ┌──────────────────────▼──────────────────────────┐ │
│  │              Storage Backend                     │ │
│  │           etcd v3 (Key-Value Store)             │ │
│  └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘

请求处理流程

完整请求链路

Client Request
    ↓
HTTP Handler (负载均衡、TLS 终止)
    ↓
Authentication (身份认证)
    ↓
Authorization (授权检查)
    ↓
Admission Control (准入控制)
    ├─ Mutating Admission (修改请求)
    ├─ Object Schema Validation (Schema 验证,在变更准入之后、验证准入之前执行)
    └─ Validating Admission (验证请求)
    ↓
etcd (持久化存储)
    ↓
Response (返回结果)
    ↓
Watch Notification (通知订阅者)

源码级分析

1. HTTP 请求处理

// k8s.io/apiserver/pkg/server/handler.go

// APIServerHandler bundles the HTTP handlers through which API requests are served.
type APIServerHandler struct {
    // FullHandlerChain is the handler that ServeHTTP delegates every request to.
    FullHandlerChain   http.Handler
    // GoRestfulContainer is a go-restful container; it is not referenced in this excerpt.
    GoRestfulContainer *restful.Container
    // NonGoRestfulMux is a path-recording mux; it is not referenced in this excerpt.
    NonGoRestfulMux    *mux.PathRecorderMux
}

// ServeHTTP implements http.Handler by forwarding the request to FullHandlerChain.
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Hand the request to FullHandlerChain (described by the article as the chain
    // that contains authentication, authorization and similar filters).
    a.FullHandlerChain.ServeHTTP(w, r)
}

// DefaultBuildHandlerChain wraps handler in a series of filters and returns the
// outermost wrapper.
//
// Every statement wraps the result of the previous one, so the filter applied
// last (WithRequestInfo) ends up outermost.
// NOTE(review): presumably each filter does its work before delegating inward,
// which would give the request-time order RequestInfo -> authentication ->
// audit -> authorization -> handler; confirm against the filter implementations.
func DefaultBuildHandlerChain(handler http.Handler, c *Config) http.Handler {
    handler = filterlatency.TrackCompleted(handler)
    handler = genericfilters.WithAuthorization(handler, c.Authorization.Authorizer)
    handler = filterlatency.TrackStarted(handler, "authorization")
    handler = genericfilters.WithAudit(handler, c.AuditBackend)
    handler = filterlatency.TrackStarted(handler, "audit")
    handler = genericfilters.WithAuthentication(handler, c.Authentication.Authenticator)
    handler = filterlatency.TrackStarted(handler, "authentication")
    handler = genericfilters.WithRequestInfo(handler, c.RequestInfoResolver)
    return handler
}

2. 认证处理

// k8s.io/apiserver/pkg/authentication/request/union/union.go

// unionAuthRequestHandler combines several request authenticators into one.
type unionAuthRequestHandler struct {
    // Handlers are the authenticators tried, in slice order, by AuthenticateRequest.
    Handlers []authenticator.Request
    // FailOnError makes AuthenticateRequest return on the first authenticator
    // error instead of collecting it and moving on to the next authenticator.
    FailOnError bool
}

// AuthenticateRequest runs the configured authenticators in order and returns
// the response of the first one that accepts the request.
//
// An authenticator error either aborts immediately (FailOnError) or is
// collected; when no authenticator accepts the request the collected errors
// are returned as a single aggregate together with ok == false.
func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
    var collected []error
    for _, candidate := range authHandler.Handlers {
        resp, ok, err := candidate.AuthenticateRequest(req)
        switch {
        case err != nil && authHandler.FailOnError:
            // Strict mode: the first failure ends the whole attempt.
            return nil, false, err
        case err != nil:
            // Lenient mode: remember the failure and try the next authenticator.
            collected = append(collected, err)
        case ok:
            // Authenticated; err is nil on this path.
            return resp, ok, err
        }
    }
    return nil, false, utilerrors.NewAggregate(collected)
}

3. 授权处理

// k8s.io/apiserver/pkg/authorization/authorizer/interfaces.go

// Authorizer decides whether the request described by a set of attributes is permitted.
type Authorizer interface {
    // Authorize evaluates the attributes a and returns a decision, a
    // human-readable reason, and an error if the evaluation itself failed.
    Authorize(ctx context.Context, a Attributes) (authorized Decision, reason string, err error)
}

// Authorize is a simplified RBAC authorizer.
//
// It looks up the bindings that apply to the requesting user and returns
// DecisionAllow as soon as one rule of one binding matches the request.
// When nothing matches it returns DecisionNoOpinion with an explanatory reason.
// If the bindings cannot be retrieved, the lookup error is returned alongside
// DecisionNoOpinion, so a failed lookup is not reported as "no matching rule".
func (r *RBACAuthorizer) Authorize(ctx context.Context, requestAttributes authorizer.Attributes) (authorizer.Decision, string, error) {
    // 1. Identify the requesting user.
    user := requestAttributes.GetUser()

    // 2. List the RoleBindings and ClusterRoleBindings that apply to the user.
    roleBindings, err := r.getRoleBindings(user)
    if err != nil {
        // Previously this error was dropped; surface it to the caller.
        return authorizer.DecisionNoOpinion, "", err
    }

    // 3. Allow on the first rule that matches the request.
    for _, binding := range roleBindings {
        for _, rule := range binding.Rules {
            if r.ruleMatches(requestAttributes, rule) {
                return authorizer.DecisionAllow, "", nil
            }
        }
    }

    return authorizer.DecisionNoOpinion, "no matching rule found", nil
}

4. 准入控制

// k8s.io/apiserver/pkg/admission/chain.go

// chainAdmissionHandler is an ordered list of admission plugins run as one unit.
type chainAdmissionHandler []admission.Interface

// Admit runs the mutating phase of every plugin in the chain, in order, and
// stops at the first error.
//
// A plugin is skipped when it does not handle the request's operation, or when
// it does not implement admission.MutationInterface (admission.Interface itself
// only declares Handles, so Admit must be reached through a type assertion).
func (c chainAdmissionHandler) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
    for _, handler := range c {
        if !handler.Handles(a.GetOperation()) {
            continue
        }
        mutator, ok := handler.(admission.MutationInterface)
        if !ok {
            // Validation-only plugin: nothing to do in the mutating phase.
            continue
        }
        if err := mutator.Admit(ctx, a, o); err != nil {
            return err
        }
    }
    return nil
}

// MutatingWebhook is an illustrative mutating admission webhook plugin.
type MutatingWebhook struct {
    // webhook is the accessor through which Admit calls the external webhook.
    webhook *webhookutil.WebhookAccessor
}

// Admit sends the object under admission to the external webhook and, when the
// reply carries a non-empty patch, replaces the object with its patched form.
// Any error from the webhook call or from applying the patch is returned as is.
func (a *MutatingWebhook) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
    // Call the external webhook with the current object.
    reply, callErr := a.webhook.Call(ctx, attr.GetObject())
    if callErr != nil {
        return callErr
    }

    // No patch in the reply means there is nothing to mutate.
    if len(reply.Patch) == 0 {
        return nil
    }

    // Apply the returned patch and store the result back on the attributes.
    mutated, patchErr := applyPatch(attr.GetObject(), reply.Patch)
    if patchErr != nil {
        return patchErr
    }
    attr.SetObject(mutated)
    return nil
}

Watch 机制深度解析

Watch 架构

┌──────────────┐
│    Client    │
│  (kubectl)   │
└──────┬───────┘
       │ HTTP GET /api/v1/pods?watch=true
       ▼
┌──────────────────────────────────┐
│         API Server               │
│  ┌────────────────────────────┐  │
│  │  Watch Cache (in-memory)   │  │
│  │  - 缓存最近的资源状态        │  │
│  │  - 减少 etcd 负载           │  │
│  └────────┬───────────────────┘  │
│           │                       │
│  ┌────────▼───────────────────┐  │
│  │  Cacher (核心组件)          │  │
│  │  - 管理 Watch 订阅          │  │
│  │  - 事件分发                 │  │
│  └────────┬───────────────────┘  │
└───────────┼───────────────────────┘
            │
            ▼ etcd watch
┌─────────────────────┐
│        etcd         │
│  - 持久化存储        │
│  - Watch 通知        │
└─────────────────────┘

源码实现

// k8s.io/apiserver/pkg/storage/cacher/cacher.go

// Cacher is a simplified view of the watch cache front end: it owns the
// in-memory cache, the set of registered watchers, and the event channel that
// feeds them.
type Cacher struct {
    // incoming carries watch cache events; dispatchEvents receives from it.
    incoming chan watchCacheEvent
    
    // watchers holds every registered watch subscriber.
    watchers  *watchersMap
    
    // watchCache is the in-memory cache handed to new watchers in Watch.
    watchCache *watchCache
}

// Watch handles a watch request: it creates a watcher, registers it with the
// Cacher, starts delivering the initial events in a separate goroutine, and
// returns the watcher.
//
// This is an abridged excerpt: the "..." arguments and initEvents are elided
// and not defined here, and key and opts are not used by the lines shown.
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
    // 1. Create a new watcher.
    watcher := newCacheWatcher(...)
    
    // 2. Register it in the watchers map.
    c.watchers.addWatcher(watcher, ...)
    
    // 3. Send the initial events from a separate goroutine.
    go watcher.processInterval(ctx, c.watchCache, initEvents)
    
    return watcher, nil
}

// dispatchEvents is the event fan-out loop: it receives events from c.incoming
// and hands each one to the registered watchers.
//
// It ranges over the channel rather than using a single-case select inside an
// endless for loop, so the loop ends when c.incoming is closed instead of
// spinning on zero-value events.
func (c *Cacher) dispatchEvents() {
    for event := range c.incoming {
        // Deliver the event to the watchers via the watchers map.
        c.watchers.notify(event)
    }
}

Watch 事件类型

// EventType names the kind of change carried by a watch event.
type EventType string

// Watch event types as they appear on the wire.
const (
    Added    EventType = "ADDED"    // resource created
    Modified EventType = "MODIFIED" // resource updated
    Deleted  EventType = "DELETED"  // resource deleted
    Bookmark EventType = "BOOKMARK" // progress marker carrying only an updated resourceVersion
    Error    EventType = "ERROR"    // error event
)

// WatchEvent is a single notification delivered to a watcher.
type WatchEvent struct {
    // Type is the kind of change this event reports.
    Type   EventType
    // Object is the resource object the event refers to.
    Object runtime.Object  // resource object
}

存储层交互

etcd 客户端封装

// k8s.io/apiserver/pkg/storage/etcd3/store.go

// store is a simplified etcd v3 backed storage implementation.
type store struct {
    // client is the etcd v3 client used for reads, writes and transactions.
    client *clientv3.Client
    // codec encodes objects before they are written and decodes them after reads.
    codec runtime.Codec
    // prefix is the root path joined in front of every key (path.Join(prefix, key)),
    // e.g. /registry, so a key of /pods/default/nginx is stored at
    // /registry/pods/default/nginx.
    prefix string
    // newFunc returns a fresh, empty object to decode stored data into;
    // GuaranteedUpdate calls it for every read.
    newFunc func() runtime.Object
}

// Create stores obj under key, failing if the key already exists.
//
// The object is encoded with the store's codec and written inside an etcd
// transaction guarded by "version == 0", i.e. the write only happens when the
// key does not exist yet. If the guard fails, a key-exists storage error is
// returned.
//
// NOTE(review): ttl is accepted but not applied in this simplified excerpt; a
// complete implementation would attach a lease to the put.
func (s *store) Create(ctx context.Context, key string, obj runtime.Object, ttl uint64) error {
    // 1. Serialize the object.
    data, err := runtime.Encode(s.codec, obj)
    if err != nil {
        return err
    }

    // 2. Build the full etcd key.
    etcdKey := path.Join(s.prefix, key)

    // 3+4. Build and commit the transaction. The builder methods return the Txn
    // to continue with, so the calls are chained rather than discarding results.
    resp, err := s.client.Txn(ctx).
        If(clientv3.Compare(clientv3.Version(etcdKey), "=", 0)).
        Then(clientv3.OpPut(etcdKey, string(data))).
        Commit()
    if err != nil {
        return err
    }
    if !resp.Succeeded {
        // The guard failed: some version of the key already exists.
        return storage.NewKeyExistsError(key, 0)
    }

    return nil
}

// GuaranteedUpdate applies tryUpdate to the object stored under key using
// optimistic concurrency: it reads the current object and its mod revision,
// computes the updated object, and writes it back only if the revision is
// unchanged, retrying from the read on conflict.
//
// A missing key yields a key-not-found storage error unless ignoreNotFound is
// set, in which case tryUpdate receives a fresh empty object and the write is
// guarded by "mod revision == 0" (key still absent).
//
// The etcd key is built with path.Join(s.prefix, key), matching Create.
//
// NOTE(review): out and preconditions are not used by this simplified excerpt,
// and tryUpdate is given an empty ResponseMeta.
func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
    etcdKey := path.Join(s.prefix, key)

    // 1. Read the current object together with its mod revision.
    getCurrentObj := func() (runtime.Object, uint64, error) {
        getResp, err := s.client.Get(ctx, etcdKey)
        if err != nil {
            return nil, 0, err
        }

        obj := s.newFunc()
        if len(getResp.Kvs) == 0 {
            // Key does not exist: indexing Kvs[0] here would panic.
            if !ignoreNotFound {
                return nil, 0, storage.NewKeyNotFoundError(key, 0)
            }
            return obj, 0, nil
        }

        kv := getResp.Kvs[0]
        if err := runtime.DecodeInto(s.codec, kv.Value, obj); err != nil {
            return nil, 0, err
        }

        return obj, uint64(kv.ModRevision), nil
    }

    // 2. Retry until the compare-and-swap succeeds or an error occurs.
    for {
        currentObj, resourceVersion, err := getCurrentObj()
        if err != nil {
            return err
        }

        // 3. Run the caller-supplied update logic.
        updatedObj, _, err := tryUpdate(currentObj, storage.ResponseMeta{})
        if err != nil {
            return err
        }

        // 4. Serialize the updated object.
        data, err := runtime.Encode(s.codec, updatedObj)
        if err != nil {
            return err
        }

        // 5. Compare-and-swap: write only if nobody modified the key since the read.
        resp, err := s.client.Txn(ctx).
            If(clientv3.Compare(clientv3.ModRevision(etcdKey), "=", int64(resourceVersion))).
            Then(clientv3.OpPut(etcdKey, string(data))).
            Commit()
        if err != nil {
            return err
        }

        if resp.Succeeded {
            return nil
        }

        // Revision conflict: loop and retry with a fresh read.
    }
}

etcd 数据组织

/registry
├── pods
│   ├── default
│   │   ├── nginx-7c6c8f9b5-abcde
│   │   └── redis-6d4f7b8c9-xyzwv
│   └── kube-system
│       ├── coredns-5c8d5f9b5-12345
│       └── kube-proxy-6d8f7b9c8-67890
├── deployments
│   └── default
│       └── nginx
├── services
│   └── default
│       └── nginx-service
└── configmaps
    └── default
        └── app-config

API 聚合机制

APIService 资源

apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  name: v1beta1.metrics.k8s.io
spec:
  service:
    name: metrics-server
    namespace: kube-system
  group: metrics.k8s.io
  version: v1beta1
  insecureSkipTLSVerify: true
  groupPriorityMinimum: 100
  versionPriority: 100

聚合层架构

kubectl get pods --all-namespaces
         │
         ▼
┌────────────────────────┐
│   kube-apiserver       │
│  ┌──────────────────┐  │
│  │ Aggregation Layer│  │
│  └────────┬─────────┘  │
└───────────┼────────────┘
            │
    ┌───────┴───────┐
    │               │
    ▼               ▼
┌─────────┐   ┌─────────────┐
│ Built-in│   │  Extension  │
│   APIs  │   │  API Server │
│         │   │ (metrics)   │
└─────────┘   └─────────────┘

请求路由

// k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

// ServeHTTP routes a request either to the local API server or to the backend
// registered for the matching APIService.
//
// Requests whose path does not resolve to an APIService go to the local
// delegate. Requests for an APIService that is not available, or that has no
// proxy handler registered, are answered with 503 Service Unavailable.
func (s *APIAggregator) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    // 1. Resolve the request path to an APIService; fall back to the local delegate.
    apiService, err := s.getAPIService(req.URL.Path)
    if err != nil {
        s.localDelegate.ServeHTTP(w, req)
        return
    }

    // 2. Reject requests for APIServices that are not available.
    if !s.isAvailable(apiService) {
        http.Error(w, "APIService unavailable", http.StatusServiceUnavailable)
        return
    }

    // 3. Proxy to the backend. A missing map entry would otherwise yield a nil
    // handler and a panic on the call below.
    proxy, ok := s.proxyHandlers[apiService.Name]
    if !ok {
        http.Error(w, "APIService proxy handler not registered", http.StatusServiceUnavailable)
        return
    }
    proxy.ServeHTTP(w, req)
}

性能优化

1. Watch Cache

# 配置 watch cache 大小
--watch-cache-sizes=pods#1000,nodes#100,services#500

# 监控 watch cache(kubectl top 仅支持 node 和 pod,watch cache 指标需通过 /metrics 端点查看)
kubectl get --raw /metrics | grep watch_cache

2. 请求限流

// 配置 API 优先级和公平性
apiVersion: flowcontrol.apiserver.k8s.io/v1beta3  # v1beta2 已弃用并在 v1.29 移除;v1.29+ 可使用 v1
kind: FlowSchema
metadata:
  name: system-leader-election
spec:
  priorityLevelConfiguration:
    name: leader-election
  matchingPrecedence: 200
  rules:
  - subjects:
    - kind: User
      user:
        name: system:kube-controller-manager
    - kind: User
      user:
        name: system:kube-scheduler
    resourceRules:
    - verbs: ["get", "create", "update"]
      apiGroups: [""]
      resources: ["endpoints", "configmaps"]
      namespaces: ["kube-system"]

3. 连接池配置

# API Server 配置
--max-requests-inflight=400          # 最大并发非 mutating 请求
--max-mutating-requests-inflight=200 # 最大并发 mutating 请求
--min-request-timeout=1800           # 最小请求超时时间(秒)

高可用部署

架构

        Load Balancer (VIP)
        192.168.1.100:6443
               │
    ┌──────────┼──────────┐
    │          │          │
    ▼          ▼          ▼
┌────────┐ ┌────────┐ ┌────────┐
│API Srv1│ │API Srv2│ │API Srv3│
└───┬────┘ └───┬────┘ └───┬────┘
    │          │          │
    └──────────┼──────────┘
               │
               ▼
          etcd Cluster
       (3 或 5 节点)

部署配置

# kube-apiserver.yaml
apiVersion: v1
kind: Pod
metadata:
  name: kube-apiserver
  namespace: kube-system
spec:
  containers:
  - name: kube-apiserver
    image: registry.k8s.io/kube-apiserver:v1.28.0
    command:
    - kube-apiserver
    - --advertise-address=192.168.1.101
    - --etcd-servers=https://192.168.1.201:2379,https://192.168.1.202:2379,https://192.168.1.203:2379
    - --etcd-cafile=/etc/kubernetes/pki/etcd/ca.crt
    - --etcd-certfile=/etc/kubernetes/pki/apiserver-etcd-client.crt
    - --etcd-keyfile=/etc/kubernetes/pki/apiserver-etcd-client.key
    - --apiserver-count=3  # API Server 数量(仅在 --endpoint-reconciler-type=master-count 时生效,lease 模式下可省略)
    - --endpoint-reconciler-type=lease  # 端点协调类型
    - --enable-aggregator-routing=true
    - --audit-log-path=/var/log/kubernetes/audit.log
    - --audit-log-maxage=30
    - --audit-log-maxbackup=10
    - --audit-log-maxsize=100
    livenessProbe:
      httpGet:
        path: /livez
        port: 6443
        scheme: HTTPS
      initialDelaySeconds: 15
      timeoutSeconds: 15
    readinessProbe:
      httpGet:
        path: /readyz
        port: 6443
        scheme: HTTPS

监控指标

关键指标

# API Server 请求延迟
histogram_quantile(0.99, 
  sum(rate(apiserver_request_duration_seconds_bucket[5m])) by (verb, le)
)

# API Server 请求速率
sum(rate(apiserver_request_total[5m])) by (verb, code)

# Watch 缓存命中率
sum(rate(apiserver_watch_cache_events_dispatched_total[5m])) /
sum(rate(apiserver_watch_events_total[5m]))

# etcd 请求延迟
histogram_quantile(0.99,
  sum(rate(etcd_request_duration_seconds_bucket[5m])) by (operation, le)
)

Grafana Dashboard

{
  "dashboard": {
    "title": "API Server Metrics",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "sum(rate(apiserver_request_total[5m])) by (verb)"
          }
        ]
      },
      {
        "title": "Request Latency P99",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, sum(rate(apiserver_request_duration_seconds_bucket[5m])) by (verb, le))"
          }
        ]
      }
    ]
  }
}

故障排查

常见问题

1. API Server 不可用

# 检查进程
ps aux | grep kube-apiserver

# 检查日志
journalctl -u kube-apiserver -f

# 检查端口
netstat -tlnp | grep 6443

# 检查证书
openssl s_client -connect localhost:6443 -cert /etc/kubernetes/pki/apiserver.crt -key /etc/kubernetes/pki/apiserver.key

2. etcd 连接问题

# 测试 etcd 连接
ETCDCTL_API=3 etcdctl \
  --endpoints=https://127.0.0.1:2379 \
  --cacert=/etc/kubernetes/pki/etcd/ca.crt \
  --cert=/etc/kubernetes/pki/etcd/server.crt \
  --key=/etc/kubernetes/pki/etcd/server.key \
  endpoint health

# 查看 API Server 到 etcd 的延迟
curl -k https://localhost:6443/metrics | grep etcd_request_duration

3. Watch 性能问题

# 检查 watch cache 统计
kubectl get --raw /metrics | grep watch_cache

# 查看活跃的 watch 连接数
kubectl get --raw /metrics | grep apiserver_registered_watchers

最佳实践

1. 生产环境配置

# 推荐的 API Server 参数
--max-requests-inflight=800
--max-mutating-requests-inflight=400
# 注意:--target-ram-mb 已被弃用并在较新版本中移除,新版本 kube-apiserver 不应再设置该参数
--event-ttl=24h
--service-node-port-range=30000-32767
--enable-admission-plugins=NodeRestriction,PodSecurity
--audit-policy-file=/etc/kubernetes/audit-policy.yaml
--encryption-provider-config=/etc/kubernetes/encryption-config.yaml

2. 审计配置

# audit-policy.yaml
apiVersion: audit.k8s.io/v1
kind: Policy
rules:
- level: Metadata
  resources:
  - group: ""
    resources: ["secrets", "configmaps"]
- level: Request
  users: ["system:admin"]
  verbs: ["delete", "create", "update"]
- level: RequestResponse
  resources:
  - group: ""
    resources: ["pods"]
  verbs: ["create", "update", "patch", "delete"]

3. 加密配置

# encryption-config.yaml
apiVersion: apiserver.config.k8s.io/v1
kind: EncryptionConfiguration
resources:
- resources:
  - secrets
  providers:
  - aescbc:
      keys:
      - name: key1
        secret: <base64-encoded-32-byte-key>
  - identity: {}

总结

API Server 是 Kubernetes 的大脑,理解其运行机制对于以下方面至关重要:

  • 架构设计:合理设计 API 和资源模型
  • 性能优化:识别瓶颈并优化
  • 故障排查:快速定位问题
  • 扩展开发:开发自定义控制器和 Webhook

核心要点:

  1. 请求处理链:认证 → 授权 → 准入控制
  2. Watch 机制:高效的资源变更通知
  3. etcd 交互:乐观锁保证一致性
  4. 高可用:多实例 + 负载均衡