一、Kubernetes 调度框架以及开发实例
Kubernetes 的调度器(kube-scheduler)是集群的核心组件之一,负责为每个新建的 Pod 选择最合适的运行节点。默认调度器采用 predicates 和 priorities 算法进行节点筛选和排序,但在生产环境中往往需要对调度行为进行更精细的控制——例如将特定服务绑定到特定节点池、考虑数据的本地性(data locality)、或实现Bin-Packing/Spread调度策略。
从 Kubernetes 1.19 开始,调度框架(Scheduler Framework)取代了旧的 predicates/priorities 插件机制,成为官方推荐的扩展方式。相比旧体系,调度框架提供了更清晰的扩展点(Extension Points)和更完善的生命周期管理,被社区视为走向生产可用的必经之路。
1.1 调度框架的核心概念
调度框架定义了一套插件接口和扩展点,调度器的核心逻辑与具体策略实现完全解耦。调度器核心代码只负责按顺序调用各扩展点的插件,而具体的筛选、排序逻辑由插件实现。
一个 Pod 的完整调度周期分为两个主要阶段:调度周期(Scheduling Cycle)和绑定周期(Binding Cycle)。调度周期负责为 Pod 选择目标节点,绑定周期负责将调度结果持久化到 apiserver。两个周期相互独立,调度周期失败则 Pod 不会被调度,绑定周期失败则调度结果会被回滚。
调度框架定义了以下扩展点,按调度周期中的调用顺序排列:
| 扩展点 | 插件接口 | 调用时机 |
|---|---|---|
QueueSort | Less(Pod1, Pod2) | 对调度队列中的 Pod 进行排序,决定下一个被调度的 Pod |
PreFilter | Extensions.Plugin.PreFilter | 预处理 Pod 的调度条件,可用于检查集群是否满足 Pod 的调度需求 |
Filter | Extensions.Plugin.Filter | 排除不满足条件的节点,对应旧的 predicates |
PostFilter | Extensions.Plugin.PostFilter | Filter 阶段后的后处理,当没有节点通过 Filter 时被调用 |
PreScore | Extensions.Plugin.PreScore | 预打分阶段,可用于准备评分所需的共享资源信息 |
Score | Extensions.Plugin.Score | 对通过 Filter 的节点进行评分,对应旧的 priorities |
Reserve | Extensions.Plugin.Reserve | 预留插件,用于「成功绑定前」的资源预留(如 CSI 存储挂载) |
Permit | Extensions.Plugin.Permit | 允许阶段,可用于暂停/拒绝/批准调度结果 |
Bind | Extensions.Plugin.Bind | 绑定阶段,将 Pod 绑定到目标节点 |
PostBind | Extensions.Plugin.PostBind | 绑定完成后的后处理,可用于清理预留资源 |
1.2 自定义调度器开发示例
接下来通过一个实际例子,演示如何开发一个自定义调度器插件。该插件的功能是:优先将标注了 app.gpu=true 的 Pod 调度到带有 GPU 标签的节点。
创建项目结构
使用 Kubebuilder 初始化调度器插件项目:
# 初始化项目kubebuilder init --domain myorg.io --repo github.com/myorg/scheduler-plugins
# 创建 API(CRD)kubebuilder create api --group scheduling --namespacedfalse --kind NodeGPU
# 创建插件代码mkdir -p pkg/plugins/gpunode实现 Filter 插件
Filter 插件负责检查节点是否满足 Pod 的调度条件。这里需要检查:若 Pod 需要 GPU,则目标节点必须拥有 GPU 标签。
package gpunode
import ( "context" "fmt"
"github.com/kubernetes-sigs/scheduler-plugins/pkg/apis/config/scheme" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/framework")
const Name = "NodeGPUFilter"
// FilterArgs 定义插件配置参数type FilterArgs struct { // 需要 GPU 的 Pod 需要调度到哪些标签的节点 GPULabelKey string `json:"gpuLabelKey,omitempty"` GPULabelValue string `json:"gpuLabelValue,omitempty"`}
// Filter 实现 Filter 扩展点type Filter struct { args FilterArgs}
var _ framework.FilterPlugin = &Filter{}
func (f *Filter) Name() string { return Name}
// Filter 检查节点是否满足调度条件func (f *Filter) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { // 获取 Pod 是否需要 GPU needsGPU, ok := pod.Annotations["app.gpu"] if !ok || needsGPU != "true" { // Pod 不需要 GPU,直接通过 return nil }
// 检查节点是否有 GPU 标签 node := nodeInfo.Node() if node == nil { return framework.NewStatus(framework.Unschedulable, "node is nil") }
gpuValue, exists := node.Labels[f.args.GPULabelKey] if !exists || gpuValue != f.args.GPULabelValue { return framework.NewStatus( framework.Unschedulable, fmt.Sprintf("node %s does not have GPU label %s=%s", node.Name, f.args.GPULabelKey, f.args.GPULabelValue), ) }
return nil}
// NewFilter 创建 Filter 插件实例func NewFilter(_ context.Context, plArgs runtime.Object) (framework.Plugin, error) { args := &FilterArgs{} if err := scheme.Scheme.Convert(plArgs, args, nil); err != nil { return nil, fmt.Errorf("failed to convert args: %v", err) }
if args.GPULabelKey == "" { args.GPULabelKey = "gpu.kubernetes.io" } if args.GPULabelValue == "" { args.GPULabelValue = "true" }
return &Filter{args: *args}, nil}实现 Score 插件
在调度框架中,Filter 阶段只决定「能不能」调度,而 Score 阶段决定「哪个节点更好」。这里我们为 GPU 节点赋予更高的评分:
package gpunode
import ( "context" "fmt"
v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/framework")
const ( // GPU 节点的基础分数 gpuNodeScore = 100 // 普通节点的分数 normalNodeScore = 50)
// Score 实现 Score 扩展点type Score struct { handle framework.Handle}
var _ framework.ScorePlugin = &Score{}
func NewScore(_ context.Context, _ runtime.Object, h framework.Handle) (framework.Plugin, error) { return &Score{handle: h}, nil}
func (s *Score) Name() string { return Name}
// Score 对节点进行评分// 注意:Score 函数接收的是节点名称,需要通过 handle 获取节点信息func (s *Score) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { // 通过 handle 获取节点信息 nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) if err != nil { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("failed to get node %s: %v", nodeName, err)) }
node := nodeInfo.Node() if node == nil { return 0, framework.NewStatus(framework.Error, "node is nil") }
// 检查是否为 GPU 节点 needsGPU, _ := pod.Annotations["app.gpu"] if needsGPU == "true" { if gpuValue, ok := node.Labels["gpu.kubernetes.io"]; ok && gpuValue == "true" { return gpuNodeScore, nil } return 1, nil // GPU Pod 但节点无 GPU,给最低分 }
return normalNodeScore, nil}
// ScoreExtensions 返回分数的扩展信息func (s *Score) ScoreExtensions() framework.ScoreExtensions { return nil}注册插件
插件编写完成后,需要在调度器启动时注册。调度框架通过插件配置文件(KubeSchedulerProfile)加载插件:
apiVersion: kubescheduler.config.k8s.io/v1beta3kind: KubeSchedulerConfigurationprofiles: - schedulerName: default-scheduler pluginConfig: - name: NodeGPUFilter args: gpuLabelKey: gpu.kubernetes.io gpuLabelValue: "true"启动调度器时指定配置文件:
kube-scheduler --config scheduler-config.yaml --authentication-kubeconfig=/path/to/kubeconfig --authorization-kubeconfig=/path/to/kubeconfig使用自定义调度器
Pod 可以通过 schedulerName 字段指定使用哪个调度器:
apiVersion: v1kind: Podmetadata: name: gpu-pod annotations: app.gpu: "true" # 触发自定义调度逻辑spec: schedulerName: default-scheduler containers: - name: cuda-container image: nvidia/cuda:11.0-base command: ["nvidia-smi"]1.3 调度框架的高级特性
多个调度器并存
Kubernetes 支持集群中运行多个调度器实例,每个调度器可以有不同的配置。通过 schedulerName 字段,Pod 可以选择使用哪个调度器。如果不指定,默认使用 default-scheduler。
多个调度器的典型部署场景是:为训练任务和在线服务分别配置调度器,训练任务使用 Bin-Packing 策略优先利用 GPU 节点资源,在线服务使用 Spread 策略保证高可用。
队列排序(QueueSort)
调度队列中的 Pod 等待调度时,通过 QueueSort 扩展点决定哪个 Pod 先被调度。默认实现按 Pod 的创建时间排序,但可以通过实现自定义 QueueSort 插件来改变调度顺序,例如优先调度关键业务 Pod。
// QueueSort 实现示例:优先调度标注了 priorityClassName 的 Podtype PriorityQueueSort struct{}
func (p *PriorityQueueSort) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { p1 := getPriority(podInfo1.Pod) p2 := getPriority(podInfo2.Pod) if p1 != p2 { return p1 > p2 // 更高优先级的 Pod 先调度 } return podInfo1.Timestamp.Before(podInfo2.Timestamp)}预留(Reserve)与 Permit
Reserve 扩展点在 Pod 被绑定前被调用,常用于需要「预占」资源的插件。最典型的场景是 CSI 存储卷的挂载——在 Pod 绑定到节点之前,调度器需要确保该节点能够访问 PVC 对应的存储卷。
当 StorageClass 的 volumeBindingMode 设置为 WaitForFirstConsumer 时,调度器会延迟 PV 的创建和绑定,直到 Pod 被调度到具体节点。这种设计使得存储卷可以与 Pod 的调度决策协同工作,避免存储卷被创建在不合适的节点上。CSI 存储插件的完整实现请参阅 Kubernetes CSI 存储插件开发。
Permit 扩展点可以暂停或拒绝调度结果。当需要等待某个外部条件满足时(例如等待存储卷完成绑定),可以在 Permit 阶段返回 Wait 或 Deny:
func (p *MyPermit) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) { // 检查存储是否就绪 if !storageReady(pod) { // 等待 30 秒后重试 return framework.NewStatus(framework.Wait), 30 * time.Second } // 批准调度 return nil, 0}二、参考资料
参考
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






