本記事は Kuberenetes Advent Calendar 2019の17日目の記事です。 Kubernetesのリソースの中に、定期的に揮発性のあるジョブを生み出すCronJobについての記事です。
apiVersion: batch/v1beta1 kind: CronJob metadata: name: hello spec: schedule: "*/1 * * * *" jobTemplate: spec: template: spec: containers: - name: hello image: busybox args: - /bin/sh - -c - date; echo Hello from the Kubernetes cluster restartPolicy: OnFailure
Running Automated Tasks with a CronJob - Kubernetes
さて、このcronjobの schedule: "*/1 * * * *"
多分cronjobというやつもdeploymentと同じでコントローラがあると思っていて、 そいつがなんやかんやして定期的にjobをリソースを生み出している機構があるのでしょう。 そこまでいけたらゴールです。 さて、見ていきますか。
func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] { return nil, false, nil } cjc, err := cronjob.NewController( ctx.ClientBuilder.ClientOrDie("cronjob-controller"), ) if err != nil { return nil, true, fmt.Errorf("error creating CronJob controller: %v", err) } go cjc.Run(ctx.Stop) return nil, true, nil }
うん、、、どうやら予想通り cronjob-controller
という奴がいそうで、 "k8s.io/kubernetes/pkg/controller/cronjob"
ここにあるようだ→ https://github.com/kubernetes/kubernetes/blob/1514bb2141/pkg/controller/cronjob/controller.go
func NewController(kubeClient clientset.Interface) (*Controller, error) func (jm *Controller) Run(stopCh <-chan struct{}) func (jm *Controller) syncAll() func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) func getRef(object runtime.Object) (*v1.ObjectReference, error)
知らんけど多分、 syncAll
こいつは単純で、 クラスター内のcronjobをリソースを持ってきて、それをfor文でぶん回して syncOne
func (jm *Controller) syncAll() { // snip for _, sj := range sjs { syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder) cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder) } // snip }
// syncOne reconciles a CronJob with a list of any Jobs that it created. // All known jobs created by "sj" should be included in "js". // The current time is passed in to facilitate testing. // It has no receiver, to facilitate testing. func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
// syncOne reconciles a CronJob with a list of any Jobs that it created. // All known jobs created by "sj" should be included in "js". // The current time is passed in to facilitate testing. // It has no receiver, to facilitate testing.
syncOneは、CronJobと、作成したジョブのリストを照合します。 「sj」によって作成されたすべての既知のジョブは、「js」に含める必要があります。テストを容易にするために、現在の時刻が渡されます。テストを容易にするための受信機はありません。
結構重要なのはなんと、この now time.Time
多分関数の中で time.Now()するとテストの時に照合しにくくなってしまうから引数で渡そう的なアイディアで、実処理にも使われてはいると推測します。
times, err := getRecentUnmetScheduleTimes(*sj, now) if err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err) klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err) return } // TODO: handle multiple unmet start times, from oldest to newest, updating status as needed. if len(times) == 0 { klog.V(4).Infof("No unmet start times for %s", nameForLog) return } if len(times) > 1 { klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog) }
うんーどうやらここっぽいねえ。その後の処理を見ても、どうやらこの times
この子はさっき見ていた controller.go
同パッケージ内の utils.go
// getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not. // // If there are too many (>100) unstarted times, just give up and return an empty slice. // If there were missed times prior to the last known start time, then those are not returned. func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) { starts := []time.Time{} sched, err := cron.ParseStandard(sj.Spec.Schedule) if err != nil { return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err) } var earliestTime time.Time if sj.Status.LastScheduleTime != nil { earliestTime = sj.Status.LastScheduleTime.Time } else { // If none found, then this is either a recently created scheduledJob, // or the active/completed info was somehow lost (contract for status // in kubernetes says it may need to be recreated), or that we have // started a job, but have not noticed it yet (distributed systems can // have arbitrary delays). In any case, use the creation time of the // CronJob as last known start time. earliestTime = sj.ObjectMeta.CreationTimestamp.Time } if sj.Spec.StartingDeadlineSeconds != nil { // Controller is not going to schedule anything below this point schedulingDeadline := now.Add(-time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)) if schedulingDeadline.After(earliestTime) { earliestTime = schedulingDeadline } } if earliestTime.After(now) { return []time.Time{}, nil } for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) { starts = append(starts, t) // An object might miss several starts. For example, if // controller gets wedged on friday at 5:01pm when everyone has // gone home, and someone comes in on tuesday AM and discovers // the problem and restarts the controller, then all the hourly // jobs, more than 80 of them for one hourly scheduledJob, should // all start running with no further intervention (if the scheduledJob // allows concurrency and late starts). // // However, if there is a bug somewhere, or incorrect clock // on controller's server or apiservers (for setting creationTimestamp) // then there could be so many missed start times (it could be off // by decades or more), that it would eat up all the CPU and memory // of this controller. In that case, we want to not try to list // all the missed start times. // // I've somewhat arbitrarily picked 100, as more than 80, // but less than "lots". if len(starts) > 100 { // We can't get the most recent times so just return an empty slice return []time.Time{}, fmt.Errorf("too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew") } } return starts, nil }
getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not.
ほほう。先ほどの読み出し元の関数と見比べてみても、この getRecentUnmetScheduleTimes
が返す []time.Time
を元に、 スケジュール時間がすぎて開始されるべきジョブ
sched, err := cron.ParseStandard(sj.Spec.Schedule)
おおー!これが答えみたいなもんです・・・!なんと、予想の ③第三者のなんかのライブラリで間接的に使っている
// ParseStandard returns a new crontab schedule representing the given // standardSpec (https://en.wikipedia.org/wiki/Cron). It requires 5 entries // representing: minute, hour, day of month, month and day of week, in that // order. It returns a descriptive error if the spec is not valid. // // It accepts // - Standard crontab specs, e.g. "* * * * ?" // - Descriptors, e.g. "@midnight", "@every 1h30m" func ParseStandard(standardSpec string) (Schedule, error) { return standardParser.Parse(standardSpec) } // Parse returns a new crontab schedule representing the given spec. // It returns a descriptive error if the spec is not valid. // It accepts crontab specs and features configured by NewParser. func (p Parser) Parse(spec string) (Schedule, error) { if len(spec) == 0 { return nil, fmt.Errorf("empty spec string") } // Extract timezone if present var loc = time.Local if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") { var err error i := strings.Index(spec, " ") eq := strings.Index(spec, "=") if loc, err = time.LoadLocation(spec[eq+1 : i]); err != nil { return nil, fmt.Errorf("provided bad location %s: %v", spec[eq+1:i], err) } spec = strings.TrimSpace(spec[i:]) } // Handle named schedules (descriptors), if configured if strings.HasPrefix(spec, "@") { if p.options&Descriptor == 0 { return nil, fmt.Errorf("parser does not accept descriptors: %v", spec) } return parseDescriptor(spec, loc) } // Split on whitespace. fields := strings.Fields(spec) // Validate & fill in any omitted or optional fields var err error fields, err = normalizeFields(fields, p.options) if err != nil { return nil, err } field := func(field string, r bounds) uint64 { if err != nil { return 0 } var bits uint64 bits, err = getField(field, r) return bits } var ( second = field(fields[0], seconds) minute = field(fields[1], minutes) hour = field(fields[2], hours) dayofmonth = field(fields[3], dom) month = field(fields[4], months) dayofweek = field(fields[5], dow) ) if err != nil { return nil, err } return &SpecSchedule{ Second: second, Minute: minute, Hour: hour, Dom: dayofmonth, Month: month, Dow: dayofweek, Location: loc, }, nil }
ParseStandard(standardSpec string)
引数が文字列なので間違って無さそうですが、何よりその処理中が興味深いですね・・・つまり構文解析は自前で実装してるわけです。 使い方も間違って無さそう
func TestStandardSpecSchedule(t *testing.T) { entries := []struct { expr string expected Schedule err string }{ { expr: "5 * * * *", expected: &SpecSchedule{1 << seconds.min, 1 << 5, all(hours), all(dom), all(months), all(dow), time.Local}, }, { expr: "@every 5m", expected: ConstantDelaySchedule{time.Duration(5) * time.Minute}, }, { expr: "5 j * * *", err: "failed to parse int from", }, { expr: "* * * *", err: "expected exactly 5 fields", }, } for _, c := range entries { actual, err := ParseStandard(c.expr) if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) { t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) } if len(c.err) == 0 && err != nil { t.Errorf("%s => unexpected error %v", c.expr, err) } if !reflect.DeepEqual(actual, c.expected) { t.Errorf("%s => expected %b, got %b", c.expr, c.expected, actual) } } }
ということで、KubernetesのCronJobのcronスケジュールは https://github.com/robfig/cron
今回使われた robfig/cron
このcronjobのスケジューラのライブラリのなかで、使われた ParseStandard