本記事は Kuberenetes Advent Calendar 2019の17日目の記事です。 Kubernetesのリソースの中に、定期的に揮発性のあるジョブを生み出すCronJobについての記事です。



apiVersion: batch/v1beta1
kind: CronJob
  name: hello
  schedule: "*/1 * * * *"
          - name: hello
            image: busybox
            - /bin/sh
            - -c
            - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure

Running Automated Tasks with a CronJob - Kubernetes

さて、このcronjobの schedule: "*/1 * * * *" が今回のブログの主役です。 このスケジュールは一体どう管理されているんでしょうか?パースして・・・どうしてるんだろう?気になりませんか? 今回はこの文字列がどうやって管理されて、ジョブの生成に繋がっているかに焦点をおきます。



多分cronjobというやつもdeploymentと同じでコントローラがあると思っていて、 そいつがなんやかんやして定期的にjobをリソースを生み出している機構があるのでしょう。 そこまでいけたらゴールです。 さて、見ていきますか。



func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] {
        return nil, false, nil
    cjc, err := cronjob.NewController(
    if err != nil {
        return nil, true, fmt.Errorf("error creating CronJob controller: %v", err)
    go cjc.Run(ctx.Stop)
    return nil, true, nil


うん、、、どうやら予想通り cronjob-controller という奴がいそうで、 "k8s.io/kubernetes/pkg/controller/cronjob" を見ていけば何かがわかりそう。

ここにあるようだ→ https://github.com/kubernetes/kubernetes/blob/1514bb2141/pkg/controller/cronjob/controller.go


func NewController(kubeClient clientset.Interface) (*Controller, error) 
func (jm *Controller) Run(stopCh <-chan struct{}) 
func (jm *Controller) syncAll()
func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
    sjc sjControlInterface, recorder record.EventRecorder)
func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) 
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder)
func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder)
func getRef(object runtime.Object) (*v1.ObjectReference, error)

cronjobを使ったことあるひとは関数名だけで動作が手に取ったようにわかるかもしれません。 知らんけど多分、 syncAll syncOne がjobを生み出している関数だってことくらいはわかりました。


こいつは単純で、 クラスター内のcronjobをリソースを持ってきて、それをfor文でぶん回して syncOne を叩いてるだけのようです。

