From f09602cdc370d42d6febee14da8b4e1c18158c0b Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Fri, 8 Mar 2019 10:02:37 +0100 Subject: [PATCH] *: implement run stop --- internal/services/gateway/api/run.go | 16 +++++++++ .../services/runservice/scheduler/api/api.go | 11 +++++- .../runservice/scheduler/command/command.go | 34 +++++++++++++++++++ .../runservice/scheduler/scheduler.go | 4 +++ internal/services/runservice/types/types.go | 5 +-- 5 files changed, 67 insertions(+), 3 deletions(-) diff --git a/internal/services/gateway/api/run.go b/internal/services/gateway/api/run.go index e13a791..ecfc301 100644 --- a/internal/services/gateway/api/run.go +++ b/internal/services/gateway/api/run.go @@ -336,6 +336,7 @@ type RunActionType string const ( RunActionTypeRestart RunActionType = "restart" + RunActionTypeStop RunActionType = "stop" ) type RunActionsRequest struct { @@ -382,6 +383,21 @@ func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } + + case RunActionTypeStop: + req := &rsapi.RunActionsRequest{ + ActionType: rsapi.RunActionTypeStop, + } + + resp, err := h.runserviceClient.RunActions(ctx, runID, req) + if err != nil { + if resp != nil && resp.StatusCode == http.StatusNotFound { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } } } diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index 9bbcd6e..908435d 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -484,6 +484,7 @@ type RunActionType string const ( RunActionTypeChangePhase RunActionType = "changephase" + RunActionTypeStop RunActionType = "stop" ) type RunActionsRequest struct { @@ -511,7 +512,6 @@ func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) runID := vars["runid"] - // TODO(sgotti) Check authorized call from client var req RunActionsRequest d := json.NewDecoder(r.Body) if err := d.Decode(&req); err != nil { @@ -530,6 +530,15 @@ func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } + case RunActionTypeStop: + creq := &command.RunStopRequest{ + RunID: runID, + ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken, + } + if err := h.ch.StopRun(ctx, creq); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } default: http.Error(w, "", http.StatusBadRequest) return diff --git a/internal/services/runservice/scheduler/command/command.go b/internal/services/runservice/scheduler/command/command.go index a50a847..54bb9f8 100644 --- a/internal/services/runservice/scheduler/command/command.go +++ b/internal/services/runservice/scheduler/command/command.go @@ -72,6 +72,39 @@ func (s *CommandHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhase return errors.Errorf("run %s is not queued but in %q phase", r.ID, r.Phase) } r.ChangePhase(types.RunPhaseRunning) + case types.RunPhaseFinished: + if r.Phase != types.RunPhaseRunning { + return errors.Errorf("run %s is not running but in %q phase", r.ID, r.Phase) + } + r.Stop = true + } + + _, err = store.AtomicPutRun(ctx, s.e, r, "", cgt) + return err +} + +type RunStopRequest struct { + RunID string + ChangeGroupsUpdateToken string +} + +func (s *CommandHandler) StopRun(ctx context.Context, req *RunStopRequest) error { + cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken) + if err != nil { + return err + } + + r, _, err := store.GetRun(ctx, s.e, req.RunID) + if err != nil { + return err + } + + if r.Phase != types.RunPhaseRunning { + return errors.Errorf("run %s is not running but in %q phase", r.ID, r.Phase) + } + if !r.Result.IsSet() { + // stop only if the result is not setted yet + r.Stop = true } _, err = store.AtomicPutRun(ctx, s.e, r, "", cgt) @@ -191,6 +224,7 @@ func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) run.Phase = types.RunPhaseQueued run.Result = types.RunResultUnknown run.Archived = false + run.Stop = false // TODO(sgotti) handle reset tasks // currently we only restart a run resetting al failed tasks diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 5ac8cdd..13bc148 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -352,6 +352,10 @@ func (s *Scheduler) advanceRun(ctx context.Context, runID string) error { return nil } + if r.Stop { + r.Result = types.RunResultStopped + } + if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil { return err } diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index 8a3d21e..07c823b 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -42,8 +42,6 @@ const ( RunPhaseCancelled RunPhase = "cancelled" RunPhaseRunning RunPhase = "running" RunPhaseFinished RunPhase = "finished" - //RunPhaseSuccess RunPhase = "success" - //RunPhaseFailed RunPhase = "failed" ) type RunResult string @@ -99,6 +97,9 @@ type Run struct { // Result of a Run. Result RunResult `json:"result,omitempty"` + // Stop is used to signal from the scheduler when the run must be stopped + Stop bool `json:"stop,omitempty"` + RunTasks map[string]*RunTask `json:"run_tasks,omitempty"` EnqueueTime *time.Time `json:"enqueue_time,omitempty"` StartTime *time.Time `json:"start_time,omitempty"`