diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go
index baf5c2f..b510eed 100644
--- a/internal/datamanager/data.go
+++ b/internal/datamanager/data.go
@@ -529,6 +529,43 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
 	return bytes.NewReader(de.Data), nil
 }
 
+func (d *DataManager) GetFirstDataStatusSequences(n int) ([]*sequence.Sequence, error) {
+	if n < 1 {
+		return nil, errors.Errorf("n must be greater than 0")
+	}
+
+	dataStatusSequences := []*sequence.Sequence{}
+	c := 0
+
+	doneCh := make(chan struct{})
+	defer close(doneCh)
+	for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) {
+		if object.Err != nil {
+			return nil, object.Err
+		}
+		if m := DataStatusFileRegexp.FindStringSubmatch(path.Base(object.Path)); m != nil {
+			seq, err := sequence.Parse(m[1])
+			if err != nil {
+				d.log.Warnf("cannot parse sequence for data status file %q", object.Path)
+				continue
+			}
+			dataStatusSequences = append(dataStatusSequences, seq)
+			c++
+		} else {
+			d.log.Warnf("bad file %q found in storage data dir", object.Path)
+		}
+		if c >= n {
+			break
+		}
+	}
+
+	if len(dataStatusSequences) == 0 {
+		return nil, ostypes.ErrNotExist
+	}
+
+	return dataStatusSequences, nil
+}
+
 func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, error) {
 	if n < 1 {
 		return nil, errors.Errorf("n must be greater than 0")
@@ -582,6 +619,15 @@ func (d *DataManager) GetDataStatus(dataSequence *sequence.Sequence) (*DataStatu
 	return dataStatus, dec.Decode(&dataStatus)
 }
 
+func (d *DataManager) GetFirstDataStatusSequence() (*sequence.Sequence, error) {
+	dataStatusSequences, err := d.GetFirstDataStatusSequences(1)
+	if err != nil {
+		return nil, err
+	}
+
+	return dataStatusSequences[0], nil
+}
+
 func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error) {
 	dataStatusSequences, err := d.GetLastDataStatusSequences(1)
 	if err != nil {
@@ -591,6 +637,15 @@ func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error) {
 	return dataStatusSequences[0], nil
 }
 
+func (d *DataManager) GetFirstDataStatus() (*DataStatus, error) {
+	dataStatusSequence, err := d.GetFirstDataStatusSequence()
+	if err != nil {
+		return nil, err
+	}
+
+	return d.GetDataStatus(dataStatusSequence)
+}
+
 func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
 	dataStatusSequence, err := d.GetLastDataStatusSequence()
 	if err != nil {
diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go
index 911ae2e..2ff994d 100644
--- a/internal/datamanager/datamanager.go
+++ b/internal/datamanager/datamanager.go
@@ -34,10 +34,15 @@ import (
 // * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one
 
 const (
-	DefaultCheckpointInterval      = 10 * time.Second
-	DefaultCheckpointCleanInterval = 5 * time.Minute
-	DefaultEtcdWalsKeepNum         = 100
-	DefaultMinCheckpointWalsNum    = 100
+	DefaultSyncInterval                = 5 * time.Second
+	DefaultCheckpointInterval          = 10 * time.Second
+	DefaultCheckpointCleanInterval     = 5 * time.Minute
+	DefaultEtcdWalCleanInterval        = 2 * time.Second
+	DefaultStorageWalCleanInterval     = 5 * time.Minute
+	DefaultCompactChangeGroupsInterval = 1 * time.Second
+	DefaultEtcdPingerInterval          = 1 * time.Second
+	DefaultEtcdWalsKeepNum             = 100
+	DefaultMinCheckpointWalsNum        = 100
 )
 
 var (
@@ -66,6 +71,7 @@ var (
 	etcdCompactChangeGroupsLockKey = path.Join(etcdWalBaseDir, "compactchangegroupslock")
 	etcdCheckpointLockKey          = path.Join(etcdWalBaseDir, "checkpointlock")
 	etcdWalCleanerLockKey          = path.Join(etcdWalBaseDir, "walcleanerlock")
+	etcdStorageWalCleanerLockKey   = path.Join(etcdWalBaseDir, "storagewalcleanerlock")
 
 	etcdChangeGroupsDir           = path.Join(etcdWalBaseDir, "changegroups")
 	etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev")
@@ -154,12 +160,20 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
 	return d, nil
 }
 
+func (d *DataManager) storageWalStatusDir() string {
+	return path.Join(d.basePath, storageWalsStatusDir)
+}
+
 func (d *DataManager) storageWalStatusFile(walSeq string) string {
-	return path.Join(d.basePath, storageWalsStatusDir, walSeq)
+	return path.Join(d.storageWalStatusDir(), walSeq)
+}
+
+func (d *DataManager) storageWalDataDir() string {
+	return path.Join(d.basePath, storageWalsDataDir)
 }
 
 func (d *DataManager) storageWalDataFile(walFileID string) string {
-	return path.Join(d.basePath, storageWalsDataDir, walFileID)
+	return path.Join(d.storageWalDataDir(), walFileID)
 }
 
 func (d *DataManager) storageDataDir() string {
@@ -239,7 +253,8 @@ func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
 	go d.syncLoop(ctx)
 	go d.checkpointLoop(ctx)
 	go d.checkpointCleanLoop(ctx)
-	go d.walCleanerLoop(ctx)
+	go d.etcdWalCleanerLoop(ctx)
+	go d.storageWalCleanerLoop(ctx)
 	go d.compactChangeGroupsLoop(ctx)
 	go d.etcdPingerLoop(ctx)
diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go
index a639638..703f998 100644
--- a/internal/datamanager/datamanager_test.go
+++ b/internal/datamanager/datamanager_test.go
@@ -394,7 +394,7 @@ func TestConcurrentUpdate(t *testing.T) {
 	}
 }
 
-func TestWalCleaner(t *testing.T) {
+func TestEtcdWalCleaner(t *testing.T) {
 	dir, err := ioutil.TempDir("", "agola")
 	if err != nil {
 		t.Fatalf("unexpected err: %v", err)
@@ -455,7 +455,7 @@ func TestWalCleaner(t *testing.T) {
 	if err := dm.checkpoint(ctx, true); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
-	if err := dm.walCleaner(ctx); err != nil {
+	if err := dm.etcdWalCleaner(ctx); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
@@ -573,7 +573,7 @@ func TestReadObject(t *testing.T) {
 	if err := dm.checkpoint(ctx, true); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
-	if err := dm.walCleaner(ctx); err != nil {
+	if err := dm.etcdWalCleaner(ctx); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
@@ -1316,6 +1316,168 @@ func testCleanConcurrentCheckpoint(t *testing.T, basePath string) {
 	}
 }
 
+func TestStorageWalCleaner(t *testing.T) {
+	tests := []struct {
+		name     string
+		basePath string
+	}{
+		{
+			name:     "test with empty basepath",
+			basePath: "",
+		},
+		{
+			name:     "test with relative basepath",
+			basePath: "base/path",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			testStorageWalCleaner(t, tt.basePath)
+		})
+	}
+}
+
+func testStorageWalCleaner(t *testing.T, basePath string) {
+	dir, err := ioutil.TempDir("", "agola")
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	defer os.RemoveAll(dir)
+
+	etcdDir, err := ioutil.TempDir(dir, "etcd")
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	tetcd := setupEtcd(t, etcdDir)
+	defer shutdownEtcd(tetcd)
+
+	ctx := context.Background()
+
+	ostDir, err := ioutil.TempDir(dir, "ost")
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	ost, err := posix.New(ostDir)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	dmConfig := &DataManagerConfig{
+		BasePath: basePath,
+		E:        tetcd.TestEtcd.Store,
+		OST:      objectstorage.NewObjStorage(ost, "/"),
+		// remove almost all wals to see that they are removed also from changes
+		EtcdWalsKeepNum: 1,
+		DataTypes:       []string{"datatype01"},
+		// checkpoint also with only one wal
+		MinCheckpointWalsNum: 1,
+		// use a small maxDataFileSize
+		MaxDataFileSize: 10 * 1024,
+	}
+	dm, err := NewDataManager(ctx, logger, dmConfig)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	dmReadyCh := make(chan struct{})
+	go func() { _ = dm.Run(ctx, dmReadyCh) }()
+	<-dmReadyCh
+
+	time.Sleep(5 * time.Second)
+
+	contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
+
+	var currentEntries map[string]*DataEntry
+	actions := []*Action{}
+	for n := 0; n < 10; n++ {
+		for i := 0; i < 400; i++ {
+			action := &Action{
+				ActionType: ActionTypePut,
+				ID:         fmt.Sprintf("object%04d", i),
+				DataType:   "datatype01",
+				Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
+			}
+			actions = append(actions, action)
+		}
+
+		currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
+		if err != nil {
+			t.Fatalf("unexpected err: %v", err)
+		}
+	}
+
+	// get the last data status sequences
+	lastDataStatusSequences, err := dm.GetLastDataStatusSequences(dataStatusToKeep)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	// use the oldest of the dataStatusToKeep kept data statuses
+	dataStatus, err := dm.GetDataStatus(lastDataStatusSequences[dataStatusToKeep-1])
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	// get the list of expected wals
+	doneCh := make(chan struct{})
+	defer close(doneCh)
+
+	expectedWalStatusFiles := []string{}
+	expectedWalDataFiles := []string{}
+	for object := range dm.ost.List(dm.storageWalStatusDir()+"/", "", true, doneCh) {
+		if object.Err != nil {
+			t.Fatalf("unexpected err: %v", object.Err)
+		}
+
+		name := path.Base(object.Path)
+		ext := path.Ext(name)
+		walSequence := strings.TrimSuffix(name, ext)
+
+		if walSequence < dataStatus.WalSequence {
+			continue
+		}
+		header, err := dm.ReadWal(walSequence)
+		if err != nil {
+			t.Fatalf("unexpected err: %v", err)
+		}
+
+		expectedWalStatusFiles = append(expectedWalStatusFiles, object.Path)
+		expectedWalDataFiles = append(expectedWalDataFiles, dm.storageWalDataFile(header.WalDataFileID))
+	}
+	sort.Strings(expectedWalDataFiles)
+
+	if err := dm.CleanOldCheckpoints(ctx); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	if err := dm.storageWalCleaner(ctx); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	currentWalStatusFiles := []string{}
+	currentWalDataFiles := []string{}
+	for object := range dm.ost.List(dm.storageWalStatusDir()+"/", "", true, doneCh) {
+		if object.Err != nil {
+			t.Fatalf("unexpected err: %v", object.Err)
+		}
+
+		currentWalStatusFiles = append(currentWalStatusFiles, object.Path)
+	}
+	for object := range dm.ost.List(dm.storageWalDataDir()+"/", "", true, doneCh) {
+		if object.Err != nil {
+			t.Fatalf("unexpected err: %v", object.Err)
+		}
+
+		currentWalDataFiles = append(currentWalDataFiles, object.Path)
+	}
+	sort.Strings(currentWalDataFiles)
+	if diff := cmp.Diff(currentWalStatusFiles, expectedWalStatusFiles); diff != "" {
+		t.Fatalf("different wal status files: %v", diff)
+	}
+	if diff := cmp.Diff(currentWalDataFiles, expectedWalDataFiles); diff != "" {
+		t.Fatalf("different wal data files: %v", diff)
+	}
+}
+
 func TestExportImport(t *testing.T) {
 	dir, err := ioutil.TempDir("", "agola")
 	if err != nil {
diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go
index b81a4ec..4aaabdc 100644
--- a/internal/datamanager/wal.go
+++ b/internal/datamanager/wal.go
@@ -179,7 +179,7 @@ func (d *DataManager) ListOSTWals(start string) <-chan *WalFile {
 		startPath = d.storageWalStatusFile(start)
 	}
 
-	for object := range d.ost.List(path.Join(d.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) {
+	for object := range d.ost.List(d.storageWalStatusDir()+"/", startPath, true, doneCh) {
 		if object.Err != nil {
 			walCh <- &WalFile{
 				Err: object.Err,
@@ -547,7 +547,7 @@ func (d *DataManager) syncLoop(ctx context.Context) {
 			d.log.Errorf("syncer error: %+v", err)
 		}
 
-		sleepCh := time.NewTimer(5 * time.Second).C
+		sleepCh := time.NewTimer(DefaultSyncInterval).C
 		select {
 		case <-ctx.Done():
 			return
@@ -751,14 +751,14 @@ func (d *DataManager) checkpointClean(ctx context.Context) error {
 	return nil
 }
 
-func (d *DataManager) walCleanerLoop(ctx context.Context) {
+func (d *DataManager) etcdWalCleanerLoop(ctx context.Context) {
 	for {
-		d.log.Debugf("walcleaner")
-		if err := d.walCleaner(ctx); err != nil {
-			d.log.Errorf("walcleaner error: %v", err)
+		d.log.Debugf("etcdwalcleaner")
+		if err := d.etcdWalCleaner(ctx); err != nil {
+			d.log.Errorf("etcdwalcleaner error: %v", err)
 		}
 
-		sleepCh := time.NewTimer(2 * time.Second).C
+		sleepCh := time.NewTimer(DefaultEtcdWalCleanInterval).C
 		select {
 		case <-ctx.Done():
 			return
@@ -767,10 +767,10 @@ func (d *DataManager) walCleanerLoop(ctx context.Context) {
 	}
 }
 
-// walCleaner will clean already checkpointed wals from etcd
+// etcdWalCleaner will clean already checkpointed wals from etcd
 // it must always keep at least one wal that is needed for resync operations
 // from clients
-func (d *DataManager) walCleaner(ctx context.Context) error {
+func (d *DataManager) etcdWalCleaner(ctx context.Context) error {
 	session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
 	if err != nil {
 		return err
@@ -826,13 +826,125 @@ func (d *DataManager) walCleaner(ctx context.Context) error {
 	return nil
 }
 
+func (d *DataManager) storageWalCleanerLoop(ctx context.Context) {
+	for {
+		d.log.Debugf("storagewalcleaner")
+		if err := d.storageWalCleaner(ctx); err != nil {
+			d.log.Errorf("storagewalcleaner error: %v", err)
+		}
+
+		sleepCh := time.NewTimer(DefaultStorageWalCleanInterval).C
+		select {
+		case <-ctx.Done():
+			return
+		case <-sleepCh:
+		}
+	}
+}
+
+// storageWalCleaner will clean unneeded wals from the storage
+func (d *DataManager) storageWalCleaner(ctx context.Context) error {
+	session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
+	if err != nil {
+		return err
+	}
+	defer session.Close()
+
+	m := concurrency.NewMutex(session, etcdStorageWalCleanerLockKey)
+
+	// TODO(sgotti) find a way to use a trylock so we'll just return if already
+	// locked. Currently multiple cleaners will enqueue and start when another
+	// finishes (useless and consumes resources)
+	if err := m.Lock(ctx); err != nil {
+		return err
+	}
+	defer func() { _ = m.Unlock(ctx) }()
+
+	firstDataStatus, err := d.GetFirstDataStatus()
+	if err != nil {
+		return err
+	}
+	firstWalSequence := firstDataStatus.WalSequence
+
+	// get the first wal in etcd (in any state) and use its wal sequence if
+	// it's less than the first data status wal sequence
+	resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0)
+	if err != nil {
+		return err
+	}
+	if len(resp.Kvs) == 0 {
+		return errors.Errorf("no wals in etcd")
+	}
+	var walData WalData
+	if err := json.Unmarshal(resp.Kvs[0].Value, &walData); err != nil {
+		return err
+	}
+	if walData.WalSequence < firstWalSequence {
+		firstWalSequence = walData.WalSequence
+	}
+
+	doneCh := make(chan struct{})
+	defer close(doneCh)
+
+	for object := range d.ost.List(d.storageWalStatusDir()+"/", "", true, doneCh) {
+		if object.Err != nil {
+			return object.Err
+		}
+		name := path.Base(object.Path)
+		ext := path.Ext(name)
+		walSequence := strings.TrimSuffix(name, ext)
+
+		// handle committed status file and related data file
+		if ext == ".committed" {
+			if walSequence >= firstWalSequence {
+				break
+			}
+
+			header, err := d.ReadWal(walSequence)
+			if err != nil {
+				return err
+			}
+
+			// first remove the wal data file
+			walDataFilePath := d.storageWalDataFile(header.WalDataFileID)
+			d.log.Infof("removing %q", walDataFilePath)
+			if err := d.ost.DeleteObject(walDataFilePath); err != nil {
+				if err != ostypes.ErrNotExist {
+					return err
+				}
+			}
+
+			// then remove the wal status file
+			d.log.Infof("removing %q", object.Path)
+			if err := d.ost.DeleteObject(object.Path); err != nil {
+				if err != ostypes.ErrNotExist {
+					return err
+				}
+			}
+		}
+
+		// handle old checkpointed status file
+		// TODO(sgotti) remove this in future versions since .checkpointed files are not created anymore
+		if ext == ".checkpointed" {
+			d.log.Infof("removing %q", object.Path)
+			if err := d.ost.DeleteObject(object.Path); err != nil {
+				if err != ostypes.ErrNotExist {
+					return err
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
 func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) {
 	for {
 		if err := d.compactChangeGroups(ctx); err != nil {
 			d.log.Errorf("err: %+v", err)
 		}
 
-		sleepCh := time.NewTimer(1 * time.Second).C
+		sleepCh := time.NewTimer(DefaultCompactChangeGroupsInterval).C
 		select {
 		case <-ctx.Done():
 			return
@@ -917,7 +1029,7 @@ func (d *DataManager) etcdPingerLoop(ctx context.Context) {
 			d.log.Errorf("err: %+v", err)
 		}
 
-		sleepCh := time.NewTimer(1 * time.Second).C
+		sleepCh := time.NewTimer(DefaultEtcdPingerInterval).C
 		select {
 		case <-ctx.Done():
 			return
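Usage note (editor's sketch, not part of the patch): the new GetFirstDataStatusSequence/GetFirstDataStatus helpers mirror the existing GetLast* API and return the oldest data status still present in the object storage. Below is a minimal sketch of how code inside the datamanager package might derive the wal retention range that storageWalCleaner enforces; the dm variable setup and the enclosing function returning error are assumed:

	// Oldest checkpoint still kept in the object storage: wals with a
	// sequence lower than its WalSequence are no longer needed to rebuild
	// any retained data status.
	firstDataStatus, err := dm.GetFirstDataStatus()
	if err != nil {
		return err
	}

	// Newest checkpoint, for comparison; the wals that must survive are
	// those in [firstDataStatus.WalSequence, lastDataStatus.WalSequence],
	// plus any wal still registered in etcd.
	lastDataStatus, err := dm.GetLastDataStatus()
	if err != nil {
		return err
	}

	dm.log.Infof("keeping wals from %q to %q", firstDataStatus.WalSequence, lastDataStatus.WalSequence)

This is also why storageWalCleaner lower-bounds removal with the sequence of the first wal found in etcd: a wal must never disappear from the object storage while etcd can still reference it, even if no kept checkpoint needs it anymore.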