mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
1184 字
3 分钟
kubernetes client-go 学习
2024-01-11

一、认识 client-go#

学习 Kubernetes 开发的第一步总是从 client-go 的使用开始。首先,可以从 client-go 的包结构入手:

  • kubernetes:包含用于访问 Kubernetes API 的客户端集。
  • discovery:用于发现 Kubernetes API 服务器支持的 API。
  • dynamic:包含一个动态客户端,可以对任意 Kubernetes API 对象执行通用操作。
  • transport:用于设置身份验证和启动连接。
  • tools/cache:对于编写控制器很有用。

一个 hello world 级别的使用示例:

package main
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// uses the current context in kubeconfig
// path-to-kubeconfig -- for example, /root/.kube/config
config, _ := clientcmd.BuildConfigFromFlags("", "<path-to-kubeconfig>")
// creates the clientset
clientset, _ := kubernetes.NewForConfig(config)
// access the API to list pods
pods, _ := clientset.CoreV1().Pods("").List(context.TODO(), v1.ListOptions{})
fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
}

二、获取客户端#

2.1 从 Pod 中获取客户端#

import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
func main(){
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// access the API to list pod
pods, _ := clientset.CoreV1().Pods("").List(context.TODO(), v1.ListOptions{})
fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
}

2.2 从 kubeconfig 获取客户端#

当我们在集群外部开发调试时,通常使用 kubeconfig 文件来获取客户端:

