nがひとつ多い。

えぬなおの技術的なことを書いていくとこ。

【Kubernetes】KubernetesのCronJobのcronスケジュールはどう管理されているのか?

はじめに

本記事は Kuberenetes Advent Calendar 2019の17日目の記事です。 Kubernetesのリソースの中に、定期的に揮発性のあるジョブを生み出すCronJobについての記事です。

本題

cronjobは以下のようなマニフェストで定義されます。

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox
            args:
            - /bin/sh
            - -c
            - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure

Running Automated Tasks with a CronJob - Kubernetes

さて、このcronjobの schedule: "*/1 * * * *" が今回のブログの主役です。 このスケジュールは一体どう管理されているんでしょうか?パースして・・・どうしてるんだろう?気になりませんか? 今回はこの文字列がどうやって管理されて、ジョブの生成に繋がっているかに焦点をおきます。

仮説は以下です。

Kubernetesからglibcのlibcron的な何かを叩いている
Kubernetes上で文字列をパースして独自実装されている
③第三者のなんかのライブラリで間接的に使っている

多分cronjobというやつもdeploymentと同じでコントローラがあると思っていて、 そいつがなんやかんやして定期的にjobをリソースを生み出している機構があるのでしょう。 そこまでいけたらゴールです。 さて、見ていきますか。

ソースを読む。

もう最初からソース見た方が早いよねってことで。

func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] {
        return nil, false, nil
    }
    cjc, err := cronjob.NewController(
        ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating CronJob controller: %v", err)
    }
    go cjc.Run(ctx.Stop)
    return nil, true, nil
}

https://github.com/kubernetes/kubernetes/blob/1514bb2141d3c82830f64aa0e1f8c3650116b803/cmd/kube-controller-manager/app/batch.go#L45

うん、、、どうやら予想通り cronjob-controller という奴がいそうで、 "k8s.io/kubernetes/pkg/controller/cronjob" を見ていけば何かがわかりそう。

ここにあるようだ→ https://github.com/kubernetes/kubernetes/blob/1514bb2141/pkg/controller/cronjob/controller.go

よく見てみると、わずか数個しか関数がないのがわかります。

func NewController(kubeClient clientset.Interface) (*Controller, error) 
func (jm *Controller) Run(stopCh <-chan struct{}) 
func (jm *Controller) syncAll()
func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
    sjc sjControlInterface, recorder record.EventRecorder)
func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) 
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder)
func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder)
func getRef(object runtime.Object) (*v1.ObjectReference, error)

cronjobを使ったことあるひとは関数名だけで動作が手に取ったようにわかるかもしれません。 知らんけど多分、 syncAll syncOne がjobを生み出している関数だってことくらいはわかりました。

syncAll

こいつは単純で、 クラスター内のcronjobをリソースを持ってきて、それをfor文でぶん回して syncOne を叩いてるだけのようです。

func (jm *Controller) syncAll() {
    // snip
    for _, sj := range sjs {
        syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
        cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder)
    }
    // snip
}

syncOne

コメントアウトに注目

// syncOne reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "sj" should be included in "js".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {

// syncOne reconciles a CronJob with a list of any Jobs that it created. // All known jobs created by "sj" should be included in "js". // The current time is passed in to facilitate testing. // It has no receiver, to facilitate testing.

syncOneは、CronJobと、作成したジョブのリストを照合します。 「sj」によって作成されたすべての既知のジョブは、「js」に含める必要があります。テストを容易にするために、現在の時刻が渡されます。テストを容易にするための受信機はありません。

結構重要なのはなんと、この now time.Time という引数はテストのために渡されているということが書かれていますが、 多分関数の中で time.Now()するとテストの時に照合しにくくなってしまうから引数で渡そう的なアイディアで、実処理にも使われてはいると推測します。

するすると見ていくと、実にあやしいコードを見つけます。

   times, err := getRecentUnmetScheduleTimes(*sj, now)
    if err != nil {
        recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
        klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
        return
    }
    // TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
    if len(times) == 0 {
        klog.V(4).Infof("No unmet start times for %s", nameForLog)
        return
    }
    if len(times) > 1 {
        klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
    }

https://github.com/kubernetes/kubernetes/blob/1514bb2141/pkg/controller/cronjob/controller.go#L268-L281

うんーどうやらここっぽいねえ。その後の処理を見ても、どうやらこの times というリストが結構参照されているので重要な奴だそうだ。 どう生み出されているのか見ていこう。

