Deep Dive into the Kubernetes Scheduler
Scheduler Overview
kube-scheduler assigns Pods to suitable Nodes; it is the "resource-allocation brain" of a Kubernetes cluster.
Scheduling Flow
New Pod (Pending)
        ↓
┌──────────────────────────────────────┐
│         Scheduler main loop          │
├──────────────────────────────────────┤
│ 1. Pop the next Pod from the queue   │
│ 2. Filtering phase (Predicates)      │
│    - enough allocatable resources    │
│    - no host port conflicts          │
│    - node selector / labels match    │
│    - taints are tolerated            │
│ 3. Scoring phase (Priorities)        │
│    - resource balance score          │
│    - affinity score                  │
│    - image locality score            │
│ 4. Pick the highest-scoring node     │
│ 5. Bind the Pod to that Node         │
└──────────────────────────────────────┘
        ↓
Pod scheduled (Running)
Scheduling Framework: Source Walkthrough
The Scheduler Main Loop
// k8s.io/kubernetes/pkg/scheduler/scheduler.go (simplified)
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // 1. Pop the next Pod from the scheduling queue
    podInfo := sched.NextPod()
    pod := podInfo.Pod
    // 2. Run the scheduling cycle (Filter + Score) for this Pod
    scheduleResult, err := sched.Algorithm.Schedule(ctx, pod)
    if err != nil {
        // Scheduling failed: record the failure event and requeue the Pod
        sched.recordSchedulingFailure(pod, err)
        return
    }
    // 3. Scheduling succeeded: bind the Pod to the selected node
    if err := sched.bind(ctx, pod, scheduleResult.SuggestedHost); err != nil {
        return
    }
}
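The simplified loop above binds synchronously, but the real scheduleOne decouples the scheduling cycle from the binding cycle: the Pod is first "assumed" into the scheduler cache (its resources are counted as used on the chosen node), and binding runs in a goroutine so the main loop can immediately move on to the next Pod. The sketch below illustrates this pattern; the method names follow the upstream code but the signatures are simplified.
// Sketch only: optimistic assume + asynchronous binding cycle.
assumedPod := pod.DeepCopy()
if err := sched.assume(assumedPod, scheduleResult.SuggestedHost); err != nil {
    sched.recordSchedulingFailure(assumedPod, err)
    return
}
go func() {
    // Binding cycle: runs the PreBind/Bind/PostBind plugins and calls the
    // API server. On failure the assumption is rolled back so the node's
    // resources become available to other Pods again.
    if err := sched.bind(ctx, assumedPod, scheduleResult.SuggestedHost); err != nil {
        sched.Cache.ForgetPod(assumedPod)
        sched.recordSchedulingFailure(assumedPod, err)
    }
}()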
Filtering Phase Implementation
// k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go (simplified)
func (f *frameworkImpl) RunFilterPlugins(
    ctx context.Context,
    state *CycleState,
    pod *v1.Pod,
    nodeInfo *NodeInfo,
) *Status {
    // Run every registered Filter plugin against this node
    for _, pl := range f.filterPlugins {
        status := pl.Filter(ctx, state, pod, nodeInfo)
        if !status.IsSuccess() {
            // The node does not satisfy this plugin; reject it
            return status
        }
    }
    return nil
}
// Example: a resource-fit Filter plugin (simplified from NodeResourcesFit)
type NodeResourcesFit struct{}

func (pl *NodeResourcesFit) Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status {
    // Sum the resources requested by the Pod (requests, not limits)
    podRequest := computePodResourceRequest(pod)
    // Allocatable resources on the candidate node
    allocatable := nodeInfo.Allocatable
    // The node must have room for this Pod on top of what is already requested
    if allocatable.MilliCPU-nodeInfo.Requested.MilliCPU < podRequest.MilliCPU {
        return NewStatus(Unschedulable, "Insufficient CPU")
    }
    if allocatable.Memory-nodeInfo.Requested.Memory < podRequest.Memory {
        return NewStatus(Unschedulable, "Insufficient memory")
    }
    return nil
}
Scoring Phase Implementation
// k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go (simplified)
func (f *frameworkImpl) RunScorePlugins(
    ctx context.Context,
    state *CycleState,
    pod *v1.Pod,
    nodes []*v1.Node,
) (PluginToNodeScores, *Status) {
    scores := make(PluginToNodeScores, len(f.scorePlugins))
    // Run every registered Score plugin
    for _, pl := range f.scorePlugins {
        nodeScoreList := make(NodeScoreList, len(nodes))
        // Score each candidate node
        for i, node := range nodes {
            score, status := pl.Score(ctx, state, pod, node.Name)
            if !status.IsSuccess() {
                return nil, status
            }
            nodeScoreList[i] = NodeScore{
                Name:  node.Name,
                Score: score,
            }
        }
        // Optionally normalize the raw scores into the [0, 100] range
        if pl.ScoreExtensions() != nil {
            pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
        }
        scores[pl.Name()] = nodeScoreList
    }
    return scores, nil
}
// Example: a resource-balance Score plugin (simplified from NodeResourcesBalancedAllocation)
type NodeResourcesBalancedAllocation struct{}

func (pl *NodeResourcesBalancedAllocation) Score(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status) {
    nodeInfo := getNodeInfo(nodeName)
    // Fraction of CPU and memory already requested on the node
    cpuFraction := float64(nodeInfo.Requested.MilliCPU) / float64(nodeInfo.Allocatable.MilliCPU)
    memoryFraction := float64(nodeInfo.Requested.Memory) / float64(nodeInfo.Allocatable.Memory)
    // The smaller the gap between the two fractions, the more balanced the node
    // and the higher the score, e.g. CPU 80% / memory 20% -> |0.8-0.2| = 0.6 -> score 40
    variance := math.Abs(cpuFraction - memoryFraction)
    score := int64((1.0 - variance) * 100)
    return score, nil
}
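The optional NormalizeScore step mentioned in RunScorePlugins rescales a plugin's raw scores into the framework's 0–100 range before plugin weights are applied. A minimal sketch, using the same shorthand types as the snippets above and assuming raw scores are non-negative; it simply scales so the best node ends up at 100:
// Sketch of a ScoreExtensions implementation for the plugin above.
func (pl *NodeResourcesBalancedAllocation) ScoreExtensions() ScoreExtensions {
    return pl
}

func (pl *NodeResourcesBalancedAllocation) NormalizeScore(ctx context.Context, state *CycleState, pod *v1.Pod, scores NodeScoreList) *Status {
    // Find the highest raw score across all candidate nodes
    var highest int64
    for _, s := range scores {
        if s.Score > highest {
            highest = s.Score
        }
    }
    if highest == 0 {
        // All nodes scored zero; nothing to rescale
        return nil
    }
    // Rescale every node's score so the best node gets 100
    for i := range scores {
        scores[i].Score = scores[i].Score * 100 / highest
    }
    return nil
}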
Scheduling Policy Configuration
Node Affinity
apiVersion: v1
kind: Pod
metadata:
  name: with-node-affinity
spec:
  affinity:
    nodeAffinity:
      # Hard requirement: the Pod may only be scheduled onto matching nodes
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: kubernetes.io/hostname
            operator: In
            values:
            - node1
            - node2
      # Soft preference: matching nodes score higher but are not mandatory
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 1
        preference:
          matchExpressions:
          - key: disk-type
            operator: In
            values:
            - ssd
  containers:
  - name: nginx
    image: nginx
Pod Affinity and Anti-Affinity
apiVersion: v1
kind: Pod
metadata:
  name: with-pod-affinity
spec:
  affinity:
    # Pod affinity: prefer nodes that already run matching Pods
    podAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
          - key: app
            operator: In
            values:
            - cache
        topologyKey: kubernetes.io/hostname
    # Pod anti-affinity: avoid nodes that already run matching Pods
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          labelSelector:
            matchExpressions:
            - key: app
              operator: In
              values:
              - web
          topologyKey: kubernetes.io/hostname
  containers:
  - name: app
    image: myapp
Taints and Tolerations
# Add taints to a node
kubectl taint nodes node1 key=value:NoSchedule
kubectl taint nodes node1 key=value:NoExecute
kubectl taint nodes node1 key=value:PreferNoSchedule
# A Pod that tolerates those taints
apiVersion: v1
kind: Pod
metadata:
  name: tolerations-pod
spec:
  tolerations:
  - key: "key"
    operator: "Equal"
    value: "value"
    effect: "NoSchedule"
  - key: "node.kubernetes.io/not-ready"
    operator: "Exists"
    effect: "NoExecute"
    tolerationSeconds: 300
  containers:
  - name: nginx
    image: nginx
Custom Schedulers
Approach 1: Scheduler Extender
// A custom scheduler extender: an HTTP service the scheduler calls out to
package main

import (
    "encoding/json"
    "log"
    "net/http"

    v1 "k8s.io/api/core/v1"
    schedulerapi "k8s.io/kube-scheduler/extender/v1"
)

// Filter endpoint: drop nodes that fail the custom predicate
func filterHandler(w http.ResponseWriter, r *http.Request) {
    var args schedulerapi.ExtenderArgs
    if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    filteredNodes := []v1.Node{}
    failedNodes := schedulerapi.FailedNodesMap{}
    // Custom filtering logic (customFilter is a user-supplied helper, not shown)
    for _, node := range args.Nodes.Items {
        if customFilter(args.Pod, node) {
            filteredNodes = append(filteredNodes, node)
        } else {
            failedNodes[node.Name] = "Custom filter failed"
        }
    }
    result := &schedulerapi.ExtenderFilterResult{
        Nodes:       &v1.NodeList{Items: filteredNodes},
        FailedNodes: failedNodes,
    }
    json.NewEncoder(w).Encode(result)
}

// Prioritize endpoint: return a score per node
func prioritizeHandler(w http.ResponseWriter, r *http.Request) {
    var args schedulerapi.ExtenderArgs
    if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    hostPriorities := schedulerapi.HostPriorityList{}
    // Custom scoring logic (customScore is a user-supplied helper, not shown)
    for _, node := range args.Nodes.Items {
        score := customScore(args.Pod, node)
        hostPriorities = append(hostPriorities, schedulerapi.HostPriority{
            Host:  node.Name,
            Score: score,
        })
    }
    json.NewEncoder(w).Encode(hostPriorities)
}

func main() {
    http.HandleFunc("/filter", filterHandler)
    http.HandleFunc("/prioritize", prioritizeHandler)
    log.Fatal(http.ListenAndServe(":8888", nil))
}
Configure the scheduler to use the extender:
# scheduler-config.yaml
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
extenders:
- urlPrefix: "http://localhost:8888"
  filterVerb: "filter"
  prioritizeVerb: "prioritize"
  weight: 1
  enableHTTPS: false
Approach 2: Scheduling Framework Plugin
// example/example.go — a custom (out-of-tree) scheduling framework plugin
package example

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// Name is the plugin name referenced in the scheduler configuration
const Name = "CustomScheduler"

// CustomScheduler implements the FilterPlugin and ScorePlugin interfaces
type CustomScheduler struct {
    handle framework.Handle
}

// Filter implements the FilterPlugin interface
func (cs *CustomScheduler) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    // Custom filtering logic (customCheck is a user-supplied helper, not shown)
    if !cs.customCheck(pod, nodeInfo) {
        return framework.NewStatus(framework.Unschedulable, "Custom check failed")
    }
    return nil
}

// Score implements the ScorePlugin interface
func (cs *CustomScheduler) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
    nodeInfo, err := cs.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, err.Error())
    }
    // Custom scoring logic (customScore is a user-supplied helper, not shown)
    score := cs.customScore(p, nodeInfo)
    return score, nil
}

// ScoreExtensions returns nil because no score normalization is needed
func (cs *CustomScheduler) ScoreExtensions() framework.ScoreExtensions {
    return nil
}

// Name returns the plugin name
func (cs *CustomScheduler) Name() string {
    return Name
}

// New is the plugin factory registered with the scheduler
func New(obj runtime.Object, h framework.Handle) (framework.Plugin, error) {
    return &CustomScheduler{handle: h}, nil
}
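To use the plugin, it has to be compiled into a scheduler binary and enabled in a KubeSchedulerConfiguration profile. A minimal sketch of the registration step, assuming the plugin package above lives at the placeholder import path "example.com/scheduler/example"; note that in some Kubernetes versions the plugin factory signature also takes a context argument.
// main.go — build a kube-scheduler binary that includes the custom plugin
package main

import (
    "os"

    "k8s.io/kubernetes/cmd/kube-scheduler/app"

    "example.com/scheduler/example" // placeholder module path for the plugin package
)

func main() {
    // WithPlugin registers the factory under the plugin's name so a
    // KubeSchedulerConfiguration profile can enable it in the Filter
    // and Score extension points.
    command := app.NewSchedulerCommand(
        app.WithPlugin(example.Name, example.New),
    )
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}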
Scheduling Performance Tuning
Parallelism and Throughput
Scheduler throughput is tuned through the KubeSchedulerConfiguration file and the scheduler's API-client settings rather than dedicated per-feature flags:
# scheduler-config.yaml (throughput-related fields)
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
# Number of worker goroutines used while filtering and scoring nodes (default 16)
parallelism: 16
# On large clusters, score only this percentage of feasible nodes
percentageOfNodesToScore: 50
clientConnection:
  # Rate limits for the scheduler's requests to the API server
  qps: 100
  burst: 200
Scheduler Cache Optimization
// k8s.io/kubernetes/pkg/scheduler/internal/cache/cache.go (simplified)
type schedulerCache struct {
    // Cached node information
    nodes map[string]*nodeInfoListItem
    // Cached Pods
    pods map[string]*v1.Pod
    // Assumed Pods: scheduled optimistically but not yet bound
    assumedPods map[string]bool
    mu sync.RWMutex
}

// AssumePod optimistically marks the Pod as scheduled before binding completes
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()
    // Update the cache as if the Pod were already running on its node
    cache.assumedPods[string(pod.UID)] = true
    cache.addPodToCache(pod, true)
    return nil
}
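The optimistic assumption has a counterpart: if binding fails, the Pod is removed from the cache so the node's resources become schedulable again. A minimal sketch mirroring the simplified AssumePod above (removePodFromCache is a hypothetical helper analogous to addPodToCache):
// ForgetPod rolls back an optimistic assumption, e.g. after a failed bind (sketch).
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()
    // Drop the assumption and remove the Pod from the cached node state
    delete(cache.assumedPods, string(pod.UID))
    cache.removePodFromCache(pod)
    return nil
}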
Scheduler Monitoring
Key Metrics
# P99 scheduling latency
histogram_quantile(0.99,
  sum(rate(scheduler_scheduling_duration_seconds_bucket[5m])) by (le)
)
# Scheduling failure rate
sum(rate(scheduler_schedule_attempts_total{result="error"}[5m])) /
sum(rate(scheduler_schedule_attempts_total[5m]))
# Number of Pods waiting to be scheduled
scheduler_pending_pods
# P99 latency of the filtering phase
histogram_quantile(0.99,
  sum(rate(scheduler_framework_extension_point_duration_seconds_bucket{extension_point="Filter"}[5m])) by (le)
)
# P99 latency of the scoring phase
histogram_quantile(0.99,
  sum(rate(scheduler_framework_extension_point_duration_seconds_bucket{extension_point="Score"}[5m])) by (le)
)
Troubleshooting Scheduling Issues
Common Problems
Pods stuck in Pending
# Inspect the Pod's events
kubectl describe pod <pod-name>
# Common causes:
# 1. Insufficient node resources
# 2. Node taints not tolerated
# 3. Node selector does not match
# 4. Affinity rules cannot be satisfied
# 5. PersistentVolume unavailable
# Check the scheduler logs
kubectl logs -n kube-system kube-scheduler-<node>
Uneven Pod distribution (scheduling skew)
# Count Pods per node
kubectl get pods --all-namespaces -o wide | awk '{print $8}' | sort | uniq -c | sort -n
# Possible causes:
# 1. Misconfigured node affinity
# 2. Uneven node capacities or resource quotas
# 3. Score plugin weights that favor certain nodes
Best Practices
1. Set resource requests appropriately
apiVersion: v1
kind: Pod
metadata:
  name: resource-demo
spec:
  containers:
  - name: app
    image: myapp
    resources:
      requests:
        memory: "64Mi"
        cpu: "250m"
      limits:
        memory: "128Mi"
        cpu: "500m"
2. Use PriorityClass
# A high-priority PriorityClass
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority
value: 1000000
globalDefault: false
description: "High priority class"
---
# A Pod that uses the PriorityClass
apiVersion: v1
kind: Pod
metadata:
  name: high-priority-pod
spec:
  priorityClassName: high-priority
  containers:
  - name: nginx
    image: nginx
3. Topology spread constraints
apiVersion: v1
kind: Pod
metadata:
  name: topology-spread-pod
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: kubernetes.io/hostname
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        app: myapp
  containers:
  - name: app
    image: myapp
Summary
The scheduler is the "brain" of Kubernetes. Understanding how it works helps with:
- Resource optimization: allocating cluster resources sensibly
- Performance tuning: improving scheduling throughput and latency
- Custom extensions: meeting special scheduling requirements
- Troubleshooting: quickly pinpointing scheduling problems
Key takeaways:
- Two-phase scheduling: filtering (Filter) + scoring (Score)
- Scheduling policies: affinity, taints and tolerations, topology spread constraints
- Extension mechanisms: extenders and framework plugins
- Performance: parallelism settings and the optimistic scheduler cache