mirror of
https://github.com/eolinker/apinto
synced 2025-09-26 21:01:19 +08:00
993 lines
22 KiB
Go
993 lines
22 KiB
Go
/*
|
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
* contributor license agreements. See the NOTICE file distributed with
|
|
* this work for additional information regarding copyright ownership.
|
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
* (the "License"); you may not use this file except in compliance with
|
|
* the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package getty
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"github.com/eolinker/eosc/log"
|
|
"io"
|
|
"net"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
import (
|
|
gxbytes "github.com/dubbogo/gost/bytes"
|
|
gxcontext "github.com/dubbogo/gost/context"
|
|
gxtime "github.com/dubbogo/gost/time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
perrors "github.com/pkg/errors"
|
|
|
|
uatomic "go.uber.org/atomic"
|
|
)
|
|
|
|
const (
|
|
maxReadBufLen = 4 * 1024
|
|
netIOTimeout = 1e9 // 1s
|
|
period = 60 * 1e9 // 1 minute
|
|
pendingDuration = 3e9
|
|
// MaxWheelTimeSpan 900s, 15 minute
|
|
MaxWheelTimeSpan = 900e9
|
|
maxPacketLen = 16 * 1024
|
|
|
|
defaultSessionName = "session"
|
|
defaultTCPSessionName = "tcp-session"
|
|
defaultUDPSessionName = "udp-session"
|
|
defaultWSSessionName = "ws-session"
|
|
defaultWSSSessionName = "wss-session"
|
|
outputFormat = "session %s, Read Bytes: %d, Write Bytes: %d, Read Pkgs: %d, Write Pkgs: %d"
|
|
)
|
|
|
|
var defaultTimerWheel *gxtime.TimerWheel
|
|
|
|
func init() {
|
|
gxtime.InitDefaultTimerWheel()
|
|
defaultTimerWheel = gxtime.GetDefaultTimerWheel()
|
|
}
|
|
|
|
// Session wrap connection between the server and the client
|
|
type Session interface {
|
|
Connection
|
|
Reset()
|
|
Conn() net.Conn
|
|
Stat() string
|
|
IsClosed() bool
|
|
// EndPoint get endpoint type
|
|
EndPoint() EndPoint
|
|
SetMaxMsgLen(int)
|
|
SetName(string)
|
|
SetEventListener(EventListener)
|
|
SetPkgHandler(ReadWriter)
|
|
SetReader(Reader)
|
|
SetWriter(Writer)
|
|
SetCronPeriod(int)
|
|
SetWaitTime(time.Duration)
|
|
GetAttribute(interface{}) interface{}
|
|
SetAttribute(interface{}, interface{})
|
|
RemoveAttribute(interface{})
|
|
|
|
// WritePkg the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
|
|
// for udp session, the first parameter should be UDPContext.
|
|
// totalBytesLength: @pkg stream bytes length after encoding @pkg.
|
|
// sendBytesLength: stream bytes length that sent out successfully.
|
|
// err: maybe it has illegal data, encoding error, or write out system error.
|
|
WritePkg(pkg interface{}, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error)
|
|
WriteBytes([]byte) (int, error)
|
|
WriteBytesArray(...[]byte) (int, error)
|
|
Close()
|
|
}
|
|
|
|
// getty base session
|
|
type session struct {
|
|
name string
|
|
endPoint EndPoint
|
|
|
|
// net read Write
|
|
Connection
|
|
|
|
listener EventListener
|
|
|
|
// codec
|
|
reader Reader // @reader should be nil when @conn is a gettyWSConn object.
|
|
writer Writer
|
|
|
|
// handle logic
|
|
maxMsgLen int32
|
|
|
|
// heartbeat
|
|
period time.Duration
|
|
|
|
// done
|
|
wait time.Duration
|
|
once *sync.Once
|
|
done chan struct{}
|
|
|
|
// attribute
|
|
attrs *gxcontext.ValuesContext
|
|
|
|
// goroutines sync
|
|
grNum uatomic.Int32
|
|
lock sync.RWMutex
|
|
packetLock sync.RWMutex
|
|
}
|
|
|
|
func newSession(endPoint EndPoint, conn Connection) *session {
|
|
ss := &session{
|
|
name: defaultSessionName,
|
|
endPoint: endPoint,
|
|
|
|
Connection: conn,
|
|
|
|
maxMsgLen: maxReadBufLen,
|
|
|
|
period: period,
|
|
|
|
once: &sync.Once{},
|
|
done: make(chan struct{}),
|
|
wait: pendingDuration,
|
|
attrs: gxcontext.NewValuesContext(context.Background()),
|
|
}
|
|
|
|
ss.Connection.setSession(ss)
|
|
ss.SetWriteTimeout(netIOTimeout)
|
|
ss.SetReadTimeout(netIOTimeout)
|
|
|
|
return ss
|
|
}
|
|
|
|
func newTCPSession(conn net.Conn, endPoint EndPoint) Session {
|
|
c := newGettyTCPConn(conn)
|
|
session := newSession(endPoint, c)
|
|
session.name = defaultTCPSessionName
|
|
|
|
return session
|
|
}
|
|
|
|
func newUDPSession(conn *net.UDPConn, endPoint EndPoint) Session {
|
|
c := newGettyUDPConn(conn)
|
|
session := newSession(endPoint, c)
|
|
session.name = defaultUDPSessionName
|
|
|
|
return session
|
|
}
|
|
|
|
func newWSSession(conn *websocket.Conn, endPoint EndPoint) Session {
|
|
c := newGettyWSConn(conn)
|
|
session := newSession(endPoint, c)
|
|
session.name = defaultWSSessionName
|
|
|
|
return session
|
|
}
|
|
|
|
func (s *session) Reset() {
|
|
*s = session{
|
|
name: defaultSessionName,
|
|
once: &sync.Once{},
|
|
done: make(chan struct{}),
|
|
period: period,
|
|
wait: pendingDuration,
|
|
attrs: gxcontext.NewValuesContext(context.Background()),
|
|
}
|
|
}
|
|
|
|
func (s *session) Conn() net.Conn {
|
|
if tc, ok := s.Connection.(*gettyTCPConn); ok {
|
|
return tc.conn
|
|
}
|
|
|
|
if uc, ok := s.Connection.(*gettyUDPConn); ok {
|
|
return uc.conn
|
|
}
|
|
|
|
if wc, ok := s.Connection.(*gettyWSConn); ok {
|
|
return wc.conn.UnderlyingConn()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *session) EndPoint() EndPoint {
|
|
return s.endPoint
|
|
}
|
|
|
|
func (s *session) gettyConn() *gettyConn {
|
|
if tc, ok := s.Connection.(*gettyTCPConn); ok {
|
|
return &(tc.gettyConn)
|
|
}
|
|
|
|
if uc, ok := s.Connection.(*gettyUDPConn); ok {
|
|
return &(uc.gettyConn)
|
|
}
|
|
|
|
if wc, ok := s.Connection.(*gettyWSConn); ok {
|
|
return &(wc.gettyConn)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stat get the connect statistic data
|
|
func (s *session) Stat() string {
|
|
var conn *gettyConn
|
|
if conn = s.gettyConn(); conn == nil {
|
|
return ""
|
|
}
|
|
return fmt.Sprintf(
|
|
outputFormat,
|
|
s.sessionToken(),
|
|
conn.readBytes.Load(),
|
|
conn.writeBytes.Load(),
|
|
conn.readPkgNum.Load(),
|
|
conn.writePkgNum.Load(),
|
|
)
|
|
}
|
|
|
|
// IsClosed check whether the session has been closed.
|
|
func (s *session) IsClosed() bool {
|
|
select {
|
|
case <-s.done:
|
|
return true
|
|
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// SetMaxMsgLen set maximum package length of every package in (EventListener)OnMessage(@pkgs)
|
|
func (s *session) SetMaxMsgLen(length int) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.maxMsgLen = int32(length)
|
|
}
|
|
|
|
// SetName set session name
|
|
func (s *session) SetName(name string) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.name = name
|
|
}
|
|
|
|
// SetEventListener set event listener
|
|
func (s *session) SetEventListener(listener EventListener) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.listener = listener
|
|
}
|
|
|
|
// SetPkgHandler set package handler
|
|
func (s *session) SetPkgHandler(handler ReadWriter) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.reader = handler
|
|
s.writer = handler
|
|
}
|
|
|
|
func (s *session) SetReader(reader Reader) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.reader = reader
|
|
}
|
|
|
|
func (s *session) SetWriter(writer Writer) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.writer = writer
|
|
}
|
|
|
|
// SetCronPeriod period is in millisecond. Websocket session will send ping frame automatically every peroid.
|
|
func (s *session) SetCronPeriod(period int) {
|
|
if period < 1 {
|
|
panic("@period < 1")
|
|
}
|
|
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
s.period = time.Duration(period) * time.Millisecond
|
|
}
|
|
|
|
// SetWaitTime set maximum wait time when session got error or got exit signal
|
|
func (s *session) SetWaitTime(waitTime time.Duration) {
|
|
if waitTime < 1 {
|
|
panic("@wait < 1")
|
|
}
|
|
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
s.wait = waitTime
|
|
}
|
|
|
|
// GetAttribute get attribute of key @session:key
|
|
func (s *session) GetAttribute(key interface{}) interface{} {
|
|
s.lock.RLock()
|
|
if s.attrs == nil {
|
|
s.lock.RUnlock()
|
|
return nil
|
|
}
|
|
ret, flag := s.attrs.Get(key)
|
|
s.lock.RUnlock()
|
|
|
|
if !flag {
|
|
return nil
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
// SetAttribute set attribute of key @session:key
|
|
func (s *session) SetAttribute(key interface{}, value interface{}) {
|
|
s.lock.Lock()
|
|
if s.attrs != nil {
|
|
s.attrs.Set(key, value)
|
|
}
|
|
s.lock.Unlock()
|
|
}
|
|
|
|
// RemoveAttribute remove attribute of key @session:key
|
|
func (s *session) RemoveAttribute(key interface{}) {
|
|
s.lock.Lock()
|
|
if s.attrs != nil {
|
|
s.attrs.Delete(key)
|
|
}
|
|
s.lock.Unlock()
|
|
}
|
|
|
|
func (s *session) sessionToken() string {
|
|
if s.IsClosed() || s.Connection == nil {
|
|
return "session-closed"
|
|
}
|
|
|
|
return fmt.Sprintf("{%s:%s:%d:%s<->%s}",
|
|
s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr())
|
|
}
|
|
|
|
func (s *session) WritePkg(pkg interface{}, timeout time.Duration) (int, int, error) {
|
|
if pkg == nil {
|
|
return 0, 0, fmt.Errorf("@pkg is nil")
|
|
}
|
|
if s.IsClosed() {
|
|
return 0, 0, ErrSessionClosed
|
|
}
|
|
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
const size = 64 << 10
|
|
rBuf := make([]byte, size)
|
|
rBuf = rBuf[:runtime.Stack(rBuf, false)]
|
|
log.Errorf("[session.WritePkg] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
|
|
}
|
|
}()
|
|
|
|
pkgBytes, err := s.writer.Write(s, pkg)
|
|
if err != nil {
|
|
log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%+v", s.Stat(), pkg, err)
|
|
return len(pkgBytes), 0, perrors.WithStack(err)
|
|
}
|
|
var udpCtxPtr *UDPContext
|
|
if udpCtx, ok := pkg.(UDPContext); ok {
|
|
udpCtxPtr = &udpCtx
|
|
} else if udpCtxP, ok := pkg.(*UDPContext); ok {
|
|
udpCtxPtr = udpCtxP
|
|
}
|
|
if udpCtxPtr != nil {
|
|
udpCtxPtr.Pkg = pkgBytes
|
|
pkg = *udpCtxPtr
|
|
} else {
|
|
pkg = pkgBytes
|
|
}
|
|
s.packetLock.RLock()
|
|
defer s.packetLock.RUnlock()
|
|
if 0 < timeout {
|
|
s.Connection.SetWriteTimeout(timeout)
|
|
}
|
|
var succssCount int
|
|
succssCount, err = s.Connection.send(pkg)
|
|
if err != nil {
|
|
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%+v", s.Stat(), pkg, err)
|
|
return len(pkgBytes), succssCount, perrors.WithStack(err)
|
|
}
|
|
return len(pkgBytes), succssCount, nil
|
|
}
|
|
|
|
// WriteBytes for codecs
|
|
func (s *session) WriteBytes(pkg []byte) (int, error) {
|
|
if s.IsClosed() {
|
|
return 0, ErrSessionClosed
|
|
}
|
|
|
|
leftPackageSize, totalSize, writeSize := len(pkg), len(pkg), 0
|
|
if leftPackageSize > maxPacketLen {
|
|
s.packetLock.Lock()
|
|
defer s.packetLock.Unlock()
|
|
} else {
|
|
s.packetLock.RLock()
|
|
defer s.packetLock.RUnlock()
|
|
}
|
|
|
|
for leftPackageSize > maxPacketLen {
|
|
_, err := s.Connection.send(pkg[writeSize:(writeSize + maxPacketLen)])
|
|
if err != nil {
|
|
return writeSize, perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
|
|
}
|
|
leftPackageSize -= maxPacketLen
|
|
writeSize += maxPacketLen
|
|
}
|
|
|
|
if leftPackageSize == 0 {
|
|
return writeSize, nil
|
|
}
|
|
|
|
_, err := s.Connection.send(pkg[writeSize:])
|
|
if err != nil {
|
|
return writeSize, perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
|
|
}
|
|
|
|
return totalSize, nil
|
|
}
|
|
|
|
// WriteBytesArray Write multiple packages at once. so we invoke write sys.call just one time.
|
|
func (s *session) WriteBytesArray(pkgs ...[]byte) (int, error) {
|
|
if s.IsClosed() {
|
|
return 0, ErrSessionClosed
|
|
}
|
|
if len(pkgs) == 1 {
|
|
return s.WriteBytes(pkgs[0])
|
|
}
|
|
|
|
// reduce syscall and memcopy for multiple packages
|
|
if _, ok := s.Connection.(*gettyTCPConn); ok {
|
|
s.packetLock.RLock()
|
|
defer s.packetLock.RUnlock()
|
|
lg, err := s.Connection.send(pkgs)
|
|
if err != nil {
|
|
return 0, perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
|
|
}
|
|
return lg, nil
|
|
}
|
|
|
|
// get len
|
|
var (
|
|
l int
|
|
wlg int
|
|
err error
|
|
length int
|
|
arrp *[]byte
|
|
arr []byte
|
|
)
|
|
length = 0
|
|
for i := 0; i < len(pkgs); i++ {
|
|
length += len(pkgs[i])
|
|
}
|
|
|
|
// merge the pkgs
|
|
arrp = gxbytes.AcquireBytes(length)
|
|
defer gxbytes.ReleaseBytes(arrp)
|
|
arr = *arrp
|
|
|
|
l = 0
|
|
for i := 0; i < len(pkgs); i++ {
|
|
copy(arr[l:], pkgs[i])
|
|
l += len(pkgs[i])
|
|
}
|
|
|
|
wlg, err = s.WriteBytes(arr)
|
|
if err != nil {
|
|
return 0, perrors.WithStack(err)
|
|
}
|
|
|
|
num := len(pkgs) - 1
|
|
for i := 0; i < num; i++ {
|
|
s.incWritePkgNum()
|
|
}
|
|
|
|
return wlg, nil
|
|
}
|
|
|
|
func heartbeat(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
|
|
ss, _ := arg.(*session)
|
|
if ss == nil || ss.IsClosed() {
|
|
return ErrSessionClosed
|
|
}
|
|
|
|
f := func() {
|
|
wsConn, wsFlag := ss.Connection.(*gettyWSConn)
|
|
if wsFlag {
|
|
err := wsConn.writePing()
|
|
if err != nil {
|
|
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
|
|
}
|
|
}
|
|
|
|
ss.listener.OnCron(ss)
|
|
}
|
|
|
|
// if enable task pool, run @f asynchronously.
|
|
if taskPool := ss.EndPoint().GetTaskPool(); taskPool != nil {
|
|
taskPool.AddTaskAlways(f)
|
|
return nil
|
|
}
|
|
f()
|
|
return nil
|
|
}
|
|
|
|
// func (s *session) RunEventLoop() {
|
|
func (s *session) run() {
|
|
if s.Connection == nil || s.listener == nil || s.writer == nil {
|
|
errStr := fmt.Sprintf("session{name:%s, conn:%#v, listener:%#v, writer:%#v}",
|
|
s.name, s.Connection, s.listener, s.writer)
|
|
log.Error(errStr)
|
|
panic(errStr)
|
|
}
|
|
|
|
// call session opened
|
|
s.UpdateActive()
|
|
if err := s.listener.OnOpen(s); err != nil {
|
|
log.Errorf("[OnOpen] session %s, error: %#v", s.Stat(), err)
|
|
s.Close()
|
|
return
|
|
}
|
|
|
|
if _, err := defaultTimerWheel.AddTimer(heartbeat, gxtime.TimerLoop, s.period, s); err != nil {
|
|
panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel err:%v", s.Stat(), err))
|
|
}
|
|
|
|
s.grNum.Add(1)
|
|
// start read gr
|
|
go s.handlePackage()
|
|
}
|
|
|
|
func (s *session) addTask(pkg interface{}) {
|
|
f := func() {
|
|
s.listener.OnMessage(s, pkg)
|
|
s.incReadPkgNum()
|
|
}
|
|
if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {
|
|
taskPool.AddTaskAlways(f)
|
|
return
|
|
}
|
|
f()
|
|
}
|
|
|
|
func (s *session) handlePackage() {
|
|
var err error
|
|
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
const size = 64 << 10
|
|
rBuf := make([]byte, size)
|
|
rBuf = rBuf[:runtime.Stack(rBuf, false)]
|
|
log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
|
|
}
|
|
grNum := s.grNum.Add(-1)
|
|
log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
|
|
s.stop()
|
|
if err != nil {
|
|
log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), perrors.WithStack(err))
|
|
if s != nil || s.listener != nil {
|
|
s.listener.OnError(s, err)
|
|
}
|
|
}
|
|
|
|
s.listener.OnClose(s)
|
|
s.gc()
|
|
}()
|
|
|
|
if _, ok := s.Connection.(*gettyTCPConn); ok {
|
|
if s.reader == nil {
|
|
errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
|
|
log.Error(errStr)
|
|
panic(errStr)
|
|
}
|
|
|
|
err = s.handleTCPPackage()
|
|
} else if _, ok := s.Connection.(*gettyWSConn); ok {
|
|
err = s.handleWSPackage()
|
|
} else if _, ok := s.Connection.(*gettyUDPConn); ok {
|
|
err = s.handleUDPPackage()
|
|
} else {
|
|
panic(fmt.Sprintf("unknown type session{%#v}", s))
|
|
}
|
|
}
|
|
|
|
// get package from tcp stream(packet)
|
|
func (s *session) handleTCPPackage() error {
|
|
var (
|
|
ok bool
|
|
err error
|
|
netError net.Error
|
|
conn *gettyTCPConn
|
|
exit bool
|
|
bufLen int
|
|
pkgLen int
|
|
buf []byte
|
|
pktBuf *gxbytes.Buffer
|
|
pkg interface{}
|
|
)
|
|
|
|
pktBuf = gxbytes.NewBuffer(nil)
|
|
|
|
conn = s.Connection.(*gettyTCPConn)
|
|
for {
|
|
if s.IsClosed() {
|
|
err = nil
|
|
// do not handle the left stream in pktBuf and exit asap.
|
|
// it is impossible packing a package by the left stream.
|
|
break
|
|
}
|
|
|
|
bufLen = 0
|
|
for {
|
|
// for clause for the network timeout condition check
|
|
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
|
|
buf = pktBuf.WriteNextBegin(maxReadBufLen)
|
|
bufLen, err = conn.recv(buf)
|
|
if err != nil {
|
|
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
|
|
break
|
|
}
|
|
if perrors.Cause(err) == io.EOF {
|
|
log.Infof("%s, session.conn read EOF, client send over, session exit", s.sessionToken())
|
|
err = nil
|
|
exit = true
|
|
if bufLen != 0 {
|
|
// as https://github.com/apache/dubbo-getty/issues/77#issuecomment-939652203
|
|
// this branch is impossible. Even if it happens, the bufLen will be zero and the error
|
|
// is io.EOF when getty continues to read the socket.
|
|
exit = false
|
|
log.Infof("%s, session.conn read EOF, while the bufLen(%d) is non-zero.", s.sessionToken())
|
|
}
|
|
break
|
|
}
|
|
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err))
|
|
exit = true
|
|
}
|
|
break
|
|
}
|
|
if 0 != bufLen {
|
|
pktBuf.WriteNextEnd(bufLen)
|
|
for {
|
|
if pktBuf.Len() <= 0 {
|
|
break
|
|
}
|
|
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
|
|
// for case 3/case 4
|
|
if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) {
|
|
err = perrors.Errorf("pkgLen %d > session max message len %d", pkgLen, s.maxMsgLen)
|
|
}
|
|
// handle case 1
|
|
if err != nil {
|
|
log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v",
|
|
s.sessionToken(), pkgLen, perrors.WithStack(err))
|
|
exit = true
|
|
break
|
|
}
|
|
// handle case 2/case 3
|
|
if pkg == nil {
|
|
break
|
|
}
|
|
// handle case 4
|
|
s.UpdateActive()
|
|
s.addTask(pkg)
|
|
pktBuf.Next(pkgLen)
|
|
// continue to handle case 5
|
|
}
|
|
}
|
|
if exit {
|
|
break
|
|
}
|
|
}
|
|
|
|
return perrors.WithStack(err)
|
|
}
|
|
|
|
// get package from udp packet
|
|
func (s *session) handleUDPPackage() error {
|
|
var (
|
|
ok bool
|
|
err error
|
|
netError net.Error
|
|
conn *gettyUDPConn
|
|
bufLen int
|
|
maxBufLen int
|
|
bufp *[]byte
|
|
buf []byte
|
|
addr *net.UDPAddr
|
|
pkgLen int
|
|
pkg interface{}
|
|
)
|
|
|
|
conn = s.Connection.(*gettyUDPConn)
|
|
maxBufLen = int(s.maxMsgLen + maxReadBufLen)
|
|
if int(s.maxMsgLen<<1) < bufLen {
|
|
maxBufLen = int(s.maxMsgLen << 1)
|
|
}
|
|
bufp = gxbytes.AcquireBytes(maxBufLen)
|
|
defer gxbytes.ReleaseBytes(bufp)
|
|
buf = *bufp
|
|
for {
|
|
if s.IsClosed() {
|
|
break
|
|
}
|
|
|
|
bufLen, addr, err = conn.recv(buf)
|
|
log.DebugF("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, perrors.WithStack(err))
|
|
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
|
|
continue
|
|
}
|
|
if err != nil {
|
|
log.Errorf("%s, [session.handleUDPPackage] = len:%d, error:%+v",
|
|
s.sessionToken(), bufLen, perrors.WithStack(err))
|
|
err = perrors.Wrapf(err, "conn.read()")
|
|
break
|
|
}
|
|
|
|
if bufLen == 0 {
|
|
log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, perrors.WithStack(err))
|
|
continue
|
|
}
|
|
|
|
if bufLen == len(connectPingPackage) && bytes.Equal(connectPingPackage, buf[:bufLen]) {
|
|
log.Infof("got %s connectPingPackage", addr)
|
|
continue
|
|
}
|
|
|
|
pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
|
|
log.DebugF("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
|
|
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
|
|
err = perrors.Errorf("Message Too Long, bufLen %d, session max message len %d", bufLen, s.maxMsgLen)
|
|
}
|
|
if err != nil {
|
|
log.Warnf("%s, [session.handleUDPPackage] = len:%d, error:%+v",
|
|
s.sessionToken(), pkgLen, perrors.WithStack(err))
|
|
continue
|
|
}
|
|
if pkgLen == 0 {
|
|
log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
|
|
continue
|
|
}
|
|
|
|
s.UpdateActive()
|
|
s.addTask(UDPContext{Pkg: pkg, PeerAddr: addr})
|
|
}
|
|
|
|
return perrors.WithStack(err)
|
|
}
|
|
|
|
// get package from websocket stream
|
|
func (s *session) handleWSPackage() error {
|
|
var (
|
|
ok bool
|
|
err error
|
|
netError net.Error
|
|
length int
|
|
conn *gettyWSConn
|
|
pkg []byte
|
|
unmarshalPkg interface{}
|
|
)
|
|
|
|
conn = s.Connection.(*gettyWSConn)
|
|
for {
|
|
if s.IsClosed() {
|
|
break
|
|
}
|
|
pkg, err = conn.recv()
|
|
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
|
|
continue
|
|
}
|
|
if err != nil {
|
|
log.Warnf("%s, [session.handleWSPackage] = error:%+v",
|
|
s.sessionToken(), perrors.WithStack(err))
|
|
return perrors.WithStack(err)
|
|
}
|
|
s.UpdateActive()
|
|
if s.reader != nil {
|
|
unmarshalPkg, length, err = s.reader.Read(s, pkg)
|
|
if err == nil && s.maxMsgLen > 0 && length > int(s.maxMsgLen) {
|
|
err = perrors.Errorf("Message Too Long, length %d, session max message len %d", length, s.maxMsgLen)
|
|
}
|
|
if err != nil {
|
|
log.Warnf("%s, [session.handleWSPackage] = len:%d, error:%+v",
|
|
s.sessionToken(), length, perrors.WithStack(err))
|
|
continue
|
|
}
|
|
|
|
s.addTask(unmarshalPkg)
|
|
} else {
|
|
s.addTask(pkg)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *session) stop() {
|
|
select {
|
|
case <-s.done: // s.done is a blocked channel. if it has not been closed, the default branch will be invoked.
|
|
return
|
|
|
|
default:
|
|
s.once.Do(func() {
|
|
// let read/Write timeout asap
|
|
now := time.Now()
|
|
if conn := s.Conn(); conn != nil {
|
|
conn.SetReadDeadline(now.Add(s.readTimeout()))
|
|
conn.SetWriteDeadline(now.Add(s.writeTimeout()))
|
|
}
|
|
close(s.done)
|
|
c := s.GetAttribute(sessionClientKey)
|
|
if clt, ok := c.(*client); ok {
|
|
clt.reConnect()
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *session) gc() {
|
|
var conn Connection
|
|
|
|
s.lock.Lock()
|
|
if s.attrs != nil {
|
|
s.attrs = nil
|
|
conn = s.Connection
|
|
s.Connection = nil
|
|
}
|
|
s.lock.Unlock()
|
|
|
|
go func() {
|
|
if conn != nil {
|
|
conn.close(int(s.wait))
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Close will be invoked by NewSessionCallback(if return error is not nil)
|
|
// or (session)handleLoop automatically. It's thread safe.
|
|
func (s *session) Close() {
|
|
s.stop()
|
|
log.Infof("%s closed now. its current gr num is %d", s.sessionToken(), s.grNum.Load())
|
|
}
|
|
|
|
// GetActive return connection's time
|
|
func (s *session) GetActive() time.Time {
|
|
if s == nil {
|
|
return launchTime
|
|
}
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
if s.Connection != nil {
|
|
return s.Connection.GetActive()
|
|
}
|
|
return launchTime
|
|
}
|
|
|
|
// UpdateActive update connection's active time
|
|
func (s *session) UpdateActive() {
|
|
if s == nil {
|
|
return
|
|
}
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
|
|
if s.Connection != nil {
|
|
s.Connection.UpdateActive()
|
|
}
|
|
}
|
|
|
|
func (s *session) ID() uint32 {
|
|
if s == nil {
|
|
return 0
|
|
}
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
if s.Connection != nil {
|
|
return s.Connection.ID()
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (s *session) LocalAddr() string {
|
|
if s == nil {
|
|
return ""
|
|
}
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
if s.Connection != nil {
|
|
return s.Connection.LocalAddr()
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (s *session) RemoteAddr() string {
|
|
if s == nil {
|
|
return ""
|
|
}
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
if s.Connection != nil {
|
|
return s.Connection.RemoteAddr()
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (s *session) incReadPkgNum() {
|
|
if s == nil {
|
|
return
|
|
}
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
if s.Connection != nil {
|
|
s.Connection.incReadPkgNum()
|
|
}
|
|
}
|
|
|
|
func (s *session) incWritePkgNum() {
|
|
if s == nil {
|
|
return
|
|
}
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
if s.Connection != nil {
|
|
s.Connection.incWritePkgNum()
|
|
}
|
|
}
|
|
|
|
func (s *session) send(pkg interface{}) (int, error) {
|
|
if s == nil {
|
|
return 0, nil
|
|
}
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
if s.Connection != nil {
|
|
return s.Connection.send(pkg)
|
|
}
|
|
return 0, nil
|
|
}
|
|
|
|
func (s *session) readTimeout() time.Duration {
|
|
if s == nil {
|
|
return time.Duration(0)
|
|
}
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
if s.Connection != nil {
|
|
return s.Connection.readTimeout()
|
|
}
|
|
return time.Duration(0)
|
|
}
|
|
|
|
func (s *session) setSession(ss Session) {
|
|
if s == nil {
|
|
return
|
|
}
|
|
s.lock.RLock()
|
|
if s.Connection != nil {
|
|
s.Connection.setSession(ss)
|
|
}
|
|
s.lock.RUnlock()
|
|
}
|