mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
1481 字
4 分钟
Kubernetes 调度框架与自定义调度器开发
2023-09-01

一、Kubernetes 调度框架以及开发实例#

Kubernetes 的调度器(kube-scheduler)是集群的核心组件之一,负责为每个新建的 Pod 选择最合适的运行节点。默认调度器采用 predicates 和 priorities 算法进行节点筛选和排序,但在生产环境中往往需要对调度行为进行更精细的控制——例如将特定服务绑定到特定节点池、考虑数据的本地性(data locality)、或实现Bin-Packing/Spread调度策略。

从 Kubernetes 1.19 开始,调度框架(Scheduler Framework)取代了旧的 predicates/priorities 插件机制,成为官方推荐的扩展方式。相比旧体系,调度框架提供了更清晰的扩展点(Extension Points)和更完善的生命周期管理,被社区视为走向生产可用的必经之路。

1.1 调度框架的核心概念#

调度框架定义了一套插件接口和扩展点,调度器的核心逻辑与具体策略实现完全解耦。调度器核心代码只负责按顺序调用各扩展点的插件,而具体的筛选、排序逻辑由插件实现。

一个 Pod 的完整调度周期分为两个主要阶段:调度周期(Scheduling Cycle)绑定周期(Binding Cycle)。调度周期负责为 Pod 选择目标节点,绑定周期负责将调度结果持久化到 apiserver。两个周期相互独立,调度周期失败则 Pod 不会被调度,绑定周期失败则调度结果会被回滚。

flowchart TB subgraph "调度周期 Scheduling Cycle" START([Pod 进入队列]) --> QS[QueueSort 排序] QS --> PF[PreFilter 预处理] PF --> F[Filter 节点筛选] F --> PoF{有可用节点?} PoF -->|否| POST[PostFilter 后处理] POST --> FAIL([调度失败]) PoF -->|是| PS[PreScore 预打分] PS --> S[Score 节点打分] S --> SELECT[选择最优节点] end subgraph "绑定周期 Binding Cycle" SELECT --> R[Reserve 资源预留] R --> P[Permit 许可检查] P --> PERMIT{通过?} PERMIT -->|否| ROLLBACK[释放预留资源] ROLLBACK --> FAIL PERMIT -->|是| B[Bind 绑定节点] B --> PB[PostBind 后处理] PB --> SUCCESS([调度成功]) end style START fill:#e3f2fd style SUCCESS fill:#c8e6c9 style FAIL fill:#ffcdd2

调度框架定义了以下扩展点,按调度周期中的调用顺序排列:

扩展点插件接口调用时机
QueueSortLess(Pod1, Pod2)对调度队列中的 Pod 进行排序,决定下一个被调度的 Pod
PreFilterExtensions.Plugin.PreFilter预处理 Pod 的调度条件,可用于检查集群是否满足 Pod 的调度需求
FilterExtensions.Plugin.Filter排除不满足条件的节点,对应旧的 predicates
PostFilterExtensions.Plugin.PostFilterFilter 阶段后的后处理,当没有节点通过 Filter 时被调用
PreScoreExtensions.Plugin.PreScore预打分阶段,可用于准备评分所需的共享资源信息
ScoreExtensions.Plugin.Score对通过 Filter 的节点进行评分,对应旧的 priorities
ReserveExtensions.Plugin.Reserve预留插件,用于「成功绑定前」的资源预留(如 CSI 存储挂载)
PermitExtensions.Plugin.Permit允许阶段,可用于暂停/拒绝/批准调度结果
BindExtensions.Plugin.Bind绑定阶段,将 Pod 绑定到目标节点
PostBindExtensions.Plugin.PostBind绑定完成后的后处理,可用于清理预留资源
graph TB subgraph "Filter 阶段 - 节点筛选" N1["Node 1"] --> F1{"PodFitResources?"} N2["Node 2"] --> F2{"PodFitResources?"} N3["Node 3"] --> F3{"PodFitResources?"} N4["Node 4"] --> F4{"PodFitResources?"} F1 -->|通过| S1["加入候选列表"] F2 -->|拒绝| X1["排除"] F3 -->|通过| S2["加入候选列表"] F4 -->|通过| S3["加入候选列表"] end subgraph "Score 阶段 - 节点打分" CANDIDATES["候选节点<br/>Node 1, 3, 4"] CANDIDATES --> SCORE1["Node 1: 60 分"] CANDIDATES --> SCORE2["Node 3: 85 分"] CANDIDATES --> SCORE3["Node 4: 45 分"] end RESULT["选择 Node 3<br/>最高分节点"] S1 --> CANDIDATES S2 --> CANDIDATES S3 --> CANDIDATES SCORE2 --> RESULT style RESULT fill:#c8e6c9 style X1 fill:#ffcdd2

1.2 自定义调度器开发示例#

