优化连接池管理

This commit is contained in:
bxd
2023-08-15 22:59:07 +08:00
parent 99a612d5d1
commit c15488e88a

View File

@@ -4,7 +4,6 @@ import (
"bufio"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
@@ -25,28 +24,6 @@ type roundTripper interface {
RoundTrip(*http.Request) (*http.Response, error)
}
type Connecotr struct {
ctx context.Context
ctx2 context.Context
cnl context.CancelFunc
rawConn net.Conn
h2 bool
r *bufio.Reader
h2RawConn *http2.ClientConn
h2Ja3RawConn *h2ja3.ClientConn
}
func (obj *Connecotr) Close() error {
obj.cnl()
if obj.h2RawConn != nil {
obj.h2RawConn.Close()
}
if obj.h2Ja3RawConn != nil {
obj.h2Ja3RawConn.Close()
}
return obj.rawConn.Close()
}
type reqTask struct {
ctx context.Context //控制请求的生命周期
cnl context.CancelFunc
@@ -126,6 +103,9 @@ func (obj *RoundTripper) getConnPool(key string) *connPool {
func (obj *RoundTripper) putConnPool(key string, conn *Connecotr) {
obj.connsLock.Lock()
defer obj.connsLock.Unlock()
if !conn.h2 {
go conn.read()
}
pool, ok := obj.connPools[key]
if ok {
go pool.rwMain(conn)
@@ -157,6 +137,10 @@ func (obj *RoundTripper) dial(ctxData *reqCtxData, key string, req *http.Request
return nil, err
}
conne := new(Connecotr)
conne.rn = make(chan int)
conne.rc = make(chan []byte)
conne.ctx, conne.cnl = context.WithCancel(obj.ctx)
var h2 bool
if req.URL.Scheme == "https" {
@@ -190,55 +174,101 @@ func (obj *RoundTripper) dial(ctxData *reqCtxData, key string, req *http.Request
return conne, err
}
} else {
conne.r = bufio.NewReader(netConn)
conne.r = bufio.NewReader(conne)
}
return conne, err
}
type ClientConnState struct {
Closed bool
// Closing is whether the connection is in the process of
// closing. It may be closing due to shutdown, being a
// single-use connection, being marked as DoNotReuse, or
// having received a GOAWAY frame.
Closing bool
// StreamsActive is how many streams are active.
StreamsActive int
// StreamsReserved is how many streams have been reserved via
// ClientConn.ReserveNewRequest.
StreamsReserved int
// StreamsPending is how many requests have been sent in excess
// of the peer's advertised MaxConcurrentStreams setting and
// are waiting for other streams to complete.
StreamsPending int
// MaxConcurrentStreams is how many concurrent streams the
// peer advertised as acceptable. Zero means no SETTINGS
// frame has been received yet.
MaxConcurrentStreams uint32
// LastIdle, if non-zero, is when the connection last
// transitioned to idle state.
LastIdle time.Time
type Connecotr struct {
ctx context.Context
ctx2 context.Context
cnl context.CancelFunc
rawConn net.Conn
h2 bool
r *bufio.Reader
h2RawConn *http2.ClientConn
h2Ja3RawConn *h2ja3.ClientConn
rc chan []byte
rn chan int
isRead bool
}
func (obj *Connecotr) ping() error {
func (obj *Connecotr) Close() error {
obj.cnl()
if obj.h2RawConn != nil {
state := obj.h2RawConn.State()
if state.Closed || state.Closing {
return errors.New("h2 is close")
obj.h2RawConn.Close()
}
if obj.h2Ja3RawConn != nil {
obj.h2Ja3RawConn.Close()
}
return obj.rawConn.Close()
}
func (obj *Connecotr) read() {
defer obj.Close()
obj.isRead = true
con := make([]byte, 1024)
for {
i, err := obj.rawConn.Read(con)
if err != nil {
return
}
} else if obj.h2Ja3RawConn != nil {
state := obj.h2Ja3RawConn.State()
if state.Closed || state.Closing {
return errors.New("h2 is close")
b := con[:i]
for once := true; once || len(b) > 0; once = false {
select {
case obj.rc <- b:
nw := <-obj.rn
b = b[nw:]
case <-obj.ctx.Done():
return
}
}
}
_, err := obj.rawConn.Write(make([]byte, 0))
return err
}
func (obj *Connecotr) Read(b []byte) (int, error) {
if !obj.isRead {
return obj.rawConn.Read(b)
}
select {
case con := <-obj.rc:
i := copy(b, con)
select {
case obj.rn <- i:
return i, nil
case <-obj.ctx.Done():
return i, obj.ctx.Err()
}
case <-obj.ctx.Done():
return 0, obj.ctx.Err()
}
}
func (obj *Connecotr) Write(b []byte) (int, error) {
return obj.rawConn.Write(b)
}
func (obj *Connecotr) LocalAddr() net.Addr {
return obj.rawConn.LocalAddr()
}
func (obj *Connecotr) RemoteAddr() net.Addr {
return obj.rawConn.RemoteAddr()
}
func (obj *Connecotr) SetDeadline(t time.Time) error {
return obj.rawConn.SetDeadline(t)
}
func (obj *Connecotr) SetReadDeadline(t time.Time) error {
return obj.rawConn.SetReadDeadline(t)
}
func (obj *Connecotr) SetWriteDeadline(t time.Time) error {
return obj.rawConn.SetWriteDeadline(t)
}
func (obj *Connecotr) h2Closed() bool {
if obj.h2RawConn != nil {
state := obj.h2RawConn.State()
return state.Closed || state.Closing
} else if obj.h2Ja3RawConn != nil {
state := obj.h2Ja3RawConn.State()
return state.Closed || state.Closing
}
return false
}
type ReadWriteCloser struct {
@@ -249,8 +279,8 @@ type ReadWriteCloser struct {
conn *Connecotr
}
func (obj *ReadWriteCloser) Conn() net.Conn {
return obj.conn.rawConn
func (obj *ReadWriteCloser) Conn() *Connecotr {
return obj.conn
}
func (obj *ReadWriteCloser) Read(p []byte) (n int, err error) {
return obj.body.Read(p)
@@ -281,7 +311,7 @@ func http1Req(conn *Connecotr, task *reqTask) {
conn.Close()
}
}()
err := task.req.Write(conn.rawConn)
err := task.req.Write(conn)
if err != nil {
task.err = err
} else {
@@ -318,37 +348,23 @@ func (obj *connPool) rwMain(conn *Connecotr) {
}()
defer obj.total.Add(-1)
defer conn.Close()
wait := time.NewTimer(0)
defer wait.Stop()
defer conn.cnl()
go func() {
defer conn.cnl()
for {
wait.Reset(time.Second * 30)
select {
case <-conn.ctx.Done(): //连接池通知关闭,不用再监听了
return
case <-wait.C:
if conn.ping() != nil {
return
}
}
}
}()
for {
select {
case <-conn.ctx.Done(): //连接池通知关闭,等待连接被释放掉
<-conn.ctx2.Done()
return
case task := <-obj.tasks: //接收到任务
if conn.ping() != nil { //判断连接是否异常
select {
case obj.tasks <- task: //任务给池子里其它连接
case task.emptyPool <- struct{}{}: //告诉提交任务方,池子没有可用连接
if conn.h2 {
if conn.h2Closed() { //判断连接是否异常
select {
case obj.tasks <- task: //任务给池子里其它连接
case task.emptyPool <- struct{}{}: //告诉提交任务方,池子没有可用连接
}
return //由于连接异常直接结束
}
return //由于连接异常直接结束
}
if !conn.h2 {
go http2Req(conn, task)
} else {
select {
case <-conn.ctx2.Done(): //http1.1 连接被占用
default:
@@ -358,11 +374,6 @@ func (obj *connPool) rwMain(conn *Connecotr) {
}
continue //由于连接被占用,开始下一个循环
}
}
wait.Reset(time.Hour * 24 * 365) //停止健康检查
if conn.h2 {
go http2Req(conn, task)
} else {
go http1Req(conn, task)
}
//等待任务完成
@@ -371,7 +382,6 @@ func (obj *connPool) rwMain(conn *Connecotr) {
if task.req == nil || task.err != nil {
return
}
wait.Reset(time.Second * 30)
}
}
}