import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
)
func main() {
// 获取 kubeconfig 文件路径
kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
// 使用 kubeconfig 文件构建配置
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
// 创建 clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 使用 clientset 访问 API
pods, err := clientset.CoreV1().Pods("").List(context.TODO(), v1.ListOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
}

clientcmd.BuildConfigFromFlags 函数支持传入 masterURLkubeconfigPath 两个参数。如果 masterURL 为空,则从 kubeconfig 文件中读取集群地址。

2.3 小结#

本节介绍了两种获取 Kubernetes 客户端的方式:

  1. 集群内部:使用 rest.InClusterConfig() 从 Pod 中读取 ServiceAccount 凭证
  2. 集群外部:使用 clientcmd.BuildConfigFromFlags() 从 kubeconfig 文件读取配置

两种方式返回的 *rest.Config 对象都可以用于创建 kubernetes.Clientset,进而访问 Kubernetes API。

三、获取具体资源#

通过 clientset,可以方便地获取各种 Kubernetes 资源。clientset 提供了类型安全的客户端接口,每个 API 组都有对应的方法:

3.1 获取 Pod#

// 获取指定命名空间的 Pod
pod, err := clientset.CoreV1().Pods("default").Get(context.TODO(), "my-pod", v1.GetOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("Pod name: %s\n", pod.Name)
// 列出所有命名空间的 Pod
pods, err := clientset.CoreV1().Pods("").List(context.TODO(), v1.ListOptions{
LabelSelector: "app=myapp",
})

3.2 获取 Deployment#

// 获取 Deployment
deploy, err := clientset.AppsV1().Deployments("default").Get(context.TODO(), "my-deploy", v1.GetOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("Deployment replicas: %d\n", *deploy.Spec.Replicas)
// 创建 Deployment
newDeploy := &appsv1.Deployment{
ObjectMeta: v1.ObjectMeta{
Name: "my-new-deploy",
},
Spec: appsv1.DeploymentSpec{
Replicas: pointer.Int32Ptr(3),
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{"app": "myapp"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{"app": "myapp"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:latest",
},
},
},
},
},
}
_, err = clientset.AppsV1().Deployments("default").Create(context.TODO(), newDeploy, v1.CreateOptions{})

3.3 获取 Service#

// 获取 Service
svc, err := clientset.CoreV1().Services("default").Get(context.TODO(), "my-svc", v1.GetOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("Service type: %s\n", svc.Spec.Type)
// 列出所有 Service
services, err := clientset.CoreV1().Services("default").List(context.TODO(), v1.ListOptions{})

3.4 获取 ConfigMap 和 Secret#

// 获取 ConfigMap
cm, err := clientset.CoreV1().ConfigMaps("default").Get(context.TODO(), "my-config", v1.GetOptions{})
fmt.Printf("ConfigMap data: %v\n", cm.Data)
// 获取 Secret
secret, err := clientset.CoreV1().Secrets("default").Get(context.TODO(), "my-secret", v1.GetOptions{})
fmt.Printf("Secret type: %s\n", secret.Type)

3.5 使用 Dynamic Client#

对于 CRD(Custom Resource Definition)等自定义资源,可以使用 Dynamic Client:

import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)
func main() {
config, _ := clientcmd.BuildConfigFromFlags("", kubeconfig)
// 创建 dynamic client
dynamicClient, _ := dynamic.NewForConfig(config)
// 定义 GVR(GroupVersionResource)
gvr := schema.GroupVersionResource{
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "customresourcedefinitions",
}
// 获取 CRD 列表
crds, _ := dynamicClient.Resource(gvr).List(context.TODO(), v1.ListOptions{})
for _, crd := range crds.Items {
fmt.Printf("CRD: %s\n", crd.GetName())
}
}

Dynamic Client 返回的是 *unstructured.Unstructured 类型,需要通过 crd.Object["spec"] 等方式访问字段。

四、informer 机制#

直接调用 Kubernetes API 获取资源存在几个问题:

  1. 性能问题:频繁调用 API 会给 apiserver 造成压力
  2. 数据一致性:无法实时感知资源变化
  3. 网络开销:每次请求都需要网络通信

Informer 机制通过本地缓存事件订阅解决了这些问题。它是 client-go 中最核心的设计之一。

4.1 Informer 架构#

graph TD A[Reflector] -->|Watch/List| B[DeltaFIFO] B -->|Pop| C[SharedIndexInformer] C -->|Add/Update/Delete| D[Indexer 本地缓存] C -->|分发事件| E[ResourceEventHandler] D -->|Get/List| F[Controller 业务逻辑]

核心组件说明:

  • Reflector:负责从 apiserver 监听资源变化,将事件推送到 DeltaFIFO
  • DeltaFIFO:先进先出队列,存储资源变更事件(Added/Updated/Deleted)
  • Indexer:本地缓存,提供基于索引的快速查询
  • ResourceEventHandler:用户自定义的事件处理回调

4.2 SharedIndexInformer#

SharedIndexInformer 是最常用的 Informer 实现,它支持多个 Controller 共享同一个 Informer 实例:

import (
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func main() {
config, _ := clientcmd.BuildConfigFromFlags("", kubeconfig)
clientset, _ := kubernetes.NewForConfig(config)
// 创建 SharedInformerFactory
informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute*10)
// 获取 Pod Informer
podInformer := informerFactory.Core().V1().Pods()
// 注册事件处理器
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod Added: %s/%s\n", pod.Namespace, pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)
fmt.Printf("Pod Updated: %s/%s (resourceVersion: %s -> %s)\n",
newPod.Namespace, newPod.Name,
oldPod.ResourceVersion, newPod.ResourceVersion)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod Deleted: %s/%s\n", pod.Namespace, pod.Name)
},
})
// 启动 Informer(非阻塞)
informerFactory.Start(make(chan struct{}))
// 等待缓存同步完成
if !cache.WaitForCacheSync(context.TODO().Done(), podInformer.Informer().HasSynced) {
panic("failed to sync cache")
}
// 从本地缓存获取 Pod
pods, err := podInformer.Lister().Pods("default").List(labels.Everything())
if err != nil {
panic(err.Error())
}
fmt.Printf("There are %d pods in default namespace\n", len(pods))
// 阻塞主线程
select {}
}

4.3 Resync 机制#

NewSharedInformerFactory 的第二个参数是 resyncPeriod,表示全量同步间隔。当达到间隔时间后,Informer 会触发所有对象的 UpdateFunc,即使对象实际没有变化。

这个机制主要用于:

  1. 处理可能遗漏的事件(如网络中断)
  2. 定期触发业务逻辑校验
// 每 10 分钟进行一次全量同步
informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute*10)

4.4 Indexer 索引#

Indexer 提供了基于索引的高效查询能力,避免遍历所有对象:

// 定义索引函数
func namespaceIndexFunc(obj interface{}) ([]string, error) {
metaObj, ok := obj.(v1.Object)
if !ok {
return nil, fmt.Errorf("object is not a v1.Object")
}
return []string{metaObj.GetNamespace()}, nil
}
// 创建带有自定义索引的 Informer
podInformer := informerFactory.Core().V1().Pods()
podInformer.Informer().AddIndexers(cache.Indexers{
"namespace": namespaceIndexFunc,
})
// 通过索引查询
pods, err := podInformer.Informer().GetIndexer().ByIndex("namespace", "kube-system")

4.5 DeltaFIFO 队列#

DeltaFIFO 是一个生产者-消费者队列,存储资源的变更事件:

// Delta 类型定义
type Delta struct {
Type DeltaType
Object interface{}
}
// DeltaType 包括
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Sync DeltaType = "Sync" // 来自 resync
)

