diff --git a/internal/wal/wal.go b/internal/wal/wal.go index aae67fc..98c9de8 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -480,6 +480,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle defer close(walCh) for wresp := range wch { we := &WatchElement{ChangeGroupsRevisions: make(changeGroupsRevisions)} + send := false if wresp.Canceled { err := wresp.Err() @@ -501,6 +502,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle switch { case strings.HasPrefix(key, etcdWalsDir+"/"): + send = true switch ev.Type { case mvccpb.PUT: var walData *WalData @@ -514,6 +516,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle } case strings.HasPrefix(key, etcdChangeGroupsDir+"/"): + send = true switch ev.Type { case mvccpb.PUT: changeGroup := path.Base(string(ev.Kv.Key)) @@ -523,12 +526,17 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle we.ChangeGroupsRevisions[changeGroup] = 0 } + case key == etcdPingKey: + send = true + default: continue } } - walCh <- we + if send { + walCh <- we + } } }()