From c300a37d0915407cf0ee56dbb16ece940b631422 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 11 Apr 2019 17:23:59 +0200 Subject: [PATCH] runservice: add some initial scheduler tests --- go.mod | 1 + go.sum | 4 + .../runservice/scheduler/scheduler.go | 44 +- .../runservice/scheduler/scheduler_test.go | 380 ++++++++++++++++++ internal/services/runservice/types/types.go | 17 + 5 files changed, 425 insertions(+), 21 deletions(-) create mode 100644 internal/services/runservice/scheduler/scheduler_test.go diff --git a/go.mod b/go.mod index 5284f3e..611dbdf 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/lib/pq v1.0.0 // indirect github.com/mattn/go-sqlite3 v1.10.0 github.com/minio/minio-go v6.0.14+incompatible + github.com/mitchellh/copystructure v1.0.0 github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/opencontainers/go-digest v1.0.0-rc1 // indirect github.com/opencontainers/image-spec v1.0.1 // indirect diff --git a/go.sum b/go.sum index 9de9a0c..6009d21 100644 --- a/go.sum +++ b/go.sum @@ -115,11 +115,15 @@ github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5 github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o= github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= +github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ= +github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= +github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY= +github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 4309b7b..c3b924d 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -81,13 +81,12 @@ func (s *Scheduler) runHasActiveTasks(ctx context.Context, runID string) (bool, return activeTasks, nil } -func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) error { +func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) error { log.Debugf("run: %s", util.Dump(r)) log.Debugf("rc: %s", util.Dump(rc)) // get tasks that can be executed for _, rt := range r.RunTasks { - log.Debugf("rt: %s", util.Dump(rt)) if rt.Skip { continue } @@ -98,14 +97,21 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run, rc *types rct := rc.Tasks[rt.ID] parents := runconfig.GetParents(rc.Tasks, rct) canRun := true + allParentsSkipped := false for _, p := range parents { + allParentsSkipped = true rp := r.RunTasks[p.ID] canRun = rp.Status.IsFinished() && rp.ArchivesFetchFinished() - if rp.Status == types.RunTaskStatusSkipped { - rt.Status = types.RunTaskStatusSkipped + // skip only if all parents are skipped + if rp.Status != types.RunTaskStatusSkipped { + allParentsSkipped = false } } + if allParentsSkipped { + rt.Status = types.RunTaskStatusSkipped + } + if canRun { // now that the task can run set it to waiting approval if needed if rct.NeedsApproval && !rt.WaitingApproval && !rt.Approved { @@ -117,18 +123,13 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run, rc *types return nil } -func (s *Scheduler) getTasksToRun(ctx context.Context, r *types.Run) ([]*types.RunTask, error) { +func getTasksToRun(ctx context.Context, r *types.Run, rc *types.RunConfig) ([]*types.RunTask, error) { log.Debugf("run: %s", util.Dump(r)) - rc, err := store.LTSGetRunConfig(s.wal, r.ID) - if err != nil { - return nil, errors.Wrapf(err, "cannot get run config %q", r.ID) - } log.Debugf("rc: %s", util.Dump(rc)) tasksToRun := []*types.RunTask{} // get tasks that can be executed for _, rt := range r.RunTasks { - log.Debugf("rt: %s", util.Dump(rt)) if rt.Skip { continue } @@ -145,7 +146,7 @@ func (s *Scheduler) getTasksToRun(ctx context.Context, r *types.Run) ([]*types.R } if canRun { - // Run only if approved if needed + // Run only if approved (when needs approval) if !rct.NeedsApproval || (rct.NeedsApproval && rt.Approved) { tasksToRun = append(tasksToRun, rt) } @@ -360,7 +361,12 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run prevPhase := r.Phase prevResult := r.Result - if err := s.advanceRun(ctx, r, rc); err != nil { + hasActiveTasks, err := s.runHasActiveTasks(ctx, r.ID) + if err != nil { + return err + } + + if err := advanceRun(ctx, r, rc, hasActiveTasks); err != nil { return err } @@ -374,13 +380,13 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run } } - r, err := store.AtomicPutRun(ctx, s.e, r, runEvent, nil) + r, err = store.AtomicPutRun(ctx, s.e, r, runEvent, nil) if err != nil { return err } if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { - if err := s.advanceRunTasks(ctx, r, rc); err != nil { + if err := advanceRunTasks(ctx, r, rc); err != nil { return err } r, err := store.AtomicPutRun(ctx, s.e, r, nil, nil) @@ -388,7 +394,7 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run return err } - tasksToRun, err := s.getTasksToRun(ctx, r) + tasksToRun, err := getTasksToRun(ctx, r, rc) if err != nil { return err } @@ -401,7 +407,7 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run // advanceRun updates the run result and phase. It must be the unique function that // should update them. -func (s *Scheduler) advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error { +func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, hasActiveTasks bool) error { log.Debugf("run: %s", util.Dump(r)) // fail run if a task is failed @@ -446,13 +452,9 @@ func (s *Scheduler) advanceRun(ctx context.Context, r *types.Run, rc *types.RunC // if the run has a result defined then we can stop current tasks if r.Result.IsSet() { if !r.Phase.IsFinished() { - hasRunningTasks, err := s.runHasActiveTasks(ctx, r.ID) - if err != nil { - return err - } // if the run has a result defined AND there're no executor tasks scheduled we can mark // the run phase as finished - if !hasRunningTasks { + if !hasActiveTasks { r.ChangePhase(types.RunPhaseFinished) } } diff --git a/internal/services/runservice/scheduler/scheduler_test.go b/internal/services/runservice/scheduler/scheduler_test.go new file mode 100644 index 0000000..8eb42f1 --- /dev/null +++ b/internal/services/runservice/scheduler/scheduler_test.go @@ -0,0 +1,380 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "context" + "sort" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/sorintlab/agola/internal/services/runservice/types" +) + +func TestAdvanceRunTasks(t *testing.T) { + // a global run config for all tests + rc := &types.RunConfig{ + Tasks: map[string]*types.RunConfigTask{ + "task01": &types.RunConfigTask{ + ID: "task01", + Name: "task01", + Depends: []*types.RunConfigTaskDepend{}, + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Containers: []*types.Container{{Image: "image01"}}, + }, + Environment: map[string]string{}, + Steps: []interface{}{}, + Skip: false, + }, + "task02": &types.RunConfigTask{ + ID: "task02", + Name: "task02", + Depends: []*types.RunConfigTaskDepend{ + &types.RunConfigTaskDepend{ + TaskID: "task01", + }, + }, + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Containers: []*types.Container{{Image: "image01"}}, + }, + Environment: map[string]string{}, + Steps: []interface{}{}, + Skip: false, + }, + "task03": &types.RunConfigTask{ + ID: "task03", + Name: "task03", + Depends: []*types.RunConfigTaskDepend{}, + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Containers: []*types.Container{{Image: "image01"}}, + }, + Environment: map[string]string{}, + Steps: []interface{}{}, + Skip: false, + }, + "task04": &types.RunConfigTask{ + ID: "task04", + Name: "task04", + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Containers: []*types.Container{{Image: "image01"}}, + }, + Environment: map[string]string{}, + Steps: []interface{}{}, + Skip: false, + }, + "task05": &types.RunConfigTask{ + ID: "task05", + Name: "task05", + Depends: []*types.RunConfigTaskDepend{ + &types.RunConfigTaskDepend{TaskID: "task03"}, + &types.RunConfigTaskDepend{TaskID: "task04"}, + }, + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Containers: []*types.Container{{Image: "image01"}}, + }, + Environment: map[string]string{}, + Steps: []interface{}{}, + Skip: false, + }, + }, + } + + // initial run that matched the runconfig, all tasks are not started or skipped + // (if the runconfig task as Skip == true). This must match the status + // generated by command.genRun() + run := &types.Run{ + RunTasks: map[string]*types.RunTask{ + "task01": &types.RunTask{ + ID: "task01", + Status: types.RunTaskStatusNotStarted, + }, + "task02": &types.RunTask{ + ID: "task02", + Status: types.RunTaskStatusNotStarted, + }, + "task03": &types.RunTask{ + ID: "task03", + Status: types.RunTaskStatusNotStarted, + }, + "task04": &types.RunTask{ + ID: "task04", + Status: types.RunTaskStatusNotStarted, + }, + "task05": &types.RunTask{ + ID: "task05", + Status: types.RunTaskStatusNotStarted, + }, + }, + } + + tests := []struct { + name string + rc *types.RunConfig + r *types.Run + out *types.Run + err error + }{ + { + name: "test top level task not started", + rc: rc, + r: run.DeepCopy(), + out: run.DeepCopy(), + }, + { + name: "test task status set to skipped when parent status is skipped", + rc: func() *types.RunConfig { + rc := rc.DeepCopy() + rc.Tasks["task01"].Skip = true + return rc + }(), + r: func() *types.Run { + run := run.DeepCopy() + run.RunTasks["task01"].Status = types.RunTaskStatusSkipped + return run + }(), + out: func() *types.Run { + run := run.DeepCopy() + run.RunTasks["task01"].Status = types.RunTaskStatusSkipped + run.RunTasks["task02"].Status = types.RunTaskStatusSkipped + return run + }(), + }, + { + name: "test task status set to skipped when all parent status is skipped", + rc: func() *types.RunConfig { + rc := rc.DeepCopy() + rc.Tasks["task03"].Skip = true + rc.Tasks["task04"].Skip = true + return rc + }(), + r: func() *types.Run { + run := run.DeepCopy() + run.RunTasks["task03"].Status = types.RunTaskStatusSkipped + run.RunTasks["task04"].Status = types.RunTaskStatusSkipped + return run + }(), + out: func() *types.Run { + run := run.DeepCopy() + run.RunTasks["task03"].Status = types.RunTaskStatusSkipped + run.RunTasks["task04"].Status = types.RunTaskStatusSkipped + run.RunTasks["task05"].Status = types.RunTaskStatusSkipped + return run + }(), + }, + { + name: "test task status not set to skipped when not all parent status is skipped", + rc: func() *types.RunConfig { + rc := rc.DeepCopy() + rc.Tasks["task03"].Skip = true + return rc + }(), + r: func() *types.Run { + run := run.DeepCopy() + run.RunTasks["task03"].Status = types.RunTaskStatusSkipped + run.RunTasks["task04"].Status = types.RunTaskStatusSuccess + return run + }(), + out: func() *types.Run { + run := run.DeepCopy() + run.RunTasks["task03"].Status = types.RunTaskStatusSkipped + run.RunTasks["task04"].Status = types.RunTaskStatusSuccess + return run + }(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + if err := advanceRunTasks(ctx, tt.r, tt.rc); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if diff := cmp.Diff(tt.out, tt.r); diff != "" { + t.Error(diff) + } + }) + } +} + +func TestGetTasksToRun(t *testing.T) { + // a global run config for all tests + rc := &types.RunConfig{ + Tasks: map[string]*types.RunConfigTask{ + "task01": &types.RunConfigTask{ + ID: "task01", + Name: "task01", + Depends: []*types.RunConfigTaskDepend{}, + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Containers: []*types.Container{{Image: "image01"}}, + }, + Environment: map[string]string{}, + Steps: []interface{}{}, + Skip: false, + }, + "task02": &types.RunConfigTask{ + ID: "task02", + Name: "task02", + Depends: []*types.RunConfigTaskDepend{ + &types.RunConfigTaskDepend{ + TaskID: "task01", + }, + }, + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Containers: []*types.Container{{Image: "image01"}}, + }, + Environment: map[string]string{}, + Steps: []interface{}{}, + Skip: false, + }, + "task03": &types.RunConfigTask{ + ID: "task03", + Name: "task03", + Depends: []*types.RunConfigTaskDepend{}, + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Containers: []*types.Container{{Image: "image01"}}, + }, + Environment: map[string]string{}, + Steps: []interface{}{}, + Skip: false, + }, + "task04": &types.RunConfigTask{ + ID: "task04", + Name: "task04", + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Containers: []*types.Container{{Image: "image01"}}, + }, + Environment: map[string]string{}, + Steps: []interface{}{}, + Skip: false, + }, + "task05": &types.RunConfigTask{ + ID: "task05", + Name: "task05", + Depends: []*types.RunConfigTaskDepend{ + &types.RunConfigTaskDepend{TaskID: "task03"}, + &types.RunConfigTaskDepend{TaskID: "task04"}, + }, + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Containers: []*types.Container{{Image: "image01"}}, + }, + Environment: map[string]string{}, + Steps: []interface{}{}, + Skip: false, + }, + }, + } + + // initial run that matched the runconfig, all tasks are not started or skipped + // (if the runconfig task as Skip == true). This must match the status + // generated by command.genRun() + run := &types.Run{ + RunTasks: map[string]*types.RunTask{ + "task01": &types.RunTask{ + ID: "task01", + Status: types.RunTaskStatusNotStarted, + }, + "task02": &types.RunTask{ + ID: "task02", + Status: types.RunTaskStatusNotStarted, + }, + "task03": &types.RunTask{ + ID: "task03", + Status: types.RunTaskStatusNotStarted, + }, + "task04": &types.RunTask{ + ID: "task04", + Status: types.RunTaskStatusNotStarted, + }, + "task05": &types.RunTask{ + ID: "task05", + Status: types.RunTaskStatusNotStarted, + }, + }, + } + + tests := []struct { + name string + rc *types.RunConfig + r *types.Run + out []string + err error + }{ + { + name: "test run top level tasks", + rc: rc, + r: run.DeepCopy(), + out: []string{"task01", "task03", "task04"}, + }, + { + name: "test don't run skipped tasks", + rc: func() *types.RunConfig { + rc := rc.DeepCopy() + rc.Tasks["task01"].Skip = true + return rc + }(), + r: func() *types.Run { + run := run.DeepCopy() + run.RunTasks["task01"].Status = types.RunTaskStatusSkipped + run.RunTasks["task02"].Status = types.RunTaskStatusSkipped + return run + }(), + out: []string{"task03", "task04"}, + }, + { + name: "test don't run if needs approval but not approved", + rc: func() *types.RunConfig { + rc := rc.DeepCopy() + rc.Tasks["task01"].NeedsApproval = true + return rc + }(), + r: run.DeepCopy(), + out: []string{"task03", "task04"}, + }, + { + name: "test run if needs approval and approved", + rc: func() *types.RunConfig { + rc := rc.DeepCopy() + rc.Tasks["task01"].NeedsApproval = true + return rc + }(), + r: func() *types.Run { + run := run.DeepCopy() + run.RunTasks["task01"].Approved = true + return run + }(), + out: []string{"task01", "task03", "task04"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + tasks, err := getTasksToRun(ctx, tt.r, tt.rc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + outTasks := []string{} + for _, t := range tasks { + outTasks = append(outTasks, t.ID) + } + sort.Sort(sort.StringSlice(tt.out)) + sort.Sort(sort.StringSlice(outTasks)) + + if diff := cmp.Diff(tt.out, outTasks); diff != "" { + t.Error(diff) + } + }) + } +} diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index 7cd1815..f8ba265 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + "github.com/mitchellh/copystructure" "github.com/sorintlab/agola/internal/util" ) @@ -120,6 +121,14 @@ type Run struct { Revision int64 `json:"-"` } +func (r *Run) DeepCopy() *Run { + nr, err := copystructure.Copy(r) + if err != nil { + panic(err) + } + return nr.(*Run) +} + func (r *Run) ChangePhase(phase RunPhase) { r.Phase = phase switch { @@ -298,6 +307,14 @@ type RunConfig struct { Tasks map[string]*RunConfigTask `json:"tasks,omitempty"` } +func (rc *RunConfig) DeepCopy() *RunConfig { + nrc, err := copystructure.Copy(rc) + if err != nil { + panic(err) + } + return nrc.(*RunConfig) +} + type RunConfigTask struct { Level int `json:"level,omitempty"` ID string `json:"id,omitempty"`