mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
2422 字
7 分钟
Kubernetes CSI 存储插件开发
2023-12-05

一、Kubernetes 存储基础#

Kubernetes 的存储体系是一套层次分明的抽象模型,理解这些概念之间的关系是掌握 CSI 的前提。从最底层的概念开始,Volume 是 Kubernetes 中最基本的存储抽象,它定义了 Pod 可以访问的存储资源。然而,直接在 Pod 中定义 Volume 存在明显的局限性:Volume 的生命周期与 Pod 绑定,无法独立管理;Volume 的配置信息与存储实现细节耦合,难以实现存储资源的复用和统一管理。

为了解决这些问题,Kubernetes 引入了持久卷(PersistentVolume,简称 PV)和持久卷声明(PersistentVolumeClaim,简称 PVC)的概念。PV 是集群级别的资源,代表一块存储空间,它独立于 Pod 的生命周期存在。PVC 是命名空间级别的资源,代表用户对存储资源的请求。通过 PV 和 PVC 的分离,存储资源的管理者和使用者可以各司其职:管理员负责创建和配置 PV,开发者只需通过 PVC 申请所需的存储资源。

graph TB subgraph "用户视角" DEV["开发者"] PVC["PVC<br/>存储请求"] end subgraph "管理员视角" ADMIN["管理员"] PV["PV<br/>存储资源"] SC["StorageClass<br/>存储模板"] end subgraph "存储后端" STORAGE["存储系统<br/>NFS/Ceph/Cloud Disk"] end DEV -->|"申请存储"| PVC PVC -->|"绑定"| PV PV -->|"映射"| STORAGE ADMIN -->|"创建"| PV ADMIN -->|"定义"| SC SC -.->|"动态供应"| PV style DEV fill:#bbdefb style ADMIN fill:#c8e6c9 style STORAGE fill:#fff9c4
apiVersion: v1
kind: PersistentVolume
metadata:
name: pv-nfs
spec:
capacity:
storage: 10Gi
volumeMode: Filesystem
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs-storage
nfs:
server: 192.168.1.100
path: /data/k8s
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: pvc-nfs
namespace: default
spec:
accessModes:
- ReadWriteMany
storageClassName: nfs-storage
resources:
requests:
storage: 5Gi

然而,手动创建 PV 的方式在面对大规模集群时显得力不从心。管理员需要预先估算存储需求,创建大量 PV 以备使用,这不仅增加了管理负担,也造成了资源的浪费。为了实现存储资源的按需分配,Kubernetes 引入了 StorageClass 和动态供应(Dynamic Provisioning)机制。

StorageClass 定义了一类存储的模板,包括存储类型、参数配置和供应策略。当用户创建 PVC 并指定 StorageClass 时,Kubernetes 会自动调用对应的存储插件创建 PV,并将其绑定到 PVC 上。这种机制极大地简化了存储资源的管理流程,使存储资源像 CPU 和内存一样可以按需申请。

注意,StorageClass 的 volumeBindingMode 字段控制着存储卷的绑定时机。设置为 WaitForFirstConsumer 时,调度器会等待 Pod 被调度到具体节点后再创建存储卷,这对于 CSI 存储的调度决策至关重要——详见 Kubernetes 调度框架 中关于 Reserve 扩展点的讨论。

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: fast-ssd
provisioner: kubernetes.io/aws-ebs
parameters:
type: gp3
fsType: ext4
reclaimPolicy: Delete
allowVolumeExpansion: true
volumeBindingMode: WaitForFirstConsumer

二、CSI 规范详解#

容器存储接口(Container Storage Interface,简称 CSI)是一个开放的行业标准,它定义了容器编排系统(如 Kubernetes)与存储提供商之间的标准接口。CSI 的设计目标是让存储供应商只需编写一次插件,就能支持多种容器编排系统;同时让容器编排系统能够对接各种存储后端,而无需修改核心代码。

