A Deep Dive into How the API Server Works
API Server Overview
kube-apiserver is the core component of the Kubernetes control plane. It is the single entry point to the cluster and handles every REST API request.
Core Responsibilities
- API service: exposes the RESTful API (see the client-go sketch after this list)
- Authentication and authorization: verifies the caller's identity and permissions
- Admission control: validates and mutates incoming requests
- Persistence: reads and writes cluster state in etcd
- Cluster state management: maintains the state of all resource objects
- Watch mechanism: notifies subscribers of resource changes
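To make "everything is an API call" concrete, here is a minimal client-go sketch that lists Pods in the default namespace; every controller, kubectl invocation and kubelet ultimately issues requests like this against kube-apiserver. The kubeconfig path is an assumption for illustration.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from a local kubeconfig (path is an assumption).
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// This is just an HTTP GET against /api/v1/namespaces/default/pods.
	pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, p := range pods.Items {
		fmt.Println(p.Name)
	}
}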
Architecture Diagram
┌─────────────────────────────────────────────────────┐
│ API Server │
├─────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌───────────┐ │
│ │ HTTP Handler │ │ API Routes │ │ Swagger │ │
│ └──────┬───────┘ └──────┬───────┘ └─────┬─────┘ │
│ │ │ │ │
│ ┌──────▼──────────────────▼─────────────────▼────┐ │
│ │ Authentication Chain │ │
│ │ X509 | Token | OIDC | Webhook | Bootstrap │ │
│ └──────────────────────┬──────────────────────────┘ │
│ ┌──────────────────────▼──────────────────────────┐ │
│ │ Authorization Chain │ │
│ │ RBAC | ABAC | Webhook | Node │ │
│ └──────────────────────┬──────────────────────────┘ │
│ ┌──────────────────────▼──────────────────────────┐ │
│ │ Admission Control Chain │ │
│ │ Mutating Webhooks → Validating Webhooks │ │
│ └──────────────────────┬──────────────────────────┘ │
│ ┌──────────────────────▼──────────────────────────┐ │
│ │ Resource Registry │ │
│ │ Pod | Deployment | Service | ... │ │
│ └──────────────────────┬──────────────────────────┘ │
│ ┌──────────────────────▼──────────────────────────┐ │
│ │ Storage Backend │ │
│ │ etcd v3 (Key-Value Store) │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
Request Processing Flow
Full Request Path
Client Request
↓
HTTP Handler (TLS termination)
↓
Authentication
↓
Authorization
↓
Admission Control
├─ Mutating Admission (mutates the request)
└─ Validating Admission (validates the request)
↓
Validation (schema validation)
↓
etcd (persistence)
↓
Response (result returned to the client)
↓
Watch Notification (subscribers notified)
Source-Level Analysis
1. HTTP Request Handling
// k8s.io/apiserver/pkg/server/handler.go
// APIServerHandler handles every API request
type APIServerHandler struct {
FullHandlerChain http.Handler
GoRestfulContainer *restful.Container
NonGoRestfulMux *mux.PathRecorderMux
}
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Delegate to FullHandlerChain, which wraps authentication, authorization, audit, etc.
a.FullHandlerChain.ServeHTTP(w, r)
}
// Build the handler chain (simplified from k8s.io/apiserver/pkg/server/config.go).
// Filters are wrapped from the inside out, so at request time they run in the
// reverse order: RequestInfo → authentication → audit → authorization
// (see the toy sketch after this function).
func DefaultBuildHandlerChain(handler http.Handler, c *Config) http.Handler {
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithAuthorization(handler, c.Authorization.Authorizer)
handler = filterlatency.TrackStarted(handler, "authorization")
handler = genericfilters.WithAudit(handler, c.AuditBackend)
handler = filterlatency.TrackStarted(handler, "audit")
handler = genericfilters.WithAuthentication(handler, c.Authentication.Authenticator)
handler = filterlatency.TrackStarted(handler, "authentication")
handler = genericfilters.WithRequestInfo(handler, c.RequestInfoResolver)
return handler
}
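The construction above wraps filters from the inside out, so the filter added last is the first to see a request. The toy net/http sketch below (the helper name with is made up for illustration) prints the effective order; it is not API Server code, just the same wrapping pattern.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// with wraps next and logs when the request enters and leaves this layer.
func with(name string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Println("enter", name)
		next.ServeHTTP(w, r)
		fmt.Println("leave", name)
	})
}

func main() {
	var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Println("business handler")
	})
	// Wrapped in the same order as DefaultBuildHandlerChain: authorization first,
	// authentication last, so at request time authentication runs before authorization.
	handler = with("authorization", handler)
	handler = with("audit", handler)
	handler = with("authentication", handler)

	handler.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/api/v1/pods", nil))
}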
2. Authentication
// k8s.io/apiserver/pkg/authentication/request/union/union.go
type unionAuthRequestHandler struct {
Handlers []authenticator.Request
FailOnError bool
}
func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
var errlist []error
for _, currAuthRequestHandler := range authHandler.Handlers {
// Try each authenticator in turn
resp, ok, err := currAuthRequestHandler.AuthenticateRequest(req)
if err != nil {
if authHandler.FailOnError {
return nil, false, err
}
errlist = append(errlist, err)
continue
}
if ok {
// Authentication succeeded; return the first positive response
return resp, ok, err
}
}
return nil, false, utilerrors.NewAggregate(errlist)
}
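A custom authenticator only has to satisfy the authenticator.Request interface used above. The sketch below is a toy implementation that accepts a single hard-coded bearer token; the token value and user name are assumptions, and a real deployment would use the built-in authenticators instead.

package main

import (
	"fmt"
	"net/http"
	"strings"

	"k8s.io/apiserver/pkg/authentication/authenticator"
	"k8s.io/apiserver/pkg/authentication/user"
)

// staticTokenAuthenticator accepts exactly one bearer token.
type staticTokenAuthenticator struct {
	token    string
	username string
}

func (a *staticTokenAuthenticator) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
	got := strings.TrimPrefix(req.Header.Get("Authorization"), "Bearer ")
	if got != a.token {
		// (nil, false, nil) means "no opinion": the union handler moves on
		// to the next authenticator in the chain.
		return nil, false, nil
	}
	return &authenticator.Response{
		User: &user.DefaultInfo{Name: a.username, Groups: []string{"system:authenticated"}},
	}, true, nil
}

func main() {
	var auth authenticator.Request = &staticTokenAuthenticator{token: "demo-token", username: "alice"}

	req, _ := http.NewRequest("GET", "/api/v1/pods", nil)
	req.Header.Set("Authorization", "Bearer demo-token")
	resp, ok, err := auth.AuthenticateRequest(req)
	fmt.Println(ok, err, resp.User.GetName())
}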
3. Authorization
// k8s.io/apiserver/pkg/authorization/authorizer/interfaces.go
type Authorizer interface {
Authorize(ctx context.Context, a Attributes) (authorized Decision, reason string, err error)
}
// RBAC authorizer (simplified; the real implementation resolves rules via the rule resolver)
func (r *RBACAuthorizer) Authorize(ctx context.Context, requestAttributes authorizer.Attributes) (authorizer.Decision, string, error) {
// 1. Get the requesting user
user := requestAttributes.GetUser()
// 2. Collect the RoleBindings and ClusterRoleBindings that apply to this user
roleBindings, err := r.getRoleBindings(user)
// 3. Check whether any bound rule matches the request
for _, binding := range roleBindings {
for _, rule := range binding.Rules {
if r.ruleMatches(requestAttributes, rule) {
return authorizer.DecisionAllow, "", nil
}
}
}
return authorizer.DecisionNoOpinion, "no matching rule found", nil
}
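Authorization decisions can also be queried from the outside through the SubjectAccessReview APIs, which is handy for verifying RBAC rules. The hedged client-go sketch below asks whether the current credentials may create Pods in the default namespace; the kubeconfig path is an assumption.

package main

import (
	"context"
	"fmt"

	authorizationv1 "k8s.io/api/authorization/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// "Can I create pods in the default namespace?"
	review := &authorizationv1.SelfSubjectAccessReview{
		Spec: authorizationv1.SelfSubjectAccessReviewSpec{
			ResourceAttributes: &authorizationv1.ResourceAttributes{
				Namespace: "default",
				Verb:      "create",
				Resource:  "pods",
			},
		},
	}
	resp, err := clientset.AuthorizationV1().SelfSubjectAccessReviews().Create(context.TODO(), review, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("allowed=%v reason=%q\n", resp.Status.Allowed, resp.Status.Reason)
}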
4. Admission Control
// k8s.io/apiserver/pkg/admission/chain.go
type chainAdmissionHandler []admission.Interface
func (c chainAdmissionHandler) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
for _, handler := range c {
if !handler.Handles(a.GetOperation()) {
continue
}
if err := handler.Admit(ctx, a, o); err != nil {
return err
}
}
return nil
}
// Mutating webhook example (illustrative, not the actual plugin code)
type MutatingWebhook struct {
webhook *webhookutil.WebhookAccessor
}
func (a *MutatingWebhook) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
// Call the external webhook
response, err := a.webhook.Call(ctx, attr.GetObject())
if err != nil {
return err
}
// Apply the returned patch to the object
if len(response.Patch) > 0 {
patchedObj, err := applyPatch(attr.GetObject(), response.Patch)
if err != nil {
return err
}
attr.SetObject(patchedObj)
}
return nil
}
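On the other end of that call sits an ordinary HTTPS server. The sketch below is a minimal mutating webhook built on k8s.io/api/admission/v1; the /mutate path, the certificate paths and the injected label are illustrative assumptions, and the JSONPatch assumes the incoming object already has a metadata.labels map.

package main

import (
	"encoding/json"
	"io"
	"net/http"

	admissionv1 "k8s.io/api/admission/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// mutate answers an AdmissionReview with a JSONPatch that adds one label.
func mutate(w http.ResponseWriter, r *http.Request) {
	body, _ := io.ReadAll(r.Body)
	var review admissionv1.AdmissionReview
	if err := json.Unmarshal(body, &review); err != nil || review.Request == nil {
		http.Error(w, "malformed AdmissionReview", http.StatusBadRequest)
		return
	}

	// Assumes metadata.labels already exists on the incoming object.
	patch := []byte(`[{"op":"add","path":"/metadata/labels/injected-by","value":"demo-webhook"}]`)
	patchType := admissionv1.PatchTypeJSONPatch

	resp := admissionv1.AdmissionReview{
		TypeMeta: metav1.TypeMeta{APIVersion: "admission.k8s.io/v1", Kind: "AdmissionReview"},
		Response: &admissionv1.AdmissionResponse{
			UID:       review.Request.UID,
			Allowed:   true,
			Patch:     patch,
			PatchType: &patchType,
		},
	}
	out, _ := json.Marshal(resp)
	w.Header().Set("Content-Type", "application/json")
	w.Write(out)
}

func main() {
	http.HandleFunc("/mutate", mutate)
	// The API Server only calls webhooks over HTTPS; certificate paths are assumptions.
	http.ListenAndServeTLS(":8443", "/etc/webhook/tls.crt", "/etc/webhook/tls.key", nil)
}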
A Deep Dive into the Watch Mechanism
Watch Architecture
┌──────────────┐
│ Client │
│ (kubectl) │
└──────┬───────┘
│ HTTP GET /api/v1/pods?watch=true
▼
┌──────────────────────────────────┐
│ API Server │
│ ┌────────────────────────────┐ │
│ │ Watch Cache (in-memory) │ │
│ │ - caches recent resource state │ │
│ │ - reduces load on etcd │ │
│ └────────┬───────────────────┘ │
│ │ │
│ ┌────────▼───────────────────┐ │
│ │ Cacher (core component) │ │
│ │ - manages watch subscriptions │ │
│ │ - dispatches events │ │
│ └────────┬───────────────────┘ │
└───────────┼───────────────────────┘
│
▼ etcd watch
┌─────────────────────┐
│ etcd │
│ - persistent storage │
│ - watch notifications │
└─────────────────────┘
Source Implementation
// k8s.io/apiserver/pkg/storage/cacher/cacher.go
type Cacher struct {
// channel of incoming events
incoming chan watchCacheEvent
// all registered watch subscribers
watchers *watchersMap
// in-memory cache
watchCache *watchCache
}
// Handle a watch request
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
// 1. Create a new watcher
watcher := newCacheWatcher(...)
// 2. Register it in the watchers map
c.watchers.addWatcher(watcher, ...)
// 3. Send the initial events asynchronously
go watcher.processInterval(ctx, c.watchCache, initEvents)
return watcher, nil
}
// Event dispatch loop
func (c *Cacher) dispatchEvents() {
for {
select {
case event := <-c.incoming:
// Fan the event out to every matching watcher
c.watchers.notify(event)
}
}
}
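Stripped of Kubernetes details, dispatchEvents is a channel fan-out: one incoming stream, many subscriber channels. The self-contained toy below (the broadcaster type is invented for illustration; the real Cacher also buffers, filters by key and label, and evicts slow watchers) shows the shape of that loop.

package main

import (
	"fmt"
	"sync"
)

type event struct{ typ, object string }

// broadcaster fans events from one incoming channel out to all watchers.
type broadcaster struct {
	mu       sync.Mutex
	incoming chan event
	watchers map[int]chan event
	nextID   int
}

func newBroadcaster() *broadcaster {
	b := &broadcaster{incoming: make(chan event, 16), watchers: map[int]chan event{}}
	go b.dispatch()
	return b
}

// watch registers a new subscriber and returns its channel.
func (b *broadcaster) watch() <-chan event {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan event, 16)
	b.watchers[b.nextID] = ch
	b.nextID++
	return ch
}

// dispatch is the toy equivalent of Cacher.dispatchEvents.
func (b *broadcaster) dispatch() {
	for ev := range b.incoming {
		b.mu.Lock()
		for _, ch := range b.watchers {
			ch <- ev // the real Cacher protects itself against slow watchers; omitted here
		}
		b.mu.Unlock()
	}
}

func main() {
	b := newBroadcaster()
	w := b.watch()
	b.incoming <- event{"ADDED", "pod/nginx"}
	fmt.Println(<-w)
}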
Watch Event Types
type EventType string
const (
Added EventType = "ADDED" // resource created
Modified EventType = "MODIFIED" // resource updated
Deleted EventType = "DELETED" // resource deleted
Bookmark EventType = "BOOKMARK" // periodic resourceVersion checkpoint (sent only when requested)
Error EventType = "ERROR" // error event
)
// Watch event structure
type WatchEvent struct {
Type EventType
Object runtime.Object // the resource object
}
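From a client's perspective these events arrive on a long-lived HTTP response. The client-go sketch below opens the same watch as GET /api/v1/namespaces/default/pods?watch=true and prints each event; the kubeconfig path is an assumption, and production code would normally use an informer rather than a raw watch.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// Equivalent to GET /api/v1/namespaces/default/pods?watch=true
	w, err := clientset.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	defer w.Stop()
	for ev := range w.ResultChan() {
		// ev.Type is ADDED / MODIFIED / DELETED / ERROR (plus BOOKMARK when requested).
		fmt.Printf("%s %T\n", ev.Type, ev.Object)
	}
}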
Storage Layer Interaction
The etcd Client Wrapper
// k8s.io/apiserver/pkg/storage/etcd3/store.go
type store struct {
client *clientv3.Client
codec runtime.Codec
prefix string // etcd key prefix, e.g. /registry; a Pod ends up under /registry/pods/<namespace>/<name>
}
// Create a resource
func (s *store) Create(ctx context.Context, key string, obj runtime.Object, ttl uint64) error {
// 1. Serialize the object
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
// 2. Build the etcd key
etcdKey := path.Join(s.prefix, key)
// 3. Transaction: create only if the key does not exist yet
txn := s.client.Txn(ctx)
txn.If(clientv3.Compare(clientv3.Version(etcdKey), "=", 0))
txn.Then(clientv3.OpPut(etcdKey, string(data)))
// 4. Commit the transaction
resp, err := txn.Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
return nil
}
// Update a resource with optimistic concurrency (simplified)
func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
// 1. Read the current object and its revision
getCurrentObj := func() (runtime.Object, uint64, error) {
getResp, err := s.client.Get(ctx, key)
if err != nil {
return nil, 0, err
}
obj := s.newFunc()
if err := runtime.DecodeInto(s.codec, getResp.Kvs[0].Value, obj); err != nil {
return nil, 0, err
}
return obj, uint64(getResp.Kvs[0].ModRevision), nil
}
// 2. Retry loop
for {
currentObj, resourceVersion, err := getCurrentObj()
if err != nil {
return err
}
// 3. Run the caller-supplied update function
updatedObj, _, err := tryUpdate(currentObj, storage.ResponseMeta{})
if err != nil {
return err
}
// 4. Serialize
data, err := runtime.Encode(s.codec, updatedObj)
if err != nil {
return err
}
// 5. Compare-and-swap on ModRevision
txn := s.client.Txn(ctx)
txn.If(clientv3.Compare(clientv3.ModRevision(key), "=", int64(resourceVersion)))
txn.Then(clientv3.OpPut(key, string(data)))
resp, err := txn.Commit()
if err != nil {
return err
}
if resp.Succeeded {
// Update succeeded
return nil
}
// Revision conflict; loop and retry
}
}
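Clients see the same conflict as an HTTP 409 whenever their resourceVersion is stale. A common pattern is to re-read and retry, mirroring the server-side loop; the hedged client-go sketch below does this for a Deployment (the name "nginx" and the kubeconfig path are assumptions).

package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/retry"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	deployments := clientset.AppsV1().Deployments("default")

	// On 409 Conflict, fetch the latest object and try again.
	err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
		d, err := deployments.Get(context.TODO(), "nginx", metav1.GetOptions{})
		if err != nil {
			return err
		}
		if d.Labels == nil {
			d.Labels = map[string]string{}
		}
		d.Labels["updated-by"] = "demo"
		_, err = deployments.Update(context.TODO(), d, metav1.UpdateOptions{})
		return err
	})
	if err != nil {
		panic(err)
	}
}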
etcd Data Layout
/registry
├── pods
│ ├── default
│ │ ├── nginx-7c6c8f9b5-abcde
│ │ └── redis-6d4f7b8c9-xyzwv
│ └── kube-system
│ ├── coredns-5c8d5f9b5-12345
│ └── kube-proxy-6d8f7b9c8-67890
├── deployments
│ └── default
│ └── nginx
├── services
│ └── default
│ └── nginx-service
└── configmaps
└── default
└── app-config
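Pointing an etcd client at the cluster store makes this layout visible. The sketch below uses go.etcd.io/etcd/client/v3 to list keys under /registry/pods/; the endpoint is an assumption, a secured cluster additionally needs the etcd client certificates, and the stored values are protobuf-encoded rather than readable JSON.

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"https://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
		// TLS config omitted; a real cluster requires the etcd client certs.
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// List every key under /registry/pods/ (keys only).
	resp, err := cli.Get(context.TODO(), "/registry/pods/",
		clientv3.WithPrefix(), clientv3.WithKeysOnly())
	if err != nil {
		panic(err)
	}
	for _, kv := range resp.Kvs {
		fmt.Println(string(kv.Key))
	}
}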
API Aggregation
The APIService Resource
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
name: v1beta1.metrics.k8s.io
spec:
service:
name: metrics-server
namespace: kube-system
group: metrics.k8s.io
version: v1beta1
insecureSkipTLSVerify: true
groupPriorityMinimum: 100
versionPriority: 100
Aggregation Layer Architecture
kubectl get pods --all-namespaces
│
▼
┌────────────────────────┐
│ kube-apiserver │
│ ┌──────────────────┐ │
│ │ Aggregation Layer│ │
│ └────────┬─────────┘ │
└───────────┼────────────┘
│
┌───────┴───────┐
│ │
▼ ▼
┌─────────┐ ┌─────────────┐
│ Built-in│ │ Extension │
│ APIs │ │ API Server │
│ │ │ (metrics) │
└─────────┘ └─────────────┘
Request Routing
// k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
func (s *APIAggregator) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// 1. Resolve the APIService from the request path
apiService, err := s.getAPIService(req.URL.Path)
if err != nil {
s.localDelegate.ServeHTTP(w, req)
return
}
// 2. Check that the APIService is available
if !s.isAvailable(apiService) {
http.Error(w, "APIService unavailable", http.StatusServiceUnavailable)
return
}
// 3. Proxy the request to the backing service
proxy := s.proxyHandlers[apiService.Name]
proxy.ServeHTTP(w, req)
}
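From the client's point of view the aggregation layer is invisible: the request goes to kube-apiserver and is proxied to the extension server. The sketch below fetches node metrics through the aggregated metrics.k8s.io API (assumes metrics-server is installed; the kubeconfig path is an assumption).

package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// Served by metrics-server, but the client only ever talks to kube-apiserver.
	raw, err := clientset.Discovery().RESTClient().
		Get().
		AbsPath("/apis/metrics.k8s.io/v1beta1/nodes").
		DoRaw(context.TODO())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
}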
Performance Tuning
1. Watch Cache
# Configure per-resource watch cache sizes
--watch-cache-sizes=pods#1000,nodes#100,services#500
# Inspect watch cache behaviour via the API Server's /metrics endpoint
kubectl get --raw /metrics | grep watch_cache
2. Request Rate Limiting (API Priority and Fairness)
# API Priority and Fairness: route leader-election traffic to its own priority level
apiVersion: flowcontrol.apiserver.k8s.io/v1beta2
kind: FlowSchema
metadata:
name: system-leader-election
spec:
priorityLevelConfiguration:
name: leader-election
matchingPrecedence: 200
rules:
- subjects:
- kind: User
user:
name: system:kube-controller-manager
- kind: User
user:
name: system:kube-scheduler
resourceRules:
- verbs: ["get", "create", "update"]
apiGroups: [""]
resources: ["endpoints", "configmaps"]
namespaces: ["kube-system"]
3. In-Flight Request Limits
# API Server flags
--max-requests-inflight=400              # maximum concurrent non-mutating requests
--max-mutating-requests-inflight=200     # maximum concurrent mutating requests
--min-request-timeout=1800               # minimum watch request timeout in seconds (the watch handler picks a random timeout above this)
High-Availability Deployment
Architecture
Load Balancer (VIP)
192.168.1.100:6443
│
┌──────────┼──────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│API Srv1│ │API Srv2│ │API Srv3│
└───┬────┘ └───┬────┘ └───┬────┘
│ │ │
└──────────┼──────────┘
│
▼
etcd Cluster
(3 or 5 nodes)
Deployment Configuration
# kube-apiserver.yaml
apiVersion: v1
kind: Pod
metadata:
name: kube-apiserver
namespace: kube-system
spec:
containers:
- name: kube-apiserver
image: registry.k8s.io/kube-apiserver:v1.28.0
command:
- kube-apiserver
- --advertise-address=192.168.1.101
- --etcd-servers=https://192.168.1.201:2379,https://192.168.1.202:2379,https://192.168.1.203:2379
- --etcd-cafile=/etc/kubernetes/pki/etcd/ca.crt
- --etcd-certfile=/etc/kubernetes/pki/apiserver-etcd-client.crt
- --etcd-keyfile=/etc/kubernetes/pki/apiserver-etcd-client.key
- --apiserver-count=3 # number of API Server replicas (only honored by the master-count reconciler)
- --endpoint-reconciler-type=lease # endpoint reconciler type (lease is the default and recommended)
- --enable-aggregator-routing=true
- --audit-log-path=/var/log/kubernetes/audit.log
- --audit-log-maxage=30
- --audit-log-maxbackup=10
- --audit-log-maxsize=100
livenessProbe:
httpGet:
path: /livez
port: 6443
scheme: HTTPS
initialDelaySeconds: 15
timeoutSeconds: 15
readinessProbe:
httpGet:
path: /readyz
port: 6443
scheme: HTTPS
Monitoring Metrics
Key Metrics
# API Server request latency (P99)
histogram_quantile(0.99,
sum(rate(apiserver_request_duration_seconds_bucket[5m])) by (verb, le)
)
# API Server request rate
sum(rate(apiserver_request_total[5m])) by (verb, code)
# Ratio of events dispatched from the watch cache to watch events sent to clients
sum(rate(apiserver_watch_cache_events_dispatched_total[5m])) /
sum(rate(apiserver_watch_events_total[5m]))
# etcd request latency (P99, as seen by the API Server)
histogram_quantile(0.99,
sum(rate(etcd_request_duration_seconds_bucket[5m])) by (operation, le)
)
Grafana Dashboard
{
"dashboard": {
"title": "API Server Metrics",
"panels": [
{
"title": "Request Rate",
"targets": [
{
"expr": "sum(rate(apiserver_request_total[5m])) by (verb)"
}
]
},
{
"title": "Request Latency P99",
"targets": [
{
"expr": "histogram_quantile(0.99, sum(rate(apiserver_request_duration_seconds_bucket[5m])) by (verb, le))"
}
]
}
]
}
}
Troubleshooting
Common Issues
1. API Server Unavailable
# Check the process
ps aux | grep kube-apiserver
# Check the logs (for a static-pod deployment, use kubectl -n kube-system logs kube-apiserver-<node> or crictl logs instead)
journalctl -u kube-apiserver -f
# Check the listening port
netstat -tlnp | grep 6443
# Check the serving certificate (subject and expiry)
openssl s_client -connect localhost:6443 </dev/null 2>/dev/null | openssl x509 -noout -subject -dates
2. etcd Connectivity Issues
# Test etcd connectivity
ETCDCTL_API=3 etcdctl \
--endpoints=https://127.0.0.1:2379 \
--cacert=/etc/kubernetes/pki/etcd/ca.crt \
--cert=/etc/kubernetes/pki/etcd/server.crt \
--key=/etc/kubernetes/pki/etcd/server.key \
endpoint health
# Check API Server → etcd request latency
kubectl get --raw /metrics | grep etcd_request_duration
3. Watch Performance Issues
# Check watch cache statistics
kubectl get --raw /metrics | grep watch_cache
# Count currently registered watchers
kubectl get --raw /metrics | grep apiserver_registered_watchers
Best Practices
1. Production Configuration
# Recommended API Server flags
--max-requests-inflight=800
--max-mutating-requests-inflight=400
--target-ram-mb=8192
--event-ttl=24h
--service-node-port-range=30000-32767
--enable-admission-plugins=NodeRestriction  # PodSecurityPolicy was removed in v1.25; use the built-in Pod Security admission instead
--audit-policy-file=/etc/kubernetes/audit-policy.yaml
--encryption-provider-config=/etc/kubernetes/encryption-config.yaml
2. Audit Configuration
# audit-policy.yaml
apiVersion: audit.k8s.io/v1
kind: Policy
rules:
- level: Metadata
resources:
- group: ""
resources: ["secrets", "configmaps"]
- level: Request
users: ["system:admin"]
verbs: ["delete", "create", "update"]
- level: RequestResponse
resources:
- group: ""
resources: ["pods"]
verbs: ["create", "update", "patch", "delete"]
3. Encryption at Rest
# encryption-config.yaml
apiVersion: apiserver.config.k8s.io/v1
kind: EncryptionConfiguration
resources:
- resources:
- secrets
providers:
- aescbc:
keys:
- name: key1
secret: <base64-encoded-32-byte-key>
- identity: {}
Summary
The API Server is the hub of the Kubernetes control plane. Understanding how it works pays off in:
- Architecture design: modeling APIs and resources sensibly
- Performance tuning: finding and removing bottlenecks
- Troubleshooting: locating problems quickly
- Extension development: building custom controllers and webhooks
Key takeaways:
- Request handling chain: authentication → authorization → admission control
- Watch mechanism: efficient notification of resource changes
- etcd interaction: optimistic concurrency keeps state consistent
- High availability: multiple replicas behind a load balancer