diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index 9d20622..4ae9663 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -57,18 +57,6 @@ type DataEntry struct { Data []byte `json:"data,omitempty"` } -func dataStatusPath(sequence string) string { - return fmt.Sprintf("%s/%s.status", storageDataDir, sequence) -} - -func DataFileIndexPath(dataType, id string) string { - return fmt.Sprintf("%s/%s/%s.index", storageDataDir, dataType, id) -} - -func DataFilePath(dataType, id string) string { - return fmt.Sprintf("%s/%s/%s.data", storageDataDir, dataType, id) -} - // TODO(sgotti) this implementation could be heavily optimized to store less data in memory // TODO(sgotti) @@ -191,7 +179,7 @@ func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) er if err != nil { return err } - if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil { + if err := d.ost.WriteObject(d.dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil { return err } @@ -203,7 +191,7 @@ func (d *DataManager) writeDataFile(ctx context.Context, buf *bytes.Buffer, size return fmt.Errorf("empty data entries") } - if err := d.ost.WriteObject(DataFilePath(dataType, dataFileID), buf, size, true); err != nil { + if err := d.ost.WriteObject(d.DataFilePath(dataType, dataFileID), buf, size, true); err != nil { return err } @@ -211,7 +199,7 @@ func (d *DataManager) writeDataFile(ctx context.Context, buf *bytes.Buffer, size if err != nil { return err } - if err := d.ost.WriteObject(DataFileIndexPath(dataType, dataFileID), bytes.NewReader(dataFileIndexj), int64(len(dataFileIndexj)), true); err != nil { + if err := d.ost.WriteObject(d.DataFileIndexPath(dataType, dataFileID), bytes.NewReader(dataFileIndexj), int64(len(dataFileIndexj)), true); err != nil { return err } @@ -326,7 +314,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s if actionGroup.DataStatusFile != nil { // TODO(sgotti) instead of reading all entries in memory decode it's contents one by one when needed - oldDataf, err := d.ost.ReadObject(DataFilePath(dataType, actionGroup.DataStatusFile.ID)) + oldDataf, err := d.ost.ReadObject(d.DataFilePath(dataType, actionGroup.DataStatusFile.ID)) if err != nil && err != ostypes.ErrNotExist { return nil, err } @@ -497,7 +485,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) { } } - dataFileIndexf, err := d.ost.ReadObject(DataFileIndexPath(dataType, matchingDataFileID)) + dataFileIndexf, err := d.ost.ReadObject(d.DataFileIndexPath(dataType, matchingDataFileID)) if err != nil { return nil, err } @@ -515,7 +503,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) { return nil, ostypes.ErrNotExist } - dataf, err := d.ost.ReadObject(DataFilePath(dataType, matchingDataFileID)) + dataf, err := d.ost.ReadObject(d.DataFilePath(dataType, matchingDataFileID)) if err != nil { return nil, err } @@ -539,7 +527,7 @@ func (d *DataManager) GetLastDataStatusPath() (string, error) { defer close(doneCh) var dataStatusPath string - for object := range d.ost.List(storageDataDir+"/", "", false, doneCh) { + for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) { if object.Err != nil { return "", object.Err } diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go index 1009dd9..76d4792 100644 --- a/internal/datamanager/datamanager.go +++ b/internal/datamanager/datamanager.go @@ -16,6 +16,7 @@ package datamanager import ( "context" + "fmt" "path" "strings" "time" @@ -31,12 +32,6 @@ import ( // * Etcd cluster rebuild: we cannot rely on etcd header ClusterID since it could be the same as it's generated using the listen urls. We should add our own clusterid key and use it. // * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one -// Storage paths -// wals/{walSeq} -// -// Etcd paths -// wals/{walSeq} - const ( DefaultCheckpointInterval = 10 * time.Second DefaultEtcdWalsKeepNum = 100 @@ -147,6 +142,34 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo return d, nil } +func (d *DataManager) storageWalStatusFile(walSeq string) string { + return path.Join(d.basePath, storageWalsStatusDir, walSeq) +} + +func (d *DataManager) storageWalDataFile(walFileID string) string { + return path.Join(d.basePath, storageWalsDataDir, walFileID) +} + +func (d *DataManager) storageDataDir() string { + return path.Join(d.basePath, storageDataDir) +} + +func (d *DataManager) dataStatusPath(sequence string) string { + return fmt.Sprintf("%s/%s.status", d.storageDataDir(), sequence) +} + +func (d *DataManager) DataFileIndexPath(dataType, id string) string { + return fmt.Sprintf("%s/%s/%s.index", d.storageDataDir(), dataType, id) +} + +func (d *DataManager) DataFilePath(dataType, id string) string { + return fmt.Sprintf("%s/%s/%s.data", d.storageDataDir(), dataType, id) +} + +func etcdWalKey(walSeq string) string { + return path.Join(etcdWalsDir, walSeq) +} + func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error { for { err := d.InitEtcd(ctx) diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index a27b62c..9c76130 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -461,7 +461,7 @@ func TestReadObject(t *testing.T) { } } -func testCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, actionGroups [][]*Action, currentEntries map[string]*DataEntry) (map[string]*DataEntry, error) { +func doAndCheckCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, actionGroups [][]*Action, currentEntries map[string]*DataEntry) (map[string]*DataEntry, error) { expectedEntries := map[string]*DataEntry{} for _, e := range currentEntries { expectedEntries[e.ID] = e @@ -503,6 +503,28 @@ func testCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, actionGr // TODO(sgotti) some fuzzy testing will be really good func TestCheckpoint(t *testing.T) { + tests := []struct { + name string + basePath string + }{ + { + name: "test with empty basepath", + basePath: "", + }, + { + name: "test with relative basepath", + basePath: "base/path", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testCheckpoint(t, tt.basePath) + }) + } +} + +func testCheckpoint(t *testing.T, basePath string) { dir, err := ioutil.TempDir("", "agola") if err != nil { t.Fatalf("unexpected err: %v", err) @@ -528,8 +550,9 @@ func TestCheckpoint(t *testing.T) { } dmConfig := &DataManagerConfig{ - E: tetcd.TestEtcd.Store, - OST: objectstorage.NewObjStorage(ost, "/"), + BasePath: basePath, + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), // remove almost all wals to see that they are removed also from changes EtcdWalsKeepNum: 1, DataTypes: []string{"datatype01"}, @@ -562,7 +585,7 @@ func TestCheckpoint(t *testing.T) { actions = append(actions, action) } - currentEntries, err := testCheckpoint(t, ctx, dm, [][]*Action{actions}, nil) + currentEntries, err := doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, nil) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -577,7 +600,7 @@ func TestCheckpoint(t *testing.T) { }) } - currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -594,7 +617,7 @@ func TestCheckpoint(t *testing.T) { actions = append(actions, action) } - currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -610,7 +633,7 @@ func TestCheckpoint(t *testing.T) { actions = append(actions, action) } - currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -626,7 +649,7 @@ func TestCheckpoint(t *testing.T) { actions = append(actions, action) } - currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -643,7 +666,7 @@ func TestCheckpoint(t *testing.T) { actions = append(actions, action) } - currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -660,7 +683,7 @@ func TestCheckpoint(t *testing.T) { actions = append(actions, action) } - currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -707,7 +730,7 @@ func TestCheckpoint(t *testing.T) { } actionGroups = append(actionGroups, actions) - _, err = testCheckpoint(t, ctx, dm, actionGroups, currentEntries) + _, err = doAndCheckCheckpoint(t, ctx, dm, actionGroups, currentEntries) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -724,7 +747,7 @@ func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expected var prevLastEntryID string for i, file := range curDataStatus.Files["datatype01"] { - dataFileIndexf, err := dm.ost.ReadObject(DataFileIndexPath("datatype01", file.ID)) + dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath("datatype01", file.ID)) if err != nil { return err } @@ -739,7 +762,7 @@ func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expected dataFileIndexf.Close() dataEntriesMap := map[string]*DataEntry{} dataEntries := []*DataEntry{} - dataf, err := dm.ost.ReadObject(DataFilePath("datatype01", file.ID)) + dataf, err := dm.ost.ReadObject(dm.DataFilePath("datatype01", file.ID)) if err != nil { return err } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index fc279a8..dbb6c26 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -38,18 +38,6 @@ import ( errors "golang.org/x/xerrors" ) -func (d *DataManager) storageWalStatusFile(walSeq string) string { - return path.Join(d.basePath, storageWalsStatusDir, walSeq) -} - -func (d *DataManager) storageWalDataFile(walFileID string) string { - return path.Join(d.basePath, storageWalsDataDir, walFileID) -} - -func etcdWalKey(walSeq string) string { - return path.Join(etcdWalsDir, walSeq) -} - type ActionType string const ( diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 7705950..b526a6f 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -146,7 +146,7 @@ func (r *ReadDB) SyncFromDump() (string, error) { } for dataType, files := range dumpIndex.Files { for _, file := range files { - dumpf, err := r.ost.ReadObject(datamanager.DataFilePath(dataType, file.ID)) + dumpf, err := r.ost.ReadObject(r.dm.DataFilePath(dataType, file.ID)) if err != nil { return "", err } diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index 6b0ea44..f6df5a0 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -659,7 +659,7 @@ func (r *ReadDB) SyncFromDump() (string, error) { } for dataType, files := range dumpIndex.Files { for _, file := range files { - dumpf, err := r.ost.ReadObject(datamanager.DataFilePath(dataType, file.ID)) + dumpf, err := r.ost.ReadObject(r.dm.DataFilePath(dataType, file.ID)) if err != nil { return "", err }