Embed the diode lib to avoid test dependencies

This commit introduces a breaking change in the diode API in order to
hide the diodes package interface. This removes a good number of
dependencies introduced by the test framework used by the diodes
package.
This commit is contained in:
Olivier Poitrey 2018-05-24 18:50:57 -07:00
parent 64faaa6980
commit 77db4b4f35
8 changed files with 438 additions and 18 deletions

View File

@ -8,7 +8,7 @@ import (
"sync" "sync"
"time" "time"
diodes "code.cloudfoundry.org/go-diodes" "github.com/rs/zerolog/diode/internal/diodes"
) )
var bufPool = &sync.Pool{ var bufPool = &sync.Pool{
@ -17,6 +17,8 @@ var bufPool = &sync.Pool{
}, },
} }
type Alerter func(missed int)
// Writer is a io.Writer wrapper that uses a diode to make Write lock-free, // Writer is a io.Writer wrapper that uses a diode to make Write lock-free,
// non-blocking and thread safe. // non-blocking and thread safe.
type Writer struct { type Writer struct {
@ -33,19 +35,19 @@ type Writer struct {
// //
// Use a diode.Writer when // Use a diode.Writer when
// //
// d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) { // w := diode.NewWriter(w, 1000, 10 * time.Millisecond, func(missed int) {
// log.Printf("Dropped %d messages", missed) // log.Printf("Dropped %d messages", missed)
// })) // })
// w := diode.NewWriter(w, d, 10 * time.Millisecond)
// log := zerolog.New(w) // log := zerolog.New(w)
// //
// See code.cloudfoundry.org/go-diodes for more info on diode. // See code.cloudfoundry.org/go-diodes for more info on diode.
func NewWriter(w io.Writer, manyToOneDiode *diodes.ManyToOne, poolInterval time.Duration) Writer { func NewWriter(w io.Writer, size int, poolInterval time.Duration, f Alerter) Writer {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
d := diodes.NewManyToOne(size, diodes.AlertFunc(f))
dw := Writer{ dw := Writer{
w: w, w: w,
d: manyToOneDiode, d: d,
p: diodes.NewPoller(manyToOneDiode, p: diodes.NewPoller(d,
diodes.WithPollingInterval(poolInterval), diodes.WithPollingInterval(poolInterval),
diodes.WithPollingContext(ctx)), diodes.WithPollingContext(ctx)),
c: cancel, c: cancel,

View File

@ -7,16 +7,14 @@ import (
"os" "os"
"time" "time"
diodes "code.cloudfoundry.org/go-diodes"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/diode" "github.com/rs/zerolog/diode"
) )
func ExampleNewWriter() { func ExampleNewWriter() {
d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) { w := diode.NewWriter(os.Stdout, 1000, 10*time.Millisecond, func(missed int) {
fmt.Printf("Dropped %d messages\n", missed) fmt.Printf("Dropped %d messages\n", missed)
})) })
w := diode.NewWriter(os.Stdout, d, 10*time.Millisecond)
log := zerolog.New(w) log := zerolog.New(w)
log.Print("test") log.Print("test")

View File

@ -9,18 +9,16 @@ import (
"testing" "testing"
"time" "time"
diodes "code.cloudfoundry.org/go-diodes"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/diode" "github.com/rs/zerolog/diode"
"github.com/rs/zerolog/internal/cbor" "github.com/rs/zerolog/internal/cbor"
) )
func TestNewWriter(t *testing.T) { func TestNewWriter(t *testing.T) {
d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) {
fmt.Printf("Dropped %d messages\n", missed)
}))
buf := bytes.Buffer{} buf := bytes.Buffer{}
w := diode.NewWriter(&buf, d, 10*time.Millisecond) w := diode.NewWriter(&buf, 1000, 10*time.Millisecond, func(missed int) {
fmt.Printf("Dropped %d messages\n", missed)
})
log := zerolog.New(w) log := zerolog.New(w)
log.Print("test") log.Print("test")
@ -35,8 +33,7 @@ func TestNewWriter(t *testing.T) {
func Benchmark(b *testing.B) { func Benchmark(b *testing.B) {
log.SetOutput(ioutil.Discard) log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stderr) defer log.SetOutput(os.Stderr)
d := diodes.NewManyToOne(100000, nil) w := diode.NewWriter(ioutil.Discard, 100000, 10*time.Millisecond, nil)
w := diode.NewWriter(ioutil.Discard, d, 10*time.Millisecond)
log := zerolog.New(w) log := zerolog.New(w)
defer w.Close() defer w.Close()

View File

@ -0,0 +1 @@
Copied from https://github.com/cloudfoundry/go-diodes to avoid test dependencies.

View File

@ -0,0 +1,130 @@
package diodes
import (
"log"
"sync/atomic"
"unsafe"
)
// ManyToOne diode is optimal for many writers (go-routines B-n) and a single
// reader (go-routine A). It is not thread safe for multiple readers.
type ManyToOne struct {
buffer []unsafe.Pointer
writeIndex uint64
readIndex uint64
alerter Alerter
}
// NewManyToOne creates a new diode (ring buffer). The ManyToOne diode
// is optimzed for many writers (on go-routines B-n) and a single reader
// (on go-routine A). The alerter is invoked on the read's go-routine. It is
// called when it notices that the writer go-routine has passed it and wrote
// over data. A nil can be used to ignore alerts.
func NewManyToOne(size int, alerter Alerter) *ManyToOne {
if alerter == nil {
alerter = AlertFunc(func(int) {})
}
d := &ManyToOne{
buffer: make([]unsafe.Pointer, size),
alerter: alerter,
}
// Start write index at the value before 0
// to allow the first write to use AddUint64
// and still have a beginning index of 0
d.writeIndex = ^d.writeIndex
return d
}
// Set sets the data in the next slot of the ring buffer.
func (d *ManyToOne) Set(data GenericDataType) {
for {
writeIndex := atomic.AddUint64(&d.writeIndex, 1)
idx := writeIndex % uint64(len(d.buffer))
old := atomic.LoadPointer(&d.buffer[idx])
if old != nil &&
(*bucket)(old) != nil &&
(*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) {
log.Println("Diode set collision: consider using a larger diode")
continue
}
newBucket := &bucket{
data: data,
seq: writeIndex,
}
if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) {
log.Println("Diode set collision: consider using a larger diode")
continue
}
return
}
}
// TryNext will attempt to read from the next slot of the ring buffer.
// If there is not data available, it will return (nil, false).
func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
// Read a value from the ring buffer based on the readIndex.
idx := d.readIndex % uint64(len(d.buffer))
result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))
// When the result is nil that means the writer has not had the
// opportunity to write a value into the diode. This value must be ignored
// and the read head must not increment.
if result == nil {
return nil, false
}
// When the seq value is less than the current read index that means a
// value was read from idx that was previously written but has since has
// been dropped. This value must be ignored and the read head must not
// increment.
//
// The simulation for this scenario assumes the fast forward occurred as
// detailed below.
//
// 5. The reader reads again getting seq 5. It then reads again expecting
// seq 6 but gets seq 2. This is a read of a stale value that was
// effectively "dropped" so the read fails and the read head stays put.
// `| 4 | 5 | 2 | 3 |` r: 7, w: 6
//
if result.seq < d.readIndex {
return nil, false
}
// When the seq value is greater than the current read index that means a
// value was read from idx that overwrote the value that was expected to
// be at this idx. This happens when the writer has lapped the reader. The
// reader needs to catch up to the writer so it moves its write head to
// the new seq, effectively dropping the messages that were not read in
// between the two values.
//
// Here is a simulation of this scenario:
//
// 1. Both the read and write heads start at 0.
// `| nil | nil | nil | nil |` r: 0, w: 0
// 2. The writer fills the buffer.
// `| 0 | 1 | 2 | 3 |` r: 0, w: 4
// 3. The writer laps the read head.
// `| 4 | 5 | 2 | 3 |` r: 0, w: 6
// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
// this forces the reader to fast forward to 5.
// `| 4 | 5 | 2 | 3 |` r: 5, w: 6
//
if result.seq > d.readIndex {
dropped := result.seq - d.readIndex
d.readIndex = result.seq
d.alerter.Alert(int(dropped))
}
// Only increment read index if a regular read occurred (where seq was
// equal to readIndex) or a value was read that caused a fast forward
// (where seq was greater than readIndex).
//
d.readIndex++
return result.data, true
}

View File

@ -0,0 +1,129 @@
package diodes
import (
"sync/atomic"
"unsafe"
)
// GenericDataType is the data type the diodes operate on.
type GenericDataType unsafe.Pointer
// Alerter is used to report how many values were overwritten since the
// last write.
type Alerter interface {
Alert(missed int)
}
// AlertFunc type is an adapter to allow the use of ordinary functions as
// Alert handlers.
type AlertFunc func(missed int)
// Alert calls f(missed)
func (f AlertFunc) Alert(missed int) {
f(missed)
}
type bucket struct {
data GenericDataType
seq uint64 // seq is the recorded write index at the time of writing
}
// OneToOne diode is meant to be used by a single reader and a single writer.
// It is not thread safe if used otherwise.
type OneToOne struct {
buffer []unsafe.Pointer
writeIndex uint64
readIndex uint64
alerter Alerter
}
// NewOneToOne creates a new diode is meant to be used by a single reader and
// a single writer. The alerter is invoked on the read's go-routine. It is
// called when it notices that the writer go-routine has passed it and wrote
// over data. A nil can be used to ignore alerts.
func NewOneToOne(size int, alerter Alerter) *OneToOne {
if alerter == nil {
alerter = AlertFunc(func(int) {})
}
return &OneToOne{
buffer: make([]unsafe.Pointer, size),
alerter: alerter,
}
}
// Set sets the data in the next slot of the ring buffer.
func (d *OneToOne) Set(data GenericDataType) {
idx := d.writeIndex % uint64(len(d.buffer))
newBucket := &bucket{
data: data,
seq: d.writeIndex,
}
d.writeIndex++
atomic.StorePointer(&d.buffer[idx], unsafe.Pointer(newBucket))
}
// TryNext will attempt to read from the next slot of the ring buffer.
// If there is no data available, it will return (nil, false).
func (d *OneToOne) TryNext() (data GenericDataType, ok bool) {
// Read a value from the ring buffer based on the readIndex.
idx := d.readIndex % uint64(len(d.buffer))
result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))
// When the result is nil that means the writer has not had the
// opportunity to write a value into the diode. This value must be ignored
// and the read head must not increment.
if result == nil {
return nil, false
}
// When the seq value is less than the current read index that means a
// value was read from idx that was previously written but has since has
// been dropped. This value must be ignored and the read head must not
// increment.
//
// The simulation for this scenario assumes the fast forward occurred as
// detailed below.
//
// 5. The reader reads again getting seq 5. It then reads again expecting
// seq 6 but gets seq 2. This is a read of a stale value that was
// effectively "dropped" so the read fails and the read head stays put.
// `| 4 | 5 | 2 | 3 |` r: 7, w: 6
//
if result.seq < d.readIndex {
return nil, false
}
// When the seq value is greater than the current read index that means a
// value was read from idx that overwrote the value that was expected to
// be at this idx. This happens when the writer has lapped the reader. The
// reader needs to catch up to the writer so it moves its write head to
// the new seq, effectively dropping the messages that were not read in
// between the two values.
//
// Here is a simulation of this scenario:
//
// 1. Both the read and write heads start at 0.
// `| nil | nil | nil | nil |` r: 0, w: 0
// 2. The writer fills the buffer.
// `| 0 | 1 | 2 | 3 |` r: 0, w: 4
// 3. The writer laps the read head.
// `| 4 | 5 | 2 | 3 |` r: 0, w: 6
// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
// this forces the reader to fast forward to 5.
// `| 4 | 5 | 2 | 3 |` r: 5, w: 6
//
if result.seq > d.readIndex {
dropped := result.seq - d.readIndex
d.readIndex = result.seq
d.alerter.Alert(int(dropped))
}
// Only increment read index if a regular read occurred (where seq was
// equal to readIndex) or a value was read that caused a fast forward
// (where seq was greater than readIndex).
d.readIndex++
return result.data, true
}

