如何创建多个工作人员以从工作队列中获取密钥并处理它们的一些业务逻辑?
How to create multiple workers to fetch the keys from the workqueue and process some business logic over them?
我是 Golang 和 Kubernetes 的新手。我尝试使用 client-go 库在 golang 中创建自定义控制器。控制器与 K8s Api 服务器连接,将 pods 详细信息放入缓存并将其发送到工作队列,我在工作队列中对 pods 执行一些操作。但是我希望这个过程更快,为此我需要创建多个工人。如何创建多个可以作用于同一个工作队列并提高代码速度的工作器?
下面是我的控制器示例:
package main
import (
"context"
"flag"
"fmt"
"log"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/watch"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
rs "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
type Controller struct {
clientset kubernetes.Interface
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
}
var (
//used the config file
kubeconfig = flag.String("kubeconfig", "location", "absolute path to the kubeconfig file")
)
// Creating the SharedIndexInformer to bring the details into the cache
func CreateSharedIndexInformer() {
flag.Parse()
//creating config using the kubeconfig file
configuration, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
fmt.Println("Unable to find the file")
}
cs, err := kubernetes.NewForConfig(configuration)
//Creating the queue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
pods, err := cs.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
//Creating the SharedIndexInformer
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (rs.Object, error) {
return cs.CoreV1().Pods("").List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cs.CoreV1().Pods("").Watch(context.TODO(), options)
},
},
&v1.Pod{},
time.Second*10, //Skip resync
cache.Indexers{},
)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
})
controller := &Controller{
clientset: cs,
queue: queue,
informer: informer,
}
stop := make(chan struct{})
go controller.Run(stop)
// Wait forever
select {}
}
func (c *Controller) Run(stopCh chan struct{}) {
// don't let panics crash the process
defer runtime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()
//c.logger.Info("Starting kubewatch controller")
go c.informer.Run(stopCh)
// wait for the caches to synchronize before starting the worker
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
//c.logger.Info("Kubewatch controller synced and ready")
// runWorker will loop until "something bad" happens. The .Until will
// then rekick the worker after one second
go wait.Until(c.runWorker, time.Second, stopCh)
<-stopCh
}
func (c *Controller) runWorker() {
// processNextWorkItem will automatically wait until there's work available
for c.processNextItem() {
// continue looping
}
}
// processNextWorkItem deals with one key off the queue. It returns false
// when it's time to quit.
func (c *Controller) processNextItem() bool {
// pull the next work item from queue. It should be a key we use to lookup
// something in a cache
key, quit := c.queue.Get()
if quit {
return false
}
// you always have to indicate to the queue that you've completed a piece of
// work
defer c.queue.Done(key)
var obj string
var ok bool
if obj, ok = key.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.queue.Forget(key)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
}
// do your work on the key.
err := c.processBusinessLogic(key.(string))
if err == nil {
// No error, tell the queue to stop tracking history
c.queue.Forget(key)
} else if c.queue.NumRequeues(key) < 10 {
//c.logger.Errorf("Error processing %s (will retry): %v", key, err)
// requeue the item to work on later
c.queue.AddRateLimited(key)
} else {
// err != nil and too many retries
//c.logger.Errorf("Error processing %s (giving up): %v", key, err)
c.queue.Forget(key)
runtime.HandleError(err)
}
return true
}
func (c *Controller) processBusinessLogic(key string) error {
obj, exists, err := c.informer.GetIndexer().GetByKey(key)
if err != nil {
glog.Errorf("Fetching object with key %s from store failed with %v", key, err)
return err
}
if !exists {
// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
fmt.Printf("Pod %s does not exist anymore\n", key)
} else {
//Perform some business logic over the pods or Deployment
// Note that you also have to check the uid if you have a local controlled resource, which
// is dependent on the actual instance, to detect that a Pod was recreated with the same name
fmt.Printf("Add event for Pod %s\n", obj.(*v1.Pod).GetName())
}
}
}
return nil
}
func (c *Controller) handleErr(err error, key interface{}) {
glog.Infof("Dropping pod %q out of the queue: %v", key, err)
}
func main() {
CreateSharedIndexInformer()
}
您可以在 Run
函数中添加更多工人,如下所示:
func (c *Controller) Run(stopCh chan struct{}) {
...
// runWorker will loop until "something bad" happens. The .Until will
// then rekick the worker after one second
for i := 0; i < 5; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
}
我是 Golang 和 Kubernetes 的新手。我尝试使用 client-go 库在 golang 中创建自定义控制器。控制器与 K8s Api 服务器连接,将 pods 详细信息放入缓存并将其发送到工作队列,我在工作队列中对 pods 执行一些操作。但是我希望这个过程更快,为此我需要创建多个工人。如何创建多个可以作用于同一个工作队列并提高代码速度的工作器?
下面是我的控制器示例:
package main
import (
"context"
"flag"
"fmt"
"log"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/watch"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
rs "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
type Controller struct {
clientset kubernetes.Interface
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
}
var (
//used the config file
kubeconfig = flag.String("kubeconfig", "location", "absolute path to the kubeconfig file")
)
// Creating the SharedIndexInformer to bring the details into the cache
func CreateSharedIndexInformer() {
flag.Parse()
//creating config using the kubeconfig file
configuration, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
fmt.Println("Unable to find the file")
}
cs, err := kubernetes.NewForConfig(configuration)
//Creating the queue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
pods, err := cs.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
//Creating the SharedIndexInformer
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (rs.Object, error) {
return cs.CoreV1().Pods("").List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cs.CoreV1().Pods("").Watch(context.TODO(), options)
},
},
&v1.Pod{},
time.Second*10, //Skip resync
cache.Indexers{},
)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
})
controller := &Controller{
clientset: cs,
queue: queue,
informer: informer,
}
stop := make(chan struct{})
go controller.Run(stop)
// Wait forever
select {}
}
func (c *Controller) Run(stopCh chan struct{}) {
// don't let panics crash the process
defer runtime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()
//c.logger.Info("Starting kubewatch controller")
go c.informer.Run(stopCh)
// wait for the caches to synchronize before starting the worker
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
//c.logger.Info("Kubewatch controller synced and ready")
// runWorker will loop until "something bad" happens. The .Until will
// then rekick the worker after one second
go wait.Until(c.runWorker, time.Second, stopCh)
<-stopCh
}
func (c *Controller) runWorker() {
// processNextWorkItem will automatically wait until there's work available
for c.processNextItem() {
// continue looping
}
}
// processNextWorkItem deals with one key off the queue. It returns false
// when it's time to quit.
func (c *Controller) processNextItem() bool {
// pull the next work item from queue. It should be a key we use to lookup
// something in a cache
key, quit := c.queue.Get()
if quit {
return false
}
// you always have to indicate to the queue that you've completed a piece of
// work
defer c.queue.Done(key)
var obj string
var ok bool
if obj, ok = key.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.queue.Forget(key)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
}
// do your work on the key.
err := c.processBusinessLogic(key.(string))
if err == nil {
// No error, tell the queue to stop tracking history
c.queue.Forget(key)
} else if c.queue.NumRequeues(key) < 10 {
//c.logger.Errorf("Error processing %s (will retry): %v", key, err)
// requeue the item to work on later
c.queue.AddRateLimited(key)
} else {
// err != nil and too many retries
//c.logger.Errorf("Error processing %s (giving up): %v", key, err)
c.queue.Forget(key)
runtime.HandleError(err)
}
return true
}
func (c *Controller) processBusinessLogic(key string) error {
obj, exists, err := c.informer.GetIndexer().GetByKey(key)
if err != nil {
glog.Errorf("Fetching object with key %s from store failed with %v", key, err)
return err
}
if !exists {
// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
fmt.Printf("Pod %s does not exist anymore\n", key)
} else {
//Perform some business logic over the pods or Deployment
// Note that you also have to check the uid if you have a local controlled resource, which
// is dependent on the actual instance, to detect that a Pod was recreated with the same name
fmt.Printf("Add event for Pod %s\n", obj.(*v1.Pod).GetName())
}
}
}
return nil
}
func (c *Controller) handleErr(err error, key interface{}) {
glog.Infof("Dropping pod %q out of the queue: %v", key, err)
}
func main() {
CreateSharedIndexInformer()
}
您可以在 Run
函数中添加更多工人,如下所示:
func (c *Controller) Run(stopCh chan struct{}) {
...
// runWorker will loop until "something bad" happens. The .Until will
// then rekick the worker after one second
for i := 0; i < 5; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
}