DeltaFIFO 保证:

  • 同一资源的多个事件会合并
  • 事件顺序与 apiserver 返回顺序一致
  • 支持去重和合并

五、WorkQueue 机制#

在编写 Controller 时,直接在事件回调中处理业务逻辑存在问题:

  1. 处理失败无法重试
  2. 无法限速处理
  3. 无法处理重复事件

WorkQueue 提供了限速队列功能,是 Controller 模式的标准组件。

5.1 基本使用#

import (
"k8s.io/client-go/util/workqueue"
)
func main() {
// 创建工作队列
queue := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{
Name: "my-queue",
})
// 添加元素
queue.Add("item1")
queue.Add("item2")
// 获取元素(阻塞)
item, shutdown := queue.Get()
if shutdown {
return
}
// 处理元素
err := processItem(item)
if err != nil {
// 处理失败,重新入队
queue.AddRateLimited(item)
} else {
// 处理成功,标记完成
queue.Done(item)
}
// 关闭队列
queue.ShutDown()
}

5.2 限速队列#

client-go 提供了三种限速队列实现:

import (
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
)
// 1. 令牌桶限速
rateLimitingQueue := workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "rate-limiting-queue",
},
)
// 2. 指数退避限速(ItemExponentialFailureRateLimiter)
// 失败次数越多,等待时间越长:baseDelay * 2^<num-failures>
exponentialRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](
time.Second, // baseDelay
time.Minute*5, // maxDelay
)
// 3. 组合限速器
// 先尝试 ItemFastSlowRateLimiter,再用 ItemExponentialFailureRateLimiter
rateLimiter := workqueue.NewTypedMaxOfRateLimiter[string](
workqueue.NewTypedItemFastSlowRateLimiter[string](time.Second, time.Minute, 3),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, time.Minute*5),
)

5.3 典型 Controller 模式#

结合 Informer 和 WorkQueue 的标准 Controller 模式:

type Controller struct {
clientset *kubernetes.Clientset
podLister corev1listers.PodLister
podsSynced cache.InformerSynced
workqueue workqueue.TypedRateLimitingInterface[string]
}
func NewController(
clientset *kubernetes.Clientset,
podInformer cache.SharedIndexInformer,
) *Controller {
controller := &Controller{
clientset: clientset,
podLister: corev1listers.NewPodLister(podInformer.GetIndexer()),
podsSynced: podInformer.HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "Pods",
},
),
}
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueuePod,
UpdateFunc: func(old, new interface{}) {
controller.enqueuePod(new)
},
DeleteFunc: controller.enqueuePod,
})
return controller
}
func (c *Controller) enqueuePod(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return
}
c.workqueue.Add(key)
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer c.workqueue.ShutDown()
// 等待缓存同步
if !cache.WaitForCacheSync(stopCh, c.podsSynced) {
return
}
// 启动多个 worker
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
}
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
key, shutdown := c.workqueue.Get()
if shutdown {
return false
}
defer c.workqueue.Done(key)
err := c.syncHandler(key)
if err == nil {
c.workqueue.Forget(key)
return true
}
// 处理失败,重新入队
c.workqueue.AddRateLimited(key)
return true
}
func (c *Controller) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 从本地缓存获取 Pod
pod, err := c.podLister.Pods(namespace).Get(name)
if err != nil {
return err
}
// 业务逻辑处理
fmt.Printf("Syncing pod: %s/%s\n", namespace, name)
return nil
}

六、小结#

本文从 client-go 的基础使用入手,逐步深入介绍了 Kubernetes 控制器开发的核心机制:

  1. 客户端获取:介绍了从集群内部(InClusterConfig)和外部(kubeconfig)获取客户端的方法

  2. 资源操作:演示了如何使用 typed client 和 dynamic client 操作 Kubernetes 资源

  3. Informer 机制:深入讲解了 Reflector、DeltaFIFO、Indexer 等核心组件的工作原理,以及如何通过事件订阅实现高效的资源监听

  4. WorkQueue 机制:介绍了限速队列的使用方法,以及标准的 Controller 编程模式

这些机制是编写 Kubernetes Controller 的基础,理解它们对于开发自定义 Operator 至关重要。在实际项目中,通常会使用 KubebuilderOperator SDK 等工具来简化开发,但底层原理依然相通。

七、参考资料#


参考#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

kubernetes client-go 学习
https://blog.souloss.com/posts/kubernetes/k8s-client-go/
作者
Souloss
发布于
2024-01-11
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时