From 01ccdb2087dc1be6e85d02852ed7b04226754b63 Mon Sep 17 00:00:00 2001 From: Azareal Date: Sun, 28 Jul 2019 13:46:19 +1000 Subject: [PATCH] simplify the requests, posts, topics, routes and systems counters --- common/counters/posts.go | 41 ++++++++++++++++++--------------- common/counters/requests.go | 45 ++++++++++++++++++++----------------- common/counters/routes.go | 43 ++++++++++++++++++----------------- common/counters/systems.go | 43 ++++++++++++++++++----------------- common/counters/topics.go | 45 ++++++++++++++++++++----------------- 5 files changed, 117 insertions(+), 100 deletions(-) diff --git a/common/counters/posts.go b/common/counters/posts.go index fa6cedd0..8a54c66c 100644 --- a/common/counters/posts.go +++ b/common/counters/posts.go @@ -6,6 +6,7 @@ import ( c "github.com/Azareal/Gosora/common" "github.com/Azareal/Gosora/query_gen" + "github.com/pkg/errors" ) var PostCounter *DefaultPostCounter @@ -19,40 +20,44 @@ type DefaultPostCounter struct { func NewPostCounter() (*DefaultPostCounter, error) { acc := qgen.NewAcc() - counter := &DefaultPostCounter{ + co := &DefaultPostCounter{ currentBucket: 0, insert: acc.Insert("postchunks").Columns("count, createdAt").Fields("?,UTC_TIMESTAMP()").Prepare(), } - c.AddScheduledFifteenMinuteTask(counter.Tick) - //c.AddScheduledSecondTask(counter.Tick) - c.AddShutdownTask(counter.Tick) - return counter, acc.FirstError() + c.AddScheduledFifteenMinuteTask(co.Tick) + //c.AddScheduledSecondTask(co.Tick) + c.AddShutdownTask(co.Tick) + return co, acc.FirstError() } -func (counter *DefaultPostCounter) Tick() (err error) { - var oldBucket = counter.currentBucket +func (co *DefaultPostCounter) Tick() (err error) { + oldBucket := co.currentBucket var nextBucket int64 // 0 - if counter.currentBucket == 0 { + if co.currentBucket == 0 { nextBucket = 1 } - atomic.AddInt64(&counter.buckets[oldBucket], counter.buckets[nextBucket]) - atomic.StoreInt64(&counter.buckets[nextBucket], 0) - atomic.StoreInt64(&counter.currentBucket, nextBucket) + atomic.AddInt64(&co.buckets[oldBucket], co.buckets[nextBucket]) + atomic.StoreInt64(&co.buckets[nextBucket], 0) + atomic.StoreInt64(&co.currentBucket, nextBucket) - var previousViewChunk = counter.buckets[oldBucket] - atomic.AddInt64(&counter.buckets[oldBucket], -previousViewChunk) - return counter.insertChunk(previousViewChunk) + previousViewChunk := co.buckets[oldBucket] + atomic.AddInt64(&co.buckets[oldBucket], -previousViewChunk) + err = co.insertChunk(previousViewChunk) + if err != nil { + return errors.Wrap(errors.WithStack(err),"post counter") + } + return nil } -func (counter *DefaultPostCounter) Bump() { - atomic.AddInt64(&counter.buckets[counter.currentBucket], 1) +func (co *DefaultPostCounter) Bump() { + atomic.AddInt64(&co.buckets[co.currentBucket], 1) } -func (counter *DefaultPostCounter) insertChunk(count int64) error { +func (co *DefaultPostCounter) insertChunk(count int64) error { if count == 0 { return nil } c.DebugLogf("Inserting a postchunk with a count of %d", count) - _, err := counter.insert.Exec(count) + _, err := co.insert.Exec(count) return err } diff --git a/common/counters/requests.go b/common/counters/requests.go index a3a08de5..5da9d70d 100644 --- a/common/counters/requests.go +++ b/common/counters/requests.go @@ -4,8 +4,9 @@ import ( "database/sql" "sync/atomic" - "github.com/Azareal/Gosora/common" + c "github.com/Azareal/Gosora/common" qgen "github.com/Azareal/Gosora/query_gen" + "github.com/pkg/errors" ) // TODO: Rename this? @@ -20,40 +21,44 @@ type DefaultViewCounter struct { } func NewGlobalViewCounter(acc *qgen.Accumulator) (*DefaultViewCounter, error) { - counter := &DefaultViewCounter{ + co := &DefaultViewCounter{ currentBucket: 0, insert: acc.Insert("viewchunks").Columns("count, createdAt, route").Fields("?,UTC_TIMESTAMP(),''").Prepare(), } - common.AddScheduledFifteenMinuteTask(counter.Tick) // This is run once every fifteen minutes to match the frequency of the RouteViewCounter - //common.AddScheduledSecondTask(counter.Tick) - common.AddShutdownTask(counter.Tick) - return counter, acc.FirstError() + c.AddScheduledFifteenMinuteTask(co.Tick) // This is run once every fifteen minutes to match the frequency of the RouteViewCounter + //c.AddScheduledSecondTask(co.Tick) + c.AddShutdownTask(co.Tick) + return co, acc.FirstError() } -func (counter *DefaultViewCounter) Tick() (err error) { - var oldBucket = counter.currentBucket +func (co *DefaultViewCounter) Tick() (err error) { + oldBucket := co.currentBucket var nextBucket int64 // 0 - if counter.currentBucket == 0 { + if co.currentBucket == 0 { nextBucket = 1 } - atomic.AddInt64(&counter.buckets[oldBucket], counter.buckets[nextBucket]) - atomic.StoreInt64(&counter.buckets[nextBucket], 0) - atomic.StoreInt64(&counter.currentBucket, nextBucket) + atomic.AddInt64(&co.buckets[oldBucket], co.buckets[nextBucket]) + atomic.StoreInt64(&co.buckets[nextBucket], 0) + atomic.StoreInt64(&co.currentBucket, nextBucket) - var previousViewChunk = counter.buckets[oldBucket] - atomic.AddInt64(&counter.buckets[oldBucket], -previousViewChunk) - return counter.insertChunk(previousViewChunk) + previousViewChunk := co.buckets[oldBucket] + atomic.AddInt64(&co.buckets[oldBucket], -previousViewChunk) + err = co.insertChunk(previousViewChunk) + if err != nil { + return errors.Wrap(errors.WithStack(err), "req counter") + } + return nil } -func (counter *DefaultViewCounter) Bump() { - atomic.AddInt64(&counter.buckets[counter.currentBucket], 1) +func (co *DefaultViewCounter) Bump() { + atomic.AddInt64(&co.buckets[co.currentBucket], 1) } -func (counter *DefaultViewCounter) insertChunk(count int64) error { +func (co *DefaultViewCounter) insertChunk(count int64) error { if count == 0 { return nil } - common.DebugLogf("Inserting a viewchunk with a count of %d", count) - _, err := counter.insert.Exec(count) + c.DebugLogf("Inserting a vchunk with a count of %d", count) + _, err := co.insert.Exec(count) return err } diff --git a/common/counters/routes.go b/common/counters/routes.go index 8f7df60c..c9bddbb1 100644 --- a/common/counters/routes.go +++ b/common/counters/routes.go @@ -3,8 +3,9 @@ package counters import ( "database/sql" - "github.com/Azareal/Gosora/common" + c "github.com/Azareal/Gosora/common" qgen "github.com/Azareal/Gosora/query_gen" + "github.com/pkg/errors" ) var RouteViewCounter *DefaultRouteViewCounter @@ -16,53 +17,53 @@ type DefaultRouteViewCounter struct { } func NewDefaultRouteViewCounter(acc *qgen.Accumulator) (*DefaultRouteViewCounter, error) { - var routeBuckets = make([]*RWMutexCounterBucket, len(routeMapEnum)) + routeBuckets := make([]*RWMutexCounterBucket, len(routeMapEnum)) for bucketID, _ := range routeBuckets { routeBuckets[bucketID] = &RWMutexCounterBucket{counter: 0} } - counter := &DefaultRouteViewCounter{ + co := &DefaultRouteViewCounter{ buckets: routeBuckets, insert: acc.Insert("viewchunks").Columns("count, createdAt, route").Fields("?,UTC_TIMESTAMP(),?").Prepare(), } - common.AddScheduledFifteenMinuteTask(counter.Tick) // There could be a lot of routes, so we don't want to be running this every second - //common.AddScheduledSecondTask(counter.Tick) - common.AddShutdownTask(counter.Tick) - return counter, acc.FirstError() + c.AddScheduledFifteenMinuteTask(co.Tick) // There could be a lot of routes, so we don't want to be running this every second + //c.AddScheduledSecondTask(co.Tick) + c.AddShutdownTask(co.Tick) + return co, acc.FirstError() } -func (counter *DefaultRouteViewCounter) Tick() error { - for routeID, routeBucket := range counter.buckets { +func (co *DefaultRouteViewCounter) Tick() error { + for routeID, routeBucket := range co.buckets { var count int routeBucket.RLock() count = routeBucket.counter routeBucket.counter = 0 routeBucket.RUnlock() - err := counter.insertChunk(count, routeID) // TODO: Bulk insert for speed? + err := co.insertChunk(count, routeID) // TODO: Bulk insert for speed? if err != nil { - return err + return errors.Wrap(errors.WithStack(err), "route counter") } } return nil } -func (counter *DefaultRouteViewCounter) insertChunk(count int, route int) error { +func (co *DefaultRouteViewCounter) insertChunk(count int, route int) error { if count == 0 { return nil } - var routeName = reverseRouteMapEnum[route] - common.DebugLogf("Inserting a viewchunk with a count of %d for route %s (%d)", count, routeName, route) - _, err := counter.insert.Exec(count, routeName) + routeName := reverseRouteMapEnum[route] + c.DebugLogf("Inserting a vchunk with a count of %d for route %s (%d)", count, routeName, route) + _, err := co.insert.Exec(count, routeName) return err } -func (counter *DefaultRouteViewCounter) Bump(route int) { +func (co *DefaultRouteViewCounter) Bump(route int) { // TODO: Test this check - common.DebugDetail("counter.buckets[", route, "]: ", counter.buckets[route]) - if len(counter.buckets) <= route || route < 0 { + c.DebugDetail("co.buckets[", route, "]: ", co.buckets[route]) + if len(co.buckets) <= route || route < 0 { return } - counter.buckets[route].Lock() - counter.buckets[route].counter++ - counter.buckets[route].Unlock() + co.buckets[route].Lock() + co.buckets[route].counter++ + co.buckets[route].Unlock() } diff --git a/common/counters/systems.go b/common/counters/systems.go index 163dc0e9..f515b944 100644 --- a/common/counters/systems.go +++ b/common/counters/systems.go @@ -3,8 +3,9 @@ package counters import ( "database/sql" - "github.com/Azareal/Gosora/common" - "github.com/Azareal/Gosora/query_gen" + c "github.com/Azareal/Gosora/common" + qgen "github.com/Azareal/Gosora/query_gen" + "github.com/pkg/errors" ) var OSViewCounter *DefaultOSViewCounter @@ -19,49 +20,49 @@ func NewDefaultOSViewCounter(acc *qgen.Accumulator) (*DefaultOSViewCounter, erro for bucketID, _ := range osBuckets { osBuckets[bucketID] = &RWMutexCounterBucket{counter: 0} } - counter := &DefaultOSViewCounter{ + co := &DefaultOSViewCounter{ buckets: osBuckets, insert: acc.Insert("viewchunks_systems").Columns("count, createdAt, system").Fields("?,UTC_TIMESTAMP(),?").Prepare(), } - common.AddScheduledFifteenMinuteTask(counter.Tick) - //common.AddScheduledSecondTask(counter.Tick) - common.AddShutdownTask(counter.Tick) - return counter, acc.FirstError() + c.AddScheduledFifteenMinuteTask(co.Tick) + //c.AddScheduledSecondTask(co.Tick) + c.AddShutdownTask(co.Tick) + return co, acc.FirstError() } -func (counter *DefaultOSViewCounter) Tick() error { - for id, bucket := range counter.buckets { +func (co *DefaultOSViewCounter) Tick() error { + for id, bucket := range co.buckets { var count int bucket.RLock() count = bucket.counter bucket.counter = 0 // TODO: Add a SetZero method to reduce the amount of duplicate code between the OS and agent counters? bucket.RUnlock() - err := counter.insertChunk(count, id) // TODO: Bulk insert for speed? + err := co.insertChunk(count, id) // TODO: Bulk insert for speed? if err != nil { - return err + return errors.Wrap(errors.WithStack(err), "system counter") } } return nil } -func (counter *DefaultOSViewCounter) insertChunk(count int, os int) error { +func (co *DefaultOSViewCounter) insertChunk(count int, os int) error { if count == 0 { return nil } - var osName = reverseOSMapEnum[os] - common.DebugLogf("Inserting a viewchunk with a count of %d for OS %s (%d)", count, osName, os) - _, err := counter.insert.Exec(count, osName) + osName := reverseOSMapEnum[os] + c.DebugLogf("Inserting a vchunk with a count of %d for OS %s (%d)", count, osName, os) + _, err := co.insert.Exec(count, osName) return err } -func (counter *DefaultOSViewCounter) Bump(id int) { +func (co *DefaultOSViewCounter) Bump(id int) { // TODO: Test this check - common.DebugDetail("counter.buckets[", id, "]: ", counter.buckets[id]) - if len(counter.buckets) <= id || id < 0 { + c.DebugDetail("co.buckets[", id, "]: ", co.buckets[id]) + if len(co.buckets) <= id || id < 0 { return } - counter.buckets[id].Lock() - counter.buckets[id].counter++ - counter.buckets[id].Unlock() + co.buckets[id].Lock() + co.buckets[id].counter++ + co.buckets[id].Unlock() } diff --git a/common/counters/topics.go b/common/counters/topics.go index f72908fb..a580eefc 100644 --- a/common/counters/topics.go +++ b/common/counters/topics.go @@ -4,8 +4,9 @@ import ( "database/sql" "sync/atomic" - "github.com/Azareal/Gosora/common" + c "github.com/Azareal/Gosora/common" "github.com/Azareal/Gosora/query_gen" + "github.com/pkg/errors" ) var TopicCounter *DefaultTopicCounter @@ -19,40 +20,44 @@ type DefaultTopicCounter struct { func NewTopicCounter() (*DefaultTopicCounter, error) { acc := qgen.NewAcc() - counter := &DefaultTopicCounter{ + co := &DefaultTopicCounter{ currentBucket: 0, insert: acc.Insert("topicchunks").Columns("count, createdAt").Fields("?,UTC_TIMESTAMP()").Prepare(), } - common.AddScheduledFifteenMinuteTask(counter.Tick) - //common.AddScheduledSecondTask(counter.Tick) - common.AddShutdownTask(counter.Tick) - return counter, acc.FirstError() + c.AddScheduledFifteenMinuteTask(co.Tick) + //c.AddScheduledSecondTask(co.Tick) + c.AddShutdownTask(co.Tick) + return co, acc.FirstError() } -func (counter *DefaultTopicCounter) Tick() (err error) { - var oldBucket = counter.currentBucket +func (co *DefaultTopicCounter) Tick() (err error) { + oldBucket := co.currentBucket var nextBucket int64 // 0 - if counter.currentBucket == 0 { + if co.currentBucket == 0 { nextBucket = 1 } - atomic.AddInt64(&counter.buckets[oldBucket], counter.buckets[nextBucket]) - atomic.StoreInt64(&counter.buckets[nextBucket], 0) - atomic.StoreInt64(&counter.currentBucket, nextBucket) + atomic.AddInt64(&co.buckets[oldBucket], co.buckets[nextBucket]) + atomic.StoreInt64(&co.buckets[nextBucket], 0) + atomic.StoreInt64(&co.currentBucket, nextBucket) - var previousViewChunk = counter.buckets[oldBucket] - atomic.AddInt64(&counter.buckets[oldBucket], -previousViewChunk) - return counter.insertChunk(previousViewChunk) + previousViewChunk := co.buckets[oldBucket] + atomic.AddInt64(&co.buckets[oldBucket], -previousViewChunk) + err = co.insertChunk(previousViewChunk) + if err != nil { + return errors.Wrap(errors.WithStack(err),"topics counter") + } + return nil } -func (counter *DefaultTopicCounter) Bump() { - atomic.AddInt64(&counter.buckets[counter.currentBucket], 1) +func (co *DefaultTopicCounter) Bump() { + atomic.AddInt64(&co.buckets[co.currentBucket], 1) } -func (counter *DefaultTopicCounter) insertChunk(count int64) error { +func (co *DefaultTopicCounter) insertChunk(count int64) error { if count == 0 { return nil } - common.DebugLogf("Inserting a topicchunk with a count of %d", count) - _, err := counter.insert.Exec(count) + c.DebugLogf("Inserting a topicchunk with a count of %d", count) + _, err := co.insert.Exec(count) return err }