View File

@ -0,0 +1,80 @@
package diodes
import (
"context"
"time"
)
// Diode is any implementation of a diode.
type Diode interface {
Set(GenericDataType)
TryNext() (GenericDataType, bool)
}
// Poller will poll a diode until a value is available.
type Poller struct {
Diode
interval time.Duration
ctx context.Context
}
// PollerConfigOption can be used to setup the poller.
type PollerConfigOption func(*Poller)
// WithPollingInterval sets the interval at which the diode is queried
// for new data. The default is 10ms.
func WithPollingInterval(interval time.Duration) PollerConfigOption {
return PollerConfigOption(func(c *Poller) {
c.interval = interval
})
}
// WithPollingContext sets the context to cancel any retrieval (Next()). It
// will not change any results for adding data (Set()). Default is
// context.Background().
func WithPollingContext(ctx context.Context) PollerConfigOption {
return PollerConfigOption(func(c *Poller) {
c.ctx = ctx
})
}
// NewPoller returns a new Poller that wraps the given diode.
func NewPoller(d Diode, opts ...PollerConfigOption) *Poller {
p := &Poller{
Diode: d,
interval: 10 * time.Millisecond,
ctx: context.Background(),
}
for _, o := range opts {
o(p)
}
return p
}
// Next polls the diode until data is available or until the context is done.
// If the context is done, then nil will be returned.
func (p *Poller) Next() GenericDataType {
for {
data, ok := p.Diode.TryNext()
if !ok {
if p.isDone() {
return nil
}
time.Sleep(p.interval)
continue
}
return data
}
}
func (p *Poller) isDone() bool {
select {
case <-p.ctx.Done():
return true
default:
return false
}
}