CSI 规范定义了三组 gRPC 接口,分别对应存储系统中的不同职责。IdentityServer 接口用于插件的标识和能力声明,包括获取插件名称、版本信息和支持的功能列表。ControllerServer 接口负责卷的生命周期管理,包括创建卷、删除卷、扩容卷、快照创建和恢复等操作。NodeServer 接口运行在每个节点上,负责卷的挂载和卸载操作,以及卷的状态查询。

graph TB subgraph "CSI 插件架构" subgraph "Controller Service" CS["ControllerServer"] CS1["CreateVolume"] CS2["DeleteVolume"] CS3["ControllerPublishVolume"] CS4["CreateSnapshot"] end subgraph "Node Service" NS["NodeServer"] NS1["NodeStageVolume"] NS2["NodePublishVolume"] NS3["NodeUnpublishVolume"] end subgraph "Identity Service" IS["IdentityServer"] IS1["GetPluginInfo"] IS2["GetPluginCapabilities"] IS3["Probe"] end end subgraph "调用方" CO["Kubernetes<br/>容器编排系统"] end CO -->|"gRPC"| IS CO -->|"gRPC"| CS CO -->|"gRPC"| NS style IS fill:#f3e5f5 style CS fill:#e1f5fe style NS fill:#e8f5e9
// IdentityServer 接口定义
service Identity {
rpc GetPluginInfo(GetPluginInfoRequest)
returns (GetPluginInfoResponse) {}
rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
returns (GetPluginCapabilitiesResponse) {}
rpc Probe(ProbeRequest) returns (ProbeResponse) {}
}
// ControllerServer 接口定义
service Controller {
rpc CreateVolume(CreateVolumeRequest)
returns (CreateVolumeResponse) {}
rpc DeleteVolume(DeleteVolumeRequest)
returns (DeleteVolumeResponse) {}
rpc ControllerPublishVolume(ControllerPublishVolumeRequest)
returns (ControllerPublishVolumeResponse) {}
rpc ControllerUnpublishVolume(ControllerUnpublishVolumeRequest)
returns (ControllerUnpublishVolumeResponse) {}
rpc ValidateVolumeCapabilities(ValidateVolumeCapabilitiesRequest)
returns (ValidateVolumeCapabilitiesResponse) {}
rpc ListVolumes(ListVolumesRequest)
returns (ListVolumesResponse) {}
rpc GetCapacity(GetCapacityRequest)
returns (GetCapacityResponse) {}
rpc ControllerGetCapabilities(ControllerGetCapabilitiesRequest)
returns (ControllerGetCapabilitiesResponse) {}
rpc CreateSnapshot(CreateSnapshotRequest)
returns (CreateSnapshotResponse) {}
rpc DeleteSnapshot(DeleteSnapshotRequest)
returns (DeleteSnapshotResponse) {}
rpc ListSnapshots(ListSnapshotsRequest)
returns (ListSnapshotsResponse) {}
rpc ControllerExpandVolume(ControllerExpandVolumeRequest)
returns (ControllerExpandVolumeResponse) {}
}
// NodeServer 接口定义
service Node {
rpc NodeStageVolume(NodeStageVolumeRequest)
returns (NodeStageVolumeResponse) {}
rpc NodeUnstageVolume(NodeUnstageVolumeRequest)
returns (NodeUnstageVolumeResponse) {}
rpc NodePublishVolume(NodePublishVolumeRequest)
returns (NodePublishVolumeResponse) {}
rpc NodeUnpublishVolume(NodeUnpublishVolumeRequest)
returns (NodeUnpublishVolumeResponse) {}
rpc NodeGetVolumeStats(NodeGetVolumeStatsRequest)
returns (NodeGetVolumeStatsResponse) {}
rpc NodeExpandVolume(NodeExpandVolumeRequest)
returns (NodeExpandVolumeResponse) {}
rpc NodeGetCapabilities(NodeGetCapabilitiesRequest)
returns (NodeGetCapabilitiesResponse) {}
rpc NodeGetInfo(NodeGetInfoRequest)
returns (NodeGetInfoResponse) {}
}

