From 3ac018e6e5c203bf60635f356420bfe1d160742a Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 27 Feb 2020 14:59:04 +0100 Subject: [PATCH] runservice: use all scheduled tasks in scheduleRun rename activeExecutorTasks to scheduledExecutorTasks and don't filter out finished tasks. In some logic we need all the scheduled tasks and not only the not finished ones. --- internal/services/runservice/scheduler.go | 43 ++++++------------- .../services/runservice/scheduler_test.go | 16 +++---- 2 files changed, 22 insertions(+), 37 deletions(-) diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 7161407..9b7f356 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -44,25 +44,6 @@ const ( defaultExecutorNotAliveInterval = 60 * time.Second ) -func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) { - // the real source of active tasks is the number of executor tasks in etcd - // we can't rely on RunTask.Status since it's only updated when receiveing - // updated from the executor so it could be in a NotStarted state but have an - // executor tasks scheduled and running - ets, err := store.GetExecutorTasksForRun(ctx, s.e, runID) - if err != nil { - return nil, err - } - activeTasks := []*types.ExecutorTask{} - for _, et := range ets { - if !et.Status.Phase.IsFinished() { - activeTasks = append(activeTasks, et) - } - } - - return activeTasks, nil -} - func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r *types.Run, rc *types.RunConfig) bool { rct := rc.Tasks[rt.ID] parents := runconfig.GetParents(rc.Tasks, rct) @@ -96,7 +77,7 @@ func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r return len(parents) == matchedNum } -func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) (*types.Run, error) { +func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, scheduledExecutorTasks []*types.ExecutorTask) (*types.Run, error) { log.Debugf("run: %s", util.Dump(curRun)) log.Debugf("rc: %s", util.Dump(rc)) @@ -107,7 +88,7 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig // if the run is set to stop, skip all not running tasks for _, rt := range newRun.Tasks { isScheduled := false - for _, et := range activeExecutorTasks { + for _, et := range scheduledExecutorTasks { if rt.ID == et.ID { isScheduled = true } @@ -139,7 +120,7 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig // cancel task if the run has a result set and is not yet scheduled if curRun.Result.IsSet() { isScheduled := false - for _, et := range activeExecutorTasks { + for _, et := range scheduledExecutorTasks { if rt.ID == et.ID { isScheduled = true } @@ -480,12 +461,16 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru prevPhase := r.Phase prevResult := r.Result - activeExecutorTasks, err := s.runActiveExecutorTasks(ctx, r.ID) + // the real source of active tasks is the number of executor tasks in etcd + // we can't rely on RunTask.Status since it's only updated when receiveing + // updated from the executor so it could be in a NotStarted state but have an + // executor tasks scheduled and running + scheduledExecutorTasks, err := store.GetExecutorTasksForRun(ctx, s.e, r.ID) if err != nil { return err } - if err := advanceRun(ctx, r, rc, activeExecutorTasks); err != nil { + if err := advanceRun(ctx, r, rc, scheduledExecutorTasks); err != nil { return err } @@ -506,7 +491,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru // if the run is set to stop, stop all active tasks if r.Stop { - for _, et := range activeExecutorTasks { + for _, et := range scheduledExecutorTasks { et.Spec.Stop = true if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { return err @@ -519,7 +504,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru // advance tasks if r.Phase == types.RunPhaseRunning { - r, err := advanceRunTasks(ctx, r, rc, activeExecutorTasks) + r, err := advanceRunTasks(ctx, r, rc, scheduledExecutorTasks) if err != nil { return err } @@ -541,9 +526,9 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru // advanceRun updates the run result and phase. It must be the unique function that // should update them. -func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) error { +func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, scheduledExecutorTasks []*types.ExecutorTask) error { log.Debugf("run: %s", util.Dump(r)) - hasActiveTasks := len(activeExecutorTasks) > 0 + hasScheduledTasks := len(scheduledExecutorTasks) > 0 // fail run if a task is failed if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { @@ -595,7 +580,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx } if finished && !r.Phase.IsFinished() { - if !hasActiveTasks { + if !hasScheduledTasks { r.ChangePhase(types.RunPhaseFinished) } } diff --git a/internal/services/runservice/scheduler_test.go b/internal/services/runservice/scheduler_test.go index 35a0c12..cf15b3b 100644 --- a/internal/services/runservice/scheduler_test.go +++ b/internal/services/runservice/scheduler_test.go @@ -125,11 +125,11 @@ func TestAdvanceRunTasks(t *testing.T) { } tests := []struct { - name string - rc *types.RunConfig - r *types.Run - activeExecutorTasks []*types.ExecutorTask - out *types.Run + name string + rc *types.RunConfig + r *types.Run + scheduledExecutorTasks []*types.ExecutorTask + out *types.Run }{ { name: "test top level task not started", @@ -343,7 +343,7 @@ func TestAdvanceRunTasks(t *testing.T) { run.Tasks["task04"].Status = types.RunTaskStatusSuccess return run }(), - activeExecutorTasks: []*types.ExecutorTask{ + scheduledExecutorTasks: []*types.ExecutorTask{ &types.ExecutorTask{ID: "task01"}, }, out: func() *types.Run { @@ -371,7 +371,7 @@ func TestAdvanceRunTasks(t *testing.T) { run.Stop = true return run }(), - activeExecutorTasks: []*types.ExecutorTask{ + scheduledExecutorTasks: []*types.ExecutorTask{ &types.ExecutorTask{ID: "task01"}, }, out: func() *types.Run { @@ -390,7 +390,7 @@ func TestAdvanceRunTasks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.activeExecutorTasks) + r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.scheduledExecutorTasks) if err != nil { t.Fatalf("unexpected error: %v", err) }