Files
requests/conn.go
gospider a19160026b sync
2025-07-30 08:18:58 +08:00

170 lines
4.1 KiB
Go

package requests
import (
"context"
"errors"
"net"
"net/http"
"time"
"github.com/gospider007/http1"
"github.com/gospider007/tools"
)
func taskMain(conn http1.Conn, task *reqTask) (err error) {
defer func() {
if err != nil && task.reqCtx.option.ErrCallBack != nil {
task.reqCtx.err = err
if err2 := task.reqCtx.option.ErrCallBack(task.reqCtx); err2 != nil {
task.disRetry = true
err = err2
}
}
if err == nil {
task.cnl(tools.ErrNoErr)
} else {
task.cnl(err)
}
if err == nil && task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
if bodyCtx := task.reqCtx.response.Body.(*http1.Body).Context(); bodyCtx != nil {
select {
case <-task.reqCtx.Context().Done():
if context.Cause(task.reqCtx.Context()) != tools.ErrNoErr {
err = context.Cause(task.reqCtx.Context())
}
case <-bodyCtx.Done():
if context.Cause(bodyCtx) != tools.ErrNoErr {
err = context.Cause(bodyCtx)
}
}
}
}
if err != nil {
conn.CloseWithError(tools.WrapError(err, "taskMain close with error"))
}
}()
select {
case <-conn.Context().Done(): //force conn close
err = context.Cause(conn.Context())
return
default:
}
done := make(chan struct{})
var derr error
var response *http.Response
go func() {
defer close(done)
response, derr = conn.DoRequest(task.reqCtx.request, &http1.Option{OrderHeaders: task.reqCtx.option.orderHeaders.Data()})
}()
if task.reqCtx.option.ResponseHeaderTimeout > 0 {
select {
case <-conn.Context().Done(): //force conn close
err = tools.WrapError(context.Cause(conn.Context()), "taskMain delete ctx error")
case <-time.After(task.reqCtx.option.ResponseHeaderTimeout):
err = errors.New("ResponseHeaderTimeout error: ")
case <-task.ctx.Done():
err = context.Cause(task.ctx)
case <-done:
}
} else {
select {
case <-conn.Context().Done(): //force conn close
err = tools.WrapError(context.Cause(conn.Context()), "taskMain delete ctx error")
case <-task.ctx.Done():
err = context.Cause(task.ctx)
case <-done:
}
}
if err != nil {
err = tools.WrapError(err, "roundTrip error")
} else if derr != nil {
err = tools.WrapError(derr, "roundTrip error")
} else {
task.reqCtx.response = response
task.reqCtx.response.Request = task.reqCtx.request
}
if task.reqCtx.option.Logger != nil {
task.reqCtx.option.Logger(Log{
Id: task.reqCtx.requestId,
Time: time.Now(),
Type: LogType_ResponseHeader,
Msg: "response header",
})
}
return
}
func taskM(conn http1.Conn, task *reqTask) error {
err := taskMain(conn, task)
if err != nil {
return err
}
if task.reqCtx.response != nil && task.reqCtx.response.StatusCode == 101 {
return tools.ErrNoErr
}
return err
}
func rwMain(conn http1.Conn, task *reqTask, tasks chan *reqTask) (err error) {
defer func() {
if err != nil && err != tools.ErrNoErr {
conn.CloseWithError(tools.WrapError(err, "rwMain close with error"))
}
}()
if err = taskM(conn, task); err != nil {
return
}
for {
select {
case <-conn.Context().Done(): //force close conn
return errors.New("connecotr force close")
case task := <-tasks: //recv task
if task == nil {
return errors.New("task is nil")
}
err = taskM(conn, task)
if err != nil {
return
}
}
}
}
func newSSHConn(sshCon net.Conn, rawCon net.Conn) *sshConn {
return &sshConn{sshCon: sshCon, rawCon: rawCon}
}
type sshConn struct {
sshCon net.Conn
rawCon net.Conn
}
func (obj *sshConn) Read(b []byte) (n int, err error) {
return obj.sshCon.Read(b)
}
func (obj *sshConn) Write(b []byte) (n int, err error) {
return obj.sshCon.Write(b)
}
func (obj *sshConn) Close() error {
return obj.sshCon.Close()
}
func (obj *sshConn) LocalAddr() net.Addr {
return obj.sshCon.LocalAddr()
}
func (obj *sshConn) RemoteAddr() net.Addr {
return obj.sshCon.RemoteAddr()
}
func (obj *sshConn) SetDeadline(deadline time.Time) error {
return obj.rawCon.SetDeadline(deadline)
}
func (obj *sshConn) SetReadDeadline(deadline time.Time) error {
return obj.rawCon.SetReadDeadline(deadline)
}
func (obj *sshConn) SetWriteDeadline(deadline time.Time) error {
return obj.rawCon.SetWriteDeadline(deadline)
}