diff --git a/cmd/agola/cmd/serve.go b/cmd/agola/cmd/serve.go index 4f960d5..3dd0ea8 100644 --- a/cmd/agola/cmd/serve.go +++ b/cmd/agola/cmd/serve.go @@ -22,6 +22,7 @@ import ( "github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/runservice/executor" rsscheduler "github.com/sorintlab/agola/internal/services/runservice/scheduler" + "github.com/sorintlab/agola/internal/services/scheduler" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -33,7 +34,7 @@ var ( gatewayURL = fmt.Sprintf("http://%s:%d", "localhost", 8000) ) -var CmdServe = &cobra.Command{ +var cmdServe = &cobra.Command{ Use: "serve", Short: "serve", Version: cmd.Version, @@ -53,15 +54,15 @@ type serveOptions struct { var serveOpts serveOptions func init() { - flags := CmdServe.PersistentFlags() + flags := cmdServe.PersistentFlags() flags.StringVar(&serveOpts.config, "config", "", "config file path") flags.BoolVar(&serveOpts.embeddedEtcd, "embedded-etcd", false, "start and use an embedded etcd, only for testing purpose") flags.StringVar(&serveOpts.embeddedEtcdDataDir, "embedded-etcd-data-dir", "/tmp/agola/etcd", "embedded etcd data dir, only for testing purpose") - cmdAgola.MarkFlagRequired("config") + cmdServe.MarkFlagRequired("config") - cmdAgola.AddCommand(CmdServe) + cmdAgola.AddCommand(cmdServe) } func embeddedEtcd(ctx context.Context) error { @@ -115,10 +116,16 @@ func serve(cmd *cobra.Command, args []string) error { return errors.Wrapf(err, "failed to start run service executor") } + sched1, err := scheduler.NewScheduler(&c.Scheduler) + if err != nil { + return errors.Wrapf(err, "failed to start scheduler") + } + errCh := make(chan error) go func() { errCh <- rsex1.Run(ctx) }() go func() { errCh <- rssched1.Run(ctx) }() + go func() { errCh <- sched1.Run(ctx) }() return <-errCh } diff --git a/go.mod b/go.mod index dccfd53..91f1139 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/spf13/cobra v0.0.3 go.etcd.io/etcd v0.0.0-20181128220305-dedae6eb7c25 go.uber.org/zap v1.9.1 - golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 // indirect + golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e // indirect golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect google.golang.org/appengine v1.4.0 // indirect diff --git a/internal/services/scheduler/scheduler.go b/internal/services/scheduler/scheduler.go new file mode 100644 index 0000000..2ab261b --- /dev/null +++ b/internal/services/scheduler/scheduler.go @@ -0,0 +1,131 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "context" + "fmt" + "time" + + slog "github.com/sorintlab/agola/internal/log" + "github.com/sorintlab/agola/internal/services/config" + rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + + "github.com/pkg/errors" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +var level = zap.NewAtomicLevelAt(zapcore.InfoLevel) +var logger = slog.New(level) +var log = logger.Sugar() + +func (s *Scheduler) scheduleLoop(ctx context.Context) { + for { + if err := s.schedule(ctx); err != nil { + log.Errorf("err: %+v", err) + } + time.Sleep(1 * time.Second) + } +} + +func (s *Scheduler) schedule(ctx context.Context) error { + // create a list of project and users with queued runs + groups := map[string]struct{}{} + + var lastRunID string + for { + queuedRunsResponse, _, err := s.runserviceClient.GetQueuedRuns(ctx, lastRunID, 0) + if err != nil { + return errors.Wrapf(err, "failed to get queued runs") + } + //log.Infof("queuedRuns: %s", util.Dump(queuedRunsResponse.Runs)) + + for _, run := range queuedRunsResponse.Runs { + groups[run.Group] = struct{}{} + } + + if len(queuedRunsResponse.Runs) == 0 { + break + } + + if len(queuedRunsResponse.Runs) > 0 { + lastRunID = queuedRunsResponse.Runs[len(queuedRunsResponse.Runs)-1].ID + } + } + + for groupID, _ := range groups { + if err := s.scheduleRun(ctx, groupID); err != nil { + log.Errorf("scheduler err: %v", err) + } + } + + return nil +} + +func (s *Scheduler) scheduleRun(ctx context.Context, groupID string) error { + // get first queued run + queuedRunsResponse, _, err := s.runserviceClient.GetGroupFirstQueuedRuns(ctx, groupID, nil) + //log.Infof("first queuedRuns: %s", util.Dump(queuedRunsResponse.Runs)) + if err != nil { + return errors.Wrapf(err, "failed to get the first project queued run") + } + if len(queuedRunsResponse.Runs) == 0 { + return nil + } + + //log.Infof("queued runs: %s", queuedRunsResponse.Runs) + run := queuedRunsResponse.Runs[0] + + runningRunsResponse, _, err := s.runserviceClient.GetGroupRunningRuns(ctx, groupID, 1, []string{fmt.Sprintf("changegroup-%s", groupID)}) + if err != nil { + return errors.Wrapf(err, "failed to get running runs") + } + //log.Infof("running Runs: %s", util.Dump(runningRunsResponse.Runs)) + if len(runningRunsResponse.Runs) == 0 { + log.Infof("starting run %s", run.ID) + log.Infof("changegroups: %s", runningRunsResponse.ChangeGroupsUpdateToken) + if _, err := s.runserviceClient.StartRun(ctx, run.ID, runningRunsResponse.ChangeGroupsUpdateToken); err != nil { + log.Errorf("failed to start run %s: %v", run.ID, err) + } + } + + return nil +} + +type Scheduler struct { + c *config.Scheduler + runserviceClient *rsapi.Client +} + +func NewScheduler(c *config.Scheduler) (*Scheduler, error) { + if c.Debug { + level.SetLevel(zapcore.DebugLevel) + } + + return &Scheduler{ + runserviceClient: rsapi.NewClient(c.RunServiceURL), + }, nil +} + +func (s *Scheduler) Run(ctx context.Context) error { + go s.scheduleLoop(ctx) + + select { + case <-ctx.Done(): + log.Infof("scheduler exiting") + return nil + } +}