如何创建多个 worker,从工作队列(workqueue)中取出键(key)并对其执行业务逻辑?

How to create multiple workers to fetch the keys from the workqueue and process some business logic over them?

我是 Golang 和 Kubernetes 的新手。我正在尝试使用 client-go 库以 Go 语言编写一个自定义控制器。该控制器连接 K8s API 服务器,将 Pod 的详细信息存入缓存并将其送入工作队列,然后我在工作队列中对这些 Pod 执行一些操作。但是我希望这个过程更快,为此需要创建多个 worker。如何创建多个作用于同一个工作队列的 worker,从而提高代码的处理速度?

下面是我的控制器示例:

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "time"

    "github.com/golang/glog"

    "k8s.io/apimachinery/pkg/watch"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    rs "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

// Controller groups the pieces the worker loop needs: a Kubernetes client,
// the work queue that carries object keys, and the informer whose store is
// queried by key.
type Controller struct {
    // clientset is the Kubernetes API client the controller was built with.
    clientset kubernetes.Interface
    // queue holds the keys added by the informer's event handlers until a
    // worker picks them up.
    queue     workqueue.RateLimitingInterface
    // informer is run by Run and its indexer is read by processBusinessLogic.
    informer  cache.SharedIndexInformer
}

var (
    // kubeconfig is the -kubeconfig command-line flag holding the path of the
    // kubeconfig file used to build the client configuration. Its default is
    // the literal string "location" (presumably a placeholder to be replaced).
    kubeconfig = flag.String("kubeconfig", "location", "absolute path to the kubeconfig file")
)

// Creating the SharedIndexInformer to bring the details into  the cache
func CreateSharedIndexInformer() {
    flag.Parse()
    //creating config using the kubeconfig file
    configuration, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err.Error())
        fmt.Println("Unable to find the file")
    }

    cs, err := kubernetes.NewForConfig(configuration)

    //Creating the queue
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    pods, err := cs.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
    
   //Creating the SharedIndexInformer 
    informer := cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (rs.Object, error) {
                return cs.CoreV1().Pods("").List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                return cs.CoreV1().Pods("").Watch(context.TODO(), options)
            },
        },
        &v1.Pod{},
        time.Second*10, //Skip resync
        cache.Indexers{},
    )
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
                if err == nil {
                    queue.Add(key)
                }
            }
        },
        DeleteFunc: func(obj interface{}) {
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    })

    controller := &Controller{
        clientset: cs,
        queue:     queue,
        informer:  informer,
    }
    stop := make(chan struct{})
    go controller.Run(stop)

    // Wait forever
    select {}
}


// Run starts the informer, waits for its cache to sync, launches the worker
// loop, and then blocks until stopCh is closed.
//
// If the cache sync does not complete, the error is reported through
// runtime.HandleError and Run returns without starting a worker. On every
// return path the work queue is shut down.
func (c *Controller) Run(stopCh chan struct{}) {
    // Hand any panic raised in this goroutine to the apimachinery crash handler.
    defer runtime.HandleCrash()
    // Shutting the queue down is what tells the workers to stop.
    defer c.queue.ShutDown()

    go c.informer.Run(stopCh)

    // Workers must not start before the informer cache is populated.
    synced := cache.WaitForCacheSync(stopCh, c.informer.HasSynced)
    if !synced {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }

    // wait.Until re-invokes runWorker one second after it returns, until
    // stopCh is closed.
    go wait.Until(c.runWorker, time.Second, stopCh)

    <-stopCh
}

// runWorker handles queue items one after another and returns as soon as
// processNextItem reports that it is time to quit.
func (c *Controller) runWorker() {
    for {
        // processNextItem waits by itself until work is available.
        if !c.processNextItem() {
            return
        }
    }
}

// processNextItem deals with one key off the queue. It returns false when
// the queue signals that it is time to quit, and true otherwise so that the
// caller keeps looping.
//
// An item that is not a string is forgotten and reported, and processing
// moves on to the next item. For a string key, processBusinessLogic is
// called; on failure the key is re-queued with rate limiting while it has
// been re-queued fewer than 10 times, after which it is dropped and the
// error is reported through runtime.HandleError.
func (c *Controller) processNextItem() bool {
    // Pull the next work item from the queue. It should be a key we use to
    // look something up in the cache.
    item, quit := c.queue.Get()
    if quit {
        return false
    }

    // The queue must always be told when a piece of work is finished.
    defer c.queue.Done(item)

    key, ok := item.(string)
    if !ok {
        // The item is invalid, so forget it; otherwise we would keep
        // retrying something that can never be processed. Return here so
        // the invalid item is never handed to the business logic.
        c.queue.Forget(item)
        runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
        return true
    }

    err := c.processBusinessLogic(key)
    switch {
    case err == nil:
        // No error, tell the queue to stop tracking history.
        c.queue.Forget(item)
    case c.queue.NumRequeues(item) < 10:
        // Re-queue the item to work on it later.
        c.queue.AddRateLimited(item)
    default:
        // Too many retries: give up on this key.
        c.queue.Forget(item)
        runtime.HandleError(err)
    }

    return true
}

func (c *Controller) processBusinessLogic(key string) error {
    obj, exists, err := c.informer.GetIndexer().GetByKey(key)
    if err != nil {
        glog.Errorf("Fetching object with key %s from store failed with %v", key, err)
        return err
    }   

    if !exists {
        // Below we will warm up our cache with a Pod, so that we will see a delete for one pod
        fmt.Printf("Pod %s does not exist anymore\n", key)
    } else {
              //Perform some business logic over the pods or Deployment

        // Note that you also have to check the uid if you have a local controlled resource, which
        // is dependent on the actual instance, to detect that a Pod was recreated with the same name
        fmt.Printf("Add event for Pod %s\n", obj.(*v1.Pod).GetName())
    }
        }

    }

    return nil
}

// handleErr logs, at glog info level, that the pod identified by key is being
// dropped from the queue together with the error that caused it. It only
// logs; it does not touch the queue itself. It is not called by the code
// shown here.
func (c *Controller) handleErr(err error, key interface{}) {
    glog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

// main is the program entry point; all setup and the blocking wait happen in
// CreateSharedIndexInformer.
func main() {
    CreateSharedIndexInformer()
}

您可以在 Run 函数中启动更多 worker,如下所示:

// Run, shown with its setup elided, starts several workers instead of one.
func (c *Controller) Run(stopCh chan struct{}) {
    ...
    // Start 5 worker goroutines. Each one runs runWorker, which loops until
    // "something bad" happens; wait.Until then rekicks that worker after one
    // second. All of them are given the same c.runWorker and the same stopCh.
    for i := 0; i < 5; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }
    // Block until stopCh is closed.
    <-stopCh
}