datamanager: add option to force a checkpoint
This commit is contained in:
parent
512162bf98
commit
445ef24daa
@ -318,7 +318,7 @@ func TestWalCleaner(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := dm.checkpoint(ctx); err != nil {
|
if err := dm.checkpoint(ctx, true); err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
if err := dm.walCleaner(ctx); err != nil {
|
if err := dm.walCleaner(ctx); err != nil {
|
||||||
@ -436,7 +436,7 @@ func TestReadObject(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// do a checkpoint and wal clean
|
// do a checkpoint and wal clean
|
||||||
if err := dm.checkpoint(ctx); err != nil {
|
if err := dm.checkpoint(ctx, true); err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
if err := dm.walCleaner(ctx); err != nil {
|
if err := dm.walCleaner(ctx); err != nil {
|
||||||
@ -489,7 +489,7 @@ func doAndCheckCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, ac
|
|||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|
||||||
// do a checkpoint
|
// do a checkpoint
|
||||||
if err := dm.checkpoint(ctx); err != nil {
|
if err := dm.checkpoint(ctx, true); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -642,7 +642,7 @@ func (d *DataManager) sync(ctx context.Context) error {
|
|||||||
func (d *DataManager) checkpointLoop(ctx context.Context) {
|
func (d *DataManager) checkpointLoop(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
d.log.Debugf("checkpointer")
|
d.log.Debugf("checkpointer")
|
||||||
if err := d.checkpoint(ctx); err != nil {
|
if err := d.checkpoint(ctx, false); err != nil {
|
||||||
d.log.Errorf("checkpoint error: %v", err)
|
d.log.Errorf("checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -656,7 +656,7 @@ func (d *DataManager) checkpointLoop(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DataManager) checkpoint(ctx context.Context) error {
|
func (d *DataManager) checkpoint(ctx context.Context, force bool) error {
|
||||||
session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
|
session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -694,7 +694,11 @@ func (d *DataManager) checkpoint(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
walsData = append(walsData, walData)
|
walsData = append(walsData, walData)
|
||||||
}
|
}
|
||||||
if len(walsData) < d.minCheckpointWalsNum {
|
|
||||||
|
if !force && len(walsData) < d.minCheckpointWalsNum {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if len(walsData) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user