(ちなみにsjは*batchv1beta1.CronJobなので、cronjobリソースそのものだ、なので尚更あやしい)

getRecentUnmetScheduleTimes

この子はさっき見ていた controller.go にはなかった。 同パッケージ内の utils.go に入っている。

// getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not.
//
// If there are too many (>100) unstarted times, just give up and return an empty slice.
// If there were missed times prior to the last known start time, then those are not returned.
func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) {
    starts := []time.Time{}
    sched, err := cron.ParseStandard(sj.Spec.Schedule)
    if err != nil {
        return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err)
    }

    var earliestTime time.Time
    if sj.Status.LastScheduleTime != nil {
        earliestTime = sj.Status.LastScheduleTime.Time
    } else {
        // If none found, then this is either a recently created scheduledJob,
        // or the active/completed info was somehow lost (contract for status
        // in kubernetes says it may need to be recreated), or that we have
        // started a job, but have not noticed it yet (distributed systems can
        // have arbitrary delays).  In any case, use the creation time of the
        // CronJob as last known start time.
        earliestTime = sj.ObjectMeta.CreationTimestamp.Time
    }
    if sj.Spec.StartingDeadlineSeconds != nil {
        // Controller is not going to schedule anything below this point
        schedulingDeadline := now.Add(-time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds))

        if schedulingDeadline.After(earliestTime) {
            earliestTime = schedulingDeadline
        }
    }
    if earliestTime.After(now) {
        return []time.Time{}, nil
    }

    for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
        starts = append(starts, t)
        // An object might miss several starts. For example, if
        // controller gets wedged on friday at 5:01pm when everyone has
        // gone home, and someone comes in on tuesday AM and discovers
        // the problem and restarts the controller, then all the hourly
        // jobs, more than 80 of them for one hourly scheduledJob, should
        // all start running with no further intervention (if the scheduledJob
        // allows concurrency and late starts).
        //
        // However, if there is a bug somewhere, or incorrect clock
        // on controller's server or apiservers (for setting creationTimestamp)
        // then there could be so many missed start times (it could be off
        // by decades or more), that it would eat up all the CPU and memory
        // of this controller. In that case, we want to not try to list
        // all the missed start times.
        //
        // I've somewhat arbitrarily picked 100, as more than 80,
        // but less than "lots".
        if len(starts) > 100 {
            // We can't get the most recent times so just return an empty slice
            return []time.Time{}, fmt.Errorf("too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew")
        }
    }
    return starts, nil
}

https://github.com/kubernetes/kubernetes/blob/1514bb2141d3c82830f64aa0e1f8c3650116b803/pkg/controller/cronjob/utils.go#L89-L149

コメントアウトから

getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not.

getRecentUnmetScheduleTimesは、ジョブが開始されるべきであるが、開始されなかったときに経過した時間のスライス(最も古いものから最新のものまで)を取得します。

ほほう。先ほどの読み出し元の関数と見比べてみても、この getRecentUnmetScheduleTimes が返す []time.Time を元に、 スケジュール時間がすぎて開始されるべきジョブ を見極めるようです。

っと言っていると・・・おっっ!!!

   "github.com/robfig/cron"

https://github.com/kubernetes/kubernetes/blob/1514bb2141d3c82830f64aa0e1f8c3650116b803/pkg/controller/cronjob/utils.go#L23

   sched, err := cron.ParseStandard(sj.Spec.Schedule)

https://github.com/kubernetes/kubernetes/blob/1514bb2141d3c82830f64aa0e1f8c3650116b803/pkg/controller/cronjob/utils.go#L95

おおー!これが答えみたいなもんです・・・!なんと、予想の ③第三者のなんかのライブラリで間接的に使っている でした・・・結構意外でしょう?取れ高ですね。 さて、最後にこの関数をみていきましょう

github.com

(めっちゃスターついてますやん)

cron.ParseStandard

// ParseStandard returns a new crontab schedule representing the given
// standardSpec (https://en.wikipedia.org/wiki/Cron). It requires 5 entries
// representing: minute, hour, day of month, month and day of week, in that
// order. It returns a descriptive error if the spec is not valid.
//
// It accepts
//   - Standard crontab specs, e.g. "* * * * ?"
//   - Descriptors, e.g. "@midnight", "@every 1h30m"
func ParseStandard(standardSpec string) (Schedule, error) {
    return standardParser.Parse(standardSpec)
}

