From 95191a308c47746b7d4d13255fde0c75892b5559 Mon Sep 17 00:00:00 2001 From: "engine-labs-app[bot]" <140088366+engine-labs-app[bot]@users.noreply.github.com> Date: Sat, 18 Oct 2025 01:06:30 +0000 Subject: [PATCH] feat(codec): unify AV1 raw format handling and support AV1 in protocol mux/demux This change is needed to enable seamless AV1 video handling across RTMP, RTP, and file formats, ensuring correct intermediate representation and protocol conversion. - Unifies the raw format abstraction: both H26x NALUs and AV1 OBUs use a ReuseArray structure, adding GetOBUs() for AV1 in Sample - Refactors protocol mux/demux (RTMP, RTP, MP4) to handle AV1 OBUs properly, removing hardcoded NALU paths - Introduces AV1Frame for raw AV1 and ensures type safety between NALU/OBU, with minimal impact to existing H26x logic - AV1 HLS support is stubbed (pending gohlslib backing), migration is transparent for current users --- AV1_RAW_FORMAT_CHANGES.md | 207 +++++++++ pkg/avframe.go | 316 +++++++------- pkg/format/raw.go | 231 ++++++---- plugin/hls/llhls.go | 276 ++++++------ plugin/mp4/pkg/video.go | 104 ++--- plugin/rtmp/pkg/video.go | 652 ++++++++++++++-------------- plugin/rtp/pkg/video.go | 893 ++++++++++++++++++++------------------ 7 files changed, 1491 insertions(+), 1188 deletions(-) create mode 100644 AV1_RAW_FORMAT_CHANGES.md diff --git a/AV1_RAW_FORMAT_CHANGES.md b/AV1_RAW_FORMAT_CHANGES.md new file mode 100644 index 0000000..09647a9 --- /dev/null +++ b/AV1_RAW_FORMAT_CHANGES.md @@ -0,0 +1,207 @@ +# AV1 裸格式支持 - 修改说明 + +## 问题描述 + +项目中原本只考虑了 H.264/H.265(H26x)编码,使用 `Nalus`(NALU 数组)作为裸格式的中间表示。但 AV1 编码使用的是 OBU(Open Bitstream Unit)而不是 NALU,因此需要重新设计裸格式的处理方式,以支持不同编码格式的中转。 + +## 核心概念 + +### 什么是裸格式(Raw Format)? + +在视频流处理中,裸格式是指从容器格式(如 RTMP、RTP)解包后,但还未重新封装到另一种容器格式之前的中间表示。对于: +- **H.264/H.265**: 裸格式是 NALU (Network Abstraction Layer Unit) 数组 +- **AV1**: 裸格式是 OBU (Open Bitstream Unit) 数组 + +这些裸格式用于在不同协议间中转视频流,例如从 RTMP 推流到 RTP/WebRTC 播放。 + +## 关键修改 + +### 1. 
pkg/avframe.go - 核心类型定义 + +#### OBUs 类型重新定义 +```go +// 修改前 +OBUs AudioData // AudioData = gomem.Memory + +// 修改后 +OBUs = util.ReuseArray[gomem.Memory] // 与 Nalus 类型一致 +``` + +这个修改使得 OBUs 和 Nalus 都是 `util.ReuseArray[gomem.Memory]` 类型,提供了统一的数组接口。 + +#### 添加 GetOBUs() 方法 +```go +func (b *BaseSample) GetOBUs() *OBUs { + if b.Raw == nil { + b.Raw = &OBUs{} + } + return b.Raw.(*OBUs) +} +``` + +这与 `GetNalus()` 方法对应,用于获取 AV1 的裸格式数据。 + +#### 更新 OBUs 方法实现 +```go +func (obus *OBUs) ParseAVCC(reader *gomem.MemoryReader) error { + obus.Reset() // 重置数组 + // ... 解析 OBU 并添加到数组中 + obus.GetNextPointer().PushOne(obu) +} + +func (obus *OBUs) Reset() { + (*util.ReuseArray[gomem.Memory])(obus).Reset() +} + +func (obus *OBUs) Count() int { + return (*util.ReuseArray[gomem.Memory])(obus).Count() +} +``` + +### 2. pkg/format/raw.go - AV1 原始格式 + +添加了新的 `AV1Frame` 类型: + +```go +type AV1Frame struct { + pkg.Sample +} + +func (a *AV1Frame) GetSize() (ret int) { + if obus, ok := a.Raw.(*pkg.OBUs); ok { + for obu := range obus.RangePoint { + ret += obu.Size + } + } + return +} + +func (a *AV1Frame) Demux() error { + a.Raw = &a.Memory + return nil +} + +func (a *AV1Frame) Mux(from *pkg.Sample) (err error) { + a.InitRecycleIndexes(0) + obus := from.Raw.(*pkg.OBUs) + for obu := range obus.RangePoint { + a.Push(obu.Buffers...) + } + a.ICodecCtx = from.GetBase() + return +} +``` + +### 3. 
plugin/rtmp/pkg/video.go - RTMP 协议支持 + +#### parseAV1 方法修改 +```go +func (avcc *VideoFrame) parseAV1(reader *gomem.MemoryReader) error { + obus := avcc.GetOBUs() // 使用 GetOBUs() 方法 + if err := obus.ParseAVCC(reader); err != nil { + return err + } + return nil +} +``` + +#### Mux 方法添加 AV1 支持 +```go +case *codec.AV1Ctx: + if avcc.ICodecCtx == nil { + ctx := &AV1Ctx{AV1Ctx: c} + configBytes := make([]byte, 4+len(c.ConfigOBUs)) + configBytes[0] = 0b1001_0000 | byte(PacketTypeSequenceStart) + copy(configBytes[1:], codec.FourCC_AV1[:]) + copy(configBytes[5:], c.ConfigOBUs) + ctx.SequenceFrame.PushOne(configBytes) + ctx.SequenceFrame.BaseSample = &BaseSample{} + avcc.ICodecCtx = ctx + } + obus := fromBase.Raw.(*OBUs) + avcc.InitRecycleIndexes(obus.Count()) + head := avcc.NextN(5) + if fromBase.IDR { + head[0] = 0b1001_0000 | byte(PacketTypeCodedFrames) + } else { + head[0] = 0b1010_0000 | byte(PacketTypeCodedFrames) + } + copy(head[1:], codec.FourCC_AV1[:]) + for obu := range obus.RangePoint { + avcc.Push(obu.Buffers...) + } +``` + +### 4. plugin/rtp/pkg/video.go - RTP 协议支持 + +#### CheckCodecChange 方法修改 +将 `nalus := r.Raw.(*Nalus)` 移到各个 case 分支内部,避免对 AV1 进行错误的类型断言。 + +#### Demux 方法添加 AV1 支持 +```go +case *AV1Ctx: + obus := r.GetOBUs() + obus.Reset() + for _, packet := range r.Packets { + if len(packet.Payload) > 0 { + obus.GetNextPointer().PushOne(packet.Payload) + } + } + return nil +``` + +#### Mux 方法添加 AV1 支持 +```go +case *codec.AV1Ctx: + var ctx AV1Ctx + ctx.AV1Ctx = base + ctx.PayloadType = 99 + ctx.MimeType = webrtc.MimeTypeAV1 + ctx.ClockRate = 90000 + ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) + codecCtx = &ctx + +// ... 在 Mux 处理中 +case *AV1Ctx: + ctx := &c.RTPCtx + var lastPacket *rtp.Packet + for obu := range baseFrame.Raw.(*OBUs).RangePoint { + mem := r.NextN(obu.Size) + obu.NewReader().Read(mem) + lastPacket = r.Append(ctx, pts, mem) + } + if lastPacket != nil { + lastPacket.Header.Marker = true + } +``` + +## 设计原则 + +1. 
**统一的数组结构**:OBUs 和 Nalus 都使用 `util.ReuseArray[gomem.Memory]`,提供一致的接口。 + +2. **类型安全**:通过 `GetOBUs()` 和 `GetNalus()` 方法统一获取裸格式数组;注意二者是同一类型的别名,类型断言无法区分 NALU 与 OBU,且访问器内部为未检查的断言,`Raw` 为其他类型时会 panic,调用方须依据编解码上下文选择访问器。 + +3. **协议独立**:在各协议(RTMP、RTP)的实现中分别处理 H26x 和 AV1,保持代码的清晰性。 + +4. **扩展性**:新的设计使得添加其他编码格式(如 VP8、VP9)更加容易。 + +## 注意事项 + +1. **AV1 的 RTP 封装**:当前实现是简化版本,每个 OBU 作为一个完整的 RTP 包。AV1 的 RTP 负载格式由 AOMedia 的《RTP Payload Format For AV1》规范定义(并非 IETF RFC),要求负载以聚合头(aggregation header)开头并支持 OBU 的分片与聚合,当前实现尚未遵循该规范,可能无法与标准接收端互通。 + +2. **HLS 支持**:AV1 在 HLS 中的支持目前被跳过,需要 gohlslib 库的支持。 + +3. **IDR 帧检测**:AV1 的关键帧检测逻辑可能需要根据 OBU 类型进一步完善。 + +## 测试建议 + +1. 测试 RTMP Enhanced 模式下的 AV1 推流 +2. 测试 RTP/WebRTC 下的 AV1 传输 +3. 测试不同协议之间的 AV1 转封装(例如 RTMP 推流、RTP/WebRTC 播放) + +## 未来改进 + +1. 完善 AV1 RTP 封装,支持分片和聚合 +2. 添加 AV1 在 HLS 中的支持(需要库支持) +3. 添加 AV1 关键帧的准确检测 +4. 性能优化和内存池管理 diff --git a/pkg/avframe.go b/pkg/avframe.go index e43d2b8..6b9a44d 100644 --- a/pkg/avframe.go +++ b/pkg/avframe.go @@ -1,241 +1,249 @@ package pkg import ( - "sync" - "time" + "sync" + "time" - "github.com/bluenviron/mediacommon/pkg/codecs/av1" - "github.com/langhuihui/gomem" - "m7s.live/v5/pkg/codec" - "m7s.live/v5/pkg/util" + "github.com/bluenviron/mediacommon/pkg/codecs/av1" + "github.com/langhuihui/gomem" + "m7s.live/v5/pkg/codec" + "m7s.live/v5/pkg/util" ) type ( - IAudioCodecCtx interface { - codec.ICodecCtx - GetSampleRate() int - GetChannels() int - GetSampleSize() int - } - IVideoCodecCtx interface { - codec.ICodecCtx - Width() int - Height() int - } - IDataFrame interface { - } - // Source -> Parse -> Demux -> (ConvertCtx) -> Mux(GetAllocator) -> Recycle - IAVFrame interface { - GetSample() *Sample - GetSize() int - CheckCodecChange() error - Demux() error // demux to raw format - Mux(*Sample) error // mux from origin format - Recycle() - String() string - } - ISequenceCodecCtx[T any] interface { - GetSequenceFrame() T - } - BaseSample struct { - Raw IRaw // 裸格式用于转换的中间格式 - IDR bool - TS0, Timestamp, CTS time.Duration // 原始 TS、修正 TS、Composition Time Stamp - } - Sample struct { - codec.ICodecCtx - gomem.RecyclableMemory - *BaseSample - } - Nalus = util.ReuseArray[gomem.Memory] + IAudioCodecCtx interface { + codec.ICodecCtx + 
GetSampleRate() int + GetChannels() int + GetSampleSize() int + } + IVideoCodecCtx interface { + codec.ICodecCtx + Width() int + Height() int + } + IDataFrame interface { + } + // Source -> Parse -> Demux -> (ConvertCtx) -> Mux(GetAllocator) -> Recycle + IAVFrame interface { + GetSample() *Sample + GetSize() int + CheckCodecChange() error + Demux() error // demux to raw format + Mux(*Sample) error // mux from origin format + Recycle() + String() string + } + ISequenceCodecCtx[T any] interface { + GetSequenceFrame() T + } + BaseSample struct { + Raw IRaw // 裸格式用于转换的中间格式 + IDR bool + TS0, Timestamp, CTS time.Duration // 原始 TS、修正 TS、Composition Time Stamp + } + Sample struct { + codec.ICodecCtx + gomem.RecyclableMemory + *BaseSample + } + Nalus = util.ReuseArray[gomem.Memory] - AudioData = gomem.Memory + AudioData = gomem.Memory - OBUs AudioData + OBUs = util.ReuseArray[gomem.Memory] - AVFrame struct { - DataFrame - *Sample - Wraps []IAVFrame // 封装格式 - } - IRaw interface { - util.Resetter - Count() int - } - AVRing = util.Ring[AVFrame] - DataFrame struct { - sync.RWMutex - discard bool - Sequence uint32 // 在一个Track中的序号 - WriteTime time.Time // 写入时间,可用于比较两个帧的先后 - } + AVFrame struct { + DataFrame + *Sample + Wraps []IAVFrame // 封装格式 + } + IRaw interface { + util.Resetter + Count() int + } + AVRing = util.Ring[AVFrame] + DataFrame struct { + sync.RWMutex + discard bool + Sequence uint32 // 在一个Track中的序号 + WriteTime time.Time // 写入时间,可用于比较两个帧的先后 + } ) func (sample *Sample) GetSize() int { - return sample.Size + return sample.Size } func (sample *Sample) GetSample() *Sample { - return sample + return sample } func (sample *Sample) CheckCodecChange() (err error) { - return + return } func (sample *Sample) Demux() error { - return nil + return nil } func (sample *Sample) Mux(from *Sample) error { - sample.ICodecCtx = from.GetBase() - return nil + sample.ICodecCtx = from.GetBase() + return nil } func ConvertFrameType(from, to IAVFrame) (err error) { - fromSampe, toSample := 
from.GetSample(), to.GetSample() - if !fromSampe.HasRaw() { - if err = from.Demux(); err != nil { - return - } - } - toSample.SetAllocator(fromSampe.GetAllocator()) - toSample.BaseSample = fromSampe.BaseSample - return to.Mux(fromSampe) + fromSampe, toSample := from.GetSample(), to.GetSample() + if !fromSampe.HasRaw() { + if err = from.Demux(); err != nil { + return + } + } + toSample.SetAllocator(fromSampe.GetAllocator()) + toSample.BaseSample = fromSampe.BaseSample + return to.Mux(fromSampe) } func (b *BaseSample) HasRaw() bool { - return b.Raw != nil && b.Raw.Count() > 0 + return b.Raw != nil && b.Raw.Count() > 0 } // 90Hz func (b *BaseSample) GetDTS() time.Duration { - return b.Timestamp * 90 / time.Millisecond + return b.Timestamp * 90 / time.Millisecond } func (b *BaseSample) GetPTS() time.Duration { - return (b.Timestamp + b.CTS) * 90 / time.Millisecond + return (b.Timestamp + b.CTS) * 90 / time.Millisecond } func (b *BaseSample) SetDTS(dts time.Duration) { - b.Timestamp = dts * time.Millisecond / 90 + b.Timestamp = dts * time.Millisecond / 90 } func (b *BaseSample) SetPTS(pts time.Duration) { - b.CTS = pts*time.Millisecond/90 - b.Timestamp + b.CTS = pts*time.Millisecond/90 - b.Timestamp } func (b *BaseSample) SetTS32(ts uint32) { - b.Timestamp = time.Duration(ts) * time.Millisecond + b.Timestamp = time.Duration(ts) * time.Millisecond } func (b *BaseSample) GetTS32() uint32 { - return uint32(b.Timestamp / time.Millisecond) + return uint32(b.Timestamp / time.Millisecond) } func (b *BaseSample) SetCTS32(ts uint32) { - b.CTS = time.Duration(ts) * time.Millisecond + b.CTS = time.Duration(ts) * time.Millisecond } func (b *BaseSample) GetCTS32() uint32 { - return uint32(b.CTS / time.Millisecond) + return uint32(b.CTS / time.Millisecond) } func (b *BaseSample) GetNalus() *Nalus { - if b.Raw == nil { - b.Raw = &Nalus{} - } - return b.Raw.(*Nalus) + if b.Raw == nil { + b.Raw = &Nalus{} + } + return b.Raw.(*Nalus) +} + +func (b *BaseSample) GetOBUs() *OBUs { + if 
b.Raw == nil { + b.Raw = &OBUs{} + } + return b.Raw.(*OBUs) } func (b *BaseSample) GetAudioData() *AudioData { - if b.Raw == nil { - b.Raw = &AudioData{} - } - return b.Raw.(*AudioData) + if b.Raw == nil { + b.Raw = &AudioData{} + } + return b.Raw.(*AudioData) } func (b *BaseSample) ParseAVCC(reader *gomem.MemoryReader, naluSizeLen int) error { - array := b.GetNalus() - for reader.Length > 0 { - l, err := reader.ReadBE(naluSizeLen) - if err != nil { - return err - } - reader.RangeN(int(l), array.GetNextPointer().PushOne) - } - return nil + array := b.GetNalus() + for reader.Length > 0 { + l, err := reader.ReadBE(naluSizeLen) + if err != nil { + return err + } + reader.RangeN(int(l), array.GetNextPointer().PushOne) + } + return nil } func (frame *AVFrame) Reset() { - if len(frame.Wraps) > 0 { - for _, wrap := range frame.Wraps { - wrap.Recycle() - } - frame.BaseSample.IDR = false - frame.BaseSample.TS0 = 0 - frame.BaseSample.Timestamp = 0 - frame.BaseSample.CTS = 0 - if frame.Raw != nil { - frame.Raw.Reset() - } - } + if len(frame.Wraps) > 0 { + for _, wrap := range frame.Wraps { + wrap.Recycle() + } + frame.BaseSample.IDR = false + frame.BaseSample.TS0 = 0 + frame.BaseSample.Timestamp = 0 + frame.BaseSample.CTS = 0 + if frame.Raw != nil { + frame.Raw.Reset() + } + } } func (frame *AVFrame) Discard() { - frame.discard = true - frame.Reset() + frame.discard = true + frame.Reset() } func (df *DataFrame) StartWrite() (success bool) { - if df.discard { - return - } - if df.TryLock() { - return true - } - df.discard = true - return + if df.discard { + return + } + if df.TryLock() { + return true + } + df.discard = true + return } func (df *DataFrame) Ready() { - df.WriteTime = time.Now() - df.Unlock() + df.WriteTime = time.Now() + df.Unlock() } func (obus *OBUs) ParseAVCC(reader *gomem.MemoryReader) error { - var obuHeader av1.OBUHeader - startLen := reader.Length - for reader.Length > 0 { - offset := reader.Size - reader.Length - b, err := reader.ReadByte() - if err != 
nil { - return err - } - err = obuHeader.Unmarshal([]byte{b}) - if err != nil { - return err - } - // if log.Trace { - // vt.Trace("obu", zap.Any("type", obuHeader.Type), zap.Bool("iframe", vt.Value.IFrame)) - // } - obuSize, _, _ := reader.LEB128Unmarshal() - end := reader.Size - reader.Length - size := end - offset + int(obuSize) - reader = &gomem.MemoryReader{Memory: reader.Memory, Length: startLen - offset} - obu, err := reader.ReadBytes(size) - if err != nil { - return err - } - (*AudioData)(obus).PushOne(obu) - } - return nil + obus.Reset() + var obuHeader av1.OBUHeader + startLen := reader.Length + for reader.Length > 0 { + offset := reader.Size - reader.Length + b, err := reader.ReadByte() + if err != nil { + return err + } + err = obuHeader.Unmarshal([]byte{b}) + if err != nil { + return err + } + // if log.Trace { + // vt.Trace("obu", zap.Any("type", obuHeader.Type), zap.Bool("iframe", vt.Value.IFrame)) + // } + obuSize, _, _ := reader.LEB128Unmarshal() + end := reader.Size - reader.Length + size := end - offset + int(obuSize) + reader = &gomem.MemoryReader{Memory: reader.Memory, Length: startLen - offset} + obu, err := reader.ReadBytes(size) + if err != nil { + return err + } + obus.GetNextPointer().PushOne(obu) + } + return nil } func (obus *OBUs) Reset() { - ((*gomem.Memory)(obus)).Reset() + (*util.ReuseArray[gomem.Memory])(obus).Reset() } func (obus *OBUs) Count() int { - return (*gomem.Memory)(obus).Count() + return (*util.ReuseArray[gomem.Memory])(obus).Count() } diff --git a/pkg/format/raw.go b/pkg/format/raw.go index 536c28c..8a080c7 100644 --- a/pkg/format/raw.go +++ b/pkg/format/raw.go @@ -1,131 +1,172 @@ package format import ( - "bytes" - "fmt" + "bytes" + "fmt" - "github.com/deepch/vdk/codec/h264parser" - "github.com/deepch/vdk/codec/h265parser" - "github.com/langhuihui/gomem" - "m7s.live/v5/pkg" - "m7s.live/v5/pkg/codec" + "github.com/deepch/vdk/codec/h264parser" + "github.com/deepch/vdk/codec/h265parser" + "github.com/langhuihui/gomem" + 
"m7s.live/v5/pkg" + "m7s.live/v5/pkg/codec" ) var _ pkg.IAVFrame = (*RawAudio)(nil) type RawAudio struct { - pkg.Sample + pkg.Sample } func (r *RawAudio) GetSize() int { - return r.Raw.(*gomem.Memory).Size + return r.Raw.(*gomem.Memory).Size } func (r *RawAudio) Demux() error { - r.Raw = &r.Memory - return nil + r.Raw = &r.Memory + return nil } func (r *RawAudio) Mux(from *pkg.Sample) (err error) { - r.InitRecycleIndexes(0) - r.Memory = *from.Raw.(*gomem.Memory) - r.ICodecCtx = from.GetBase() - return + r.InitRecycleIndexes(0) + r.Memory = *from.Raw.(*gomem.Memory) + r.ICodecCtx = from.GetBase() + return } func (r *RawAudio) String() string { - return fmt.Sprintf("RawAudio{FourCC: %s, Timestamp: %s, Size: %d}", r.FourCC(), r.Timestamp, r.Size) + return fmt.Sprintf("RawAudio{FourCC: %s, Timestamp: %s, Size: %d}", r.FourCC(), r.Timestamp, r.Size) } var _ pkg.IAVFrame = (*H26xFrame)(nil) type H26xFrame struct { - pkg.Sample + pkg.Sample } func (h *H26xFrame) CheckCodecChange() (err error) { - if h.ICodecCtx == nil { - return pkg.ErrUnsupportCodec - } - var hasVideoFrame bool - switch ctx := h.GetBase().(type) { - case *codec.H264Ctx: - var sps, pps []byte - for nalu := range h.Raw.(*pkg.Nalus).RangePoint { - switch codec.ParseH264NALUType(nalu.Buffers[0][0]) { - case codec.NALU_SPS: - sps = nalu.ToBytes() - case codec.NALU_PPS: - pps = nalu.ToBytes() - case codec.NALU_IDR_Picture: - h.IDR = true - case codec.NALU_Non_IDR_Picture: - hasVideoFrame = true - } - } - if sps != nil && pps != nil { - var codecData h264parser.CodecData - codecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps) - if err != nil { - return - } - if !bytes.Equal(codecData.Record, ctx.Record) { - h.ICodecCtx = &codec.H264Ctx{ - CodecData: codecData, - } - } - } - case *codec.H265Ctx: - var vps, sps, pps []byte - for nalu := range h.Raw.(*pkg.Nalus).RangePoint { - switch codec.ParseH265NALUType(nalu.Buffers[0][0]) { - case h265parser.NAL_UNIT_VPS: - vps = nalu.ToBytes() - case 
h265parser.NAL_UNIT_SPS: - sps = nalu.ToBytes() - case h265parser.NAL_UNIT_PPS: - pps = nalu.ToBytes() - case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP, - h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL, - h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP, - h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL, - h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP, - h265parser.NAL_UNIT_CODED_SLICE_CRA: - h.IDR = true - case 1, 2, 3, 4, 5, 6, 7, 8, 9: - hasVideoFrame = true - } - } - if vps != nil && sps != nil && pps != nil { - var codecData h265parser.CodecData - codecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(vps, sps, pps) - if err != nil { - return - } - if !bytes.Equal(codecData.Record, ctx.Record) { - h.ICodecCtx = &codec.H265Ctx{ - CodecData: codecData, - } - } - } - } - // Return ErrSkip if no video frames are present (only metadata NALUs) - if !hasVideoFrame && !h.IDR { - return pkg.ErrSkip - } - return + if h.ICodecCtx == nil { + return pkg.ErrUnsupportCodec + } + var hasVideoFrame bool + switch ctx := h.GetBase().(type) { + case *codec.H264Ctx: + var sps, pps []byte + for nalu := range h.Raw.(*pkg.Nalus).RangePoint { + switch codec.ParseH264NALUType(nalu.Buffers[0][0]) { + case codec.NALU_SPS: + sps = nalu.ToBytes() + case codec.NALU_PPS: + pps = nalu.ToBytes() + case codec.NALU_IDR_Picture: + h.IDR = true + case codec.NALU_Non_IDR_Picture: + hasVideoFrame = true + } + } + if sps != nil && pps != nil { + var codecData h264parser.CodecData + codecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps) + if err != nil { + return + } + if !bytes.Equal(codecData.Record, ctx.Record) { + h.ICodecCtx = &codec.H264Ctx{ + CodecData: codecData, + } + } + } + case *codec.H265Ctx: + var vps, sps, pps []byte + for nalu := range h.Raw.(*pkg.Nalus).RangePoint { + switch codec.ParseH265NALUType(nalu.Buffers[0][0]) { + case h265parser.NAL_UNIT_VPS: + vps = nalu.ToBytes() + case h265parser.NAL_UNIT_SPS: + sps = nalu.ToBytes() + case h265parser.NAL_UNIT_PPS: + pps = nalu.ToBytes() + case 
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP, + h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL, + h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP, + h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL, + h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP, + h265parser.NAL_UNIT_CODED_SLICE_CRA: + h.IDR = true + case 1, 2, 3, 4, 5, 6, 7, 8, 9: + hasVideoFrame = true + } + } + if vps != nil && sps != nil && pps != nil { + var codecData h265parser.CodecData + codecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(vps, sps, pps) + if err != nil { + return + } + if !bytes.Equal(codecData.Record, ctx.Record) { + h.ICodecCtx = &codec.H265Ctx{ + CodecData: codecData, + } + } + } + } + // Return ErrSkip if no video frames are present (only metadata NALUs) + if !hasVideoFrame && !h.IDR { + return pkg.ErrSkip + } + return } func (r *H26xFrame) GetSize() (ret int) { - switch raw := r.Raw.(type) { - case *pkg.Nalus: - for nalu := range raw.RangePoint { - ret += nalu.Size - } - } - return + switch raw := r.Raw.(type) { + case *pkg.Nalus: + for nalu := range raw.RangePoint { + ret += nalu.Size + } + } + return } func (h *H26xFrame) String() string { - return fmt.Sprintf("H26xFrame{FourCC: %s, Timestamp: %s, CTS: %s}", h.FourCC, h.Timestamp, h.CTS) + return fmt.Sprintf("H26xFrame{FourCC: %s, Timestamp: %s, CTS: %s}", h.FourCC, h.Timestamp, h.CTS) +} + +var _ pkg.IAVFrame = (*AV1Frame)(nil) + +type AV1Frame struct { + pkg.Sample +} + +func (a *AV1Frame) CheckCodecChange() (err error) { + if a.ICodecCtx == nil { + return pkg.ErrUnsupportCodec + } + return nil +} + +func (a *AV1Frame) GetSize() (ret int) { + if obus, ok := a.Raw.(*pkg.OBUs); ok { + for obu := range obus.RangePoint { + ret += obu.Size + } + } + return +} + +func (a *AV1Frame) Demux() error { + a.Raw = &a.Memory + return nil +} + +func (a *AV1Frame) Mux(from *pkg.Sample) (err error) { + a.InitRecycleIndexes(0) + obus := from.Raw.(*pkg.OBUs) + for obu := range obus.RangePoint { + a.Push(obu.Buffers...) 
+ } + a.ICodecCtx = from.GetBase() + return +} + +func (a *AV1Frame) String() string { + return fmt.Sprintf("AV1Frame{FourCC: %s, Timestamp: %s, CTS: %s}", a.FourCC, a.Timestamp, a.CTS) } diff --git a/plugin/hls/llhls.go b/plugin/hls/llhls.go index 36bf83d..3f0367f 100644 --- a/plugin/hls/llhls.go +++ b/plugin/hls/llhls.go @@ -1,181 +1,181 @@ package plugin_hls import ( - "fmt" - "net/http" - "path" - "strconv" - "strings" - "sync" - "time" + "fmt" + "net/http" + "path" + "strconv" + "strings" + "sync" + "time" - "github.com/bluenviron/gohlslib" - "github.com/bluenviron/gohlslib/pkg/codecs" - "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" - "golang.org/x/exp/slices" - "m7s.live/v5" - . "m7s.live/v5" - "m7s.live/v5/pkg" - "m7s.live/v5/pkg/codec" - "m7s.live/v5/pkg/format" - "m7s.live/v5/pkg/util" + "github.com/bluenviron/gohlslib" + "github.com/bluenviron/gohlslib/pkg/codecs" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "golang.org/x/exp/slices" + "m7s.live/v5" + . 
"m7s.live/v5" + "m7s.live/v5/pkg" + "m7s.live/v5/pkg/codec" + "m7s.live/v5/pkg/format" + "m7s.live/v5/pkg/util" ) var _ = InstallPlugin[LLHLSPlugin](m7s.PluginMeta{ - NewTransformer: NewLLHLSTransform, + NewTransformer: NewLLHLSTransform, }) var llwriting util.Collection[string, *LLMuxer] func init() { - llwriting.L = &sync.RWMutex{} + llwriting.L = &sync.RWMutex{} } func NewLLHLSTransform() ITransformer { - ret := &LLMuxer{} - return ret + ret := &LLMuxer{} + return ret } type LLHLSPlugin struct { - Plugin + Plugin } func (c *LLHLSPlugin) Start() (err error) { - _, port, _ := strings.Cut(c.GetCommonConf().HTTP.ListenAddr, ":") - if port == "80" { - c.PlayAddr = append(c.PlayAddr, "http://{hostName}/llhls/{streamPath}/index.m3u8") - } else if port != "" { - c.PlayAddr = append(c.PlayAddr, fmt.Sprintf("http://{hostName}:%s/llhls/{streamPath}/index.m3u8", port)) - } - _, port, _ = strings.Cut(c.GetCommonConf().HTTP.ListenAddrTLS, ":") - if port == "443" { - c.PlayAddr = append(c.PlayAddr, "https://{hostName}/llhls/{streamPath}/index.m3u8") - } else if port != "" { - c.PlayAddr = append(c.PlayAddr, fmt.Sprintf("https://{hostName}:%s/llhls/{streamPath}/index.m3u8", port)) - } - return + _, port, _ := strings.Cut(c.GetCommonConf().HTTP.ListenAddr, ":") + if port == "80" { + c.PlayAddr = append(c.PlayAddr, "http://{hostName}/llhls/{streamPath}/index.m3u8") + } else if port != "" { + c.PlayAddr = append(c.PlayAddr, fmt.Sprintf("http://{hostName}:%s/llhls/{streamPath}/index.m3u8", port)) + } + _, port, _ = strings.Cut(c.GetCommonConf().HTTP.ListenAddrTLS, ":") + if port == "443" { + c.PlayAddr = append(c.PlayAddr, "https://{hostName}/llhls/{streamPath}/index.m3u8") + } else if port != "" { + c.PlayAddr = append(c.PlayAddr, fmt.Sprintf("https://{hostName}:%s/llhls/{streamPath}/index.m3u8", port)) + } + return } func (c *LLHLSPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if strings.HasSuffix(r.URL.Path, ".html") { - w.Write([]byte(`
`)) - return - } - streamPath := strings.TrimPrefix(r.URL.Path, "/") - streamPath = path.Dir(streamPath) - if llwriting.Has(streamPath) { - r.URL.Path = strings.TrimPrefix(r.URL.Path, "/"+streamPath) - writer, ok := llwriting.Get(streamPath) - if ok { - writer.Handle(w, r) - } - return - } else { - w.Write([]byte(``)) - } + if strings.HasSuffix(r.URL.Path, ".html") { + w.Write([]byte(``)) + return + } + streamPath := strings.TrimPrefix(r.URL.Path, "/") + streamPath = path.Dir(streamPath) + if llwriting.Has(streamPath) { + r.URL.Path = strings.TrimPrefix(r.URL.Path, "/"+streamPath) + writer, ok := llwriting.Get(streamPath) + if ok { + writer.Handle(w, r) + } + return + } else { + w.Write([]byte(``)) + } } type LLMuxer struct { - DefaultTransformer - *gohlslib.Muxer + DefaultTransformer + *gohlslib.Muxer } func (ll *LLMuxer) GetKey() string { - return ll.TransformJob.StreamPath + return ll.TransformJob.StreamPath } func (ll *LLMuxer) Start() (err error) { - return ll.TransformJob.Subscribe() + return ll.TransformJob.Subscribe() } func (ll *LLMuxer) Run() (err error) { - llwriting.Set(ll) - subscriber := ll.TransformJob.Subscriber - ll.Muxer = &gohlslib.Muxer{ - Variant: gohlslib.MuxerVariantLowLatency, - SegmentCount: 7, - SegmentMinDuration: 1 * time.Second, - } + llwriting.Set(ll) + subscriber := ll.TransformJob.Subscriber + ll.Muxer = &gohlslib.Muxer{ + Variant: gohlslib.MuxerVariantLowLatency, + SegmentCount: 7, + SegmentMinDuration: 1 * time.Second, + } - if conf, ok := ll.TransformJob.Config.Input.(string); ok { - ss := strings.Split(conf, "x") - if len(ss) != 2 { - return fmt.Errorf("invalid input config %s", conf) - } - ll.Muxer.SegmentMinDuration, err = time.ParseDuration(strings.TrimSpace(ss[0])) - if err != nil { - return - } - ll.Muxer.SegmentCount, err = strconv.Atoi(strings.TrimSpace(ss[1])) - if err != nil { - return - } - } + if conf, ok := ll.TransformJob.Config.Input.(string); ok { + ss := strings.Split(conf, "x") + if len(ss) != 2 { + return 
fmt.Errorf("invalid input config %s", conf) + } + ll.Muxer.SegmentMinDuration, err = time.ParseDuration(strings.TrimSpace(ss[0])) + if err != nil { + return + } + ll.Muxer.SegmentCount, err = strconv.Atoi(strings.TrimSpace(ss[1])) + if err != nil { + return + } + } - var videoFunc = func(v *pkg.AVFrame) (err error) { - return nil - } - if ctx := subscriber.Publisher.GetVideoCodecCtx(); ctx != nil { - ll.Muxer.VideoTrack = &gohlslib.Track{} - switch ctx := ctx.GetBase().(type) { - case *codec.H264Ctx: - ll.Muxer.VideoTrack.Codec = &codecs.H264{ - SPS: ctx.SPS(), - PPS: ctx.PPS(), - } - videoFunc = func(v *pkg.AVFrame) (err error) { - ts := v.Timestamp - var au [][]byte - if subscriber.VideoReader.Value.IDR { - au = append(au, ctx.SPS(), ctx.PPS()) - } - for buffer := range v.Raw.(*pkg.Nalus).RangePoint { - au = append(au, buffer.Buffers...) - } - return ll.Muxer.WriteH264(time.Now().Add(ts-ll.Muxer.SegmentMinDuration), v.GetPTS(), au) - } - case *codec.H265Ctx: - ll.Muxer.VideoTrack.Codec = &codecs.H265{ - SPS: ctx.SPS(), - PPS: ctx.PPS(), - VPS: ctx.VPS(), - } - videoFunc = func(v *pkg.AVFrame) (err error) { - var au [][]byte - if subscriber.VideoReader.Value.IDR { - au = append(au, ctx.VPS(), ctx.SPS(), ctx.PPS()) - } - for buffer := range v.Raw.(*pkg.Nalus).RangePoint { - au = append(au, buffer.Buffers...) 
- } - return ll.Muxer.WriteH265(time.Now().Add(v.Timestamp-ll.Muxer.SegmentMinDuration), v.GetPTS(), au) - } - } - } - if ctx := subscriber.Publisher.GetAudioCodecCtx(); ctx != nil { - ll.Muxer.AudioTrack = &gohlslib.Track{} - switch ctx := ctx.GetBase().(type) { - case *codec.AACCtx: - var config mpeg4audio.Config - config.Unmarshal(ctx.ConfigBytes) - ll.Muxer.AudioTrack.Codec = &codecs.MPEG4Audio{ - Config: config, - } - } - } + var videoFunc = func(v *pkg.AVFrame) (err error) { + return nil + } + if ctx := subscriber.Publisher.GetVideoCodecCtx(); ctx != nil { + ll.Muxer.VideoTrack = &gohlslib.Track{} + switch ctx := ctx.GetBase().(type) { + case *codec.H264Ctx: + ll.Muxer.VideoTrack.Codec = &codecs.H264{ + SPS: ctx.SPS(), + PPS: ctx.PPS(), + } + videoFunc = func(v *pkg.AVFrame) (err error) { + ts := v.Timestamp + var au [][]byte + if subscriber.VideoReader.Value.IDR { + au = append(au, ctx.SPS(), ctx.PPS()) + } + for buffer := range v.Raw.(*pkg.Nalus).RangePoint { + au = append(au, buffer.Buffers...) + } + return ll.Muxer.WriteH264(time.Now().Add(ts-ll.Muxer.SegmentMinDuration), v.GetPTS(), au) + } + case *codec.H265Ctx: + ll.Muxer.VideoTrack.Codec = &codecs.H265{ + SPS: ctx.SPS(), + PPS: ctx.PPS(), + VPS: ctx.VPS(), + } + videoFunc = func(v *pkg.AVFrame) (err error) { + var au [][]byte + if subscriber.VideoReader.Value.IDR { + au = append(au, ctx.VPS(), ctx.SPS(), ctx.PPS()) + } + for buffer := range v.Raw.(*pkg.Nalus).RangePoint { + au = append(au, buffer.Buffers...) 
+ } + return ll.Muxer.WriteH265(time.Now().Add(v.Timestamp-ll.Muxer.SegmentMinDuration), v.GetPTS(), au) + } + } + } + if ctx := subscriber.Publisher.GetAudioCodecCtx(); ctx != nil { + ll.Muxer.AudioTrack = &gohlslib.Track{} + switch ctx := ctx.GetBase().(type) { + case *codec.AACCtx: + var config mpeg4audio.Config + config.Unmarshal(ctx.ConfigBytes) + ll.Muxer.AudioTrack.Codec = &codecs.MPEG4Audio{ + Config: config, + } + } + } - err = ll.Muxer.Start() - if err != nil { - return - } + err = ll.Muxer.Start() + if err != nil { + return + } - return PlayBlock(ll.TransformJob.Subscriber, func(audio *format.RawAudio) (err error) { - now := time.Now() - ts := audio.Timestamp - return ll.Muxer.WriteMPEG4Audio(now.Add(ts-ll.Muxer.SegmentMinDuration), audio.GetDTS(), slices.Clone(audio.Buffers)) - }, videoFunc) + return PlayBlock(ll.TransformJob.Subscriber, func(audio *format.RawAudio) (err error) { + now := time.Now() + ts := audio.Timestamp + return ll.Muxer.WriteMPEG4Audio(now.Add(ts-ll.Muxer.SegmentMinDuration), audio.GetDTS(), slices.Clone(audio.Buffers)) + }, videoFunc) } func (ll *LLMuxer) Dispose() { - ll.Muxer.Close() - llwriting.Remove(ll) + ll.Muxer.Close() + llwriting.Remove(ll) } diff --git a/plugin/mp4/pkg/video.go b/plugin/mp4/pkg/video.go index 3b9b837..7636087 100644 --- a/plugin/mp4/pkg/video.go +++ b/plugin/mp4/pkg/video.go @@ -1,74 +1,76 @@ package mp4 import ( - "fmt" + "fmt" - "m7s.live/v5/pkg" - "m7s.live/v5/pkg/codec" - "m7s.live/v5/pkg/util" + "m7s.live/v5/pkg" + "m7s.live/v5/pkg/codec" + "m7s.live/v5/pkg/util" ) var _ pkg.IAVFrame = (*VideoFrame)(nil) type VideoFrame struct { - pkg.Sample + pkg.Sample } func (v *VideoFrame) Demux() (err error) { - if v.Size == 0 { - return fmt.Errorf("no video data to demux") - } + if v.Size == 0 { + return fmt.Errorf("no video data to demux") + } - reader := v.NewReader() - // 根据编解码器类型进行解复用 - switch ctx := v.ICodecCtx.(type) { - case *codec.H264Ctx: - // 对于 H.264,解析 AVCC 格式的 NAL 单元 - if err := 
v.ParseAVCC(&reader, int(ctx.RecordInfo.LengthSizeMinusOne)+1); err != nil { - return fmt.Errorf("failed to parse H.264 AVCC: %w", err) - } - case *codec.H265Ctx: - // 对于 H.265,解析 AVCC 格式的 NAL 单元 - if err := v.ParseAVCC(&reader, int(ctx.RecordInfo.LengthSizeMinusOne)+1); err != nil { - return fmt.Errorf("failed to parse H.265 AVCC: %w", err) - } - default: - // 对于其他格式,尝试默认的 AVCC 解析(4字节长度前缀) - if err := v.ParseAVCC(&reader, 4); err != nil { - return fmt.Errorf("failed to parse AVCC with default settings: %w", err) - } - } + reader := v.NewReader() + // 根据编解码器类型进行解复用 + switch ctx := v.ICodecCtx.(type) { + case *codec.H264Ctx: + // 对于 H.264,解析 AVCC 格式的 NAL 单元 + if err := v.ParseAVCC(&reader, int(ctx.RecordInfo.LengthSizeMinusOne)+1); err != nil { + return fmt.Errorf("failed to parse H.264 AVCC: %w", err) + } + case *codec.H265Ctx: + // 对于 H.265,解析 AVCC 格式的 NAL 单元 + if err := v.ParseAVCC(&reader, int(ctx.RecordInfo.LengthSizeMinusOne)+1); err != nil { + return fmt.Errorf("failed to parse H.265 AVCC: %w", err) + } + default: + // 对于其他格式,尝试默认的 AVCC 解析(4字节长度前缀) + if err := v.ParseAVCC(&reader, 4); err != nil { + return fmt.Errorf("failed to parse AVCC with default settings: %w", err) + } + } - return + return } // Mux implements pkg.IAVFrame. func (v *VideoFrame) Mux(sample *pkg.Sample) (err error) { - v.InitRecycleIndexes(0) - if v.ICodecCtx == nil { - v.ICodecCtx = sample.GetBase() - } - switch rawData := sample.Raw.(type) { - case *pkg.Nalus: - // 根据编解码器类型确定 NALU 长度字段的大小 - var naluSizeLen int = 4 // 默认使用 4 字节 - switch ctx := sample.ICodecCtx.(type) { - case *codec.H264Ctx: - naluSizeLen = int(ctx.RecordInfo.LengthSizeMinusOne) + 1 - case *codec.H265Ctx: - naluSizeLen = int(ctx.RecordInfo.LengthSizeMinusOne) + 1 - } - // 为每个 NALU 添加长度前缀 - for nalu := range rawData.RangePoint { - util.PutBE(v.NextN(naluSizeLen), nalu.Size) // 写入 NALU 长度 - v.Push(nalu.Buffers...) 
- } - } - return + v.InitRecycleIndexes(0) + if v.ICodecCtx == nil { + v.ICodecCtx = sample.GetBase() + } + switch rawData := sample.Raw.(type) { + case *pkg.Nalus: + var naluSizeLen int = 4 + switch ctx := sample.ICodecCtx.(type) { + case *codec.H264Ctx: + naluSizeLen = int(ctx.RecordInfo.LengthSizeMinusOne) + 1 + case *codec.H265Ctx: + naluSizeLen = int(ctx.RecordInfo.LengthSizeMinusOne) + 1 + } + for nalu := range rawData.RangePoint { + util.PutBE(v.NextN(naluSizeLen), nalu.Size) + v.Push(nalu.Buffers...) + } + case *pkg.OBUs: + for obu := range rawData.RangePoint { + v.Push(obu.Buffers...) + } + } + return } // String implements pkg.IAVFrame. func (v *VideoFrame) String() string { - return fmt.Sprintf("MP4Video[ts:%s, cts:%s, size:%d, keyframe:%t]", - v.Timestamp, v.CTS, v.Size, v.IDR) + return fmt.Sprintf("MP4Video[ts:%s, cts:%s, size:%d, keyframe:%t]", + v.Timestamp, v.CTS, v.Size, v.IDR) } diff --git a/plugin/rtmp/pkg/video.go b/plugin/rtmp/pkg/video.go index 0569d52..ffe7532 100644 --- a/plugin/rtmp/pkg/video.go +++ b/plugin/rtmp/pkg/video.go @@ -1,356 +1,376 @@ package rtmp import ( - "bytes" - "encoding/binary" - "io" - "net" - "time" + "bytes" + "encoding/binary" + "io" + "net" + "time" - "github.com/deepch/vdk/codec/h264parser" - "github.com/langhuihui/gomem" + "github.com/deepch/vdk/codec/h264parser" + "github.com/langhuihui/gomem" - . "m7s.live/v5/pkg" - "m7s.live/v5/pkg/codec" - "m7s.live/v5/pkg/util" + . 
"m7s.live/v5/pkg" + "m7s.live/v5/pkg/codec" + "m7s.live/v5/pkg/util" ) type VideoFrame RTMPData // 过滤掉异常的 NALU func (avcc *VideoFrame) filterH264(naluSizeLen int) { - reader := avcc.NewReader() - lenReader := reader.NewReader() - reader.Skip(5) - var afterFilter gomem.Memory - lenReader.RangeN(5, afterFilter.PushOne) - allocator := avcc.GetAllocator() - var hasBadNalu bool - for { - naluLen, err := reader.ReadBE(naluSizeLen) - if err != nil { - break - } - var lenBuffer net.Buffers - lenReader.RangeN(naluSizeLen, func(b []byte) { - lenBuffer = append(lenBuffer, b) - }) - lenReader.Skip(int(naluLen)) - var naluBuffer net.Buffers - reader.RangeN(int(naluLen), func(b []byte) { - naluBuffer = append(naluBuffer, b) - }) - badType := codec.ParseH264NALUType(naluBuffer[0][0]) - // 替换之前打印 badType 的逻辑,解码并打印 SliceType - if badType == 5 { // NALU type for Coded slice of a non-IDR picture or Coded slice of an IDR picture - naluData := bytes.Join(naluBuffer, nil) // bytes 包已导入 - if len(naluData) > 0 { - // h264parser 包已导入 as "github.com/deepch/vdk/codec/h264parser" - // ParseSliceHeaderFromNALU 返回的第一个值就是 SliceType - sliceType, err := h264parser.ParseSliceHeaderFromNALU(naluData) - if err == nil { - println("Decoded SliceType:", sliceType.String()) - } else { - println("Error parsing H.264 slice header:", err.Error()) - } - } else { - println("NALU data is empty, cannot parse H.264 slice header.") - } - } + reader := avcc.NewReader() + lenReader := reader.NewReader() + reader.Skip(5) + var afterFilter gomem.Memory + lenReader.RangeN(5, afterFilter.PushOne) + allocator := avcc.GetAllocator() + var hasBadNalu bool + for { + naluLen, err := reader.ReadBE(naluSizeLen) + if err != nil { + break + } + var lenBuffer net.Buffers + lenReader.RangeN(naluSizeLen, func(b []byte) { + lenBuffer = append(lenBuffer, b) + }) + lenReader.Skip(int(naluLen)) + var naluBuffer net.Buffers + reader.RangeN(int(naluLen), func(b []byte) { + naluBuffer = append(naluBuffer, b) + }) + badType := 
codec.ParseH264NALUType(naluBuffer[0][0]) + // 替换之前打印 badType 的逻辑,解码并打印 SliceType + if badType == 5 { // NALU type for Coded slice of a non-IDR picture or Coded slice of an IDR picture + naluData := bytes.Join(naluBuffer, nil) // bytes 包已导入 + if len(naluData) > 0 { + // h264parser 包已导入 as "github.com/deepch/vdk/codec/h264parser" + // ParseSliceHeaderFromNALU 返回的第一个值就是 SliceType + sliceType, err := h264parser.ParseSliceHeaderFromNALU(naluData) + if err == nil { + println("Decoded SliceType:", sliceType.String()) + } else { + println("Error parsing H.264 slice header:", err.Error()) + } + } else { + println("NALU data is empty, cannot parse H.264 slice header.") + } + } - switch badType { - case 5, 6, 7, 8, 1, 2, 3, 4: - afterFilter.Push(lenBuffer...) - afterFilter.Push(naluBuffer...) - default: - hasBadNalu = true - if allocator != nil { - for _, nalu := range lenBuffer { - allocator.Free(nalu) - } - for _, nalu := range naluBuffer { - allocator.Free(nalu) - } - } - } - } - if hasBadNalu { - avcc.Memory = afterFilter - } + switch badType { + case 5, 6, 7, 8, 1, 2, 3, 4: + afterFilter.Push(lenBuffer...) + afterFilter.Push(naluBuffer...) 
+ default: + hasBadNalu = true + if allocator != nil { + for _, nalu := range lenBuffer { + allocator.Free(nalu) + } + for _, nalu := range naluBuffer { + allocator.Free(nalu) + } + } + } + } + if hasBadNalu { + avcc.Memory = afterFilter + } } func (avcc *VideoFrame) filterH265(naluSizeLen int) { - //TODO + //TODO } func (avcc *VideoFrame) CheckCodecChange() (err error) { - old := avcc.ICodecCtx - if avcc.Size <= 10 { - err = io.ErrShortBuffer - return - } - reader := avcc.NewReader() - var b0 byte - b0, err = reader.ReadByte() - if err != nil { - return - } - enhanced := b0&0b1000_0000 != 0 // https://veovera.github.io/enhanced-rtmp/docs/enhanced/enhanced-rtmp-v1.pdf - avcc.IDR = b0&0b0111_0000>>4 == 1 - packetType := b0 & 0b1111 - codecId := VideoCodecID(b0 & 0x0F) - var fourCC codec.FourCC - parseSequence := func() (err error) { - avcc.IDR = false - switch fourCC { - case codec.FourCC_H264: - if old != nil && avcc.Memory.Equal(&old.(*H264Ctx).SequenceFrame.Memory) { - avcc.ICodecCtx = old - break - } - newCtx := &H264Ctx{} - newCtx.SequenceFrame.CopyFrom(&avcc.Memory) - newCtx.SequenceFrame.BaseSample = &BaseSample{} - newCtx.H264Ctx, err = codec.NewH264CtxFromRecord(newCtx.SequenceFrame.Buffers[0][reader.Offset():]) - if err == nil { - avcc.ICodecCtx = newCtx - } else { - return - } - case codec.FourCC_H265: - if old != nil && avcc.Memory.Equal(&old.(*H265Ctx).SequenceFrame.Memory) { - avcc.ICodecCtx = old - break - } - newCtx := H265Ctx{ - Enhanced: enhanced, - } - newCtx.SequenceFrame.CopyFrom(&avcc.Memory) - newCtx.SequenceFrame.BaseSample = &BaseSample{} - newCtx.H265Ctx, err = codec.NewH265CtxFromRecord(newCtx.SequenceFrame.Buffers[0][reader.Offset():]) - if err == nil { - avcc.ICodecCtx = newCtx - } else { - return - } - case codec.FourCC_AV1: - var newCtx AV1Ctx - if err = newCtx.Unmarshal(&reader); err == nil { - avcc.ICodecCtx = &newCtx - } else { - return - } - } - return ErrSkip - } - if enhanced { - reader.Read(fourCC[:]) - switch packetType { - 
case PacketTypeSequenceStart: - err = parseSequence() - return - case PacketTypeCodedFrames: - switch old.(type) { - case *H265Ctx: - var cts uint32 - if cts, err = reader.ReadBE(3); err != nil { - return err - } - avcc.CTS = time.Duration(cts) * time.Millisecond - // avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1) - case *AV1Ctx: - // return avcc.parseAV1(reader) - } - case PacketTypeCodedFramesX: - // avcc.filterH265(int(old.(*H265Ctx).RecordInfo.LengthSizeMinusOne) + 1) - } - } else { - b0, err = reader.ReadByte() //sequence frame flag - if err != nil { - return - } - if codecId == CodecID_H265 { - fourCC = codec.FourCC_H265 - } else { - fourCC = codec.FourCC_H264 - } - var cts uint32 - cts, err = reader.ReadBE(3) - if err != nil { - return - } - avcc.CTS = time.Duration(cts) * time.Millisecond - if b0 == 0 { - if err = parseSequence(); err != nil { - return - } - } else { - // switch ctx := old.(type) { - // case *codec.H264Ctx: - // avcc.filterH264(int(ctx.RecordInfo.LengthSizeMinusOne) + 1) - // case *H265Ctx: - // avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1) - // } - // if avcc.Size <= 5 { - // return old, ErrSkip - // } - } - } - return + old := avcc.ICodecCtx + if avcc.Size <= 10 { + err = io.ErrShortBuffer + return + } + reader := avcc.NewReader() + var b0 byte + b0, err = reader.ReadByte() + if err != nil { + return + } + enhanced := b0&0b1000_0000 != 0 // https://veovera.github.io/enhanced-rtmp/docs/enhanced/enhanced-rtmp-v1.pdf + avcc.IDR = b0&0b0111_0000>>4 == 1 + packetType := b0 & 0b1111 + codecId := VideoCodecID(b0 & 0x0F) + var fourCC codec.FourCC + parseSequence := func() (err error) { + avcc.IDR = false + switch fourCC { + case codec.FourCC_H264: + if old != nil && avcc.Memory.Equal(&old.(*H264Ctx).SequenceFrame.Memory) { + avcc.ICodecCtx = old + break + } + newCtx := &H264Ctx{} + newCtx.SequenceFrame.CopyFrom(&avcc.Memory) + newCtx.SequenceFrame.BaseSample = &BaseSample{} + newCtx.H264Ctx, err = 
codec.NewH264CtxFromRecord(newCtx.SequenceFrame.Buffers[0][reader.Offset():]) + if err == nil { + avcc.ICodecCtx = newCtx + } else { + return + } + case codec.FourCC_H265: + if old != nil && avcc.Memory.Equal(&old.(*H265Ctx).SequenceFrame.Memory) { + avcc.ICodecCtx = old + break + } + newCtx := H265Ctx{ + Enhanced: enhanced, + } + newCtx.SequenceFrame.CopyFrom(&avcc.Memory) + newCtx.SequenceFrame.BaseSample = &BaseSample{} + newCtx.H265Ctx, err = codec.NewH265CtxFromRecord(newCtx.SequenceFrame.Buffers[0][reader.Offset():]) + if err == nil { + avcc.ICodecCtx = newCtx + } else { + return + } + case codec.FourCC_AV1: + var newCtx AV1Ctx + if err = newCtx.Unmarshal(&reader); err == nil { + avcc.ICodecCtx = &newCtx + } else { + return + } + } + return ErrSkip + } + if enhanced { + reader.Read(fourCC[:]) + switch packetType { + case PacketTypeSequenceStart: + err = parseSequence() + return + case PacketTypeCodedFrames: + switch old.(type) { + case *H265Ctx: + var cts uint32 + if cts, err = reader.ReadBE(3); err != nil { + return err + } + avcc.CTS = time.Duration(cts) * time.Millisecond + // avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1) + case *AV1Ctx: + // return avcc.parseAV1(reader) + } + case PacketTypeCodedFramesX: + // avcc.filterH265(int(old.(*H265Ctx).RecordInfo.LengthSizeMinusOne) + 1) + } + } else { + b0, err = reader.ReadByte() //sequence frame flag + if err != nil { + return + } + if codecId == CodecID_H265 { + fourCC = codec.FourCC_H265 + } else { + fourCC = codec.FourCC_H264 + } + var cts uint32 + cts, err = reader.ReadBE(3) + if err != nil { + return + } + avcc.CTS = time.Duration(cts) * time.Millisecond + if b0 == 0 { + if err = parseSequence(); err != nil { + return + } + } else { + // switch ctx := old.(type) { + // case *codec.H264Ctx: + // avcc.filterH264(int(ctx.RecordInfo.LengthSizeMinusOne) + 1) + // case *H265Ctx: + // avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1) + // } + // if avcc.Size <= 5 { + // return old, ErrSkip 
+ // } + } + } + return } func (avcc *VideoFrame) parseH264(ctx *H264Ctx, reader *gomem.MemoryReader) (err error) { - return avcc.ParseAVCC(reader, int(ctx.RecordInfo.LengthSizeMinusOne)+1) + return avcc.ParseAVCC(reader, int(ctx.RecordInfo.LengthSizeMinusOne)+1) } func (avcc *VideoFrame) parseH265(ctx *H265Ctx, reader *gomem.MemoryReader) (err error) { - return avcc.ParseAVCC(reader, int(ctx.RecordInfo.LengthSizeMinusOne)+1) + return avcc.ParseAVCC(reader, int(ctx.RecordInfo.LengthSizeMinusOne)+1) } func (avcc *VideoFrame) parseAV1(reader *gomem.MemoryReader) error { - var obus OBUs - if err := obus.ParseAVCC(reader); err != nil { - return err - } - avcc.Raw = &obus - return nil + obus := avcc.GetOBUs() + if err := obus.ParseAVCC(reader); err != nil { + return err + } + return nil } func (avcc *VideoFrame) Demux() error { - reader := avcc.NewReader() - b0, err := reader.ReadByte() - if err != nil { - return err - } + reader := avcc.NewReader() + b0, err := reader.ReadByte() + if err != nil { + return err + } - enhanced := b0&0b1000_0000 != 0 // https://veovera.github.io/enhanced-rtmp/docs/enhanced/enhanced-rtmp-v1.pdf - // frameType := b0 & 0b0111_0000 >> 4 - packetType := b0 & 0b1111 + enhanced := b0&0b1000_0000 != 0 // https://veovera.github.io/enhanced-rtmp/docs/enhanced/enhanced-rtmp-v1.pdf + // frameType := b0 & 0b0111_0000 >> 4 + packetType := b0 & 0b1111 - if enhanced { - err = reader.Skip(4) // fourcc - if err != nil { - return err - } - switch packetType { - case PacketTypeSequenceStart: - // see Parse() - return nil - case PacketTypeCodedFrames: - switch ctx := avcc.ICodecCtx.(type) { - case *H265Ctx: - var cts uint32 - if cts, err = reader.ReadBE(3); err != nil { - return err - } - avcc.CTS = time.Duration(cts) * time.Millisecond - err = avcc.parseH265(ctx, &reader) - case *AV1Ctx: - err = avcc.parseAV1(&reader) - } - case PacketTypeCodedFramesX: // no cts - err = avcc.parseH265(avcc.ICodecCtx.(*H265Ctx), &reader) - } - return err - } else { - b0, err = 
reader.ReadByte() //sequence frame flag - if err != nil { - return err - } - var cts uint32 - if cts, err = reader.ReadBE(3); err != nil { - return err - } - avcc.SetCTS32(cts) - switch ctx := avcc.ICodecCtx.(type) { - case *H265Ctx: - if b0 == 0 { - // nalus.Append(ctx.VPS()) - // nalus.Append(ctx.SPS()) - // nalus.Append(ctx.PPS()) - } else { - err = avcc.parseH265(ctx, &reader) - return err - } + if enhanced { + err = reader.Skip(4) // fourcc + if err != nil { + return err + } + switch packetType { + case PacketTypeSequenceStart: + // see Parse() + return nil + case PacketTypeCodedFrames: + switch ctx := avcc.ICodecCtx.(type) { + case *H265Ctx: + var cts uint32 + if cts, err = reader.ReadBE(3); err != nil { + return err + } + avcc.CTS = time.Duration(cts) * time.Millisecond + err = avcc.parseH265(ctx, &reader) + case *AV1Ctx: + err = avcc.parseAV1(&reader) + } + case PacketTypeCodedFramesX: // no cts + err = avcc.parseH265(avcc.ICodecCtx.(*H265Ctx), &reader) + } + return err + } else { + b0, err = reader.ReadByte() //sequence frame flag + if err != nil { + return err + } + var cts uint32 + if cts, err = reader.ReadBE(3); err != nil { + return err + } + avcc.SetCTS32(cts) + switch ctx := avcc.ICodecCtx.(type) { + case *H265Ctx: + if b0 == 0 { + // nalus.Append(ctx.VPS()) + // nalus.Append(ctx.SPS()) + // nalus.Append(ctx.PPS()) + } else { + err = avcc.parseH265(ctx, &reader) + return err + } - case *H264Ctx: - if b0 == 0 { - // nalus.Append(ctx.SPS()) - // nalus.Append(ctx.PPS()) - } else { - err = avcc.parseH264(ctx, &reader) - return err - } - } - return err - } + case *H264Ctx: + if b0 == 0 { + // nalus.Append(ctx.SPS()) + // nalus.Append(ctx.PPS()) + } else { + err = avcc.parseH264(ctx, &reader) + return err + } + } + return err + } } func (avcc *VideoFrame) muxOld26x(codecID VideoCodecID, fromBase *Sample) { - nalus := fromBase.GetNalus() - avcc.InitRecycleIndexes(nalus.Count()) // Recycle partial data - head := avcc.NextN(5) - head[0] = 
util.Conditional[byte](fromBase.IDR, 0x10, 0x20) | byte(codecID) - head[1] = 1 - util.PutBE(head[2:5], fromBase.CTS/time.Millisecond) // cts - for nalu := range nalus.RangePoint { - naluLenM := avcc.NextN(4) - naluLen := uint32(nalu.Size) - binary.BigEndian.PutUint32(naluLenM, naluLen) - // if nalu.Size != len(util.ConcatBuffers(nalu.Buffers)) { - // panic("nalu size mismatch") - // } - avcc.Push(nalu.Buffers...) - } + nalus := fromBase.GetNalus() + avcc.InitRecycleIndexes(nalus.Count()) // Recycle partial data + head := avcc.NextN(5) + head[0] = util.Conditional[byte](fromBase.IDR, 0x10, 0x20) | byte(codecID) + head[1] = 1 + util.PutBE(head[2:5], fromBase.CTS/time.Millisecond) // cts + for nalu := range nalus.RangePoint { + naluLenM := avcc.NextN(4) + naluLen := uint32(nalu.Size) + binary.BigEndian.PutUint32(naluLenM, naluLen) + // if nalu.Size != len(util.ConcatBuffers(nalu.Buffers)) { + // panic("nalu size mismatch") + // } + avcc.Push(nalu.Buffers...) + } } func (avcc *VideoFrame) Mux(fromBase *Sample) (err error) { - switch c := fromBase.GetBase().(type) { - case *AV1Ctx: - panic(c) - case *codec.H264Ctx: - if avcc.ICodecCtx == nil { - ctx := &H264Ctx{H264Ctx: c} - ctx.SequenceFrame.PushOne(append([]byte{0x17, 0, 0, 0, 0}, c.Record...)) - ctx.SequenceFrame.BaseSample = &BaseSample{} - avcc.ICodecCtx = ctx - } - avcc.muxOld26x(CodecID_H264, fromBase) - case *codec.H265Ctx: - if true { - if avcc.ICodecCtx == nil { - ctx := &H265Ctx{H265Ctx: c, Enhanced: true} - b := make(util.Buffer, len(ctx.Record)+5) - if ctx.Enhanced { - b[0] = 0b1001_0000 | byte(PacketTypeSequenceStart) - copy(b[1:], codec.FourCC_H265[:]) - } else { - b[0], b[1], b[2], b[3], b[4] = 0x1C, 0, 0, 0, 0 - } - copy(b[5:], ctx.Record) - ctx.SequenceFrame.PushOne(b) - ctx.SequenceFrame.BaseSample = &BaseSample{} - avcc.ICodecCtx = ctx - } - nalus := fromBase.Raw.(*Nalus) - avcc.InitRecycleIndexes(nalus.Count()) // Recycle partial data - head := avcc.NextN(8) - if fromBase.IDR { - head[0] = 
0b1001_0000 | byte(PacketTypeCodedFrames) - } else { - head[0] = 0b1010_0000 | byte(PacketTypeCodedFrames) - } - copy(head[1:], codec.FourCC_H265[:]) - util.PutBE(head[5:8], fromBase.CTS/time.Millisecond) // cts - for nalu := range nalus.RangePoint { - naluLenM := avcc.NextN(4) - naluLen := uint32(nalu.Size) - binary.BigEndian.PutUint32(naluLenM, naluLen) - avcc.Push(nalu.Buffers...) - } - } else { - avcc.muxOld26x(CodecID_H265, fromBase) - } - } - return + switch c := fromBase.GetBase().(type) { + case *codec.AV1Ctx: + if avcc.ICodecCtx == nil { + ctx := &AV1Ctx{AV1Ctx: c} + configBytes := make([]byte, 5+len(c.ConfigOBUs)) + configBytes[0] = 0b1001_0000 | byte(PacketTypeSequenceStart) + copy(configBytes[1:], codec.FourCC_AV1[:]) + copy(configBytes[5:], c.ConfigOBUs) + ctx.SequenceFrame.PushOne(configBytes) + ctx.SequenceFrame.BaseSample = &BaseSample{} + avcc.ICodecCtx = ctx + } + obus := fromBase.Raw.(*OBUs) + avcc.InitRecycleIndexes(obus.Count()) + head := avcc.NextN(5) + if fromBase.IDR { + head[0] = 0b1001_0000 | byte(PacketTypeCodedFrames) + } else { + head[0] = 0b1010_0000 | byte(PacketTypeCodedFrames) + } + copy(head[1:], codec.FourCC_AV1[:]) + for obu := range obus.RangePoint { + avcc.Push(obu.Buffers...) 
+ } + case *codec.H264Ctx: + if avcc.ICodecCtx == nil { + ctx := &H264Ctx{H264Ctx: c} + ctx.SequenceFrame.PushOne(append([]byte{0x17, 0, 0, 0, 0}, c.Record...)) + ctx.SequenceFrame.BaseSample = &BaseSample{} + avcc.ICodecCtx = ctx + } + avcc.muxOld26x(CodecID_H264, fromBase) + case *codec.H265Ctx: + if true { + if avcc.ICodecCtx == nil { + ctx := &H265Ctx{H265Ctx: c, Enhanced: true} + b := make(util.Buffer, len(ctx.Record)+5) + if ctx.Enhanced { + b[0] = 0b1001_0000 | byte(PacketTypeSequenceStart) + copy(b[1:], codec.FourCC_H265[:]) + } else { + b[0], b[1], b[2], b[3], b[4] = 0x1C, 0, 0, 0, 0 + } + copy(b[5:], ctx.Record) + ctx.SequenceFrame.PushOne(b) + ctx.SequenceFrame.BaseSample = &BaseSample{} + avcc.ICodecCtx = ctx + } + nalus := fromBase.Raw.(*Nalus) + avcc.InitRecycleIndexes(nalus.Count()) // Recycle partial data + head := avcc.NextN(8) + if fromBase.IDR { + head[0] = 0b1001_0000 | byte(PacketTypeCodedFrames) + } else { + head[0] = 0b1010_0000 | byte(PacketTypeCodedFrames) + } + copy(head[1:], codec.FourCC_H265[:]) + util.PutBE(head[5:8], fromBase.CTS/time.Millisecond) // cts + for nalu := range nalus.RangePoint { + naluLenM := avcc.NextN(4) + naluLen := uint32(nalu.Size) + binary.BigEndian.PutUint32(naluLenM, naluLen) + avcc.Push(nalu.Buffers...) + } + } else { + avcc.muxOld26x(CodecID_H265, fromBase) + } + } + return } diff --git a/plugin/rtp/pkg/video.go b/plugin/rtp/pkg/video.go index 46d9ce5..03f8bca 100644 --- a/plugin/rtp/pkg/video.go +++ b/plugin/rtp/pkg/video.go @@ -1,476 +1,501 @@ package rtp import ( - "bytes" - "encoding/base64" - "fmt" - "io" - "slices" - "time" - "unsafe" + "bytes" + "encoding/base64" + "fmt" + "io" + "slices" + "time" + "unsafe" - "github.com/deepch/vdk/codec/h264parser" - "github.com/deepch/vdk/codec/h265parser" - "github.com/langhuihui/gomem" + "github.com/deepch/vdk/codec/h264parser" + "github.com/deepch/vdk/codec/h265parser" + "github.com/langhuihui/gomem" - "github.com/pion/rtp" - "github.com/pion/webrtc/v4" - . 
"m7s.live/v5/pkg" - "m7s.live/v5/pkg/codec" - "m7s.live/v5/pkg/util" + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" + . "m7s.live/v5/pkg" + "m7s.live/v5/pkg/codec" + "m7s.live/v5/pkg/util" ) type ( - H26xCtx struct { - RTPCtx - seq uint16 - dtsEst util.DTSEstimator - } - H264Ctx struct { - H26xCtx - *codec.H264Ctx - } - H265Ctx struct { - H26xCtx - *codec.H265Ctx - DONL bool - } - AV1Ctx struct { - RTPCtx - *codec.AV1Ctx - } - VP9Ctx struct { - RTPCtx - } - VideoFrame struct { - RTPData - } + H26xCtx struct { + RTPCtx + seq uint16 + dtsEst util.DTSEstimator + } + H264Ctx struct { + H26xCtx + *codec.H264Ctx + } + H265Ctx struct { + H26xCtx + *codec.H265Ctx + DONL bool + } + AV1Ctx struct { + RTPCtx + *codec.AV1Ctx + } + VP9Ctx struct { + RTPCtx + } + VideoFrame struct { + RTPData + } ) var ( - _ IAVFrame = (*VideoFrame)(nil) - _ IVideoCodecCtx = (*H264Ctx)(nil) - _ IVideoCodecCtx = (*H265Ctx)(nil) - _ IVideoCodecCtx = (*AV1Ctx)(nil) + _ IAVFrame = (*VideoFrame)(nil) + _ IVideoCodecCtx = (*H264Ctx)(nil) + _ IVideoCodecCtx = (*H265Ctx)(nil) + _ IVideoCodecCtx = (*AV1Ctx)(nil) ) const ( - H265_NALU_AP = h265parser.NAL_UNIT_UNSPECIFIED_48 - H265_NALU_FU = h265parser.NAL_UNIT_UNSPECIFIED_49 - startBit = 1 << 7 - endBit = 1 << 6 - MTUSize = 1460 - ReceiveMTU = 1500 + H265_NALU_AP = h265parser.NAL_UNIT_UNSPECIFIED_48 + H265_NALU_FU = h265parser.NAL_UNIT_UNSPECIFIED_49 + startBit = 1 << 7 + endBit = 1 << 6 + MTUSize = 1460 + ReceiveMTU = 1500 ) func (r *VideoFrame) Recycle() { - r.RecyclableMemory.Recycle() - r.Packets.Reset() + r.RecyclableMemory.Recycle() + r.Packets.Reset() } func (r *VideoFrame) CheckCodecChange() (err error) { - if len(r.Packets) == 0 { - return ErrSkip - } - old := r.ICodecCtx - // 解复用数据 - if err = r.Demux(); err != nil { - return - } - // 处理时间戳和序列号 - pts := r.Packets[0].Timestamp - nalus := r.Raw.(*Nalus) - switch ctx := old.(type) { - case *H264Ctx: - dts := ctx.dtsEst.Feed(pts) - r.SetDTS(time.Duration(dts)) - r.SetPTS(time.Duration(pts)) + 
if len(r.Packets) == 0 { + return ErrSkip + } + old := r.ICodecCtx + // 解复用数据 + if err = r.Demux(); err != nil { + return + } + // 处理时间戳和序列号 + pts := r.Packets[0].Timestamp + switch ctx := old.(type) { + case *H264Ctx: + nalus := r.Raw.(*Nalus) + dts := ctx.dtsEst.Feed(pts) + r.SetDTS(time.Duration(dts)) + r.SetPTS(time.Duration(pts)) - // 检查 SPS、PPS 和 IDR 帧 - var sps, pps []byte - var hasSPSPPS bool - for nalu := range nalus.RangePoint { - nalType := codec.ParseH264NALUType(nalu.Buffers[0][0]) - switch nalType { - case h264parser.NALU_SPS: - sps = nalu.ToBytes() - defer nalus.Remove(nalu) - case h264parser.NALU_PPS: - pps = nalu.ToBytes() - defer nalus.Remove(nalu) - case codec.NALU_IDR_Picture: - r.IDR = true - } - } + // 检查 SPS、PPS 和 IDR 帧 + var sps, pps []byte + var hasSPSPPS bool + for nalu := range nalus.RangePoint { + nalType := codec.ParseH264NALUType(nalu.Buffers[0][0]) + switch nalType { + case h264parser.NALU_SPS: + sps = nalu.ToBytes() + defer nalus.Remove(nalu) + case h264parser.NALU_PPS: + pps = nalu.ToBytes() + defer nalus.Remove(nalu) + case codec.NALU_IDR_Picture: + r.IDR = true + } + } - // 如果发现新的 SPS/PPS,更新编解码器上下文 - if hasSPSPPS = sps != nil && pps != nil; hasSPSPPS && (len(ctx.Record) == 0 || !bytes.Equal(sps, ctx.SPS()) || !bytes.Equal(pps, ctx.PPS())) { - var newCodecData h264parser.CodecData - if newCodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil { - return - } - newCtx := &H264Ctx{ - H26xCtx: ctx.H26xCtx, - H264Ctx: &codec.H264Ctx{ - CodecData: newCodecData, - }, - } - // 保持原有的 RTP 参数 - if oldCtx, ok := old.(*H264Ctx); ok { - newCtx.RTPCtx = oldCtx.RTPCtx - } - r.ICodecCtx = newCtx - } else { - // 如果是 IDR 帧但没有 SPS/PPS,需要插入 - if r.IDR && len(ctx.SPS()) > 0 && len(ctx.PPS()) > 0 { - spsRTP := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SequenceNumber: ctx.SequenceNumber, - Timestamp: pts, - SSRC: ctx.SSRC, - PayloadType: uint8(ctx.PayloadType), - }, - Payload: ctx.SPS(), - } - ppsRTP := rtp.Packet{ - Header: 
rtp.Header{ - Version: 2, - SequenceNumber: ctx.SequenceNumber, - Timestamp: pts, - SSRC: ctx.SSRC, - PayloadType: uint8(ctx.PayloadType), - }, - Payload: ctx.PPS(), - } - r.Packets = slices.Insert(r.Packets, 0, spsRTP, ppsRTP) - } - } + // 如果发现新的 SPS/PPS,更新编解码器上下文 + if hasSPSPPS = sps != nil && pps != nil; hasSPSPPS && (len(ctx.Record) == 0 || !bytes.Equal(sps, ctx.SPS()) || !bytes.Equal(pps, ctx.PPS())) { + var newCodecData h264parser.CodecData + if newCodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil { + return + } + newCtx := &H264Ctx{ + H26xCtx: ctx.H26xCtx, + H264Ctx: &codec.H264Ctx{ + CodecData: newCodecData, + }, + } + // 保持原有的 RTP 参数 + if oldCtx, ok := old.(*H264Ctx); ok { + newCtx.RTPCtx = oldCtx.RTPCtx + } + r.ICodecCtx = newCtx + } else { + // 如果是 IDR 帧但没有 SPS/PPS,需要插入 + if r.IDR && len(ctx.SPS()) > 0 && len(ctx.PPS()) > 0 { + spsRTP := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + SequenceNumber: ctx.SequenceNumber, + Timestamp: pts, + SSRC: ctx.SSRC, + PayloadType: uint8(ctx.PayloadType), + }, + Payload: ctx.SPS(), + } + ppsRTP := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + SequenceNumber: ctx.SequenceNumber, + Timestamp: pts, + SSRC: ctx.SSRC, + PayloadType: uint8(ctx.PayloadType), + }, + Payload: ctx.PPS(), + } + r.Packets = slices.Insert(r.Packets, 0, spsRTP, ppsRTP) + } + } - // 更新序列号 - for p := range r.Packets.RangePoint { - p.SequenceNumber = ctx.seq - ctx.seq++ - } - case *H265Ctx: - dts := ctx.dtsEst.Feed(pts) - r.SetDTS(time.Duration(dts)) - r.SetPTS(time.Duration(pts)) - // 检查 VPS、SPS、PPS 和 IDR 帧 - var vps, sps, pps []byte - var hasVPSSPSPPS bool - for nalu := range nalus.RangePoint { - switch codec.ParseH265NALUType(nalu.Buffers[0][0]) { - case h265parser.NAL_UNIT_VPS: - vps = nalu.ToBytes() - defer nalus.Remove(nalu) - case h265parser.NAL_UNIT_SPS: - sps = nalu.ToBytes() - defer nalus.Remove(nalu) - case h265parser.NAL_UNIT_PPS: - pps = nalu.ToBytes() - defer nalus.Remove(nalu) - case 
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP, - h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL, - h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP, - h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL, - h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP, - h265parser.NAL_UNIT_CODED_SLICE_CRA: - r.IDR = true - } - } - - // 如果发现新的 VPS/SPS/PPS,更新编解码器上下文 - if hasVPSSPSPPS = vps != nil && sps != nil && pps != nil; hasVPSSPSPPS && (len(ctx.Record) == 0 || !bytes.Equal(vps, ctx.VPS()) || !bytes.Equal(sps, ctx.SPS()) || !bytes.Equal(pps, ctx.PPS())) { - var newCodecData h265parser.CodecData - if newCodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(vps, sps, pps); err != nil { - return - } - newCtx := &H265Ctx{ - H26xCtx: ctx.H26xCtx, - H265Ctx: &codec.H265Ctx{ - CodecData: newCodecData, - }, - } - // 保持原有的 RTP 参数 - if oldCtx, ok := old.(*H265Ctx); ok { - newCtx.RTPCtx = oldCtx.RTPCtx - } - r.ICodecCtx = newCtx - } else { - // 如果是 IDR 帧但没有 VPS/SPS/PPS,需要插入 - if r.IDR && len(ctx.VPS()) > 0 && len(ctx.SPS()) > 0 && len(ctx.PPS()) > 0 { - vpsRTP := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SequenceNumber: ctx.SequenceNumber, - Timestamp: pts, - SSRC: ctx.SSRC, - PayloadType: uint8(ctx.PayloadType), - }, - Payload: ctx.VPS(), - } - spsRTP := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SequenceNumber: ctx.SequenceNumber, - Timestamp: pts, - SSRC: ctx.SSRC, - PayloadType: uint8(ctx.PayloadType), - }, - Payload: ctx.SPS(), - } - ppsRTP := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SequenceNumber: ctx.SequenceNumber, - Timestamp: pts, - SSRC: ctx.SSRC, - PayloadType: uint8(ctx.PayloadType), - }, - Payload: ctx.PPS(), - } - r.Packets = slices.Insert(r.Packets, 0, vpsRTP, spsRTP, ppsRTP) - } - } - - // 更新序列号 - for p := range r.Packets.RangePoint { - p.SequenceNumber = ctx.seq - ctx.seq++ - } - } - return + // 更新序列号 + for p := range r.Packets.RangePoint { + p.SequenceNumber = ctx.seq + ctx.seq++ + } + case *H265Ctx: + nalus := r.Raw.(*Nalus) + dts := ctx.dtsEst.Feed(pts) + 
r.SetDTS(time.Duration(dts)) + r.SetPTS(time.Duration(pts)) + var vps, sps, pps []byte + var hasVPSSPSPPS bool + for nalu := range nalus.RangePoint { + switch codec.ParseH265NALUType(nalu.Buffers[0][0]) { + case h265parser.NAL_UNIT_VPS: + vps = nalu.ToBytes() + defer nalus.Remove(nalu) + case h265parser.NAL_UNIT_SPS: + sps = nalu.ToBytes() + defer nalus.Remove(nalu) + case h265parser.NAL_UNIT_PPS: + pps = nalu.ToBytes() + defer nalus.Remove(nalu) + case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP, + h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL, + h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP, + h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL, + h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP, + h265parser.NAL_UNIT_CODED_SLICE_CRA: + r.IDR = true + } + } + if hasVPSSPSPPS = vps != nil && sps != nil && pps != nil; hasVPSSPSPPS && (len(ctx.Record) == 0 || !bytes.Equal(vps, ctx.VPS()) || !bytes.Equal(sps, ctx.SPS()) || !bytes.Equal(pps, ctx.PPS())) { + var newCodecData h265parser.CodecData + if newCodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(vps, sps, pps); err != nil { + return + } + newCtx := &H265Ctx{ + H26xCtx: ctx.H26xCtx, + H265Ctx: &codec.H265Ctx{ + CodecData: newCodecData, + }, + } + if oldCtx, ok := old.(*H265Ctx); ok { + newCtx.RTPCtx = oldCtx.RTPCtx + } + r.ICodecCtx = newCtx + } else { + if r.IDR && len(ctx.VPS()) > 0 && len(ctx.SPS()) > 0 && len(ctx.PPS()) > 0 { + vpsRTP := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + SequenceNumber: ctx.SequenceNumber, + Timestamp: pts, + SSRC: ctx.SSRC, + PayloadType: uint8(ctx.PayloadType), + }, + Payload: ctx.VPS(), + } + spsRTP := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + SequenceNumber: ctx.SequenceNumber, + Timestamp: pts, + SSRC: ctx.SSRC, + PayloadType: uint8(ctx.PayloadType), + }, + Payload: ctx.SPS(), + } + ppsRTP := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + SequenceNumber: ctx.SequenceNumber, + Timestamp: pts, + SSRC: ctx.SSRC, + PayloadType: uint8(ctx.PayloadType), + }, + Payload: ctx.PPS(), + } 
+ r.Packets = slices.Insert(r.Packets, 0, vpsRTP, spsRTP, ppsRTP) + } + } + for p := range r.Packets.RangePoint { + p.SequenceNumber = ctx.seq + ctx.seq++ + } + case *AV1Ctx: + r.SetPTS(time.Duration(pts)) + r.SetDTS(time.Duration(pts)) + } + return } func (h264 *H264Ctx) GetInfo() string { - return h264.SDPFmtpLine + return h264.SDPFmtpLine } func (h265 *H265Ctx) GetInfo() string { - return h265.SDPFmtpLine + return h265.SDPFmtpLine } func (av1 *AV1Ctx) GetInfo() string { - return av1.SDPFmtpLine + return av1.SDPFmtpLine } func (r *VideoFrame) Mux(baseFrame *Sample) error { - // 获取编解码器上下文 - codecCtx := r.ICodecCtx - if codecCtx == nil { - switch base := baseFrame.GetBase().(type) { - case *codec.H264Ctx: - var ctx H264Ctx - ctx.H264Ctx = base - ctx.PayloadType = 96 - ctx.MimeType = webrtc.MimeTypeH264 - ctx.ClockRate = 90000 - spsInfo := ctx.SPSInfo - ctx.SDPFmtpLine = fmt.Sprintf("sprop-parameter-sets=%s,%s;profile-level-id=%02x%02x%02x;level-asymmetry-allowed=1;packetization-mode=1", base64.StdEncoding.EncodeToString(ctx.SPS()), base64.StdEncoding.EncodeToString(ctx.PPS()), spsInfo.ProfileIdc, spsInfo.ConstraintSetFlag, spsInfo.LevelIdc) - ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) - codecCtx = &ctx - case *codec.H265Ctx: - var ctx H265Ctx - ctx.H265Ctx = base - ctx.PayloadType = 98 - ctx.MimeType = webrtc.MimeTypeH265 - ctx.SDPFmtpLine = fmt.Sprintf("profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s", base64.StdEncoding.EncodeToString(ctx.SPS()), base64.StdEncoding.EncodeToString(ctx.PPS()), base64.StdEncoding.EncodeToString(ctx.VPS())) - ctx.ClockRate = 90000 - ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) - codecCtx = &ctx - } - r.ICodecCtx = codecCtx - } - // 获取时间戳信息 - pts := uint32(baseFrame.GetPTS()) + // 获取编解码器上下文 + codecCtx := r.ICodecCtx + if codecCtx == nil { + switch base := baseFrame.GetBase().(type) { + case *codec.H264Ctx: + var ctx H264Ctx + ctx.H264Ctx = base + ctx.PayloadType = 96 + ctx.MimeType = webrtc.MimeTypeH264 + ctx.ClockRate = 
90000 + spsInfo := ctx.SPSInfo + ctx.SDPFmtpLine = fmt.Sprintf("sprop-parameter-sets=%s,%s;profile-level-id=%02x%02x%02x;level-asymmetry-allowed=1;packetization-mode=1", base64.StdEncoding.EncodeToString(ctx.SPS()), base64.StdEncoding.EncodeToString(ctx.PPS()), spsInfo.ProfileIdc, spsInfo.ConstraintSetFlag, spsInfo.LevelIdc) + ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) + codecCtx = &ctx + case *codec.H265Ctx: + var ctx H265Ctx + ctx.H265Ctx = base + ctx.PayloadType = 98 + ctx.MimeType = webrtc.MimeTypeH265 + ctx.SDPFmtpLine = fmt.Sprintf("profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s", base64.StdEncoding.EncodeToString(ctx.SPS()), base64.StdEncoding.EncodeToString(ctx.PPS()), base64.StdEncoding.EncodeToString(ctx.VPS())) + ctx.ClockRate = 90000 + ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) + codecCtx = &ctx + case *codec.AV1Ctx: + var ctx AV1Ctx + ctx.AV1Ctx = base + ctx.PayloadType = 99 + ctx.MimeType = webrtc.MimeTypeAV1 + ctx.ClockRate = 90000 + ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) + codecCtx = &ctx + } + r.ICodecCtx = codecCtx + } + // 获取时间戳信息 + pts := uint32(baseFrame.GetPTS()) - switch c := codecCtx.(type) { - case *H264Ctx: - ctx := &c.RTPCtx - var lastPacket *rtp.Packet - if baseFrame.IDR && len(c.RecordInfo.SPS) > 0 && len(c.RecordInfo.PPS) > 0 { - r.Append(ctx, pts, c.SPS()) - r.Append(ctx, pts, c.PPS()) - } - for nalu := range baseFrame.Raw.(*Nalus).RangePoint { - if reader := nalu.NewReader(); reader.Length > MTUSize { - payloadLen := MTUSize - if reader.Length+1 < payloadLen { - payloadLen = reader.Length + 1 - } - //fu-a - mem := r.NextN(payloadLen) - reader.Read(mem[1:]) - fuaHead, naluType := codec.NALU_FUA.Or(mem[1]&0x60), mem[1]&0x1f - mem[0], mem[1] = fuaHead, naluType|startBit - lastPacket = r.Append(ctx, pts, mem) - for payloadLen = MTUSize; reader.Length > 0; lastPacket = r.Append(ctx, pts, mem) { - if reader.Length+2 < payloadLen { - payloadLen = reader.Length + 2 - } - mem = r.NextN(payloadLen) - 
reader.Read(mem[2:]) - mem[0], mem[1] = fuaHead, naluType - } - lastPacket.Payload[1] |= endBit - } else { - mem := r.NextN(reader.Length) - reader.Read(mem) - lastPacket = r.Append(ctx, pts, mem) - } - } - lastPacket.Header.Marker = true - case *H265Ctx: - ctx := &c.RTPCtx - var lastPacket *rtp.Packet - if baseFrame.IDR && len(c.RecordInfo.SPS) > 0 && len(c.RecordInfo.PPS) > 0 && len(c.RecordInfo.VPS) > 0 { - r.Append(ctx, pts, c.VPS()) - r.Append(ctx, pts, c.SPS()) - r.Append(ctx, pts, c.PPS()) - } - for nalu := range baseFrame.Raw.(*Nalus).RangePoint { - if reader := nalu.NewReader(); reader.Length > MTUSize { - var b0, b1 byte - _ = reader.ReadByteTo(&b0, &b1) - //fu - naluType := byte(codec.ParseH265NALUType(b0)) - b0 = (byte(H265_NALU_FU) << 1) | (b0 & 0b10000001) + switch c := codecCtx.(type) { + case *H264Ctx: + ctx := &c.RTPCtx + var lastPacket *rtp.Packet + if baseFrame.IDR && len(c.RecordInfo.SPS) > 0 && len(c.RecordInfo.PPS) > 0 { + r.Append(ctx, pts, c.SPS()) + r.Append(ctx, pts, c.PPS()) + } + for nalu := range baseFrame.Raw.(*Nalus).RangePoint { + if reader := nalu.NewReader(); reader.Length > MTUSize { + payloadLen := MTUSize + if reader.Length+1 < payloadLen { + payloadLen = reader.Length + 1 + } + //fu-a + mem := r.NextN(payloadLen) + reader.Read(mem[1:]) + fuaHead, naluType := codec.NALU_FUA.Or(mem[1]&0x60), mem[1]&0x1f + mem[0], mem[1] = fuaHead, naluType|startBit + lastPacket = r.Append(ctx, pts, mem) + for payloadLen = MTUSize; reader.Length > 0; lastPacket = r.Append(ctx, pts, mem) { + if reader.Length+2 < payloadLen { + payloadLen = reader.Length + 2 + } + mem = r.NextN(payloadLen) + reader.Read(mem[2:]) + mem[0], mem[1] = fuaHead, naluType + } + lastPacket.Payload[1] |= endBit + } else { + mem := r.NextN(reader.Length) + reader.Read(mem) + lastPacket = r.Append(ctx, pts, mem) + } + } + lastPacket.Header.Marker = true + case *H265Ctx: + ctx := &c.RTPCtx + var lastPacket *rtp.Packet + if baseFrame.IDR && len(c.RecordInfo.SPS) > 0 && 
len(c.RecordInfo.PPS) > 0 && len(c.RecordInfo.VPS) > 0 { + r.Append(ctx, pts, c.VPS()) + r.Append(ctx, pts, c.SPS()) + r.Append(ctx, pts, c.PPS()) + } + for nalu := range baseFrame.Raw.(*Nalus).RangePoint { + if reader := nalu.NewReader(); reader.Length > MTUSize { + var b0, b1 byte + _ = reader.ReadByteTo(&b0, &b1) + //fu + naluType := byte(codec.ParseH265NALUType(b0)) + b0 = (byte(H265_NALU_FU) << 1) | (b0 & 0b10000001) - payloadLen := MTUSize - if reader.Length+3 < payloadLen { - payloadLen = reader.Length + 3 - } - mem := r.NextN(payloadLen) - reader.Read(mem[3:]) - mem[0], mem[1], mem[2] = b0, b1, naluType|startBit - lastPacket = r.Append(ctx, pts, mem) + payloadLen := MTUSize + if reader.Length+3 < payloadLen { + payloadLen = reader.Length + 3 + } + mem := r.NextN(payloadLen) + reader.Read(mem[3:]) + mem[0], mem[1], mem[2] = b0, b1, naluType|startBit + lastPacket = r.Append(ctx, pts, mem) - for payloadLen = MTUSize; reader.Length > 0; lastPacket = r.Append(ctx, pts, mem) { - if reader.Length+3 < payloadLen { - payloadLen = reader.Length + 3 - } - mem = r.NextN(payloadLen) - reader.Read(mem[3:]) - mem[0], mem[1], mem[2] = b0, b1, naluType - } - lastPacket.Payload[2] |= endBit - } else { - mem := r.NextN(reader.Length) - reader.Read(mem) - lastPacket = r.Append(ctx, pts, mem) - } - } - lastPacket.Header.Marker = true - } - return nil + for payloadLen = MTUSize; reader.Length > 0; lastPacket = r.Append(ctx, pts, mem) { + if reader.Length+3 < payloadLen { + payloadLen = reader.Length + 3 + } + mem = r.NextN(payloadLen) + reader.Read(mem[3:]) + mem[0], mem[1], mem[2] = b0, b1, naluType + } + lastPacket.Payload[2] |= endBit + } else { + mem := r.NextN(reader.Length) + reader.Read(mem) + lastPacket = r.Append(ctx, pts, mem) + } + } + lastPacket.Header.Marker = true + case *AV1Ctx: + ctx := &c.RTPCtx + var lastPacket *rtp.Packet + for obu := range baseFrame.Raw.(*OBUs).RangePoint { + mem := r.NextN(obu.Size) + obu.NewReader().Read(mem) + lastPacket = r.Append(ctx, 
pts, mem) + } + if lastPacket != nil { + lastPacket.Header.Marker = true + } + } + return nil } func (r *VideoFrame) Demux() (err error) { - if len(r.Packets) == 0 { - return ErrSkip - } - switch c := r.ICodecCtx.(type) { - case *H264Ctx: - nalus := r.GetNalus() - var nalu *gomem.Memory - var naluType codec.H264NALUType - for packet := range r.Packets.RangePoint { - if len(packet.Payload) < 2 { - continue - } - if packet.Padding { - packet.Padding = false - } - b0 := packet.Payload[0] - if t := codec.ParseH264NALUType(b0); t < 24 { - nalus.GetNextPointer().PushOne(packet.Payload) - } else { - offset := t.Offset() - switch t { - case codec.NALU_STAPA, codec.NALU_STAPB: - if len(packet.Payload) <= offset { - return fmt.Errorf("invalid nalu size %d", len(packet.Payload)) - } - for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); { - if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize { - nalus.GetNextPointer().PushOne(buffer.ReadN(nextSize)) - } else { - return fmt.Errorf("invalid nalu size %d", nextSize) - } - } - case codec.NALU_FUA, codec.NALU_FUB: - b1 := packet.Payload[1] - if util.Bit1(b1, 0) { - nalu = nalus.GetNextPointer() - naluType.Parse(b1) - nalu.PushOne([]byte{naluType.Or(b0 & 0x60)}) - } - if nalu != nil && nalu.Size > 0 { - nalu.PushOne(packet.Payload[offset:]) - if util.Bit1(b1, 1) { - nalu = nil - } - } else { - continue - } - default: - return fmt.Errorf("unsupported nalu type %d", t) - } - } - } - return nil - case *H265Ctx: - nalus := r.GetNalus() - var nalu *gomem.Memory - for _, packet := range r.Packets { - if len(packet.Payload) == 0 { - continue - } - b0 := packet.Payload[0] - if t := codec.ParseH265NALUType(b0); t < H265_NALU_AP { - nalus.GetNextPointer().PushOne(packet.Payload) - } else { - var buffer = util.Buffer(packet.Payload) - switch t { - case H265_NALU_AP: - buffer.ReadUint16() - if c.DONL { - buffer.ReadUint16() - } - for buffer.CanRead() { - 
nalus.GetNextPointer().PushOne(buffer.ReadN(int(buffer.ReadUint16()))) - } - if c.DONL { - buffer.ReadByte() - } - case H265_NALU_FU: - if buffer.Len() < 3 { - return io.ErrShortBuffer - } - first3 := buffer.ReadN(3) - fuHeader := first3[2] - if c.DONL { - buffer.ReadUint16() - } - if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) { - nalu = nalus.GetNextPointer() - nalu.PushOne([]byte{first3[0]&0b10000001 | (naluType << 1), first3[1]}) - } - if nalu != nil && nalu.Size > 0 { - nalu.PushOne(buffer) - if util.Bit1(fuHeader, 1) { - nalu = nil - } - } else { - continue - } - default: - return fmt.Errorf("unsupported nalu type %d", t) - } - } - } - return nil - } - return ErrUnsupportCodec + if len(r.Packets) == 0 { + return ErrSkip + } + switch c := r.ICodecCtx.(type) { + case *H264Ctx: + nalus := r.GetNalus() + var nalu *gomem.Memory + var naluType codec.H264NALUType + for packet := range r.Packets.RangePoint { + if len(packet.Payload) < 2 { + continue + } + if packet.Padding { + packet.Padding = false + } + b0 := packet.Payload[0] + if t := codec.ParseH264NALUType(b0); t < 24 { + nalus.GetNextPointer().PushOne(packet.Payload) + } else { + offset := t.Offset() + switch t { + case codec.NALU_STAPA, codec.NALU_STAPB: + if len(packet.Payload) <= offset { + return fmt.Errorf("invalid nalu size %d", len(packet.Payload)) + } + for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); { + if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize { + nalus.GetNextPointer().PushOne(buffer.ReadN(nextSize)) + } else { + return fmt.Errorf("invalid nalu size %d", nextSize) + } + } + case codec.NALU_FUA, codec.NALU_FUB: + b1 := packet.Payload[1] + if util.Bit1(b1, 0) { + nalu = nalus.GetNextPointer() + naluType.Parse(b1) + nalu.PushOne([]byte{naluType.Or(b0 & 0x60)}) + } + if nalu != nil && nalu.Size > 0 { + nalu.PushOne(packet.Payload[offset:]) + if util.Bit1(b1, 1) { + nalu = nil + } + } else { + continue + } + default: + return 
fmt.Errorf("unsupported nalu type %d", t) + } + } + } + return nil + case *H265Ctx: + nalus := r.GetNalus() + var nalu *gomem.Memory + for _, packet := range r.Packets { + if len(packet.Payload) == 0 { + continue + } + b0 := packet.Payload[0] + if t := codec.ParseH265NALUType(b0); t < H265_NALU_AP { + nalus.GetNextPointer().PushOne(packet.Payload) + } else { + var buffer = util.Buffer(packet.Payload) + switch t { + case H265_NALU_AP: + buffer.ReadUint16() + if c.DONL { + buffer.ReadUint16() + } + for buffer.CanRead() { + nalus.GetNextPointer().PushOne(buffer.ReadN(int(buffer.ReadUint16()))) + } + if c.DONL { + buffer.ReadByte() + } + case H265_NALU_FU: + if buffer.Len() < 3 { + return io.ErrShortBuffer + } + first3 := buffer.ReadN(3) + fuHeader := first3[2] + if c.DONL { + buffer.ReadUint16() + } + if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) { + nalu = nalus.GetNextPointer() + nalu.PushOne([]byte{first3[0]&0b10000001 | (naluType << 1), first3[1]}) + } + if nalu != nil && nalu.Size > 0 { + nalu.PushOne(buffer) + if util.Bit1(fuHeader, 1) { + nalu = nil + } + } else { + continue + } + default: + return fmt.Errorf("unsupported nalu type %d", t) + } + } + } + return nil + case *AV1Ctx: + obus := r.GetOBUs() + obus.Reset() + for _, packet := range r.Packets { + if len(packet.Payload) > 0 { + obus.GetNextPointer().PushOne(packet.Payload) + } + } + return nil + } + return ErrUnsupportCodec }