接下来通过一个实际例子,演示如何开发一个自定义调度器插件。该插件的功能是:优先将标注了 app.gpu=true 的 Pod 调度到带有 GPU 标签的节点

graph TB subgraph "Pod 调度请求" POD["GPU Pod<br/>annotation: app.gpu=true"] end subgraph "Filter 阶段" F1["检查节点 GPU 标签"] GN1["GPU Node<br/>gpu.kubernetes.io=true"] GN2["GPU Node<br/>gpu.kubernetes.io=true"] CN["普通 Node<br/>无 GPU 标签"] F1 --> GN1 -->|"通过"| PASS1["候选"] F1 --> GN2 -->|"通过"| PASS2["候选"] F1 --> CN -->|"拒绝"| REJECT["排除"] end subgraph "Score 阶段" SCORE["为 GPU 节点打高分"] PASS1 -->|"100 分"| S1["Node A"] PASS2 -->|"100 分"| S2["Node B"] end RESULT["选择最优 GPU 节点"] S1 --> RESULT S2 --> RESULT POD --> F1 style POD fill:#fff9c4 style RESULT fill:#c8e6c9 style REJECT fill:#ffcdd2

创建项目结构#

使用 Kubebuilder 初始化调度器插件项目:

# 初始化项目
kubebuilder init --domain myorg.io --repo github.com/myorg/scheduler-plugins
# 创建 API(CRD)
kubebuilder create api --group scheduling --namespacedfalse --kind NodeGPU
# 创建插件代码
mkdir -p pkg/plugins/gpunode

实现 Filter 插件#

Filter 插件负责检查节点是否满足 Pod 的调度条件。这里需要检查:若 Pod 需要 GPU,则目标节点必须拥有 GPU 标签。

pkg/plugins/gpunode/gpu_filter.go
package gpunode
import (
"context"
"fmt"
"github.com/kubernetes-sigs/scheduler-plugins/pkg/apis/config/scheme"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
const Name = "NodeGPUFilter"
// FilterArgs 定义插件配置参数
type FilterArgs struct {
// 需要 GPU 的 Pod 需要调度到哪些标签的节点
GPULabelKey string `json:"gpuLabelKey,omitempty"`
GPULabelValue string `json:"gpuLabelValue,omitempty"`
}
// Filter 实现 Filter 扩展点
type Filter struct {
args FilterArgs
}
var _ framework.FilterPlugin = &Filter{}
func (f *Filter) Name() string {
return Name
}
// Filter 检查节点是否满足调度条件
func (f *Filter) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
// 获取 Pod 是否需要 GPU
needsGPU, ok := pod.Annotations["app.gpu"]
if !ok || needsGPU != "true" {
// Pod 不需要 GPU,直接通过
return nil
}
// 检查节点是否有 GPU 标签
node := nodeInfo.Node()
if node == nil {
return framework.NewStatus(framework.Unschedulable, "node is nil")
}
gpuValue, exists := node.Labels[f.args.GPULabelKey]
if !exists || gpuValue != f.args.GPULabelValue {
return framework.NewStatus(
framework.Unschedulable,
fmt.Sprintf("node %s does not have GPU label %s=%s",
node.Name, f.args.GPULabelKey, f.args.GPULabelValue),
)
}
return nil
}
// NewFilter 创建 Filter 插件实例
func NewFilter(_ context.Context, plArgs runtime.Object) (framework.Plugin, error) {
args := &FilterArgs{}
if err := scheme.Scheme.Convert(plArgs, args, nil); err != nil {
return nil, fmt.Errorf("failed to convert args: %v", err)
}
if args.GPULabelKey == "" {
args.GPULabelKey = "gpu.kubernetes.io"
}
if args.GPULabelValue == "" {
args.GPULabelValue = "true"
}
return &Filter{args: *args}, nil
}

实现 Score 插件#

在调度框架中,Filter 阶段只决定「能不能」调度,而 Score 阶段决定「哪个节点更好」。这里我们为 GPU 节点赋予更高的评分:

pkg/plugins/gpunode/gpu_score.go
package gpunode
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
const (
// GPU 节点的基础分数
gpuNodeScore = 100
// 普通节点的分数
normalNodeScore = 50
)
// Score 实现 Score 扩展点
type Score struct {
handle framework.Handle
}
var _ framework.ScorePlugin = &Score{}
func NewScore(_ context.Context, _ runtime.Object, h framework.Handle) (framework.Plugin, error) {
return &Score{handle: h}, nil
}
func (s *Score) Name() string {
return Name
}
// Score 对节点进行评分
// 注意:Score 函数接收的是节点名称,需要通过 handle 获取节点信息
func (s *Score) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
// 通过 handle 获取节点信息
nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("failed to get node %s: %v", nodeName, err))
}
node := nodeInfo.Node()
if node == nil {
return 0, framework.NewStatus(framework.Error, "node is nil")
}
// 检查是否为 GPU 节点
needsGPU, _ := pod.Annotations["app.gpu"]
if needsGPU == "true" {
if gpuValue, ok := node.Labels["gpu.kubernetes.io"]; ok && gpuValue == "true" {
return gpuNodeScore, nil
}
return 1, nil // GPU Pod 但节点无 GPU,给最低分
}
return normalNodeScore, nil
}
// ScoreExtensions 返回分数的扩展信息
func (s *Score) ScoreExtensions() framework.ScoreExtensions {
return nil
}

