Added support for multiple subscribers mapped to a single user in the WebSockets Component. Untested.
This commit is contained in:
parent
b718dcdc17
commit
dfc60ec214
|
@ -3,14 +3,13 @@
|
||||||
/*
|
/*
|
||||||
*
|
*
|
||||||
* Gosora WebSocket Subsystem
|
* Gosora WebSocket Subsystem
|
||||||
* Copyright Azareal 2017 - 2018
|
* Copyright Azareal 2017 - 2019
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
package common
|
package common
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -24,401 +23,37 @@ import (
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
type WSUser struct {
|
|
||||||
conn *websocket.Conn
|
|
||||||
User *User
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Make this an interface?
|
|
||||||
type WsHubImpl struct {
|
|
||||||
// TODO: Implement some form of generics so we don't write as much odd-even sharding code
|
|
||||||
evenOnlineUsers map[int]*WSUser
|
|
||||||
oddOnlineUsers map[int]*WSUser
|
|
||||||
evenUserLock sync.RWMutex
|
|
||||||
oddUserLock sync.RWMutex
|
|
||||||
|
|
||||||
// TODO: Add sharding for this too?
|
|
||||||
OnlineGuests map[*WSUser]bool
|
|
||||||
GuestLock sync.RWMutex
|
|
||||||
|
|
||||||
lastTick time.Time
|
|
||||||
lastTopicList []*TopicsRow
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Disable WebSockets on high load? Add a Control Panel interface for disabling it?
|
// TODO: Disable WebSockets on high load? Add a Control Panel interface for disabling it?
|
||||||
var EnableWebsockets = true // Put this in caps for consistency with the other constants?
|
var EnableWebsockets = true // Put this in caps for consistency with the other constants?
|
||||||
|
|
||||||
// TODO: Rename this to WebSockets?
|
|
||||||
var WsHub WsHubImpl
|
|
||||||
var wsUpgrader = websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024}
|
var wsUpgrader = websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024}
|
||||||
var errWsNouser = errors.New("This user isn't connected via WebSockets")
|
var errWsNouser = errors.New("This user isn't connected via WebSockets")
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
adminStatsWatchers = make(map[*WSUser]bool)
|
adminStatsWatchers = make(map[*websocket.Conn]*WSUser)
|
||||||
topicListWatchers = make(map[*WSUser]bool)
|
topicListWatchers = make(map[*WSUser]bool)
|
||||||
// TODO: Do we really want to initialise this here instead of in main.go / general_test.go like the other things?
|
|
||||||
WsHub = WsHubImpl{
|
|
||||||
evenOnlineUsers: make(map[int]*WSUser),
|
|
||||||
oddOnlineUsers: make(map[int]*WSUser),
|
|
||||||
OnlineGuests: make(map[*WSUser]bool),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hub *WsHubImpl) Start() {
|
|
||||||
//fmt.Println("running hub.Start")
|
|
||||||
if Config.DisableLiveTopicList {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
hub.lastTick = time.Now()
|
|
||||||
AddScheduledSecondTask(hub.Tick)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type WsTopicList struct {
|
type WsTopicList struct {
|
||||||
Topics []*WsTopicsRow
|
Topics []*WsTopicsRow
|
||||||
}
|
}
|
||||||
|
|
||||||
// This Tick is seperate from the admin one, as we want to process that in parallel with this due to the blocking calls to gopsutil
|
|
||||||
func (hub *WsHubImpl) Tick() error {
|
|
||||||
//fmt.Println("running hub.Tick")
|
|
||||||
|
|
||||||
// Don't waste CPU time if nothing has happened
|
|
||||||
// TODO: Get a topic list method which strips stickies?
|
|
||||||
tList, _, _, err := TopicList.GetList(1)
|
|
||||||
if err != nil {
|
|
||||||
hub.lastTick = time.Now()
|
|
||||||
return err // TODO: Do we get ErrNoRows here?
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
hub.lastTick = time.Now()
|
|
||||||
hub.lastTopicList = tList
|
|
||||||
}()
|
|
||||||
if len(tList) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//fmt.Println("checking for changes")
|
|
||||||
// TODO: Optimise this by only sniffing the top non-sticky
|
|
||||||
// TODO: Optimise this by getting back an unsorted list so we don't have to hop around the stickies
|
|
||||||
// TODO: Add support for new stickies / replies to them
|
|
||||||
if len(tList) == len(hub.lastTopicList) {
|
|
||||||
var hasItem = false
|
|
||||||
for j, tItem := range tList {
|
|
||||||
if !tItem.Sticky {
|
|
||||||
if tItem.ID != hub.lastTopicList[j].ID || !tItem.LastReplyAt.Equal(hub.lastTopicList[j].LastReplyAt) {
|
|
||||||
hasItem = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !hasItem {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Implement this for guests too? Should be able to optimise it far better there due to them sharing the same permission set
|
|
||||||
// TODO: Be less aggressive with the locking, maybe use an array of sorts instead of hitting the main map every-time
|
|
||||||
topicListMutex.RLock()
|
|
||||||
if len(topicListWatchers) == 0 {
|
|
||||||
//fmt.Println("no watchers")
|
|
||||||
topicListMutex.RUnlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
//fmt.Println("found changes")
|
|
||||||
|
|
||||||
// Copy these over so we close this loop as fast as possible so we can release the read lock, especially if the group gets are backed by calls to the database
|
|
||||||
var groupIDs = make(map[int]bool)
|
|
||||||
var currentWatchers = make([]*WSUser, len(topicListWatchers))
|
|
||||||
var i = 0
|
|
||||||
for wsUser, _ := range topicListWatchers {
|
|
||||||
currentWatchers[i] = wsUser
|
|
||||||
groupIDs[wsUser.User.Group] = true
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
topicListMutex.RUnlock()
|
|
||||||
|
|
||||||
var groups = make(map[int]*Group)
|
|
||||||
var canSeeMap = make(map[string][]int)
|
|
||||||
for groupID, _ := range groupIDs {
|
|
||||||
group, err := Groups.Get(groupID)
|
|
||||||
if err != nil {
|
|
||||||
// TODO: Do we really want to halt all pushes for what is possibly just one user?
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
groups[group.ID] = group
|
|
||||||
|
|
||||||
var canSee = make([]byte, len(group.CanSee))
|
|
||||||
for i, item := range group.CanSee {
|
|
||||||
canSee[i] = byte(item)
|
|
||||||
}
|
|
||||||
canSeeMap[string(canSee)] = group.CanSee
|
|
||||||
}
|
|
||||||
|
|
||||||
var canSeeRenders = make(map[string][]byte)
|
|
||||||
for name, canSee := range canSeeMap {
|
|
||||||
topicList, forumList, _, err := TopicList.GetListByCanSee(canSee, 1)
|
|
||||||
if err != nil {
|
|
||||||
return err // TODO: Do we get ErrNoRows here?
|
|
||||||
}
|
|
||||||
if len(topicList) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
_ = forumList // Might use this later after we get the base feature working
|
|
||||||
|
|
||||||
//fmt.Println("canSeeItem")
|
|
||||||
if topicList[0].Sticky {
|
|
||||||
var lastSticky = 0
|
|
||||||
for i, row := range topicList {
|
|
||||||
if !row.Sticky {
|
|
||||||
lastSticky = i
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if lastSticky == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
//fmt.Println("lastSticky: ", lastSticky)
|
|
||||||
//fmt.Println("before topicList: ", topicList)
|
|
||||||
topicList = topicList[lastSticky:]
|
|
||||||
//fmt.Println("after topicList: ", topicList)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Compare to previous tick to eliminate unnecessary work and data
|
|
||||||
var wsTopicList = make([]*WsTopicsRow, len(topicList))
|
|
||||||
for i, topicRow := range topicList {
|
|
||||||
wsTopicList[i] = topicRow.WebSockets()
|
|
||||||
}
|
|
||||||
|
|
||||||
outBytes, err := json.Marshal(&WsTopicList{wsTopicList})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
canSeeRenders[name] = outBytes
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Use MessagePack for additional speed?
|
|
||||||
//fmt.Println("writing to the clients")
|
|
||||||
for _, wsUser := range currentWatchers {
|
|
||||||
group := groups[wsUser.User.Group]
|
|
||||||
var canSee = make([]byte, len(group.CanSee))
|
|
||||||
for i, item := range group.CanSee {
|
|
||||||
canSee[i] = byte(item)
|
|
||||||
}
|
|
||||||
|
|
||||||
w, err := wsUser.conn.NextWriter(websocket.TextMessage)
|
|
||||||
if err != nil {
|
|
||||||
//fmt.Printf("werr for #%d: %s\n", wsUser.User.ID, err)
|
|
||||||
topicListMutex.Lock()
|
|
||||||
delete(topicListWatchers, wsUser)
|
|
||||||
topicListMutex.Unlock()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
//fmt.Println("writing to user #", wsUser.User.ID)
|
|
||||||
outBytes := canSeeRenders[string(canSee)]
|
|
||||||
//fmt.Println("outBytes: ", string(outBytes))
|
|
||||||
w.Write(outBytes)
|
|
||||||
w.Close()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hub *WsHubImpl) GuestCount() int {
|
|
||||||
defer hub.GuestLock.RUnlock()
|
|
||||||
hub.GuestLock.RLock()
|
|
||||||
return len(hub.OnlineGuests)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hub *WsHubImpl) UserCount() (count int) {
|
|
||||||
hub.evenUserLock.RLock()
|
|
||||||
count += len(hub.evenOnlineUsers)
|
|
||||||
hub.evenUserLock.RUnlock()
|
|
||||||
hub.oddUserLock.RLock()
|
|
||||||
count += len(hub.oddOnlineUsers)
|
|
||||||
hub.oddUserLock.RUnlock()
|
|
||||||
return count
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hub *WsHubImpl) broadcastMessage(msg string) error {
|
|
||||||
var userLoop = func(users map[int]*WSUser, mutex *sync.RWMutex) error {
|
|
||||||
defer mutex.RUnlock()
|
|
||||||
for _, wsUser := range users {
|
|
||||||
w, err := wsUser.conn.NextWriter(websocket.TextMessage)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, _ = w.Write([]byte(msg))
|
|
||||||
w.Close()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// TODO: Can we move this RLock inside the closure safely?
|
|
||||||
hub.evenUserLock.RLock()
|
|
||||||
err := userLoop(hub.evenOnlineUsers, &hub.evenUserLock)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
hub.oddUserLock.RLock()
|
|
||||||
return userLoop(hub.oddOnlineUsers, &hub.oddUserLock)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hub *WsHubImpl) getUser(uid int) (wsUser *WSUser, err error) {
|
|
||||||
var ok bool
|
|
||||||
if uid%2 == 0 {
|
|
||||||
hub.evenUserLock.RLock()
|
|
||||||
wsUser, ok = hub.evenOnlineUsers[uid]
|
|
||||||
hub.evenUserLock.RUnlock()
|
|
||||||
} else {
|
|
||||||
hub.oddUserLock.RLock()
|
|
||||||
wsUser, ok = hub.oddOnlineUsers[uid]
|
|
||||||
hub.oddUserLock.RUnlock()
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
return nil, errWsNouser
|
|
||||||
}
|
|
||||||
return wsUser, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Warning: For efficiency, some of the *WSUsers may be nil pointers, DO NOT EXPORT
|
|
||||||
func (hub *WsHubImpl) getUsers(uids []int) (wsUsers []*WSUser, err error) {
|
|
||||||
if len(uids) == 0 {
|
|
||||||
return nil, errWsNouser
|
|
||||||
}
|
|
||||||
hub.evenUserLock.RLock()
|
|
||||||
// We don't want to keep a lock on this for too long, so we'll accept some nil pointers
|
|
||||||
for _, uid := range uids {
|
|
||||||
wsUsers = append(wsUsers, hub.evenOnlineUsers[uid])
|
|
||||||
}
|
|
||||||
hub.evenUserLock.RUnlock()
|
|
||||||
hub.oddUserLock.RLock()
|
|
||||||
// We don't want to keep a lock on this for too long, so we'll accept some nil pointers
|
|
||||||
for _, uid := range uids {
|
|
||||||
wsUsers = append(wsUsers, hub.oddOnlineUsers[uid])
|
|
||||||
}
|
|
||||||
hub.oddUserLock.RUnlock()
|
|
||||||
if len(wsUsers) == 0 {
|
|
||||||
return nil, errWsNouser
|
|
||||||
}
|
|
||||||
return wsUsers, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hub *WsHubImpl) SetUser(uid int, wsUser *WSUser) {
|
|
||||||
if uid%2 == 0 {
|
|
||||||
hub.evenUserLock.Lock()
|
|
||||||
hub.evenOnlineUsers[uid] = wsUser
|
|
||||||
hub.evenUserLock.Unlock()
|
|
||||||
} else {
|
|
||||||
hub.oddUserLock.Lock()
|
|
||||||
hub.oddOnlineUsers[uid] = wsUser
|
|
||||||
hub.oddUserLock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hub *WsHubImpl) RemoveUser(uid int) {
|
|
||||||
if uid%2 == 0 {
|
|
||||||
hub.evenUserLock.Lock()
|
|
||||||
delete(hub.evenOnlineUsers, uid)
|
|
||||||
hub.evenUserLock.Unlock()
|
|
||||||
} else {
|
|
||||||
hub.oddUserLock.Lock()
|
|
||||||
delete(hub.oddOnlineUsers, uid)
|
|
||||||
hub.oddUserLock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hub *WsHubImpl) pushMessage(targetUser int, msg string) error {
|
|
||||||
wsUser, err := hub.getUser(targetUser)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
w, err := wsUser.conn.NextWriter(websocket.TextMessage)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Write([]byte(msg))
|
|
||||||
w.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hub *WsHubImpl) pushAlert(targetUser int, asid int, event string, elementType string, actorID int, targetUserID int, elementID int) error {
|
|
||||||
wsUser, err := hub.getUser(targetUser)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
alert, err := BuildAlert(asid, event, elementType, actorID, targetUserID, elementID, *wsUser.User)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
w, err := wsUser.conn.NextWriter(websocket.TextMessage)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Write([]byte(alert))
|
|
||||||
_ = w.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hub *WsHubImpl) pushAlerts(users []int, asid int, event string, elementType string, actorID int, targetUserID int, elementID int) error {
|
|
||||||
wsUsers, err := hub.getUsers(users)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var errs []error
|
|
||||||
for _, wsUser := range wsUsers {
|
|
||||||
if wsUser == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
alert, err := BuildAlert(asid, event, elementType, actorID, targetUserID, elementID, *wsUser.User)
|
|
||||||
if err != nil {
|
|
||||||
errs = append(errs, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
w, err := wsUser.conn.NextWriter(websocket.TextMessage)
|
|
||||||
if err != nil {
|
|
||||||
errs = append(errs, err)
|
|
||||||
}
|
|
||||||
w.Write([]byte(alert))
|
|
||||||
w.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return the first error
|
|
||||||
if len(errs) != 0 {
|
|
||||||
for _, err := range errs {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: How should we handle errors for this?
|
// TODO: How should we handle errors for this?
|
||||||
// TODO: Move this out of common?
|
// TODO: Move this out of common?
|
||||||
func RouteWebsockets(w http.ResponseWriter, r *http.Request, user User) RouteError {
|
func RouteWebsockets(w http.ResponseWriter, r *http.Request, user User) RouteError {
|
||||||
|
// TODO: Spit out a 500 instead of nil?
|
||||||
conn, err := wsUpgrader.Upgrade(w, r, nil)
|
conn, err := wsUpgrader.Upgrade(w, r, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
userptr, err := Users.Get(user.ID)
|
wsUser, err := WsHub.AddConn(user, conn)
|
||||||
if err != nil && err != ErrStoreCapacityOverflow {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
wsUser := &WSUser{conn, userptr}
|
|
||||||
if user.ID == 0 {
|
|
||||||
WsHub.GuestLock.Lock()
|
|
||||||
WsHub.OnlineGuests[wsUser] = true
|
|
||||||
WsHub.GuestLock.Unlock()
|
|
||||||
} else {
|
|
||||||
WsHub.SetUser(user.ID, wsUser)
|
|
||||||
}
|
|
||||||
|
|
||||||
//conn.SetReadLimit(/* put the max request size from earlier here? */)
|
//conn.SetReadLimit(/* put the max request size from earlier here? */)
|
||||||
//conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
//conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||||
var currentPage []byte
|
var currentPage string
|
||||||
for {
|
for {
|
||||||
_, message, err := conn.ReadMessage()
|
_, message, err := conn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -428,30 +63,24 @@ func RouteWebsockets(w http.ResponseWriter, r *http.Request, user User) RouteErr
|
||||||
WsHub.GuestLock.Unlock()
|
WsHub.GuestLock.Unlock()
|
||||||
} else {
|
} else {
|
||||||
// TODO: Make sure the admin is removed from the admin stats list in the case that an error happens
|
// TODO: Make sure the admin is removed from the admin stats list in the case that an error happens
|
||||||
WsHub.RemoveUser(user.ID)
|
WsHub.RemoveConn(wsUser, conn)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
//log.Print("Message", message)
|
|
||||||
//log.Print("string(Message)", string(message))
|
|
||||||
messages := bytes.Split(message, []byte("\r"))
|
messages := bytes.Split(message, []byte("\r"))
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
//StoppedServer("Profile end") // A bit of code for me to profile the software
|
//StoppedServer("Profile end") // A bit of code for me to profile the software
|
||||||
//log.Print("Submessage", msg)
|
|
||||||
//log.Print("Submessage", string(msg))
|
|
||||||
if bytes.HasPrefix(msg, []byte("page ")) {
|
if bytes.HasPrefix(msg, []byte("page ")) {
|
||||||
msgblocks := bytes.SplitN(msg, []byte(" "), 2)
|
msgblocks := bytes.SplitN(msg, []byte(" "), 2)
|
||||||
if len(msgblocks) < 2 {
|
if len(msgblocks) < 2 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bytes.Equal(msgblocks[1], currentPage) {
|
if !bytes.Equal(msgblocks[1], []byte(currentPage)) {
|
||||||
wsLeavePage(wsUser, currentPage)
|
wsLeavePage(wsUser, conn, currentPage)
|
||||||
currentPage = msgblocks[1]
|
currentPage = string(msgblocks[1])
|
||||||
//log.Print("Current Page:", currentPage)
|
wsPageResponses(wsUser, conn, currentPage)
|
||||||
//log.Print("Current Page:", string(currentPage))
|
|
||||||
wsPageResponses(wsUser, currentPage)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/*if bytes.Equal(message,[]byte(`start-view`)) {
|
/*if bytes.Equal(message,[]byte(`start-view`)) {
|
||||||
|
@ -464,14 +93,12 @@ func RouteWebsockets(w http.ResponseWriter, r *http.Request, user User) RouteErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Use a map instead of a switch to make this more modular?
|
// TODO: Use a map instead of a switch to make this more modular?
|
||||||
func wsPageResponses(wsUser *WSUser, page []byte) {
|
func wsPageResponses(wsUser *WSUser, conn *websocket.Conn, page string) {
|
||||||
// TODO: Could do this more efficiently?
|
if page == "/" {
|
||||||
if string(page) == "/" {
|
page = Config.DefaultPath
|
||||||
page = []byte(Config.DefaultPath)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//fmt.Println("entering page: ", string(page))
|
switch page {
|
||||||
switch string(page) {
|
|
||||||
// Live Topic List is an experimental feature
|
// Live Topic List is an experimental feature
|
||||||
// TODO: Optimise this to reduce the amount of contention
|
// TODO: Optimise this to reduce the amount of contention
|
||||||
case "/topics/":
|
case "/topics/":
|
||||||
|
@ -479,43 +106,49 @@ func wsPageResponses(wsUser *WSUser, page []byte) {
|
||||||
topicListWatchers[wsUser] = true
|
topicListWatchers[wsUser] = true
|
||||||
topicListMutex.Unlock()
|
topicListMutex.Unlock()
|
||||||
case "/panel/":
|
case "/panel/":
|
||||||
|
if !wsUser.User.IsSuperMod {
|
||||||
|
return
|
||||||
|
}
|
||||||
// Listen for changes and inform the admins...
|
// Listen for changes and inform the admins...
|
||||||
adminStatsMutex.Lock()
|
adminStatsMutex.Lock()
|
||||||
watchers := len(adminStatsWatchers)
|
watchers := len(adminStatsWatchers)
|
||||||
adminStatsWatchers[wsUser] = true
|
adminStatsWatchers[conn] = wsUser
|
||||||
if watchers == 0 {
|
if watchers == 0 {
|
||||||
go adminStatsTicker()
|
go adminStatsTicker()
|
||||||
}
|
}
|
||||||
adminStatsMutex.Unlock()
|
adminStatsMutex.Unlock()
|
||||||
|
default:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
wsUser.SetPageForSocket(conn, page)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Use a map instead of a switch to make this more modular?
|
// TODO: Use a map instead of a switch to make this more modular?
|
||||||
func wsLeavePage(wsUser *WSUser, page []byte) {
|
func wsLeavePage(wsUser *WSUser, conn *websocket.Conn, page string) {
|
||||||
// TODO: Could do this more efficiently?
|
if page == "/" {
|
||||||
if string(page) == "/" {
|
page = Config.DefaultPath
|
||||||
page = []byte(Config.DefaultPath)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//fmt.Println("leaving page: ", string(page))
|
switch page {
|
||||||
switch string(page) {
|
|
||||||
// Live Topic List is an experimental feature
|
|
||||||
case "/topics/":
|
case "/topics/":
|
||||||
topicListMutex.Lock()
|
wsUser.FinalizePage("/topics/", func() {
|
||||||
delete(topicListWatchers, wsUser)
|
topicListMutex.Lock()
|
||||||
topicListMutex.Unlock()
|
delete(topicListWatchers, wsUser)
|
||||||
|
topicListMutex.Unlock()
|
||||||
|
})
|
||||||
case "/panel/":
|
case "/panel/":
|
||||||
adminStatsMutex.Lock()
|
adminStatsMutex.Lock()
|
||||||
delete(adminStatsWatchers, wsUser)
|
delete(adminStatsWatchers, conn)
|
||||||
adminStatsMutex.Unlock()
|
adminStatsMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
wsUser.SetPageForSocket(conn, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Abstract this
|
// TODO: Abstract this
|
||||||
// TODO: Use odd-even sharding
|
// TODO: Use odd-even sharding
|
||||||
var topicListWatchers map[*WSUser]bool
|
var topicListWatchers map[*WSUser]bool
|
||||||
var topicListMutex sync.RWMutex
|
var topicListMutex sync.RWMutex
|
||||||
var adminStatsWatchers map[*WSUser]bool
|
var adminStatsWatchers map[*websocket.Conn]*WSUser
|
||||||
var adminStatsMutex sync.RWMutex
|
var adminStatsMutex sync.RWMutex
|
||||||
|
|
||||||
func adminStatsTicker() {
|
func adminStatsTicker() {
|
||||||
|
@ -526,8 +159,7 @@ func adminStatsTicker() {
|
||||||
var lastTotonline = -1
|
var lastTotonline = -1
|
||||||
var lastCPUPerc = -1
|
var lastCPUPerc = -1
|
||||||
var lastAvailableRAM int64 = -1
|
var lastAvailableRAM int64 = -1
|
||||||
var noStatUpdates bool
|
var noStatUpdates, noRAMUpdates bool
|
||||||
var noRAMUpdates bool
|
|
||||||
|
|
||||||
var onlineColour, onlineGuestsColour, onlineUsersColour, cpustr, cpuColour, ramstr, ramColour string
|
var onlineColour, onlineGuestsColour, onlineUsersColour, cpustr, cpuColour, ramstr, ramColour string
|
||||||
var cpuerr, ramerr error
|
var cpuerr, ramerr error
|
||||||
|
@ -634,10 +266,10 @@ AdminStatLoop:
|
||||||
// Acquire a write lock for now, so we can handle the delete() case below and the read one simultaneously
|
// Acquire a write lock for now, so we can handle the delete() case below and the read one simultaneously
|
||||||
// TODO: Stop taking a write lock here if it isn't necessary
|
// TODO: Stop taking a write lock here if it isn't necessary
|
||||||
adminStatsMutex.Lock()
|
adminStatsMutex.Lock()
|
||||||
for watcher := range adminStatsWatchers {
|
for conn := range adminStatsWatchers {
|
||||||
w, err := watcher.conn.NextWriter(websocket.TextMessage)
|
w, err := conn.NextWriter(websocket.TextMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
delete(adminStatsWatchers, watcher)
|
delete(adminStatsWatchers, conn)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -672,7 +304,5 @@ AdminStatLoop:
|
||||||
lastTotonline = totonline
|
lastTotonline = totonline
|
||||||
lastCPUPerc = int(cpuPerc[0])
|
lastCPUPerc = int(cpuPerc[0])
|
||||||
lastAvailableRAM = int64(memres.Available)
|
lastAvailableRAM = int64(memres.Available)
|
||||||
|
|
||||||
//time.Sleep(time.Second)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,376 @@
|
||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO: Rename this to WebSockets?
|
||||||
|
var WsHub WsHubImpl
|
||||||
|
|
||||||
|
// TODO: Make this an interface?
|
||||||
|
type WsHubImpl struct {
|
||||||
|
// TODO: Implement some form of generics so we don't write as much odd-even sharding code
|
||||||
|
evenOnlineUsers map[int]*WSUser
|
||||||
|
oddOnlineUsers map[int]*WSUser
|
||||||
|
evenUserLock sync.RWMutex
|
||||||
|
oddUserLock sync.RWMutex
|
||||||
|
|
||||||
|
// TODO: Add sharding for this too?
|
||||||
|
OnlineGuests map[*WSUser]bool
|
||||||
|
GuestLock sync.RWMutex
|
||||||
|
|
||||||
|
lastTick time.Time
|
||||||
|
lastTopicList []*TopicsRow
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// TODO: Do we really want to initialise this here instead of in main.go / general_test.go like the other things?
|
||||||
|
WsHub = WsHubImpl{
|
||||||
|
evenOnlineUsers: make(map[int]*WSUser),
|
||||||
|
oddOnlineUsers: make(map[int]*WSUser),
|
||||||
|
OnlineGuests: make(map[*WSUser]bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) Start() {
|
||||||
|
if Config.DisableLiveTopicList {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
hub.lastTick = time.Now()
|
||||||
|
AddScheduledSecondTask(hub.Tick)
|
||||||
|
}
|
||||||
|
|
||||||
|
// This Tick is seperate from the admin one, as we want to process that in parallel with this due to the blocking calls to gopsutil
|
||||||
|
func (hub *WsHubImpl) Tick() error {
|
||||||
|
// Don't waste CPU time if nothing has happened
|
||||||
|
// TODO: Get a topic list method which strips stickies?
|
||||||
|
tList, _, _, err := TopicList.GetList(1)
|
||||||
|
if err != nil {
|
||||||
|
hub.lastTick = time.Now()
|
||||||
|
return err // TODO: Do we get ErrNoRows here?
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
hub.lastTick = time.Now()
|
||||||
|
hub.lastTopicList = tList
|
||||||
|
}()
|
||||||
|
if len(tList) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Optimise this by only sniffing the top non-sticky
|
||||||
|
// TODO: Optimise this by getting back an unsorted list so we don't have to hop around the stickies
|
||||||
|
// TODO: Add support for new stickies / replies to them
|
||||||
|
if len(tList) == len(hub.lastTopicList) {
|
||||||
|
var hasItem = false
|
||||||
|
for j, tItem := range tList {
|
||||||
|
if !tItem.Sticky {
|
||||||
|
if tItem.ID != hub.lastTopicList[j].ID || !tItem.LastReplyAt.Equal(hub.lastTopicList[j].LastReplyAt) {
|
||||||
|
hasItem = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !hasItem {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Implement this for guests too? Should be able to optimise it far better there due to them sharing the same permission set
|
||||||
|
// TODO: Be less aggressive with the locking, maybe use an array of sorts instead of hitting the main map every-time
|
||||||
|
topicListMutex.RLock()
|
||||||
|
if len(topicListWatchers) == 0 {
|
||||||
|
topicListMutex.RUnlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy these over so we close this loop as fast as possible so we can release the read lock, especially if the group gets are backed by calls to the database
|
||||||
|
var groupIDs = make(map[int]bool)
|
||||||
|
var currentWatchers = make([]*WSUser, len(topicListWatchers))
|
||||||
|
var i = 0
|
||||||
|
for wsUser, _ := range topicListWatchers {
|
||||||
|
currentWatchers[i] = wsUser
|
||||||
|
groupIDs[wsUser.User.Group] = true
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
topicListMutex.RUnlock()
|
||||||
|
|
||||||
|
var groups = make(map[int]*Group)
|
||||||
|
var canSeeMap = make(map[string][]int)
|
||||||
|
for groupID, _ := range groupIDs {
|
||||||
|
group, err := Groups.Get(groupID)
|
||||||
|
if err != nil {
|
||||||
|
// TODO: Do we really want to halt all pushes for what is possibly just one user?
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
groups[group.ID] = group
|
||||||
|
|
||||||
|
var canSee = make([]byte, len(group.CanSee))
|
||||||
|
for i, item := range group.CanSee {
|
||||||
|
canSee[i] = byte(item)
|
||||||
|
}
|
||||||
|
canSeeMap[string(canSee)] = group.CanSee
|
||||||
|
}
|
||||||
|
|
||||||
|
var canSeeRenders = make(map[string][]byte)
|
||||||
|
for name, canSee := range canSeeMap {
|
||||||
|
topicList, forumList, _, err := TopicList.GetListByCanSee(canSee, 1)
|
||||||
|
if err != nil {
|
||||||
|
return err // TODO: Do we get ErrNoRows here?
|
||||||
|
}
|
||||||
|
if len(topicList) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_ = forumList // Might use this later after we get the base feature working
|
||||||
|
|
||||||
|
//fmt.Println("canSeeItem")
|
||||||
|
if topicList[0].Sticky {
|
||||||
|
var lastSticky = 0
|
||||||
|
for i, row := range topicList {
|
||||||
|
if !row.Sticky {
|
||||||
|
lastSticky = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lastSticky == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
//fmt.Println("lastSticky: ", lastSticky)
|
||||||
|
//fmt.Println("before topicList: ", topicList)
|
||||||
|
topicList = topicList[lastSticky:]
|
||||||
|
//fmt.Println("after topicList: ", topicList)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Compare to previous tick to eliminate unnecessary work and data
|
||||||
|
var wsTopicList = make([]*WsTopicsRow, len(topicList))
|
||||||
|
for i, topicRow := range topicList {
|
||||||
|
wsTopicList[i] = topicRow.WebSockets()
|
||||||
|
}
|
||||||
|
|
||||||
|
outBytes, err := json.Marshal(&WsTopicList{wsTopicList})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
canSeeRenders[name] = outBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Use MessagePack for additional speed?
|
||||||
|
//fmt.Println("writing to the clients")
|
||||||
|
for _, wsUser := range currentWatchers {
|
||||||
|
group := groups[wsUser.User.Group]
|
||||||
|
var canSee = make([]byte, len(group.CanSee))
|
||||||
|
for i, item := range group.CanSee {
|
||||||
|
canSee[i] = byte(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
//fmt.Println("writing to user #", wsUser.User.ID)
|
||||||
|
outBytes := canSeeRenders[string(canSee)]
|
||||||
|
//fmt.Println("outBytes: ", string(outBytes))
|
||||||
|
err := wsUser.WriteToPageBytes(outBytes, "/topics/")
|
||||||
|
if err == ErrNoneOnPage {
|
||||||
|
//fmt.Printf("werr for #%d: %s\n", wsUser.User.ID, err)
|
||||||
|
wsUser.FinalizePage("/topics/", func() {
|
||||||
|
topicListMutex.Lock()
|
||||||
|
delete(topicListWatchers, wsUser)
|
||||||
|
topicListMutex.Unlock()
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) GuestCount() int {
|
||||||
|
defer hub.GuestLock.RUnlock()
|
||||||
|
hub.GuestLock.RLock()
|
||||||
|
return len(hub.OnlineGuests)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) UserCount() (count int) {
|
||||||
|
hub.evenUserLock.RLock()
|
||||||
|
count += len(hub.evenOnlineUsers)
|
||||||
|
hub.evenUserLock.RUnlock()
|
||||||
|
hub.oddUserLock.RLock()
|
||||||
|
count += len(hub.oddOnlineUsers)
|
||||||
|
hub.oddUserLock.RUnlock()
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) broadcastMessage(msg string) error {
|
||||||
|
var userLoop = func(users map[int]*WSUser, mutex *sync.RWMutex) error {
|
||||||
|
defer mutex.RUnlock()
|
||||||
|
for _, wsUser := range users {
|
||||||
|
err := wsUser.WriteAll(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// TODO: Can we move this RLock inside the closure safely?
|
||||||
|
hub.evenUserLock.RLock()
|
||||||
|
err := userLoop(hub.evenOnlineUsers, &hub.evenUserLock)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
hub.oddUserLock.RLock()
|
||||||
|
return userLoop(hub.oddOnlineUsers, &hub.oddUserLock)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) getUser(uid int) (wsUser *WSUser, err error) {
|
||||||
|
var ok bool
|
||||||
|
if uid%2 == 0 {
|
||||||
|
hub.evenUserLock.RLock()
|
||||||
|
wsUser, ok = hub.evenOnlineUsers[uid]
|
||||||
|
hub.evenUserLock.RUnlock()
|
||||||
|
} else {
|
||||||
|
hub.oddUserLock.RLock()
|
||||||
|
wsUser, ok = hub.oddOnlineUsers[uid]
|
||||||
|
hub.oddUserLock.RUnlock()
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
return nil, errWsNouser
|
||||||
|
}
|
||||||
|
return wsUser, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Warning: For efficiency, some of the *WSUsers may be nil pointers, DO NOT EXPORT
|
||||||
|
func (hub *WsHubImpl) getUsers(uids []int) (wsUsers []*WSUser, err error) {
|
||||||
|
if len(uids) == 0 {
|
||||||
|
return nil, errWsNouser
|
||||||
|
}
|
||||||
|
hub.evenUserLock.RLock()
|
||||||
|
// We don't want to keep a lock on this for too long, so we'll accept some nil pointers
|
||||||
|
for _, uid := range uids {
|
||||||
|
wsUsers = append(wsUsers, hub.evenOnlineUsers[uid])
|
||||||
|
}
|
||||||
|
hub.evenUserLock.RUnlock()
|
||||||
|
hub.oddUserLock.RLock()
|
||||||
|
// We don't want to keep a lock on this for too long, so we'll accept some nil pointers
|
||||||
|
for _, uid := range uids {
|
||||||
|
wsUsers = append(wsUsers, hub.oddOnlineUsers[uid])
|
||||||
|
}
|
||||||
|
hub.oddUserLock.RUnlock()
|
||||||
|
if len(wsUsers) == 0 {
|
||||||
|
return nil, errWsNouser
|
||||||
|
}
|
||||||
|
return wsUsers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) removeUser(uid int) {
|
||||||
|
if uid%2 == 0 {
|
||||||
|
hub.evenUserLock.Lock()
|
||||||
|
delete(hub.evenOnlineUsers, uid)
|
||||||
|
hub.evenUserLock.Unlock()
|
||||||
|
} else {
|
||||||
|
hub.oddUserLock.Lock()
|
||||||
|
delete(hub.oddOnlineUsers, uid)
|
||||||
|
hub.oddUserLock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) AddConn(user User, conn *websocket.Conn) (*WSUser, error) {
|
||||||
|
// TODO: How should we handle user state changes if we're holding a pointer which never changes?
|
||||||
|
userptr, err := Users.Get(user.ID)
|
||||||
|
if err != nil && err != ErrStoreCapacityOverflow {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if user.ID == 0 {
|
||||||
|
wsUser := new(WSUser)
|
||||||
|
wsUser.User = userptr
|
||||||
|
wsUser.AddSocket(conn, "")
|
||||||
|
WsHub.GuestLock.Lock()
|
||||||
|
WsHub.OnlineGuests[wsUser] = true
|
||||||
|
WsHub.GuestLock.Unlock()
|
||||||
|
return wsUser, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var mutex *sync.RWMutex
|
||||||
|
var theMap map[int]*WSUser
|
||||||
|
if user.ID%2 == 0 {
|
||||||
|
mutex = &hub.evenUserLock
|
||||||
|
theMap = hub.evenOnlineUsers
|
||||||
|
} else {
|
||||||
|
mutex = &hub.oddUserLock
|
||||||
|
theMap = hub.oddOnlineUsers
|
||||||
|
}
|
||||||
|
|
||||||
|
mutex.Lock()
|
||||||
|
wsUser, ok := theMap[user.ID]
|
||||||
|
if !ok {
|
||||||
|
wsUser = new(WSUser)
|
||||||
|
wsUser.User = userptr
|
||||||
|
wsUser.Sockets = []*WSUserSocket{&WSUserSocket{conn, ""}}
|
||||||
|
theMap[user.ID] = wsUser
|
||||||
|
mutex.Unlock()
|
||||||
|
return wsUser, nil
|
||||||
|
}
|
||||||
|
mutex.Unlock()
|
||||||
|
wsUser.AddSocket(conn, "")
|
||||||
|
return wsUser, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) RemoveConn(wsUser *WSUser, conn *websocket.Conn) {
|
||||||
|
wsUser.RemoveSocket(conn)
|
||||||
|
wsUser.Lock()
|
||||||
|
if len(wsUser.Sockets) == 0 {
|
||||||
|
hub.removeUser(wsUser.User.ID)
|
||||||
|
}
|
||||||
|
wsUser.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) pushMessage(targetUser int, msg string) error {
|
||||||
|
wsUser, err := hub.getUser(targetUser)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return wsUser.WriteAll(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) pushAlert(targetUser int, asid int, event string, elementType string, actorID int, targetUserID int, elementID int) error {
|
||||||
|
wsUser, err := hub.getUser(targetUser)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
alert, err := BuildAlert(asid, event, elementType, actorID, targetUserID, elementID, *wsUser.User)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return wsUser.WriteAll(alert)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) pushAlerts(users []int, asid int, event string, elementType string, actorID int, targetUserID int, elementID int) error {
|
||||||
|
wsUsers, err := hub.getUsers(users)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var errs []error
|
||||||
|
for _, wsUser := range wsUsers {
|
||||||
|
if wsUser == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
alert, err := BuildAlert(asid, event, elementType, actorID, targetUserID, elementID, *wsUser.User)
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
err = wsUser.WriteAll(alert)
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the first error
|
||||||
|
if len(errs) != 0 {
|
||||||
|
for _, err := range errs {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,131 @@
|
||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrNoneOnPage = errors.New("This user isn't on that page")
|
||||||
|
|
||||||
|
type WSUser struct {
|
||||||
|
User *User
|
||||||
|
Sockets []*WSUserSocket
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type WSUserSocket struct {
|
||||||
|
conn *websocket.Conn
|
||||||
|
Page string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wsUser *WSUser) WriteAll(msg string) error {
|
||||||
|
msgbytes := []byte(msg)
|
||||||
|
for _, socket := range wsUser.Sockets {
|
||||||
|
w, err := socket.conn.NextWriter(websocket.TextMessage)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, _ = w.Write(msgbytes)
|
||||||
|
w.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wsUser *WSUser) WriteToPage(msg string, page string) error {
|
||||||
|
return wsUser.WriteToPageBytes([]byte(msg), page)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inefficient as it looks for sockets for a page even if there are none
|
||||||
|
func (wsUser *WSUser) WriteToPageBytes(msg []byte, page string) error {
|
||||||
|
var success bool
|
||||||
|
for _, socket := range wsUser.Sockets {
|
||||||
|
if socket.Page != page {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
w, err := socket.conn.NextWriter(websocket.TextMessage)
|
||||||
|
if err != nil {
|
||||||
|
continue // Skip dead sockets, a dedicated goroutine handles those
|
||||||
|
}
|
||||||
|
_, _ = w.Write(msg)
|
||||||
|
w.Close()
|
||||||
|
success = true
|
||||||
|
}
|
||||||
|
if !success {
|
||||||
|
return ErrNoneOnPage
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wsUser *WSUser) AddSocket(conn *websocket.Conn, page string) {
|
||||||
|
wsUser.Lock()
|
||||||
|
// If the number of the sockets is small, then we can keep the size of the slice mostly static and just walk through it looking for empty slots
|
||||||
|
if len(wsUser.Sockets) < 6 {
|
||||||
|
for i, socket := range wsUser.Sockets {
|
||||||
|
if socket == nil {
|
||||||
|
wsUser.Sockets[i] = &WSUserSocket{conn, page}
|
||||||
|
wsUser.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wsUser.Sockets = append(wsUser.Sockets, &WSUserSocket{conn, page})
|
||||||
|
wsUser.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wsUser *WSUser) RemoveSocket(conn *websocket.Conn) {
|
||||||
|
wsUser.Lock()
|
||||||
|
if len(wsUser.Sockets) < 6 {
|
||||||
|
for i, socket := range wsUser.Sockets {
|
||||||
|
if socket.conn == conn {
|
||||||
|
wsUser.Sockets[i] = nil
|
||||||
|
wsUser.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var key int
|
||||||
|
for i, socket := range wsUser.Sockets {
|
||||||
|
if socket.conn == conn {
|
||||||
|
key = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wsUser.Sockets = append(wsUser.Sockets[:key], wsUser.Sockets[key+1:]...)
|
||||||
|
|
||||||
|
wsUser.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wsUser *WSUser) SetPageForSocket(conn *websocket.Conn, page string) {
|
||||||
|
wsUser.Lock()
|
||||||
|
for _, socket := range wsUser.Sockets {
|
||||||
|
if socket.conn == conn {
|
||||||
|
socket.Page = page
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wsUser.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wsUser *WSUser) InPage(page string) bool {
|
||||||
|
wsUser.Lock()
|
||||||
|
for _, socket := range wsUser.Sockets {
|
||||||
|
if socket.Page == page {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wsUser.Unlock()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wsUser *WSUser) FinalizePage(page string, handle func()) {
|
||||||
|
wsUser.Lock()
|
||||||
|
for _, socket := range wsUser.Sockets {
|
||||||
|
if socket.Page == page {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
handle()
|
||||||
|
wsUser.Unlock()
|
||||||
|
}
|
Loading…
Reference in New Issue