improve code quality

This commit is contained in:
yuanjize
2019-11-22 20:48:06 +08:00
parent f0fe8130a4
commit deffb45b1e
7 changed files with 23 additions and 75 deletions

View File

@@ -109,8 +109,8 @@ func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, di
option: option,
}
servers := make(map[string]string)
pairs := discovery.GetServices()
servers := make(map[string]string,len(pairs))
for _, p := range pairs {
servers[p.Key] = p.Value
}
@@ -144,8 +144,8 @@ func NewBidirectionalXClient(servicePath string, failMode FailMode, selectMode S
serverMessageChan: serverMessageChan,
}
servers := make(map[string]string)
pairs := discovery.GetServices()
servers := make(map[string]string,len(pairs))
for _, p := range pairs {
servers[p.Key] = p.Value
}
@@ -199,7 +199,7 @@ func (c *xClient) Auth(auth string) {
// watch changes of service and update cached clients.
func (c *xClient) watch(ch chan []*KVPair) {
for pairs := range ch {
servers := make(map[string]string)
servers := make(map[string]string,len(pairs))
for _, p := range pairs {
servers[p.Key] = p.Value
}

View File

@@ -8,7 +8,7 @@ import (
// MultiError holds multiple errors
type MultiError struct {
Errors []error
sync.Mutex
mu sync.Mutex
}
// Error returns the message of the actual error
@@ -17,8 +17,8 @@ func (e *MultiError) Error() string {
}
func (e *MultiError) Append(err error) {
e.Lock()
defer e.Unlock()
e.mu.Lock()
defer e.mu.Unlock()
e.Errors = append(e.Errors, err)
}

View File

@@ -114,7 +114,7 @@ func (p *pluginContainer) Remove(plugin Plugin) {
return
}
var plugins []Plugin
plugins := make([]Plugin, 0, len(p.plugins))
for _, p := range p.plugins {
if p != plugin {
plugins = append(plugins, p)

View File

@@ -98,6 +98,9 @@ func NewServer(options ...OptionFn) *Server {
s := &Server{
Plugins: &pluginContainer{},
options: make(map[string]interface{}),
activeConn: make(map[net.Conn]struct{}),
doneChan: make(chan struct{}),
serviceMap: make(map[string]*service),
}
for _, op := range options {
@@ -119,8 +122,7 @@ func (s *Server) Address() net.Addr {
// ActiveClientConn returns active connections.
func (s *Server) ActiveClientConn() []net.Conn {
var result []net.Conn
result := make([]net.Conn, 0, len(s.activeConn))
s.mu.RLock()
for clientConn := range s.activeConn {
result = append(result, clientConn)
@@ -160,12 +162,6 @@ func (s *Server) SendMessage(conn net.Conn, servicePath, serviceMethod string, m
}
func (s *Server) getDoneChan() <-chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()
if s.doneChan == nil {
s.doneChan = make(chan struct{})
}
return s.doneChan
}
@@ -210,17 +206,11 @@ func (s *Server) Serve(network, address string) (err error) {
// creating a new service goroutine for each.
// The service goroutines read requests and then call services to reply to them.
func (s *Server) serveListener(ln net.Listener) error {
if s.Plugins == nil {
s.Plugins = &pluginContainer{}
}
var tempDelay time.Duration
s.mu.Lock()
s.ln = ln
if s.activeConn == nil {
s.activeConn = make(map[net.Conn]struct{})
}
s.mu.Unlock()
for {
@@ -276,22 +266,12 @@ func (s *Server) serveListener(ln net.Listener) error {
func (s *Server) serveByHTTP(ln net.Listener, rpcPath string) {
s.ln = ln
if s.Plugins == nil {
s.Plugins = &pluginContainer{}
}
if rpcPath == "" {
rpcPath = share.DefaultRPCPath
}
http.Handle(rpcPath, s)
srv := &http.Server{Handler: nil}
s.mu.Lock()
if s.activeConn == nil {
s.activeConn = make(map[net.Conn]struct{})
}
s.mu.Unlock()
srv.Serve(ln)
}
@@ -312,10 +292,6 @@ func (s *Server) serveConn(conn net.Conn) {
s.mu.Unlock()
conn.Close()
if s.Plugins == nil {
s.Plugins = &pluginContainer{}
}
s.Plugins.DoPostConnClose(conn)
}()
@@ -630,7 +606,7 @@ var connected = "200 Connected to rpcx"
// ServeHTTP implements an http.Handler that answers RPC requests.
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
if req.Method != http.MethodConnect {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must CONNECT\n")
@@ -724,22 +700,15 @@ func (s *Server) checkProcessMsg() bool {
}
func (s *Server) closeDoneChanLocked() {
ch := s.getDoneChanLocked()
select {
case <-ch:
case <-s.doneChan:
// Already closed. Don't close again.
default:
// Safe to close here. We're the only closer, guarded
// by s.mu.
close(ch)
// by s.mu.RegisterName
close(s.doneChan)
}
}
func (s *Server) getDoneChanLocked() chan struct{} {
if s.doneChan == nil {
s.doneChan = make(chan struct{})
}
return s.doneChan
}
var ip4Reg = regexp.MustCompile(`^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$`)

View File

@@ -83,7 +83,7 @@ func TestHandleRequest(t *testing.T) {
req.Payload = data
server := &Server{}
server := NewServer()
server.RegisterName("Arith", new(Arith), "")
res, err := server.handleRequest(context.Background(), req)
if err != nil {

View File

@@ -74,19 +74,12 @@ func (s *Server) Register(rcvr interface{}, metadata string) error {
if err != nil {
return err
}
if s.Plugins == nil {
s.Plugins = &pluginContainer{}
}
return s.Plugins.DoRegister(sname, rcvr, metadata)
}
// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error {
if s.Plugins == nil {
s.Plugins = &pluginContainer{}
}
s.Plugins.DoRegister(name, rcvr, metadata)
_, err := s.register(rcvr, name, true)
return err
@@ -102,9 +95,6 @@ func (s *Server) RegisterFunction(servicePath string, fn interface{}, metadata s
if err != nil {
return err
}
if s.Plugins == nil {
s.Plugins = &pluginContainer{}
}
return s.Plugins.DoRegisterFunction(servicePath, fname, fn, metadata)
}
@@ -112,10 +102,6 @@ func (s *Server) RegisterFunction(servicePath string, fn interface{}, metadata s
// RegisterFunctionName is like RegisterFunction but uses the provided name for the function
// instead of the function's concrete type.
func (s *Server) RegisterFunctionName(servicePath string, name string, fn interface{}, metadata string) error {
if s.Plugins == nil {
s.Plugins = &pluginContainer{}
}
s.Plugins.DoRegisterFunction(servicePath, name, fn, metadata)
_, err := s.registerFunction(servicePath, fn, name, true)
return err
@@ -124,9 +110,6 @@ func (s *Server) RegisterFunctionName(servicePath string, name string, fn interf
func (s *Server) register(rcvr interface{}, name string, useName bool) (string, error) {
s.serviceMapMu.Lock()
defer s.serviceMapMu.Unlock()
if s.serviceMap == nil {
s.serviceMap = make(map[string]*service)
}
service := new(service)
service.typ = reflect.TypeOf(rcvr)
@@ -170,9 +153,6 @@ func (s *Server) register(rcvr interface{}, name string, useName bool) (string,
func (s *Server) registerFunction(servicePath string, fn interface{}, name string, useName bool) (string, error) {
s.serviceMapMu.Lock()
defer s.serviceMapMu.Unlock()
if s.serviceMap == nil {
s.serviceMap = make(map[string]*service)
}
ss := s.serviceMap[servicePath]
if ss == nil {
@@ -322,10 +302,6 @@ func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
// UnregisterAll unregisters all services.
// You can call this method when you want to shutdown/upgrade this node.
func (s *Server) UnregisterAll() error {
if s.Plugins == nil {
s.Plugins = &pluginContainer{}
}
var es []error
for k := range s.serviceMap {
err := s.Plugins.DoUnregister(k)

View File

@@ -19,9 +19,12 @@ type Context struct {
}
func NewContext(ctx context.Context) *Context {
tags := make(map[interface{}]interface{})
return &Context{Context: ctx, tags: tags}
return &Context{
Context: ctx,
tags: make(map[interface{}]interface{}),
}
}
func (c *Context) Value(key interface{}) interface{} {
if v, ok := c.tags[key]; ok {
return v