监控 kubernetes 作业

Monitoring a kubernetes job

我的 kubernetes 作业需要不同的时间才能完成。 4 到 8 分钟之间。有什么方法可以让我知道工作何时完成,而不是在最坏的情况下等待 8 分钟。我有一个执行以下操作的测试用例:

1) Submits the kubernetes job.
2) Waits for its completion.
3) Checks whether the job has had the expected affect.

问题是在我的 java 测试中提交了 kubernetes 中的部署作业,我正在等待 8 分钟,即使作业完成时间少于 8 分钟,因为我没有办法从 java 测试监视作业的状态。

您没有提到实际检查作业完成的是什么,但是与其盲目等待并希望最好,您应该在循环中不断轮询作业 status 直到它变成 "Completed"。

我不知道你在说什么类型的任务,但我们假设你是 运行 一些 pods

你可以做到

watch 'kubectl get pods | grep <name of the pod>'

kubectl get pods -w

当然不会是全名,因为大多数时候 pods 会得到随机名称,如果你是 运行 nginx 副本或部署你的 pods 最终会是像 nginx-1696122428-ftjvy 这样的东西,所以你会想做

watch 'kubectl get pods | grep nginx'

您可以将 pods 替换为您正在做的任何工作,即 (rc,svc,deployments....)

<kube master>/apis/batch/v1/namespaces/default/jobs 

endpoint 列出作业的状态。我已解析此 json 并检索到以 "deploy...".

开头的最新 运行 作业的名称

然后我们可以点击

<kube master>/apis/batch/v1/namespaces/default/jobs/<job name retrieved above>

并监控作业成功后状态字段值如下

"status": {
    "conditions": [
      {
        "type": "Complete",
        "status": "True",
        "lastProbeTime": "2016-09-22T13:59:03Z",
        "lastTransitionTime": "2016-09-22T13:59:03Z"
      }
    ],
    "startTime": "2016-09-22T13:56:42Z",
    "completionTime": "2016-09-22T13:59:03Z",
    "succeeded": 1
  }

所以我们一直在轮询这个端点,直到它完成。希望这对某人有所帮助。

既然你说了Java;您可以使用来自 fabric8 的 kubernetes java 绑定来启动作业并添加观察者:

KubernetesClient k = ...
k.extensions().jobs().load(yaml).watch (new Watcher <Job>() {

  @Override
  public void onClose (KubernetesClientException e) {}

  @Override
  public void eventReceived (Action a, Job j) {
    if(j.getStatus().getSucceeded()>0)
      System.out.println("At least one job attempt succeeded");
    if(j.getStatus().getFailed()>0)
      System.out.println("At least one job attempt failed");
  }
});

我发现在使用 job.getStatus() 进行轮询时,JobStatus 没有得到更新 即使在使用 kubectl 从命令提示符检查时状态发生变化。

为了解决这个问题,我重新加载了作业处理程序:

    client.extensions().jobs()
                       .inNamespace(myJob.getMetadata().getNamespace())
                       .withName(myJob.getMetadata().getName())
                       .get();

我检查作业状态的循环如下所示:

    KubernetesClient client = new DefaultKubernetesClient(config);
    Job myJob = client.extensions().jobs()
                      .load(new FileInputStream("/path/x.yaml"))
                      .create();
    boolean jobActive = true;
    while(jobActive){
        myJob = client.extensions().jobs()
                .inNamespace(myJob.getMetadata().getNamespace())
                .withName(myJob.getMetadata().getName())
                .get();
        JobStatus myJobStatus = myJob.getStatus();
        System.out.println("==================");
        System.out.println(myJobStatus.toString());

        if(myJob.getStatus().getActive()==null){
            jobActive = false;
        }
        else {
            System.out.println(myJob.getStatus().getActive());
            System.out.println("Sleeping for a minute before polling again!!");
            Thread.sleep(60000);
        }
    }

    System.out.println(myJob.getStatus().toString());

希望对您有所帮助

您可以使用 NewSharedInformer 方法查看作业的状态。不确定如何在 Java 中编写它,这里是定期获取工作列表的 golang 示例:

type ClientImpl struct {
    clients *kubernetes.Clientset
}

type JobListFunc func() ([]batchv1.Job, error)

var (
    jobsSelector = labels.SelectorFromSet(labels.Set(map[string]string{"job_label": "my_label"})).String()
)


func (c *ClientImpl) NewJobSharedInformer(resyncPeriod time.Duration) JobListFunc {
    var once sync.Once
    var jobListFunc JobListFunc

    once.Do(
        func() {
            restClient := c.clients.BatchV1().RESTClient()
            optionsModifer := func(options *metav1.ListOptions) {
                options.LabelSelector = jobsSelector
            }
            watchList := cache.NewFilteredListWatchFromClient(restClient, "jobs", metav1.NamespaceAll, optionsModifer)
            informer := cache.NewSharedInformer(watchList, &batchv1.Job{}, resyncPeriod)

            go informer.Run(context.Background().Done())

            jobListFunc = JobListFunc(func() (jobs []batchv1.Job, err error) {
                for _, c := range informer.GetStore().List() {
                    jobs = append(jobs, *(c.(*batchv1.Job)))
                }
                return jobs, nil
            })
        })

    return jobListFunc
}

然后在您的监视器中,您可以通过调整作业列表来检查状态:

func syncJobStatus() {
    jobs, err := jobListFunc()
    if err != nil {
        log.Errorf("Failed to list jobs: %v", err)
        return
    }

    // TODO: other code

    for _, job := range jobs {
        name := job.Name
        // check status...
    }
}
$ kubectl wait --for=condition=complete --timeout=600s job/myjob