理解 CSI 插件的工作流程对于开发 CSI 插件至关重要。当用户创建一个 PVC 时,Kubernetes 控制器会根据 StorageClass 中指定的 provisioner 找到对应的 CSI 插件,然后调用 ControllerServer 的 CreateVolume 方法创建存储卷。创建成功后,CSI 插件返回卷的 ID 和相关信息,Kubernetes 控制器创建对应的 PV 对象并绑定到 PVC。

当 Pod 调度到某个节点并使用该 PVC 时,Kubernetes 首先调用 ControllerServer 的 ControllerPublishVolume 方法,将卷附加到目标节点。然后,运行在该节点上的 NodeServer 收到 NodeStageVolume 调用,将卷挂载到一个临时目录(称为 staging 目录)。最后,NodeServer 收到 NodePublishVolume 调用,将卷从 staging 目录绑定挂载到 Pod 的目标路径。

三、CSI 插件结构#

一个完整的 CSI 插件通常由两部分组成:Controller 插件和 Node 插件。Controller 插件负责集群级别的存储管理操作,通常以 Deployment 方式部署,支持多副本高可用。Node 插件负责节点级别的存储操作,必须以 DaemonSet 方式部署,确保在每个节点上都有一个实例运行。

除了 CSI 插件本身,Kubernetes 还提供了一系列 Sidecar 容器来处理与 Kubernetes API 的交互,使 CSI 插件开发者可以专注于存储逻辑的实现,而不必关心 Kubernetes 特定的细节。

graph TB subgraph "Controller Pod (Deployment)" direction TB CP["CSI Controller Plugin"] EP["External<br/>Provisioner"] EA["External<br/>Attacher"] ER["External<br/>Resizer"] EP <-->|"gRPC"| CP EA <-->|"gRPC"| CP ER <-->|"gRPC"| CP end subgraph "Node Pod (DaemonSet)" direction TB NP["CSI Node Plugin"] NDR["Node Driver<br/>Registrar"] LP["Liveness<br/>Probe"] NDR -->|"注册"| NP LP -->|"健康检查"| NP end subgraph "Kubernetes 控制平面" API["API Server"] KUBELET["Kubelet"] end subgraph "存储后端" STORAGE["存储系统<br/>Ceph/NFS/Cloud"] end API -->|"监听 PVC"| EP API -->|"监听 VA"| EA API -->|"监听 PVC 扩容"| ER KUBELET <-->|"gRPC"| NP KUBELET -->|"注册"| NDR CP -->|"管理卷"| STORAGE NP -->|"挂载卷"| STORAGE style CP fill:#fff9c4 style NP fill:#fff9c4 style STORAGE fill:#f3e5f5

External Provisioner 是最重要的 Sidecar 之一。它监听 PVC 资源的变化,当发现有 PVC 使用当前 CSI 插件作为 provisioner 且处于 Pending 状态时,它会调用 CSI 插件的 CreateVolume 方法创建卷,然后创建对应的 PV 对象。当 PVC 被删除时,External Provisioner 会调用 DeleteVolume 方法删除卷。

External Attacher 负责卷的附加和分离操作。它监听 VolumeAttachment 资源的变化,当发现 VolumeAttachment 使用当前 CSI 插件时,会调用 ControllerPublishVolume 或 ControllerUnpublishVolume 方法将卷附加到指定节点或从节点分离。

External Resizer 处理卷扩容请求。当 PVC 的存储请求增加时,External Resizer 会调用 ControllerExpandVolume 方法扩展卷的容量,然后更新 PV 的容量信息。

Liveness Probe 用于健康检查。它周期性地调用 CSI 插件的 Probe 方法,如果插件无响应或返回错误,Liveness Probe 会触发插件重启。

Node Driver Registrar 负责向 kubelet 注册 CSI 插件。它通过 Unix domain socket 与 kubelet 通信,将 CSI 插件的信息注册到 kubelet 中,使 kubelet 知道如何调用该插件。

