From f81e73c097908ff9feb266d0b0a1c9e56a423671 Mon Sep 17 00:00:00 2001 From: skedar46 Date: Sun, 15 Apr 2018 13:19:25 -0700 Subject: [PATCH] replaced golang.org/x/net websockets with github.com/gorilla websockets --- client/sacrificial-socket.js | 7 +- .../webroot/js/sacrificial-socket.js | 7 +- .../webroot/js/sacrificial-socket.js | 7 +- .../chat/webroot/js/sacrificial-socket.js | 7 +- server.go | 68 ++++++++----- socket.go | 97 ++++++++++--------- tools/random.go | 28 ------ 7 files changed, 111 insertions(+), 110 deletions(-) delete mode 100644 tools/random.go diff --git a/client/sacrificial-socket.js b/client/sacrificial-socket.js index a3fdc50..38243b5 100644 --- a/client/sacrificial-socket.js +++ b/client/sacrificial-socket.js @@ -34,7 +34,8 @@ if(typeof module === 'undefined'){ headerStartChar = String.fromCharCode(headerStartCharCode), dataStartCharCode = 2, dataStartChar = String.fromCharCode(dataStartCharCode), - ws = new WebSocket(url, 'sac-sock'); + subProtocol = 'sac-sock', + ws = new WebSocket(url, subProtocol); //blomp blomp-a noop noop a-noop noop noop self.noop = function(){ }; @@ -112,7 +113,7 @@ if(typeof module === 'undefined'){ function startReconnect(){ setTimeout(function(){ console.log('attempting reconnect'); - var newWS = new WebSocket(url, 'sac-sock'); + var newWS = new WebSocket(url, subProtocol); newWS.onmessage = ws.onmessage; newWS.onclose = ws.onclose; newWS.binaryType = ws.binaryType; @@ -240,7 +241,7 @@ if(typeof module === 'undefined'){ */ self.close = function(){ reconnectOpts.enabled = false; //don't reconnect if close is called - return ws.close(); + return ws.close(1000); }; }; diff --git a/examples/not-so-simple-examples/grpc-multihome/webroot/js/sacrificial-socket.js b/examples/not-so-simple-examples/grpc-multihome/webroot/js/sacrificial-socket.js index a3fdc50..38243b5 100644 --- a/examples/not-so-simple-examples/grpc-multihome/webroot/js/sacrificial-socket.js +++ b/examples/not-so-simple-examples/grpc-multihome/webroot/js/sacrificial-socket.js @@ -34,7 +34,8 @@ if(typeof module === 'undefined'){ headerStartChar = String.fromCharCode(headerStartCharCode), dataStartCharCode = 2, dataStartChar = String.fromCharCode(dataStartCharCode), - ws = new WebSocket(url, 'sac-sock'); + subProtocol = 'sac-sock', + ws = new WebSocket(url, subProtocol); //blomp blomp-a noop noop a-noop noop noop self.noop = function(){ }; @@ -112,7 +113,7 @@ if(typeof module === 'undefined'){ function startReconnect(){ setTimeout(function(){ console.log('attempting reconnect'); - var newWS = new WebSocket(url, 'sac-sock'); + var newWS = new WebSocket(url, subProtocol); newWS.onmessage = ws.onmessage; newWS.onclose = ws.onclose; newWS.binaryType = ws.binaryType; @@ -240,7 +241,7 @@ if(typeof module === 'undefined'){ */ self.close = function(){ reconnectOpts.enabled = false; //don't reconnect if close is called - return ws.close(); + return ws.close(1000); }; }; diff --git a/examples/not-so-simple-examples/redis-multihome/webroot/js/sacrificial-socket.js b/examples/not-so-simple-examples/redis-multihome/webroot/js/sacrificial-socket.js index a3fdc50..38243b5 100644 --- a/examples/not-so-simple-examples/redis-multihome/webroot/js/sacrificial-socket.js +++ b/examples/not-so-simple-examples/redis-multihome/webroot/js/sacrificial-socket.js @@ -34,7 +34,8 @@ if(typeof module === 'undefined'){ headerStartChar = String.fromCharCode(headerStartCharCode), dataStartCharCode = 2, dataStartChar = String.fromCharCode(dataStartCharCode), - ws = new WebSocket(url, 'sac-sock'); + subProtocol = 'sac-sock', + ws = new WebSocket(url, subProtocol); //blomp blomp-a noop noop a-noop noop noop self.noop = function(){ }; @@ -112,7 +113,7 @@ if(typeof module === 'undefined'){ function startReconnect(){ setTimeout(function(){ console.log('attempting reconnect'); - var newWS = new WebSocket(url, 'sac-sock'); + var newWS = new WebSocket(url, subProtocol); newWS.onmessage = ws.onmessage; newWS.onclose = ws.onclose; newWS.binaryType = ws.binaryType; @@ -240,7 +241,7 @@ if(typeof module === 'undefined'){ */ self.close = function(){ reconnectOpts.enabled = false; //don't reconnect if close is called - return ws.close(); + return ws.close(1000); }; }; diff --git a/examples/simple-examples/chat/webroot/js/sacrificial-socket.js b/examples/simple-examples/chat/webroot/js/sacrificial-socket.js index a3fdc50..38243b5 100644 --- a/examples/simple-examples/chat/webroot/js/sacrificial-socket.js +++ b/examples/simple-examples/chat/webroot/js/sacrificial-socket.js @@ -34,7 +34,8 @@ if(typeof module === 'undefined'){ headerStartChar = String.fromCharCode(headerStartCharCode), dataStartCharCode = 2, dataStartChar = String.fromCharCode(dataStartCharCode), - ws = new WebSocket(url, 'sac-sock'); + subProtocol = 'sac-sock', + ws = new WebSocket(url, subProtocol); //blomp blomp-a noop noop a-noop noop noop self.noop = function(){ }; @@ -112,7 +113,7 @@ if(typeof module === 'undefined'){ function startReconnect(){ setTimeout(function(){ console.log('attempting reconnect'); - var newWS = new WebSocket(url, 'sac-sock'); + var newWS = new WebSocket(url, subProtocol); newWS.onmessage = ws.onmessage; newWS.onclose = ws.onclose; newWS.binaryType = ws.binaryType; @@ -240,7 +241,7 @@ if(typeof module === 'undefined'){ */ self.close = function(){ reconnectOpts.enabled = false; //don't reconnect if close is called - return ws.close(); + return ws.close(1000); }; }; diff --git a/server.go b/server.go index 53abe6f..4bf56d9 100644 --- a/server.go +++ b/server.go @@ -8,12 +8,13 @@ Sacrificial-Socket also has a MultihomeBackend interface for syncronizing broadc package ss import ( + "github.com/gorilla/websocket" "github.com/raz-varren/log" - "golang.org/x/net/websocket" "io" "net/http" "os" "os/signal" + "strings" "sync" "syscall" ) @@ -39,14 +40,16 @@ type SocketServer struct { onConnectFunc func(*Socket) onDisconnectFunc func(*Socket) l *sync.RWMutex + upgrader *websocket.Upgrader } //NewServer creates a new instance of SocketServer func NewServer() *SocketServer { s := &SocketServer{ - hub: newHub(), - events: make(map[string]*event), - l: &sync.RWMutex{}, + hub: newHub(), + events: make(map[string]*event), + l: &sync.RWMutex{}, + upgrader: DefaultUpgrader(), } return s @@ -115,16 +118,35 @@ func (serv *SocketServer) OnDisconnect(handleFunc func(*Socket)) { } //WebHandler returns a http.Handler to be passed into http.Handle +// +//Depricated: The SocketServer struct now satisfies the http.Handler interface, use that instead func (serv *SocketServer) WebHandler() http.Handler { - return websocket.Server{ - Handshake: func(c *websocket.Config, r *http.Request) error { - if !protocolSupported(c) { - return websocket.ErrBadWebSocketProtocol - } - return nil - }, - Handler: serv.loop, + return serv +} + +//ServeHTTP will upgrade a http request to a websocket using the sac-sock subprotocol +func (serv *SocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ws, err := serv.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Err.Println(err) + return } + + serv.loop(ws) +} + +//DefaultUpgrader returns a websocket upgrader suitable for creating sacrificial-socket websockets. +func DefaultUpgrader() *websocket.Upgrader { + u := &websocket.Upgrader{ + Subprotocols: []string{SubProtocol}, + } + + return u +} + +//SetUpgrader sets the websocket.Upgrader used by the SocketServer. +func (serv *SocketServer) SetUpgrader(u *websocket.Upgrader) { + serv.upgrader = u } //SetMultihomeBackend registers a MultihomeBackend interface and calls it's Init() method @@ -147,6 +169,7 @@ func (serv *SocketServer) Broadcast(eventName string, data interface{}) { func (serv *SocketServer) loop(ws *websocket.Conn) { s := newSocket(serv, ws) log.Debug.Println(s.ID(), "connected") + defer s.Close() serv.l.RLock() @@ -158,10 +181,8 @@ func (serv *SocketServer) loop(ws *websocket.Conn) { } for { - var msg []byte - - err := s.receive(&msg) - if err == io.EOF { + msg, err := s.receive() + if ignorableError(err) { return } if err != nil { @@ -180,7 +201,8 @@ func (serv *SocketServer) loop(ws *websocket.Conn) { } } if eventName == "" { - continue //no event to dispatch + log.Warn.Println("no event to dispatch") + continue } serv.l.RLock() @@ -193,11 +215,11 @@ func (serv *SocketServer) loop(ws *websocket.Conn) { } } -func protocolSupported(conf *websocket.Config) bool { - for _, p := range conf.Protocol { - if p == SubProtocol { - return true - } +func ignorableError(err error) bool { + //not an error + if err == nil { + return false } - return false + + return err == io.EOF || websocket.IsCloseError(err, 1000) || websocket.IsCloseError(err, 1001) || strings.HasSuffix(err.Error(), "use of closed network connection") } diff --git a/socket.go b/socket.go index b78b9af..70e43dc 100644 --- a/socket.go +++ b/socket.go @@ -2,11 +2,17 @@ package ss import ( "bytes" + "encoding/base64" "encoding/json" + "github.com/gorilla/websocket" "github.com/raz-varren/log" - "github.com/raz-varren/sacrificial-socket/tools" - "golang.org/x/net/websocket" + "math/rand" "sync" + "time" +) + +var ( + socketRNG = newRNG() ) //Socket represents a websocket connection @@ -21,42 +27,17 @@ type Socket struct { } const ( - idLen int = 32 + idLen int = 24 typeJSON string = "J" typeBin = "B" typeStr = "S" ) -var ( - idChars = []string{ - "0", "1", "2", "3", "4", - "5", "6", "7", "8", "9", - "A", "B", "C", "D", "E", - "F", "G", "H", "I", "J", - "K", "L", "M", "N", "O", - "P", "Q", "R", "S", "T", - "U", "V", "W", "X", "Y", - "Z", "a", "b", "c", "d", - "e", "f", "g", "h", "i", - "j", "k", "l", "m", "n", - "o", "p", "q", "r", "s", - "t", "u", "v", "w", "x", - "y", "z", "=", "_", "-", - "#", ".", - } - - idCharLen = len(idChars) - 1 -) - func newSocket(serv *SocketServer, ws *websocket.Conn) *Socket { - buf := bytes.NewBuffer(nil) - for i := 0; i < idLen; i++ { - buf.WriteString(idChars[tools.RandomInt(0, idCharLen)]) - } s := &Socket{ l: &sync.RWMutex{}, - id: buf.String(), + id: newSocketID(), ws: ws, closed: false, serv: serv, @@ -67,12 +48,21 @@ func newSocket(serv *SocketServer, ws *websocket.Conn) *Socket { return s } -func (s *Socket) receive(v interface{}) error { - return websocket.Message.Receive(s.ws, v) +func newSocketID() string { + idBuf := make([]byte, idLen) + socketRNG.Read(idBuf) + return base64.StdEncoding.EncodeToString(idBuf) } -func (s *Socket) send(data interface{}) error { - return websocket.Message.Send(s.ws, data) +func (s *Socket) receive() ([]byte, error) { + _, data, err := s.ws.ReadMessage() + return data, err +} + +func (s *Socket) send(msgType int, data []byte) error { + s.l.Lock() + defer s.l.Unlock() + return s.ws.WriteMessage(msgType, data) } //InRoom returns true if s is currently a member of roomName @@ -126,20 +116,18 @@ func (s *Socket) Broadcast(eventName string, data interface{}) { //Emit dispatches an event to s. func (s *Socket) Emit(eventName string, data interface{}) error { - return s.send(emitData(eventName, data)) + d, msgType := emitData(eventName, data) + return s.send(msgType, d) } //ID returns the unique ID of s func (s *Socket) ID() string { - s.l.RLock() - defer s.l.RUnlock() - id := s.id - return id + return s.id } //emitData combines the eventName and data into a payload that is understood -//by the sac-sock protocol. It will return either a string or a []byte -func emitData(eventName string, data interface{}) interface{} { +//by the sac-sock protocol. +func emitData(eventName string, data interface{}) ([]byte, int) { buf := bytes.NewBuffer(nil) buf.WriteString(eventName) buf.WriteByte(startOfHeaderByte) @@ -149,13 +137,13 @@ func emitData(eventName string, data interface{}) interface{} { buf.WriteString(typeStr) buf.WriteByte(startOfDataByte) buf.WriteString(d) - return buf.String() + return buf.Bytes(), websocket.TextMessage case []byte: buf.WriteString(typeBin) buf.WriteByte(startOfDataByte) buf.Write(d) - return buf.Bytes() + return buf.Bytes(), websocket.BinaryMessage default: buf.WriteString(typeJSON) @@ -166,7 +154,7 @@ func emitData(eventName string, data interface{}) interface{} { } else { buf.Write(jsonData) } - return buf.String() + return buf.Bytes(), websocket.TextMessage } } @@ -184,10 +172,7 @@ func (s *Socket) Close() { defer log.Debug.Println(s.ID(), "disconnected") - err := s.ws.Close() - if err != nil { - log.Err.Println(err) - } + s.ws.Close() rooms := s.GetRooms() @@ -205,3 +190,21 @@ func (s *Socket) Close() { s.serv.hub.removeSocket(s) } + +type rng struct { + r *rand.Rand + mu *sync.Mutex +} + +func (r *rng) Read(b []byte) (int, error) { + r.mu.Lock() + defer r.mu.Unlock() + return r.r.Read(b) +} + +func newRNG() *rng { + return &rng{ + r: rand.New(rand.NewSource(time.Now().UnixNano())), + mu: &sync.Mutex{}, + } +} diff --git a/tools/random.go b/tools/random.go deleted file mode 100644 index dfc37d0..0000000 --- a/tools/random.go +++ /dev/null @@ -1,28 +0,0 @@ -/* -Package tools is really just used during socket creation to generate random numbers for socket IDs. -*/ -package tools - -import ( - crand "crypto/rand" - "fmt" - "io" - "math/rand" - "time" -) - -func RandomInt(min, max int) int { - rand.Seed(time.Now().UnixNano()) - return rand.Intn(max-min+1) + min -} - -func RandomInt64(min, max int64) int64 { - rand.Seed(time.Now().UnixNano()) - return rand.Int63n(max-min+1) + min -} - -func UID() string { - uid := make([]byte, 16) - io.ReadFull(crand.Reader, uid) - return fmt.Sprintf("%x", uid) -}