feat: 兼容不规范的 rtp 包

This commit is contained in:
langhuihui
2023-10-26 17:24:44 +08:00
parent e4fe493cc6
commit 8ffe3d3d55
9 changed files with 70 additions and 58 deletions

View File

@@ -191,18 +191,17 @@ type Engine struct {
Publish
Subscribe
HTTP
EnableAVCC bool `default:"true"` //启用AVCC格式rtmp、http-flv协议使用
EnableRTP bool `default:"true"` //启用RTP格式rtsp、webrtc等协议使用
EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能
EnableAuth bool `default:"true"` //启用鉴权
Console
EnableAVCC bool `default:"true"` //启用AVCC格式rtmp、http-flv协议使用
EnableRTP bool `default:"true"` //启用RTP格式rtsp、webrtc等协议使用,已废弃,在 rtp 下面配置
EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能
EnableAuth bool `default:"true"` //启用鉴权
LogLang string `default:"zh"` //日志语言
LogLevel string `default:"info"` //日志级别
RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲长度
RTPFlushMode int `default:"0"` //RTP分帧写入模式0按 Marker 标志位1按时间戳
EventBusSize int `default:"10"` //事件总线大小
PulseInterval time.Duration `default:"5s"` //心跳事件间隔
DisableAll bool `default:"false"` //禁用所有插件
RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲区长度
PoolSize int //内存池大小
enableReport bool `default:"false"` //启用报告,用于统计和监控
reportStream quic.Stream // console server connection
@@ -310,10 +309,12 @@ func (cfg *Engine) OnEvent(event any) {
}
case context.Context:
util.RTPReorderBufferLen = uint16(cfg.RTPReorderBufferLen)
if strings.HasPrefix(cfg.Console.Server, "wss") {
go cfg.WsRemote()
} else {
go cfg.WtRemote(v)
if cfg.Secret != "" && cfg.Server != "" {
if strings.HasPrefix(cfg.Console.Server, "wss") {
go cfg.WsRemote()
} else {
go cfg.WtRemote(v)
}
}
}
}

View File

