【Kubernetes】KubernetesのCronJobのcronスケジュールはどう管理されているのか?
はじめに
本記事は Kuberenetes Advent Calendar 2019の17日目の記事です。 Kubernetesのリソースの中に、定期的に揮発性のあるジョブを生み出すCronJobについての記事です。
本題
cronjobは以下のようなマニフェストで定義されます。
apiVersion: batch/v1beta1 kind: CronJob metadata: name: hello spec: schedule: "*/1 * * * *" jobTemplate: spec: template: spec: containers: - name: hello image: busybox args: - /bin/sh - -c - date; echo Hello from the Kubernetes cluster restartPolicy: OnFailure
Running Automated Tasks with a CronJob - Kubernetes
さて、このcronjobの schedule: "*/1 * * * *"
が今回のブログの主役です。
このスケジュールは一体どう管理されているんでしょうか?パースして・・・どうしてるんだろう?気になりませんか?
今回はこの文字列がどうやって管理されて、ジョブの生成に繋がっているかに焦点をおきます。
仮説は以下です。
①Kubernetesからglibcのlibcron的な何かを叩いている
②Kubernetes上で文字列をパースして独自実装されている
③第三者のなんかのライブラリで間接的に使っている
多分cronjobというやつもdeploymentと同じでコントローラがあると思っていて、 そいつがなんやかんやして定期的にjobをリソースを生み出している機構があるのでしょう。 そこまでいけたらゴールです。 さて、見ていきますか。
ソースを読む。
もう最初からソース見た方が早いよねってことで。
func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] { return nil, false, nil } cjc, err := cronjob.NewController( ctx.ClientBuilder.ClientOrDie("cronjob-controller"), ) if err != nil { return nil, true, fmt.Errorf("error creating CronJob controller: %v", err) } go cjc.Run(ctx.Stop) return nil, true, nil }
うん、、、どうやら予想通り cronjob-controller
という奴がいそうで、 "k8s.io/kubernetes/pkg/controller/cronjob"
を見ていけば何かがわかりそう。
ここにあるようだ→ https://github.com/kubernetes/kubernetes/blob/1514bb2141/pkg/controller/cronjob/controller.go
よく見てみると、わずか数個しか関数がないのがわかります。
func NewController(kubeClient clientset.Interface) (*Controller, error) func (jm *Controller) Run(stopCh <-chan struct{}) func (jm *Controller) syncAll() func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) func getRef(object runtime.Object) (*v1.ObjectReference, error)
cronjobを使ったことあるひとは関数名だけで動作が手に取ったようにわかるかもしれません。
知らんけど多分、 syncAll
syncOne
がjobを生み出している関数だってことくらいはわかりました。
syncAll
こいつは単純で、 クラスター内のcronjobをリソースを持ってきて、それをfor文でぶん回して syncOne
を叩いてるだけのようです。
func (jm *Controller) syncAll() { // snip for _, sj := range sjs { syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder) cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder) } // snip }
syncOne
コメントアウトに注目
// syncOne reconciles a CronJob with a list of any Jobs that it created. // All known jobs created by "sj" should be included in "js". // The current time is passed in to facilitate testing. // It has no receiver, to facilitate testing. func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
// syncOne reconciles a CronJob with a list of any Jobs that it created. // All known jobs created by "sj" should be included in "js". // The current time is passed in to facilitate testing. // It has no receiver, to facilitate testing.
syncOneは、CronJobと、作成したジョブのリストを照合します。 「sj」によって作成されたすべての既知のジョブは、「js」に含める必要があります。テストを容易にするために、現在の時刻が渡されます。テストを容易にするための受信機はありません。
結構重要なのはなんと、この now time.Time
という引数はテストのために渡されているということが書かれていますが、
多分関数の中で time.Now()するとテストの時に照合しにくくなってしまうから引数で渡そう的なアイディアで、実処理にも使われてはいると推測します。
するすると見ていくと、実にあやしいコードを見つけます。
times, err := getRecentUnmetScheduleTimes(*sj, now) if err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err) klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err) return } // TODO: handle multiple unmet start times, from oldest to newest, updating status as needed. if len(times) == 0 { klog.V(4).Infof("No unmet start times for %s", nameForLog) return } if len(times) > 1 { klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog) }
うんーどうやらここっぽいねえ。その後の処理を見ても、どうやらこの times
というリストが結構参照されているので重要な奴だそうだ。
どう生み出されているのか見ていこう。
(ちなみにsjは*batchv1beta1.CronJobなので、cronjobリソースそのものだ、なので尚更あやしい)
getRecentUnmetScheduleTimes
この子はさっき見ていた controller.go
にはなかった。
同パッケージ内の utils.go
に入っている。
// getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not. // // If there are too many (>100) unstarted times, just give up and return an empty slice. // If there were missed times prior to the last known start time, then those are not returned. func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) { starts := []time.Time{} sched, err := cron.ParseStandard(sj.Spec.Schedule) if err != nil { return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err) } var earliestTime time.Time if sj.Status.LastScheduleTime != nil { earliestTime = sj.Status.LastScheduleTime.Time } else { // If none found, then this is either a recently created scheduledJob, // or the active/completed info was somehow lost (contract for status // in kubernetes says it may need to be recreated), or that we have // started a job, but have not noticed it yet (distributed systems can // have arbitrary delays). In any case, use the creation time of the // CronJob as last known start time. earliestTime = sj.ObjectMeta.CreationTimestamp.Time } if sj.Spec.StartingDeadlineSeconds != nil { // Controller is not going to schedule anything below this point schedulingDeadline := now.Add(-time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)) if schedulingDeadline.After(earliestTime) { earliestTime = schedulingDeadline } } if earliestTime.After(now) { return []time.Time{}, nil } for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) { starts = append(starts, t) // An object might miss several starts. For example, if // controller gets wedged on friday at 5:01pm when everyone has // gone home, and someone comes in on tuesday AM and discovers // the problem and restarts the controller, then all the hourly // jobs, more than 80 of them for one hourly scheduledJob, should // all start running with no further intervention (if the scheduledJob // allows concurrency and late starts). // // However, if there is a bug somewhere, or incorrect clock // on controller's server or apiservers (for setting creationTimestamp) // then there could be so many missed start times (it could be off // by decades or more), that it would eat up all the CPU and memory // of this controller. In that case, we want to not try to list // all the missed start times. // // I've somewhat arbitrarily picked 100, as more than 80, // but less than "lots". if len(starts) > 100 { // We can't get the most recent times so just return an empty slice return []time.Time{}, fmt.Errorf("too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew") } } return starts, nil }
コメントアウトから
getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not.
getRecentUnmetScheduleTimesは、ジョブが開始されるべきであるが、開始されなかったときに経過した時間のスライス(最も古いものから最新のものまで)を取得します。
ほほう。先ほどの読み出し元の関数と見比べてみても、この getRecentUnmetScheduleTimes
が返す []time.Time
を元に、 スケジュール時間がすぎて開始されるべきジョブ
を見極めるようです。
っと言っていると・・・おっっ!!!
"github.com/robfig/cron"
sched, err := cron.ParseStandard(sj.Spec.Schedule)
おおー!これが答えみたいなもんです・・・!なんと、予想の ③第三者のなんかのライブラリで間接的に使っている
でした・・・結構意外でしょう?取れ高ですね。
さて、最後にこの関数をみていきましょう
(めっちゃスターついてますやん)
cron.ParseStandard
// ParseStandard returns a new crontab schedule representing the given // standardSpec (https://en.wikipedia.org/wiki/Cron). It requires 5 entries // representing: minute, hour, day of month, month and day of week, in that // order. It returns a descriptive error if the spec is not valid. // // It accepts // - Standard crontab specs, e.g. "* * * * ?" // - Descriptors, e.g. "@midnight", "@every 1h30m" func ParseStandard(standardSpec string) (Schedule, error) { return standardParser.Parse(standardSpec) } // Parse returns a new crontab schedule representing the given spec. // It returns a descriptive error if the spec is not valid. // It accepts crontab specs and features configured by NewParser. func (p Parser) Parse(spec string) (Schedule, error) { if len(spec) == 0 { return nil, fmt.Errorf("empty spec string") } // Extract timezone if present var loc = time.Local if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") { var err error i := strings.Index(spec, " ") eq := strings.Index(spec, "=") if loc, err = time.LoadLocation(spec[eq+1 : i]); err != nil { return nil, fmt.Errorf("provided bad location %s: %v", spec[eq+1:i], err) } spec = strings.TrimSpace(spec[i:]) } // Handle named schedules (descriptors), if configured if strings.HasPrefix(spec, "@") { if p.options&Descriptor == 0 { return nil, fmt.Errorf("parser does not accept descriptors: %v", spec) } return parseDescriptor(spec, loc) } // Split on whitespace. fields := strings.Fields(spec) // Validate & fill in any omitted or optional fields var err error fields, err = normalizeFields(fields, p.options) if err != nil { return nil, err } field := func(field string, r bounds) uint64 { if err != nil { return 0 } var bits uint64 bits, err = getField(field, r) return bits } var ( second = field(fields[0], seconds) minute = field(fields[1], minutes) hour = field(fields[2], hours) dayofmonth = field(fields[3], dom) month = field(fields[4], months) dayofweek = field(fields[5], dow) ) if err != nil { return nil, err } return &SpecSchedule{ Second: second, Minute: minute, Hour: hour, Dom: dayofmonth, Month: month, Dow: dayofweek, Location: loc, }, nil }
https://github.com/robfig/cron/blob/e843a09e5b2db454d77aad25b1660173445fb2fc/parser.go
ParseStandard(standardSpec string)
引数が文字列なので間違って無さそうですが、何よりその処理中が興味深いですね・・・つまり構文解析は自前で実装してるわけです。 使い方も間違って無さそう
func TestStandardSpecSchedule(t *testing.T) { entries := []struct { expr string expected Schedule err string }{ { expr: "5 * * * *", expected: &SpecSchedule{1 << seconds.min, 1 << 5, all(hours), all(dom), all(months), all(dow), time.Local}, }, { expr: "@every 5m", expected: ConstantDelaySchedule{time.Duration(5) * time.Minute}, }, { expr: "5 j * * *", err: "failed to parse int from", }, { expr: "* * * *", err: "expected exactly 5 fields", }, } for _, c := range entries { actual, err := ParseStandard(c.expr) if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) { t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) } if len(c.err) == 0 && err != nil { t.Errorf("%s => unexpected error %v", c.expr, err) } if !reflect.DeepEqual(actual, c.expected) { t.Errorf("%s => expected %b, got %b", c.expr, c.expected, actual) } } }
https://github.com/robfig/cron/blob/master/parser_test.go#L315-L351
結論
ということで、KubernetesのCronJobのcronスケジュールは https://github.com/robfig/cron
によってパースしたのちに、現在時刻から比較してジョブを生成する工程に入る、でした。
ちょっぴり考察すると、libcronみたいなCのライブラリとかを使っちゃうとKuberentesのコントローラ自体の依存が増えてしまうので、
今回使われた robfig/cron
のようなgo言語のみでcron文字列をパースできるライブラリを使ったんじゃないかなーと思ったりラジバンダリ😪
おまけ
このcronjobのスケジューラのライブラリのなかで、使われた ParseStandard
ですが、これは、
以下の二つのページを参照してくれよとREADME書いてあるので、何のフォーマットが対応しているかは以下を参照してください。
https://en.wikipedia.org/wiki/Cron
[http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/tutorial-lesson-06.html