Talk about Kubernetes cronJob controller
背景
之前一段时间正好接触到 kubernetes cronjob, 在接入时遇上了在一定量级下 cronjob schedule delay 的问题, 故开始读了下代码, 发现了一些问题并试着调优了下
存在的问题
按生产环境实际测试来看约 250-375 个 */1 * * * *
每分钟 interval 的 cronjob 就会产生 delay, cronjob 和 controller manager 没有异常 event 但新产生的 job 出现了延迟, 由于我们设置了 startingDeadlineSeconds
故累加起来的 delay 最终导致了 cron 任务严重滞后
代码解读
出于分析上述问题的目的, 读了下 cronjob controller 的代码, 代码量不多, 可能由于没上 GA 的原因, 整个 controllor 代码的设计也比较过程式, 不会像其他组件用到一些比如 Informer, refractor之类的组件读起来相对晦涩
下面开始解读下 release1.17 分支的 k8s cronjob controller 代码
- Controller struct
|
|
cronjob controller 结构体, 即下文中常见的 jm(jobManager) , 主要包了 k8s internal api clinet kubeclinet
, jobControl
和 sjControl
k8s job 控制块,cronjob controller 会直接操作 job, 由 job 再去创建 pod, 并不会直接接触到 pod 对象(包括读)
- 入口函数 Run:
|
|
cronjob controller 是个单线程单执行流的调度器, 由固定每 10s 的 interval 的 goroutine 做一次 syncAll 调用
- 主 loop 函数 syncAll
|
|
首先 pager.New(pager.SimplePageFunc(jobListFunc))
通过 Pager 调用了 jobListFunc
回调函数, 用于 list 出所有 namespace 下的 k8s job 对象, 并将这些 jobs 加入 slice 中, 这个 slices js := make([]batchv1.Job, 0)
用于在之后对 sync 单个 cronJob 时作为是否已经 trigger job 的判断
同理 pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem
list 所有 cronjob 对象并对每个对象调用 syncOne
做实际 cronjob 调度, 在调度完后调用 cleanupFinishedJobs
完成清理工作
- 对于成功执行的 job 根据 HistoryLimit
进行 apiserver 中的资源清理
- 对于执行失败的 job 按 limitBackoff 的限制进行重试
- 若处于非上述两种状态的 job 则忽略
-
主调度函数 syncOne
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) { nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name) // 首先在之前的 batchv1.Job slice 中顺序查找是否有当前 cronJob 的子 job 并且看看是否有不在 jobActive 列表中的孤儿,以及已经执行完成但是还在 Active 列表中的 job,根据 job 状态记录 event (UnexpectedJob, SawCompletedJob),删掉不对应的状态 childrenJobs := make(map[types.UID]bool) for _, j := range js { childrenJobs[j.ObjectMeta.UID] = true found := inActiveList(*sj, j.ObjectMeta.UID) if !found && !IsJobFinished(&j) { recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name) } else if found && IsJobFinished(&j) { _, status := getFinishedStatus(&j) deleteFromActiveList(sj, j.ObjectMeta.UID) recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status) } } // 接着判断 Active 列表中是否有不在当前 cronjob 子 job 里的 job, 如果有则记录 MissingJob event 并从 Active 列表中移除 for _, j := range sj.Status.Active { if found := childrenJobs[j.UID]; !found { recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name) deleteFromActiveList(sj, j.UID) } } // 更新 cronjob 的状态 updatedSJ, err := sjc.UpdateStatus(sj) if err != nil { klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err) return } *sj = *updatedSJ // 判断 cronjob 是否被删除, 若被删除则停止调度该 cronjob if sj.DeletionTimestamp != nil { // The CronJob is being deleted. // Don't do anything other than updating status. return } // 判断 cronjob 是否被暂停, 若被暂停则停止调度该 cronjob if sj.Spec.Suspend != nil && *sj.Spec.Suspend { klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog) return } // getRecentUnmetScheduleTimes 是按 crontab 计算出的 job 执行时间表 // 主要是根据配置的 unix cron table 计算出下一次 schedule 时间并做一些有效性判断 times, err := getRecentUnmetScheduleTimes(*sj, now) if err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err) klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err) return } // 若没有取到该 cronjob 的 table 时间表, 则认为该 job nextSchedule time 可能不合理, 停止调度该 cronjob if len(times) == 0 { klog.V(4).Infof("No unmet start times for %s", nameForLog) return } // 若取到时间表, 则计算出最近一次需要执行的时间, 若最近一次执行时间超过了 currentTime + StartingDeadlineSeconds 则标记为 tooLate 停止调度并记录 event if len(times) > 1 { klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog) } scheduledTime := times[len(times)-1] tooLate := false if sj.Spec.StartingDeadlineSeconds != nil { tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now) } if tooLate { klog.V(4).Infof("Missed starting window for %s", nameForLog) recorder.Eventf(sj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z)) return } // 若当前 cronjob 设置了并发策略则按照对应的并发策略进行并行 job 调度 if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 { klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog) return } if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { for _, j := range sj.Status.Active { klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog) job, err := jc.GetJob(j.Namespace, j.Name) if err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err) return } if !deleteJob(sj, job, jc, recorder) { return } } } // 根据CronJob Spec中JobTemplate的配置获取Job对象,其中Job对象的名字会加上scheduledTime计算出的Hash,目前是unix timestamp jobReq, err := getJobFromTemplate(sj, scheduledTime) if err != nil { klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err) return } // 调用 createJob 接口根据上面拿到的 jobTemplate 创建一个新 job jobResp, err := jc.CreateJob(sj.Namespace, jobReq) if err != nil { // If the namespace is being torn down, we can safely ignore // this error since all subsequent creations will fail. if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) } return } klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog) recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) // 将刚创建的Job加到CronJob的Active列表中,设置LastScheduleTime,更新CronJob ref, err := getRef(jobResp) if err != nil { klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog) } else { sj.Status.Active = append(sj.Status.Active, *ref) } // lastSchedulerTime 常用于监控判断 job schedule 是否符合预期 sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime} if _, err := sjc.UpdateStatus(sj); err != nil { klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err) } return }
-
获取 cron table 的
getRecentUnmetScheduleTimes
函数
|
|
基本上主要的业务逻辑都在这里了,整体上看还是十分简单粗暴的,是个单执行流 one by one 地不停轮询、计算需要执行的任务、任务执行时间表、同步任务状态的过程。
其中从上述代码也观察到3点可能的问题:
- 在 syncAll 函数中用到 pager.List 这是个非常冗余的操作, 每次调度时都在遍历一些可能未产生变更的 cronjob, 应该改为 watch 的机制
- 对于 cronjob 状态的变更可以不通过在每次 loop 时主动查询并更新, 而是通过 informer 注册 event 回调的方式, 在有变更时同步状态
- 对于过期数量超过 100 的 cronjob 会直接停止调度, 并且只记录的 event, 没有一个 drop old jobs 自愈的过程
调优
首先对于 pager.List 尝试替换成 informer watch 的机制, 思路也比较简单, 原先是通过 pager.List 传入的回调来获取 namespace 下所有的 job/cronjob, 现在改为在新建 controller 时注册进 watch event, 监听到变更事件时通过 k8s 封装好的 internal.api.sharedInformer 取回并构造成相同 struct 的对象即可
先看一段原本 pager.List 的代码:
|
|
接着按上述方式在 controller 中先注册 event:
|
|
k8s informer 的机制需要有 event trigger 来判断在什么事件下触发, 故我们可以简单先加上增删改时的 trigger
故上述代码修改为
|
|
接着上述代码只是声明了一堆需要注册的 event trigger(增删改 cronjob 时将cronjob对象放入 workQueue) 和 event handler(syncOne)
但 syncAll 主函数仍然做着原先的工作, 所以我期望的是在主函数中阻塞的获取 workQueue 中的 job, 并按 FIFO 的方式 process 每个 job (暂时没必要对于每个 job 起 goroutine syncOne, 会增加复杂性, 本身的处理能力猜测是足够的)
故将 syncAll 主函数代码改为
|
|
这样整体逻辑大概清晰了, 需要做些检查性的耕作,比如由于无法确认是否为 deleted cronjob 故需要在 syncOne 函数中再加一次有效性判断等等
|
|
所以综上, 我们就把 pager.List 的方式改造成了 informer watch 的方式,整体改造的代码如下
|
|