From 671b89d39105dd5d1fa194fe5ec96aeb7e50b8ef Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Tue, 9 Apr 2019 18:11:00 +0200 Subject: [PATCH] runservice: merge RunConfig and RunData * Use just RunConfig * Use StaticEnvironment vs Environment in RunConfig to distinguish between env that won't change at run recreation from env that could change at every recreation * The RunCreate api will just receive the runtasks instead of a runconfig (more right) --- internal/runconfig/runconfig.go | 58 ++++++------- internal/runconfig/runconfig_test.go | 84 ++++++++----------- internal/services/gateway/webhook.go | 14 ++-- .../services/runservice/scheduler/api/api.go | 40 +++++---- .../runservice/scheduler/command/command.go | 82 ++++++++---------- .../runservice/scheduler/common/common.go | 8 -- .../runservice/scheduler/scheduler.go | 17 ++-- .../runservice/scheduler/store/store.go | 32 ------- internal/services/runservice/types/types.go | 40 ++++----- 9 files changed, 152 insertions(+), 223 deletions(-) diff --git a/internal/runconfig/runconfig.go b/internal/runconfig/runconfig.go index 4d64e24..72ee169 100644 --- a/internal/runconfig/runconfig.go +++ b/internal/runconfig/runconfig.go @@ -143,16 +143,12 @@ fi } } -// GenRunConfig generates a run config from a pipeline in the config, expanding all the references to tasks +// GenRunConfigTasks generates a run config tasks from a pipeline in the config, expanding all the references to tasks // this functions assumes that the config is already checked for possible errors (i.e referenced task must exits) -func GenRunConfig(uuid util.UUIDGenerator, c *config.Config, pipelineName string, env, variables map[string]string, branch, tag, ref string) *rstypes.RunConfig { +func GenRunConfigTasks(uuid util.UUIDGenerator, c *config.Config, pipelineName string, variables map[string]string, branch, tag, ref string) map[string]*rstypes.RunConfigTask { cp := c.Pipeline(pipelineName) - rc := &rstypes.RunConfig{ - Name: cp.Name, - Tasks: make(map[string]*rstypes.RunConfigTask), - Environment: env, - } + rcts := map[string]*rstypes.RunConfigTask{} for _, cpe := range cp.Elements { include := types.MatchWhen(cpe.When, branch, tag, ref) @@ -185,11 +181,11 @@ func GenRunConfig(uuid util.UUIDGenerator, c *config.Config, pipelineName string NeedsApproval: cpe.Approval, } - rc.Tasks[t.ID] = t + rcts[t.ID] = t } // populate depends, needs to be done after having created all the tasks so we can resolve their id - for _, rct := range rc.Tasks { + for _, rct := range rcts { cpe := cp.Elements[rct.Name] depends := make([]*rstypes.RunConfigTaskDepend, len(cpe.Depends)) @@ -211,7 +207,7 @@ func GenRunConfig(uuid util.UUIDGenerator, c *config.Config, pipelineName string } } - drct := getRunConfigTaskByName(rc, d.ElementName) + drct := getRunConfigTaskByName(rcts, d.ElementName) depends[id] = &rstypes.RunConfigTaskDepend{ TaskID: drct.ID, Conditions: conditions, @@ -221,11 +217,11 @@ func GenRunConfig(uuid util.UUIDGenerator, c *config.Config, pipelineName string rct.Depends = depends } - return rc + return rcts } -func getRunConfigTaskByName(rc *rstypes.RunConfig, name string) *rstypes.RunConfigTask { - for _, rct := range rc.Tasks { +func getRunConfigTaskByName(rcts map[string]*rstypes.RunConfigTask, name string) *rstypes.RunConfigTask { + for _, rct := range rcts { if rct.Name == name { return rct } @@ -233,17 +229,17 @@ func getRunConfigTaskByName(rc *rstypes.RunConfig, name string) *rstypes.RunConf return nil } -func CheckRunConfig(rc *rstypes.RunConfig) error { +func CheckRunConfigTasks(rcts map[string]*rstypes.RunConfigTask) error { // check circular dependencies cerrs := &util.Errors{} - for _, t := range rc.Tasks { - allParents := GetAllParents(rc, t) + for _, t := range rcts { + allParents := GetAllParents(rcts, t) for _, parent := range allParents { if parent.ID == t.ID { // TODO(sgotti) get the parent that depends on task to report it dep := []string{} for _, parent := range allParents { - pparents := GetParents(rc, parent) + pparents := GetParents(rcts, parent) for _, pparent := range pparents { if pparent.ID == t.ID { dep = append(dep, fmt.Sprintf("%q", parent.Name)) @@ -259,11 +255,11 @@ func CheckRunConfig(rc *rstypes.RunConfig) error { } // check that the task and its parent don't have a common dependency - for _, t := range rc.Tasks { - parents := GetParents(rc, t) + for _, t := range rcts { + parents := GetParents(rcts, t) for _, parent := range parents { - allParents := GetAllParents(rc, t) - allParentParents := GetAllParents(rc, parent) + allParents := GetAllParents(rcts, t) + allParentParents := GetAllParents(rcts, parent) for _, p := range allParents { for _, pp := range allParentParents { if p.ID == pp.ID { @@ -277,22 +273,22 @@ func CheckRunConfig(rc *rstypes.RunConfig) error { return nil } -func GenTasksLevels(rc *rstypes.RunConfig) error { +func GenTasksLevels(rcts map[string]*rstypes.RunConfigTask) error { // reset all task level - for _, t := range rc.Tasks { + for _, t := range rcts { t.Level = -1 } level := 0 for { c := 0 - for _, t := range rc.Tasks { + for _, t := range rcts { // skip tasks with the level already set if t.Level != -1 { continue } - parents := GetParents(rc, t) + parents := GetParents(rcts, t) ok := true for _, p := range parents { // * skip if the parent doesn't have a level yet @@ -314,7 +310,7 @@ func GenTasksLevels(rc *rstypes.RunConfig) error { } level++ } - for _, t := range rc.Tasks { + for _, t := range rcts { if t.Level == -1 { return errors.Errorf("circular dependency detected") } @@ -323,9 +319,9 @@ func GenTasksLevels(rc *rstypes.RunConfig) error { } // GetParents returns direct parents of task. -func GetParents(rc *rstypes.RunConfig, task *rstypes.RunConfigTask) []*rstypes.RunConfigTask { +func GetParents(rcts map[string]*rstypes.RunConfigTask, task *rstypes.RunConfigTask) []*rstypes.RunConfigTask { parents := []*rstypes.RunConfigTask{} - for _, t := range rc.Tasks { + for _, t := range rcts { isParent := false for _, d := range task.Depends { if d.TaskID == t.ID { @@ -342,9 +338,9 @@ func GetParents(rc *rstypes.RunConfig, task *rstypes.RunConfigTask) []*rstypes.R // GetAllParents returns all the parents (both direct and ancestors) of task. // In case of circular dependency it won't loop forever but will also return // task as parent of itself -func GetAllParents(rc *rstypes.RunConfig, task *rstypes.RunConfigTask) []*rstypes.RunConfigTask { +func GetAllParents(rcts map[string]*rstypes.RunConfigTask, task *rstypes.RunConfigTask) []*rstypes.RunConfigTask { pMap := map[string]*rstypes.RunConfigTask{} - nextParents := GetParents(rc, task) + nextParents := GetParents(rcts, task) for len(nextParents) > 0 { parents := nextParents @@ -354,7 +350,7 @@ func GetAllParents(rc *rstypes.RunConfig, task *rstypes.RunConfigTask) []*rstype continue } pMap[parent.ID] = parent - nextParents = append(nextParents, GetParents(rc, parent)...) + nextParents = append(nextParents, GetParents(rcts, parent)...) } } diff --git a/internal/runconfig/runconfig_test.go b/internal/runconfig/runconfig_test.go index e087b91..94b9b6a 100644 --- a/internal/runconfig/runconfig_test.go +++ b/internal/runconfig/runconfig_test.go @@ -205,25 +205,25 @@ func TestGenTasksLevels(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - inRunConfig := &rstypes.RunConfig{Tasks: map[string]*rstypes.RunConfigTask{}} + inRcts := map[string]*rstypes.RunConfigTask{} for _, t := range tt.in { - inRunConfig.Tasks[t.ID] = &rstypes.RunConfigTask{ + inRcts[t.ID] = &rstypes.RunConfigTask{ ID: t.ID, Level: t.Level, Depends: t.Depends, } } - outRunConfig := &rstypes.RunConfig{Tasks: map[string]*rstypes.RunConfigTask{}} + outRcts := map[string]*rstypes.RunConfigTask{} for _, t := range tt.out { - outRunConfig.Tasks[t.ID] = &rstypes.RunConfigTask{ + outRcts[t.ID] = &rstypes.RunConfigTask{ ID: t.ID, Level: t.Level, Depends: t.Depends, } } - if err := GenTasksLevels(inRunConfig); err != nil { + if err := GenTasksLevels(inRcts); err != nil { if err.Error() != tt.err.Error() { t.Fatalf("got error: %v, want error: %v", err, tt.err) } @@ -232,8 +232,8 @@ func TestGenTasksLevels(t *testing.T) { if tt.err != nil { t.Fatalf("got nil error, want error: %v", tt.err) } - if !reflect.DeepEqual(inRunConfig.Tasks, outRunConfig.Tasks) { - t.Fatalf("got %s, expected %s", util.Dump(inRunConfig), util.Dump(outRunConfig)) + if !reflect.DeepEqual(inRcts, outRcts) { + t.Fatalf("got %s, expected %s", util.Dump(inRcts), util.Dump(outRcts)) } }) } @@ -473,9 +473,9 @@ func TestGetAllParents(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - inRunConfig := &rstypes.RunConfig{Tasks: map[string]*rstypes.RunConfigTask{}} + inRcts := map[string]*rstypes.RunConfigTask{} for _, t := range tt.in { - inRunConfig.Tasks[t.ID] = &rstypes.RunConfigTask{ + inRcts[t.ID] = &rstypes.RunConfigTask{ ID: t.ID, Level: t.Level, Depends: t.Depends, @@ -483,8 +483,8 @@ func TestGetAllParents(t *testing.T) { } - for _, task := range inRunConfig.Tasks { - allParents := GetAllParents(inRunConfig, task) + for _, task := range inRcts { + allParents := GetAllParents(inRcts, task) allParentsList := []string{} for _, p := range allParents { @@ -658,9 +658,9 @@ func TestCheckRunConfig(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - inRunConfig := &rstypes.RunConfig{Tasks: map[string]*rstypes.RunConfigTask{}} + inRcts := map[string]*rstypes.RunConfigTask{} for _, t := range tt.in { - inRunConfig.Tasks[t.ID] = &rstypes.RunConfigTask{ + inRcts[t.ID] = &rstypes.RunConfigTask{ Name: fmt.Sprintf("task%s", t.ID), ID: t.ID, Level: t.Level, @@ -669,7 +669,7 @@ func TestCheckRunConfig(t *testing.T) { } - if err := CheckRunConfig(inRunConfig); err != nil { + if err := CheckRunConfigTasks(inRcts); err != nil { if errs, ok := err.(*util.Errors); ok { if !errs.Equal(tt.err) { t.Fatalf("got error: %v, want error: %v", err, tt.err) @@ -692,9 +692,8 @@ func TestGenRunConfig(t *testing.T) { tests := []struct { name string in *config.Config - env map[string]string variables map[string]string - out *rstypes.RunConfig + out map[string]*rstypes.RunConfigTask }{ { name: "test runconfig generation", @@ -779,43 +778,34 @@ func TestGenRunConfig(t *testing.T) { }, }, }, - env: map[string]string{ - "ENV01": "ENVVALUE01", - }, variables: map[string]string{ "variable01": "VARVALUE01", }, - out: &rstypes.RunConfig{ - Name: "pipeline01", - Environment: map[string]string{ - "ENV01": "ENVVALUE01", - }, - Tasks: map[string]*rstypes.RunConfigTask{ - uuid.New("element01").String(): &rstypes.RunConfigTask{ - ID: uuid.New("element01").String(), - Name: "element01", Depends: []*rstypes.RunConfigTaskDepend{}, - Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"), - Containers: []*rstypes.Container{ - { - Image: "image01", - Environment: map[string]string{ - "ENV01": "ENV01", - "ENVFROMVARIABLE01": "VARVALUE01", - }, + out: map[string]*rstypes.RunConfigTask{ + uuid.New("element01").String(): &rstypes.RunConfigTask{ + ID: uuid.New("element01").String(), + Name: "element01", Depends: []*rstypes.RunConfigTaskDepend{}, + Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"), + Containers: []*rstypes.Container{ + { + Image: "image01", + Environment: map[string]string{ + "ENV01": "ENV01", + "ENVFROMVARIABLE01": "VARVALUE01", }, }, }, - Environment: map[string]string{ - "ENV01": "ENV01", - "ENVFROMVARIABLE01": "VARVALUE01", - }, - Steps: []interface{}{ - &rstypes.RunStep{Step: rstypes.Step{Type: "run", Name: "command01"}, Command: "command01", Environment: map[string]string{}}, - &rstypes.RunStep{Step: rstypes.Step{Type: "run", Name: "name different than command"}, Command: "command02", Environment: map[string]string{}}, - &rstypes.RunStep{Step: rstypes.Step{Type: "run", Name: "command03"}, Command: "command03", Environment: map[string]string{"ENV01": "ENV01", "ENVFROMVARIABLE01": "VARVALUE01"}}, - }, - Skip: true, }, + Environment: map[string]string{ + "ENV01": "ENV01", + "ENVFROMVARIABLE01": "VARVALUE01", + }, + Steps: []interface{}{ + &rstypes.RunStep{Step: rstypes.Step{Type: "run", Name: "command01"}, Command: "command01", Environment: map[string]string{}}, + &rstypes.RunStep{Step: rstypes.Step{Type: "run", Name: "name different than command"}, Command: "command02", Environment: map[string]string{}}, + &rstypes.RunStep{Step: rstypes.Step{Type: "run", Name: "command03"}, Command: "command03", Environment: map[string]string{"ENV01": "ENV01", "ENVFROMVARIABLE01": "VARVALUE01"}}, + }, + Skip: true, }, }, }, @@ -823,7 +813,7 @@ func TestGenRunConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - out := GenRunConfig(uuid, tt.in, "pipeline01", tt.env, tt.variables, "", "", "") + out := GenRunConfigTasks(uuid, tt.in, "pipeline01", tt.variables, "", "", "") //if err != nil { // t.Fatalf("unexpected error: %v", err) diff --git a/internal/services/gateway/webhook.go b/internal/services/gateway/webhook.go index 6c997c6..4c20590 100644 --- a/internal/services/gateway/webhook.go +++ b/internal/services/gateway/webhook.go @@ -338,7 +338,7 @@ func (h *webhooksHandler) handleWebhook(r *http.Request) (int, string, error) { return 0, "", nil } -func (h *webhooksHandler) createRuns(ctx context.Context, configData []byte, group string, annotations, env, variables map[string]string, webhookData *types.WebhookData) error { +func (h *webhooksHandler) createRuns(ctx context.Context, configData []byte, group string, annotations, staticEnv, variables map[string]string, webhookData *types.WebhookData) error { config, err := config.ParseConfig([]byte(configData)) if err != nil { return errors.Wrapf(err, "failed to parse config") @@ -347,14 +347,16 @@ func (h *webhooksHandler) createRuns(ctx context.Context, configData []byte, gro //h.log.Debugf("pipeline: %s", createRunOpts.PipelineName) for _, pipeline := range config.Pipelines { - rc := runconfig.GenRunConfig(util.DefaultUUIDGenerator{}, config, pipeline.Name, env, variables, webhookData.Branch, webhookData.Tag, webhookData.Ref) + rcts := runconfig.GenRunConfigTasks(util.DefaultUUIDGenerator{}, config, pipeline.Name, variables, webhookData.Branch, webhookData.Tag, webhookData.Ref) - h.log.Debugf("rc: %s", util.Dump(rc)) + h.log.Debugf("rcts: %s", util.Dump(rcts)) h.log.Infof("group: %s", group) createRunReq := &rsapi.RunCreateRequest{ - RunConfig: rc, - Group: group, - Annotations: annotations, + RunConfigTasks: rcts, + Group: group, + Name: pipeline.Name, + StaticEnvironment: staticEnv, + Annotations: annotations, } if _, err := h.runserviceClient.CreateRun(ctx, createRunReq); err != nil { diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index 24b6221..faed497 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -480,19 +480,22 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type RunCreateRequest struct { - // new run - RunConfig *types.RunConfig `json:"run_config"` + // new run fields + RunConfigTasks map[string]*types.RunConfigTask `json:"run_config_tasks"` + Name string `json:"name"` + Group string `json:"group"` + StaticEnvironment map[string]string `json:"static_environment"` - // existing run - RunID string `json:"run_id"` - RunConfigID string `json:"run_config_id"` - FromStart bool `json:"from_start"` - ResetTasks []string `json:"reset_tasks"` + // existing run fields + RunID string `json:"run_id"` + FromStart bool `json:"from_start"` + ResetTasks []string `json:"reset_tasks"` - Group string `json:"group"` - Environment map[string]string `json:"environment"` - Annotations map[string]string `json:"annotations"` - ChangeGroupsUpdateToken string `json:"changeup_update_tokens"` + // common fields + Environment map[string]string `json:"environment"` + Annotations map[string]string `json:"annotations"` + + ChangeGroupsUpdateToken string `json:"changeup_update_tokens"` } type RunCreateHandler struct { @@ -518,12 +521,15 @@ func (h *RunCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } creq := &command.RunCreateRequest{ - RunConfig: req.RunConfig, - RunID: req.RunID, - RunConfigID: req.RunConfigID, - FromStart: req.FromStart, - ResetTasks: req.ResetTasks, - Group: req.Group, + RunConfigTasks: req.RunConfigTasks, + Name: req.Name, + Group: req.Group, + StaticEnvironment: req.StaticEnvironment, + + RunID: req.RunID, + FromStart: req.FromStart, + ResetTasks: req.ResetTasks, + Environment: req.Environment, Annotations: req.Annotations, ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken, diff --git a/internal/services/runservice/scheduler/command/command.go b/internal/services/runservice/scheduler/command/command.go index fd0f596..d3402ea 100644 --- a/internal/services/runservice/scheduler/command/command.go +++ b/internal/services/runservice/scheduler/command/command.go @@ -117,14 +117,20 @@ func (s *CommandHandler) StopRun(ctx context.Context, req *RunStopRequest) error } type RunCreateRequest struct { - RunConfig *types.RunConfig - RunID string - RunConfigID string - FromStart bool - ResetTasks []string - Group string - Environment map[string]string - Annotations map[string]string + RunConfigTasks map[string]*types.RunConfigTask + Name string + Group string + StaticEnvironment map[string]string + + // existing run fields + RunID string + FromStart bool + ResetTasks []string + + // common fields + Environment map[string]string + Annotations map[string]string + ChangeGroupsUpdateToken string } @@ -148,8 +154,7 @@ func (s *CommandHandler) CreateRun(ctx context.Context, req *RunCreateRequest) ( } func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { - rc := req.RunConfig - var run *types.Run + rcts := req.RunConfigTasks if req.Group == "" { return nil, util.NewErrBadRequest(errors.Errorf("run group is empty")) @@ -158,53 +163,50 @@ func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*ty return nil, util.NewErrBadRequest(errors.Errorf("run group %q must be an absolute path", req.Group)) } - // generate a new run sequence that will be the same for the run, runconfig and rundata + // generate a new run sequence that will be the same for the run and runconfig seq, err := sequence.IncSequence(ctx, s.e, common.EtcdRunSequenceKey) if err != nil { return nil, err } id := seq.String() - if err := runconfig.CheckRunConfig(rc); err != nil { + if err := runconfig.CheckRunConfigTasks(rcts); err != nil { return nil, util.NewErrBadRequest(err) } - // set the run config ID - rc.ID = id // generate tasks levels - if err := runconfig.GenTasksLevels(rc); err != nil { + if err := runconfig.GenTasksLevels(rcts); err != nil { return nil, util.NewErrBadRequest(err) } - rd := &types.RunData{ - ID: id, - Group: req.Group, - Environment: req.Environment, - Annotations: req.Annotations, + rc := &types.RunConfig{ + ID: id, + Name: req.Name, + Group: req.Group, + Tasks: rcts, + StaticEnvironment: req.StaticEnvironment, + Environment: req.Environment, + Annotations: req.Annotations, } - run, err = s.genRun(ctx, rc, rd) - if err != nil { - return nil, util.NewErrBadRequest(err) - } + run := s.genRun(ctx, rc) s.log.Debugf("created run: %s", util.Dump(run)) return &types.RunBundle{ Run: run, Rc: rc, - Rd: rd, }, nil } func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { - // generate a new run sequence that will be the same for the run, runconfig and rundata + // generate a new run sequence that will be the same for the run and runconfig seq, err := sequence.IncSequence(ctx, s.e, common.EtcdRunSequenceKey) if err != nil { return nil, err } id := seq.String() - // fetch the existing runconfig, rundata and run + // fetch the existing runconfig and run s.log.Infof("creating run from existing run") rc, err := store.LTSGetRunConfig(s.wal, req.RunID) if err != nil { @@ -212,13 +214,8 @@ func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) } // update the run config ID rc.ID = id - - rd, err := store.LTSGetRunData(s.wal, req.RunID) - if err != nil { - return nil, util.NewErrBadRequest(errors.Wrapf(err, "rundata %q doens't exist", req.RunID)) - } - // update the run data ID - rd.ID = id + // update the run config Environment + rc.Environment = req.Environment run, err := store.GetRunEtcdOrLTS(ctx, s.e, s.wal, req.RunID) if err != nil { @@ -288,14 +285,12 @@ func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) return &types.RunBundle{ Run: run, Rc: rc, - Rd: rd, }, nil } func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcgt *types.ChangeGroupsUpdateToken) error { run := rb.Run rc := rb.Rc - rd := rb.Rd c, cgt, err := s.getRunCounter(run.Group) s.log.Infof("c: %d, cgt: %s", c, util.Dump(cgt)) @@ -321,13 +316,6 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg } actions = append(actions, rca) - // persist run data - rda, err := store.LTSSaveRunDataAction(rd) - if err != nil { - return err - } - actions = append(actions, rda) - if _, err = s.wal.WriteWal(ctx, actions, cgt); err != nil { return err } @@ -374,12 +362,12 @@ func (s *CommandHandler) genRunTask(ctx context.Context, rct *types.RunConfigTas return rt } -func (s *CommandHandler) genRun(ctx context.Context, rc *types.RunConfig, rd *types.RunData) (*types.Run, error) { +func (s *CommandHandler) genRun(ctx context.Context, rc *types.RunConfig) *types.Run { r := &types.Run{ ID: rc.ID, Name: rc.Name, - Group: rd.Group, - Annotations: rd.Annotations, + Group: rc.Group, + Annotations: rc.Annotations, Phase: types.RunPhaseQueued, Result: types.RunResultUnknown, RunTasks: make(map[string]*types.RunTask), @@ -391,7 +379,7 @@ func (s *CommandHandler) genRun(ctx context.Context, rc *types.RunConfig, rd *ty r.RunTasks[rt.ID] = rt } - return r, nil + return r } type RunTaskApproveRequest struct { diff --git a/internal/services/runservice/scheduler/common/common.go b/internal/services/runservice/scheduler/common/common.go index 24a6003..8ca63dd 100644 --- a/internal/services/runservice/scheduler/common/common.go +++ b/internal/services/runservice/scheduler/common/common.go @@ -55,7 +55,6 @@ const ( var ( StorageDataDir = "" StorageRunsDir = path.Join(StorageDataDir, "runs") - StorageRunsDataDir = path.Join(StorageDataDir, "runsdata") StorageRunsConfigDir = path.Join(StorageDataDir, "runsconfig") StorageRunsIndexesDir = path.Join(StorageDataDir, "runsindexes") StorageCountersDir = path.Join(StorageDataDir, "counters") @@ -69,10 +68,6 @@ func StorageRunFile(runID string) string { return path.Join(StorageRunsDir, runID) } -func StorageRunDataFile(runID string) string { - return path.Join(StorageRunsDataDir, runID) -} - func StorageRunConfigFile(runID string) string { return path.Join(StorageRunsConfigDir, runID) } @@ -85,7 +80,6 @@ type DataType string const ( DataTypeRun DataType = "run" - DataTypeRunData DataType = "rundata" DataTypeRunConfig DataType = "runconfig" DataTypeRunCounter DataType = "runcounter" ) @@ -94,8 +88,6 @@ func DataToPathFunc(dataType string, id string) string { switch DataType(dataType) { case DataTypeRun: return StorageRunFile(id) - case DataTypeRunData: - return StorageRunDataFile(id) case DataTypeRunConfig: return StorageRunConfigFile(id) case DataTypeRunCounter: diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index e0c2c13..018b07e 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -88,11 +88,6 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error { return errors.Wrapf(err, "cannot get run config %q", r.ID) } log.Debugf("rc: %s", util.Dump(rc)) - rd, err := store.LTSGetRunData(s.wal, r.ID) - if err != nil { - return errors.Wrapf(err, "cannot get run data %q", r.ID) - } - log.Debugf("rd: %s", util.Dump(rd)) tasksToRun := []*types.RunTask{} // get tasks that can be executed @@ -106,7 +101,7 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error { } rct := rc.Tasks[rt.ID] - parents := runconfig.GetParents(rc, rct) + parents := runconfig.GetParents(rc.Tasks, rct) canRun := true for _, p := range parents { rp := r.RunTasks[p.ID] @@ -136,7 +131,7 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error { log.Debugf("tasksToRun: %s", util.Dump(tasksToRun)) for _, rt := range tasksToRun { - et, err := s.genExecutorTask(ctx, r, rt, rc, rd) + et, err := s.genExecutorTask(ctx, r, rt, rc) if err != nil { return err } @@ -178,7 +173,7 @@ func (s *Scheduler) chooseExecutor(ctx context.Context) (*types.Executor, error) return nil, nil } -func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, rd *types.RunData) (*types.ExecutorTask, error) { +func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig) (*types.ExecutorTask, error) { executor, err := s.chooseExecutor(ctx) if err != nil { return nil, err @@ -193,9 +188,9 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types if rct.Environment != nil { environment = rct.Environment } + mergeEnv(environment, rc.StaticEnvironment) + // run config Environment variables ovverride every other environment variable mergeEnv(environment, rc.Environment) - // run data Environment variables ovverride every other environment variable - mergeEnv(environment, rd.Environment) et := &types.ExecutorTask{ // The executorTask ID must be the same as the runTask ID so we can detect if @@ -225,7 +220,7 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types // calculate workspace layers ws := make(types.Workspace, rct.Level+1) - rctAllParents := runconfig.GetAllParents(rc, rct) + rctAllParents := runconfig.GetAllParents(rc.Tasks, rct) log.Debugf("rctAllParents: %s", util.Dump(rctAllParents)) for _, rctParent := range rctAllParents { log.Debugf("rctParent: %s", util.Dump(rctParent)) diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/scheduler/store/store.go index 798aafe..add2a6c 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/scheduler/store/store.go @@ -163,38 +163,6 @@ func LTSSaveRunConfigAction(rc *types.RunConfig) (*wal.Action, error) { return action, nil } -func LTSGetRunData(wal *wal.WalManager, runDataID string) (*types.RunData, error) { - runDataPath := common.StorageRunDataFile(runDataID) - rdf, _, err := wal.ReadObject(runDataPath, nil) - if err != nil { - return nil, err - } - defer rdf.Close() - d := json.NewDecoder(rdf) - var rd *types.RunData - if err := d.Decode(&rd); err != nil { - return nil, err - } - - return rd, nil -} - -func LTSSaveRunDataAction(rd *types.RunData) (*wal.Action, error) { - rdj, err := json.Marshal(rd) - if err != nil { - return nil, err - } - - action := &wal.Action{ - ActionType: wal.ActionTypePut, - DataType: string(common.DataTypeRunData), - ID: rd.ID, - Data: rdj, - } - - return action, nil -} - func LTSGetRun(wal *wal.WalManager, runID string) (*types.Run, error) { runPath := common.StorageRunFile(runID) rf, _, err := wal.ReadObject(runPath, nil) diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index 6d1d4b2..1be0578 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -33,7 +33,6 @@ const ( type RunBundle struct { Run *Run Rc *RunConfig - Rd *RunData } type RunCounter struct { @@ -251,13 +250,15 @@ type RunTaskStep struct { EndTime *time.Time `json:"end_time,omitempty"` } -// RunData +// RunConfig -// RunData is the data for a RUN. It contains everything that isn't a state -// (it's contained in a Run) and that may use a lot of space. It lives in the -// storage. There is a RunData for every Run. -type RunData struct { - ID string `json:"id,omitempty"` +// RunConfig is the run configuration. +// It contains everything that isn't a state (that is contained in a Run) and +// that may use a lot of space. It lives in the storage. There is a RunConfig +// for every Run. +type RunConfig struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` // Group is the run group of the run. Every run is assigned to a specific group // i.e. project/$projectid/$branch @@ -266,27 +267,18 @@ type RunData struct { // also needed to fetch them when they aren't indexed in the readdb. Group string `json:"group,omitempty"` - // Environment contains all environment variables that are different between runs also if using the same RunConfig - // (like secrets that may change or user provided enviroment specific to this run) - Environment map[string]string `json:"environment,omitempty"` - // Annotations contain custom run properties - // Note: Annotations are currently both saved in a Run and in RunData to easily return them without loading RunData from the lts + // Note: Annotations are currently both saved in a Run and in RunConfig to + // easily return them without loading RunConfig from the lts Annotations map[string]string `json:"annotations,omitempty"` -} -// RunConfig - -// RunConfig is the run configuration. It lives in the storage. It can be -// copied (i.e when we create a new run from an previous run). -// It could also be shared but to simplify the run delete logic we will just -// copy it when creating a new run as a modified previous run. -type RunConfig struct { - ID string `json:"id,omitempty"` - Name string `json:"name,omitempty"` - - // Environment contains all environment variables that won't change when + // StaticEnvironment contains all environment variables that won't change when // generating a new run (like COMMIT_SHA, BRANCH, REPOSITORY_URL etc...) + StaticEnvironment map[string]string `json:"static_environment,omitempty"` + + // Environment contains all environment variables that are different between + // runs recreations (like secrets that may change or user provided enviroment + // specific to this run) Environment map[string]string `json:"environment,omitempty"` Tasks map[string]*RunConfigTask `json:"tasks,omitempty"`