From 04fdf4c318533d5142071b9a1054dee2e2c9aff2 Mon Sep 17 00:00:00 2001 From: Azareal Date: Tue, 25 Feb 2020 18:12:54 +1000 Subject: [PATCH] optimise agent and system counters --- common/counters/agents.go | 34 ++++++++++++---------------------- common/counters/systems.go | 26 ++++++++------------------ 2 files changed, 20 insertions(+), 40 deletions(-) diff --git a/common/counters/agents.go b/common/counters/agents.go index 30940ce9..5526ea40 100644 --- a/common/counters/agents.go +++ b/common/counters/agents.go @@ -2,6 +2,7 @@ package counters import ( "database/sql" + "sync/atomic" c "github.com/Azareal/Gosora/common" qgen "github.com/Azareal/Gosora/query_gen" @@ -11,18 +12,14 @@ import ( var AgentViewCounter *DefaultAgentViewCounter type DefaultAgentViewCounter struct { - agentBuckets []*RWMutexCounterBucket //[AgentID]count - insert *sql.Stmt + buckets []int64 //[AgentID]count + insert *sql.Stmt } func NewDefaultAgentViewCounter(acc *qgen.Accumulator) (*DefaultAgentViewCounter, error) { - var agentBuckets = make([]*RWMutexCounterBucket, len(agentMapEnum)) - for bucketID, _ := range agentBuckets { - agentBuckets[bucketID] = &RWMutexCounterBucket{counter: 0} - } co := &DefaultAgentViewCounter{ - agentBuckets: agentBuckets, - insert: acc.Insert("viewchunks_agents").Columns("count, createdAt, browser").Fields("?,UTC_TIMESTAMP(),?").Prepare(), + buckets: make([]int64, len(agentMapEnum)), + insert: acc.Insert("viewchunks_agents").Columns("count,createdAt,browser").Fields("?,UTC_TIMESTAMP(),?").Prepare(), } c.AddScheduledFifteenMinuteTask(co.Tick) //c.AddScheduledSecondTask(co.Tick) @@ -31,14 +28,9 @@ func NewDefaultAgentViewCounter(acc *qgen.Accumulator) (*DefaultAgentViewCounter } func (co *DefaultAgentViewCounter) Tick() error { - for agentID, agentBucket := range co.agentBuckets { - var count int - agentBucket.RLock() - count = agentBucket.counter - agentBucket.counter = 0 - agentBucket.RUnlock() - - err := co.insertChunk(count, agentID) // TODO: Bulk insert for speed? + for id, _ := range co.buckets { + count := atomic.SwapInt64(&co.buckets[id], 0) + err := co.insertChunk(count, id) // TODO: Bulk insert for speed? if err != nil { return errors.Wrap(errors.WithStack(err), "agent counter") } @@ -46,7 +38,7 @@ func (co *DefaultAgentViewCounter) Tick() error { return nil } -func (co *DefaultAgentViewCounter) insertChunk(count int, agent int) error { +func (co *DefaultAgentViewCounter) insertChunk(count int64, agent int) error { if count == 0 { return nil } @@ -58,11 +50,9 @@ func (co *DefaultAgentViewCounter) insertChunk(count int, agent int) error { func (co *DefaultAgentViewCounter) Bump(agent int) { // TODO: Test this check - c.DebugDetail("co.agentBuckets[", agent, "]: ", co.agentBuckets[agent]) - if len(co.agentBuckets) <= agent || agent < 0 { + c.DebugDetail("co.buckets[", agent, "]: ", co.buckets[agent]) + if len(co.buckets) <= agent || agent < 0 { return } - co.agentBuckets[agent].Lock() - co.agentBuckets[agent].counter++ - co.agentBuckets[agent].Unlock() + atomic.AddInt64(&co.buckets[agent], 1) } diff --git a/common/counters/systems.go b/common/counters/systems.go index f515b944..42e963e3 100644 --- a/common/counters/systems.go +++ b/common/counters/systems.go @@ -2,6 +2,7 @@ package counters import ( "database/sql" + "sync/atomic" c "github.com/Azareal/Gosora/common" qgen "github.com/Azareal/Gosora/query_gen" @@ -11,18 +12,14 @@ import ( var OSViewCounter *DefaultOSViewCounter type DefaultOSViewCounter struct { - buckets []*RWMutexCounterBucket //[OSID]count + buckets []int64 //[OSID]count insert *sql.Stmt } func NewDefaultOSViewCounter(acc *qgen.Accumulator) (*DefaultOSViewCounter, error) { - var osBuckets = make([]*RWMutexCounterBucket, len(osMapEnum)) - for bucketID, _ := range osBuckets { - osBuckets[bucketID] = &RWMutexCounterBucket{counter: 0} - } co := &DefaultOSViewCounter{ - buckets: osBuckets, - insert: acc.Insert("viewchunks_systems").Columns("count, createdAt, system").Fields("?,UTC_TIMESTAMP(),?").Prepare(), + buckets: make([]int64, len(osMapEnum)), + insert: acc.Insert("viewchunks_systems").Columns("count,createdAt,system").Fields("?,UTC_TIMESTAMP(),?").Prepare(), } c.AddScheduledFifteenMinuteTask(co.Tick) //c.AddScheduledSecondTask(co.Tick) @@ -31,13 +28,8 @@ func NewDefaultOSViewCounter(acc *qgen.Accumulator) (*DefaultOSViewCounter, erro } func (co *DefaultOSViewCounter) Tick() error { - for id, bucket := range co.buckets { - var count int - bucket.RLock() - count = bucket.counter - bucket.counter = 0 // TODO: Add a SetZero method to reduce the amount of duplicate code between the OS and agent counters? - bucket.RUnlock() - + for id, _ := range co.buckets { + count := atomic.SwapInt64(&co.buckets[id], 0) err := co.insertChunk(count, id) // TODO: Bulk insert for speed? if err != nil { return errors.Wrap(errors.WithStack(err), "system counter") @@ -46,7 +38,7 @@ func (co *DefaultOSViewCounter) Tick() error { return nil } -func (co *DefaultOSViewCounter) insertChunk(count int, os int) error { +func (co *DefaultOSViewCounter) insertChunk(count int64, os int) error { if count == 0 { return nil } @@ -62,7 +54,5 @@ func (co *DefaultOSViewCounter) Bump(id int) { if len(co.buckets) <= id || id < 0 { return } - co.buckets[id].Lock() - co.buckets[id].counter++ - co.buckets[id].Unlock() + atomic.AddInt64(&co.buckets[id], 1) }