// Parse returns a new crontab schedule representing the given spec.
// It returns a descriptive error if the spec is not valid.
// It accepts crontab specs and features configured by NewParser.
func (p Parser) Parse(spec string) (Schedule, error) {
    if len(spec) == 0 {
        return nil, fmt.Errorf("empty spec string")
    }

    // Extract timezone if present
    var loc = time.Local
    if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") {
        var err error
        i := strings.Index(spec, " ")
        eq := strings.Index(spec, "=")
        if loc, err = time.LoadLocation(spec[eq+1 : i]); err != nil {
            return nil, fmt.Errorf("provided bad location %s: %v", spec[eq+1:i], err)
        }
        spec = strings.TrimSpace(spec[i:])
    }

    // Handle named schedules (descriptors), if configured
    if strings.HasPrefix(spec, "@") {
        if p.options&Descriptor == 0 {
            return nil, fmt.Errorf("parser does not accept descriptors: %v", spec)
        }
        return parseDescriptor(spec, loc)
    }

    // Split on whitespace.
    fields := strings.Fields(spec)

    // Validate & fill in any omitted or optional fields
    var err error
    fields, err = normalizeFields(fields, p.options)
    if err != nil {
        return nil, err
    }

    field := func(field string, r bounds) uint64 {
        if err != nil {
            return 0
        }
        var bits uint64
        bits, err = getField(field, r)
        return bits
    }

    var (
        second     = field(fields[0], seconds)
        minute     = field(fields[1], minutes)
        hour       = field(fields[2], hours)
        dayofmonth = field(fields[3], dom)
        month      = field(fields[4], months)
        dayofweek  = field(fields[5], dow)
    )
    if err != nil {
        return nil, err
    }

    return &SpecSchedule{
        Second:   second,
        Minute:   minute,
        Hour:     hour,
        Dom:      dayofmonth,
        Month:    month,
        Dow:      dayofweek,
        Location: loc,
    }, nil
}

https://github.com/robfig/cron/blob/e843a09e5b2db454d77aad25b1660173445fb2fc/parser.go

 ParseStandard(standardSpec string) 

引数が文字列なので間違って無さそうですが、何よりその処理中が興味深いですね・・・つまり構文解析は自前で実装してるわけです。 使い方も間違って無さそう

func TestStandardSpecSchedule(t *testing.T) {
    entries := []struct {
        expr     string
        expected Schedule
        err      string
    }{
        {
            expr:     "5 * * * *",
            expected: &SpecSchedule{1 << seconds.min, 1 << 5, all(hours), all(dom), all(months), all(dow), time.Local},
        },
        {
            expr:     "@every 5m",
            expected: ConstantDelaySchedule{time.Duration(5) * time.Minute},
        },
        {
            expr: "5 j * * *",
            err:  "failed to parse int from",
        },
        {
            expr: "* * * *",
            err:  "expected exactly 5 fields",
        },
    }

    for _, c := range entries {
        actual, err := ParseStandard(c.expr)
        if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) {
            t.Errorf("%s => expected %v, got %v", c.expr, c.err, err)
        }
        if len(c.err) == 0 && err != nil {
            t.Errorf("%s => unexpected error %v", c.expr, err)
        }
        if !reflect.DeepEqual(actual, c.expected) {
            t.Errorf("%s => expected %b, got %b", c.expr, c.expected, actual)
        }
    }
}

https://github.com/robfig/cron/blob/master/parser_test.go#L315-L351

結論

ということで、KubernetesのCronJobのcronスケジュールは https://github.com/robfig/cron によってパースしたのちに、現在時刻から比較してジョブを生成する工程に入る、でした。

ちょっぴり考察すると、libcronみたいなCのライブラリとかを使っちゃうとKuberentesのコントローラ自体の依存が増えてしまうので、 今回使われた robfig/cronのようなgo言語のみでcron文字列をパースできるライブラリを使ったんじゃないかなーと思ったりラジバンダリ😪

おまけ

このcronjobのスケジューラのライブラリのなかで、使われた ParseStandard ですが、これは、

  • The "standard" cron format, described on the Cron wikipedia page and used by the cron Linux system utility.
  • The cron format used by the Quartz Scheduler, commonly used for scheduled jobs in Java software

以下の二つのページを参照してくれよとREADME書いてあるので、何のフォーマットが対応しているかは以下を参照してください。

https://en.wikipedia.org/wiki/Cron

[http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/tutorial-lesson-06.html