From 46ababe7a92f9ed9d152d2a745bf54acdf82fc16 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Wed, 10 Sep 2025 09:44:23 +0800 Subject: [PATCH] fix: rtsp client read timeout --- pkg/util/{buf-reader.go => buf_reader.go} | 29 +++++++++++++++++++ ...{buf-reader_test.go => buf_reader_test.go} | 0 pkg/util/rm_disable.go | 4 +-- plugin/rtsp/pkg/connection.go | 8 +---- 4 files changed, 32 insertions(+), 9 deletions(-) rename pkg/util/{buf-reader.go => buf_reader.go} (86%) rename pkg/util/{buf-reader_test.go => buf_reader_test.go} (100%) diff --git a/pkg/util/buf-reader.go b/pkg/util/buf_reader.go similarity index 86% rename from pkg/util/buf-reader.go rename to pkg/util/buf_reader.go index 6c3cbaa..bbf9f97 100644 --- a/pkg/util/buf-reader.go +++ b/pkg/util/buf_reader.go @@ -5,6 +5,7 @@ import ( "net" "net/textproto" "strings" + "time" ) const defaultBufSize = 1 << 14 @@ -40,6 +41,34 @@ func NewBufReaderWithBufLen(reader io.Reader, bufLen int) (r *BufReader) { return } +// NewBufReaderWithTimeout 创建一个具有指定读取超时时间的 BufReader +func NewBufReaderWithTimeout(conn net.Conn, timeout time.Duration) (r *BufReader) { + r = &BufReader{ + Allocator: NewScalableMemoryAllocator(defaultBufSize), + BufLen: defaultBufSize, + feedData: func() error { + // 设置读取超时 + if conn != nil && timeout > 0 { + if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil { + return err + } + } + buf, err := r.Allocator.Read(conn, r.BufLen) + if err != nil { + return err + } + n := len(buf) + r.totalRead += n + r.buf.Buffers = append(r.buf.Buffers, buf) + r.buf.Size += n + r.buf.Length += n + return nil + }, + } + r.buf.Memory = &Memory{} + return +} + func NewBufReaderBuffersChan(feedChan chan net.Buffers) (r *BufReader) { r = &BufReader{ feedData: func() error { diff --git a/pkg/util/buf-reader_test.go b/pkg/util/buf_reader_test.go similarity index 100% rename from pkg/util/buf-reader_test.go rename to pkg/util/buf_reader_test.go diff --git a/pkg/util/rm_disable.go b/pkg/util/rm_disable.go index f81cdf3..1f0a500 100644 --- a/pkg/util/rm_disable.go +++ b/pkg/util/rm_disable.go @@ -39,12 +39,12 @@ func (r *RecyclableMemory) Recycle() { func (r *RecyclableMemory) NextN(size int) (memory []byte) { memory = make([]byte, size) - r.AppendOne(memory) + r.PushOne(memory) return memory } func (r *RecyclableMemory) AddRecycleBytes(b []byte) { - r.AppendOne(b) + r.PushOne(b) } type MemoryAllocator struct { diff --git a/plugin/rtsp/pkg/connection.go b/plugin/rtsp/pkg/connection.go index f9f6995..2e60f3a 100644 --- a/plugin/rtsp/pkg/connection.go +++ b/plugin/rtsp/pkg/connection.go @@ -23,7 +23,7 @@ const Timeout = time.Second * 10 func NewNetConnection(conn net.Conn) *NetConnection { return &NetConnection{ Conn: conn, - BufReader: util.NewBufReader(conn), + BufReader: util.NewBufReaderWithTimeout(conn, Timeout), MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12), UserAgent: "monibuca" + m7s.Version, } @@ -187,9 +187,6 @@ func (c *NetConnection) WriteRequest(req *util.Request) (err error) { } func (c *NetConnection) ReadRequest() (req *util.Request, err error) { - if err = c.Conn.SetReadDeadline(time.Now().Add(Timeout)); err != nil { - return - } req, err = util.ReadRequest(c.BufReader) if err != nil { return @@ -243,9 +240,6 @@ func (c *NetConnection) WriteResponse(res *util.Response) (err error) { } func (c *NetConnection) ReadResponse() (res *util.Response, err error) { - if err := c.Conn.SetReadDeadline(time.Now().Add(Timeout)); err != nil { - return nil, err - } res, err = util.ReadResponse(c.BufReader) if err == nil { c.Debug("<-", "res", res.String())