diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index f4b9f86..49d3caa 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -351,17 +351,15 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { if walElement.WalData.WalSequence <= curWalSeq { continue } - //if walElement.WalData.WalStatus == datamanager.WalStatusCommittedStorage { if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil { return err } - //} - //// update readdb only when the wal has been committed to objectstorage - //if walElement.WalData.WalStatus != datamanager.WalStatusCommittedStorage { - // return nil - //} + // update readdb only when the wal has been committed to etcd + if walElement.WalData.WalStatus != datamanager.WalStatusCommitted { + return nil + } r.log.Debugf("applying wal to db") if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { @@ -524,17 +522,6 @@ func (r *ReadDB) handleEvent(tx *db.Tx, we *datamanager.WatchElement) error { } func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error { - // update readdb only when the wal has been committed to objectstorage - //if we.WalData.WalStatus != wal.WalStatusCommittedStorage { - // return nil - //} - - if we.WalData != nil { - if err := r.insertCommittedWalSequence(tx, we.WalData.WalSequence); err != nil { - return err - } - } - for cgName, cgRev := range we.ChangeGroupsRevisions { if err := r.insertChangeGroupRevision(tx, cgName, cgRev); err != nil { return err @@ -542,6 +529,15 @@ func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error { } if we.WalData != nil { + // update readdb only when the wal has been committed to etcd + if we.WalData.WalStatus != datamanager.WalStatusCommitted { + return nil + } + + if err := r.insertCommittedWalSequence(tx, we.WalData.WalSequence); err != nil { + return err + } + r.log.Debugf("applying wal to db") return r.applyWal(tx, we.WalData.WalDataFileID) } diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index e259053..4f0176b 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -622,6 +622,11 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { return err } + // update readdb only when the wal has been committed to etcd + if walElement.WalData.WalStatus != datamanager.WalStatusCommitted { + return nil + } + r.log.Debugf("applying wal to db") if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { return err @@ -923,7 +928,7 @@ func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error { } if we.WalData != nil { - // update readdb only when the wal has been committed to objectstorage + // update readdb only when the wal has been committed to etcd if we.WalData.WalStatus != datamanager.WalStatusCommitted { return nil }