# Client-go: Implementing a Watch Client

[TOC]

Overview
------------

tn2>Watch means continuously monitoring a particular resource for changes: additions, updates, and deletions.
Use the Watch method provided by the ClientSet to subscribe to these events.
Whenever an event arrives, you can run the corresponding business logic.

### Code Example

tn2>We watch for Pods being created and print the name of each new Pod. The code is as follows:

```go
package main

import (
	"context"
	"flag"
	"fmt"
	"path/filepath"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	// Load the kubeconfig file
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[optional] absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Watch Pods in the default namespace; the server ends the watch after 60s
	timeOut := int64(60)
	watcher, err := clientset.CoreV1().Pods("default").Watch(context.Background(), metav1.ListOptions{TimeoutSeconds: &timeOut})
	if err != nil {
		panic(err)
	}

	for event := range watcher.ResultChan() {
		if event.Type == watch.Error {
			// an Error event carries *metav1.Status, not a Pod,
			// so don't type-assert it below
			fmt.Println("watch error:", event.Object)
			continue
		}
		item := event.Object.(*corev1.Pod)
		switch event.Type {
		case watch.Added:
			processPod(item.GetName())
		case watch.Modified, watch.Bookmark, watch.Deleted:
			// not handled in this example
		}
	}
}

func processPod(name string) {
	fmt.Println("new Pod : ", name)
}
```

```bash
kubectl run nginx$RANDOM --image=nginx
```

tn2>The new Pod shows up in the program's output: the event has been detected.

### Why Using Watch Directly Is Not Recommended

tn2>Handling watch timeouts, disconnects, and reconnects correctly is complicated (a sketch of what this involves follows below).
Watch requests go straight to the K8s API Server, which adds load to the cluster.
Watching multiple resources requires a separate connection per resource; connections cannot be shared, which increases resource consumption and cluster load.
Reconnecting can lose events.
There is no flow-control logic when many events arrive at once, so a burst can overload and crash your business logic.
Once your handler receives an event and fails to process it, there is no second chance.
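tn2>To make the first point concrete, here is a minimal sketch of manual reconnect handling. It reuses the imports and clientset from the example above (plus `"time"`); `watchPodsForever` is a made-up helper name for illustration, not a client-go API.

```go
// watchPodsForever restarts the Watch whenever the server closes it.
// A sketch only: it glosses over the hardest part, resuming safely
// from an expired resourceVersion.
func watchPodsForever(clientset *kubernetes.Clientset) {
	lastRV := ""
	for {
		watcher, err := clientset.CoreV1().Pods("default").Watch(context.Background(),
			metav1.ListOptions{ResourceVersion: lastRV})
		if err != nil {
			time.Sleep(time.Second) // naive fixed backoff before retrying
			continue
		}
		for event := range watcher.ResultChan() {
			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				// e.g. a watch.Error event; if our resourceVersion has
				// expired ("410 Gone"), a full re-list is needed -- which
				// is exactly the window where events can be lost
				continue
			}
			lastRV = pod.ResourceVersion // remember where to resume from
			fmt.Println(event.Type, pod.Name)
		}
		// ResultChan closed: server-side timeout or dropped connection;
		// loop around and start a new Watch from lastRV
	}
}
```

tn2>client-go does ship a RetryWatcher (in `k8s.io/client-go/tools/watch`) that automates part of this bookkeeping, but the Informer machinery described next also solves the caching, resync, and load problems.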
Use an Informer Instead
------------

tn2>Built on top of Watch, it provides a higher-level abstraction: simpler, safer, and more performant.
Handles timeouts and reconnection automatically.
Maintains a local cache, so there is no need to call the API Server all the time.
Built-in full (list) and incremental (watch) sync mechanisms ensure no events are lost.
Can be combined with rate limiting and delaying queues to control the event-processing rate, avoid overloading the business logic, and support error retries.

tn2>Use a SharedInformerFactory to create shared Informer instances, which reduces network and resource consumption and lightens the load on the K8s API.

### Hands-On Code

tn2>The code below creates Informer instances that watch both Services and Deployments.

```go
package main

import (
	"flag"
	"fmt"
	"path/filepath"
	"time"

	v1 "k8s.io/api/apps/v1"
	v1core "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	var err error
	var config *rest.Config

	// Load the kubeconfig file
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[optional] absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	// Initialize the rest.Config: in-cluster first, kubeconfig as fallback
	if config, err = rest.InClusterConfig(); err != nil {
		if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
			panic(err.Error())
		}
	}

	// Create the Clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// Initialize the shared informer factory with a 12-hour resync period
	informerFactory := informers.NewSharedInformerFactory(clientset, time.Hour*12)

	// Watch Deployments
	deployInformer := informerFactory.Apps().V1().Deployments()
	informer := deployInformer.Informer()
	// The Lister is effectively the local cache; it reads from the Indexer
	deployLister := deployInformer.Lister()
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddDeployment,
		UpdateFunc: onUpdateDeployment,
		DeleteFunc: onDeleteDeployment,
	})

	// Watch Services
	serviceInformer := informerFactory.Core().V1().Services()
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddService,
		UpdateFunc: onUpdateService,
		DeleteFunc: onDeleteService,
	})

	stopper := make(chan struct{})
	defer close(stopper)

	// Start the informers (List & Watch)
	informerFactory.Start(stopper)
	// Wait until the caches of all started informers are synced
	informerFactory.WaitForCacheSync(stopper)

	// Lister: list all deployments in "default" from the local cache
	// (the data ultimately comes from the Indexer)
	deployments, err := deployLister.Deployments("default").List(labels.Everything())
	if err != nil {
		panic(err)
	}
	for idx, deploy := range deployments {
		fmt.Printf("%d -> %s\n", idx+1, deploy.Name)
	}

	// Block the main goroutine
	<-stopper
}

func onAddDeployment(obj interface{}) {
	deploy := obj.(*v1.Deployment)
	fmt.Println("add a deployment:", deploy.Name)
}

func onUpdateDeployment(old, new interface{}) {
	oldDeploy := old.(*v1.Deployment)
	newDeploy := new.(*v1.Deployment)
	fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)
}

func onDeleteDeployment(obj interface{}) {
	deploy := obj.(*v1.Deployment)
	fmt.Println("delete a deployment:", deploy.Name)
}

func onAddService(obj interface{}) {
	service := obj.(*v1core.Service)
	fmt.Println("add a service:", service.Name)
}

func onUpdateService(old, new interface{}) {
	oldService := old.(*v1core.Service)
	newService := new.(*v1core.Service)
	fmt.Println("update service:", oldService.Name, newService.Name)
}

func onDeleteService(obj interface{}) {
	service := obj.(*v1core.Service)
	fmt.Println("delete a service:", service.Name)
}
```

tn2>Run the program, then create a deployment to see whether it is detected:

```bash
kubectl create deployment nginx-deploy --image=nginx
```

tn2>The output shows the event has been detected.

Introducing the RateLimitingQueue
------------

tn2>Q: What if the EventHandler's business logic in the previous example fails?
A: The model is purely event-driven; there is no second chance to process the event.
Introduce a WorkQueue (RateLimitingQueue) to drive the business logic: it provides error retries and prevents hot loops (overload).

tn>In short:
The Informer is a "fast-talking broadcaster": the moment something changes, it tells you.
The RateLimitingQueue is "rate-limited queueing": it retries pending work at a controlled pace, so bursts don't knock your service over.
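tn2>Before the full example, here is a small self-contained sketch of what "rate limited" means in practice. The default controller rate limiter backs off exponentially per key (on current client-go versions it starts at 5ms and doubles, overlaid with an overall token bucket), so each additional failure of the same key waits longer. The key `default/test-deployment` is just an illustration.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// The same rate limiter the example below plugs into its queue
	rl := workqueue.DefaultTypedControllerRateLimiter[string]()

	// Each call to When counts as one more failure of the same key;
	// the returned delay grows exponentially (5ms, 10ms, 20ms, ...)
	for i := 0; i < 6; i++ {
		fmt.Printf("failure #%d -> requeue after %v\n", i+1, rl.When("default/test-deployment"))
	}
}
```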
### Example Code

tn2>Again we watch Deployment add/update/delete events. When `test-deployment` is deployed, we deliberately return an error so that the key is requeued; after 5 failed attempts we stop retrying and drop it.

```go
package main

import (
	"flag"
	"fmt"
	"path/filepath"
	"time"

	v1 "k8s.io/api/apps/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/client-go/util/workqueue"
)

// Define the controller type first
type Controller struct {
	indexer  cache.Indexer
	queue    workqueue.TypedRateLimitingInterface[string]
	informer cache.Controller
}

func NewController(queue workqueue.TypedRateLimitingInterface[string], indexer cache.Indexer, informer cache.Controller) *Controller {
	return &Controller{
		informer: informer,
		indexer:  indexer,
		queue:    queue,
	}
}

// processNextItem pops one key off the queue and processes it
func (c *Controller) processNextItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// Tell the queue we are done with this key once processing
	// finishes, so it can be handed out again later
	defer c.queue.Done(key)

	err := c.syncToStdout(key)
	c.handleErr(err, key)
	return true
}

// syncToStdout is the business logic: print the state of the deployment
func (c *Controller) syncToStdout(key string) error {
	// Fetch the full object for this key from the indexer (local cache)
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		fmt.Printf("Fetching object with key %s from store failed with %v\n", key, err)
		return err
	}

	if !exists {
		fmt.Printf("Deployment %s does not exist anymore\n", key)
	} else {
		deployment := obj.(*v1.Deployment)
		fmt.Printf("Sync/Add/Update for Deployment %s, Replicas: %d\n", deployment.Name, *deployment.Spec.Replicas)
		// Simulate a processing failure for this particular deployment
		if deployment.Name == "test-deployment" {
			time.Sleep(2 * time.Second)
			return fmt.Errorf("simulated error for deployment %s", deployment.Name)
		}
	}
	return nil
}

// handleErr decides whether to retry a key or give up on it
func (c *Controller) handleErr(err error, key string) {
	if err == nil {
		c.queue.Forget(key)
		return
	}

	// How many times has this key been requeued already?
	if c.queue.NumRequeues(key) < 5 {
		fmt.Printf("Retry %d for key %s\n", c.queue.NumRequeues(key), key)
		// Requeue with rate limiting: the key is only handed out again
		// after a delay, and the more failures, the longer the wait
		c.queue.AddRateLimited(key)
		return
	}

	// Give up: Forget stops the rate limiter from tracking this key
	c.queue.Forget(key)
	fmt.Printf("Dropping deployment %q out of the queue: %v\n", key, err)
}

func main() {
	var err error
	var config *rest.Config

	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[optional] absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	// Initialize the rest.Config: in-cluster first, kubeconfig as fallback
	if config, err = rest.InClusterConfig(); err != nil {
		if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
			panic(err.Error())
		}
	}

	// Create the Clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// Initialize the shared informer factory
	informerFactory := informers.NewSharedInformerFactory(clientset, time.Hour*12)

	// Create the rate limiting queue
	queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())

	// Watch Deployments; the handlers only enqueue keys
	deployInformer := informerFactory.Apps().V1().Deployments()
	informer := deployInformer.Informer()
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			onAddDeployment(obj, queue)
		},
		UpdateFunc: func(old, new interface{}) {
			onUpdateDeployment(new, queue)
		},
		DeleteFunc: func(obj interface{}) {
			onDeleteDeployment(obj, queue)
		},
	})

	controller := NewController(queue, informer.GetIndexer(), informer)

	stopper := make(chan struct{})
	defer close(stopper)

	// Start the informers (List & Watch)
	informerFactory.Start(stopper)
	informerFactory.WaitForCacheSync(stopper)

	// Process the queued events
	go func() {
		for {
			if !controller.processNextItem() {
				break
			}
		}
	}()

	<-stopper
}

func onAddDeployment(obj interface{}, queue workqueue.TypedRateLimitingInterface[string]) {
	// Derive the namespace/name key for the object
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err == nil {
		queue.Add(key)
	}
}

func onUpdateDeployment(new interface{}, queue workqueue.TypedRateLimitingInterface[string]) {
	key, err := cache.MetaNamespaceKeyFunc(new)
	if err == nil {
		queue.Add(key)
	}
}

func onDeleteDeployment(obj interface{}, queue workqueue.TypedRateLimitingInterface[string]) {
	// The deletion-handling variant also copes with tombstone objects
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err == nil {
		queue.Add(key)
	}
}
```

tn2>Run it, then open a new terminal and create the `test-deployment` Deployment:

```bash
kubectl create deployment test-deployment --image=nginx
```

tn2>We can see the deployment is detected, retried with growing delays, and finally dropped from the queue after 5 failed attempts.
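tn2>One refinement worth mentioning: real controllers usually drain the queue with several parallel workers rather than the single goroutine above. A minimal sketch, reusing `controller` and `stopper` from the example and adding `k8s.io/apimachinery/pkg/util/wait` to the imports:

```go
// Replace the single worker goroutine in main with a few parallel workers.
// wait.Until re-invokes the worker loop every second until stopper closes.
for i := 0; i < 3; i++ {
	go wait.Until(func() {
		for controller.processNextItem() {
		}
	}, time.Second, stopper)
}
```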