support websocket

This commit is contained in:
smallnest
2021-03-30 20:52:51 +08:00
parent c584448849
commit 2b8912d3c6
5 changed files with 84 additions and 6 deletions

View File

@@ -10,6 +10,7 @@
- support streaming between server side and client side
- support DNS as service discovery
- support rpcx flow tracing
- support websocket as the transport like tcp,kcp and quic
## 6.0

View File

@@ -4,6 +4,7 @@ import (
"bufio"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/http"
@@ -11,6 +12,7 @@ import (
"github.com/smallnest/rpcx/log"
"github.com/smallnest/rpcx/share"
"golang.org/x/net/websocket"
)
type ConnFactoryFn func(c *Client, network, address string) (net.Conn, error)
@@ -30,6 +32,8 @@ func (c *Client) Connect(network, address string) error {
switch network {
case "http":
conn, err = newDirectHTTPConn(c, network, address)
case "ws", "wss":
conn, err = newDirectWSConn(c, network, address)
case "kcp":
conn, err = newDirectKCPConn(c, network, address)
case "quic":
@@ -64,7 +68,7 @@ func (c *Client) Connect(network, address string) error {
c.Conn = conn
c.r = bufio.NewReaderSize(conn, ReaderBuffsize)
//c.w = bufio.NewWriterSize(conn, WriterBuffsize)
// c.w = bufio.NewWriterSize(conn, WriterBuffsize)
// start reading and writing since connected
go c.input()
@@ -88,7 +92,7 @@ func newDirectConn(c *Client, network, address string) (net.Conn, error) {
Timeout: c.option.ConnectTimeout,
}
tlsConn, err = tls.DialWithDialer(dialer, network, address, c.option.TLSConfig)
//or conn:= tls.Client(netConn, &config)
// or conn:= tls.Client(netConn, &config)
conn = net.Conn(tlsConn)
} else {
conn, err = net.DialTimeout(network, address, c.option.ConnectTimeout)
@@ -122,7 +126,7 @@ func newDirectHTTPConn(c *Client, network, address string) (net.Conn, error) {
Timeout: c.option.ConnectTimeout,
}
tlsConn, err = tls.DialWithDialer(dialer, "tcp", address, c.option.TLSConfig)
//or conn:= tls.Client(netConn, &config)
// or conn:= tls.Client(netConn, &config)
conn = net.Conn(tlsConn)
} else {
@@ -157,3 +161,40 @@ func newDirectHTTPConn(c *Client, network, address string) (net.Conn, error) {
Err: err,
}
}
func newDirectWSConn(c *Client, network, address string) (net.Conn, error) {
if c == nil {
return nil, errors.New("empty client")
}
path := c.option.RPCPath
if path == "" {
path = share.DefaultRPCPath
}
var conn net.Conn
var err error
// url := "ws://localhost:12345/ws"
var url, origin string
if network == "ws" {
url = fmt.Sprintf("ws://%s/%s", address, path)
origin = fmt.Sprintf("http://%s", address)
} else {
url = fmt.Sprintf("wss://%s/%s", address, path)
origin = fmt.Sprintf("https://%s", address)
}
if c.option.TLSConfig != nil {
config, err := websocket.NewConfig(url, origin)
if err != nil {
return nil, err
}
config.TlsConfig = c.option.TLSConfig
conn, err = websocket.DialConfig(config)
} else {
conn, err = websocket.Dial(url, "", origin)
}
return conn, err
}

1
go.mod
View File

@@ -48,6 +48,7 @@ require (
github.com/vmihailenco/msgpack/v5 v5.2.0
github.com/xtaci/kcp-go v5.4.20+incompatible
go.opencensus.io v0.22.2
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
google.golang.org/protobuf v1.25.0
)

View File

@@ -2,6 +2,7 @@ package server
import (
"crypto/tls"
"errors"
"fmt"
"net"
)
@@ -13,6 +14,8 @@ func init() {
makeListeners["tcp4"] = tcpMakeListener("tcp4")
makeListeners["tcp6"] = tcpMakeListener("tcp6")
makeListeners["http"] = tcpMakeListener("tcp")
makeListeners["ws"] = tcpMakeListener("tcp")
makeListeners["wss"] = tcpMakeListener("tcp")
}
// RegisterMakeListener registers a MakeListener for network.
@@ -30,6 +33,11 @@ func (s *Server) makeListener(network, address string) (ln net.Listener, err err
if ml == nil {
return nil, fmt.Errorf("can not make listener for %s", network)
}
if network == "wss" && s.tlsConfig == nil {
return nil, errors.New("must set tlsconfig for wss")
}
return ml(s, address)
}
@@ -43,5 +51,4 @@ func tcpMakeListener(network string) MakeListener {
return ln, err
}
}

View File

@@ -25,6 +25,7 @@ import (
"github.com/smallnest/rpcx/log"
"github.com/smallnest/rpcx/protocol"
"github.com/smallnest/rpcx/share"
"golang.org/x/net/websocket"
)
// ErrServerClosed is returned by the Server's Serve, ListenAndServe after a call to Shutdown or Close.
@@ -219,6 +220,11 @@ func (s *Server) Serve(network, address string) (err error) {
return nil
}
if network == "ws" || network == "wss" {
s.serveByWS(ln, "")
return nil
}
// try to start gateway
ln = s.startGateway(network, ln)
@@ -317,8 +323,22 @@ func (s *Server) serveByHTTP(ln net.Listener, rpcPath string) {
if rpcPath == "" {
rpcPath = share.DefaultRPCPath
}
http.Handle(rpcPath, s)
srv := &http.Server{Handler: nil}
mux := http.NewServeMux()
mux.Handle(rpcPath, s)
srv := &http.Server{Handler: mux}
srv.Serve(ln)
}
func (s *Server) serveByWS(ln net.Listener, rpcPath string) {
s.ln = ln
if rpcPath == "" {
rpcPath = share.DefaultRPCPath
}
mux := http.NewServeMux()
mux.Handle(rpcPath, websocket.Handler(s.ServeWS))
srv := &http.Server{Handler: mux}
srv.Serve(ln)
}
@@ -743,6 +763,14 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.serveConn(conn)
}
func (s *Server) ServeWS(conn *websocket.Conn) {
s.mu.Lock()
s.activeConn[conn] = struct{}{}
s.mu.Unlock()
s.serveConn(conn)
}
// Close immediately closes all active net.Listeners.
func (s *Server) Close() error {
s.mu.Lock()