apiVersion: apps/v1
kind: DaemonSet
metadata:
name: csi-node-plugin
namespace: kube-system
spec:
selector:
matchLabels:
app: csi-node-plugin
template:
metadata:
labels:
app: csi-node-plugin
spec:
containers:
- name: node-driver-registrar
image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.8.0
args:
- --v=5
- --csi-address=/csi/csi.sock
- --kubelet-registration-path=/var/lib/kubelet/plugins/csi-example/csi.sock
volumeMounts:
- name: socket-dir
mountPath: /csi
- name: registration-dir
mountPath: /registration
- name: liveness-probe
image: registry.k8s.io/sig-storage/livenessprobe:v2.10.0
args:
- --csi-address=/csi/csi.sock
- --health-port=9898
volumeMounts:
- name: socket-dir
mountPath: /csi
- name: csi-plugin
image: my-csi-plugin:latest
args:
- --nodeid=$(NODE_NAME)
- --endpoint=unix:///csi/csi.sock
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: socket-dir
mountPath: /csi
- name: mount-dir
mountPath: /var/lib/kubelet/pods
mountPropagation: Bidirectional
volumes:
- name: socket-dir
hostPath:
path: /var/lib/kubelet/plugins/csi-example
type: DirectoryOrCreate
- name: registration-dir
hostPath:
path: /var/lib/kubelet/plugins_registry
type: Directory
- name: mount-dir
hostPath:
path: /var/lib/kubelet/pods
type: Directory

四、动态卷供应机制#

动态卷供应是 Kubernetes 存储系统中最强大的功能之一,它实现了存储资源的按需创建和自动管理。理解其工作原理对于开发 CSI 插件至关重要。

sequenceDiagram participant User as 用户 participant API as API Server participant PVC as PVC Controller participant Prov as External Provisioner participant CSI as CSI Plugin participant Storage as 存储后端 participant Kubelet as Kubelet participant Node as Node Plugin User->>API: 创建 PVC API->>PVC: 监听到 PVC (Pending) PVC->>Prov: 触发供应 Prov->>CSI: CreateVolume gRPC CSI->>Storage: 创建存储卷 Storage-->>CSI: 返回卷 ID CSI-->>Prov: 返回卷信息 Prov->>API: 创建 PV API->>PVC: PV 与 PVC 绑定 Note over User,Node: Pod 调度阶段 User->>API: 创建 Pod API->>Kubelet: Pod 调度到节点 Kubelet->>Node: NodeStageVolume Node->>Storage: 挂载到 staging 目录 Kubelet->>Node: NodePublishVolume Node->>Storage: bind mount 到 Pod Kubelet->>API: Pod 启动成功

当用户创建一个 PVC 并指定 StorageClass 时,动态卷供应的流程如下。首先,PersistentVolumeController 监听到新的 PVC 创建事件,检查 PVC 是否指定了 StorageClass。如果指定了,PersistentVolumeController 会等待对应的 VolumeAttachment 或直接调用 provisioner。

External Provisioner Sidecar 持续监听 PVC 资源。当它发现一个 PVC 使用当前 CSI 插件作为 provisioner 且处于 Pending 状态时,会从 PVC 和 StorageClass 中提取参数,构造 CreateVolumeRequest,然后调用 CSI 插件的 ControllerServer.CreateVolume 方法。

CSI 插件的 CreateVolume 方法需要执行以下操作:解析请求中的参数,包括卷名称、容量、访问模式等;调用底层存储系统的 API 创建存储卷;记录卷的元数据信息,如卷 ID、容量、创建时间等;返回 CreateVolumeResponse,其中包含卷的 ID 和能力信息。

func (s *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
// 参数校验
if req.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "volume name is required")
}
cap := req.GetCapacityRange()
if cap == nil {
return nil, status.Error(codes.InvalidArgument, "capacity range is required")
}
// 计算请求的容量大小
requestedSize := cap.GetRequiredBytes()
if requestedSize == 0 {
requestedSize = cap.GetLimitBytes()
}
// 从 StorageClass 参数中获取存储配置
params := req.GetParameters()
storageType := params["type"]
// 调用底层存储系统创建卷
volumeID, size, err := s.storageClient.CreateVolume(ctx, req.GetName(), requestedSize, storageType)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to create volume: %v", err)
}
// 返回创建结果
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
CapacityBytes: size,
VolumeContext: map[string]string{
"type": storageType,
},
},
}, nil
}

