diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index f230436..a8efad9 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -97,12 +97,15 @@ func (s *Scheduler) runHasActiveExecutorTasks(ctx context.Context, runID string) return len(activeTasks) > 0, nil } -func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) error { - log.Debugf("run: %s", util.Dump(r)) +func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig) (*types.Run, error) { + log.Debugf("run: %s", util.Dump(curRun)) log.Debugf("rc: %s", util.Dump(rc)) + // take a deepcopy of r so we do logic only on fixed status and not affeccted by current changes (due to random map iteration) + newRun := curRun.DeepCopy() + // get tasks that can be executed - for _, rt := range r.Tasks { + for _, rt := range newRun.Tasks { if rt.Skip { continue } @@ -114,7 +117,8 @@ func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) err parents := runconfig.GetParents(rc.Tasks, rct) finishedParents := 0 for _, p := range parents { - rp := r.Tasks[p.ID] + // use current run status to not be affected by previous changes to to random map iteration + rp := curRun.Tasks[p.ID] if rp.Status.IsFinished() && rp.ArchivesFetchFinished() { finishedParents++ } @@ -127,7 +131,7 @@ func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) err if allParentsFinished { for _, p := range parents { matched := false - rp := r.Tasks[p.ID] + rp := curRun.Tasks[p.ID] conds := runconfig.GetParentDependConditions(rct, p) for _, cond := range conds { switch cond { @@ -164,7 +168,7 @@ func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) err } } - return nil + return newRun, nil } func getTasksToRun(ctx context.Context, r *types.Run, rc *types.RunConfig) ([]*types.RunTask, error) { @@ -463,10 +467,11 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run // advance tasks if r.Phase == types.RunPhaseRunning { - if err := advanceRunTasks(ctx, r, rc); err != nil { + r, err := advanceRunTasks(ctx, r, rc) + if err != nil { return err } - r, err := store.AtomicPutRun(ctx, s.e, r, nil, nil) + r, err = store.AtomicPutRun(ctx, s.e, r, nil, nil) if err != nil { return err } diff --git a/internal/services/runservice/scheduler/scheduler_test.go b/internal/services/runservice/scheduler/scheduler_test.go index 0ba108d..edd48b1 100644 --- a/internal/services/runservice/scheduler/scheduler_test.go +++ b/internal/services/runservice/scheduler/scheduler_test.go @@ -306,10 +306,11 @@ func TestAdvanceRunTasks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - if err := advanceRunTasks(ctx, tt.r, tt.rc); err != nil { + r, err := advanceRunTasks(ctx, tt.r, tt.rc) + if err != nil { t.Fatalf("unexpected error: %v", err) } - if diff := cmp.Diff(tt.out, tt.r); diff != "" { + if diff := cmp.Diff(tt.out, r); diff != "" { t.Error(diff) } })