From 22bd181fc82fea6dde36f0e5acdd60bd0fdab88f Mon Sep 17 00:00:00 2001
From: Simone Gotti
Date: Mon, 3 Jun 2019 16:17:27 +0200
Subject: [PATCH] datamanager: implement data files splitting

Split data files into multiple files of a max size (default 10MiB).

This way every data snapshot rewrites only the data files that contain
changes instead of rewriting a single whole file.
---
 internal/datamanager/data.go                  | 496 ++++++++++++++----
 internal/datamanager/datamanager.go           |  10 +-
 internal/datamanager/datamanager_test.go      | 379 ++++++++++++-
 internal/datamanager/wal.go                   |   2 +-
 .../services/configstore/readdb/readdb.go     |  64 +--
 internal/services/runservice/readdb/readdb.go |  64 +--
 6 files changed, 828 insertions(+), 187 deletions(-)

diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go
index 721026e..4b77cdf 100644
--- a/internal/datamanager/data.go
+++ b/internal/datamanager/data.go
@@ -20,21 +20,35 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"sort"
 	"strings"
 
 	ostypes "github.com/sorintlab/agola/internal/objectstorage/types"
 	"github.com/sorintlab/agola/internal/sequence"
+
+	uuid "github.com/satori/go.uuid"
 	errors "golang.org/x/xerrors"
 )
 
+const (
+	DefaultMaxDataFileSize = 10 * 1024 * 1024
+)
+
 type DataStatus struct {
-	DataSequence string              `json:"data_sequence,omitempty"`
-	WalSequence  string              `json:"wal_sequence,omitempty"`
-	Files        map[string][]string `json:"files,omitempty"`
+	DataSequence string `json:"data_sequence,omitempty"`
+	WalSequence  string `json:"wal_sequence,omitempty"`
+	// an ordered-by-entry-id list of files for a specific data type (the map key)
+	Files map[string][]*DataStatusFile `json:"files,omitempty"`
+}
+
+type DataStatusFile struct {
+	ID string `json:"id,omitempty"`
+	// the last entry id in this file
+	LastEntryID string `json:"last_entry_id,omitempty"`
+}
 
 type DataFileIndex struct {
-	Index map[string]int `json:"index,omitempty"`
+	Index map[string]int64 `json:"index,omitempty"`
 }
 
 type DataEntry struct {
@@ -47,105 +61,50 @@ func dataStatusPath(sequence string) string {
 	return fmt.Sprintf("%s/%s.status", storageDataDir, sequence)
 }
 
-func dataFileIndexPath(datatype, sequence string) string {
-	return fmt.Sprintf("%s/%s/%s.index", storageDataDir, datatype, sequence)
+func DataFileIndexPath(dataType, id string) string {
+	return fmt.Sprintf("%s/%s/%s.index", storageDataDir, dataType, id)
 }
 
-func dataFilePath(datatype, sequence string) string {
-	return fmt.Sprintf("%s/%s/%s.data", storageDataDir, datatype, sequence)
+func DataFilePath(dataType, id string) string {
+	return fmt.Sprintf("%s/%s/%s.data", storageDataDir, dataType, id)
 }
 
+// TODO(sgotti) this implementation could be heavily optimized to store less data in memory
+
 // TODO(sgotti)
 // split/merge data files at max N bytes (i.e 16MiB) so we'll rewrite only files
 // with changed data
-func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error {
-	dataSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey)
-	if err != nil {
-		return err
-	}
+// walIndex maps a dataType to its id-ordered list of wal actions
+// TODO(sgotti) write this index to local disk (a temporary sqlite db) instead of storing all in memory
+type walIndex map[string]walActions
 
-	for _, dataType := range d.dataTypes {
-		if err := d.writeDataType(ctx, wals, dataType, dataSequence.String()); err != nil {
-			return err
-		}
-	}
+// walActions is an ordered-by-id list of wal actions
+type walActions []*Action
 
-	var lastWalSequence string
-	for _, walData := range wals {
-		lastWalSequence = walData.WalSequence
-	}
+func (w walActions) 
Len() int { return len(w) } +func (w walActions) Less(i, j int) bool { return w[i].ID < w[j].ID } +func (w walActions) Swap(i, j int) { w[i], w[j] = w[j], w[i] } - dataStatus := &DataStatus{ - DataSequence: dataSequence.String(), - WalSequence: lastWalSequence, - Files: make(map[string][]string), - } - for _, dataType := range d.dataTypes { - dataStatus.Files[dataType] = []string{dataFilePath(dataType, dataSequence.String())} - } - - dataStatusj, err := json.Marshal(dataStatus) - if err != nil { - return err - } - if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil { - return err - } - - return nil -} - -func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, datatype, dataSequence string) error { - curDataStatus, err := d.GetLastDataStatus() - if err != nil && err != ostypes.ErrNotExist { - return err - } - - dataEntriesMap := map[string]*DataEntry{} - if err != ostypes.ErrNotExist { - curDataSequence := curDataStatus.DataSequence - - oldDataf, err := d.ost.ReadObject(dataFilePath(datatype, curDataSequence)) - if err != nil && err != ostypes.ErrNotExist { - return err - } - if err != ostypes.ErrNotExist { - dec := json.NewDecoder(oldDataf) - for { - var de *DataEntry - - err := dec.Decode(&de) - if err == io.EOF { - // all done - break - } - if err != nil { - oldDataf.Close() - return err - } - dataEntriesMap[de.ID] = de - } - oldDataf.Close() - } - } +func (d *DataManager) walIndex(ctx context.Context, wals []*WalData) (walIndex, error) { + wimap := map[string]map[string]*Action{} for _, walData := range wals { walFilef, err := d.ReadWal(walData.WalSequence) if err != nil { - return err + return nil, err } dec := json.NewDecoder(walFilef) var header *WalHeader if err = dec.Decode(&header); err != nil && err != io.EOF { walFilef.Close() - return err + return nil, err } walFilef.Close() walFile, err := d.ReadWalData(header.WalDataFileID) if err != nil { - return errors.Errorf("cannot read wal data file %q: %w", header.WalDataFileID, err) + return nil, errors.Errorf("cannot read wal data file %q: %w", header.WalDataFileID, err) } defer walFile.Close() @@ -159,51 +118,92 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty break } if err != nil { - return errors.Errorf("failed to decode wal file: %w", err) - } - if action.DataType != datatype { - continue + return nil, errors.Errorf("failed to decode wal file: %w", err) } - switch action.ActionType { - case ActionTypePut: - de := &DataEntry{ - ID: action.ID, - DataType: action.DataType, - Data: action.Data, - } - dataEntriesMap[de.ID] = de - case ActionTypeDelete: - delete(dataEntriesMap, action.ID) + if _, ok := wimap[action.DataType]; !ok { + wimap[action.DataType] = map[string]*Action{} } + + // only keep the last action for every entry id + wimap[action.DataType][action.ID] = action } } - dataEntries := []*DataEntry{} - for _, de := range dataEntriesMap { - dataEntries = append(dataEntries, de) + wi := map[string]walActions{} + for dataType, dd := range wimap { + for _, de := range dd { + wi[dataType] = append(wi[dataType], de) + } + sort.Sort(wi[dataType]) } - dataFileIndex := &DataFileIndex{ - Index: make(map[string]int), + return wi, nil +} + +// writeDataSnapshot will create a new data snapshot merging the uncheckpointed +// wals. It will split data files at maxDataFileSize bytes so we'll rewrite only +// files with changed data. 
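+// Data files are split only at entry boundaries, so a single entry never
+// spans two files. For example, if the wals only touch entries belonging to
+// the second of three existing data files, the snapshot reuses the first and
+// third files and writes only the replacement file(s) for the second.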
+// Only new files will be created, previous snapshot data files won't be touched +// +// TODO(sgotti) add a function to merge small data files (i.e after deletions) to avoid fragmentation +// TODO(sgotti) add a function to delete old data files keeping only N snapshots +func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) error { + dataSequence, err := sequence.IncSequence(ctx, d.e, etcdCheckpointSeqKey) + if err != nil { + return err } - var buf bytes.Buffer - pos := 0 - for _, de := range dataEntries { - dataFileIndex.Index[de.ID] = pos + var lastWalSequence string + for _, walData := range wals { + lastWalSequence = walData.WalSequence + } - dataEntryj, err := json.Marshal(de) + dataStatus := &DataStatus{ + DataSequence: dataSequence.String(), + WalSequence: lastWalSequence, + Files: make(map[string][]*DataStatusFile), + } + + wi, err := d.walIndex(ctx, wals) + if err != nil { + return err + } + + curDataStatus, err := d.GetLastDataStatus() + if err != nil && err != ostypes.ErrNotExist { + return err + } + + for _, dataType := range d.dataTypes { + var curDataStatusFiles []*DataStatusFile + if curDataStatus != nil { + curDataStatusFiles = curDataStatus.Files[dataType] + } + dataStatusFiles, err := d.writeDataType(ctx, wi, dataType, curDataStatusFiles) if err != nil { return err } - if _, err := buf.Write(dataEntryj); err != nil { - return err - } - - pos += len(dataEntryj) + dataStatus.Files[dataType] = dataStatusFiles } - if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, int64(buf.Len()), true); err != nil { + + dataStatusj, err := json.Marshal(dataStatus) + if err != nil { + return err + } + if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil { + return err + } + + return nil +} + +func (d *DataManager) writeDataFile(ctx context.Context, buf *bytes.Buffer, size int64, dataFileIndex *DataFileIndex, dataFileID, dataType string) error { + if buf.Len() == 0 { + return fmt.Errorf("empty data entries") + } + + if err := d.ost.WriteObject(DataFilePath(dataType, dataFileID), buf, size, true); err != nil { return err } @@ -211,21 +211,293 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty if err != nil { return err } - if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), int64(len(dataFileIndexj)), true); err != nil { + if err := d.ost.WriteObject(DataFileIndexPath(dataType, dataFileID), bytes.NewReader(dataFileIndexj), int64(len(dataFileIndexj)), true); err != nil { return err } return nil } +type ActionGroup struct { + DataStatusFile *DataStatusFile + StartActionIndex int + ActionsSize int + PreviousDataStatusFiles []*DataStatusFile +} + +func (d *DataManager) actionGroups(ctx context.Context, wi walIndex, dataType string, curDataStatusFiles []*DataStatusFile) ([]*ActionGroup, []*DataStatusFile) { + dataStatusFiles := []*DataStatusFile{} + remainingDataStatusFiles := []*DataStatusFile{} + + actionGroups := []*ActionGroup{} + + var startActionIndex int + var actionsSize int + + var actionIndex int + var curDataStatusFileIndex int + for { + var action *Action + if actionIndex <= len(wi[dataType])-1 { + action = wi[dataType][actionIndex] + } + + var curDataStatusFile *DataStatusFile + if curDataStatusFileIndex <= len(curDataStatusFiles)-1 { + curDataStatusFile = curDataStatusFiles[curDataStatusFileIndex] + } + + if action == nil { + if actionsSize > 0 { + actionGroup := 
&ActionGroup{ + DataStatusFile: curDataStatusFile, + StartActionIndex: startActionIndex, + ActionsSize: actionsSize, + PreviousDataStatusFiles: dataStatusFiles, + } + actionGroups = append(actionGroups, actionGroup) + curDataStatusFileIndex++ + if curDataStatusFileIndex <= len(curDataStatusFiles)-1 { + remainingDataStatusFiles = curDataStatusFiles[curDataStatusFileIndex:] + } + } + break + } + + if curDataStatusFile != nil { + if curDataStatusFile.LastEntryID >= action.ID || curDataStatusFileIndex == len(curDataStatusFiles)-1 { + // continue using this status file + actionIndex++ + actionsSize++ + } else { + // find new status file + if actionsSize > 0 { + actionGroup := &ActionGroup{ + DataStatusFile: curDataStatusFile, + StartActionIndex: startActionIndex, + ActionsSize: actionsSize, + PreviousDataStatusFiles: dataStatusFiles, + } + actionGroups = append(actionGroups, actionGroup) + + startActionIndex = actionIndex + actionsSize = 0 + dataStatusFiles = []*DataStatusFile{} + } else { + dataStatusFiles = append(dataStatusFiles, curDataStatusFile) + } + curDataStatusFileIndex++ + } + } else { + actionIndex++ + actionsSize++ + } + } + + return actionGroups, remainingDataStatusFiles +} + +func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType string, curDataStatusFiles []*DataStatusFile) ([]*DataStatusFile, error) { + type SplitPoint struct { + pos int64 + lastEntryID string + } + + if len(wi[dataType]) == 0 { + // no actions + return curDataStatusFiles, nil + } + actionGroups, remainingDataStatusFiles := d.actionGroups(ctx, wi, dataType, curDataStatusFiles) + + dataStatusFiles := []*DataStatusFile{} + + for _, actionGroup := range actionGroups { + dataStatusFiles = append(dataStatusFiles, actionGroup.PreviousDataStatusFiles...) 
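+
+		// the merge below walks the old data file's entries and the group's
+		// actions in entry ID order, appending the merged result to buf and
+		// recording a split point (and starting a fresh file index) whenever
+		// the bytes written since the last split point exceed maxDataFileSize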
+ + splitPoints := []SplitPoint{} + dataFileIndexes := []*DataFileIndex{} + dataFileIndex := &DataFileIndex{ + Index: make(map[string]int64), + } + dataEntries := []*DataEntry{} + var buf bytes.Buffer + var pos int64 + var lastEntryID string + + if actionGroup.DataStatusFile != nil { + // TODO(sgotti) instead of reading all entries in memory decode it's contents one by one when needed + oldDataf, err := d.ost.ReadObject(DataFilePath(dataType, actionGroup.DataStatusFile.ID)) + if err != nil && err != ostypes.ErrNotExist { + return nil, err + } + if err != ostypes.ErrNotExist { + dec := json.NewDecoder(oldDataf) + for { + var de *DataEntry + + err := dec.Decode(&de) + if err == io.EOF { + // all done + break + } + if err != nil { + oldDataf.Close() + return nil, err + } + + dataEntries = append(dataEntries, de) + } + oldDataf.Close() + } + } + + dataEntryIndex := 0 + actionIndex := actionGroup.StartActionIndex + + // iterate over data entries and action in order + for { + exists := false + useAction := false + + var action *Action + if actionIndex < actionGroup.StartActionIndex+actionGroup.ActionsSize { + action = wi[dataType][actionIndex] + } + + var de *DataEntry + if dataEntryIndex <= len(dataEntries)-1 { + de = dataEntries[dataEntryIndex] + } + + if de == nil && action == nil { + break + } + + if action != nil { + if de != nil { + if de.ID == action.ID { + exists = true + useAction = true + } + if de.ID > action.ID { + useAction = true + } + } else { + useAction = true + } + + if useAction { + de = nil + switch action.ActionType { + case ActionTypePut: + de = &DataEntry{ + ID: action.ID, + DataType: action.DataType, + Data: action.Data, + } + if exists { + // replace current data entry with the action data + dataEntryIndex++ + } + case ActionTypeDelete: + if exists { + // skip current data entry + dataEntryIndex++ + } + } + actionIndex++ + } else { + dataEntryIndex++ + } + } else { + dataEntryIndex++ + } + + if de != nil { + lastEntryID = de.ID + dataEntryj, err := json.Marshal(de) + if err != nil { + return nil, err + } + if _, err := buf.Write(dataEntryj); err != nil { + return nil, err + } + dataFileIndex.Index[de.ID] = pos + prevPos := pos + pos += int64(len(dataEntryj)) + var lastSplitPos int64 + if len(splitPoints) > 0 { + lastSplitPos = splitPoints[len(splitPoints)-1].pos + } + if pos-lastSplitPos > d.maxDataFileSize { + // add split point only if it's different (less) than the previous one + if lastSplitPos < prevPos { + splitPoints = append(splitPoints, SplitPoint{pos: int64(buf.Len()), lastEntryID: lastEntryID}) + dataFileIndexes = append(dataFileIndexes, dataFileIndex) + dataFileIndex = &DataFileIndex{ + Index: make(map[string]int64), + } + } + } + } + } + + // save remaining data + if buf.Len() != 0 { + var curPos int64 + var lastSplitPos int64 + if len(splitPoints) > 0 { + lastSplitPos = splitPoints[len(splitPoints)-1].pos + } + // add final split point if there's something left in the buffer + if lastSplitPos != int64(buf.Len()) { + splitPoints = append(splitPoints, SplitPoint{pos: int64(buf.Len()), lastEntryID: lastEntryID}) + } + dataFileIndexes = append(dataFileIndexes, dataFileIndex) + for i, sp := range splitPoints { + curDataFileID := uuid.NewV4().String() + if err := d.writeDataFile(ctx, &buf, sp.pos-curPos, dataFileIndexes[i], curDataFileID, dataType); err != nil { + return nil, err + } + // insert new dataStatusFile + dataStatusFiles = append(dataStatusFiles, &DataStatusFile{ + ID: curDataFileID, + LastEntryID: sp.lastEntryID, + }) + + curPos = sp.pos + } + } + + } 
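+
+	// data status files coming after the last action group are untouched by
+	// any action in this checkpoint, so they are carried over from the
+	// previous snapshot as-is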
+	dataStatusFiles = append(dataStatusFiles, remainingDataStatusFiles...)
+
+	return dataStatusFiles, nil
+}
+
 func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
 	curDataStatus, err := d.GetLastDataStatus()
 	if err != nil {
 		return nil, err
 	}
-	dataSequence := curDataStatus.DataSequence
+	curFiles := curDataStatus.Files
 
-	dataFileIndexf, err := d.ost.ReadObject(dataFileIndexPath(dataType, dataSequence))
+	var matchingDataFileID string
+	// get the matching data file for the entry ID: the entry, if it exists,
+	// is in the first file whose LastEntryID is greater than or equal to it
+	if len(curFiles[dataType]) == 0 {
+		return nil, ostypes.ErrNotExist
+	}
+
+	matchingDataFileID = curFiles[dataType][0].ID
+	for _, dataStatusFile := range curFiles[dataType] {
+		if dataStatusFile.LastEntryID >= id {
+			matchingDataFileID = dataStatusFile.ID
+			break
+		}
+	}
+
+	dataFileIndexf, err := d.ost.ReadObject(DataFileIndexPath(dataType, matchingDataFileID))
 	if err != nil {
 		return nil, err
 	}
@@ -243,7 +515,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
 		return nil, ostypes.ErrNotExist
 	}
 
-	dataf, err := d.ost.ReadObject(dataFilePath(dataType, dataSequence))
+	dataf, err := d.ost.ReadObject(DataFilePath(dataType, matchingDataFileID))
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go
index d175705..8c0641b 100644
--- a/internal/datamanager/datamanager.go
+++ b/internal/datamanager/datamanager.go
@@ -38,7 +38,7 @@ import (
 // wals/{walSeq}
 
 const (
-	DefaultCheckpointInterval = 1 * time.Minute
+	DefaultCheckpointInterval = 10 * time.Second
 
 	DefaultEtcdWalsKeepNum      = 100
 	DefaultMinCheckpointWalsNum = 100
 )
@@ -63,6 +63,8 @@ var (
 	etcdWalSeqKey                     = path.Join(etcdWalBaseDir, "walseq")
 	etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq")
 
+	etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq")
+
 	etcdSyncLockKey       = path.Join(etcdWalBaseDir, "synclock")
 	etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock")
 	etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock")
@@ -88,6 +90,7 @@ type DataManagerConfig struct {
 	CheckpointInterval time.Duration
 	// MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint
 	MinCheckpointWalsNum int
+	MaxDataFileSize      int64
 }
 
 type DataManager struct {
@@ -100,6 +103,7 @@ type DataManager struct {
 	etcdWalsKeepNum      int
 	checkpointInterval   time.Duration
 	minCheckpointWalsNum int
+	maxDataFileSize      int64
 }
 
 func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error) {
@@ -118,6 +122,9 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
 	if conf.MinCheckpointWalsNum < 1 {
 		return nil, errors.New("minCheckpointWalsNum must be greater than 0")
 	}
+	if conf.MaxDataFileSize == 0 {
+		conf.MaxDataFileSize = DefaultMaxDataFileSize
+	}
 
 	d := &DataManager{
 		basePath:             conf.BasePath,
@@ -129,6 +136,7 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
 		etcdWalsKeepNum:      conf.EtcdWalsKeepNum,
 		checkpointInterval:   conf.CheckpointInterval,
 		minCheckpointWalsNum: conf.MinCheckpointWalsNum,
+		maxDataFileSize:      conf.MaxDataFileSize,
 	}
 
 	// add trailing slash the basepath
diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go
index ff6c2d4..9d41df1 100644
--- a/internal/datamanager/datamanager_test.go
+++ b/internal/datamanager/datamanager_test.go
@@ -16,9 +16,13 @@ package datamanager
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
+	"reflect"
+ "sort" "testing" "time" @@ -147,7 +151,7 @@ func TestEtcdReset(t *testing.T) { dm, err = NewDataManager(ctx, logger, dmConfig) dmReadyCh = make(chan struct{}) - t.Logf("starting wal") + t.Logf("starting datamanager") go dm.Run(ctx, dmReadyCh) <-dmReadyCh @@ -293,8 +297,12 @@ func TestWalCleaner(t *testing.T) { } } - dm.checkpoint(ctx) - dm.walCleaner(ctx) + if err := dm.checkpoint(ctx); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := dm.walCleaner(ctx); err != nil { + t.Fatalf("unexpected err: %v", err) + } walsCount := 0 for range dm.ListEtcdWals(ctx, 0) { @@ -333,6 +341,12 @@ func TestReadObject(t *testing.T) { } dm, err := NewDataManager(ctx, logger, dmConfig) + dmReadyCh := make(chan struct{}) + go dm.Run(ctx, dmReadyCh) + <-dmReadyCh + + time.Sleep(5 * time.Second) + actions := []*Action{} for i := 0; i < 20; i++ { actions = append(actions, &Action{ @@ -343,12 +357,6 @@ func TestReadObject(t *testing.T) { }) } - dmReadyCh := make(chan struct{}) - go dm.Run(ctx, dmReadyCh) - <-dmReadyCh - - time.Sleep(5 * time.Second) - // populate with a wal _, err = dm.WriteWal(ctx, actions, nil) if err != nil { @@ -398,8 +406,12 @@ func TestReadObject(t *testing.T) { } // do a checkpoint and wal clean - dm.checkpoint(ctx) - dm.walCleaner(ctx) + if err := dm.checkpoint(ctx); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := dm.walCleaner(ctx); err != nil { + t.Fatalf("unexpected err: %v", err) + } // wait for the event to be read time.Sleep(500 * time.Millisecond) @@ -417,3 +429,348 @@ func TestReadObject(t *testing.T) { t.Fatalf("unexpected err: %v", err) } } + +func testCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, actionGroups [][]*Action, currentEntries map[string]*DataEntry) (map[string]*DataEntry, error) { + expectedEntries := map[string]*DataEntry{} + for _, e := range currentEntries { + expectedEntries[e.ID] = e + } + + for _, actionGroup := range actionGroups { + for _, action := range actionGroup { + switch action.ActionType { + case ActionTypePut: + expectedEntries[action.ID] = &DataEntry{ID: action.ID, DataType: action.DataType, Data: action.Data} + case ActionTypeDelete: + delete(expectedEntries, action.ID) + } + } + } + + for _, actionGroup := range actionGroups { + // populate with a wal + _, err := dm.WriteWal(ctx, actionGroup, nil) + if err != nil { + return nil, err + } + } + + // wait for the event to be read + time.Sleep(500 * time.Millisecond) + + // do a checkpoint + if err := dm.checkpoint(ctx); err != nil { + return nil, err + } + + if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil { + return nil, err + } + + return expectedEntries, nil +} + +// TODO(sgotti) some fuzzy testing will be really good +func TestCheckpoint(t *testing.T) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + etcdDir, err := ioutil.TempDir(dir, "etcd") + tetcd := setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) + + ctx := context.Background() + + ostDir, err := ioutil.TempDir(dir, "ost") + ost, err := posix.New(ostDir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + dmConfig := &DataManagerConfig{ + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), + // remove almost all wals to see that they are removed also from changes + EtcdWalsKeepNum: 1, + DataTypes: []string{"datatype01"}, + // checkpoint also with only one wal + MinCheckpointWalsNum: 1, + // use a small maxDataFileSize + MaxDataFileSize: 10 * 1024, + } + dm, 
err := NewDataManager(ctx, logger, dmConfig) + dmReadyCh := make(chan struct{}) + go dm.Run(ctx, dmReadyCh) + <-dmReadyCh + + time.Sleep(5 * time.Second) + + contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + // test insert from scratch (no current entries) + actions := []*Action{} + for i := 200; i < 400; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + + currentEntries, err := testCheckpoint(t, ctx, dm, [][]*Action{actions}, nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // test delete of all existing entries + actions = []*Action{} + for i := 200; i < 400; i++ { + actions = append(actions, &Action{ + ActionType: ActionTypeDelete, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + }) + } + + currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // test insert from scratch again (no current entries) + actions = []*Action{} + for i := 200; i < 400; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + + currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // test delete some existing entries in the middle + actions = []*Action{} + for i := 250; i < 350; i++ { + action := &Action{ + ActionType: ActionTypeDelete, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + } + actions = append(actions, action) + } + + currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // test delete of unexisting entries + actions = []*Action{} + for i := 1000; i < 1010; i++ { + action := &Action{ + ActionType: ActionTypeDelete, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + } + actions = append(actions, action) + } + + currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // test update and insert at the end + actions = []*Action{} + for i := 300; i < 500; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + + currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // test update and insert at the start + actions = []*Action{} + for i := 0; i < 300; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + + currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // test multiple wals with different insert, updated, deletes + actionGroups := 
[][]*Action{} + for i := 0; i < 150; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + actionGroups = append(actionGroups, actions) + for i := 50; i < 100; i++ { + action := &Action{ + ActionType: ActionTypeDelete, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + actionGroups = append(actionGroups, actions) + for i := 250; i < 300; i++ { + action := &Action{ + ActionType: ActionTypeDelete, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + for i := 70; i < 80; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + actionGroups = append(actionGroups, actions) + + currentEntries, err = testCheckpoint(t, ctx, dm, actionGroups, currentEntries) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } +} + +func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error { + // read the data file + curDataStatus, err := dm.GetLastDataStatus() + if err != nil { + return err + } + + allEntriesMap := map[string]*DataEntry{} + var prevLastEntryID string + + for i, file := range curDataStatus.Files["datatype01"] { + dataFileIndexf, err := dm.ost.ReadObject(DataFileIndexPath("datatype01", file.ID)) + if err != nil { + return err + } + var dataFileIndex *DataFileIndex + dec := json.NewDecoder(dataFileIndexf) + err = dec.Decode(&dataFileIndex) + if err != nil { + dataFileIndexf.Close() + return err + } + + dataFileIndexf.Close() + dataEntriesMap := map[string]*DataEntry{} + dataEntries := []*DataEntry{} + dataf, err := dm.ost.ReadObject(DataFilePath("datatype01", file.ID)) + if err != nil { + return err + } + dec = json.NewDecoder(dataf) + var prevEntryID string + for { + var de *DataEntry + + err := dec.Decode(&de) + if err == io.EOF { + // all done + break + } + if err != nil { + dataf.Close() + return err + } + // check that there are no duplicate entries + if _, ok := allEntriesMap[de.ID]; ok { + return fmt.Errorf("duplicate entry id: %s", de.ID) + } + // check that the entries are in order + if de.ID < prevEntryID { + return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID) + } + + dataEntriesMap[de.ID] = de + dataEntries = append(dataEntries, de) + allEntriesMap[de.ID] = de + } + dataf.Close() + + // check that the index matches the entries + if len(dataFileIndex.Index) != len(dataEntriesMap) { + return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap)) + } + indexIDs := make([]string, len(dataFileIndex.Index)) + entriesIDs := make([]string, len(dataEntriesMap)) + for id := range dataFileIndex.Index { + indexIDs = append(indexIDs, id) + } + for id := range dataEntriesMap { + entriesIDs = append(entriesIDs, id) + } + sort.Strings(indexIDs) + sort.Strings(entriesIDs) + if !reflect.DeepEqual(indexIDs, entriesIDs) { + return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, 
entriesIDs) + } + + if file.LastEntryID != dataEntries[len(dataEntries)-1].ID { + return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, dataEntries[len(dataEntries)-1].ID) + } + + // check that all the files are in order + if file.LastEntryID == prevLastEntryID { + return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID) + } + if file.LastEntryID < prevLastEntryID { + return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID) + } + prevLastEntryID = file.LastEntryID + } + + // check that the number of entries is right + if len(allEntriesMap) != len(expectedEntriesMap) { + return fmt.Errorf("expected %d total entries, got %d", len(expectedEntriesMap), len(allEntriesMap)) + } + if !reflect.DeepEqual(expectedEntriesMap, allEntriesMap) { + return fmt.Errorf("expected entries don't match current entries") + } + + return nil +} diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 2714f46..40ecbd8 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -723,7 +723,7 @@ func (d *DataManager) checkpoint(ctx context.Context) error { return nil } - if err := d.writeData(ctx, walsData); err != nil { + if err := d.writeDataSnapshot(ctx, walsData); err != nil { return errors.Errorf("checkpoint function error: %w", err) } diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 49d3caa..f329602 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -133,44 +133,46 @@ func (r *ReadDB) SyncFromDump() (string, error) { return "", nil } for dataType, files := range dumpIndex.Files { - dumpf, err := r.ost.ReadObject(files[0]) - if err != nil { - return "", err - } - dumpEntries := []*datamanager.DataEntry{} - dec := json.NewDecoder(dumpf) - for { - var de *datamanager.DataEntry - - err := dec.Decode(&de) - if err == io.EOF { - // all done - break - } + for _, file := range files { + dumpf, err := r.ost.ReadObject(datamanager.DataFilePath(dataType, file.ID)) if err != nil { - dumpf.Close() return "", err } - dumpEntries = append(dumpEntries, de) - } - dumpf.Close() + dumpEntries := []*datamanager.DataEntry{} + dec := json.NewDecoder(dumpf) + for { + var de *datamanager.DataEntry - err = r.rdb.Do(func(tx *db.Tx) error { - for _, de := range dumpEntries { - action := &datamanager.Action{ - ActionType: datamanager.ActionTypePut, - ID: de.ID, - DataType: dataType, - Data: de.Data, + err := dec.Decode(&de) + if err == io.EOF { + // all done + break } - if err := r.applyAction(tx, action); err != nil { - return err + if err != nil { + dumpf.Close() + return "", err } + dumpEntries = append(dumpEntries, de) + } + dumpf.Close() + + err = r.rdb.Do(func(tx *db.Tx) error { + for _, de := range dumpEntries { + action := &datamanager.Action{ + ActionType: datamanager.ActionTypePut, + ID: de.ID, + DataType: dataType, + Data: de.Data, + } + if err := r.applyAction(tx, action); err != nil { + return err + } + } + return nil + }) + if err != nil { + return "", err } - return nil - }) - if err != nil { - return "", err } } diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index 4f0176b..cd00f17 100644 --- a/internal/services/runservice/readdb/readdb.go +++ 
b/internal/services/runservice/readdb/readdb.go @@ -660,44 +660,46 @@ func (r *ReadDB) SyncFromDump() (string, error) { return "", nil } for dataType, files := range dumpIndex.Files { - dumpf, err := r.ost.ReadObject(files[0]) - if err != nil { - return "", err - } - dumpEntries := []*datamanager.DataEntry{} - dec := json.NewDecoder(dumpf) - for { - var de *datamanager.DataEntry - - err := dec.Decode(&de) - if err == io.EOF { - // all done - break - } + for _, file := range files { + dumpf, err := r.ost.ReadObject(datamanager.DataFilePath(dataType, file.ID)) if err != nil { - dumpf.Close() return "", err } - dumpEntries = append(dumpEntries, de) - } - dumpf.Close() + dumpEntries := []*datamanager.DataEntry{} + dec := json.NewDecoder(dumpf) + for { + var de *datamanager.DataEntry - err = r.rdb.Do(func(tx *db.Tx) error { - for _, de := range dumpEntries { - action := &datamanager.Action{ - ActionType: datamanager.ActionTypePut, - ID: de.ID, - DataType: dataType, - Data: de.Data, + err := dec.Decode(&de) + if err == io.EOF { + // all done + break } - if err := r.applyAction(tx, action); err != nil { - return err + if err != nil { + dumpf.Close() + return "", err } + dumpEntries = append(dumpEntries, de) + } + dumpf.Close() + + err = r.rdb.Do(func(tx *db.Tx) error { + for _, de := range dumpEntries { + action := &datamanager.Action{ + ActionType: datamanager.ActionTypePut, + ID: de.ID, + DataType: dataType, + Data: de.Data, + } + if err := r.applyAction(tx, action); err != nil { + return err + } + } + return nil + }) + if err != nil { + return "", err } - return nil - }) - if err != nil { - return "", err } }
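
---

For reference, a minimal standalone sketch (not part of the patch) of the
data file selection that Read and SyncFromDump rely on under the split
layout: files for a data type are ordered by entry ID and each
DataStatusFile records the last entry ID it contains, so an entry, if
present, lives in the first file whose LastEntryID is greater than or equal
to the wanted ID. The helper name and the example IDs below are
illustrative only.

    package main

    import "fmt"

    // DataStatusFile mirrors the struct introduced by this patch.
    type DataStatusFile struct {
    	ID          string
    	LastEntryID string
    }

    // matchingFile returns the ID of the data file that would contain
    // entryID: the first file whose LastEntryID sorts >= entryID. ok is
    // false when entryID sorts after every file's last entry, so it cannot
    // exist in the snapshot.
    func matchingFile(files []*DataStatusFile, entryID string) (id string, ok bool) {
    	for _, f := range files {
    		if f.LastEntryID >= entryID {
    			return f.ID, true
    		}
    	}
    	return "", false
    }

    func main() {
    	files := []*DataStatusFile{
    		{ID: "file-a", LastEntryID: "object0099"},
    		{ID: "file-b", LastEntryID: "object0199"},
    	}
    	fmt.Println(matchingFile(files, "object0050")) // file-a true
    	fmt.Println(matchingFile(files, "object0099")) // file-a true (boundary is inclusive)
    	fmt.Println(matchingFile(files, "object0100")) // file-b true
    	fmt.Println(matchingFile(files, "object0500")) // "" false
    }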