replaced golang.org/x/net websockets with github.com/gorilla websockets

This commit is contained in:
skedar46
2018-04-15 13:19:25 -07:00
parent ad769a9e40
commit f81e73c097
7 changed files with 111 additions and 110 deletions

View File

@@ -34,7 +34,8 @@ if(typeof module === 'undefined'){
headerStartChar = String.fromCharCode(headerStartCharCode), headerStartChar = String.fromCharCode(headerStartCharCode),
dataStartCharCode = 2, dataStartCharCode = 2,
dataStartChar = String.fromCharCode(dataStartCharCode), dataStartChar = String.fromCharCode(dataStartCharCode),
ws = new WebSocket(url, 'sac-sock'); subProtocol = 'sac-sock',
ws = new WebSocket(url, subProtocol);
//blomp blomp-a noop noop a-noop noop noop //blomp blomp-a noop noop a-noop noop noop
self.noop = function(){ }; self.noop = function(){ };
@@ -112,7 +113,7 @@ if(typeof module === 'undefined'){
function startReconnect(){ function startReconnect(){
setTimeout(function(){ setTimeout(function(){
console.log('attempting reconnect'); console.log('attempting reconnect');
var newWS = new WebSocket(url, 'sac-sock'); var newWS = new WebSocket(url, subProtocol);
newWS.onmessage = ws.onmessage; newWS.onmessage = ws.onmessage;
newWS.onclose = ws.onclose; newWS.onclose = ws.onclose;
newWS.binaryType = ws.binaryType; newWS.binaryType = ws.binaryType;
@@ -240,7 +241,7 @@ if(typeof module === 'undefined'){
*/ */
self.close = function(){ self.close = function(){
reconnectOpts.enabled = false; //don't reconnect if close is called reconnectOpts.enabled = false; //don't reconnect if close is called
return ws.close(); return ws.close(1000);
}; };
}; };

View File

@@ -34,7 +34,8 @@ if(typeof module === 'undefined'){
headerStartChar = String.fromCharCode(headerStartCharCode), headerStartChar = String.fromCharCode(headerStartCharCode),
dataStartCharCode = 2, dataStartCharCode = 2,
dataStartChar = String.fromCharCode(dataStartCharCode), dataStartChar = String.fromCharCode(dataStartCharCode),
ws = new WebSocket(url, 'sac-sock'); subProtocol = 'sac-sock',
ws = new WebSocket(url, subProtocol);
//blomp blomp-a noop noop a-noop noop noop //blomp blomp-a noop noop a-noop noop noop
self.noop = function(){ }; self.noop = function(){ };
@@ -112,7 +113,7 @@ if(typeof module === 'undefined'){
function startReconnect(){ function startReconnect(){
setTimeout(function(){ setTimeout(function(){
console.log('attempting reconnect'); console.log('attempting reconnect');
var newWS = new WebSocket(url, 'sac-sock'); var newWS = new WebSocket(url, subProtocol);
newWS.onmessage = ws.onmessage; newWS.onmessage = ws.onmessage;
newWS.onclose = ws.onclose; newWS.onclose = ws.onclose;
newWS.binaryType = ws.binaryType; newWS.binaryType = ws.binaryType;
@@ -240,7 +241,7 @@ if(typeof module === 'undefined'){
*/ */
self.close = function(){ self.close = function(){
reconnectOpts.enabled = false; //don't reconnect if close is called reconnectOpts.enabled = false; //don't reconnect if close is called
return ws.close(); return ws.close(1000);
}; };
}; };

View File

@@ -34,7 +34,8 @@ if(typeof module === 'undefined'){
headerStartChar = String.fromCharCode(headerStartCharCode), headerStartChar = String.fromCharCode(headerStartCharCode),
dataStartCharCode = 2, dataStartCharCode = 2,
dataStartChar = String.fromCharCode(dataStartCharCode), dataStartChar = String.fromCharCode(dataStartCharCode),
ws = new WebSocket(url, 'sac-sock'); subProtocol = 'sac-sock',
ws = new WebSocket(url, subProtocol);
//blomp blomp-a noop noop a-noop noop noop //blomp blomp-a noop noop a-noop noop noop
self.noop = function(){ }; self.noop = function(){ };
@@ -112,7 +113,7 @@ if(typeof module === 'undefined'){
function startReconnect(){ function startReconnect(){
setTimeout(function(){ setTimeout(function(){
console.log('attempting reconnect'); console.log('attempting reconnect');
var newWS = new WebSocket(url, 'sac-sock'); var newWS = new WebSocket(url, subProtocol);
newWS.onmessage = ws.onmessage; newWS.onmessage = ws.onmessage;
newWS.onclose = ws.onclose; newWS.onclose = ws.onclose;
newWS.binaryType = ws.binaryType; newWS.binaryType = ws.binaryType;
@@ -240,7 +241,7 @@ if(typeof module === 'undefined'){
*/ */
self.close = function(){ self.close = function(){
reconnectOpts.enabled = false; //don't reconnect if close is called reconnectOpts.enabled = false; //don't reconnect if close is called
return ws.close(); return ws.close(1000);
}; };
}; };

View File

@@ -34,7 +34,8 @@ if(typeof module === 'undefined'){
headerStartChar = String.fromCharCode(headerStartCharCode), headerStartChar = String.fromCharCode(headerStartCharCode),
dataStartCharCode = 2, dataStartCharCode = 2,
dataStartChar = String.fromCharCode(dataStartCharCode), dataStartChar = String.fromCharCode(dataStartCharCode),
ws = new WebSocket(url, 'sac-sock'); subProtocol = 'sac-sock',
ws = new WebSocket(url, subProtocol);
//blomp blomp-a noop noop a-noop noop noop //blomp blomp-a noop noop a-noop noop noop
self.noop = function(){ }; self.noop = function(){ };
@@ -112,7 +113,7 @@ if(typeof module === 'undefined'){
function startReconnect(){ function startReconnect(){
setTimeout(function(){ setTimeout(function(){
console.log('attempting reconnect'); console.log('attempting reconnect');
var newWS = new WebSocket(url, 'sac-sock'); var newWS = new WebSocket(url, subProtocol);
newWS.onmessage = ws.onmessage; newWS.onmessage = ws.onmessage;
newWS.onclose = ws.onclose; newWS.onclose = ws.onclose;
newWS.binaryType = ws.binaryType; newWS.binaryType = ws.binaryType;
@@ -240,7 +241,7 @@ if(typeof module === 'undefined'){
*/ */
self.close = function(){ self.close = function(){
reconnectOpts.enabled = false; //don't reconnect if close is called reconnectOpts.enabled = false; //don't reconnect if close is called
return ws.close(); return ws.close(1000);
}; };
}; };

View File

@@ -8,12 +8,13 @@ Sacrificial-Socket also has a MultihomeBackend interface for syncronizing broadc
package ss package ss
import ( import (
"github.com/gorilla/websocket"
"github.com/raz-varren/log" "github.com/raz-varren/log"
"golang.org/x/net/websocket"
"io" "io"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"strings"
"sync" "sync"
"syscall" "syscall"
) )
@@ -39,14 +40,16 @@ type SocketServer struct {
onConnectFunc func(*Socket) onConnectFunc func(*Socket)
onDisconnectFunc func(*Socket) onDisconnectFunc func(*Socket)
l *sync.RWMutex l *sync.RWMutex
upgrader *websocket.Upgrader
} }
//NewServer creates a new instance of SocketServer //NewServer creates a new instance of SocketServer
func NewServer() *SocketServer { func NewServer() *SocketServer {
s := &SocketServer{ s := &SocketServer{
hub: newHub(), hub: newHub(),
events: make(map[string]*event), events: make(map[string]*event),
l: &sync.RWMutex{}, l: &sync.RWMutex{},
upgrader: DefaultUpgrader(),
} }
return s return s
@@ -115,16 +118,35 @@ func (serv *SocketServer) OnDisconnect(handleFunc func(*Socket)) {
} }
//WebHandler returns a http.Handler to be passed into http.Handle //WebHandler returns a http.Handler to be passed into http.Handle
//
//Depricated: The SocketServer struct now satisfies the http.Handler interface, use that instead
func (serv *SocketServer) WebHandler() http.Handler { func (serv *SocketServer) WebHandler() http.Handler {
return websocket.Server{ return serv
Handshake: func(c *websocket.Config, r *http.Request) error { }
if !protocolSupported(c) {
return websocket.ErrBadWebSocketProtocol //ServeHTTP will upgrade a http request to a websocket using the sac-sock subprotocol
} func (serv *SocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return nil ws, err := serv.upgrader.Upgrade(w, r, nil)
}, if err != nil {
Handler: serv.loop, log.Err.Println(err)
return
} }
serv.loop(ws)
}
//DefaultUpgrader returns a websocket upgrader suitable for creating sacrificial-socket websockets.
func DefaultUpgrader() *websocket.Upgrader {
u := &websocket.Upgrader{
Subprotocols: []string{SubProtocol},
}
return u
}
//SetUpgrader sets the websocket.Upgrader used by the SocketServer.
func (serv *SocketServer) SetUpgrader(u *websocket.Upgrader) {
serv.upgrader = u
} }
//SetMultihomeBackend registers a MultihomeBackend interface and calls it's Init() method //SetMultihomeBackend registers a MultihomeBackend interface and calls it's Init() method
@@ -147,6 +169,7 @@ func (serv *SocketServer) Broadcast(eventName string, data interface{}) {
func (serv *SocketServer) loop(ws *websocket.Conn) { func (serv *SocketServer) loop(ws *websocket.Conn) {
s := newSocket(serv, ws) s := newSocket(serv, ws)
log.Debug.Println(s.ID(), "connected") log.Debug.Println(s.ID(), "connected")
defer s.Close() defer s.Close()
serv.l.RLock() serv.l.RLock()
@@ -158,10 +181,8 @@ func (serv *SocketServer) loop(ws *websocket.Conn) {
} }
for { for {
var msg []byte msg, err := s.receive()
if ignorableError(err) {
err := s.receive(&msg)
if err == io.EOF {
return return
} }
if err != nil { if err != nil {
@@ -180,7 +201,8 @@ func (serv *SocketServer) loop(ws *websocket.Conn) {
} }
} }
if eventName == "" { if eventName == "" {
continue //no event to dispatch log.Warn.Println("no event to dispatch")
continue
} }
serv.l.RLock() serv.l.RLock()
@@ -193,11 +215,11 @@ func (serv *SocketServer) loop(ws *websocket.Conn) {
} }
} }
func protocolSupported(conf *websocket.Config) bool { func ignorableError(err error) bool {
for _, p := range conf.Protocol { //not an error
if p == SubProtocol { if err == nil {
return true return false
}
} }
return false
return err == io.EOF || websocket.IsCloseError(err, 1000) || websocket.IsCloseError(err, 1001) || strings.HasSuffix(err.Error(), "use of closed network connection")
} }

View File

@@ -2,11 +2,17 @@ package ss
import ( import (
"bytes" "bytes"
"encoding/base64"
"encoding/json" "encoding/json"
"github.com/gorilla/websocket"
"github.com/raz-varren/log" "github.com/raz-varren/log"
"github.com/raz-varren/sacrificial-socket/tools" "math/rand"
"golang.org/x/net/websocket"
"sync" "sync"
"time"
)
var (
socketRNG = newRNG()
) )
//Socket represents a websocket connection //Socket represents a websocket connection
@@ -21,42 +27,17 @@ type Socket struct {
} }
const ( const (
idLen int = 32 idLen int = 24
typeJSON string = "J" typeJSON string = "J"
typeBin = "B" typeBin = "B"
typeStr = "S" typeStr = "S"
) )
var (
idChars = []string{
"0", "1", "2", "3", "4",
"5", "6", "7", "8", "9",
"A", "B", "C", "D", "E",
"F", "G", "H", "I", "J",
"K", "L", "M", "N", "O",
"P", "Q", "R", "S", "T",
"U", "V", "W", "X", "Y",
"Z", "a", "b", "c", "d",
"e", "f", "g", "h", "i",
"j", "k", "l", "m", "n",
"o", "p", "q", "r", "s",
"t", "u", "v", "w", "x",
"y", "z", "=", "_", "-",
"#", ".",
}
idCharLen = len(idChars) - 1
)
func newSocket(serv *SocketServer, ws *websocket.Conn) *Socket { func newSocket(serv *SocketServer, ws *websocket.Conn) *Socket {
buf := bytes.NewBuffer(nil)
for i := 0; i < idLen; i++ {
buf.WriteString(idChars[tools.RandomInt(0, idCharLen)])
}
s := &Socket{ s := &Socket{
l: &sync.RWMutex{}, l: &sync.RWMutex{},
id: buf.String(), id: newSocketID(),
ws: ws, ws: ws,
closed: false, closed: false,
serv: serv, serv: serv,
@@ -67,12 +48,21 @@ func newSocket(serv *SocketServer, ws *websocket.Conn) *Socket {
return s return s
} }
func (s *Socket) receive(v interface{}) error { func newSocketID() string {
return websocket.Message.Receive(s.ws, v) idBuf := make([]byte, idLen)
socketRNG.Read(idBuf)
return base64.StdEncoding.EncodeToString(idBuf)
} }
func (s *Socket) send(data interface{}) error { func (s *Socket) receive() ([]byte, error) {
return websocket.Message.Send(s.ws, data) _, data, err := s.ws.ReadMessage()
return data, err
}
func (s *Socket) send(msgType int, data []byte) error {
s.l.Lock()
defer s.l.Unlock()
return s.ws.WriteMessage(msgType, data)
} }
//InRoom returns true if s is currently a member of roomName //InRoom returns true if s is currently a member of roomName
@@ -126,20 +116,18 @@ func (s *Socket) Broadcast(eventName string, data interface{}) {
//Emit dispatches an event to s. //Emit dispatches an event to s.
func (s *Socket) Emit(eventName string, data interface{}) error { func (s *Socket) Emit(eventName string, data interface{}) error {
return s.send(emitData(eventName, data)) d, msgType := emitData(eventName, data)
return s.send(msgType, d)
} }
//ID returns the unique ID of s //ID returns the unique ID of s
func (s *Socket) ID() string { func (s *Socket) ID() string {
s.l.RLock() return s.id
defer s.l.RUnlock()
id := s.id
return id
} }
//emitData combines the eventName and data into a payload that is understood //emitData combines the eventName and data into a payload that is understood
//by the sac-sock protocol. It will return either a string or a []byte //by the sac-sock protocol.
func emitData(eventName string, data interface{}) interface{} { func emitData(eventName string, data interface{}) ([]byte, int) {
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
buf.WriteString(eventName) buf.WriteString(eventName)
buf.WriteByte(startOfHeaderByte) buf.WriteByte(startOfHeaderByte)
@@ -149,13 +137,13 @@ func emitData(eventName string, data interface{}) interface{} {
buf.WriteString(typeStr) buf.WriteString(typeStr)
buf.WriteByte(startOfDataByte) buf.WriteByte(startOfDataByte)
buf.WriteString(d) buf.WriteString(d)
return buf.String() return buf.Bytes(), websocket.TextMessage
case []byte: case []byte:
buf.WriteString(typeBin) buf.WriteString(typeBin)
buf.WriteByte(startOfDataByte) buf.WriteByte(startOfDataByte)
buf.Write(d) buf.Write(d)
return buf.Bytes() return buf.Bytes(), websocket.BinaryMessage
default: default:
buf.WriteString(typeJSON) buf.WriteString(typeJSON)
@@ -166,7 +154,7 @@ func emitData(eventName string, data interface{}) interface{} {
} else { } else {
buf.Write(jsonData) buf.Write(jsonData)
} }
return buf.String() return buf.Bytes(), websocket.TextMessage
} }
} }
@@ -184,10 +172,7 @@ func (s *Socket) Close() {
defer log.Debug.Println(s.ID(), "disconnected") defer log.Debug.Println(s.ID(), "disconnected")
err := s.ws.Close() s.ws.Close()
if err != nil {
log.Err.Println(err)
}
rooms := s.GetRooms() rooms := s.GetRooms()
@@ -205,3 +190,21 @@ func (s *Socket) Close() {
s.serv.hub.removeSocket(s) s.serv.hub.removeSocket(s)
} }
type rng struct {
r *rand.Rand
mu *sync.Mutex
}
func (r *rng) Read(b []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
return r.r.Read(b)
}
func newRNG() *rng {
return &rng{
r: rand.New(rand.NewSource(time.Now().UnixNano())),
mu: &sync.Mutex{},
}
}

View File

@@ -1,28 +0,0 @@
/*
Package tools is really just used during socket creation to generate random numbers for socket IDs.
*/
package tools
import (
crand "crypto/rand"
"fmt"
"io"
"math/rand"
"time"
)
func RandomInt(min, max int) int {
rand.Seed(time.Now().UnixNano())
return rand.Intn(max-min+1) + min
}
func RandomInt64(min, max int64) int64 {
rand.Seed(time.Now().UnixNano())
return rand.Int63n(max-min+1) + min
}
func UID() string {
uid := make([]byte, 16)
io.ReadFull(crand.Reader, uid)
return fmt.Sprintf("%x", uid)
}