VideoFrame和AudioFrame增加track的指针方便访问track,rtp写入采用内存复用机制

This commit is contained in:
dexter
2023-02-26 10:36:37 +08:00
parent 4d008cfc5e
commit fd2c7b8cb6
11 changed files with 78 additions and 110 deletions

View File

@@ -52,7 +52,6 @@ global:
subvideo: true # 是否订阅视频流
iframeonly: false # 只订阅关键帧
waittimeout: 10 # 等待发布者的秒数,用于订阅尚未发布的流
rtpreorder : true # 启用RTP包乱序重排
enableavcc : true # 启用AVCC格式缓存用于rtmp协议
enablertp : true # 启用rtp格式缓存用于rtsp、websocket、gb28181协议
enableauth: true # 启用鉴权,详细查看鉴权机制

View File

@@ -21,26 +21,27 @@ func SplitAnnexB[T ~[]byte](frame T, process func(T), delimiter []byte) {
}
type RTPFrame struct {
rtp.Packet
*rtp.Packet
Raw []byte
}
func (rtp *RTPFrame) Clone() *RTPFrame {
return &RTPFrame{*rtp.Packet.Clone()}
func (r *RTPFrame) H264Type() (naluType codec.H264NALUType) {
return naluType.Parse(r.Payload[0])
}
func (r *RTPFrame) H265Type() (naluType codec.H265NALUType) {
return naluType.Parse(r.Payload[0])
}
func (rtp *RTPFrame) H264Type() (naluType codec.H264NALUType) {
return naluType.Parse(rtp.Payload[0])
func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame {
if r.Packet == nil {
r.Packet = &rtp.Packet{}
}
func (rtp *RTPFrame) H265Type() (naluType codec.H265NALUType) {
return naluType.Parse(rtp.Payload[0])
}
func (rtp *RTPFrame) Unmarshal(raw []byte) *RTPFrame {
if err := rtp.Packet.Unmarshal(raw); err != nil {
r.Raw = raw
if err := r.Packet.Unmarshal(raw); err != nil {
log.Error(err)
return nil
}
return rtp
return r
}
type BaseFrame struct {

View File

@@ -98,10 +98,11 @@ type AVTrack interface {
Attach()
Detach()
WriteAVCC(ts uint32, frame *util.BLL) error //写入AVCC格式的数据
WriteRTP([]byte)
WriteRTP(*util.ListItem[RTPFrame])
WriteRTPPack(*rtp.Packet)
Flush()
SetSpeedLimit(time.Duration)
GetRTPFromPool() *util.ListItem[RTPFrame]
}
type VideoTrack interface {
AVTrack

View File

@@ -112,7 +112,6 @@ type Engine struct {
Publish
Subscribe
HTTP
RTPReorder bool `default:"true"`
EnableAVCC bool `default:"true"` //启用AVCC格式rtmp协议使用
EnableRTP bool `default:"true"` //启用RTP格式rtsp、gb18181等协议使用
EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能

View File

@@ -8,7 +8,6 @@ import (
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegts"
"m7s.live/engine/v4/common"
"m7s.live/engine/v4/util"
)
@@ -141,11 +140,11 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M
return nil
}
func (ts *MemoryTs) WriteAudioFrame(frame *AudioFrame, aac_asc *codec.AudioSpecificConfig, pes *mpegts.MpegtsPESFrame) (err error) {
func (ts *MemoryTs) WriteAudioFrame(frame AudioFrame, pes *mpegts.MpegtsPESFrame) (err error) {
adtsItem := ts.Get(7)
defer adtsItem.Recycle()
adts := adtsItem.Value
aac_asc.ToADTS(frame.AUList.ByteLength, adts)
frame.AudioSpecificConfig.ToADTS(frame.AUList.ByteLength, adts)
// packetLength = 原始音频流长度 + adts(7) + MpegTsOptionalPESHeader长度(8 bytes, 因为只含有pts)
pktLength := len(adts) + frame.AUList.ByteLength + 8
var packet mpegts.MpegTsPESPacket
@@ -161,17 +160,14 @@ func (ts *MemoryTs) WriteAudioFrame(frame *AudioFrame, aac_asc *codec.AudioSpeci
return ts.WritePESPacket(pes, packet)
}
func (ts *MemoryTs) WriteVideoFrame(frame *VideoFrame, paramaterSets common.ParamaterSets, pes *mpegts.MpegtsPESFrame) (err error) {
func (ts *MemoryTs) WriteVideoFrame(frame VideoFrame, pes *mpegts.MpegtsPESFrame) (err error) {
var buffer net.Buffers
//需要对原始数据(ES),进行一些预处理,视频需要分割nalu(H264编码),并且打上sps,pps,nalu_aud信息.
if len(paramaterSets) == 2 {
if len(frame.ParamaterSets) == 2 {
buffer = append(buffer, codec.NALU_AUD_BYTE)
} else {
buffer = append(buffer, codec.AudNalu)
}
if frame.IFrame {
buffer = append(buffer, paramaterSets.GetAnnexB()...)
}
buffer = append(buffer, frame.GetAnnexB()...)
pktLength := util.SizeOfBuffers(buffer) + 10 + 3
if pktLength > 0xffff {

View File

@@ -7,7 +7,9 @@ import (
"github.com/pion/webrtc/v3/pkg/media/rtpdump"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/common"
"m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
)
type RTPDumpPublisher struct {
@@ -57,7 +59,9 @@ func (t *RTPDumpPublisher) OnEvent(event any) {
return
}
if !packet.IsRTCP {
t.VideoTrack.WriteRTP(packet.Payload)
var frame common.RTPFrame
frame.Unmarshal(packet.Payload)
t.VideoTrack.WriteRTP(&util.ListItem[common.RTPFrame]{Value: frame})
}
// t.AudioTrack.WriteRTP(packet)
}

View File

@@ -33,12 +33,14 @@ type VideoDeConf []byte
type AudioDeConf []byte
type AudioFrame struct {
*AVFrame
*track.Audio
AbsTime uint32
PTS uint32
DTS uint32
}
type VideoFrame struct {
*AVFrame
*track.Video
AbsTime uint32
PTS uint32
DTS uint32
@@ -69,6 +71,9 @@ func (f FLVFrame) WriteTo(w io.Writer) (int64, error) {
}
func (v VideoFrame) GetAnnexB() (r net.Buffers) {
if v.IFrame {
r = v.ParamaterSets.GetAnnexB()
}
v.AUList.Range(func(au *util.BLL) bool {
r = append(append(r, codec.NALU_Delimiter2), au.ToBuffers()...)
return true
@@ -196,11 +201,11 @@ func (s *Subscriber) PlayBlock(subType byte) {
case SUBTYPE_RAW:
sendVideoFrame = func(frame *AVFrame) {
// println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
spesic.OnEvent(VideoFrame{frame, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipRTPTs, frame.DTS - s.VideoReader.SkipRTPTs})
spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipRTPTs, frame.DTS - s.VideoReader.SkipRTPTs})
}
sendAudioFrame = func(frame *AVFrame) {
// println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime)
spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, frame.PTS - s.AudioReader.SkipRTPTs, frame.PTS - s.AudioReader.SkipRTPTs})
spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, frame.PTS - s.AudioReader.SkipRTPTs, frame.PTS - s.AudioReader.SkipRTPTs})
}
case SUBTYPE_RTP:
var videoSeq, audioSeq uint16

View File

@@ -4,6 +4,7 @@ import (
"time"
"unsafe"
"github.com/pion/rtp"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/util"
@@ -79,7 +80,7 @@ type Media struct {
SSRC uint32
SampleRate uint32
BytesPool util.BytesPool `json:"-"`
rtpPool util.Pool[RTPFrame]
RtpPool util.Pool[RTPFrame] `json:"-"`
SequenceHead []byte `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
SequenceHeadSeq int
RTPMuxer
@@ -89,6 +90,19 @@ type Media struct {
流速控制
}
func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) {
result = av.RtpPool.Get()
if result.Value.Packet == nil {
result.Value.Packet = &rtp.Packet{}
result.Value.PayloadType = av.PayloadType
result.Value.SSRC = av.SSRC
result.Value.Version = 2
result.Value.Raw = make([]byte, 1460)
result.Value.Payload = result.Value.Raw[:0]
}
return
}
// 毫秒转换为Mpeg时间戳
func (av *Media) Ms2MpegTs(ms uint32) uint32 {
return uint32(uint64(ms) * 90)

View File

@@ -2,55 +2,31 @@ package track
import (
"github.com/pion/rtp"
"go.uber.org/zap"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/util"
)
const RTPMTU = 1400
func (av *Media) UnmarshalRTPPacket(p *rtp.Packet) (frame *RTPFrame) {
if av.PayloadType != p.PayloadType {
av.Warn("RTP PayloadType error", zap.Uint8("want", av.PayloadType), zap.Uint8("got", p.PayloadType))
return
}
frame = &RTPFrame{
Packet: *p,
}
av.Value.BytesIn += len(p.Payload) + 12
return av.recorderRTP(frame)
}
func (av *Media) UnmarshalRTP(raw []byte) (frame *RTPFrame) {
var p rtp.Packet
err := p.Unmarshal(raw)
if err != nil {
av.Warn("RTP Unmarshal error", zap.Error(err))
return
}
return av.UnmarshalRTPPacket(&p)
}
func (av *Media) writeRTPFrame(frame *RTPFrame) {
if len(frame.Payload) == 0 {
return
}
av.Value.RTP.PushValue(*frame)
av.WriteRTPFrame(frame)
}
// WriteRTPPack 写入已反序列化的RTP包
// WriteRTPPack 写入已反序列化的RTP包已经排序过了的
func (av *Media) WriteRTPPack(p *rtp.Packet) {
for frame := av.UnmarshalRTPPacket(p); frame != nil; frame = av.nextRTPFrame() {
av.writeRTPFrame(frame)
var frame RTPFrame
frame.Packet = p
av.Value.BytesIn += len(frame.Payload) + 12
av.Value.RTP.PushValue(frame)
if len(p.Payload) > 0 {
av.WriteRTPFrame(&frame)
}
}
// WriteRTP 写入未反序列化的RTP包
func (av *Media) WriteRTP(raw []byte) {
for frame := av.UnmarshalRTP(raw); frame != nil; frame = av.nextRTPFrame() {
av.writeRTPFrame(frame)
// WriteRTPFrame 写入未反序列化的RTP包, 未排序的
func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) {
for frame := av.recorderRTP(raw); frame != nil; frame = av.nextRTPFrame() {
av.Value.BytesIn += len(frame.Value.Payload) + 12
av.Value.RTP.Push(frame)
if len(frame.Value.Payload) > 0 {
av.WriteRTPFrame(&frame.Value)
}
}
}
@@ -60,14 +36,8 @@ func (av *Media) PacketizeRTP(payloads ...[][]byte) {
packetCount := len(payloads)
for i, pp := range payloads {
av.rtpSequence++
rtpItem := av.rtpPool.Get()
rtpItem := av.GetRTPFromPool()
packet := &rtpItem.Value
if packet.Payload == nil {
packet.Payload = make([]byte, 0, RTPMTU)
packet.Version = 2
packet.PayloadType = av.PayloadType
packet.SSRC = av.SSRC
}
packet.Payload = packet.Payload[:0]
packet.SequenceNumber = av.rtpSequence
if av.SampleRate != 90000 {
@@ -86,36 +56,17 @@ func (av *Media) PacketizeRTP(payloads ...[][]byte) {
type RTPDemuxer struct {
lastSeq uint16 //上一个收到的序号,用于乱序重排
lastSeq2 uint16 //记录上上一个收到的序列号
乱序重排 util.RTPReorder[*RTPFrame]
乱序重排 util.RTPReorder[*util.ListItem[RTPFrame]]
}
// 获取缓存中下一个rtpFrame
func (av *RTPDemuxer) nextRTPFrame() (frame *RTPFrame) {
if config.Global.RTPReorder {
func (av *RTPDemuxer) nextRTPFrame() (frame *util.ListItem[RTPFrame]) {
return av.乱序重排.Pop()
}
return
}
// 对RTP包乱序重排
func (av *RTPDemuxer) recorderRTP(frame *RTPFrame) *RTPFrame {
if config.Global.RTPReorder {
return av.乱序重排.Push(frame.SequenceNumber, frame)
} else {
if av.lastSeq == 0 {
av.lastSeq = frame.SequenceNumber
} else if frame.SequenceNumber == av.lastSeq2+1 { // 本次序号是上上次的序号+1 说明中间隔了一个错误序号某些rtsp流中的rtcp包写成了rtp包导致的
av.lastSeq = frame.SequenceNumber
} else {
av.lastSeq2 = av.lastSeq
av.lastSeq = frame.SequenceNumber
if av.lastSeq != av.lastSeq2+1 { //序号不连续
// av.Stream.Warn("RTP SequenceNumber error", av.lastSeq2, av.lastSeq)
return frame
}
}
return frame
}
func (av *RTPDemuxer) recorderRTP(item *util.ListItem[RTPFrame]) *util.ListItem[RTPFrame] {
return av.乱序重排.Push(item.Value.SequenceNumber, item)
}
type RTPMuxer struct {

View File

@@ -5,6 +5,9 @@ import (
"net"
)
type Recyclable interface {
Recycle()
}
type BLLReader struct {
*ListItem[Buffer]
pos int

View File

@@ -1,14 +1,9 @@
package util
type CloneType[T any] interface {
Clone() T
comparable
}
var RTPReorderBufferLen uint16 = 50
// RTPReorder RTP包乱序重排
type RTPReorder[T CloneType[T]] struct {
type RTPReorder[T comparable] struct {
lastSeq uint16 //最新收到的rtp包序号
queue []T // 缓存队列,0号元素位置代表lastReq+1永远保持为空
Total uint32 // 总共收到的包数量
@@ -48,7 +43,7 @@ func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) {
head := p.pop()
// 可以放得进去了
if delta == RTPReorderBufferLen {
p.queue[RTPReorderBufferLen-1] = v.Clone()
p.queue[RTPReorderBufferLen-1] = v
p.queue[0] = result
return head
} else if head != result {
@@ -57,7 +52,7 @@ func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) {
}
} else {
// 出现后面的包先到达,缓存起来
p.queue[delta-1] = v.Clone()
p.queue[delta-1] = v
return
}
}