diff --git a/internal/services/gateway/action/run.go b/internal/services/gateway/action/run.go index 698fa75..2f674d9 100644 --- a/internal/services/gateway/action/run.go +++ b/internal/services/gateway/action/run.go @@ -82,6 +82,7 @@ func (h *ActionHandler) GetRun(ctx context.Context, runID string) (*rsapi.RunRes type GetRunsRequest struct { PhaseFilter []string + ResultFilter []string Group string LastRun bool ChangeGroups []string @@ -100,7 +101,7 @@ func (h *ActionHandler) GetRuns(ctx context.Context, req *GetRunsRequest) (*rsap } groups := []string{req.Group} - runsResp, resp, err := h.runserviceClient.GetRuns(ctx, req.PhaseFilter, groups, req.LastRun, req.ChangeGroups, req.StartRunID, req.Limit, req.Asc) + runsResp, resp, err := h.runserviceClient.GetRuns(ctx, req.PhaseFilter, req.ResultFilter, groups, req.LastRun, req.ChangeGroups, req.StartRunID, req.Limit, req.Asc) if err != nil { return nil, ErrFromRemote(resp, err) } diff --git a/internal/services/runservice/api/api.go b/internal/services/runservice/api/api.go index 33f8219..9eb958e 100644 --- a/internal/services/runservice/api/api.go +++ b/internal/services/runservice/api/api.go @@ -445,6 +445,7 @@ func NewRunsHandler(logger *zap.Logger, readDB *readdb.ReadDB) *RunsHandler { func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() phaseFilter := types.RunPhaseFromStringSlice(query["phase"]) + resultFilter := types.RunResultFromStringSlice(query["result"]) changeGroups := query["changegroup"] groups := query["group"] @@ -479,7 +480,7 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { err := h.readDB.Do(func(tx *db.Tx) error { var err error - runs, err = h.readDB.GetRuns(tx, groups, lastRun, phaseFilter, start, limit, sortOrder) + runs, err = h.readDB.GetRuns(tx, groups, lastRun, phaseFilter, resultFilter, start, limit, sortOrder) if err != nil { h.log.Errorf("err: %+v", err) return err diff --git a/internal/services/runservice/api/client.go b/internal/services/runservice/api/client.go index 73b8220..a1e1695 100644 --- a/internal/services/runservice/api/client.go +++ b/internal/services/runservice/api/client.go @@ -165,11 +165,14 @@ func (c *Client) PutCache(ctx context.Context, key string, size int64, r io.Read return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), nil, size, nil, r) } -func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, lastRun bool, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) { +func (c *Client) GetRuns(ctx context.Context, phaseFilter, resultFilter, groups []string, lastRun bool, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) { q := url.Values{} for _, phase := range phaseFilter { q.Add("phase", phase) } + for _, result := range resultFilter { + q.Add("result", result) + } for _, group := range groups { q.Add("group", group) } @@ -195,27 +198,27 @@ func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, last } func (c *Client) GetQueuedRuns(ctx context.Context, start string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"queued"}, []string{}, false, changeGroups, start, limit, true) + return c.GetRuns(ctx, []string{"queued"}, nil, []string{}, false, changeGroups, start, limit, true) } func (c *Client) GetRunningRuns(ctx context.Context, start string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"running"}, []string{}, false, changeGroups, start, limit, true) + return c.GetRuns(ctx, []string{"running"}, nil, []string{}, false, changeGroups, start, limit, true) } func (c *Client) GetGroupQueuedRuns(ctx context.Context, group string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"queued"}, []string{group}, false, changeGroups, "", limit, false) + return c.GetRuns(ctx, []string{"queued"}, nil, []string{group}, false, changeGroups, "", limit, false) } func (c *Client) GetGroupRunningRuns(ctx context.Context, group string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"running"}, []string{group}, false, changeGroups, "", limit, false) + return c.GetRuns(ctx, []string{"running"}, nil, []string{group}, false, changeGroups, "", limit, false) } func (c *Client) GetGroupFirstQueuedRuns(ctx context.Context, group string, changeGroups []string) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"queued"}, []string{group}, false, changeGroups, "", 1, true) + return c.GetRuns(ctx, []string{"queued"}, nil, []string{group}, false, changeGroups, "", 1, true) } func (c *Client) GetGroupLastRun(ctx context.Context, group string, changeGroups []string) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, nil, []string{group}, false, changeGroups, "", 1, false) + return c.GetRuns(ctx, nil, nil, []string{group}, false, changeGroups, "", 1, false) } func (c *Client) CreateRun(ctx context.Context, req *RunCreateRequest) (*RunResponse, *http.Response, error) { diff --git a/internal/services/runservice/readdb/create.go b/internal/services/runservice/readdb/create.go index 664b5fe..5a90843 100644 --- a/internal/services/runservice/readdb/create.go +++ b/internal/services/runservice/readdb/create.go @@ -18,7 +18,7 @@ var Stmts = []string{ // last processed etcd event revision "create table revision (revision bigint, PRIMARY KEY(revision))", - "create table run (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))", + "create table run (id varchar, grouppath varchar, phase varchar, result varchar, PRIMARY KEY (id, grouppath, phase))", "create table rundata (id varchar, data bytea, PRIMARY KEY (id))", @@ -35,7 +35,7 @@ var Stmts = []string{ "create table changegrouprevision_ost (id varchar, revision varchar, PRIMARY KEY (id, revision))", - "create table run_ost (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))", + "create table run_ost (id varchar, grouppath varchar, phase varchar, result varchar, PRIMARY KEY (id, grouppath, phase))", "create table rundata_ost (id varchar, data bytea, PRIMARY KEY (id))", diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index f6df5a0..de7e1c8 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -59,8 +59,8 @@ var ( revisionSelect = sb.Select("revision").From("revision") revisionInsert = sb.Insert("revision").Columns("revision") - //runSelect = sb.Select("id", "grouppath", "phase").From("run") - runInsert = sb.Insert("run").Columns("id", "grouppath", "phase") + //runSelect = sb.Select("id", "grouppath", "phase", "result").From("run") + runInsert = sb.Insert("run").Columns("id", "grouppath", "phase", "result") rundataInsert = sb.Insert("rundata").Columns("id", "data") @@ -74,8 +74,8 @@ var ( //revisionOSTSelect = sb.Select("revision").From("revision_ost") revisionOSTInsert = sb.Insert("revision_ost").Columns("revision") - //runOSTSelect = sb.Select("id", "grouppath", "phase").From("run_ost") - runOSTInsert = sb.Insert("run_ost").Columns("id", "grouppath", "phase") + //runOSTSelect = sb.Select("id", "grouppath", "phase", "result").From("run_ost") + runOSTInsert = sb.Insert("run_ost").Columns("id", "grouppath", "phase", "result") rundataOSTInsert = sb.Insert("rundata_ost").Columns("id", "data") @@ -339,7 +339,7 @@ func (r *ReadDB) handleEvents(ctx context.Context) error { if err != nil { return err } - lastRuns, err = r.GetActiveRuns(tx, nil, true, nil, "", 1, types.SortOrderDesc) + lastRuns, err = r.GetActiveRuns(tx, nil, true, nil, nil, "", 1, types.SortOrderDesc) return err }) if err != nil { @@ -995,7 +995,7 @@ func insertRun(tx *db.Tx, run *types.Run, data []byte) error { if _, err := tx.Exec("delete from run where id = $1", run.ID); err != nil { return errors.Errorf("failed to delete run: %w", err) } - q, args, err := runInsert.Values(run.ID, groupPath, run.Phase).ToSql() + q, args, err := runInsert.Values(run.ID, groupPath, run.Phase, run.Result).ToSql() if err != nil { return errors.Errorf("failed to build query: %w", err) } @@ -1029,7 +1029,7 @@ func (r *ReadDB) insertRunOST(tx *db.Tx, run *types.Run, data []byte) error { if _, err := tx.Exec("delete from run_ost where id = $1", run.ID); err != nil { return errors.Errorf("failed to delete run objectstorage: %w", err) } - q, args, err := runOSTInsert.Values(run.ID, groupPath, run.Phase).ToSql() + q, args, err := runOSTInsert.Values(run.ID, groupPath, run.Phase, run.Result).ToSql() if err != nil { return errors.Errorf("failed to build query: %w", err) } @@ -1121,11 +1121,11 @@ func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*types return &types.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: changeGroupsRevisions}, nil } -func (r *ReadDB) GetActiveRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { - return r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) +func (r *ReadDB) GetActiveRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { + return r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, resultFilter, startRunID, limit, sortOrder) } -func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) { +func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) { useObjectStorage := false for _, phase := range phaseFilter { if phase == types.RunPhaseFinished || phase == types.RunPhaseCancelled { @@ -1136,7 +1136,7 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [ useObjectStorage = true } - runDataRDB, err := r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) + runDataRDB, err := r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, resultFilter, startRunID, limit, sortOrder) if err != nil { return nil, err } @@ -1149,7 +1149,7 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [ if useObjectStorage { // skip if the phase requested is not finished - runDataOST, err := r.GetRunsFilteredOST(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) + runDataOST, err := r.GetRunsFilteredOST(tx, groups, lastRun, phaseFilter, resultFilter, startRunID, limit, sortOrder) if err != nil { return nil, err } @@ -1215,7 +1215,7 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [ return aruns, nil } -func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []string, lastRun bool, startRunID string, limit int, sortOrder types.SortOrder, objectstorage bool) sq.SelectBuilder { +func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, resultFilter []types.RunResult, groups []string, lastRun bool, startRunID string, limit int, sortOrder types.SortOrder, objectstorage bool) sq.SelectBuilder { runt := "run" rundatat := "rundata" fields := []string{"run.id", "run.grouppath", "run.phase", "rundata.data"} @@ -1238,6 +1238,9 @@ func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []str if len(phaseFilter) > 0 { s = s.Where(sq.Eq{"phase": phaseFilter}) } + if len(resultFilter) > 0 { + s = s.Where(sq.Eq{"result": resultFilter}) + } if startRunID != "" { if lastRun { switch sortOrder { @@ -1279,8 +1282,8 @@ func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []str return s } -func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { - s := r.getRunsFilteredQuery(phaseFilter, groups, lastRun, startRunID, limit, sortOrder, false) +func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { + s := r.getRunsFilteredQuery(phaseFilter, resultFilter, groups, lastRun, startRunID, limit, sortOrder, false) q, args, err := s.ToSql() r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) @@ -1291,8 +1294,8 @@ func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, lastRun bool, return fetchRuns(tx, q, args...) } -func (r *ReadDB) GetRunsFilteredOST(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { - s := r.getRunsFilteredQuery(phaseFilter, groups, lastRun, startRunID, limit, sortOrder, true) +func (r *ReadDB) GetRunsFilteredOST(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { + s := r.getRunsFilteredQuery(phaseFilter, resultFilter, groups, lastRun, startRunID, limit, sortOrder, true) q, args, err := s.ToSql() r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index 3acda1f..18c0f1c 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -81,6 +81,14 @@ func RunPhaseFromStringSlice(slice []string) []RunPhase { return rss } +func RunResultFromStringSlice(slice []string) []RunResult { + rss := make([]RunResult, len(slice)) + for i, s := range slice { + rss[i] = RunResult(s) + } + return rss +} + // Run is the run status of a RUN. Until the run is not finished it'll live in // etcd. So we should keep it smaller to avoid using too much space type Run struct {