View File

@ -0,0 +1,83 @@
package diodes
import (
"context"
"sync"
)
// Waiter will use a conditional mutex to alert the reader to when data is
// available.
type Waiter struct {
Diode
mu sync.Mutex
c *sync.Cond
ctx context.Context
}
// WaiterConfigOption can be used to setup the waiter.
type WaiterConfigOption func(*Waiter)
// WithWaiterContext sets the context to cancel any retrieval (Next()). It
// will not change any results for adding data (Set()). Default is
// context.Background().
func WithWaiterContext(ctx context.Context) WaiterConfigOption {
return WaiterConfigOption(func(c *Waiter) {
c.ctx = ctx
})
}
// NewWaiter returns a new Waiter that wraps the given diode.
func NewWaiter(d Diode, opts ...WaiterConfigOption) *Waiter {
w := new(Waiter)
w.Diode = d
w.c = sync.NewCond(&w.mu)
w.ctx = context.Background()
for _, opt := range opts {
opt(w)
}
go func() {
<-w.ctx.Done()
w.c.Broadcast()
}()
return w
}
// Set invokes the wrapped diode's Set with the given data and uses Broadcast
// to wake up any readers.
func (w *Waiter) Set(data GenericDataType) {
w.Diode.Set(data)
w.c.Broadcast()
}
// Next returns the next data point on the wrapped diode. If there is not any
// new data, it will Wait for set to be called or the context to be done.
// If the context is done, then nil will be returned.
func (w *Waiter) Next() GenericDataType {
w.mu.Lock()
defer w.mu.Unlock()
for {
data, ok := w.Diode.TryNext()
if !ok {
if w.isDone() {
return nil
}
w.c.Wait()
continue
}
return data
}
}
func (w *Waiter) isDone() bool {
select {
case <-w.ctx.Done():
return true
default:
return false
}
}