func (jm *Controller) syncAll() {
    // snip
    for _, sj := range sjs {
        syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
        cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder)
    // snip



// syncOne reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "sj" should be included in "js".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {

// syncOne reconciles a CronJob with a list of any Jobs that it created. // All known jobs created by "sj" should be included in "js". // The current time is passed in to facilitate testing. // It has no receiver, to facilitate testing.

syncOneは、CronJobと、作成したジョブのリストを照合します。 「sj」によって作成されたすべての既知のジョブは、「js」に含める必要があります。テストを容易にするために、現在の時刻が渡されます。テストを容易にするための受信機はありません。

結構重要なのはなんと、この now time.Time という引数はテストのために渡されているということが書かれていますが、 多分関数の中で time.Now()するとテストの時に照合しにくくなってしまうから引数で渡そう的なアイディアで、実処理にも使われてはいると推測します。


   times, err := getRecentUnmetScheduleTimes(*sj, now)
    if err != nil {
        recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
        klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
    // TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
    if len(times) == 0 {
        klog.V(4).Infof("No unmet start times for %s", nameForLog)
    if len(times) > 1 {
        klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)


うんーどうやらここっぽいねえ。その後の処理を見ても、どうやらこの times というリストが結構参照されているので重要な奴だそうだ。 どう生み出されているのか見ていこう。



この子はさっき見ていた controller.go にはなかった。 同パッケージ内の utils.go に入っている。

// getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not.
// If there are too many (>100) unstarted times, just give up and return an empty slice.
// If there were missed times prior to the last known start time, then those are not returned.
func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) {
    starts := []time.Time{}
    sched, err := cron.ParseStandard(sj.Spec.Schedule)
    if err != nil {
        return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err)

    var earliestTime time.Time
    if sj.Status.LastScheduleTime != nil {
        earliestTime = sj.Status.LastScheduleTime.Time
    } else {
        // If none found, then this is either a recently created scheduledJob,
        // or the active/completed info was somehow lost (contract for status
        // in kubernetes says it may need to be recreated), or that we have
        // started a job, but have not noticed it yet (distributed systems can
        // have arbitrary delays).  In any case, use the creation time of the
        // CronJob as last known start time.
        earliestTime = sj.ObjectMeta.CreationTimestamp.Time
    if sj.Spec.StartingDeadlineSeconds != nil {
        // Controller is not going to schedule anything below this point
        schedulingDeadline := now.Add(-time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds))

        if schedulingDeadline.After(earliestTime) {
            earliestTime = schedulingDeadline
    if earliestTime.After(now) {
        return []time.Time{}, nil

    for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
        starts = append(starts, t)
        // An object might miss several starts. For example, if
        // controller gets wedged on friday at 5:01pm when everyone has
        // gone home, and someone comes in on tuesday AM and discovers
        // the problem and restarts the controller, then all the hourly
        // jobs, more than 80 of them for one hourly scheduledJob, should
        // all start running with no further intervention (if the scheduledJob
        // allows concurrency and late starts).
        // However, if there is a bug somewhere, or incorrect clock
        // on controller's server or apiservers (for setting creationTimestamp)
        // then there could be so many missed start times (it could be off
        // by decades or more), that it would eat up all the CPU and memory
        // of this controller. In that case, we want to not try to list
        // all the missed start times.
        // I've somewhat arbitrarily picked 100, as more than 80,
        // but less than "lots".
        if len(starts) > 100 {
            // We can't get the most recent times so just return an empty slice
            return []time.Time{}, fmt.Errorf("too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew")
    return starts, nil



getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not.


ほほう。先ほどの読み出し元の関数と見比べてみても、この getRecentUnmetScheduleTimes が返す []time.Time を元に、 スケジュール時間がすぎて開始されるべきジョブ を見極めるようです。




   sched, err := cron.ParseStandard(sj.Spec.Schedule)


おおー!これが答えみたいなもんです・・・!なんと、予想の ③第三者のなんかのライブラリで間接的に使っている でした・・・結構意外でしょう?取れ高ですね。 さて、最後にこの関数をみていきましょう




// ParseStandard returns a new crontab schedule representing the given
// standardSpec (https://en.wikipedia.org/wiki/Cron). It requires 5 entries
// representing: minute, hour, day of month, month and day of week, in that
// order. It returns a descriptive error if the spec is not valid.
// It accepts
//   - Standard crontab specs, e.g. "* * * * ?"
//   - Descriptors, e.g. "@midnight", "@every 1h30m"
func ParseStandard(standardSpec string) (Schedule, error) {
    return standardParser.Parse(standardSpec)

// Parse returns a new crontab schedule representing the given spec.
// It returns a descriptive error if the spec is not valid.
// It accepts crontab specs and features configured by NewParser.
func (p Parser) Parse(spec string) (Schedule, error) {
    if len(spec) == 0 {
        return nil, fmt.Errorf("empty spec string")

    // Extract timezone if present
    var loc = time.Local
    if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") {
        var err error
        i := strings.Index(spec, " ")
        eq := strings.Index(spec, "=")
        if loc, err = time.LoadLocation(spec[eq+1 : i]); err != nil {
            return nil, fmt.Errorf("provided bad location %s: %v", spec[eq+1:i], err)
        spec = strings.TrimSpace(spec[i:])

    // Handle named schedules (descriptors), if configured
    if strings.HasPrefix(spec, "@") {
        if p.options&Descriptor == 0 {
            return nil, fmt.Errorf("parser does not accept descriptors: %v", spec)
        return parseDescriptor(spec, loc)

    // Split on whitespace.
    fields := strings.Fields(spec)

    // Validate & fill in any omitted or optional fields
    var err error
    fields, err = normalizeFields(fields, p.options)
    if err != nil {
        return nil, err

    field := func(field string, r bounds) uint64 {
        if err != nil {
            return 0
        var bits uint64
        bits, err = getField(field, r)
        return bits

    var (
        second     = field(fields[0], seconds)
        minute     = field(fields[1], minutes)
        hour       = field(fields[2], hours)
        dayofmonth = field(fields[3], dom)
        month      = field(fields[4], months)
        dayofweek  = field(fields[5], dow)
    if err != nil {
        return nil, err

    return &SpecSchedule{
        Second:   second,
        Minute:   minute,
        Hour:     hour,
        Dom:      dayofmonth,
        Month:    month,
        Dow:      dayofweek,
        Location: loc,
    }, nil


 ParseStandard(standardSpec string) 

引数が文字列なので間違って無さそうですが、何よりその処理中が興味深いですね・・・つまり構文解析は自前で実装してるわけです。 使い方も間違って無さそう

func TestStandardSpecSchedule(t *testing.T) {
    entries := []struct {
        expr     string
        expected Schedule
        err      string
            expr:     "5 * * * *",
            expected: &SpecSchedule{1 << seconds.min, 1 << 5, all(hours), all(dom), all(months), all(dow), time.Local},
            expr:     "@every 5m",
            expected: ConstantDelaySchedule{time.Duration(5) * time.Minute},
            expr: "5 j * * *",
            err:  "failed to parse int from",
            expr: "* * * *",
            err:  "expected exactly 5 fields",

    for _, c := range entries {
        actual, err := ParseStandard(c.expr)
        if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) {
            t.Errorf("%s => expected %v, got %v", c.expr, c.err, err)
        if len(c.err) == 0 && err != nil {
            t.Errorf("%s => unexpected error %v", c.expr, err)
        if !reflect.DeepEqual(actual, c.expected) {
            t.Errorf("%s => expected %b, got %b", c.expr, c.expected, actual)



ということで、KubernetesのCronJobのcronスケジュールは https://github.com/robfig/cron によってパースしたのちに、現在時刻から比較してジョブを生成する工程に入る、でした。

ちょっぴり考察すると、libcronみたいなCのライブラリとかを使っちゃうとKuberentesのコントローラ自体の依存が増えてしまうので、 今回使われた robfig/cronのようなgo言語のみでcron文字列をパースできるライブラリを使ったんじゃないかなーと思ったりラジバンダリ😪


このcronjobのスケジューラのライブラリのなかで、使われた ParseStandard ですが、これは、

  • The "standard" cron format, described on the Cron wikipedia page and used by the cron Linux system utility.
  • The cron format used by the Quartz Scheduler, commonly used for scheduled jobs in Java software