fix: 1.data.Parse before p.fixTimestamp

2.Solution to RTSP Memory Overflow Issues
This commit is contained in:
pg
2025-04-20 20:50:01 +08:00
committed by pggiroro
parent 8c6cb12d48
commit 324193d30e
6 changed files with 100 additions and 22 deletions

View File

@@ -12,6 +12,7 @@ import (
"sync/atomic"
"time"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/task"
"m7s.live/v5"
@@ -357,7 +358,9 @@ func (c *NetConnection) Receive(sendMode bool, onReceive func(byte, []byte) erro
c.MemoryAllocator.Free(buf)
return
} else if onReceive != nil {
onReceive(channelID, buf)
if err := onReceive(channelID, buf); err != nil {
c.MemoryAllocator.Free(buf)
}
} else {
c.MemoryAllocator.Free(buf)
}
@@ -380,20 +383,36 @@ func (c *NetConnection) Receive(sendMode bool, onReceive func(byte, []byte) erro
c.MemoryAllocator.Free(buf)
return
}
if channelID&1 == 0 {
var needToFree = true // 默认需要释放内存
if channelID&1 == 0 { // 偶数通道RTP数据
if onReceive != nil {
if onReceive(channelID, buf) != nil {
c.MemoryAllocator.Free(buf)
err := onReceive(channelID, buf)
if err == nil {
// 如果回调返回nil表示内存被接管
needToFree = false
} else {
// 如果回调返回错误,检查是否是丢弃错误
needToFree = (err != pkg.ErrDiscard)
}
continue
}
} else if onRTCP != nil {
if onRTCP(channelID, buf) != nil {
c.MemoryAllocator.Free(buf)
} else if onRTCP != nil { // 奇数通道RTCP数据
err := onRTCP(channelID, buf)
if err == nil {
// 如果回调返回nil表示内存被接管
needToFree = false
} else {
// 如果回调返回错误,检查是否是丢弃错误
needToFree = (err != pkg.ErrDiscard)
}
continue
}
c.MemoryAllocator.Free(buf)
// 如果需要释放内存,则释放
if needToFree {
c.MemoryAllocator.Free(buf)
}
}
if ts.After(c.keepaliveTS) {

View File

@@ -47,8 +47,13 @@ func (d *RTSPPullProxy) Start() (err error) {
}
func (d *RTSPPullProxy) Dispose() {
d.conn.NetConnection.Dispose()
if d.conn.NetConnection != nil {
_ = d.conn.Teardown()
d.conn.NetConnection.Dispose()
d.conn.NetConnection = nil
}
d.TCPPullProxy.Dispose()
d.Info("RTSP pull proxy disposed and all resources cleaned up")
}
func (d *RTSPPullProxy) GetTickInterval() time.Duration {

View File

@@ -64,3 +64,19 @@ func (d *RTSPPushProxy) Tick(any) {
}
}
}
func (d *RTSPPushProxy) Dispose() {
// 先停止任何正在进行的操作
if d.conn.NetConnection != nil {
// 尝试发送TEARDOWN信令
_ = d.conn.Teardown()
// 确保所有资源正确释放
d.conn.NetConnection.Dispose()
d.conn.NetConnection = nil
}
// 调用父类的Dispose方法
d.TCPPushProxy.Dispose()
d.Info("RTSP push proxy disposed and all resources cleaned up")
}

View File

@@ -489,7 +489,7 @@ func (r *Receiver) Receive() (err error) {
audioFrame.Packets = []*rtp.Packet{packet}
audioFrame.RTPCodecParameters = r.AudioCodecParameters
audioFrame.SetAllocator(r.MemoryAllocator)
return nil
return pkg.ErrDiscard
} else if packet.Timestamp != r.lastAudioPacketTS {
// 时间戳有变化,重置检查
r.lastAudioPacketTS = packet.Timestamp
@@ -506,7 +506,7 @@ func (r *Receiver) Receive() (err error) {
if len(audioFrame.Packets) == 0 || packet.Timestamp == audioFrame.Packets[0].Timestamp {
audioFrame.AddRecycleBytes(buf)
audioFrame.Packets = append(audioFrame.Packets, packet)
return nil
return pkg.ErrDiscard
} else {
if err = r.WriteAudio(audioFrame); err != nil {
return err
@@ -516,7 +516,7 @@ func (r *Receiver) Receive() (err error) {
audioFrame.Packets = []*rtp.Packet{packet}
audioFrame.RTPCodecParameters = r.AudioCodecParameters
audioFrame.SetAllocator(r.MemoryAllocator)
return nil
return pkg.ErrDiscard
}
case r.VideoChannelID:
if !r.PubVideo {
@@ -533,7 +533,7 @@ func (r *Receiver) Receive() (err error) {
if len(videoFrame.Packets) == 0 || packet.Timestamp == videoFrame.Packets[0].Timestamp {
videoFrame.AddRecycleBytes(buf)
videoFrame.Packets = append(videoFrame.Packets, packet)
return nil
return pkg.ErrDiscard
} else {
// t := time.Now()
if err = r.WriteVideo(videoFrame); err != nil {
@@ -545,7 +545,7 @@ func (r *Receiver) Receive() (err error) {
videoFrame.Packets = []*rtp.Packet{packet}
videoFrame.RTPCodecParameters = r.VideoCodecParameters
videoFrame.SetAllocator(r.MemoryAllocator)
return nil
return pkg.ErrDiscard
}
default:
@@ -572,6 +572,18 @@ func (r *Receiver) Receive() (err error) {
})
}
// 添加Dispose方法清理资源
func (r *Receiver) Dispose() {
// 清理可能持有的帧资源
if r.Publisher != nil {
// 如果必要这里可以添加额外的Publisher清理代码
}
// 调用基类Dispose
r.Stream.Dispose()
r.Stream.Info("Receiver disposed and resources cleaned up")
}
// 添加Dispose方法清理UDP资源
func (s *Sender) Dispose() {
// 释放UDP连接资源

View File

@@ -22,6 +22,22 @@ func (task *RTSPServer) Go() (err error) {
var sender *Sender
var req *util.Request
var sendMode bool
// 添加延迟函数在方法结束时清理资源
defer func() {
if receiver != nil {
receiver.Dispose()
}
if sender != nil {
sender.Dispose()
}
// 确保任何残留资源被清理
if task.NetConnection != nil {
task.NetConnection.Dispose()
}
task.Info("RTSP connection closed and resources cleaned up")
}()
for {
req, err = task.ReadRequest()
if err != nil {
@@ -226,20 +242,31 @@ func (task *RTSPServer) Go() (err error) {
case MethodRecord:
res := &util.Response{Request: req}
if err = task.WriteResponse(res); err != nil {
task.Error("Failed to write response", "error", err)
return
}
task.Info("Starting RTSP record session")
err = receiver.Receive()
if err != nil {
task.Error("RTSP receive error", "error", err)
}
return
case MethodPlay:
res := &util.Response{Request: req}
if err = task.WriteResponse(res); err != nil {
task.Error("Failed to write response", "error", err)
return
}
task.Info("Starting RTSP play session")
err = sender.Send()
if err != nil {
task.Error("RTSP send error", "error", err)
}
return
case MethodTeardown:
res := &util.Response{Request: req}
_ = task.WriteResponse(res)
task.Info("RTSP teardown received")
return
default:

View File

@@ -335,19 +335,18 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
p.VideoTrack.Set(t)
p.Call(p.trackAdded)
}
p.fixTimestamp(t, data)
defer t.SpeedControl(p.Speed)
oldCodecCtx := t.ICodecCtx
err = data.Parse(t)
if err != nil {
return nil
}
p.fixTimestamp(t, data)
defer t.SpeedControl(p.Speed)
oldCodecCtx := t.ICodecCtx
codecCtxChanged := oldCodecCtx != t.ICodecCtx
if err != nil {
p.Error("parse", "err", err)
return err
}
if t.ICodecCtx == nil {
return ErrUnsupportCodec
}
@@ -451,6 +450,10 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
p.AudioTrack.Set(t)
p.Call(p.trackAdded)
}
err = data.Parse(t)
if err != nil {
return
}
p.fixTimestamp(t, data)
defer t.SpeedControl(p.Speed)
// 根据丢帧率进行音频帧丢弃
@@ -463,10 +466,6 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
}
}
oldCodecCtx := t.ICodecCtx
err = data.Parse(t)
if err != nil {
return
}
codecCtxChanged := oldCodecCtx != t.ICodecCtx
if t.ICodecCtx == nil {
return ErrUnsupportCodec