diff --git a/config/types.go b/config/types.go index bb04648..96e147a 100755 --- a/config/types.go +++ b/config/types.go @@ -191,18 +191,17 @@ type Engine struct { Publish Subscribe HTTP - EnableAVCC bool `default:"true"` //启用AVCC格式,rtmp、http-flv协议使用 - EnableRTP bool `default:"true"` //启用RTP格式,rtsp、webrtc等协议使用 - EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能 - EnableAuth bool `default:"true"` //启用鉴权 Console + EnableAVCC bool `default:"true"` //启用AVCC格式,rtmp、http-flv协议使用 + EnableRTP bool `default:"true"` //启用RTP格式,rtsp、webrtc等协议使用,已废弃,在 rtp 下面配置 + EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能 + EnableAuth bool `default:"true"` //启用鉴权 LogLang string `default:"zh"` //日志语言 LogLevel string `default:"info"` //日志级别 - RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲长度 - RTPFlushMode int `default:"0"` //RTP分帧写入模式,0:按 Marker 标志位,1:按时间戳 EventBusSize int `default:"10"` //事件总线大小 PulseInterval time.Duration `default:"5s"` //心跳事件间隔 DisableAll bool `default:"false"` //禁用所有插件 + RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲区长度 PoolSize int //内存池大小 enableReport bool `default:"false"` //启用报告,用于统计和监控 reportStream quic.Stream // console server connection @@ -310,10 +309,12 @@ func (cfg *Engine) OnEvent(event any) { } case context.Context: util.RTPReorderBufferLen = uint16(cfg.RTPReorderBufferLen) - if strings.HasPrefix(cfg.Console.Server, "wss") { - go cfg.WsRemote() - } else { - go cfg.WtRemote(v) + if cfg.Secret != "" && cfg.Server != "" { + if strings.HasPrefix(cfg.Console.Server, "wss") { + go cfg.WsRemote() + } else { + go cfg.WtRemote(v) + } } } } diff --git a/main.go b/main.go index 308e471..9260917 100755 --- a/main.go +++ b/main.go @@ -209,9 +209,7 @@ func Run(ctx context.Context, conf any) (err error) { }{UUID, id, EngineConfig.GetInstanceId(), version, runtime.GOOS, runtime.GOARCH} json.NewEncoder(contentBuf).Encode(&rp) req.Body = io.NopCloser(contentBuf) - if EngineConfig.Secret != "" { - EngineConfig.OnEvent(ctx) - } + EngineConfig.OnEvent(ctx) go func() { var c http.Client reportTimer := time.NewTimer(time.Minute) diff --git a/track/aac.go b/track/aac.go index da73892..882a0fe 100644 --- a/track/aac.go +++ b/track/aac.go @@ -68,7 +68,9 @@ func (aac *AAC) WriteADTS(ts uint32, b util.IBytes) { } // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 -func (aac *AAC) WriteRTPFrame(frame *RTPFrame) { +func (aac *AAC) WriteRTPFrame(rtpItem *util.ListItem[RTPFrame]) { + aac.Value.RTP.Push(rtpItem) + frame := &rtpItem.Value if len(frame.Payload) < 2 { // aac.fragments = aac.fragments[:0] return diff --git a/track/base.go b/track/base.go index d13d3dc..b21761e 100644 --- a/track/base.go +++ b/track/base.go @@ -61,7 +61,7 @@ type SpesificTrack interface { CompleteRTP(*AVFrame) CompleteAVCC(*AVFrame) WriteSliceBytes([]byte) - WriteRTPFrame(*RTPFrame) + WriteRTPFrame(*util.ListItem[RTPFrame]) generateTimestamp(uint32) WriteSequenceHead([]byte) GetNALU_SEI() *util.ListItem[util.Buffer] diff --git a/track/g711.go b/track/g711.go index c3e33df..acb0c18 100644 --- a/track/g711.go +++ b/track/g711.go @@ -58,7 +58,8 @@ func (g711 *G711) WriteAVCC(ts uint32, frame *util.BLL) error { return nil } -func (g711 *G711) WriteRTPFrame(frame *RTPFrame) { +func (g711 *G711) WriteRTPFrame(rtpItem *util.ListItem[RTPFrame]) { + frame := &rtpItem.Value if g711.SampleRate != 90000 { g711.generateTimestamp(uint32(uint64(frame.Timestamp) * 90000 / uint64(g711.SampleRate))) } diff --git a/track/h264.go b/track/h264.go index da5ef2c..b1dce3d 100644 --- a/track/h264.go +++ b/track/h264.go @@ -1,13 +1,13 @@ package track import ( + "bytes" "io" "time" "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" - "m7s.live/engine/v4/config" "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" ) @@ -16,6 +16,7 @@ var _ SpesificTrack = (*H264)(nil) type H264 struct { Video + buf util.Buffer // rtp 包临时缓存,对于不规范的 rtp 包(sps 放到了 fua 中导致)需要缓存 } func NewH264(stream IStream, stuff ...any) (vt *H264) { @@ -32,6 +33,9 @@ func NewH264(stream IStream, stuff ...any) (vt *H264) { } func (vt *H264) WriteSliceBytes(slice []byte) { + if len(slice) > 4 && bytes.Equal(slice[:4], codec.NALU_Delimiter2) { + slice = slice[4:] // 有些设备厂商不规范,所以需要移除前导的 00 00 00 01 + } naluType := codec.ParseH264NALUType(slice[0]) if log.Trace { vt.Trace("naluType", zap.Uint8("naluType", naluType.Byte())) @@ -113,33 +117,37 @@ func (vt *H264) WriteAVCC(ts uint32, frame *util.BLL) (err error) { } } -func (vt *H264) WriteRTPFrame(frame *RTPFrame) { +func (vt *H264) WriteRTPFrame(rtpItem *util.ListItem[RTPFrame]) { if vt.lastSeq != vt.lastSeq2+1 && vt.lastSeq2 != 0 { vt.lostFlag = true vt.Warn("lost rtp packet", zap.Uint16("lastSeq", vt.lastSeq), zap.Uint16("lastSeq2", vt.lastSeq2)) } - if config.Global.RTPFlushMode == 1 { - if vt.Value.AUList.ByteLength == 0 { - vt.generateTimestamp(frame.Timestamp) - } else if vt.Value.PTS != time.Duration(frame.Timestamp) { - if !vt.dcChanged && vt.Value.IFrame { + frame := &rtpItem.Value + pts := frame.Timestamp + rv := vt.Value + // 有些流的 rtp 包中没有设置 marker 导致无法判断是否是最后一个包,此时通过时间戳变化判断,先 flush 之前的帧 + if rv.PTS != time.Duration(pts) { + if rv.AUList.ByteLength > 0 { + if !vt.dcChanged && rv.IFrame { vt.insertDCRtp() } vt.Flush() - vt.generateTimestamp(frame.Timestamp) + rv = vt.Value } + vt.generateTimestamp(pts) } - rv := vt.Value + rv.RTP.Push(rtpItem) if naluType := frame.H264Type(); naluType < 24 { vt.WriteSliceBytes(frame.Payload) } else { + offset := naluType.Offset() switch naluType { case codec.NALU_STAPA, codec.NALU_STAPB: - if len(frame.Payload) <= naluType.Offset() { + if len(frame.Payload) <= offset { vt.Error("invalid nalu size", zap.Int("naluType", int(naluType))) return } - for buffer := util.Buffer(frame.Payload[naluType.Offset():]); buffer.CanRead(); { + for buffer := util.Buffer(frame.Payload[offset:]); buffer.CanRead(); { nextSize := int(buffer.ReadUint16()) if buffer.Len() >= nextSize { vt.WriteSliceBytes(buffer.ReadN(nextSize)) @@ -149,25 +157,37 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) { } } case codec.NALU_FUA, codec.NALU_FUB: - if util.Bit1(frame.Payload[1], 0) { - vt.WriteSliceByte(naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60)) + b1 := frame.Payload[1] + if util.Bit1(b1, 0) { + naluType = naluType.Parse(b1) + firstByte := naluType.Or(frame.Payload[0] & 0x60) + switch naluType { + case codec.NALU_SPS, codec.NALU_PPS: + vt.buf.WriteByte(firstByte) + default: + vt.WriteSliceByte(firstByte) + } } - if rv.AUList.Pre != nil && rv.AUList.Pre.Value != nil { - rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(frame.Payload[naluType.Offset():])) + if vt.buf.Len() > 0 { + vt.buf.Write(frame.Payload[offset:]) } else { - vt.Error("fu have no start") - return + if rv.AUList.Pre != nil && rv.AUList.Pre.Value != nil { + rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(frame.Payload[offset:])) + } else { + vt.Error("fu have no start") + return + } } - if !util.Bit1(frame.Payload[1], 1) { + if !util.Bit1(b1, 1) { + // fua 还没结束 return + } else if vt.buf.Len() > 0 { + vt.WriteAnnexB(uint32(rv.PTS), uint32(rv.DTS), vt.buf) + vt.buf = nil } } } - if rv.AUList.ByteLength == 0 { - return - } - if config.Global.RTPFlushMode == 0 && frame.Marker { - vt.generateTimestamp(frame.Timestamp) + if frame.Marker && rv.AUList.ByteLength > 0 { if !vt.dcChanged && rv.IFrame { vt.insertDCRtp() } diff --git a/track/h265.go b/track/h265.go index e5723ad..5e7092f 100644 --- a/track/h265.go +++ b/track/h265.go @@ -2,12 +2,10 @@ package track import ( "io" - "time" "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" - "m7s.live/engine/v4/config" "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" ) @@ -138,18 +136,8 @@ func (vt *H265) WriteAVCC(ts uint32, frame *util.BLL) (err error) { return } -func (vt *H265) WriteRTPFrame(frame *RTPFrame) { - if config.Global.RTPFlushMode == 1 { - if vt.Value.AUList.ByteLength == 0 { - vt.generateTimestamp(frame.Timestamp) - } else if vt.Value.PTS != time.Duration(frame.Timestamp) { - if !vt.dcChanged && vt.Value.IFrame { - vt.insertDCRtp() - } - vt.Flush() - vt.generateTimestamp(frame.Timestamp) - } - } +func (vt *H265) WriteRTPFrame(rtpItem *util.ListItem[RTPFrame]) { + frame := &rtpItem.Value rv := vt.Value // TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream. var usingDonlField bool @@ -189,7 +177,7 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) { default: vt.WriteSliceBytes(frame.Payload) } - if config.Global.RTPFlushMode == 0 && frame.Marker { + if frame.Marker { vt.generateTimestamp(frame.Timestamp) if !vt.dcChanged && rv.IFrame { vt.insertDCRtp() diff --git a/track/rtp.go b/track/rtp.go index 16350b6..b739b07 100644 --- a/track/rtp.go +++ b/track/rtp.go @@ -19,12 +19,11 @@ func (av *Media) WriteRTPPack(p *rtp.Packet) { p.PaddingSize = 0 frame.Packet = p av.Value.BytesIn += len(frame.Payload) + 12 - av.Value.RTP.PushValue(frame) av.lastSeq2 = av.lastSeq av.lastSeq = frame.SequenceNumber av.DropCount += int(av.lastSeq - av.lastSeq2 - 1) if len(p.Payload) > 0 { - av.WriteRTPFrame(&frame) + av.WriteRTPFrame(util.NewListItem(frame)) } } @@ -35,8 +34,7 @@ func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) { av.Value.BytesIn += len(frame.Value.Payload) + 12 av.DropCount += int(av.lastSeq - av.lastSeq2 - 1) if len(frame.Value.Payload) > 0 { - av.Value.RTP.Push(frame) - av.WriteRTPFrame(&frame.Value) + av.WriteRTPFrame(frame) // av.Info("rtp", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Int("len", len(frame.Value.Payload)), zap.Bool("marker", frame.Value.Marker), zap.Uint16("seq", frame.Value.SequenceNumber)) } else { av.Debug("rtp payload is empty", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Any("ext", frame.Value.GetExtensionIDs()), zap.Uint16("seq", frame.Value.SequenceNumber)) diff --git a/util/list.go b/util/list.go index 0ddb7e6..6bd7424 100644 --- a/util/list.go +++ b/util/list.go @@ -16,6 +16,10 @@ type ListItem[T any] struct { reset bool // 是否需要重置 } +func NewListItem[T any](value T) *ListItem[T] { + return &ListItem[T]{Value: value, reset: true} +} + func (item *ListItem[T]) InsertBefore(insert *ListItem[T]) { if insert.list != nil { panic("item already in list")