diff --git a/client/xclient_test.go b/client/xclient_test.go
index ca223b6..50dbd38 100644
--- a/client/xclient_test.go
+++ b/client/xclient_test.go
@@ -9,12 +9,44 @@ import (
 	"github.com/smallnest/rpcx/share"
 	"github.com/smallnest/rpcx/protocol"
 	"github.com/smallnest/rpcx/_testutils"
+	"fmt"
 )
 
+func TestLoop(t *testing.T) {
+	opt := Option{
+		Retries:        1,
+		RPCPath:        share.DefaultRPCPath,
+		ConnectTimeout: 10 * time.Second,
+		SerializeType:  protocol.Thrift,
+		CompressType:   protocol.None,
+		BackupLatency:  10 * time.Millisecond,
+	}
+
+	d := NewPeer2PeerDiscovery("tcp@127.0.0.1:8995", "desc=a test service")
+	xclient := NewXClient("Arith", Failtry, RandomSelect, d, opt)
+
+	defer xclient.Close()
+
+
+	tick := time.NewTicker(2*time.Second)
+	for ti := range tick.C {
+		fmt.Println(ti)
+		args := testutils.ThriftArgs_{}
+		args.A = 200
+		args.B = 100
+		go func(){
+			reply := testutils.ThriftReply{}
+			err := xclient.Call(context.Background(), "ThriftMul", &args, &reply)
+			fmt.Println(reply.C,err)
+		}()
+	}
+
+}
+
 func TestXClient_Thrift(t *testing.T) {
 	opt := Option{
-		Retries:        3,
+		Retries:        1,
 		RPCPath:        share.DefaultRPCPath,
 		ConnectTimeout: 10 * time.Second,
 		SerializeType:  protocol.Thrift,
 		CompressType:   protocol.None,
@@ -38,6 +70,7 @@ func TestXClient_Thrift(t *testing.T) {
 		t.Fatalf("failed to call: %v", err)
 	}
 
+	fmt.Println(reply.C)
 	if reply.C != 20000 {
 		t.Fatalf("expect 20000 but got %d", reply.C)
 	}
diff --git a/codec/codec.go b/codec/codec.go
index d6fe540..8137768 100644
--- a/codec/codec.go
+++ b/codec/codec.go
@@ -2,7 +2,6 @@ package codec
 
 import (
 	"bytes"
-	"context"
 	"encoding/json"
 	"fmt"
 	"reflect"
@@ -113,7 +112,7 @@ func (c ThriftCodec) Encode(i interface{}) ([]byte, error) {
 		Protocol:  p,
 	}
 	t.Transport.Close()
-	return t.Write(context.Background(), i.(thrift.TStruct))
+	return t.Write( i.(thrift.TStruct))
 }
 
 func (c ThriftCodec) Decode(data []byte, i interface{}) error {
diff --git a/server/common_test.go b/server/common_test.go
new file mode 100644
index 0000000..47ad606
--- /dev/null
+++ b/server/common_test.go
@@ -0,0 +1,14 @@
+package server
+
+import (
+	"testing"
+	"fmt"
+)
+
+func TestGo1(t *testing.T) {
+	ch := make(chan struct{},1000)
+	ch<-struct{}{}
+	fmt.Println(len(ch))
+	<-ch
+	fmt.Println(len(ch))
+}
diff --git a/server/server.go b/server/server.go
index bf4a50b..2bb6036 100644
--- a/server/server.go
+++ b/server/server.go
@@ -20,6 +20,9 @@ import (
 	"github.com/smallnest/rpcx/log"
 	"github.com/smallnest/rpcx/protocol"
 	"github.com/smallnest/rpcx/share"
+	"os"
+	"os/signal"
+	"syscall"
 )
 
 // ErrServerClosed is returned by the Server's Serve, ListenAndServe after a call to Shutdown or Close.
@@ -65,7 +68,7 @@ type Server struct {
 	doneChan chan struct{}
 	seq      uint64
 
-	// inShutdown int32
+	inShutdown int32
 	onShutdown []func()
 
 	// TLSConfig for creating tls tcp connection.
@@ -81,6 +84,10 @@ type Server struct {
 
 	// AuthFunc can be used to auth.
 	AuthFunc func(ctx context.Context, req *protocol.Message, token string) error
+
+	ShutdownFunc func(s *Server)
+
+	HandleMsgChan chan struct{}
 }
 
 // NewServer returns a server.
@@ -90,6 +97,8 @@ func NewServer(options ...OptionFn) *Server {
 		options: make(map[string]interface{}),
 	}
 
+	s.HandleMsgChan = make(chan struct{}, 100000)
+
 	for _, op := range options {
 		op(s)
 	}
@@ -147,9 +156,25 @@ func (s *Server) getDoneChan() <-chan struct{} {
 	return s.doneChan
 }
 
+func (s *Server)startShutdownListener() {
+	go func(s *Server) {
+		log.Info("server pid:", os.Getpid())
+		c := make(chan os.Signal, 1)
+		signal.Notify(c, syscall.SIGTERM)
+		si := <-c
+		if si.String() == "terminated" {
+			if nil != s.ShutdownFunc {
+				s.ShutdownFunc(s)
+			}
+			os.Exit(0)
+		}
+	}(s)
+}
+
 // Serve starts and listens RPC requests.
 // It is blocked until receiving connectings from clients.
 func (s *Server) Serve(network, address string) (err error) {
+	s.startShutdownListener()
 	var ln net.Listener
 	ln, err = s.makeListener(network, address)
 	if err != nil {
@@ -215,6 +240,7 @@ func (s *Server) serveListener(ln net.Listener) error {
 		if tc, ok := conn.(*net.TCPConn); ok {
 			tc.SetKeepAlive(true)
 			tc.SetKeepAlivePeriod(3 * time.Minute)
+			tc.SetLinger(10)
 		}
 
 		s.mu.Lock()
@@ -278,6 +304,11 @@ func (s *Server) serveConn(conn net.Conn) {
 		s.Plugins.DoPostConnClose(conn)
 	}()
 
+	if isShutdown(s) {
+		closeChannel(s,conn)
+		return
+	}
+
 	if tlsConn, ok := conn.(*tls.Conn); ok {
 		if d := s.readTimeout; d != 0 {
 			conn.SetReadDeadline(time.Now().Add(d))
@@ -292,9 +323,13 @@ func (s *Server) serveConn(conn net.Conn) {
 	}
 
 	r := bufio.NewReaderSize(conn, ReaderBuffsize)
-	//w := bufio.NewWriterSize(conn, WriterBuffsize)
 
 	for {
+		if isShutdown(s) {
+			closeChannel(s,conn)
+			return
+		}
+
 		t0 := time.Now()
 		if s.readTimeout != 0 {
 			conn.SetReadDeadline(t0.Add(s.readTimeout))
@@ -313,6 +348,8 @@ func (s *Server) serveConn(conn net.Conn) {
 			return
 		}
 
+		s.HandleMsgChan <- struct{}{}
+
 		if s.writeTimeout != 0 {
 			conn.SetWriteDeadline(t0.Add(s.writeTimeout))
 		}
@@ -320,7 +357,6 @@ func (s *Server) serveConn(conn net.Conn) {
 		ctx = context.WithValue(ctx, StartRequestContextKey, time.Now().UnixNano())
 		err = s.auth(ctx, req)
 		if err != nil {
-
 			if !req.IsOneway() {
 				res := req.Clone()
 				res.SetMessageType(protocol.Response)
@@ -337,11 +373,14 @@ func (s *Server) serveConn(conn net.Conn) {
 			} else {
 				s.Plugins.DoPreWriteResponse(ctx, req, nil)
 			}
-
+			<-s.HandleMsgChan
 			protocol.FreeMsg(req)
 			continue
 		}
 		go func() {
+			defer func(){
+				<-s.HandleMsgChan
+			}()
 			if req.IsHeartbeat() {
 				req.SetMessageType(protocol.Response)
 				data := req.Encode()
@@ -387,6 +426,17 @@ func (s *Server) serveConn(conn net.Conn) {
 	}
 }
 
+func isShutdown(s *Server) (bool) {
+	return atomic.LoadInt32(&s.inShutdown) == 1
+}
+
+func closeChannel(s *Server,conn net.Conn) {
+	s.mu.Lock()
+	delete(s.activeConn, conn)
+	s.mu.Unlock()
+	conn.Close()
+}
+
 func (s *Server) readRequest(ctx context.Context, r io.Reader) (req *protocol.Message, err error) {
 	err = s.Plugins.DoPreReadRequest(ctx)
 	if err != nil {
@@ -567,14 +617,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 
 // Close immediately closes all active net.Listeners.
 func (s *Server) Close() error {
-	s.mu.Lock()
-	defer s.mu.Unlock()
 	s.closeDoneChanLocked()
 	var err error
 	if s.ln != nil {
 		err = s.ln.Close()
 	}
-
 	for c := range s.activeConn {
 		c.Close()
 		delete(s.activeConn, c)
@@ -591,7 +638,7 @@ func (s *Server) RegisterOnShutdown(f func()) {
 	s.mu.Unlock()
 }
 
-// var shutdownPollInterval = 500 * time.Millisecond
+var shutdownPollInterval = 500 * time.Millisecond
 
 // // Shutdown gracefully shuts down the server without interrupting any
 // // active connections. Shutdown works by first closing the
@@ -600,51 +647,35 @@ func (s *Server) RegisterOnShutdown(f func()) {
 // // If the provided context expires before the shutdown is complete,
 // // Shutdown returns the context's error, otherwise it returns any
 // // error returned from closing the Server's underlying Listener.
-// func (s *Server) Shutdown(ctx context.Context) error {
-// 	atomic.AddInt32(&s.inShutdown, 1)
-// 	defer atomic.AddInt32(&s.inShutdown, -1)
+func (s *Server) Shutdown(ctx context.Context) error {
+	if atomic.CompareAndSwapInt32(&s.inShutdown, 0, 1) {
+		log.Info("shutdown begin")
+		ticker := time.NewTicker(shutdownPollInterval)
+		defer ticker.Stop()
+		for {
+			if s.checkProcessMsg() {
+				break
+			}
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-ticker.C:
+			}
+		}
+		s.Close()
+		log.Info("shutdown end")
+	}
+	return nil
+}
-
-// 	s.mu.Lock()
-// 	err := s.ln.Close()
-// 	s.closeDoneChanLocked()
-// 	for _, f := range s.onShutdown {
-// 		go f()
-// 	}
-// 	s.mu.Unlock()
-
-// 	ticker := time.NewTicker(shutdownPollInterval)
-// 	defer ticker.Stop()
-// 	for {
-// 		if s.closeIdleConns() {
-// 			return err
-// 		}
-// 		select {
-// 		case <-ctx.Done():
-// 			return ctx.Err()
-// 		case <-ticker.C:
-// 		}
-// 	}
-// }
-
-// func (s *Server) closeIdleConns() {
-// 	s.mu.Lock()
-// 	defer s.mu.Unlock()
-// 	quiescent := true
-
-// 	for c := range s.activeConn {
-// 		// check whether the conn is idle
-// 		st, ok := c.curState.Load().(ConnState)
-// 		if !ok || st != StateIdle {
-// 			quiescent = false
-// 			continue
-// 		}
-
-// 		s.Close()
-// 		delete(s.activeConn, c)
-// 	}
-
-// 	return quiescent
-// }
+
+func (s *Server) checkProcessMsg() (bool) {
+	size := len(s.HandleMsgChan)
+	log.Info("need handle msg size:",size)
+	if size == 0 {
+		return true
+	}
+	return false
+}
 
 func (s *Server) closeDoneChanLocked() {
 	ch := s.getDoneChanLocked()
diff --git a/server/server_test.go b/server/server_test.go
index f0de77c..9914f27 100644
--- a/server/server_test.go
+++ b/server/server_test.go
@@ -8,6 +8,8 @@ import (
 	"github.com/smallnest/rpcx/protocol"
 	"github.com/smallnest/rpcx/share"
 	"github.com/smallnest/rpcx/_testutils"
+	"fmt"
+	"time"
 )
 
 type Args struct {
@@ -27,11 +29,11 @@ func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
 }
 
 func (t *Arith) ThriftMul(ctx context.Context, args *testutils.ThriftArgs_, reply *testutils.ThriftReply) error {
-	reply.C = args.A * args.B
+	reply.C = args.A * args.B + 11111111
+	time.Sleep(10*time.Second)
 	return nil
 }
-
 
 func TestThrift(t *testing.T) {
 	s := NewServer()
 	s.RegisterName("Arith", new(Arith), "")
@@ -39,7 +41,30 @@ func TestThrift(t *testing.T) {
 	s.Serve("tcp", ":8995")
 	s.Register(new(Arith), "")
 }
 
+func TestGo(t *testing.T) {
+	go func() {
+		ch := make(chan int, 1)
+		time.Sleep(2 * time.Second)
+		ch <- 1
+		<-ch
+		fmt.Println("go")
+	}()
+	ch2 := make(chan struct{}, 1)
+	<-ch2
+	fmt.Println("over")
+}
+
+func TestShutdownHook(t *testing.T) {
+	s := NewServer()
+	s.ShutdownFunc = func(s *Server) {
+		ctx, _ := context.WithTimeout(context.Background(), 155*time.Second)
+		s.Shutdown(ctx)
+	}
+	s.RegisterName("Arith", new(Arith), "")
+	s.Serve("tcp", ":8995")
+	s.Register(new(Arith), "")
+}
 func TestHandleRequest(t *testing.T) {
 
 	//use jsoncodec
diff --git a/sync/map.go b/sync/map.go
new file mode 100644
index 0000000..a324cc0
--- /dev/null
+++ b/sync/map.go
@@ -0,0 +1,376 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sync
+
+import (
+	"sync/atomic"
+	"unsafe"
+	"sync"
+)
+
+// Map is a concurrent map with amortized-constant-time loads, stores, and deletes.
+// It is safe for multiple goroutines to call a Map's methods concurrently.
+//
+// It is optimized for use in concurrent loops with keys that are
+// stable over time, and either few steady-state stores, or stores
+// localized to one goroutine per key.
+//
+// For use cases that do not share these attributes, it will likely have
+// comparable or worse performance and worse type safety than an ordinary
+// map paired with a read-write mutex.
+//
+// The zero Map is valid and empty.
+//
+// A Map must not be copied after first use.
+type Map struct {
+	mu sync.Mutex
+
+	// read contains the portion of the map's contents that are safe for
+	// concurrent access (with or without mu held).
+	//
+	// The read field itself is always safe to load, but must only be stored with
+	// mu held.
+	//
+	// Entries stored in read may be updated concurrently without mu, but updating
+	// a previously-expunged entry requires that the entry be copied to the dirty
+	// map and unexpunged with mu held.
+	read atomic.Value // readOnly
+
+	// dirty contains the portion of the map's contents that require mu to be
+	// held. To ensure that the dirty map can be promoted to the read map quickly,
+	// it also includes all of the non-expunged entries in the read map.
+	//
+	// Expunged entries are not stored in the dirty map. An expunged entry in the
+	// clean map must be unexpunged and added to the dirty map before a new value
+	// can be stored to it.
+	//
+	// If the dirty map is nil, the next write to the map will initialize it by
+	// making a shallow copy of the clean map, omitting stale entries.
+	dirty map[interface{}]*entry
+
+	// misses counts the number of loads since the read map was last updated that
+	// needed to lock mu to determine whether the key was present.
+	//
+	// Once enough misses have occurred to cover the cost of copying the dirty
+	// map, the dirty map will be promoted to the read map (in the unamended
+	// state) and the next store to the map will make a new dirty copy.
+	misses int
+}
+
+// readOnly is an immutable struct stored atomically in the Map.read field.
+type readOnly struct {
+	m       map[interface{}]*entry
+	amended bool // true if the dirty map contains some key not in m.
+}
+
+// expunged is an arbitrary pointer that marks entries which have been deleted
+// from the dirty map.
+var expunged = unsafe.Pointer(new(interface{}))
+
+// An entry is a slot in the map corresponding to a particular key.
+type entry struct {
+	// p points to the interface{} value stored for the entry.
+	//
+	// If p == nil, the entry has been deleted and m.dirty == nil.
+	//
+	// If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
+	// is missing from m.dirty.
+	//
+	// Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
+	// != nil, in m.dirty[key].
+	//
+	// An entry can be deleted by atomic replacement with nil: when m.dirty is
+	// next created, it will atomically replace nil with expunged and leave
+	// m.dirty[key] unset.
+	//
+	// An entry's associated value can be updated by atomic replacement, provided
+	// p != expunged. If p == expunged, an entry's associated value can be updated
+	// only after first setting m.dirty[key] = e so that lookups using the dirty
+	// map find the entry.
+	p unsafe.Pointer // *interface{}
+}
+
+func newEntry(i interface{}) *entry {
+	return &entry{p: unsafe.Pointer(&i)}
+}
+
+// Load returns the value stored in the map for a key, or nil if no
+// value is present.
+// The ok result indicates whether value was found in the map.
+func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
+	read, _ := m.read.Load().(readOnly)
+	e, ok := read.m[key]
+	if !ok && read.amended {
+		m.mu.Lock()
+		// Avoid reporting a spurious miss if m.dirty got promoted while we were
+		// blocked on m.mu. (If further loads of the same key will not miss, it's
+		// not worth copying the dirty map for this key.)
+		read, _ = m.read.Load().(readOnly)
+		e, ok = read.m[key]
+		if !ok && read.amended {
+			e, ok = m.dirty[key]
+			// Regardless of whether the entry was present, record a miss: this key
+			// will take the slow path until the dirty map is promoted to the read
+			// map.
+			m.missLocked()
+		}
+		m.mu.Unlock()
+	}
+	if !ok {
+		return nil, false
+	}
+	return e.load()
+}
+
+func (e *entry) load() (value interface{}, ok bool) {
+	p := atomic.LoadPointer(&e.p)
+	if p == nil || p == expunged {
+		return nil, false
+	}
+	return *(*interface{})(p), true
+}
+
+// Store sets the value for a key.
+func (m *Map) Store(key, value interface{}) {
+	read, _ := m.read.Load().(readOnly)
+	if e, ok := read.m[key]; ok && e.tryStore(&value) {
+		return
+	}
+
+	m.mu.Lock()
+	read, _ = m.read.Load().(readOnly)
+	if e, ok := read.m[key]; ok {
+		if e.unexpungeLocked() {
+			// The entry was previously expunged, which implies that there is a
+			// non-nil dirty map and this entry is not in it.
+			m.dirty[key] = e
+		}
+		e.storeLocked(&value)
+	} else if e, ok := m.dirty[key]; ok {
+		e.storeLocked(&value)
+	} else {
+		if !read.amended {
+			// We're adding the first new key to the dirty map.
+			// Make sure it is allocated and mark the read-only map as incomplete.
+			m.dirtyLocked()
+			m.read.Store(readOnly{m: read.m, amended: true})
+		}
+		m.dirty[key] = newEntry(value)
+	}
+	m.mu.Unlock()
+}
+
+// tryStore stores a value if the entry has not been expunged.
+//
+// If the entry is expunged, tryStore returns false and leaves the entry
+// unchanged.
+func (e *entry) tryStore(i *interface{}) bool {
+	p := atomic.LoadPointer(&e.p)
+	if p == expunged {
+		return false
+	}
+	for {
+		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
+			return true
+		}
+		p = atomic.LoadPointer(&e.p)
+		if p == expunged {
+			return false
+		}
+	}
+}
+
+// unexpungeLocked ensures that the entry is not marked as expunged.
+//
+// If the entry was previously expunged, it must be added to the dirty map
+// before m.mu is unlocked.
+func (e *entry) unexpungeLocked() (wasExpunged bool) {
+	return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
+}
+
+// storeLocked unconditionally stores a value to the entry.
+//
+// The entry must be known not to be expunged.
+func (e *entry) storeLocked(i *interface{}) {
+	atomic.StorePointer(&e.p, unsafe.Pointer(i))
+}
+
+// LoadOrStore returns the existing value for the key if present.
+// Otherwise, it stores and returns the given value.
+// The loaded result is true if the value was loaded, false if stored.
+func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
+	// Avoid locking if it's a clean hit.
+	read, _ := m.read.Load().(readOnly)
+	if e, ok := read.m[key]; ok {
+		actual, loaded, ok := e.tryLoadOrStore(value)
+		if ok {
+			return actual, loaded
+		}
+	}
+
+	m.mu.Lock()
+	read, _ = m.read.Load().(readOnly)
+	if e, ok := read.m[key]; ok {
+		if e.unexpungeLocked() {
+			m.dirty[key] = e
+		}
+		actual, loaded, _ = e.tryLoadOrStore(value)
+	} else if e, ok := m.dirty[key]; ok {
+		actual, loaded, _ = e.tryLoadOrStore(value)
+		m.missLocked()
+	} else {
+		if !read.amended {
+			// We're adding the first new key to the dirty map.
+			// Make sure it is allocated and mark the read-only map as incomplete.
+			m.dirtyLocked()
+			m.read.Store(readOnly{m: read.m, amended: true})
+		}
+		m.dirty[key] = newEntry(value)
+		actual, loaded = value, false
+	}
+	m.mu.Unlock()
+
+	return actual, loaded
+}
+
+// tryLoadOrStore atomically loads or stores a value if the entry is not
+// expunged.
+//
+// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and
+// returns with ok==false.
+func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
+	p := atomic.LoadPointer(&e.p)
+	if p == expunged {
+		return nil, false, false
+	}
+	if p != nil {
+		return *(*interface{})(p), true, true
+	}
+
+	// Copy the interface after the first load to make this method more amenable
+	// to escape analysis: if we hit the "load" path or the entry is expunged, we
+	// shouldn't bother heap-allocating.
+	ic := i
+	for {
+		if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
+			return i, false, true
+		}
+		p = atomic.LoadPointer(&e.p)
+		if p == expunged {
+			return nil, false, false
+		}
+		if p != nil {
+			return *(*interface{})(p), true, true
+		}
+	}
+}
+
+// Delete deletes the value for a key.
+func (m *Map) Delete(key interface{}) {
+	read, _ := m.read.Load().(readOnly)
+	e, ok := read.m[key]
+	if !ok && read.amended {
+		m.mu.Lock()
+		read, _ = m.read.Load().(readOnly)
+		e, ok = read.m[key]
+		if !ok && read.amended {
+			delete(m.dirty, key)
+		}
+		m.mu.Unlock()
+	}
+	if ok {
+		e.delete()
+	}
+}
+
+func (e *entry) delete() (hadValue bool) {
+	for {
+		p := atomic.LoadPointer(&e.p)
+		if p == nil || p == expunged {
+			return false
+		}
+		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
+			return true
+		}
+	}
+}
+
+// Range calls f sequentially for each key and value present in the map.
+// If f returns false, range stops the iteration.
+//
+// Range does not necessarily correspond to any consistent snapshot of the Map's
+// contents: no key will be visited more than once, but if the value for any key
+// is stored or deleted concurrently, Range may reflect any mapping for that key
+// from any point during the Range call.
+//
+// Range may be O(N) with the number of elements in the map even if f returns
+// false after a constant number of calls.
+func (m *Map) Range(f func(key, value interface{}) bool) {
+	// We need to be able to iterate over all of the keys that were already
+	// present at the start of the call to Range.
+	// If read.amended is false, then read.m satisfies that property without
+	// requiring us to hold m.mu for a long time.
+	read, _ := m.read.Load().(readOnly)
+	if read.amended {
+		// m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
+		// (assuming the caller does not break out early), so a call to Range
+		// amortizes an entire copy of the map: we can promote the dirty copy
+		// immediately!
+		m.mu.Lock()
+		read, _ = m.read.Load().(readOnly)
+		if read.amended {
+			read = readOnly{m: m.dirty}
+			m.read.Store(read)
+			m.dirty = nil
+			m.misses = 0
+		}
+		m.mu.Unlock()
+	}
+
+	for k, e := range read.m {
+		v, ok := e.load()
+		if !ok {
+			continue
+		}
+		if !f(k, v) {
+			break
+		}
+	}
+}
+
+func (m *Map) missLocked() {
+	m.misses++
+	if m.misses < len(m.dirty) {
+		return
+	}
+	m.read.Store(readOnly{m: m.dirty})
+	m.dirty = nil
+	m.misses = 0
+}
+
+func (m *Map) dirtyLocked() {
+	if m.dirty != nil {
+		return
+	}
+
+	read, _ := m.read.Load().(readOnly)
+	m.dirty = make(map[interface{}]*entry, len(read.m))
+	for k, e := range read.m {
+		if !e.tryExpungeLocked() {
+			m.dirty[k] = e
+		}
+	}
+}
+
+func (e *entry) tryExpungeLocked() (isExpunged bool) {
+	p := atomic.LoadPointer(&e.p)
+	for p == nil {
+		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
+			return true
+		}
+		p = atomic.LoadPointer(&e.p)
+	}
+	return p == expunged
+}
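
For reference, below is a minimal usage sketch (not part of the patch) of the graceful-shutdown hook that server/server.go introduces above. It assumes the server package as modified in this diff; the service type, address, and timeout are illustrative. Serve installs a SIGTERM listener via startShutdownListener; when the signal arrives, ShutdownFunc runs, and Shutdown polls every shutdownPollInterval until HandleMsgChan is drained (that is, until all in-flight requests have finished) or until the context expires, then calls Close.

package main

import (
	"context"
	"time"

	"github.com/smallnest/rpcx/server"
)

type Args struct{ A, B int }
type Reply struct{ C int }

type Arith struct{}

func (a *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
	reply.C = args.A * args.B
	return nil
}

func main() {
	s := server.NewServer()

	// Runs when the SIGTERM listener installed by Serve fires: drain the
	// in-flight requests, then let the listener goroutine exit the process.
	s.ShutdownFunc = func(s *server.Server) {
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		s.Shutdown(ctx) // polls HandleMsgChan until empty, then calls Close
	}

	s.RegisterName("Arith", new(Arith), "")
	s.Serve("tcp", ":8972") // blocks; also starts the SIGTERM listener
}

In this sketch the 100000-slot HandleMsgChan effectively acts as a counter of in-flight requests: serveConn pushes one token per decoded request and the handler goroutine releases it, so checkProcessMsg reports zero only when nothing is still being processed.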