一、认识 client-go
学习 Kubernetes 开发的第一步总是从 client-go 的使用开始。首先,可以从 client-go 的包结构入手:
kubernetes:包含用于访问 Kubernetes API 的客户端集。discovery:用于发现 Kubernetes API 服务器支持的 API。dynamic:包含一个动态客户端,可以对任意 Kubernetes API 对象执行通用操作。transport:用于设置身份验证和启动连接。tools/cache:对于编写控制器很有用。
一个 hello world 级别的使用示例:
package main
import ( "context" "fmt" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd")
func main() { // uses the current context in kubeconfig // path-to-kubeconfig -- for example, /root/.kube/config config, _ := clientcmd.BuildConfigFromFlags("", "<path-to-kubeconfig>") // creates the clientset clientset, _ := kubernetes.NewForConfig(config) // access the API to list pods pods, _ := clientset.CoreV1().Pods("").List(context.TODO(), v1.ListOptions{}) fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))}二、获取客户端
2.1 从 Pod 中获取客户端
import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest")func main(){ config, err := rest.InClusterConfig() if err != nil { panic(err.Error()) } // creates the clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } // access the API to list pod pods, _ := clientset.CoreV1().Pods("").List(context.TODO(), v1.ListOptions{}) fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))}2.2 从 kubeconfig 获取客户端
当我们在集群外部开发调试时,通常使用 kubeconfig 文件来获取客户端:
import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "path/filepath")
func main() { // 获取 kubeconfig 文件路径 kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
// 使用 kubeconfig 文件构建配置 config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { panic(err.Error()) }
// 创建 clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) }
// 使用 clientset 访问 API pods, err := clientset.CoreV1().Pods("").List(context.TODO(), v1.ListOptions{}) if err != nil { panic(err.Error()) } fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))}clientcmd.BuildConfigFromFlags 函数支持传入 masterURL 和 kubeconfigPath 两个参数。如果 masterURL 为空,则从 kubeconfig 文件中读取集群地址。
2.3 小结
本节介绍了两种获取 Kubernetes 客户端的方式:
- 集群内部:使用
rest.InClusterConfig()从 Pod 中读取 ServiceAccount 凭证 - 集群外部:使用
clientcmd.BuildConfigFromFlags()从 kubeconfig 文件读取配置
两种方式返回的 *rest.Config 对象都可以用于创建 kubernetes.Clientset,进而访问 Kubernetes API。
三、获取具体资源
通过 clientset,可以方便地获取各种 Kubernetes 资源。clientset 提供了类型安全的客户端接口,每个 API 组都有对应的方法:
3.1 获取 Pod
// 获取指定命名空间的 Podpod, err := clientset.CoreV1().Pods("default").Get(context.TODO(), "my-pod", v1.GetOptions{})if err != nil { panic(err.Error())}fmt.Printf("Pod name: %s\n", pod.Name)
// 列出所有命名空间的 Podpods, err := clientset.CoreV1().Pods("").List(context.TODO(), v1.ListOptions{ LabelSelector: "app=myapp",})3.2 获取 Deployment
// 获取 Deploymentdeploy, err := clientset.AppsV1().Deployments("default").Get(context.TODO(), "my-deploy", v1.GetOptions{})if err != nil { panic(err.Error())}fmt.Printf("Deployment replicas: %d\n", *deploy.Spec.Replicas)
// 创建 DeploymentnewDeploy := &appsv1.Deployment{ ObjectMeta: v1.ObjectMeta{ Name: "my-new-deploy", }, Spec: appsv1.DeploymentSpec{ Replicas: pointer.Int32Ptr(3), Selector: &v1.LabelSelector{ MatchLabels: map[string]string{"app": "myapp"}, }, Template: corev1.PodTemplateSpec{ ObjectMeta: v1.ObjectMeta{ Labels: map[string]string{"app": "myapp"}, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name: "nginx", Image: "nginx:latest", }, }, }, }, },}_, err = clientset.AppsV1().Deployments("default").Create(context.TODO(), newDeploy, v1.CreateOptions{})3.3 获取 Service
// 获取 Servicesvc, err := clientset.CoreV1().Services("default").Get(context.TODO(), "my-svc", v1.GetOptions{})if err != nil { panic(err.Error())}fmt.Printf("Service type: %s\n", svc.Spec.Type)
// 列出所有 Serviceservices, err := clientset.CoreV1().Services("default").List(context.TODO(), v1.ListOptions{})3.4 获取 ConfigMap 和 Secret
// 获取 ConfigMapcm, err := clientset.CoreV1().ConfigMaps("default").Get(context.TODO(), "my-config", v1.GetOptions{})fmt.Printf("ConfigMap data: %v\n", cm.Data)
// 获取 Secretsecret, err := clientset.CoreV1().Secrets("default").Get(context.TODO(), "my-secret", v1.GetOptions{})fmt.Printf("Secret type: %s\n", secret.Type)3.5 使用 Dynamic Client
对于 CRD(Custom Resource Definition)等自定义资源,可以使用 Dynamic Client:
import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic")
func main() { config, _ := clientcmd.BuildConfigFromFlags("", kubeconfig)
// 创建 dynamic client dynamicClient, _ := dynamic.NewForConfig(config)
// 定义 GVR(GroupVersionResource) gvr := schema.GroupVersionResource{ Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions", }
// 获取 CRD 列表 crds, _ := dynamicClient.Resource(gvr).List(context.TODO(), v1.ListOptions{})
for _, crd := range crds.Items { fmt.Printf("CRD: %s\n", crd.GetName()) }}Dynamic Client 返回的是 *unstructured.Unstructured 类型,需要通过 crd.Object["spec"] 等方式访问字段。
四、informer 机制
直接调用 Kubernetes API 获取资源存在几个问题:
- 性能问题:频繁调用 API 会给 apiserver 造成压力
- 数据一致性:无法实时感知资源变化
- 网络开销:每次请求都需要网络通信
Informer 机制通过本地缓存和事件订阅解决了这些问题。它是 client-go 中最核心的设计之一。
4.1 Informer 架构
核心组件说明:
- Reflector:负责从 apiserver 监听资源变化,将事件推送到 DeltaFIFO
- DeltaFIFO:先进先出队列,存储资源变更事件(Added/Updated/Deleted)
- Indexer:本地缓存,提供基于索引的快速查询
- ResourceEventHandler:用户自定义的事件处理回调
4.2 SharedIndexInformer
SharedIndexInformer 是最常用的 Informer 实现,它支持多个 Controller 共享同一个 Informer 实例:
import ( "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache")
func main() { config, _ := clientcmd.BuildConfigFromFlags("", kubeconfig) clientset, _ := kubernetes.NewForConfig(config)
// 创建 SharedInformerFactory informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute*10)
// 获取 Pod Informer podInformer := informerFactory.Core().V1().Pods()
// 注册事件处理器 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*corev1.Pod) fmt.Printf("Pod Added: %s/%s\n", pod.Namespace, pod.Name) }, UpdateFunc: func(oldObj, newObj interface{}) { oldPod := oldObj.(*corev1.Pod) newPod := newObj.(*corev1.Pod) fmt.Printf("Pod Updated: %s/%s (resourceVersion: %s -> %s)\n", newPod.Namespace, newPod.Name, oldPod.ResourceVersion, newPod.ResourceVersion) }, DeleteFunc: func(obj interface{}) { pod := obj.(*corev1.Pod) fmt.Printf("Pod Deleted: %s/%s\n", pod.Namespace, pod.Name) }, })
// 启动 Informer(非阻塞) informerFactory.Start(make(chan struct{}))
// 等待缓存同步完成 if !cache.WaitForCacheSync(context.TODO().Done(), podInformer.Informer().HasSynced) { panic("failed to sync cache") }
// 从本地缓存获取 Pod pods, err := podInformer.Lister().Pods("default").List(labels.Everything()) if err != nil { panic(err.Error()) } fmt.Printf("There are %d pods in default namespace\n", len(pods))
// 阻塞主线程 select {}}4.3 Resync 机制
NewSharedInformerFactory 的第二个参数是 resyncPeriod,表示全量同步间隔。当达到间隔时间后,Informer 会触发所有对象的 UpdateFunc,即使对象实际没有变化。
这个机制主要用于:
- 处理可能遗漏的事件(如网络中断)
- 定期触发业务逻辑校验
// 每 10 分钟进行一次全量同步informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute*10)4.4 Indexer 索引
Indexer 提供了基于索引的高效查询能力,避免遍历所有对象:
// 定义索引函数func namespaceIndexFunc(obj interface{}) ([]string, error) { metaObj, ok := obj.(v1.Object) if !ok { return nil, fmt.Errorf("object is not a v1.Object") } return []string{metaObj.GetNamespace()}, nil}
// 创建带有自定义索引的 InformerpodInformer := informerFactory.Core().V1().Pods()podInformer.Informer().AddIndexers(cache.Indexers{ "namespace": namespaceIndexFunc,})
// 通过索引查询pods, err := podInformer.Informer().GetIndexer().ByIndex("namespace", "kube-system")4.5 DeltaFIFO 队列
DeltaFIFO 是一个生产者-消费者队列,存储资源的变更事件:
// Delta 类型定义type Delta struct { Type DeltaType Object interface{}}
// DeltaType 包括const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" Sync DeltaType = "Sync" // 来自 resync)DeltaFIFO 保证:
- 同一资源的多个事件会合并
- 事件顺序与 apiserver 返回顺序一致
- 支持去重和合并
五、WorkQueue 机制
在编写 Controller 时,直接在事件回调中处理业务逻辑存在问题:
- 处理失败无法重试
- 无法限速处理
- 无法处理重复事件
WorkQueue 提供了限速队列功能,是 Controller 模式的标准组件。
5.1 基本使用
import ( "k8s.io/client-go/util/workqueue")
func main() { // 创建工作队列 queue := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{ Name: "my-queue", })
// 添加元素 queue.Add("item1") queue.Add("item2")
// 获取元素(阻塞) item, shutdown := queue.Get() if shutdown { return }
// 处理元素 err := processItem(item) if err != nil { // 处理失败,重新入队 queue.AddRateLimited(item) } else { // 处理成功,标记完成 queue.Done(item) }
// 关闭队列 queue.ShutDown()}5.2 限速队列
client-go 提供了三种限速队列实现:
import ( "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr")
// 1. 令牌桶限速rateLimitingQueue := workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{ Name: "rate-limiting-queue", },)
// 2. 指数退避限速(ItemExponentialFailureRateLimiter)// 失败次数越多,等待时间越长:baseDelay * 2^<num-failures>exponentialRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string]( time.Second, // baseDelay time.Minute*5, // maxDelay)
// 3. 组合限速器// 先尝试 ItemFastSlowRateLimiter,再用 ItemExponentialFailureRateLimiterrateLimiter := workqueue.NewTypedMaxOfRateLimiter[string]( workqueue.NewTypedItemFastSlowRateLimiter[string](time.Second, time.Minute, 3), workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, time.Minute*5),)5.3 典型 Controller 模式
结合 Informer 和 WorkQueue 的标准 Controller 模式:
type Controller struct { clientset *kubernetes.Clientset podLister corev1listers.PodLister podsSynced cache.InformerSynced workqueue workqueue.TypedRateLimitingInterface[string]}
func NewController( clientset *kubernetes.Clientset, podInformer cache.SharedIndexInformer,) *Controller { controller := &Controller{ clientset: clientset, podLister: corev1listers.NewPodLister(podInformer.GetIndexer()), podsSynced: podInformer.HasSynced, workqueue: workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{ Name: "Pods", }, ), }
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueuePod, UpdateFunc: func(old, new interface{}) { controller.enqueuePod(new) }, DeleteFunc: controller.enqueuePod, })
return controller}
func (c *Controller) enqueuePod(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { return } c.workqueue.Add(key)}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) { defer c.workqueue.ShutDown()
// 等待缓存同步 if !cache.WaitForCacheSync(stopCh, c.podsSynced) { return }
// 启动多个 worker for i := 0; i < workers; i++ { go wait.Until(c.runWorker, time.Second, stopCh) }
<-stopCh}
func (c *Controller) runWorker() { for c.processNextWorkItem() { }}
func (c *Controller) processNextWorkItem() bool { key, shutdown := c.workqueue.Get() if shutdown { return false } defer c.workqueue.Done(key)
err := c.syncHandler(key) if err == nil { c.workqueue.Forget(key) return true }
// 处理失败,重新入队 c.workqueue.AddRateLimited(key) return true}
func (c *Controller) syncHandler(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err }
// 从本地缓存获取 Pod pod, err := c.podLister.Pods(namespace).Get(name) if err != nil { return err }
// 业务逻辑处理 fmt.Printf("Syncing pod: %s/%s\n", namespace, name) return nil}六、小结
本文从 client-go 的基础使用入手,逐步深入介绍了 Kubernetes 控制器开发的核心机制:
-
客户端获取:介绍了从集群内部(
InClusterConfig)和外部(kubeconfig)获取客户端的方法 -
资源操作:演示了如何使用 typed client 和 dynamic client 操作 Kubernetes 资源
-
Informer 机制:深入讲解了 Reflector、DeltaFIFO、Indexer 等核心组件的工作原理,以及如何通过事件订阅实现高效的资源监听
-
WorkQueue 机制:介绍了限速队列的使用方法,以及标准的 Controller 编程模式
这些机制是编写 Kubernetes Controller 的基础,理解它们对于开发自定义 Operator 至关重要。在实际项目中,通常会使用 Kubebuilder 或 Operator SDK 等工具来简化开发,但底层原理依然相通。
七、参考资料
- document
- project
参考
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






