A Deep Dive into the Kubernetes Scheduler

Scheduler Overview

kube-scheduler is responsible for assigning Pods to suitable Nodes; it acts as the "resource-allocation brain" of a Kubernetes cluster.

Scheduling Flow

New Pod (Pending)
    ↓
┌────────────────────────────────────────┐
│          Scheduler main loop           │
├────────────────────────────────────────┤
│  1. Pop the next Pod from the queue    │
│  2. Filtering phase (Predicate/Filter) │
│     - enough allocatable resources     │
│     - no host-port conflicts           │
│     - node selector / labels match     │
│     - taints are tolerated             │
│  3. Scoring phase (Priority/Score)     │
│     - resource balance score           │
│     - affinity score                   │
│     - image locality score             │
│  4. Pick the highest-scoring node      │
│  5. Bind the Pod to the Node           │
└────────────────────────────────────────┘
    ↓
Pod scheduled successfully (Running)
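
The Filter and Score steps in the diagram are implemented as plugins that can be enabled, disabled, or re-weighted per extension point. A minimal KubeSchedulerConfiguration sketch (the plugin choices and weight below are illustrative, not required values; re-enabling a default plugin with a weight is the documented way to change its influence):

apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: default-scheduler
  plugins:
    filter:
      disabled:
      - name: NodePorts            # example: skip the host-port conflict check
    score:
      enabled:
      - name: NodeResourcesFit
        weight: 2                  # example: double the resource-fit score's weight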

Scheduling Framework Source Walkthrough

The Scheduler Main Loop

// k8s.io/kubernetes/pkg/scheduler/scheduler.go (simplified)

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // 1. Pop the next Pod from the scheduling queue
    podInfo := sched.NextPod()
    pod := podInfo.Pod

    // 2. Run the scheduling cycle (filtering + scoring)
    scheduleResult, err := sched.Algorithm.Schedule(ctx, pod)
    if err != nil {
        // Scheduling failed: record the failure event and requeue the Pod
        sched.recordSchedulingFailure(pod, err)
        return
    }

    // 3. Scheduling succeeded: bind the Pod to the chosen node
    err = sched.bind(ctx, pod, scheduleResult.SuggestedHost)
    if err != nil {
        return
    }
}

Filtering Phase Implementation

// k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go (simplified)

func (f *frameworkImpl) RunFilterPlugins(
    ctx context.Context,
    state *CycleState,
    pod *v1.Pod,
    nodeInfo *NodeInfo,
) *Status {
    // Run every registered Filter plugin against this node
    for _, pl := range f.filterPlugins {
        status := pl.Filter(ctx, state, pod, nodeInfo)
        if !status.IsSuccess() {
            // The node does not satisfy this plugin; stop early
            return status
        }
    }
    return nil
}

// Example: a simplified resource-fit Filter plugin
type NodeResourcesFit struct{}

func (pl *NodeResourcesFit) Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status {
    // Sum up the resources requested by the Pod
    podRequest := computePodResourceRequest(pod)

    // What the node can still offer = allocatable - already requested by other Pods
    allocatable := nodeInfo.Allocatable
    requested := nodeInfo.Requested

    // Check CPU
    if allocatable.MilliCPU-requested.MilliCPU < podRequest.MilliCPU {
        return NewStatus(Unschedulable, "Insufficient cpu")
    }

    // Check memory
    if allocatable.Memory-requested.Memory < podRequest.Memory {
        return NewStatus(Unschedulable, "Insufficient memory")
    }

    return nil
}
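
The helper computePodResourceRequest is not shown above; a minimal sketch of what it typically computes, assuming the standard Kubernetes aggregation rule (sum over regular containers, then take the per-resource max against each init container, since init containers run one at a time):

// Minimal sketch: aggregate a Pod's CPU/memory requests into the same Resource
// shape used by the Filter example above (simplified; the real helper also
// accounts for pod overhead and extended resources).
func computePodResourceRequest(pod *v1.Pod) *Resource {
    result := &Resource{}
    for _, c := range pod.Spec.Containers {
        result.MilliCPU += c.Resources.Requests.Cpu().MilliValue()
        result.Memory += c.Resources.Requests.Memory().Value()
    }
    // Init containers run sequentially, so only the largest single request matters.
    for _, c := range pod.Spec.InitContainers {
        if cpu := c.Resources.Requests.Cpu().MilliValue(); cpu > result.MilliCPU {
            result.MilliCPU = cpu
        }
        if mem := c.Resources.Requests.Memory().Value(); mem > result.Memory {
            result.Memory = mem
        }
    }
    return result
}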

Scoring Phase Implementation

// k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go (simplified)

func (f *frameworkImpl) RunScorePlugins(
    ctx context.Context,
    state *CycleState,
    pod *v1.Pod,
    nodes []*v1.Node,
) (PluginToNodeScores, *Status) {
    scores := make(PluginToNodeScores, len(f.scorePlugins))

    // Run every registered Score plugin
    for _, pl := range f.scorePlugins {
        nodeScoreList := make(NodeScoreList, len(nodes))

        // Score every candidate node
        for i, node := range nodes {
            score, status := pl.Score(ctx, state, pod, node.Name)
            if !status.IsSuccess() {
                return nil, status
            }
            nodeScoreList[i] = NodeScore{
                Name:  node.Name,
                Score: score,
            }
        }

        // Normalize the scores (optional)
        if pl.ScoreExtensions() != nil {
            pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
        }

        scores[pl.Name()] = nodeScoreList
    }

    return scores, nil
}

// Example: a simplified resource-balance Score plugin
type NodeResourcesBalancedAllocation struct{}

func (pl *NodeResourcesBalancedAllocation) Score(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status) {
    nodeInfo := getNodeInfo(nodeName)

    // Compute the utilization fraction of each resource
    cpuFraction := float64(nodeInfo.Requested.MilliCPU) / float64(nodeInfo.Allocatable.MilliCPU)
    memoryFraction := float64(nodeInfo.Requested.Memory) / float64(nodeInfo.Allocatable.Memory)

    // The smaller the spread between the fractions, the more balanced the node, the higher the score
    variance := math.Abs(cpuFraction - memoryFraction)
    score := int64((1.0 - variance) * 100)

    return score, nil
}
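
RunScorePlugins above optionally calls NormalizeScore through ScoreExtensions. A sketch of what such an extension could look like for the example plugin, rescaling raw scores into the framework's 0-100 range (illustrative only; the in-tree plugin computes and normalizes its scores differently):

// ScoreExtensions returns the plugin itself so NormalizeScore is invoked.
func (pl *NodeResourcesBalancedAllocation) ScoreExtensions() ScoreExtensions {
    return pl
}

// NormalizeScore rescales the raw scores so that the best node gets 100.
func (pl *NodeResourcesBalancedAllocation) NormalizeScore(ctx context.Context, state *CycleState, pod *v1.Pod, scores NodeScoreList) *Status {
    var highest int64
    for _, s := range scores {
        if s.Score > highest {
            highest = s.Score
        }
    }
    if highest == 0 {
        return nil // all scores are zero, nothing to rescale
    }
    for i := range scores {
        scores[i].Score = scores[i].Score * 100 / highest
    }
    return nil
}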

Scheduling Policy Configuration

Node Affinity

apiVersion: v1
kind: Pod
metadata:
  name: with-node-affinity
spec:
  affinity:
    nodeAffinity:
      # Hard requirement: must be satisfied at scheduling time
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: kubernetes.io/hostname
            operator: In
            values:
            - node1
            - node2
      # Soft preference: preferred but not mandatory
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 1
        preference:
          matchExpressions:
          - key: disk-type
            operator: In
            values:
            - ssd
  containers:
  - name: nginx
    image: nginx
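
The preferred term above matches on a disk-type node label, so the target nodes have to carry it; labels can be set and inspected with kubectl (node name and label value follow the example above):

# Label a node so the soft preference can match it
kubectl label nodes node1 disk-type=ssd

# Verify node labels
kubectl get nodes --show-labels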

Pod Affinity and Anti-Affinity

apiVersion: v1
kind: Pod
metadata:
  name: with-pod-affinity
spec:
  affinity:
    # Pod affinity: schedule onto nodes that already run matching Pods (required here)
    podAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
          - key: app
            operator: In
            values:
            - cache
        topologyKey: kubernetes.io/hostname
    # Pod anti-affinity: avoid nodes that already run matching Pods (preferred here)
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          labelSelector:
            matchExpressions:
            - key: app
              operator: In
              values:
              - web
          topologyKey: kubernetes.io/hostname
  containers:
  - name: app
    image: myapp

Taints and Tolerations

# Add taints to a node
kubectl taint nodes node1 key=value:NoSchedule
kubectl taint nodes node1 key=value:NoExecute
kubectl taint nodes node1 key=value:PreferNoSchedule

# A Pod that tolerates the taints
apiVersion: v1
kind: Pod
metadata:
  name: tolerations-pod
spec:
  tolerations:
  - key: "key"
    operator: "Equal"
    value: "value"
    effect: "NoSchedule"
  - key: "node.kubernetes.io/not-ready"
    operator: "Exists"
    effect: "NoExecute"
    tolerationSeconds: 300
  containers:
  - name: nginx
    image: nginx
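
For completeness, a taint is removed by repeating it with a trailing "-", and a node's current taints can be checked with describe:

# Remove a previously added taint
kubectl taint nodes node1 key=value:NoSchedule-

# Inspect the taints currently set on a node
kubectl describe node node1 | grep -A 3 Taints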

Custom Schedulers

Approach 1: Scheduler Extender

// A custom scheduler extender: a standalone HTTP service that kube-scheduler
// calls during filtering and scoring.
package main

import (
    "encoding/json"
    "net/http"

    v1 "k8s.io/api/core/v1"
    schedulerapi "k8s.io/kube-scheduler/extender/v1"
)

// Filter endpoint
func filterHandler(w http.ResponseWriter, r *http.Request) {
    var args schedulerapi.ExtenderArgs
    json.NewDecoder(r.Body).Decode(&args)

    filteredNodes := []v1.Node{}
    failedNodes := schedulerapi.FailedNodesMap{}

    // Custom filtering logic (customFilter is user-supplied)
    for _, node := range args.Nodes.Items {
        if customFilter(args.Pod, node) {
            filteredNodes = append(filteredNodes, node)
        } else {
            failedNodes[node.Name] = "Custom filter failed"
        }
    }

    result := &schedulerapi.ExtenderFilterResult{
        Nodes:       &v1.NodeList{Items: filteredNodes},
        FailedNodes: failedNodes,
    }

    json.NewEncoder(w).Encode(result)
}

// Prioritize endpoint
func prioritizeHandler(w http.ResponseWriter, r *http.Request) {
    var args schedulerapi.ExtenderArgs
    json.NewDecoder(r.Body).Decode(&args)

    hostPriorities := []schedulerapi.HostPriority{}

    // Custom scoring logic (customScore is user-supplied)
    for _, node := range args.Nodes.Items {
        score := customScore(args.Pod, node)
        hostPriorities = append(hostPriorities, schedulerapi.HostPriority{
            Host:  node.Name,
            Score: score,
        })
    }

    json.NewEncoder(w).Encode(hostPriorities)
}

func main() {
    http.HandleFunc("/filter", filterHandler)
    http.HandleFunc("/prioritize", prioritizeHandler)
    http.ListenAndServe(":8888", nil)
}

Configure the scheduler to call the extender:

# scheduler-config.yaml
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
extenders:
- urlPrefix: "http://localhost:8888"
  filterVerb: "filter"
  prioritizeVerb: "prioritize"
  weight: 1
  enableHTTPS: false
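
Because the extender speaks plain HTTP + JSON, it can be smoke-tested without the scheduler; a sketch (extender-args.json is a hypothetical file containing an ExtenderArgs payload with a pod and a node list):

curl -s -X POST http://localhost:8888/filter \
  -H "Content-Type: application/json" \
  -d @extender-args.json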

Approach 2: Scheduling Framework Plugin

// k8s.io/kubernetes/pkg/scheduler/framework/plugins/example/example.go

package example

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// Name of the plugin, referenced in the scheduler configuration
const Name = "CustomScheduler"

// CustomScheduler implements the FilterPlugin and ScorePlugin interfaces
type CustomScheduler struct {
    handle framework.Handle
}

// Filter implements the FilterPlugin interface
func (cs *CustomScheduler) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    // Custom filtering logic
    if !cs.customCheck(pod, nodeInfo) {
        return framework.NewStatus(framework.Unschedulable, "Custom check failed")
    }
    return nil
}

// Score implements the ScorePlugin interface
func (cs *CustomScheduler) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
    nodeInfo, err := cs.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, err.Error())
    }

    // Custom scoring logic
    score := cs.customScore(p, nodeInfo)
    return score, nil
}

// ScoreExtensions returns nil because no score normalization is needed
func (cs *CustomScheduler) ScoreExtensions() framework.ScoreExtensions {
    return nil
}

// Name returns the plugin name
func (cs *CustomScheduler) Name() string {
    return Name
}

// New is the plugin factory function
func New(obj runtime.Object, h framework.Handle) (framework.Plugin, error) {
    return &CustomScheduler{handle: h}, nil
}
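
To actually run such a plugin it has to be compiled into a scheduler binary and enabled in the configuration. A sketch of the usual out-of-tree wiring (the module path example.com/... is hypothetical, and the exact factory signature expected by app.WithPlugin varies slightly between Kubernetes versions):

// main.go of a custom scheduler binary
package main

import (
    "os"

    "k8s.io/kubernetes/cmd/kube-scheduler/app"

    "example.com/my-scheduler/pkg/example" // hypothetical module path for the plugin above
)

func main() {
    cmd := app.NewSchedulerCommand(
        app.WithPlugin(example.Name, example.New),
    )
    if err := cmd.Execute(); err != nil {
        os.Exit(1)
    }
}

Then enable it for a profile in KubeSchedulerConfiguration; Pods opt in via spec.schedulerName:

apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: custom-scheduler
  plugins:
    filter:
      enabled:
      - name: CustomScheduler
    score:
      enabled:
      - name: CustomScheduler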

Scheduling Performance Tuning

Concurrency and Throughput

# Raise the scheduler's API client rate limits (kube-scheduler flags;
# also settable via clientConnection in the configuration file)
--kube-api-qps=100
--kube-api-burst=200
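
The knobs that most directly affect scheduling throughput live in KubeSchedulerConfiguration rather than in command-line flags; a sketch with illustrative values:

apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
# Number of goroutines used to evaluate nodes during Filter/Score (default 16)
parallelism: 32
# Stop searching once this percentage of nodes has been found feasible;
# lowering it trades placement quality for throughput on large clusters
percentageOfNodesToScore: 50
clientConnection:
  qps: 100
  burst: 200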

Scheduler Cache Optimization

// k8s.io/kubernetes/pkg/scheduler/internal/cache/cache.go (simplified)

type schedulerCache struct {
    // Cached node information
    nodes map[string]*nodeInfoListItem

    // Cached Pod information
    pods map[string]*v1.Pod

    // Assumed Pods: scheduled in memory but not yet bound on the API server
    assumedPods map[string]bool

    mu sync.RWMutex
}

// Assume a Pod (optimistic scheduling): reserve its resources in the cache
// before the asynchronous bind completes, so the next scheduling cycle
// already sees them as used.
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    // Update the cache as if the Pod were already running on the node
    cache.assumedPods[string(pod.UID)] = true
    cache.addPodToCache(pod, true)

    return nil
}
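
If the asynchronous bind fails, the optimistic assumption has to be rolled back so the reserved resources become schedulable again. A minimal sketch mirroring the simplified cache above (removePodFromCache is a hypothetical helper, symmetric to addPodToCache):

// Roll back an assumed Pod after a failed bind.
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    key := string(pod.UID)
    if !cache.assumedPods[key] {
        return fmt.Errorf("pod %s/%s was not assumed", pod.Namespace, pod.Name)
    }
    delete(cache.assumedPods, key)
    cache.removePodFromCache(pod) // hypothetical helper, symmetric to addPodToCache
    return nil
}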

Scheduler Monitoring

Key Metrics

# P99 scheduling attempt latency
histogram_quantile(0.99,
  sum(rate(scheduler_scheduling_attempt_duration_seconds_bucket[5m])) by (le)
)

# Scheduling failure rate
sum(rate(scheduler_schedule_attempts_total{result="error"}[5m])) /
sum(rate(scheduler_schedule_attempts_total[5m]))

# Number of pending Pods in the scheduling queue
scheduler_pending_pods

# P99 time spent in the Filter extension point
histogram_quantile(0.99,
  sum(rate(scheduler_framework_extension_point_duration_seconds_bucket{extension_point="Filter"}[5m])) by (le)
)

# P99 time spent in the Score extension point
histogram_quantile(0.99,
  sum(rate(scheduler_framework_extension_point_duration_seconds_bucket{extension_point="Score"}[5m])) by (le)
)
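
These queries assume the scheduler's metrics are being scraped; kube-scheduler serves them on its secure port (10259 by default) at /metrics. A minimal Prometheus scrape-job sketch (TLS and authentication details depend on the cluster setup):

- job_name: kube-scheduler
  scheme: https
  metrics_path: /metrics
  tls_config:
    insecure_skip_verify: true   # for a quick test only; configure a proper CA in production
  authorization:
    credentials_file: /var/run/secrets/kubernetes.io/serviceaccount/token
  static_configs:
  - targets: ["<control-plane-ip>:10259"]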

Troubleshooting Scheduling Issues

Common Problems

Pods Stuck in Pending

# Inspect the Pod's events
kubectl describe pod <pod-name>

# Common causes:
# 1. Insufficient resources on every node
# 2. Node taints the Pod does not tolerate
# 3. Node selector matches no node
# 4. Affinity/anti-affinity rules cannot be satisfied
# 5. Required PV is unavailable

# Check the scheduler logs
kubectl logs -n kube-system kube-scheduler-<node>
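
Scheduling failures are also recorded as events, so they can be listed cluster-wide without describing each Pod:

# List recent FailedScheduling events
kubectl get events --all-namespaces --field-selector reason=FailedScheduling --sort-by=.lastTimestamp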

Scheduling Skew (Uneven Pod Distribution)

# Show how Pods are spread across nodes (with -o wide and --all-namespaces, column 8 is NODE)
kubectl get pods --all-namespaces -o wide | awk '{print $8}' | sort | uniq -c | sort -n

# Typical causes:
# 1. Misconfigured node affinity
# 2. Uneven resource capacity or requests across nodes
# 3. Score plugin weights that favor certain nodes

Best Practices

1. Set Reasonable Resource Requests

apiVersion: v1
kind: Pod
metadata:
  name: resource-demo
spec:
  containers:
  - name: app
    image: myapp
    resources:
      requests:
        memory: "64Mi"
        cpu: "250m"
      limits:
        memory: "128Mi"
        cpu: "500m"

2. Use PriorityClass

# A high-priority PriorityClass
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority
value: 1000000
globalDefault: false
description: "High priority class"

---
# A Pod that uses the PriorityClass
apiVersion: v1
kind: Pod
metadata:
  name: high-priority-pod
spec:
  priorityClassName: high-priority
  containers:
  - name: nginx
    image: nginx

3. Topology Spread Constraints

apiVersion: v1
kind: Pod
metadata:
  name: topology-spread-pod
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: kubernetes.io/hostname
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        app: myapp
  containers:
  - name: app
    image: myapp

Summary

The Scheduler is the "brain" of Kubernetes. Understanding how it works pays off in:

  • Resource optimization: allocating cluster resources sensibly
  • Performance tuning: increasing scheduling throughput
  • Custom extensions: meeting special scheduling requirements
  • Troubleshooting: quickly pinpointing scheduling problems

Key takeaways:

  1. Two-phase scheduling: filtering (Filter) + scoring (Score)
  2. Scheduling policies: affinity, taints and tolerations, topology spread constraints
  3. Extension mechanisms: Extender and framework plugins
  4. Performance optimization: concurrent node evaluation and the optimistic scheduler cache