diff --git a/internal/services/runservice/action/action.go b/internal/services/runservice/action/action.go index 2010fcd..cf13cf3 100644 --- a/internal/services/runservice/action/action.go +++ b/internal/services/runservice/action/action.go @@ -615,7 +615,7 @@ func (h *ActionHandler) GetExecutorTask(ctx context.Context, etID string) (*type } func (h *ActionHandler) GetExecutorTasks(ctx context.Context, executorID string) ([]*types.ExecutorTask, error) { - ets, err := store.GetExecutorTasks(ctx, h.e, executorID) + ets, err := store.GetExecutorTasksForExecutor(ctx, h.e, executorID) if err != nil && err != etcd.ErrKeyNotFound { return nil, err } diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index c1c28dd..7161407 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -290,10 +290,16 @@ func (s *Runservice) chooseExecutor(ctx context.Context, rct *types.RunConfigTas if err != nil { return nil, err } - return chooseExecutor(executors, rct), nil + // TODO(sgotti) find a way to avoid retrieving this for every chooseExecutor + // invocation (i.e. use an etcd watcher to keep this value updated) + executorTasksCount, err := store.GetExecutorTasksCountByExecutor(ctx, s.e) + if err != nil { + return nil, err + } + return chooseExecutor(executors, executorTasksCount, rct), nil } -func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *types.Executor { +func chooseExecutor(executors []*types.Executor, executorTasksCount map[string]int, rct *types.RunConfigTask) *types.Executor { requiresPrivilegedContainers := false for _, c := range rct.Runtime.Containers { if c.Privileged { @@ -326,7 +332,14 @@ func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *type } if e.ActiveTasksLimit != 0 { - if e.ActiveTasks >= e.ActiveTasksLimit { + // will be 0 when executorTasksCount[e.ID] doesn't exist + activeTasks := executorTasksCount[e.ID] + if e.ActiveTasks > activeTasks { + activeTasks = e.ActiveTasks + } + // calculate the active tasks by the max between the current scheduled + // tasks in the store and the executor reported tasks + if activeTasks >= e.ActiveTasksLimit { continue } } @@ -674,7 +687,7 @@ func (s *Runservice) updateRunTaskStatus(ctx context.Context, et *types.Executor } } if wrongstatus { - log.Warnf("wrong executor task status: %s, rt status: %s", et.Status.Phase, rt.Status) + log.Warnf("wrong executor task %q status: %q, rt status: %q", et.ID, et.Status.Phase, rt.Status) return nil } @@ -742,6 +755,7 @@ func (s *Runservice) executorTasksCleanerLoop(ctx context.Context) { } func (s *Runservice) executorTasksCleaner(ctx context.Context) error { + // TODO(sgotti) use paged List resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0) if err != nil { return err @@ -855,6 +869,7 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error { } defer func() { _ = m.Unlock(ctx) }() + // TODO(sgotti) use paged List resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0) if err != nil { return err diff --git a/internal/services/runservice/scheduler_test.go b/internal/services/runservice/scheduler_test.go index 2d1164e..35a0c12 100644 --- a/internal/services/runservice/scheduler_test.go +++ b/internal/services/runservice/scheduler_test.go @@ -686,7 +686,7 @@ func TestChooseExecutor(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - e := chooseExecutor(tt.executors, tt.rct) + e := chooseExecutor(tt.executors, map[string]int{}, tt.rct) if e == nil && tt.out == nil { return } diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 35ae086..d0c472f 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -211,6 +211,7 @@ func GetExecutor(ctx context.Context, e *etcd.Store, executorID string) (*types. } func GetExecutors(ctx context.Context, e *etcd.Store) ([]*types.Executor, error) { + // TODO(sgotti) use paged List resp, err := e.List(ctx, common.EtcdExecutorsDir, "", 0) if err != nil { return nil, err @@ -298,7 +299,28 @@ func DeleteExecutorTask(ctx context.Context, e *etcd.Store, etID string) error { return e.Delete(ctx, common.EtcdTaskKey(etID)) } -func GetExecutorTasks(ctx context.Context, e *etcd.Store, executorID string) ([]*types.ExecutorTask, error) { +func GetExecutorTasksCountByExecutor(ctx context.Context, e *etcd.Store) (map[string]int, error) { + // TODO(sgotti) use paged List + resp, err := e.List(ctx, common.EtcdTasksDir, "", 0) + if err != nil { + return nil, err + } + + count := map[string]int{} + + for _, kv := range resp.Kvs { + var et *types.ExecutorTask + if err := json.Unmarshal(kv.Value, &et); err != nil { + return nil, err + } + count[et.Spec.ExecutorID] = count[et.Spec.ExecutorID] + 1 + } + + return count, nil +} + +func GetExecutorTasksForExecutor(ctx context.Context, e *etcd.Store, executorID string) ([]*types.ExecutorTask, error) { + // TODO(sgotti) use paged List resp, err := e.List(ctx, common.EtcdTasksDir, "", 0) if err != nil { return nil, err @@ -478,6 +500,7 @@ func DeleteRun(ctx context.Context, e *etcd.Store, runID string) error { } func GetRuns(ctx context.Context, e *etcd.Store) ([]*types.Run, error) { + // TODO(sgotti) use paged List resp, err := e.List(ctx, common.EtcdRunsDir, "", 0) if err != nil { return nil, err