当 Pod 调度到节点并使用该 PVC 时,AD Controller(AttachDetach Controller)会创建 VolumeAttachment 资源。External Attacher Sidecar 监听到 VolumeAttachment 事件后,调用 CSI 插件的 ControllerPublishVolume 方法将卷附加到目标节点。附加成功后,kubelet 会调用 CSI 插件的 NodeStageVolume 和 NodePublishVolume 方法,将卷挂载到 Pod 的目标路径。

graph TB subgraph "卷挂载流程" subgraph "1. Controller 阶段" AD["AD Controller"] VA["VolumeAttachment"] ATT["External Attacher"] CP["ControllerPublishVolume"] end subgraph "2. Node 阶段" KL["Kubelet"] NS["NodeStageVolume"] NP["NodePublishVolume"] end subgraph "3. 存储系统" STORAGE["存储后端"] STAGING["Staging 目录<br/>/var/lib/kubelet/plugins/..."] TARGET["目标目录<br/>Pod 内 /data"] end end AD -->|"创建"| VA VA -->|"监听"| ATT ATT -->|"gRPC"| CP CP -->|"附加卷"| STORAGE KL -->|"gRPC"| NS NS -->|"挂载"| STAGING KL -->|"gRPC"| NP NP -->|"bind mount"| TARGET STAGING -->|"bind"| TARGET style STORAGE fill:#f3e5f5 style STAGING fill:#fff9c4 style TARGET fill:#c8e6c9

NodeStageVolume 方法的职责是将存储卷挂载到一个临时的 staging 目录。这个设计允许同一个卷被同一个节点上的多个 Pod 共享,因为 staging 操作只需执行一次。NodePublishVolume 方法的职责是将卷从 staging 目录绑定挂载到 Pod 的目标路径。

func (s *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingPath := req.GetStagingTargetPath()
// 参数校验
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
// 获取卷信息
volumeInfo, err := s.getVolumeInfo(volumeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get volume info: %v", err)
}
// 创建 staging 目录
if err := os.MkdirAll(stagingPath, 0750); err != nil {
return nil, status.Errorf(codes.Internal, "failed to create staging path: %v", err)
}
// 挂载卷到 staging 目录
if err := s.mountVolume(volumeInfo.DevicePath, stagingPath); err != nil {
return nil, status.Errorf(codes.Internal, "failed to mount volume: %v", err)
}
return &csi.NodeStageVolumeResponse{}, nil
}
func (s *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingPath := req.GetStagingTargetPath()
targetPath := req.GetTargetPath()
// 参数校验
if volumeID == "" || stagingPath == "" || targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID, staging path and target path are required")
}
// 创建目标目录
if err := os.MkdirAll(targetPath, 0750); err != nil {
return nil, status.Errorf(codes.Internal, "failed to create target path: %v", err)
}
// 绑定挂载到目标路径
mountOptions := []string{"bind"}
if req.GetReadonly() {
mountOptions = append(mountOptions, "ro")
}
if err := s.bindMount(stagingPath, targetPath, mountOptions); err != nil {
return nil, status.Errorf(codes.Internal, "failed to bind mount: %v", err)
}
return &csi.NodePublishVolumeResponse{}, nil
}

五、CSI 插件开发实战#

现在我们来开发一个简单的 CSI 插件,实现基于本地目录的存储供应。这个插件会将每个 PVC 映射到宿主机上的一个独立目录,适合用于测试环境或需要高性能本地存储的场景。

首先,需要定义插件的主结构体,包含 gRPC 服务器和必要的配置信息。