注册插件#

插件编写完成后,需要在调度器启动时注册。调度框架通过插件配置文件(KubeSchedulerProfile)加载插件:

scheduler-config.yaml
apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: default-scheduler
pluginConfig:
- name: NodeGPUFilter
args:
gpuLabelKey: gpu.kubernetes.io
gpuLabelValue: "true"

启动调度器时指定配置文件:

kube-scheduler --config scheduler-config.yaml --authentication-kubeconfig=/path/to/kubeconfig --authorization-kubeconfig=/path/to/kubeconfig

使用自定义调度器#

Pod 可以通过 schedulerName 字段指定使用哪个调度器:

apiVersion: v1
kind: Pod
metadata:
name: gpu-pod
annotations:
app.gpu: "true" # 触发自定义调度逻辑
spec:
schedulerName: default-scheduler
containers:
- name: cuda-container
image: nvidia/cuda:11.0-base
command: ["nvidia-smi"]

1.3 调度框架的高级特性#

多个调度器并存#

Kubernetes 支持集群中运行多个调度器实例,每个调度器可以有不同的配置。通过 schedulerName 字段,Pod 可以选择使用哪个调度器。如果不指定,默认使用 default-scheduler

graph TB subgraph "API Server" API["API Server"] end subgraph "调度器实例" DS["default-scheduler<br/>默认调度器"] GS["gpu-scheduler<br/>GPU 专用调度器"] BS["batch-scheduler<br/>批处理调度器"] end subgraph "Pod 部署" P1["普通 Pod<br/>schedulerName: default-scheduler"] P2["GPU Pod<br/>schedulerName: gpu-scheduler"] P3["Batch Job<br/>schedulerName: batch-scheduler"] end API -->|"监听 Pod"| DS API -->|"监听 Pod"| GS API -->|"监听 Pod"| BS DS -->|"调度"| P1 GS -->|"调度"| P2 BS -->|"调度"| P3 style DS fill:#bbdefb style GS fill:#c8e6c9 style BS fill:#fff9c4

多个调度器的典型部署场景是:为训练任务和在线服务分别配置调度器,训练任务使用 Bin-Packing 策略优先利用 GPU 节点资源,在线服务使用 Spread 策略保证高可用。

队列排序(QueueSort)#

调度队列中的 Pod 等待调度时,通过 QueueSort 扩展点决定哪个 Pod 先被调度。默认实现按 Pod 的创建时间排序,但可以通过实现自定义 QueueSort 插件来改变调度顺序,例如优先调度关键业务 Pod。

// QueueSort 实现示例:优先调度标注了 priorityClassName 的 Pod
type PriorityQueueSort struct{}
func (p *PriorityQueueSort) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
p1 := getPriority(podInfo1.Pod)
p2 := getPriority(podInfo2.Pod)
if p1 != p2 {
return p1 > p2 // 更高优先级的 Pod 先调度
}
return podInfo1.Timestamp.Before(podInfo2.Timestamp)
}

预留(Reserve)与 Permit#

Reserve 扩展点在 Pod 被绑定前被调用,常用于需要「预占」资源的插件。最典型的场景是 CSI 存储卷的挂载——在 Pod 绑定到节点之前,调度器需要确保该节点能够访问 PVC 对应的存储卷。

当 StorageClass 的 volumeBindingMode 设置为 WaitForFirstConsumer 时,调度器会延迟 PV 的创建和绑定,直到 Pod 被调度到具体节点。这种设计使得存储卷可以与 Pod 的调度决策协同工作,避免存储卷被创建在不合适的节点上。CSI 存储插件的完整实现请参阅 Kubernetes CSI 存储插件开发

Permit 扩展点可以暂停或拒绝调度结果。当需要等待某个外部条件满足时(例如等待存储卷完成绑定),可以在 Permit 阶段返回 Wait 或 Deny:

func (p *MyPermit) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
// 检查存储是否就绪
if !storageReady(pod) {
// 等待 30 秒后重试
return framework.NewStatus(framework.Wait), 30 * time.Second
}
// 批准调度
return nil, 0
}

二、参考资料#


参考#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Kubernetes 调度框架与自定义调度器开发
https://blog.souloss.com/posts/kubernetes/k8s-scheduling/
作者
Souloss
发布于
2023-09-01
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时