@@ -209,9 +209,7 @@ func Run(ctx context.Context, conf any) (err error) {
}{UUID, id, EngineConfig.GetInstanceId(), version, runtime.GOOS, runtime.GOARCH}
json.NewEncoder(contentBuf).Encode(&rp)
req.Body = io.NopCloser(contentBuf)
if EngineConfig.Secret != "" {
EngineConfig.OnEvent(ctx)
}
EngineConfig.OnEvent(ctx)
go func() {
var c http.Client
reportTimer := time.NewTimer(time.Minute)

View File

@@ -68,7 +68,9 @@ func (aac *AAC) WriteADTS(ts uint32, b util.IBytes) {
}
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
func (aac *AAC) WriteRTPFrame(frame *RTPFrame) {
func (aac *AAC) WriteRTPFrame(rtpItem *util.ListItem[RTPFrame]) {
aac.Value.RTP.Push(rtpItem)
frame := &rtpItem.Value
if len(frame.Payload) < 2 {
// aac.fragments = aac.fragments[:0]
return

View File

@@ -61,7 +61,7 @@ type SpesificTrack interface {
CompleteRTP(*AVFrame)
CompleteAVCC(*AVFrame)
WriteSliceBytes([]byte)
WriteRTPFrame(*RTPFrame)
WriteRTPFrame(*util.ListItem[RTPFrame])
generateTimestamp(uint32)
WriteSequenceHead([]byte)
GetNALU_SEI() *util.ListItem[util.Buffer]

View File

@@ -58,7 +58,8 @@ func (g711 *G711) WriteAVCC(ts uint32, frame *util.BLL) error {
return nil
}
func (g711 *G711) WriteRTPFrame(frame *RTPFrame) {
func (g711 *G711) WriteRTPFrame(rtpItem *util.ListItem[RTPFrame]) {
frame := &rtpItem.Value
if g711.SampleRate != 90000 {
g711.generateTimestamp(uint32(uint64(frame.Timestamp) * 90000 / uint64(g711.SampleRate)))
}

View File

@@ -1,13 +1,13 @@
package track
import (
"bytes"
"io"
"time"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/log"
"m7s.live/engine/v4/util"
)
@@ -16,6 +16,7 @@ var _ SpesificTrack = (*H264)(nil)
type H264 struct {
Video
buf util.Buffer // rtp 包临时缓存,对于不规范的 rtp 包sps 放到了 fua 中导致)需要缓存
}
func NewH264(stream IStream, stuff ...any) (vt *H264) {
@@ -32,6 +33,9 @@ func NewH264(stream IStream, stuff ...any) (vt *H264) {
}
func (vt *H264) WriteSliceBytes(slice []byte) {
if len(slice) > 4 && bytes.Equal(slice[:4], codec.NALU_Delimiter2) {
slice = slice[4:] // 有些设备厂商不规范,所以需要移除前导的 00 00 00 01
}
naluType := codec.ParseH264NALUType(slice[0])
if log.Trace {
vt.Trace("naluType", zap.Uint8("naluType", naluType.Byte()))
@@ -113,33 +117,37 @@ func (vt *H264) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
}
}
func (vt *H264) WriteRTPFrame(frame *RTPFrame) {
func (vt *H264) WriteRTPFrame(rtpItem *util.ListItem[RTPFrame]) {
if vt.lastSeq != vt.lastSeq2+1 && vt.lastSeq2 != 0 {
vt.lostFlag = true
vt.Warn("lost rtp packet", zap.Uint16("lastSeq", vt.lastSeq), zap.Uint16("lastSeq2", vt.lastSeq2))
}
if config.Global.RTPFlushMode == 1 {
if vt.Value.AUList.ByteLength == 0 {
vt.generateTimestamp(frame.Timestamp)
} else if vt.Value.PTS != time.Duration(frame.Timestamp) {
if !vt.dcChanged && vt.Value.IFrame {
frame := &rtpItem.Value
pts := frame.Timestamp
rv := vt.Value
// 有些流的 rtp 包中没有设置 marker 导致无法判断是否是最后一个包,此时通过时间戳变化判断,先 flush 之前的帧
if rv.PTS != time.Duration(pts) {
if rv.AUList.ByteLength > 0 {
if !vt.dcChanged && rv.IFrame {
vt.insertDCRtp()
}
vt.Flush()
vt.generateTimestamp(frame.Timestamp)
rv = vt.Value
}
vt.generateTimestamp(pts)
}
rv := vt.Value
rv.RTP.Push(rtpItem)
if naluType := frame.H264Type(); naluType < 24 {
vt.WriteSliceBytes(frame.Payload)
} else {
offset := naluType.Offset()
switch naluType {
case codec.NALU_STAPA, codec.NALU_STAPB:
if len(frame.Payload) <= naluType.Offset() {
if len(frame.Payload) <= offset {
vt.Error("invalid nalu size", zap.Int("naluType", int(naluType)))
return
}
for buffer := util.Buffer(frame.Payload[naluType.Offset():]); buffer.CanRead(); {
for buffer := util.Buffer(frame.Payload[offset:]); buffer.CanRead(); {
nextSize := int(buffer.ReadUint16())
if buffer.Len() >= nextSize {
vt.WriteSliceBytes(buffer.ReadN(nextSize))
@@ -149,25 +157,37 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) {
}
}
case codec.NALU_FUA, codec.NALU_FUB:
if util.Bit1(frame.Payload[1], 0) {
vt.WriteSliceByte(naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60))
b1 := frame.Payload[1]
if util.Bit1(b1, 0) {
naluType = naluType.Parse(b1)
firstByte := naluType.Or(frame.Payload[0] & 0x60)
switch naluType {
case codec.NALU_SPS, codec.NALU_PPS:
vt.buf.WriteByte(firstByte)
default:
vt.WriteSliceByte(firstByte)
}
}
if rv.AUList.Pre != nil && rv.AUList.Pre.Value != nil {
rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(frame.Payload[naluType.Offset():]))
if vt.buf.Len() > 0 {
vt.buf.Write(frame.Payload[offset:])
} else {
vt.Error("fu have no start")
return
if rv.AUList.Pre != nil && rv.AUList.Pre.Value != nil {
rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(frame.Payload[offset:]))
} else {
vt.Error("fu have no start")
return
}
}
if !util.Bit1(frame.Payload[1], 1) {
if !util.Bit1(b1, 1) {
// fua 还没结束
return
} else if vt.buf.Len() > 0 {
vt.WriteAnnexB(uint32(rv.PTS), uint32(rv.DTS), vt.buf)
vt.buf = nil
}
}
}
if rv.AUList.ByteLength == 0 {
return
}
if config.Global.RTPFlushMode == 0 && frame.Marker {
vt.generateTimestamp(frame.Timestamp)
if frame.Marker && rv.AUList.ByteLength > 0 {
if !vt.dcChanged && rv.IFrame {
vt.insertDCRtp()
}

View File

@@ -2,12 +2,10 @@ package track
import (
"io"
"time"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/log"
"m7s.live/engine/v4/util"
)
@@ -138,18 +136,8 @@ func (vt *H265) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
return
}
func (vt *H265) WriteRTPFrame(frame *RTPFrame) {
if config.Global.RTPFlushMode == 1 {
if vt.Value.AUList.ByteLength == 0 {
vt.generateTimestamp(frame.Timestamp)
} else if vt.Value.PTS != time.Duration(frame.Timestamp) {
if !vt.dcChanged && vt.Value.IFrame {
vt.insertDCRtp()
}
vt.Flush()
vt.generateTimestamp(frame.Timestamp)
}
}
func (vt *H265) WriteRTPFrame(rtpItem *util.ListItem[RTPFrame]) {
frame := &rtpItem.Value
rv := vt.Value
// TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream.
var usingDonlField bool
@@ -189,7 +177,7 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) {
default:
vt.WriteSliceBytes(frame.Payload)
}
if config.Global.RTPFlushMode == 0 && frame.Marker {
if frame.Marker {
vt.generateTimestamp(frame.Timestamp)
if !vt.dcChanged && rv.IFrame {
vt.insertDCRtp()

View File

@@ -19,12 +19,11 @@ func (av *Media) WriteRTPPack(p *rtp.Packet) {
p.PaddingSize = 0
frame.Packet = p
av.Value.BytesIn += len(frame.Payload) + 12
av.Value.RTP.PushValue(frame)
av.lastSeq2 = av.lastSeq
av.lastSeq = frame.SequenceNumber
av.DropCount += int(av.lastSeq - av.lastSeq2 - 1)
if len(p.Payload) > 0 {
av.WriteRTPFrame(&frame)
av.WriteRTPFrame(util.NewListItem(frame))
}
}
@@ -35,8 +34,7 @@ func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) {
av.Value.BytesIn += len(frame.Value.Payload) + 12
av.DropCount += int(av.lastSeq - av.lastSeq2 - 1)
if len(frame.Value.Payload) > 0 {
av.Value.RTP.Push(frame)
av.WriteRTPFrame(&frame.Value)
av.WriteRTPFrame(frame)
// av.Info("rtp", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Int("len", len(frame.Value.Payload)), zap.Bool("marker", frame.Value.Marker), zap.Uint16("seq", frame.Value.SequenceNumber))
} else {
av.Debug("rtp payload is empty", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Any("ext", frame.Value.GetExtensionIDs()), zap.Uint16("seq", frame.Value.SequenceNumber))

View File

@@ -16,6 +16,10 @@ type ListItem[T any] struct {
reset bool // 是否需要重置
}
func NewListItem[T any](value T) *ListItem[T] {
return &ListItem[T]{Value: value, reset: true}
}
func (item *ListItem[T]) InsertBefore(insert *ListItem[T]) {
if insert.list != nil {
panic("item already in list")