package main
import (
"context"
"flag"
"fmt"
"net"
"os"
"path/filepath"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"k8s.io/klog/v2"
)
var (
endpoint = flag.String("endpoint", "unix:///csi/csi.sock", "CSI endpoint")
nodeID = flag.String("nodeid", "", "Node ID")
storageRoot = flag.String("storage-root", "/var/lib/csi-local-storage", "Storage root directory")
)
type Driver struct {
nodeID string
storageRoot string
identityServer *IdentityServer
controllerServer *ControllerServer
nodeServer *NodeServer
}
func NewDriver(nodeID, storageRoot string) *Driver {
return &Driver{
nodeID: nodeID,
storageRoot: storageRoot,
identityServer: &IdentityServer{},
controllerServer: NewControllerServer(storageRoot),
nodeServer: NewNodeServer(nodeID, storageRoot),
}
}
func (d *Driver) Run(endpoint string) error {
// 创建存储根目录
if err := os.MkdirAll(d.storageRoot, 0755); err != nil {
return fmt.Errorf("failed to create storage root: %w", err)
}
// 创建 Unix domain socket 监听器
listener, err := net.Listen("unix", endpoint)
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
// 创建 gRPC 服务器
server := grpc.NewServer()
// 注册 CSI 服务
csi.RegisterIdentityServer(server, d.identityServer)
csi.RegisterControllerServer(server, d.controllerServer)
csi.RegisterNodeServer(server, d.nodeServer)
klog.Infof("Starting CSI driver on %s", endpoint)
return server.Serve(listener)
}
func main() {
klog.InitFlags(nil)
flag.Parse()
if *nodeID == "" {
klog.Fatal("nodeid is required")
}
driver := NewDriver(*nodeID, *storageRoot)
if err := driver.Run(*endpoint); err != nil {
klog.Fatalf("Failed to run driver: %v", err)
}
}

接下来实现 IdentityServer,它是最简单的服务,只需返回插件的基本信息。

type IdentityServer struct {
csi.UnimplementedIdentityServer
}
func (s *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
return &csi.GetPluginInfoResponse{
Name: "csi-local.example.com",
VendorVersion: "v1.0.0",
Manifest: map[string]string{
"description": "Local directory CSI driver",
},
}, nil
}
func (s *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
{
Type: &csi.PluginCapability_VolumeExpansion_{
VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
Type: csi.PluginCapability_VolumeExpansion_OFFLINE,
},
},
},
},
}, nil
}
func (s *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{Ready: &[]bool{true}[0]}, nil
}
func (s *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
return &csi.GetPluginInfoResponse{
Name: "csi-local.example.com",
VendorVersion: "v1.0.0",
}, nil
}

然后实现 ControllerServer,它负责卷的创建和删除。我们的本地存储实现非常简单:创建卷就是在存储根目录下创建一个子目录。

type ControllerServer struct {
csi.UnimplementedControllerServer
storageRoot string
}
func NewControllerServer(storageRoot string) *ControllerServer {
return &ControllerServer{
storageRoot: storageRoot,
}
}
func (s *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
name := req.GetName()
if name == "" {
return nil, status.Error(codes.InvalidArgument, "volume name is required")
}
// 获取请求的容量大小
capRange := req.GetCapacityRange()
requestedSize := int64(1 << 30) // 默认 1GB
if capRange != nil && capRange.GetRequiredBytes() > 0 {
requestedSize = capRange.GetRequiredBytes()
}
// 创建卷目录
volumePath := filepath.Join(s.storageRoot, name)
if err := os.MkdirAll(volumePath, 0755); err != nil {
return nil, status.Errorf(codes.Internal, "failed to create volume: %v", err)
}
klog.Infof("Created volume %s at %s", name, volumePath)
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: name,
CapacityBytes: requestedSize,
VolumeContext: map[string]string{
"path": volumePath,
},
},
}, nil
}
func (s *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeID := req.GetVolumeId()
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
// 删除卷目录
volumePath := filepath.Join(s.storageRoot, volumeID)
if err := os.RemoveAll(volumePath); err != nil && !os.IsNotExist(err) {
return nil, status.Errorf(codes.Internal, "failed to delete volume: %v", err)
}
klog.Infof("Deleted volume %s", volumeID)
return &csi.DeleteVolumeResponse{}, nil
}
func (s *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: []*csi.ControllerServiceCapability{
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
},
},
},
},
}, nil
}

最后实现 NodeServer,它负责将卷挂载到 Pod 中。

