diff --git a/plugin/rtsp/pkg/connection.go b/plugin/rtsp/pkg/connection.go index 5ec94cb..10a7921 100644 --- a/plugin/rtsp/pkg/connection.go +++ b/plugin/rtsp/pkg/connection.go @@ -12,6 +12,7 @@ import ( "sync/atomic" "time" + "m7s.live/v5/pkg" "m7s.live/v5/pkg/task" "m7s.live/v5" @@ -357,7 +358,9 @@ func (c *NetConnection) Receive(sendMode bool, onReceive func(byte, []byte) erro c.MemoryAllocator.Free(buf) return } else if onReceive != nil { - onReceive(channelID, buf) + if err := onReceive(channelID, buf); err != nil { + c.MemoryAllocator.Free(buf) + } } else { c.MemoryAllocator.Free(buf) } @@ -380,20 +383,36 @@ func (c *NetConnection) Receive(sendMode bool, onReceive func(byte, []byte) erro c.MemoryAllocator.Free(buf) return } - if channelID&1 == 0 { + + var needToFree = true // 默认需要释放内存 + if channelID&1 == 0 { // 偶数通道,RTP数据 if onReceive != nil { - if onReceive(channelID, buf) != nil { - c.MemoryAllocator.Free(buf) + err := onReceive(channelID, buf) + if err == nil { + // 如果回调返回nil,表示内存被接管 + needToFree = false + } else { + // 如果回调返回错误,检查是否是丢弃错误 + needToFree = (err != pkg.ErrDiscard) } continue } - } else if onRTCP != nil { - if onRTCP(channelID, buf) != nil { - c.MemoryAllocator.Free(buf) + } else if onRTCP != nil { // 奇数通道,RTCP数据 + err := onRTCP(channelID, buf) + if err == nil { + // 如果回调返回nil,表示内存被接管 + needToFree = false + } else { + // 如果回调返回错误,检查是否是丢弃错误 + needToFree = (err != pkg.ErrDiscard) } continue } - c.MemoryAllocator.Free(buf) + + // 如果需要释放内存,则释放 + if needToFree { + c.MemoryAllocator.Free(buf) + } } if ts.After(c.keepaliveTS) { diff --git a/plugin/rtsp/pkg/pull-proxy.go b/plugin/rtsp/pkg/pull-proxy.go index 340cc51..e2d0164 100644 --- a/plugin/rtsp/pkg/pull-proxy.go +++ b/plugin/rtsp/pkg/pull-proxy.go @@ -47,8 +47,13 @@ func (d *RTSPPullProxy) Start() (err error) { } func (d *RTSPPullProxy) Dispose() { - d.conn.NetConnection.Dispose() + if d.conn.NetConnection != nil { + _ = d.conn.Teardown() + d.conn.NetConnection.Dispose() + d.conn.NetConnection = nil + } d.TCPPullProxy.Dispose() + d.Info("RTSP pull proxy disposed and all resources cleaned up") } func (d *RTSPPullProxy) GetTickInterval() time.Duration { diff --git a/plugin/rtsp/pkg/push-proxy.go b/plugin/rtsp/pkg/push-proxy.go index e8475eb..7a25607 100644 --- a/plugin/rtsp/pkg/push-proxy.go +++ b/plugin/rtsp/pkg/push-proxy.go @@ -64,3 +64,19 @@ func (d *RTSPPushProxy) Tick(any) { } } } + +func (d *RTSPPushProxy) Dispose() { + // 先停止任何正在进行的操作 + if d.conn.NetConnection != nil { + // 尝试发送TEARDOWN信令 + _ = d.conn.Teardown() + + // 确保所有资源正确释放 + d.conn.NetConnection.Dispose() + d.conn.NetConnection = nil + } + + // 调用父类的Dispose方法 + d.TCPPushProxy.Dispose() + d.Info("RTSP push proxy disposed and all resources cleaned up") +} diff --git a/plugin/rtsp/pkg/transceiver.go b/plugin/rtsp/pkg/transceiver.go index bdb4646..c063fa0 100644 --- a/plugin/rtsp/pkg/transceiver.go +++ b/plugin/rtsp/pkg/transceiver.go @@ -489,7 +489,7 @@ func (r *Receiver) Receive() (err error) { audioFrame.Packets = []*rtp.Packet{packet} audioFrame.RTPCodecParameters = r.AudioCodecParameters audioFrame.SetAllocator(r.MemoryAllocator) - return nil + return pkg.ErrDiscard } else if packet.Timestamp != r.lastAudioPacketTS { // 时间戳有变化,重置检查 r.lastAudioPacketTS = packet.Timestamp @@ -506,7 +506,7 @@ func (r *Receiver) Receive() (err error) { if len(audioFrame.Packets) == 0 || packet.Timestamp == audioFrame.Packets[0].Timestamp { audioFrame.AddRecycleBytes(buf) audioFrame.Packets = append(audioFrame.Packets, packet) - return nil + return pkg.ErrDiscard } else { if err = r.WriteAudio(audioFrame); err != nil { return err @@ -516,7 +516,7 @@ func (r *Receiver) Receive() (err error) { audioFrame.Packets = []*rtp.Packet{packet} audioFrame.RTPCodecParameters = r.AudioCodecParameters audioFrame.SetAllocator(r.MemoryAllocator) - return nil + return pkg.ErrDiscard } case r.VideoChannelID: if !r.PubVideo { @@ -533,7 +533,7 @@ func (r *Receiver) Receive() (err error) { if len(videoFrame.Packets) == 0 || packet.Timestamp == videoFrame.Packets[0].Timestamp { videoFrame.AddRecycleBytes(buf) videoFrame.Packets = append(videoFrame.Packets, packet) - return nil + return pkg.ErrDiscard } else { // t := time.Now() if err = r.WriteVideo(videoFrame); err != nil { @@ -545,7 +545,7 @@ func (r *Receiver) Receive() (err error) { videoFrame.Packets = []*rtp.Packet{packet} videoFrame.RTPCodecParameters = r.VideoCodecParameters videoFrame.SetAllocator(r.MemoryAllocator) - return nil + return pkg.ErrDiscard } default: @@ -572,6 +572,18 @@ func (r *Receiver) Receive() (err error) { }) } +// 添加Dispose方法,清理资源 +func (r *Receiver) Dispose() { + // 清理可能持有的帧资源 + if r.Publisher != nil { + // 如果必要,这里可以添加额外的Publisher清理代码 + } + + // 调用基类Dispose + r.Stream.Dispose() + r.Stream.Info("Receiver disposed and resources cleaned up") +} + // 添加Dispose方法,清理UDP资源 func (s *Sender) Dispose() { // 释放UDP连接资源 diff --git a/plugin/rtsp/server.go b/plugin/rtsp/server.go index 2d0d3d8..62a7cc0 100644 --- a/plugin/rtsp/server.go +++ b/plugin/rtsp/server.go @@ -22,6 +22,22 @@ func (task *RTSPServer) Go() (err error) { var sender *Sender var req *util.Request var sendMode bool + + // 添加延迟函数在方法结束时清理资源 + defer func() { + if receiver != nil { + receiver.Dispose() + } + if sender != nil { + sender.Dispose() + } + // 确保任何残留资源被清理 + if task.NetConnection != nil { + task.NetConnection.Dispose() + } + task.Info("RTSP connection closed and resources cleaned up") + }() + for { req, err = task.ReadRequest() if err != nil { @@ -226,20 +242,31 @@ func (task *RTSPServer) Go() (err error) { case MethodRecord: res := &util.Response{Request: req} if err = task.WriteResponse(res); err != nil { + task.Error("Failed to write response", "error", err) return } + task.Info("Starting RTSP record session") err = receiver.Receive() + if err != nil { + task.Error("RTSP receive error", "error", err) + } return case MethodPlay: res := &util.Response{Request: req} if err = task.WriteResponse(res); err != nil { + task.Error("Failed to write response", "error", err) return } + task.Info("Starting RTSP play session") err = sender.Send() + if err != nil { + task.Error("RTSP send error", "error", err) + } return case MethodTeardown: res := &util.Response{Request: req} _ = task.WriteResponse(res) + task.Info("RTSP teardown received") return default: diff --git a/publisher.go b/publisher.go index 8f867d8..dbf2e5f 100644 --- a/publisher.go +++ b/publisher.go @@ -335,19 +335,18 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { p.VideoTrack.Set(t) p.Call(p.trackAdded) } - p.fixTimestamp(t, data) - defer t.SpeedControl(p.Speed) - oldCodecCtx := t.ICodecCtx err = data.Parse(t) if err != nil { return nil } + p.fixTimestamp(t, data) + defer t.SpeedControl(p.Speed) + oldCodecCtx := t.ICodecCtx codecCtxChanged := oldCodecCtx != t.ICodecCtx if err != nil { p.Error("parse", "err", err) return err } - if t.ICodecCtx == nil { return ErrUnsupportCodec } @@ -451,6 +450,10 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { p.AudioTrack.Set(t) p.Call(p.trackAdded) } + err = data.Parse(t) + if err != nil { + return + } p.fixTimestamp(t, data) defer t.SpeedControl(p.Speed) // 根据丢帧率进行音频帧丢弃 @@ -463,10 +466,6 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { } } oldCodecCtx := t.ICodecCtx - err = data.Parse(t) - if err != nil { - return - } codecCtxChanged := oldCodecCtx != t.ICodecCtx if t.ICodecCtx == nil { return ErrUnsupportCodec