diff --git a/internal/services/runservice/action/action_test.go b/internal/services/runservice/action/action_test.go index 1ab889d..08c084a 100644 --- a/internal/services/runservice/action/action_test.go +++ b/internal/services/runservice/action/action_test.go @@ -178,7 +178,6 @@ func TestRecreateRun(t *testing.T) { req *RunCreateRequest outrc *types.RunConfig outr *types.Run - err error }{ { name: "test recreate run from start with all not start tasks", diff --git a/internal/services/runservice/api/api.go b/internal/services/runservice/api/api.go index 99f15d1..79bbf44 100644 --- a/internal/services/runservice/api/api.go +++ b/internal/services/runservice/api/api.go @@ -94,22 +94,22 @@ func httpError(w http.ResponseWriter, err error) bool { switch { case errors.Is(err, &util.ErrBadRequest{}): w.WriteHeader(http.StatusBadRequest) - w.Write(resj) + _, _ = w.Write(resj) case errors.Is(err, &util.ErrNotFound{}): w.WriteHeader(http.StatusNotFound) - w.Write(resj) + _, _ = w.Write(resj) case errors.Is(err, &util.ErrForbidden{}): w.WriteHeader(http.StatusForbidden) - w.Write(resj) + _, _ = w.Write(resj) case errors.Is(err, &util.ErrUnauthorized{}): w.WriteHeader(http.StatusUnauthorized) - w.Write(resj) + _, _ = w.Write(resj) case errors.Is(err, &util.ErrInternal{}): w.WriteHeader(http.StatusInternalServerError) - w.Write(resj) + _, _ = w.Write(resj) default: w.WriteHeader(http.StatusInternalServerError) - w.Write(resj) + _, _ = w.Write(resj) } return true } @@ -597,9 +597,8 @@ type RunActionsRequest struct { } type RunActionsHandler struct { - log *zap.SugaredLogger - ah *action.ActionHandler - readDB *readdb.ReadDB + log *zap.SugaredLogger + ah *action.ActionHandler } func NewRunActionsHandler(logger *zap.Logger, ah *action.ActionHandler) *RunActionsHandler { @@ -667,9 +666,8 @@ type RunTaskActionsRequest struct { } type RunTaskActionsHandler struct { - log *zap.SugaredLogger - ah *action.ActionHandler - readDB *readdb.ReadDB + log *zap.SugaredLogger + ah *action.ActionHandler } func NewRunTaskActionsHandler(logger *zap.Logger, ah *action.ActionHandler) *RunTaskActionsHandler { diff --git a/internal/services/runservice/api/executor.go b/internal/services/runservice/api/executor.go index 5cdfe65..70a7b05 100644 --- a/internal/services/runservice/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -333,7 +333,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, return "", object.Err } - if (lastObject == nil) || (lastObject != nil && lastObject.LastModified.Before(object.LastModified)) { + if (lastObject == nil) || (lastObject.LastModified.Before(object.LastModified)) { lastObject = &object } diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go index cc19844..c00dfac 100644 --- a/internal/services/runservice/common/common.go +++ b/internal/services/runservice/common/common.go @@ -70,10 +70,6 @@ var ( StorageCountersDir = path.Join(StorageDataDir, "counters") ) -const ( - etcdWalsMinRevisionRange = 100 -) - type DataType string const ( diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index c4eb0a3..6b0ea44 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -59,22 +59,22 @@ var ( revisionSelect = sb.Select("revision").From("revision") revisionInsert = sb.Insert("revision").Columns("revision") - runSelect = sb.Select("id", "grouppath", "phase").From("run") + //runSelect = sb.Select("id", "grouppath", "phase").From("run") runInsert = sb.Insert("run").Columns("id", "grouppath", "phase") rundataInsert = sb.Insert("rundata").Columns("id", "data") - runeventSelect = sb.Select("data").From("runevent") + //runeventSelect = sb.Select("data").From("runevent") runeventInsert = sb.Insert("runevent").Columns("sequence", "data") changegrouprevisionSelect = sb.Select("id, revision").From("changegrouprevision") changegrouprevisionInsert = sb.Insert("changegrouprevision").Columns("id", "revision") // readdb tables based on objectstorage data - revisionOSTSelect = sb.Select("revision").From("revision_ost") + //revisionOSTSelect = sb.Select("revision").From("revision_ost") revisionOSTInsert = sb.Insert("revision_ost").Columns("revision") - runOSTSelect = sb.Select("id", "grouppath", "phase").From("run_ost") + //runOSTSelect = sb.Select("id", "grouppath", "phase").From("run_ost") runOSTInsert = sb.Insert("run_ost").Columns("id", "grouppath", "phase") rundataOSTInsert = sb.Insert("rundata_ost").Columns("id", "data") @@ -367,7 +367,7 @@ func (r *ReadDB) handleEvents(ctx context.Context) error { } // check that the run sequence epoch isn't different than the current one (this means etcd // has been reset, or worst, restored from a backup or manually deleted) - if runSequence == nil || runSequence.Epoch != lastRunSequence.Epoch { + if runSequence.Epoch != lastRunSequence.Epoch { r.SetInitialized(false) return errors.Errorf("last run epoch %d is different than current epoch in etcd %d, reinitializing.", lastRunSequence.Epoch, runSequence.Epoch) } @@ -1525,7 +1525,7 @@ func fetchChangeGroupsRevisionOST(tx *db.Tx, q string, args ...interface{}) (map return nil, err } defer rows.Close() - return scanChangeGroupsRevision(rows) + return scanChangeGroupsRevisionOST(rows) } func scanChangeGroupsRevisionOST(rows *sql.Rows) (map[string]int64, error) { diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go index e6b49e8..e6b9d1e 100644 --- a/internal/services/runservice/runservice.go +++ b/internal/services/runservice/runservice.go @@ -156,19 +156,14 @@ func (s *Runservice) Run(ctx context.Context) error { time.Sleep(1 * time.Second) } - go s.readDB.Run(ctx) + go func() { errCh <- s.readDB.Run(ctx) }() ch := make(chan *types.ExecutorTask) - // noop coors handler - corsHandler := func(h http.Handler) http.Handler { - return h - } - corsAllowedMethodsOptions := ghandlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE"}) corsAllowedHeadersOptions := ghandlers.AllowedHeaders([]string{"Accept", "Accept-Encoding", "Authorization", "Content-Length", "Content-Type", "X-CSRF-Token", "Authorization"}) corsAllowedOriginsOptions := ghandlers.AllowedOrigins([]string{"*"}) - corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions) + corsHandler := ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions) // executor dedicated api, only calls from executor should happen on these handlers executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah) diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 45302fc..a149ac1 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -76,15 +76,6 @@ func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ( return activeTasks, nil } -func (s *Runservice) runHasActiveExecutorTasks(ctx context.Context, runID string) (bool, error) { - activeTasks, err := s.runActiveExecutorTasks(ctx, runID) - if err != nil { - return false, err - } - - return len(activeTasks) > 0, nil -} - func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) (*types.Run, error) { log.Debugf("run: %s", util.Dump(curRun)) log.Debugf("rc: %s", util.Dump(rc)) @@ -281,7 +272,7 @@ func (s *Runservice) chooseExecutor(ctx context.Context, rct *types.RunConfigTas func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *types.Executor { requiresPrivilegedContainers := false for _, c := range rct.Runtime.Containers { - if c.Privileged == true { + if c.Privileged { requiresPrivilegedContainers = true break } @@ -293,7 +284,7 @@ func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *type } // skip executor provileged containers are required but not allowed - if requiresPrivilegedContainers == true && e.AllowPrivilegedContainers == false { + if requiresPrivilegedContainers && !e.AllowPrivilegedContainers { continue } @@ -864,7 +855,7 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error { if err := m.Lock(ctx); err != nil { return err } - defer m.Unlock(ctx) + defer func() { _ = m.Unlock(ctx) }() resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0) if err != nil { @@ -1378,7 +1369,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time. if err := m.Lock(ctx); err != nil { return err } - defer m.Unlock(ctx) + defer func() { _ = m.Unlock(ctx) }() doneCh := make(chan struct{}) defer close(doneCh) diff --git a/internal/services/runservice/scheduler_test.go b/internal/services/runservice/scheduler_test.go index 10e9d38..620e630 100644 --- a/internal/services/runservice/scheduler_test.go +++ b/internal/services/runservice/scheduler_test.go @@ -129,7 +129,6 @@ func TestAdvanceRunTasks(t *testing.T) { r *types.Run activeExecutorTasks []*types.ExecutorTask out *types.Run - err error }{ { name: "test top level task not started", @@ -472,7 +471,6 @@ func TestGetTasksToRun(t *testing.T) { rc *types.RunConfig r *types.Run out []string - err error }{ { name: "test run top level tasks", @@ -532,8 +530,8 @@ func TestGetTasksToRun(t *testing.T) { for _, t := range tasks { outTasks = append(outTasks, t.ID) } - sort.Sort(sort.StringSlice(tt.out)) - sort.Sort(sort.StringSlice(outTasks)) + sort.Strings(tt.out) + sort.Strings(outTasks) if diff := cmp.Diff(tt.out, outTasks); diff != "" { t.Error(diff) @@ -606,7 +604,6 @@ func TestChooseExecutor(t *testing.T) { executors []*types.Executor rct *types.RunConfigTask out *types.Executor - err error }{ { name: "test single executor ok", diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 8cf25e2..3591048 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -363,7 +363,7 @@ func GetExecutorTasksForRun(ctx context.Context, e *etcd.Store, runID string) ([ } rtIDs := make([]string, len(r.Tasks)) - for rtID, _ := range r.Tasks { + for rtID := range r.Tasks { rtIDs = append(rtIDs, rtID) }