diff --git a/diode/diode.go b/diode/diode.go index 377a523..a720f0b 100644 --- a/diode/diode.go +++ b/diode/diode.go @@ -8,7 +8,7 @@ import ( "sync" "time" - diodes "code.cloudfoundry.org/go-diodes" + "github.com/rs/zerolog/diode/internal/diodes" ) var bufPool = &sync.Pool{ @@ -17,6 +17,8 @@ var bufPool = &sync.Pool{ }, } +type Alerter func(missed int) + // Writer is a io.Writer wrapper that uses a diode to make Write lock-free, // non-blocking and thread safe. type Writer struct { @@ -33,19 +35,19 @@ type Writer struct { // // Use a diode.Writer when // -// d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) { +// w := diode.NewWriter(w, 1000, 10 * time.Millisecond, func(missed int) { // log.Printf("Dropped %d messages", missed) -// })) -// w := diode.NewWriter(w, d, 10 * time.Millisecond) +// }) // log := zerolog.New(w) // // See code.cloudfoundry.org/go-diodes for more info on diode. -func NewWriter(w io.Writer, manyToOneDiode *diodes.ManyToOne, poolInterval time.Duration) Writer { +func NewWriter(w io.Writer, size int, poolInterval time.Duration, f Alerter) Writer { ctx, cancel := context.WithCancel(context.Background()) + d := diodes.NewManyToOne(size, diodes.AlertFunc(f)) dw := Writer{ w: w, - d: manyToOneDiode, - p: diodes.NewPoller(manyToOneDiode, + d: d, + p: diodes.NewPoller(d, diodes.WithPollingInterval(poolInterval), diodes.WithPollingContext(ctx)), c: cancel, diff --git a/diode/diode_example_test.go b/diode/diode_example_test.go index e04f36b..a097c57 100644 --- a/diode/diode_example_test.go +++ b/diode/diode_example_test.go @@ -7,16 +7,14 @@ import ( "os" "time" - diodes "code.cloudfoundry.org/go-diodes" "github.com/rs/zerolog" "github.com/rs/zerolog/diode" ) func ExampleNewWriter() { - d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) { + w := diode.NewWriter(os.Stdout, 1000, 10*time.Millisecond, func(missed int) { fmt.Printf("Dropped %d messages\n", missed) - })) - w := diode.NewWriter(os.Stdout, d, 10*time.Millisecond) + }) log := zerolog.New(w) log.Print("test") diff --git a/diode/diode_test.go b/diode/diode_test.go index 31df653..6171cb4 100644 --- a/diode/diode_test.go +++ b/diode/diode_test.go @@ -9,18 +9,16 @@ import ( "testing" "time" - diodes "code.cloudfoundry.org/go-diodes" "github.com/rs/zerolog" "github.com/rs/zerolog/diode" "github.com/rs/zerolog/internal/cbor" ) func TestNewWriter(t *testing.T) { - d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) { - fmt.Printf("Dropped %d messages\n", missed) - })) buf := bytes.Buffer{} - w := diode.NewWriter(&buf, d, 10*time.Millisecond) + w := diode.NewWriter(&buf, 1000, 10*time.Millisecond, func(missed int) { + fmt.Printf("Dropped %d messages\n", missed) + }) log := zerolog.New(w) log.Print("test") @@ -35,8 +33,7 @@ func TestNewWriter(t *testing.T) { func Benchmark(b *testing.B) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stderr) - d := diodes.NewManyToOne(100000, nil) - w := diode.NewWriter(ioutil.Discard, d, 10*time.Millisecond) + w := diode.NewWriter(ioutil.Discard, 100000, 10*time.Millisecond, nil) log := zerolog.New(w) defer w.Close() diff --git a/diode/internal/diodes/README b/diode/internal/diodes/README new file mode 100644 index 0000000..6c4ec5f --- /dev/null +++ b/diode/internal/diodes/README @@ -0,0 +1 @@ +Copied from https://github.com/cloudfoundry/go-diodes to avoid test dependencies. diff --git a/diode/internal/diodes/many_to_one.go b/diode/internal/diodes/many_to_one.go new file mode 100644 index 0000000..0f562f7 --- /dev/null +++ b/diode/internal/diodes/many_to_one.go @@ -0,0 +1,130 @@ +package diodes + +import ( + "log" + "sync/atomic" + "unsafe" +) + +// ManyToOne diode is optimal for many writers (go-routines B-n) and a single +// reader (go-routine A). It is not thread safe for multiple readers. +type ManyToOne struct { + buffer []unsafe.Pointer + writeIndex uint64 + readIndex uint64 + alerter Alerter +} + +// NewManyToOne creates a new diode (ring buffer). The ManyToOne diode +// is optimzed for many writers (on go-routines B-n) and a single reader +// (on go-routine A). The alerter is invoked on the read's go-routine. It is +// called when it notices that the writer go-routine has passed it and wrote +// over data. A nil can be used to ignore alerts. +func NewManyToOne(size int, alerter Alerter) *ManyToOne { + if alerter == nil { + alerter = AlertFunc(func(int) {}) + } + + d := &ManyToOne{ + buffer: make([]unsafe.Pointer, size), + alerter: alerter, + } + + // Start write index at the value before 0 + // to allow the first write to use AddUint64 + // and still have a beginning index of 0 + d.writeIndex = ^d.writeIndex + return d +} + +// Set sets the data in the next slot of the ring buffer. +func (d *ManyToOne) Set(data GenericDataType) { + for { + writeIndex := atomic.AddUint64(&d.writeIndex, 1) + idx := writeIndex % uint64(len(d.buffer)) + old := atomic.LoadPointer(&d.buffer[idx]) + + if old != nil && + (*bucket)(old) != nil && + (*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) { + log.Println("Diode set collision: consider using a larger diode") + continue + } + + newBucket := &bucket{ + data: data, + seq: writeIndex, + } + + if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) { + log.Println("Diode set collision: consider using a larger diode") + continue + } + + return + } +} + +// TryNext will attempt to read from the next slot of the ring buffer. +// If there is not data available, it will return (nil, false). +func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) { + // Read a value from the ring buffer based on the readIndex. + idx := d.readIndex % uint64(len(d.buffer)) + result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil)) + + // When the result is nil that means the writer has not had the + // opportunity to write a value into the diode. This value must be ignored + // and the read head must not increment. + if result == nil { + return nil, false + } + + // When the seq value is less than the current read index that means a + // value was read from idx that was previously written but has since has + // been dropped. This value must be ignored and the read head must not + // increment. + // + // The simulation for this scenario assumes the fast forward occurred as + // detailed below. + // + // 5. The reader reads again getting seq 5. It then reads again expecting + // seq 6 but gets seq 2. This is a read of a stale value that was + // effectively "dropped" so the read fails and the read head stays put. + // `| 4 | 5 | 2 | 3 |` r: 7, w: 6 + // + if result.seq < d.readIndex { + return nil, false + } + + // When the seq value is greater than the current read index that means a + // value was read from idx that overwrote the value that was expected to + // be at this idx. This happens when the writer has lapped the reader. The + // reader needs to catch up to the writer so it moves its write head to + // the new seq, effectively dropping the messages that were not read in + // between the two values. + // + // Here is a simulation of this scenario: + // + // 1. Both the read and write heads start at 0. + // `| nil | nil | nil | nil |` r: 0, w: 0 + // 2. The writer fills the buffer. + // `| 0 | 1 | 2 | 3 |` r: 0, w: 4 + // 3. The writer laps the read head. + // `| 4 | 5 | 2 | 3 |` r: 0, w: 6 + // 4. The reader reads the first value, expecting a seq of 0 but reads 4, + // this forces the reader to fast forward to 5. + // `| 4 | 5 | 2 | 3 |` r: 5, w: 6 + // + if result.seq > d.readIndex { + dropped := result.seq - d.readIndex + d.readIndex = result.seq + d.alerter.Alert(int(dropped)) + } + + // Only increment read index if a regular read occurred (where seq was + // equal to readIndex) or a value was read that caused a fast forward + // (where seq was greater than readIndex). + // + d.readIndex++ + return result.data, true +} diff --git a/diode/internal/diodes/one_to_one.go b/diode/internal/diodes/one_to_one.go new file mode 100644 index 0000000..aaf66d1 --- /dev/null +++ b/diode/internal/diodes/one_to_one.go @@ -0,0 +1,129 @@ +package diodes + +import ( + "sync/atomic" + "unsafe" +) + +// GenericDataType is the data type the diodes operate on. +type GenericDataType unsafe.Pointer + +// Alerter is used to report how many values were overwritten since the +// last write. +type Alerter interface { + Alert(missed int) +} + +// AlertFunc type is an adapter to allow the use of ordinary functions as +// Alert handlers. +type AlertFunc func(missed int) + +// Alert calls f(missed) +func (f AlertFunc) Alert(missed int) { + f(missed) +} + +type bucket struct { + data GenericDataType + seq uint64 // seq is the recorded write index at the time of writing +} + +// OneToOne diode is meant to be used by a single reader and a single writer. +// It is not thread safe if used otherwise. +type OneToOne struct { + buffer []unsafe.Pointer + writeIndex uint64 + readIndex uint64 + alerter Alerter +} + +// NewOneToOne creates a new diode is meant to be used by a single reader and +// a single writer. The alerter is invoked on the read's go-routine. It is +// called when it notices that the writer go-routine has passed it and wrote +// over data. A nil can be used to ignore alerts. +func NewOneToOne(size int, alerter Alerter) *OneToOne { + if alerter == nil { + alerter = AlertFunc(func(int) {}) + } + + return &OneToOne{ + buffer: make([]unsafe.Pointer, size), + alerter: alerter, + } +} + +// Set sets the data in the next slot of the ring buffer. +func (d *OneToOne) Set(data GenericDataType) { + idx := d.writeIndex % uint64(len(d.buffer)) + + newBucket := &bucket{ + data: data, + seq: d.writeIndex, + } + d.writeIndex++ + + atomic.StorePointer(&d.buffer[idx], unsafe.Pointer(newBucket)) +} + +// TryNext will attempt to read from the next slot of the ring buffer. +// If there is no data available, it will return (nil, false). +func (d *OneToOne) TryNext() (data GenericDataType, ok bool) { + // Read a value from the ring buffer based on the readIndex. + idx := d.readIndex % uint64(len(d.buffer)) + result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil)) + + // When the result is nil that means the writer has not had the + // opportunity to write a value into the diode. This value must be ignored + // and the read head must not increment. + if result == nil { + return nil, false + } + + // When the seq value is less than the current read index that means a + // value was read from idx that was previously written but has since has + // been dropped. This value must be ignored and the read head must not + // increment. + // + // The simulation for this scenario assumes the fast forward occurred as + // detailed below. + // + // 5. The reader reads again getting seq 5. It then reads again expecting + // seq 6 but gets seq 2. This is a read of a stale value that was + // effectively "dropped" so the read fails and the read head stays put. + // `| 4 | 5 | 2 | 3 |` r: 7, w: 6 + // + if result.seq < d.readIndex { + return nil, false + } + + // When the seq value is greater than the current read index that means a + // value was read from idx that overwrote the value that was expected to + // be at this idx. This happens when the writer has lapped the reader. The + // reader needs to catch up to the writer so it moves its write head to + // the new seq, effectively dropping the messages that were not read in + // between the two values. + // + // Here is a simulation of this scenario: + // + // 1. Both the read and write heads start at 0. + // `| nil | nil | nil | nil |` r: 0, w: 0 + // 2. The writer fills the buffer. + // `| 0 | 1 | 2 | 3 |` r: 0, w: 4 + // 3. The writer laps the read head. + // `| 4 | 5 | 2 | 3 |` r: 0, w: 6 + // 4. The reader reads the first value, expecting a seq of 0 but reads 4, + // this forces the reader to fast forward to 5. + // `| 4 | 5 | 2 | 3 |` r: 5, w: 6 + // + if result.seq > d.readIndex { + dropped := result.seq - d.readIndex + d.readIndex = result.seq + d.alerter.Alert(int(dropped)) + } + + // Only increment read index if a regular read occurred (where seq was + // equal to readIndex) or a value was read that caused a fast forward + // (where seq was greater than readIndex). + d.readIndex++ + return result.data, true +} diff --git a/diode/internal/diodes/poller.go b/diode/internal/diodes/poller.go new file mode 100644 index 0000000..d317a23 --- /dev/null +++ b/diode/internal/diodes/poller.go @@ -0,0 +1,80 @@ +package diodes + +import ( + "context" + "time" +) + +// Diode is any implementation of a diode. +type Diode interface { + Set(GenericDataType) + TryNext() (GenericDataType, bool) +} + +// Poller will poll a diode until a value is available. +type Poller struct { + Diode + interval time.Duration + ctx context.Context +} + +// PollerConfigOption can be used to setup the poller. +type PollerConfigOption func(*Poller) + +// WithPollingInterval sets the interval at which the diode is queried +// for new data. The default is 10ms. +func WithPollingInterval(interval time.Duration) PollerConfigOption { + return PollerConfigOption(func(c *Poller) { + c.interval = interval + }) +} + +// WithPollingContext sets the context to cancel any retrieval (Next()). It +// will not change any results for adding data (Set()). Default is +// context.Background(). +func WithPollingContext(ctx context.Context) PollerConfigOption { + return PollerConfigOption(func(c *Poller) { + c.ctx = ctx + }) +} + +// NewPoller returns a new Poller that wraps the given diode. +func NewPoller(d Diode, opts ...PollerConfigOption) *Poller { + p := &Poller{ + Diode: d, + interval: 10 * time.Millisecond, + ctx: context.Background(), + } + + for _, o := range opts { + o(p) + } + + return p +} + +// Next polls the diode until data is available or until the context is done. +// If the context is done, then nil will be returned. +func (p *Poller) Next() GenericDataType { + for { + data, ok := p.Diode.TryNext() + if !ok { + if p.isDone() { + return nil + } + + time.Sleep(p.interval) + continue + } + return data + } +} + +func (p *Poller) isDone() bool { + select { + case <-p.ctx.Done(): + return true + default: + return false + } +} diff --git a/diode/internal/diodes/waiter.go b/diode/internal/diodes/waiter.go new file mode 100644 index 0000000..a3770ff --- /dev/null +++ b/diode/internal/diodes/waiter.go @@ -0,0 +1,83 @@ +package diodes + +import ( + "context" + "sync" +) + +// Waiter will use a conditional mutex to alert the reader to when data is +// available. +type Waiter struct { + Diode + mu sync.Mutex + c *sync.Cond + ctx context.Context +} + +// WaiterConfigOption can be used to setup the waiter. +type WaiterConfigOption func(*Waiter) + +// WithWaiterContext sets the context to cancel any retrieval (Next()). It +// will not change any results for adding data (Set()). Default is +// context.Background(). +func WithWaiterContext(ctx context.Context) WaiterConfigOption { + return WaiterConfigOption(func(c *Waiter) { + c.ctx = ctx + }) +} + +// NewWaiter returns a new Waiter that wraps the given diode. +func NewWaiter(d Diode, opts ...WaiterConfigOption) *Waiter { + w := new(Waiter) + w.Diode = d + w.c = sync.NewCond(&w.mu) + w.ctx = context.Background() + + for _, opt := range opts { + opt(w) + } + + go func() { + <-w.ctx.Done() + w.c.Broadcast() + }() + + return w +} + +// Set invokes the wrapped diode's Set with the given data and uses Broadcast +// to wake up any readers. +func (w *Waiter) Set(data GenericDataType) { + w.Diode.Set(data) + w.c.Broadcast() +} + +// Next returns the next data point on the wrapped diode. If there is not any +// new data, it will Wait for set to be called or the context to be done. +// If the context is done, then nil will be returned. +func (w *Waiter) Next() GenericDataType { + w.mu.Lock() + defer w.mu.Unlock() + + for { + data, ok := w.Diode.TryNext() + if !ok { + if w.isDone() { + return nil + } + + w.c.Wait() + continue + } + return data + } +} + +func (w *Waiter) isDone() bool { + select { + case <-w.ctx.Done(): + return true + default: + return false + } +}