type NodeServer struct {
csi.UnimplementedNodeServer
nodeID string
storageRoot string
}
func NewNodeServer(nodeID, storageRoot string) *NodeServer {
return &NodeServer{
nodeID: nodeID,
storageRoot: storageRoot,
}
}
func (s *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
// 对于本地目录存储,不需要 staging 阶段
return &csi.NodeStageVolumeResponse{}, nil
}
func (s *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return &csi.NodeUnstageVolumeResponse{}, nil
}
func (s *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
if volumeID == "" || targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID and target path are required")
}
// 获取卷路径
volumeContext := req.GetVolumeContext()
volumePath, ok := volumeContext["path"]
if !ok {
volumePath = filepath.Join(s.storageRoot, volumeID)
}
// 创建目标目录
if err := os.MkdirAll(targetPath, 0755); err != nil {
return nil, status.Errorf(codes.Internal, "failed to create target path: %v", err)
}
// 绑定挂载
mountOptions := []string{"bind"}
if req.GetReadonly() {
mountOptions = append(mountOptions, "ro")
}
if err := mount(volumePath, targetPath, "none", mountOptions); err != nil {
return nil, status.Errorf(codes.Internal, "failed to mount volume: %v", err)
}
klog.Infof("Published volume %s to %s", volumeID, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
func (s *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "target path is required")
}
// 卸载
if err := unmount(targetPath); err != nil && !os.IsNotExist(err) {
return nil, status.Errorf(codes.Internal, "failed to unmount volume: %v", err)
}
klog.Infof("Unpublished volume from %s", targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (s *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
},
},
}, nil
}
func (s *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
return &csi.NodeGetInfoResponse{
NodeId: s.nodeID,
}, nil
}

完成插件代码后,需要创建 Kubernetes 部署清单。首先是 RBAC 配置,给予 CSI 插件必要的权限。

apiVersion: v1
kind: ServiceAccount
metadata:
name: csi-local-controller
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: csi-local-controller
rules:
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: csi-local-controller
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: csi-local-controller
subjects:
- kind: ServiceAccount
name: csi-local-controller
namespace: kube-system

然后是 StorageClass 定义,用户通过指定这个 StorageClass 来使用我们的 CSI 插件。

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: local-storage
provisioner: csi-local.example.com
reclaimPolicy: Delete
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true

最后,可以创建一个 PVC 和 Pod 来测试 CSI 插件是否正常工作。

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: local-pvc
spec:
accessModes:
- ReadWriteOnce
storageClassName: local-storage
resources:
requests:
storage: 1Gi
---
apiVersion: v1
kind: Pod
metadata:
name: test-pod
spec:
containers:
- name: nginx
image: nginx:latest
volumeMounts:
- name: data
mountPath: /data
volumes:
- name: data
persistentVolumeClaim:
claimName: local-pvc

六、CSI 插件完整架构#

下图展示了开发完成的本地存储 CSI 插件的整体架构:

graph TB subgraph "Kubernetes 集群" subgraph "Controller (Deployment)" CSI_C["CSI Controller<br/>local-driver"] PROV["External Provisioner"] PROV <-->|"unix socket"| CSI_C end subgraph "Node (DaemonSet)" CSI_N["CSI Node<br/>local-driver"] REG["Node Driver<br/>Registrar"] LIVE["Liveness Probe"] REG -->|"注册"| CSI_N LIVE -->|"健康检查"| CSI_N end subgraph "存储层" ROOT["/var/lib/csi-local-storage/"] VOL1["volume-1/"] VOL2["volume-2/"] end end subgraph "用户工作负载" POD["Pod"] MOUNT["/data"] end API["API Server"] -->|"PVC 事件"| PROV KUBELET["Kubelet"] -->|"挂载请求"| CSI_N CSI_N -->|"bind mount"| ROOT ROOT --> VOL1 ROOT --> VOL2 VOL1 -->|"绑定"| MOUNT POD --> MOUNT style CSI_C fill:#fff9c4 style CSI_N fill:#fff9c4 style ROOT fill:#c8e6c9 style POD fill:#bbdefb

七、参考资料#


参考#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Kubernetes CSI 存储插件开发
https://blog.souloss.com/posts/kubernetes/k8s-csi/
作者
Souloss
发布于
2023-12-05
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时