From c9954149aa758ae9625f047271afa01facfefad2 Mon Sep 17 00:00:00 2001 From: pg Date: Mon, 21 Apr 2025 09:46:43 +0800 Subject: [PATCH] fix: Improve the method of utilizing UDP ports for RTSP --- plugin/rtsp/index.go | 76 ++++++++++------------------------ plugin/rtsp/pkg/transceiver.go | 15 ++----- plugin/rtsp/server.go | 13 ++++-- 3 files changed, 36 insertions(+), 68 deletions(-) diff --git a/plugin/rtsp/index.go b/plugin/rtsp/index.go index 35f4a73..614f313 100644 --- a/plugin/rtsp/index.go +++ b/plugin/rtsp/index.go @@ -2,6 +2,7 @@ package plugin_rtsp import ( "fmt" + "m7s.live/v5/pkg/util" "net" "strings" @@ -25,12 +26,8 @@ udp: type RTSPPlugin struct { m7s.Plugin - UDP struct { - PortRange string `desc:"UDP端口范围,用于UDP传输模式"` - PortPoolSize int `default:"100" desc:"预分配的UDP端口池大小"` - } - udpPortRange []uint16 // 解析后的UDP端口范围 - udpPortPool chan uint16 // UDP端口池,用于分配可用端口 + UdpPort util.Range[uint16] `default:"20001-30000" desc:"媒体端口范围"` //媒体端口范围 + udpPorts chan uint16 } func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask { @@ -58,60 +55,33 @@ func (p *RTSPPlugin) OnInit() (err error) { // 初始化UDP端口池 func (p *RTSPPlugin) initUDPPortPool() { - // 解析端口范围 - var startPort, endPort uint16 = 20000, 30000 // 默认值 - if p.UDP.PortRange != "" { - fmt.Sscanf(p.UDP.PortRange, "%d-%d", &startPort, &endPort) - } - - // 初始化端口范围 - p.udpPortRange = []uint16{startPort, endPort} - - // 确定池大小 - poolSize := p.UDP.PortPoolSize - if poolSize <= 0 { - poolSize = 100 - } - - // 创建端口池 - p.udpPortPool = make(chan uint16, poolSize) - - // 预填充端口池 - for i := 0; i < poolSize; i++ { - // 每个媒体需要2个端口(RTP+RTCP),所以步进2 - port := startPort + uint16(i*2) - if port+1 <= endPort { - p.udpPortPool <- port - } else { - break + if p.UdpPort.Valid() { + p.SetDescription("tcp", fmt.Sprintf("%d-%d", p.UdpPort[0], p.UdpPort[1])) + p.udpPorts = make(chan uint16, p.UdpPort.Size()) + for i := range p.UdpPort.Size() { + p.udpPorts <- p.UdpPort[0] + i } + } else { + p.Error("udp ports cannot init") + //p.SetDescription("tcp", fmt.Sprintf("%d", p.UdpPort[0])) + //tcpConfig := &p.GetCommonConf().TCP + //tcpConfig.ListenAddr = fmt.Sprintf(":%d", p.UdpPort[0]) } - - p.Info("UDP port pool initialized", "size", len(p.udpPortPool), "range", fmt.Sprintf("%d-%d", startPort, endPort)) } // 获取一个可用的UDP端口对(RTP端口和RTCP端口) -func (p *RTSPPlugin) GetUDPPort() (rtpPort uint16, rtcpPort uint16, err error) { - select { - case rtpPort = <-p.udpPortPool: - rtcpPort = rtpPort + 1 - return - default: - err = fmt.Errorf("no available UDP port in pool") - return - } -} - -// 释放一个UDP端口对回端口池 -func (p *RTSPPlugin) ReleaseUDPPort(rtpPort uint16) { - // 检查端口是否在有效范围内 - if rtpPort >= p.udpPortRange[0] && rtpPort+1 <= p.udpPortRange[1] { - // 尝试非阻塞方式放回池中,如果池已满则丢弃 +func (p *RTSPPlugin) GetUDPPort() (udpPort uint16, err error) { + if p.UdpPort.Valid() { select { - case p.udpPortPool <- rtpPort: - p.Debug("UDP port released back to pool", "port", rtpPort) + case udpPort = <-p.udpPorts: + defer func() { + p.udpPorts <- udpPort + }() default: - p.Debug("UDP port pool is full, port discarded", "port", rtpPort) + err = fmt.Errorf("no available tcp port") } + } else { + udpPort = p.UdpPort[0] } + return } diff --git a/plugin/rtsp/pkg/transceiver.go b/plugin/rtsp/pkg/transceiver.go index c063fa0..5a518df 100644 --- a/plugin/rtsp/pkg/transceiver.go +++ b/plugin/rtsp/pkg/transceiver.go @@ -17,10 +17,9 @@ import ( type Sender struct { *m7s.Subscriber Stream - UDPPorts map[int][]int // 保存媒体索引对应的UDP端口 [clientRTP, clientRTCP, serverRTP, serverRTCP] - UDPConns map[int][]*net.UDPConn // 保存媒体索引对应的UDP连接 [RTP发送连接, RTCP发送连接] - AllocatedUDPPorts []uint16 // 记录从端口池分配的UDP端口,用于连接结束时释放回池 - RTSPPlugin interface{ ReleaseUDPPort(port uint16) } // 指向RTSP插件的引用 + UDPPorts map[int][]int // 保存媒体索引对应的UDP端口 [clientRTP, clientRTCP, serverRTP, serverRTCP] + UDPConns map[int][]*net.UDPConn // 保存媒体索引对应的UDP连接 [RTP发送连接, RTCP发送连接] + AllocatedUDPPorts []uint16 // 记录从端口池分配的UDP端口,用于连接结束时释放回池 } type Receiver struct { @@ -609,14 +608,6 @@ func (s *Sender) Dispose() { s.UDPConns = nil } - // 释放分配的UDP端口 - if s.RTSPPlugin != nil && s.AllocatedUDPPorts != nil { - for _, port := range s.AllocatedUDPPorts { - s.RTSPPlugin.ReleaseUDPPort(port) - s.Stream.Debug("Released UDP port back to pool", "port", port) - } - } - // 调用基类Dispose s.Stream.Dispose() } diff --git a/plugin/rtsp/server.go b/plugin/rtsp/server.go index 62a7cc0..29f8b5e 100644 --- a/plugin/rtsp/server.go +++ b/plugin/rtsp/server.go @@ -161,6 +161,7 @@ func (task *RTSPServer) Go() (err error) { const udpTransport = "RTP/AVP" if strings.HasPrefix(tr, tcpTransport) { + task.Debug("into tcp play") // 原有的TCP传输处理逻辑 task.Session = util.RandomString(10) @@ -175,6 +176,8 @@ func (task *RTSPServer) Go() (err error) { res.Header.Set("Transport", tr[:len(tcpTransport)+3]) } } else if strings.HasPrefix(tr, udpTransport) && strings.Contains(tr, "unicast") && strings.Contains(tr, "client_port=") { + task.Debug("into udp play") + // UDP传输处理逻辑 task.Session = util.RandomString(10) @@ -189,13 +192,18 @@ func (task *RTSPServer) Go() (err error) { clientRTCPPort, _ := strconv.Atoi(matches[2]) // 从端口池获取服务器端口 - serverRTPPort, serverRTCPPort, err := task.conf.GetUDPPort() + serverRTPPort, err := task.conf.GetUDPPort() + if err != nil { + task.Error("Failed to get UDP port from pool", "error", err) + res.Status = "500 Internal Server Error: No available UDP ports" + break + } + serverRTCPPort, err := task.conf.GetUDPPort() if err != nil { task.Error("Failed to get UDP port from pool", "error", err) res.Status = "500 Internal Server Error: No available UDP ports" break } - // 在sender中记录这些端口信息 if sender.UDPPorts == nil { sender.UDPPorts = make(map[int][]int) @@ -210,7 +218,6 @@ func (task *RTSPServer) Go() (err error) { sender.AllocatedUDPPorts = append(sender.AllocatedUDPPorts, serverRTPPort) // 设置插件引用,用于在Dispose时释放端口 - sender.RTSPPlugin = task.conf // 设置传输响应 udpResponse := fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d;server_port=%d-%d",