From 8c1c6a0cd7c081326ce0743ea905b1d85a89e63d Mon Sep 17 00:00:00 2001 From: Olivier Poitrey Date: Tue, 20 Feb 2018 01:52:12 -0800 Subject: [PATCH] Add diode.Writer, a thread-safe, lock-free, non-blocking writer wrapper --- README.md | 23 +++++++++--- diode/diode.go | 88 +++++++++++++++++++++++++++++++++++++++++++++ diode/diode_test.go | 44 +++++++++++++++++++++++ 3 files changed, 151 insertions(+), 4 deletions(-) create mode 100644 diode/diode.go create mode 100644 diode/diode_test.go diff --git a/README.md b/README.md index b387c5a..76c51b8 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ func main() { // Output: {"time":1516134303,"level":"debug","message":"hello world"} ``` > Note: The default log level for `log.Print` is *debug* ----- + ### Leveled Logging #### Simple Leveled Logging Example @@ -84,7 +84,9 @@ func main() { You can set the Global logging level to any of these options using the `SetGlobalLevel` function in the zerolog package, passing in one of the given constants above, e.g. `zerolog.InfoLevel` would be the "info" level. Whichever level is chosen, all logs with a level greater than or equal to that level will be written. To turn off logging entirely, pass the `zerolog.Disabled` constant. #### Setting Global Log Level + This example uses command-line flags to demonstrate various outputs depending on the chosen log level. + ```go package main @@ -158,7 +160,7 @@ func main() { // exit status 1 ``` > NOTE: Using `Msgf` generates one allocation even when the logger is disabled. ----------------- + ### Contextual Logging #### Fields can be added to log messages @@ -192,8 +194,6 @@ sublogger.Info().Msg("hello world") // Output: {"level":"info","time":1494567715,"message":"hello world","component":"foo"} ``` - - ### Pretty logging ```go @@ -245,6 +245,21 @@ log.Log().Str("foo","bar").Msg("") log.Logger = log.With().Str("foo", "bar").Logger() ``` +### Thread-safe, lock-free, non-blocking writer + +If your writer might be slow or not thread-safe and you need your log producers to never get slowed down by a slow writer, you can use a `diode.Writer` as follow: + +```go +d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) { + fmt.Printf("Dropped %d messages\n", missed) +})) +w := diode.NewWriter(os.Stdout, d, 10*time.Millisecond) +log := zerolog.New(w) +log.Print("test") +``` + +You will need to install `code.cloudfoundry.org/go-diodes` to use this feature. + ### Log Sampling ```go diff --git a/diode/diode.go b/diode/diode.go new file mode 100644 index 0000000..377a523 --- /dev/null +++ b/diode/diode.go @@ -0,0 +1,88 @@ +// Package diode provides a thread-safe, lock-free, non-blocking io.Writer +// wrapper. +package diode + +import ( + "context" + "io" + "sync" + "time" + + diodes "code.cloudfoundry.org/go-diodes" +) + +var bufPool = &sync.Pool{ + New: func() interface{} { + return make([]byte, 0, 500) + }, +} + +// Writer is a io.Writer wrapper that uses a diode to make Write lock-free, +// non-blocking and thread safe. +type Writer struct { + w io.Writer + d *diodes.ManyToOne + p *diodes.Poller + c context.CancelFunc + done chan struct{} +} + +// NewWriter creates a writer wrapping w with a many-to-one diode in order to +// never block log producers and drop events if the writer can't keep up with +// the flow of data. +// +// Use a diode.Writer when +// +// d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) { +// log.Printf("Dropped %d messages", missed) +// })) +// w := diode.NewWriter(w, d, 10 * time.Millisecond) +// log := zerolog.New(w) +// +// See code.cloudfoundry.org/go-diodes for more info on diode. +func NewWriter(w io.Writer, manyToOneDiode *diodes.ManyToOne, poolInterval time.Duration) Writer { + ctx, cancel := context.WithCancel(context.Background()) + dw := Writer{ + w: w, + d: manyToOneDiode, + p: diodes.NewPoller(manyToOneDiode, + diodes.WithPollingInterval(poolInterval), + diodes.WithPollingContext(ctx)), + c: cancel, + done: make(chan struct{}), + } + go dw.poll() + return dw +} + +func (dw Writer) Write(p []byte) (n int, err error) { + // p is pooled in zerolog so we can't hold it passed this call, hence the + // copy. + p = append(bufPool.Get().([]byte), p...) + dw.d.Set(diodes.GenericDataType(&p)) + return len(p), nil +} + +// Close releases the diode poller and call Close on the wrapped writer if +// io.Closer is implemented. +func (dw Writer) Close() error { + dw.c() + <-dw.done + if w, ok := dw.w.(io.Closer); ok { + return w.Close() + } + return nil +} + +func (dw Writer) poll() { + defer close(dw.done) + for { + d := dw.p.Next() + if d == nil { + return + } + p := *(*[]byte)(d) + dw.w.Write(p) + bufPool.Put(p[:0]) + } +} diff --git a/diode/diode_test.go b/diode/diode_test.go new file mode 100644 index 0000000..f89ccf3 --- /dev/null +++ b/diode/diode_test.go @@ -0,0 +1,44 @@ +package diode_test + +import ( + "fmt" + "io/ioutil" + "log" + "os" + "testing" + "time" + + diodes "code.cloudfoundry.org/go-diodes" + "github.com/rs/zerolog" + "github.com/rs/zerolog/diode" +) + +func ExampleNewWriter() { + d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) { + fmt.Printf("Dropped %d messages\n", missed) + })) + w := diode.NewWriter(os.Stdout, d, 10*time.Millisecond) + log := zerolog.New(w) + log.Print("test") + + w.Close() + + // Output: {"level":"debug","message":"test"} +} + +func Benchmark(b *testing.B) { + log.SetOutput(ioutil.Discard) + defer log.SetOutput(os.Stderr) + d := diodes.NewManyToOne(100000, nil) + w := diode.NewWriter(ioutil.Discard, d, 10*time.Millisecond) + log := zerolog.New(w) + defer w.Close() + + b.SetParallelism(1000) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + log.Print("test") + } + }) + +}