diff --git a/internal/config/config.go b/internal/config/config.go index 9b040f7..e4ffad7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -147,7 +147,7 @@ type Value struct { Value string } -type SaveToWorkspaceContent struct { +type SaveContent struct { SourceDir string `yaml:"source_dir"` DestDir string `yaml:"dest_dir"` Paths []string `yaml:"paths"` @@ -155,7 +155,7 @@ type SaveToWorkspaceContent struct { type SaveToWorkspaceStep struct { Step `yaml:",inline"` - Contents []SaveToWorkspaceContent + Contents []*SaveContent `yaml:"contents"` } type RestoreWorkspaceStep struct { @@ -163,6 +163,18 @@ type RestoreWorkspaceStep struct { DestDir string `yaml:"dest_dir"` } +type SaveCacheStep struct { + Step `yaml:",inline"` + Key string `yaml:"key"` + Contents []*SaveContent `yaml:"contents"` +} + +type RestoreCacheStep struct { + Step `yaml:",inline"` + Keys []string `yaml:"keys"` + DestDir string `yaml:"dest_dir"` +} + func (t *Task) UnmarshalYAML(unmarshal func(interface{}) error) error { type task struct { Name string `yaml:"name"` @@ -198,38 +210,54 @@ func (t *Task) UnmarshalYAML(unmarshal func(interface{}) error) error { } switch stepType { case "clone": - var cs CloneStep - cs.Type = stepType - steps[i] = &cs + var s CloneStep + s.Type = stepType + steps[i] = &s case "run": - var rs RunStep + var s RunStep switch stepSpec.(type) { case string: - rs.Command = stepSpec.(string) + s.Command = stepSpec.(string) default: - if err := yaml.Unmarshal(o, &rs); err != nil { + if err := yaml.Unmarshal(o, &s); err != nil { return err } } - rs.Type = stepType - steps[i] = &rs + s.Type = stepType + steps[i] = &s case "save_to_workspace": - var sws SaveToWorkspaceStep - if err := yaml.Unmarshal(o, &sws); err != nil { + var s SaveToWorkspaceStep + if err := yaml.Unmarshal(o, &s); err != nil { return err } - sws.Type = stepType - steps[i] = &sws + s.Type = stepType + steps[i] = &s case "restore_workspace": - var rws RestoreWorkspaceStep - if err := yaml.Unmarshal(o, &rws); err != nil { + var s RestoreWorkspaceStep + if err := yaml.Unmarshal(o, &s); err != nil { return err } - rws.Type = stepType - steps[i] = &rws + s.Type = stepType + steps[i] = &s + + case "save_cache": + var s SaveCacheStep + if err := yaml.Unmarshal(o, &s); err != nil { + return err + } + s.Type = stepType + steps[i] = &s + + case "restore_cache": + var s RestoreCacheStep + if err := yaml.Unmarshal(o, &s); err != nil { + return err + } + s.Type = stepType + steps[i] = &s default: return errors.Errorf("unknown step type: %s", stepType) } @@ -572,6 +600,25 @@ func ParseConfig(configData []byte) (*Config, error) { } } + // set steps defaults + for _, t := range config.Tasks { + for _, s := range t.Steps { + switch step := s.(type) { + // TODO(sgotti) we could use the run step command as step name but when the + // command is very long or multi line it doesn't makes sense and will + // probably be quite unuseful/confusing from an UI point of view + case *SaveCacheStep: + for _, content := range step.Contents { + if len(content.Paths) == 0 { + // default to all files inside the sourceDir + content.Paths = []string{"**"} + } + } + log.Infof("s: %s", util.Dump(s)) + } + } + } + return &config, checkConfig(&config) } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 229ea38..ead5381 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -167,6 +167,10 @@ func TestParseOutput(t *testing.T) { ENV01: ENV01 ENVFROMVARIABLE01: from_variable: variable01 + - save_cache: + key: cache-{{ arch }} + contents: + - source_dir: /go/pkg/mod/cache pipelines: pipeline01: @@ -247,6 +251,11 @@ func TestParseOutput(t *testing.T) { "ENVFROMVARIABLE01": Value{Type: ValueTypeFromVariable, Value: "variable01"}, }, }, + &SaveCacheStep{ + Step: Step{Type: "save_cache"}, + Key: "cache-{{ arch }}", + Contents: []*SaveContent{&SaveContent{SourceDir: "/go/pkg/mod/cache", Paths: []string{"**"}}}, + }, }, }, }, diff --git a/internal/runconfig/runconfig.go b/internal/runconfig/runconfig.go index 2672e02..c89654f 100644 --- a/internal/runconfig/runconfig.go +++ b/internal/runconfig/runconfig.go @@ -131,9 +131,9 @@ fi sws.Type = cs.Type sws.Name = cs.Name - sws.Contents = make([]rstypes.SaveToWorkspaceContent, len(cs.Contents)) + sws.Contents = make([]rstypes.SaveContent, len(cs.Contents)) for i, csc := range cs.Contents { - sc := rstypes.SaveToWorkspaceContent{} + sc := rstypes.SaveContent{} sc.SourceDir = csc.SourceDir sc.DestDir = csc.DestDir sc.Paths = csc.Paths @@ -150,6 +150,33 @@ fi return rws + case *config.SaveCacheStep: + sws := &rstypes.SaveCacheStep{} + + sws.Type = cs.Type + sws.Name = cs.Name + sws.Key = cs.Key + + sws.Contents = make([]rstypes.SaveContent, len(cs.Contents)) + for i, csc := range cs.Contents { + sc := rstypes.SaveContent{} + sc.SourceDir = csc.SourceDir + sc.DestDir = csc.DestDir + sc.Paths = csc.Paths + + sws.Contents[i] = sc + } + return sws + + case *config.RestoreCacheStep: + rws := &rstypes.RestoreCacheStep{} + rws.Name = cs.Name + rws.Type = cs.Type + rws.Keys = cs.Keys + rws.DestDir = cs.DestDir + + return rws + default: panic(fmt.Errorf("unknown config step type: %s", util.Dump(cs))) } diff --git a/internal/services/gateway/api/run.go b/internal/services/gateway/api/run.go index e5ada93..24f0bb7 100644 --- a/internal/services/gateway/api/run.go +++ b/internal/services/gateway/api/run.go @@ -200,6 +200,10 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *Run s.Name = "save to workspace" case *rstypes.RestoreWorkspaceStep: s.Name = "restore workspace" + case *rstypes.SaveCacheStep: + s.Name = "save cache" + case *rstypes.RestoreCacheStep: + s.Name = "restore cache" } t.Steps[i] = s diff --git a/internal/services/runservice/executor/executor.go b/internal/services/runservice/executor/executor.go index a5a601e..b78a4cc 100644 --- a/internal/services/runservice/executor/executor.go +++ b/internal/services/runservice/executor/executor.go @@ -253,6 +253,42 @@ func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWor return exitCode, nil } +func (e *Executor) template(ctx context.Context, t *types.ExecutorTask, pod driver.Pod, logf io.Writer, key string) (string, error) { + cmd := append([]string{toolboxContainerPath, "template"}) + + // limit the template answer to max 1MiB + stdout := util.NewLimitedBuffer(1024 * 1024) + + execConfig := &driver.ExecConfig{ + Cmd: cmd, + Env: t.Environment, + WorkingDir: t.WorkingDir, + Stdout: stdout, + Stderr: logf, + } + + ce, err := pod.Exec(ctx, execConfig) + if err != nil { + return "", err + } + + stdin := ce.Stdin() + go func() { + io.WriteString(stdin, key) + stdin.Close() + }() + + exitCode, err := ce.Wait(ctx) + if err != nil { + return "", err + } + if exitCode != 0 { + return "", errors.Errorf("template ended with exit code %d", exitCode) + } + + return stdout.String(), nil +} + func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source io.Reader, pod driver.Pod, logf io.Writer, destDir string, overwrite, removeDestDir bool) error { args := []string{"--destdir", destDir} if overwrite { @@ -328,6 +364,176 @@ func (e *Executor) doRestoreWorkspaceStep(ctx context.Context, s *types.RestoreW return 0, nil } +func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep, t *types.ExecutorTask, pod driver.Pod, logPath string, archivePath string) (int, error) { + cmd := []string{toolboxContainerPath, "archive"} + + if err := os.MkdirAll(filepath.Dir(logPath), 0770); err != nil { + return -1, err + } + logf, err := os.Create(logPath) + if err != nil { + return -1, err + } + defer logf.Close() + + save := false + + // calculate key from template + userKey, err := e.template(ctx, t, pod, logf, s.Key) + if err != nil { + return -1, err + } + fmt.Fprintf(logf, "cache key %q\n", userKey) + + // append cache prefix + key := t.CachePrefix + "-" + userKey + + // check that the cache key doesn't already exists + resp, err := e.runserviceClient.CheckCache(ctx, key, false) + if err != nil { + // ignore 404 errors since they means that the cache key doesn't exists + if resp != nil && resp.StatusCode == http.StatusNotFound { + fmt.Fprintf(logf, "no cache available for key %q. Saving.\n", userKey) + save = true + } else { + // TODO(sgotti) retry before giving up + fmt.Fprintf(logf, "error checking for cache key %q: %v\n", userKey, err) + return -1, err + } + } + if !save { + fmt.Fprintf(logf, "cache for key %q already exists\n", userKey) + return 0, nil + } + + fmt.Fprintf(logf, "archiving cache with key %q\n", userKey) + if err := os.MkdirAll(filepath.Dir(archivePath), 0770); err != nil { + return -1, err + } + archivef, err := os.Create(archivePath) + if err != nil { + return -1, err + } + defer archivef.Close() + + execConfig := &driver.ExecConfig{ + Cmd: cmd, + Env: t.Environment, + WorkingDir: t.WorkingDir, + Stdout: archivef, + Stderr: logf, + } + + ce, err := pod.Exec(ctx, execConfig) + if err != nil { + return -1, err + } + + type ArchiveInfo struct { + SourceDir string + DestDir string + Paths []string + } + type Archive struct { + ArchiveInfos []*ArchiveInfo + OutFile string + } + + a := &Archive{ + OutFile: "", // use stdout + ArchiveInfos: make([]*ArchiveInfo, len(s.Contents)), + } + + for i, c := range s.Contents { + a.ArchiveInfos[i] = &ArchiveInfo{ + SourceDir: c.SourceDir, + DestDir: c.DestDir, + Paths: c.Paths, + } + + } + + stdin := ce.Stdin() + enc := json.NewEncoder(stdin) + + go func() { + enc.Encode(a) + stdin.Close() + }() + + exitCode, err := ce.Wait(ctx) + if err != nil { + return -1, err + } + + if exitCode != 0 { + return exitCode, errors.Errorf("save cache archiving command ended with exit code %d", exitCode) + } + + f, err := os.Open(archivePath) + if err != nil { + return -1, err + } + + // send cache archive to scheduler + if resp, err := e.runserviceClient.PutCache(ctx, key, f); err != nil { + if resp != nil && resp.StatusCode == http.StatusNotModified { + return exitCode, nil + } + return -1, err + } + + return exitCode, nil +} + +func (e *Executor) doRestoreCacheStep(ctx context.Context, s *types.RestoreCacheStep, t *types.ExecutorTask, pod driver.Pod, logPath string) (int, error) { + if err := os.MkdirAll(filepath.Dir(logPath), 0770); err != nil { + return -1, err + } + logf, err := os.Create(logPath) + if err != nil { + return -1, err + } + defer logf.Close() + + fmt.Fprintf(logf, "restoring cache: %s\n", util.Dump(s)) + for _, key := range s.Keys { + // calculate key from template + userKey, err := e.template(ctx, t, pod, logf, key) + if err != nil { + return -1, err + } + fmt.Fprintf(logf, "cache key %q\n", userKey) + + // append cache prefix + key := t.CachePrefix + "-" + userKey + + resp, err := e.runserviceClient.GetCache(ctx, key, true) + if err != nil { + // ignore 404 errors since they means that the cache key doesn't exists + if resp != nil && resp.StatusCode == http.StatusNotFound { + fmt.Fprintf(logf, "no cache available for key %q\n", userKey) + continue + } + // TODO(sgotti) retry before giving up + fmt.Fprintf(logf, "error reading cache: %v\n", err) + return -1, err + } + fmt.Fprintf(logf, "restoring cache with key %q\n", userKey) + cachef := resp.Body + if err := e.unarchive(ctx, t, cachef, pod, logf, s.DestDir, false, false); err != nil { + cachef.Close() + return -1, err + } + cachef.Close() + + // stop here + break + } + + return 0, nil +} + func (e *Executor) executorIDPath() string { return filepath.Join(e.c.DataDir, "id") } @@ -570,6 +776,17 @@ func (e *Executor) executeTaskInternal(ctx context.Context, et *types.ExecutorTa stepName = s.Name exitCode, err = e.doRestoreWorkspaceStep(ctx, s, et, pod, e.stepLogPath(et.ID, i)) + case *types.SaveCacheStep: + log.Debugf("save cache step: %s", util.Dump(s)) + stepName = s.Name + archivePath := e.archivePath(et.ID, i) + exitCode, err = e.doSaveCacheStep(ctx, s, et, pod, e.stepLogPath(et.ID, i), archivePath) + + case *types.RestoreCacheStep: + log.Debugf("restore cache step: %s", util.Dump(s)) + stepName = s.Name + exitCode, err = e.doRestoreCacheStep(ctx, s, et, pod, e.stepLogPath(et.ID, i)) + default: return i, errors.Errorf("unknown step type: %s", util.Dump(s)) } diff --git a/internal/services/runservice/scheduler/api/client.go b/internal/services/runservice/scheduler/api/client.go index 7c41581..63a22b4 100644 --- a/internal/services/runservice/scheduler/api/client.go +++ b/internal/services/runservice/scheduler/api/client.go @@ -141,6 +141,26 @@ func (c *Client) GetArchive(ctx context.Context, taskID string, step int) (*http return c.getResponse(ctx, "GET", "/executor/archives", q, nil, nil) } +func (c *Client) CheckCache(ctx context.Context, key string, prefix bool) (*http.Response, error) { + q := url.Values{} + if prefix { + q.Add("prefix", "") + } + return c.getResponse(ctx, "HEAD", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, nil, nil) +} + +func (c *Client) GetCache(ctx context.Context, key string, prefix bool) (*http.Response, error) { + q := url.Values{} + if prefix { + q.Add("prefix", "") + } + return c.getResponse(ctx, "GET", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, nil, nil) +} + +func (c *Client) PutCache(ctx context.Context, key string, r io.Reader) (*http.Response, error) { + return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), nil, nil, r) +} + func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, lastRun bool, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) { q := url.Values{} for _, phase := range phaseFilter { diff --git a/internal/services/runservice/scheduler/api/executor.go b/internal/services/runservice/scheduler/api/executor.go index a6ebb40..3e093a4 100644 --- a/internal/services/runservice/scheduler/api/executor.go +++ b/internal/services/runservice/scheduler/api/executor.go @@ -19,6 +19,7 @@ import ( "encoding/json" "io" "net/http" + "net/url" "strconv" "github.com/gorilla/mux" @@ -168,7 +169,7 @@ func NewArchivesHandler(logger *zap.Logger, lts *objectstorage.ObjStorage) *Arch } func (h *ArchivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // TODO(sgotti) Check authorized call from scheduler + // TODO(sgotti) Check authorized call from executors taskID := r.URL.Query().Get("taskid") if taskID == "" { @@ -216,6 +217,165 @@ func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error return err } +type CacheHandler struct { + log *zap.SugaredLogger + lts *objectstorage.ObjStorage +} + +func NewCacheHandler(logger *zap.Logger, lts *objectstorage.ObjStorage) *CacheHandler { + return &CacheHandler{ + log: logger.Sugar(), + lts: lts, + } +} + +func (h *CacheHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + // TODO(sgotti) Check authorized call from executors + key, err := url.PathUnescape(vars["key"]) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if key == "" { + http.Error(w, "empty cache key", http.StatusBadRequest) + return + } + if len(key) > common.MaxCacheKeyLength { + http.Error(w, "cache key too long", http.StatusBadRequest) + return + } + query := r.URL.Query() + _, prefix := query["prefix"] + + matchedKey, err := matchCache(h.lts, key, prefix) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if matchedKey == "" { + http.Error(w, "", http.StatusNotFound) + return + } + + if r.Method == "HEAD" { + return + } + + w.Header().Set("Cache-Control", "no-cache") + + if err := h.readCache(matchedKey, w); err != nil { + switch err.(type) { + case common.ErrNotExist: + http.Error(w, err.Error(), http.StatusNotFound) + default: + http.Error(w, err.Error(), http.StatusInternalServerError) + } + return + } +} + +func matchCache(lts *objectstorage.ObjStorage, key string, prefix bool) (string, error) { + cachePath := store.LTSCachePath(key) + + if prefix { + doneCh := make(chan struct{}) + defer close(doneCh) + + // get the latest modified object + var lastObject *objectstorage.ObjectInfo + for object := range lts.List(store.LTSCacheDir()+"/"+key, "", false, doneCh) { + if object.Err != nil { + return "", object.Err + } + + if (lastObject == nil) || (lastObject != nil && lastObject.LastModified.Before(object.LastModified)) { + lastObject = &object + } + + } + if lastObject == nil { + return "", nil + + } + return store.LTSCacheKey(lastObject.Path), nil + } + + _, err := lts.Stat(cachePath) + if err == objectstorage.ErrNotExist { + return "", nil + } + if err != nil { + return "", err + } + return key, nil +} + +func (h *CacheHandler) readCache(key string, w io.Writer) error { + cachePath := store.LTSCachePath(key) + f, err := h.lts.ReadObject(cachePath) + if err != nil { + if err == objectstorage.ErrNotExist { + return common.NewErrNotExist(err) + } + return err + } + defer f.Close() + + br := bufio.NewReader(f) + + _, err = io.Copy(w, br) + return err +} + +type CacheCreateHandler struct { + log *zap.SugaredLogger + lts *objectstorage.ObjStorage +} + +func NewCacheCreateHandler(logger *zap.Logger, lts *objectstorage.ObjStorage) *CacheCreateHandler { + return &CacheCreateHandler{ + log: logger.Sugar(), + lts: lts, + } +} + +func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + // TODO(sgotti) Check authorized call from executors + key, err := url.PathUnescape(vars["key"]) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if key == "" { + http.Error(w, "empty cache key", http.StatusBadRequest) + return + } + if len(key) > common.MaxCacheKeyLength { + http.Error(w, "cache key too long", http.StatusBadRequest) + return + } + + w.Header().Set("Cache-Control", "no-cache") + + matchedKey, err := matchCache(h.lts, key, false) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if matchedKey != "" { + http.Error(w, "", http.StatusNotModified) + return + } + + cachePath := store.LTSCachePath(key) + if err := h.lts.WriteObject(cachePath, r.Body); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + type ExecutorDeleteHandler struct { log *zap.SugaredLogger ch *command.CommandHandler @@ -230,7 +390,6 @@ func NewExecutorDeleteHandler(logger *zap.Logger, ch *command.CommandHandler) *E func (h *ExecutorDeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - vars := mux.Vars(r) // TODO(sgotti) Check authorized call from executors diff --git a/internal/services/runservice/scheduler/common/common.go b/internal/services/runservice/scheduler/common/common.go index 8ca63dd..5478020 100644 --- a/internal/services/runservice/scheduler/common/common.go +++ b/internal/services/runservice/scheduler/common/common.go @@ -19,6 +19,10 @@ import ( "path" ) +const ( + MaxCacheKeyLength = 200 +) + type ErrNotExist struct { err error } diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 201e096..529dff7 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -270,6 +270,7 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types Shell: rct.Shell, User: rct.User, Steps: rct.Steps, + CachePrefix: store.LTSRootGroup(r.Group), Status: types.ExecutorTaskStatus{ Phase: types.ExecutorTaskPhaseNotStarted, Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)), @@ -1515,6 +1516,8 @@ func (s *Scheduler) Run(ctx context.Context) error { executorTaskHandler := api.NewExecutorTaskHandler(s.e) executorTasksHandler := api.NewExecutorTasksHandler(s.e) archivesHandler := api.NewArchivesHandler(logger, s.lts) + cacheHandler := api.NewCacheHandler(logger, s.lts) + cacheCreateHandler := api.NewCacheCreateHandler(logger, s.lts) // api from clients executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ch) @@ -1530,6 +1533,8 @@ func (s *Scheduler) Run(ctx context.Context) error { router := mux.NewRouter() apirouter := router.PathPrefix("/api/v1alpha").Subrouter() + + // don't return 404 on a call to an undefined handler but 400 to distinguish between a non existent resource and a wrong method apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST") @@ -1538,6 +1543,9 @@ func (s *Scheduler) Run(ctx context.Context) error { apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET") apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST") apirouter.Handle("/executor/archives", archivesHandler).Methods("GET") + apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("HEAD") + apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("GET") + apirouter.Handle("/executor/caches/{key}", cacheCreateHandler).Methods("POST") apirouter.Handle("/logs", logsHandler).Methods("GET") diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/scheduler/store/store.go index 9b39a24..39e4f94 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/scheduler/store/store.go @@ -20,6 +20,7 @@ import ( "fmt" "path" "reflect" + "strings" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" @@ -42,12 +43,12 @@ func LTSSubGroupsAndGroupTypes(group string) []string { } func LTSRootGroup(group string) string { - h := util.PathHierarchy(group) - if len(h)%2 != 1 { - panic(fmt.Errorf("wrong group path %q", group)) + pl := util.PathList(group) + if len(pl) < 2 { + panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group)) } - return h[2] + return pl[1] } func LTSSubGroups(group string) []string { @@ -131,6 +132,19 @@ func LTSRunArchivePath(rtID string, step int) string { return path.Join("workspacearchives", fmt.Sprintf("%s/%d.tar", rtID, step)) } +func LTSCacheDir() string { + return "caches" +} + +func LTSCachePath(key string) string { + return path.Join(LTSCacheDir(), fmt.Sprintf("%s.tar", key)) +} + +func LTSCacheKey(p string) string { + base := path.Base(p) + return strings.TrimSuffix(base, path.Ext(base)) +} + func LTSGetRunConfig(wal *wal.WalManager, runConfigID string) (*types.RunConfig, error) { runConfigPath := common.StorageRunConfigFile(runConfigID) rcf, _, err := wal.ReadObject(runConfigPath, nil) diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index 2559265..a6d17f5 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -357,11 +357,11 @@ const ( ) type RegistryAuth struct { - Type RegistryAuthType `yaml:"type"` + Type RegistryAuthType `json:"type,omitempty"` // default auth - Username string `yaml:"username"` - Password string `yaml:"password"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` } type Runtime struct { @@ -387,30 +387,42 @@ func (rct *RunConfigTask) UnmarshalJSON(b []byte) error { } steps := make([]interface{}, len(st.Steps)) - for i, s := range st.Steps { + for i, step := range st.Steps { var bs Step - if err := json.Unmarshal(s, &bs); err != nil { + if err := json.Unmarshal(step, &bs); err != nil { return err } switch bs.Type { case "run": - var rs RunStep - if err := json.Unmarshal(s, &rs); err != nil { + var s RunStep + if err := json.Unmarshal(step, &s); err != nil { return err } - steps[i] = &rs + steps[i] = &s case "save_to_workspace": - var rs SaveToWorkspaceStep - if err := json.Unmarshal(s, &rs); err != nil { + var s SaveToWorkspaceStep + if err := json.Unmarshal(step, &s); err != nil { return err } - steps[i] = &rs + steps[i] = &s case "restore_workspace": - var rs RestoreWorkspaceStep - if err := json.Unmarshal(s, &rs); err != nil { + var s RestoreWorkspaceStep + if err := json.Unmarshal(step, &s); err != nil { return err } - steps[i] = &rs + steps[i] = &s + case "save_cache": + var s SaveCacheStep + if err := json.Unmarshal(step, &s); err != nil { + return err + } + steps[i] = &s + case "restore_cache": + var s RestoreCacheStep + if err := json.Unmarshal(step, &s); err != nil { + return err + } + steps[i] = &s } } @@ -433,7 +445,7 @@ type RunStep struct { User string `json:"user,omitempty"` } -type SaveToWorkspaceContent struct { +type SaveContent struct { SourceDir string `json:"source_dir,omitempty"` DestDir string `json:"dest_dir,omitempty"` Paths []string `json:"paths,omitempty"` @@ -441,7 +453,7 @@ type SaveToWorkspaceContent struct { type SaveToWorkspaceStep struct { Step - Contents []SaveToWorkspaceContent `json:"contents,omitempty"` + Contents []SaveContent `json:"contents,omitempty"` } type RestoreWorkspaceStep struct { @@ -449,6 +461,18 @@ type RestoreWorkspaceStep struct { DestDir string `json:"dest_dir,omitempty"` } +type SaveCacheStep struct { + Step + Key string `json:"key,omitempty"` + Contents []SaveContent `json:"contents,omitempty"` +} + +type RestoreCacheStep struct { + Step + Keys []string `json:"keys,omitempty"` + DestDir string `json:"dest_dir,omitempty"` +} + type ExecutorTaskPhase string const ( @@ -474,7 +498,7 @@ type ExecutorTask struct { WorkingDir string `json:"working_dir,omitempty"` Shell string `json:"shell,omitempty"` User string `json:"user,omitempty"` - Privileged bool `yaml:"privileged"` + Privileged bool `json:"privileged"` Steps []interface{} `json:"steps,omitempty"` @@ -484,6 +508,10 @@ type ExecutorTask struct { Workspace Workspace `json:"workspace,omitempty"` + // Cache prefix to use when asking for a cache key. To isolate caches between + // groups (projects) + CachePrefix string `json:"cache_prefix,omitempty"` + // Stop is used to signal from the scheduler when the task must be stopped Stop bool `json:"stop,omitempty"` } @@ -546,30 +574,42 @@ func (et *ExecutorTask) UnmarshalJSON(b []byte) error { } steps := make([]interface{}, len(ett.Steps)) - for i, s := range st.Steps { + for i, step := range st.Steps { var bs Step - if err := json.Unmarshal(s, &bs); err != nil { + if err := json.Unmarshal(step, &bs); err != nil { return err } switch bs.Type { case "run": - var rs RunStep - if err := json.Unmarshal(s, &rs); err != nil { + var s RunStep + if err := json.Unmarshal(step, &s); err != nil { return err } - steps[i] = &rs + steps[i] = &s case "save_to_workspace": - var rs SaveToWorkspaceStep - if err := json.Unmarshal(s, &rs); err != nil { + var s SaveToWorkspaceStep + if err := json.Unmarshal(step, &s); err != nil { return err } - steps[i] = &rs + steps[i] = &s case "restore_workspace": - var rs RestoreWorkspaceStep - if err := json.Unmarshal(s, &rs); err != nil { + var s RestoreWorkspaceStep + if err := json.Unmarshal(step, &s); err != nil { return err } - steps[i] = &rs + steps[i] = &s + case "save_cache": + var s SaveCacheStep + if err := json.Unmarshal(step, &s); err != nil { + return err + } + steps[i] = &s + case "restore_cache": + var s RestoreCacheStep + if err := json.Unmarshal(step, &s); err != nil { + return err + } + steps[i] = &s } } diff --git a/internal/util/buffer.go b/internal/util/buffer.go new file mode 100644 index 0000000..2846695 --- /dev/null +++ b/internal/util/buffer.go @@ -0,0 +1,36 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "bytes" + "io" +) + +type LimitedBuffer struct { + *bytes.Buffer + cap int +} + +func (b *LimitedBuffer) Write(p []byte) (n int, err error) { + if len(p)+b.Len() > b.cap { + return 0, io.EOF + } + return b.Buffer.Write(p) +} + +func NewLimitedBuffer(cap int) *LimitedBuffer { + return &LimitedBuffer{Buffer: &bytes.Buffer{}, cap: cap} +}