diff --git a/codec/h264.go b/codec/h264.go index 954809e..b66fae6 100644 --- a/codec/h264.go +++ b/codec/h264.go @@ -17,34 +17,65 @@ import ( // Macroblock layer1 -> mb_type + PCM Data // Macroblock layer2 -> mb_type + Sub_mb_pred or mb_pred + Residual Data // Residual Data -> +type H264NALUType byte + +func (b H264NALUType) Or(b2 byte) byte { + return byte(b) | b2 +} + +func (b H264NALUType) Offset() int { + switch b { + case NALU_STAPA: + return 1 + case NALU_STAPB: + return 3 + case NALU_FUA: + return 2 + case NALU_FUB: + return 4 + } + return 0 +} + +func (b H264NALUType) Byte() byte { + return byte(b) +} + +func (H264NALUType) Parse(b byte) H264NALUType { + return H264NALUType(b & 0x1F) +} + +func (H264NALUType) ParseBytes(bs []byte) H264NALUType { + return H264NALUType(bs[0] & 0x1F) +} const ( // NALU Type - NALU_Unspecified byte = iota - NALU_Non_IDR_Picture // 1 - NALU_Data_Partition_A // 2 - NALU_Data_Partition_B // 3 - NALU_Data_Partition_C // 4 - NALU_IDR_Picture // 5 - NALU_SEI // 6 - NALU_SPS // 7 - NALU_PPS // 8 - NALU_Access_Unit_Delimiter // 9 - NALU_Sequence_End // 10 - NALU_Stream_End // 11 - NALU_Filler_Data // 12 - NALU_SPS_Extension // 13 - NALU_Prefix // 14 - NALU_SPS_Subset // 15 - NALU_DPS // 16 - NALU_Reserved1 // 17 - NALU_Reserved2 // 18 - NALU_Not_Auxiliary_Coded // 19 - NALU_Coded_Slice_Extension // 20 - NALU_Reserved3 // 21 - NALU_Reserved4 // 22 - NALU_Reserved5 // 23 - NALU_STAPA // 24 + NALU_Unspecified H264NALUType = iota + NALU_Non_IDR_Picture // 1 + NALU_Data_Partition_A // 2 + NALU_Data_Partition_B // 3 + NALU_Data_Partition_C // 4 + NALU_IDR_Picture // 5 + NALU_SEI // 6 + NALU_SPS // 7 + NALU_PPS // 8 + NALU_Access_Unit_Delimiter // 9 + NALU_Sequence_End // 10 + NALU_Stream_End // 11 + NALU_Filler_Data // 12 + NALU_SPS_Extension // 13 + NALU_Prefix // 14 + NALU_SPS_Subset // 15 + NALU_DPS // 16 + NALU_Reserved1 // 17 + NALU_Reserved2 // 18 + NALU_Not_Auxiliary_Coded // 19 + NALU_Coded_Slice_Extension // 20 + NALU_Reserved3 // 21 + NALU_Reserved4 // 22 + NALU_Reserved5 // 23 + NALU_STAPA // 24 NALU_STAPB NALU_MTAP16 NALU_MTAP24 diff --git a/codec/h265.go b/codec/h265.go index d53ea30..caebc71 100644 --- a/codec/h265.go +++ b/codec/h265.go @@ -8,6 +8,12 @@ import ( "github.com/q191201771/naza/pkg/nazabits" ) +type H265NALUType byte + +func (H265NALUType) Parse(b byte) H265NALUType { + return H265NALUType(b & 0x7E >> 1) +} + const ( // HEVC_VPS = 0x40 // HEVC_SPS = 0x42 @@ -16,16 +22,16 @@ const ( // HEVC_IDR = 0x26 // HEVC_PSLICE = 0x02 - NAL_UNIT_CODED_SLICE_TRAIL_N byte = iota // 0 - NAL_UNIT_CODED_SLICE_TRAIL_R // 1 - NAL_UNIT_CODED_SLICE_TSA_N // 2 - NAL_UNIT_CODED_SLICE_TLA // 3 // Current name in the spec: TSA_R - NAL_UNIT_CODED_SLICE_STSA_N // 4 - NAL_UNIT_CODED_SLICE_STSA_R // 5 - NAL_UNIT_CODED_SLICE_RADL_N // 6 - NAL_UNIT_CODED_SLICE_DLP // 7 // Current name in the spec: RADL_R - NAL_UNIT_CODED_SLICE_RASL_N // 8 - NAL_UNIT_CODED_SLICE_TFD // 9 // Current name in the spec: RASL_R + NAL_UNIT_CODED_SLICE_TRAIL_N H265NALUType = iota // 0 + NAL_UNIT_CODED_SLICE_TRAIL_R // 1 + NAL_UNIT_CODED_SLICE_TSA_N // 2 + NAL_UNIT_CODED_SLICE_TLA // 3 // Current name in the spec: TSA_R + NAL_UNIT_CODED_SLICE_STSA_N // 4 + NAL_UNIT_CODED_SLICE_STSA_R // 5 + NAL_UNIT_CODED_SLICE_RADL_N // 6 + NAL_UNIT_CODED_SLICE_DLP // 7 // Current name in the spec: RADL_R + NAL_UNIT_CODED_SLICE_RASL_N // 8 + NAL_UNIT_CODED_SLICE_TFD // 9 // Current name in the spec: RASL_R NAL_UNIT_RESERVED_10 NAL_UNIT_RESERVED_11 NAL_UNIT_RESERVED_12 @@ -64,8 +70,8 @@ const ( NAL_UNIT_RESERVED_45 NAL_UNIT_RESERVED_46 NAL_UNIT_RESERVED_47 - NAL_UNIT_UNSPECIFIED_48 - NAL_UNIT_UNSPECIFIED_49 + NAL_UNIT_RTP_AP + NAL_UNIT_RTP_FU NAL_UNIT_UNSPECIFIED_50 NAL_UNIT_UNSPECIFIED_51 NAL_UNIT_UNSPECIFIED_52 diff --git a/codec/mpegts/mpegts_pes.go b/codec/mpegts/mpegts_pes.go index 12b8bf3..510babc 100644 --- a/codec/mpegts/mpegts_pes.go +++ b/codec/mpegts/mpegts_pes.go @@ -4,10 +4,11 @@ import ( "bytes" "errors" "fmt" - "github.com/Monibuca/engine/v4/util" - "github.com/Monibuca/engine/v4/codec" "io" "io/ioutil" + + "github.com/Monibuca/engine/v4/codec" + "github.com/Monibuca/engine/v4/util" ) // ios13818-1-CN.pdf 45/166 @@ -582,7 +583,7 @@ func CheckPESPacketIsKeyFrame(packet MpegTsPESPacket) bool { nalus := bytes.SplitN(packet.Payload, codec.NALU_Delimiter1, -1) for _, v := range nalus { - if v[0]&0x1f == codec.NALU_IDR_Picture { + if codec.H264NALUType.ParseBytes(codec.NALU_IDR_Picture, v) == codec.NALU_IDR_Picture { return true } } diff --git a/common/frame.go b/common/frame.go index 825613e..f341a41 100644 --- a/common/frame.go +++ b/common/frame.go @@ -6,6 +6,7 @@ import ( "github.com/Monibuca/engine/v4/codec" "github.com/pion/rtp" + "github.com/sirupsen/logrus" ) type NALUSlice net.Buffers @@ -29,11 +30,31 @@ type RawSlice interface { // func (nalu *H264NALU) Append(slice ...NALUSlice) { // *nalu = append(*nalu, slice...) // } -func (nalu NALUSlice) H264Type() byte { - return nalu[0][0] & 0x1F +func (nalu NALUSlice) H264Type() (naluType codec.H264NALUType) { + return naluType.Parse(nalu[0][0]) } -func (nalu NALUSlice) H265Type() byte { - return nalu[0][0] & 0x7E >> 1 +func (nalu NALUSlice) RefIdc() byte { + return nalu[0][0] & 0x60 +} +func (nalu NALUSlice) H265Type() (naluType codec.H265NALUType) { + return naluType.Parse(nalu[0][0]) +} +func (nalu NALUSlice) Bytes() (b []byte) { + for _, slice := range nalu { + b = append(b, slice...) + } + return +} + +func (nalu *NALUSlice) Reset() *NALUSlice{ + if len(*nalu) > 0 { + *nalu = (*nalu)[:0] + } + return nalu +} + +func (nalu *NALUSlice) Append(b ...[]byte) { + *nalu = append(*nalu, b...) } // func (nalu *H265NALU) Append(slice ...NALUSlice) { @@ -54,6 +75,31 @@ func (nalu NALUSlice) H265Type() byte { type AVCCFrame []byte // 一帧AVCC格式的数据 type AnnexBFrame []byte // 一帧AnnexB格式数据 +type RTPFrame struct { + rtp.Packet + Raw []byte // 序列化后的数据,避免反复序列化 +} + +func (rtp *RTPFrame) H264Type() (naluType codec.H264NALUType) { + return naluType.Parse(rtp.Payload[0]) +} +func (rtp *RTPFrame) H265Type() (naluType codec.H265NALUType) { + return naluType.Parse(rtp.Payload[0]) +} +func (rtp *RTPFrame) Marshal() *RTPFrame { + rtp.Raw, _ = rtp.Packet.Marshal() + return rtp +} + +func (rtp *RTPFrame) Unmarshal(raw []byte) *RTPFrame { + rtp.Raw = raw + if err := rtp.Packet.Unmarshal(raw); err != nil { + logrus.Errorln(err) + return nil + } + return rtp +} + type BaseFrame struct { DeltaTime uint32 // 相对上一帧时间戳,毫秒 SeqInStream uint32 //在一个流中的总序号 @@ -68,15 +114,14 @@ type DataFrame[T any] struct { } type AVFrame[T RawSlice] struct { BaseFrame - IFrame bool - PTS uint32 - DTS uint32 - FLV net.Buffers // 打包好的FLV Tag - AVCC net.Buffers // 打包好的AVCC格式 - RTP net.Buffers // 打包好的RTP格式 - RTPPackets []rtp.Packet - Raw []T //裸数据 - canRead bool + IFrame bool + PTS uint32 + DTS uint32 + FLV net.Buffers // 打包好的FLV Tag + AVCC net.Buffers // 打包好的AVCC格式 + RTP []RTPFrame + Raw []T //裸数据 + canRead bool } func (av *AVFrame[T]) AppendRaw(raw ...T) { @@ -89,18 +134,14 @@ func (av *AVFrame[T]) FillFLV(t byte, ts uint32) { func (av *AVFrame[T]) AppendAVCC(avcc ...[]byte) { av.AVCC = append(av.AVCC, avcc...) } -func (av *AVFrame[T]) AppendRTP(rtp []byte) { - av.RTP = append(av.RTP, rtp) -} -func (av *AVFrame[T]) AppendRTPPackets(rtp rtp.Packet) { - av.RTPPackets = append(av.RTPPackets, rtp) +func (av *AVFrame[T]) AppendRTP(rtp ...RTPFrame) { + av.RTP = append(av.RTP, rtp...) } func (av *AVFrame[T]) Reset() { av.FLV = nil av.AVCC = nil av.RTP = nil - av.RTPPackets = nil av.Raw = nil } @@ -147,7 +188,8 @@ func (avcc AVCCFrame) AudioCodecID() byte { // return // } type DecoderConfiguration[T RawSlice] struct { - AVCC T - Raw T - FLV net.Buffers + PayloadType byte + AVCC T + Raw T + FLV net.Buffers } diff --git a/common/stream.go b/common/stream.go index d297c25..0030b5a 100644 --- a/common/stream.go +++ b/common/stream.go @@ -12,4 +12,5 @@ type IStream interface { AddTrack(Track) IsClosed() bool log.Ext1FieldLogger + SSRC() uint32 } diff --git a/config/types.go b/config/types.go index 5f2067d..41c6738 100644 --- a/config/types.go +++ b/config/types.go @@ -3,7 +3,7 @@ package config type Publish struct { PubAudio bool PubVideo bool - KickExsit bool // 是否踢掉已经存在的发布者 + KickExsit bool // 是否踢掉已经存在的发布者 PublishTimeout Second // 发布无数据超时 WaitCloseTimeout Second // 延迟自动关闭(无订阅时) } @@ -16,14 +16,21 @@ type Subscribe struct { } type Pull struct { - AutoReconnect bool // 自动重连 + Reconnect int // 自动重连,0 表示不自动重连,-1 表示无限重连,高于0 的数代表最大重连次数 PullOnStart bool // 启动时拉流 PullOnSubscribe bool // 订阅时自动拉流 - AutoPullList map[string]string // 自动拉流列表 + PullList map[string]string // 自动拉流列表,以streamPath为key,url为value +} + +func (p *Pull) AddPull(streamPath string, url string) { + if p.PullList == nil { + p.PullList = make(map[string]string) + } + p.PullList[streamPath] = url } type Push struct { - AutoPushList map[string]string // 自动推流列表 + PushList map[string]string // 自动推流列表 } type Engine struct { diff --git a/plugin.go b/plugin.go index fd71c46..f78b68e 100644 --- a/plugin.go +++ b/plugin.go @@ -27,7 +27,7 @@ func InstallPlugin(config config.Plugin) *Plugin { _, pluginFilePath, _, _ := runtime.Caller(1) configDir := filepath.Dir(pluginFilePath) if parts := strings.Split(configDir, "@"); len(parts) > 1 { - plugin.Version = parts[len(parts)-1] + plugin.Version = util.LastElement(parts) } if _, ok := Plugins[name]; ok { return nil @@ -83,6 +83,7 @@ func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter, } // 读取独立配置合并入总配置中 +// TODO: 覆盖逻辑有待商榷 func (opt *Plugin) assign() { f, err := os.Open(opt.settingPath()) if err == nil { @@ -128,7 +129,7 @@ func (opt *Plugin) autoPull() { if t.Field(i).Name == "Pull" { var pullConfig config.Pull reflect.ValueOf(&pullConfig).Elem().Set(v.Field(i)) - for streamPath, url := range pullConfig.AutoPullList { + for streamPath, url := range pullConfig.PullList { puller := Puller{RemoteURL: url, Config: &pullConfig} if pullConfig.PullOnStart { opt.Config.(PullPlugin).PullStream(streamPath, puller) diff --git a/publisher.go b/publisher.go index 0d2d161..ab0a166 100644 --- a/publisher.go +++ b/publisher.go @@ -74,6 +74,11 @@ type Puller struct { pullCount int } +// 是否需要重连 +func (pub *Puller) reconnect() bool { + return pub.Config.Reconnect == -1 || pub.pullCount <= pub.Config.Reconnect +} + func (pub *Puller) pull() { pub.specific.(IPuller).Pull(pub.pullCount) pub.pullCount++ @@ -84,7 +89,7 @@ func (pub *Puller) OnStateChanged(oldState StreamState, newState StreamState) { case STATE_WAITTRACK: go pub.pull() case STATE_WAITPUBLISH: - if pub.Config.AutoReconnect && pub.Publish(pub.Path, pub.specific, *pub.Publisher.Config) { + if pub.reconnect() && pub.Publish(pub.Path, pub.specific, *pub.Publisher.Config) { go pub.pull() } } diff --git a/stream.go b/stream.go index e796399..c68affd 100644 --- a/stream.go +++ b/stream.go @@ -6,6 +6,7 @@ import ( "strings" "sync/atomic" "time" + "unsafe" "github.com/Monibuca/engine/v4/track" "github.com/Monibuca/engine/v4/util" @@ -105,6 +106,10 @@ type Stream struct { *log.Entry `json:"-"` } +func (s *Stream) SSRC() uint32 { + return uint32(uintptr(unsafe.Pointer(s))) +} + func (s *Stream) UnPublish() { if !s.IsClosed() { s.actionChan <- UnPublishAction{} @@ -130,7 +135,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream s = &Stream{ URL: u, AppName: p[0], - StreamName: p[len(p)-1], + StreamName: util.LastElement(p), Entry: log.WithField("stream", u.Path), } s.Infoln("created:", streamPath) diff --git a/track/aac.go b/track/aac.go index 13bdf63..b498925 100644 --- a/track/aac.go +++ b/track/aac.go @@ -6,6 +6,8 @@ import ( "github.com/Monibuca/engine/v4/codec" . "github.com/Monibuca/engine/v4/common" + "github.com/Monibuca/engine/v4/config" + "github.com/Monibuca/engine/v4/util" ) func NewAAC(stream IStream) (aac *AAC) { @@ -15,11 +17,26 @@ func NewAAC(stream IStream) (aac *AAC) { aac.CodecID = codec.CodecID_AAC aac.Init(stream, 32) aac.Poll = time.Millisecond * 20 + aac.DecoderConfiguration.PayloadType = 97 return } type AAC Audio +func (aac *AAC) WriteRTP(raw []byte) { + var packet RTPFrame + if frame := packet.Unmarshal(raw); frame == nil { + return + } + for _, payload := range codec.ParseRTPAAC(packet.Payload) { + aac.WriteSlice(payload) + } + aac.Value.AppendRTP(packet) + if packet.Marker { + aac.Flush() + } +} + func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) { if frame.IsSequence() { aac.DecoderConfiguration.AVCC = AudioSlice(frame) @@ -35,5 +52,26 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) { aac.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2} } else { (*Audio)(aac).WriteAVCC(ts, frame) + aac.Flush() } } + +func (aac *AAC) Flush() { + // RTP格式补完 + // TODO: MTU 分割 + if aac.Value.RTP == nil && config.Global.EnableRTP { + l := util.SizeOfBuffers(aac.Value.Raw) + o := make([]byte, 4, l+4) + //AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度;又因为单个auheader字节长度2字节,所以再除以2就是auheader的个数。 + o[0] = 0x00 //高位 + o[1] = 0x10 //低位 + //AU_HEADER + o[2] = (byte)((l & 0x1fe0) >> 5) //高位 + o[3] = (byte)((l & 0x1f) << 3) //低位 + for _, raw := range aac.Value.Raw { + o = append(o, raw...) + } + aac.PacketizeRTP(o) + } + (*Audio)(aac).Flush() +} diff --git a/track/audio.go b/track/audio.go index 22e03b8..9fa6e7d 100644 --- a/track/audio.go +++ b/track/audio.go @@ -6,6 +6,7 @@ import ( "github.com/Monibuca/engine/v4/codec" . "github.com/Monibuca/engine/v4/common" + "github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/util" ) @@ -39,6 +40,7 @@ func (at *Audio) Play(onAudio func(*AVFrame[AudioSlice]) error) { ar.MoveNext() } } + func (at *Audio) WriteADTS(adts []byte) { profile := ((adts[2] & 0xc0) >> 6) + 1 sampleRate := (adts[2] & 0x3c) >> 2 @@ -47,27 +49,34 @@ func (at *Audio) WriteADTS(adts []byte) { config2 := ((sampleRate & 0x1) << 7) | (channel << 3) at.SampleRate = uint32(codec.SamplingFrequencies[sampleRate]) at.Channels = channel - at.DecoderConfiguration.AVCC = []byte{0xAF, 0x00, config1, config2} - at.DecoderConfiguration.Raw = at.DecoderConfiguration.AVCC[:2] - at.DecoderConfiguration.FLV = net.Buffers{adcflv1, at.DecoderConfiguration.AVCC, adcflv2} -} - -func (at *Audio) WriteAVCC(ts uint32, frame AVCCFrame) { - at.Media.WriteAVCC(ts, frame) - at.Flush() + avcc := []byte{0xAF, 0x00, config1, config2} + at.DecoderConfiguration = DecoderConfiguration[AudioSlice]{ + 97, + avcc, + avcc[:2], + net.Buffers{adcflv1, at.DecoderConfiguration.AVCC, adcflv2}, + } } func (at *Audio) Flush() { - if at.Value.AVCC == nil { + // AVCC 格式补完 + if at.Value.AVCC == nil && (config.Global.EnableAVCC || config.Global.EnableFLV) { at.Value.AppendAVCC(at.avccHead) for _, raw := range at.Value.Raw { at.Value.AppendAVCC(raw) } } // FLV tag 补完 - if at.Value.FLV == nil { + if at.Value.FLV == nil && config.Global.EnableFLV { at.Value.FillFLV(codec.FLV_TAG_TYPE_AUDIO, at.Value.DTS/90) } + if at.Value.RTP == nil && config.Global.EnableRTP { + var o []byte + for _, raw := range at.Value.Raw { + o = append(o, raw...) + } + at.PacketizeRTP(o) + } at.Media.Flush() } diff --git a/track/base.go b/track/base.go index 503871e..db24028 100644 --- a/track/base.go +++ b/track/base.go @@ -2,7 +2,6 @@ package track import ( . "github.com/Monibuca/engine/v4/common" - "github.com/Monibuca/engine/v4/util" "github.com/pion/rtp" ) @@ -30,20 +29,9 @@ type Media[T RawSlice] struct { SampleRate uint32 SampleSize byte DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) - util.BytesPool //无锁内存池,用于发布者(在同一个协程中)复用小块的内存,通常是解包时需要临时使用 + // util.BytesPool //无锁内存池,用于发布者(在同一个协程中)复用小块的内存,通常是解包时需要临时使用 lastAvccTS uint32 //上一个avcc帧的时间戳 -} - -func (av *Media[T]) WriteRTP(raw []byte) { - av.Value.AppendRTP(raw) - var packet rtp.Packet - if err := packet.Unmarshal(raw); err != nil { - return - } - av.Value.AppendRTPPackets(packet) - if packet.Marker { - av.Flush() - } + rtpSequence uint16 } func (av *Media[T]) WriteSlice(slice T) { @@ -68,3 +56,25 @@ func (av *Media[T]) Flush() { av.Base.Flush(&av.Value.BaseFrame) av.Step() } + +// Packetize packetizes the payload of an RTP packet and returns one or more RTP packets +func (av *Media[T]) PacketizeRTP(payloads ...[]byte) { + for i, pp := range payloads { + av.rtpSequence++ + var frame = RTPFrame{Packet: rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Padding: false, + Extension: false, + Marker: i == len(payloads)-1, + PayloadType: av.DecoderConfiguration.PayloadType, + SequenceNumber: av.rtpSequence, + Timestamp: av.Value.DTS, // Figure out how to do timestamps + SSRC: av.Stream.SSRC(), + }, + Payload: pp, + }} + frame.Marshal() + av.Value.AppendRTP(frame) + } +} diff --git a/track/g711.go b/track/g711.go index bfdc0d5..65e5e8d 100644 --- a/track/g711.go +++ b/track/g711.go @@ -17,12 +17,26 @@ func NewG711(stream IStream, alaw bool) (g711 *G711) { } g711.Init(stream, 32) g711.Poll = time.Millisecond * 20 + g711.DecoderConfiguration.PayloadType = 97 return } type G711 Audio func (g711 *G711) WriteAVCC(ts uint32, frame AVCCFrame) { - g711.Value.AppendRaw(AudioSlice(frame[1:])) + g711.WriteSlice(AudioSlice(frame[1:])) (*Audio)(g711).WriteAVCC(ts, frame) + g711.Flush() +} + +func (g711 *G711) WriteRTP(raw []byte) { + var packet RTPFrame + if frame := packet.Unmarshal(raw); frame == nil { + return + } + g711.WriteSlice(packet.Payload) + g711.Value.AppendRTP(packet) + if packet.Marker { + g711.Flush() + } } diff --git a/track/h264.go b/track/h264.go index 7e6593d..bc1fac3 100644 --- a/track/h264.go +++ b/track/h264.go @@ -6,6 +6,7 @@ import ( "github.com/Monibuca/engine/v4/codec" . "github.com/Monibuca/engine/v4/common" + "github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/util" ) @@ -19,6 +20,7 @@ func NewH264(stream IStream) (vt *H264) { vt.Stream = stream vt.Init(stream, 256) vt.Poll = time.Millisecond * 20 + vt.DecoderConfiguration.PayloadType = 96 return } func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { @@ -28,29 +30,22 @@ func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { func (vt *H264) WriteSlice(slice NALUSlice) { switch slice.H264Type() { case codec.NALU_SPS: - if len(vt.DecoderConfiguration.Raw) > 0 { - vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0] - } - vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0]) + vt.DecoderConfiguration.Raw.Reset().Append(slice[0]) case codec.NALU_PPS: - vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0]) + vt.DecoderConfiguration.Raw.Append(slice[0]) vt.SPSInfo, _ = codec.ParseSPS(slice[0]) - if len(vt.DecoderConfiguration.Raw) > 0 { - vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0] - } lenSPS := len(vt.DecoderConfiguration.Raw[0]) lenPPS := len(vt.DecoderConfiguration.Raw[1]) - if len(vt.DecoderConfiguration.AVCC) > 0 { - vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0] - } + vt.DecoderConfiguration.AVCC.Reset() if lenSPS > 3 { - vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, codec.RTMP_AVC_HEAD[:6], vt.DecoderConfiguration.Raw[0][1:4]) + vt.DecoderConfiguration.AVCC.Append(codec.RTMP_AVC_HEAD[:6], vt.DecoderConfiguration.Raw[0][1:4]) } else { - vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, codec.RTMP_AVC_HEAD) + vt.DecoderConfiguration.AVCC.Append(codec.RTMP_AVC_HEAD) } tmp := []byte{0xE1, 0, 0, 0x01, 0, 0} - vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, tmp[:1], util.PutBE(tmp[1:3], lenSPS), vt.DecoderConfiguration.Raw[0], tmp[3:4], util.PutBE(tmp[3:6], lenPPS), vt.DecoderConfiguration.Raw[1]) + vt.DecoderConfiguration.AVCC.Append(tmp[:1], util.PutBE(tmp[1:3], lenSPS), vt.DecoderConfiguration.Raw[0], tmp[3:4], util.PutBE(tmp[3:6], lenPPS), vt.DecoderConfiguration.Raw[1]) vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0) + case codec.NALU_IDR_Picture: vt.Value.IFrame = true fallthrough @@ -62,15 +57,12 @@ func (vt *H264) WriteSlice(slice NALUSlice) { func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) { if frame.IsSequence() { - if len(vt.DecoderConfiguration.AVCC) > 0 { - vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0] - } - vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, frame) + vt.DecoderConfiguration.AVCC.Reset().Append(frame) var info codec.AVCDecoderConfigurationRecord if _, err := info.Unmarshal(frame[5:]); err == nil { vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit) vt.nalulenSize = int(info.LengthSizeMinusOne&3 + 1) - vt.DecoderConfiguration.Raw = NALUSlice{info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit} + vt.DecoderConfiguration.Raw.Append(info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit) } vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0) } else { @@ -80,6 +72,37 @@ func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) { } } +func (vt *H264) WriteRTP(raw []byte) { + var frame RTPFrame + if packet := frame.Unmarshal(raw); packet == nil { + return + } + if naluType := frame.H264Type(); naluType < 24 { + vt.WriteSlice(NALUSlice{frame.Payload}) + } else { + switch naluType { + case codec.NALU_STAPA, codec.NALU_STAPB: + for buffer := util.Buffer(frame.Payload[naluType.Offset():]); buffer.CanRead(); { + vt.WriteSlice(NALUSlice{buffer.ReadN(int(buffer.ReadUint16()))}) + } + case codec.NALU_FUA, codec.NALU_FUB: + if util.Bit1(frame.Payload[1], 0) { + vt.Value.AppendRaw(NALUSlice{[]byte{naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60)}}) + } + lastIndex := len(vt.Value.Raw) - 1 + vt.Value.Raw[lastIndex].Append(frame.Payload[naluType.Offset():]) + if util.Bit1(frame.Payload[1], 1) { + vt.Value.Raw = vt.Value.Raw[:lastIndex] + vt.WriteSlice(vt.Value.Raw[lastIndex]) + } + } + } + vt.Value.AppendRTP(frame) + if frame.Marker { + vt.Flush() + } +} + func (vt *H264) Flush() { if vt.Value.IFrame { if vt.IDRing == nil { @@ -88,8 +111,36 @@ func (vt *H264) Flush() { (*Video)(vt).ComputeGOP() } // RTP格式补完 - if vt.Value.RTP == nil { - + if vt.Value.RTP == nil && config.Global.EnableRTP { + var out [][]byte + for _, nalu := range vt.Value.Raw { + buffers := util.SplitBuffers(nalu, 1200) + firstBuffer := NALUSlice(buffers[0]) + if l := len(buffers); l == 1 { + out = append(out, firstBuffer.Bytes()) + } else { + naluType := firstBuffer.H264Type() + firstByte := codec.NALU_FUA.Or(firstBuffer.RefIdc()) + buf := []byte{firstByte, naluType.Or(1 << 7)} + for i, sp := range firstBuffer { + if i == 0 { + sp = sp[1:] + } + buf = append(buf, sp...) + } + out = append(out, buf) + for _, bufs := range buffers[1:] { + buf := []byte{firstByte, naluType.Byte()} + for _, sp := range bufs { + buf = append(buf, sp...) + } + out = append(out, buf) + } + lastBuf := out[len(out)-1] + lastBuf[1] |= 1 << 6 // set end bit + } + } + vt.PacketizeRTP(out...) } (*Video)(vt).Flush() } diff --git a/track/h265.go b/track/h265.go index 889d4f1..3b9c816 100644 --- a/track/h265.go +++ b/track/h265.go @@ -6,6 +6,8 @@ import ( "github.com/Monibuca/engine/v4/codec" . "github.com/Monibuca/engine/v4/common" + "github.com/Monibuca/engine/v4/config" + "github.com/Monibuca/engine/v4/util" ) type H265 Video @@ -18,6 +20,7 @@ func NewH265(stream IStream) (vt *H265) { vt.Stream = stream vt.Init(stream, 256) vt.Poll = time.Millisecond * 20 + vt.DecoderConfiguration.PayloadType = 96 return } func (vt *H265) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { @@ -27,21 +30,15 @@ func (vt *H265) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { func (vt *H265) WriteSlice(slice NALUSlice) { switch slice.H265Type() { case codec.NAL_UNIT_VPS: - if len(vt.DecoderConfiguration.Raw) > 0 { - vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0] - } - vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0]) + vt.DecoderConfiguration.Raw.Reset().Append(slice[0]) case codec.NAL_UNIT_SPS: - vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0]) + vt.DecoderConfiguration.Raw.Append(slice[0]) vt.SPSInfo, _ = codec.ParseHevcSPS(slice[0]) case codec.NAL_UNIT_PPS: - vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0]) + vt.DecoderConfiguration.Raw.Append(slice[0]) extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.DecoderConfiguration.Raw[0], vt.DecoderConfiguration.Raw[1], vt.DecoderConfiguration.Raw[2]) if err == nil { - if len(vt.DecoderConfiguration.AVCC) > 0 { - vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0] - } - vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, extraData) + vt.DecoderConfiguration.AVCC.Reset().Append(extraData) } vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0) case @@ -59,14 +56,11 @@ func (vt *H265) WriteSlice(slice NALUSlice) { } func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) { if frame.IsSequence() { - if len(vt.DecoderConfiguration.AVCC) > 0 { - vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0] - } - vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, frame) + vt.DecoderConfiguration.AVCC.Reset().Append(frame) if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(frame); err == nil { vt.SPSInfo, _ = codec.ParseHevcSPS(frame) vt.nalulenSize = int(frame[26]) & 0x03 - vt.DecoderConfiguration.Raw = NALUSlice{vps, sps, pps} + vt.DecoderConfiguration.Raw.Append(vps, sps, pps) } vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0) } else { @@ -75,7 +69,43 @@ func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) { vt.Flush() } } - +func (vt *H265) WriteRTP(raw []byte) { + var frame RTPFrame + if packet := frame.Unmarshal(raw); packet == nil { + return + } + // TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream. + var usingDonlField bool + var buffer = util.Buffer(frame.Payload) + switch frame.H265Type() { + case codec.NAL_UNIT_RTP_AP: + buffer.ReadUint16() + if usingDonlField { + buffer.ReadUint16() + } + for buffer.CanRead() { + vt.WriteSlice(NALUSlice{buffer.ReadN(int(buffer.ReadUint16()))}) + if usingDonlField { + buffer.ReadByte() + } + } + case codec.NAL_UNIT_RTP_FU: + first3 := buffer.ReadN(3) + fuHeader := first3[2] + if usingDonlField { + buffer.ReadUint16() + } + if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) { + vt.Value.AppendRaw(NALUSlice{[]byte{first3[0]&0b10000001 | (naluType << 1), first3[1]}}) + } + lastIndex := len(vt.Value.Raw) - 1 + vt.Value.Raw[lastIndex].Append(buffer) + if util.Bit1(fuHeader, 1) { + vt.Value.Raw = vt.Value.Raw[:lastIndex] + vt.WriteSlice(vt.Value.Raw[lastIndex]) + } + } +} func (vt *H265) Flush() { if vt.Value.IFrame { if vt.IDRing == nil { @@ -84,8 +114,36 @@ func (vt *H265) Flush() { (*Video)(vt).ComputeGOP() } // RTP格式补完 - if vt.Value.RTP == nil { - + if vt.Value.RTP == nil && config.Global.EnableRTP { + var out [][]byte + for _, nalu := range vt.Value.Raw { + buffers := util.SplitBuffers(nalu, 1200) + firstBuffer := NALUSlice(buffers[0]) + if l := len(buffers); l == 1 { + out = append(out, firstBuffer.Bytes()) + } else { + naluType := firstBuffer.H265Type() + firstByte := (byte(codec.NAL_UNIT_RTP_FU) << 1) | (firstBuffer[0][0] & 0b10000001) + buf := []byte{firstByte, firstBuffer[0][1], (1 << 7) | (byte(naluType) >> 1)} + for i, sp := range firstBuffer { + if i == 0 { + sp = sp[2:] + } + buf = append(buf, sp...) + } + out = append(out, buf) + for _, bufs := range buffers[1:] { + buf := []byte{firstByte, firstBuffer[0][1], byte(naluType) >> 1} + for _, sp := range bufs { + buf = append(buf, sp...) + } + out = append(out, buf) + } + lastBuf := out[len(out)-1] + lastBuf[2] |= 1 << 6 // set end bit + } + } + vt.PacketizeRTP(out...) } (*Video)(vt).Flush() } diff --git a/track/video.go b/track/video.go index 1f7f087..7a1fe5d 100644 --- a/track/video.go +++ b/track/video.go @@ -93,6 +93,11 @@ func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) { } func (vt *Video) Flush() { + // 没有实际媒体数据 + if vt.Value.Raw == nil { + vt.Value.Reset() + return + } // AVCC格式补完 if vt.Value.AVCC == nil && (config.Global.EnableAVCC || config.Global.EnableFLV) { b := []byte{vt.CodecID, 1, 0, 0, 0} diff --git a/util/buffer.go b/util/buffer.go index 6e66399..26dbf15 100644 --- a/util/buffer.go +++ b/util/buffer.go @@ -3,7 +3,6 @@ package util import ( "encoding/binary" "math" - "net" ) type Buffer []byte @@ -57,6 +56,9 @@ func (b *Buffer) Write(a []byte) (n int, err error) { func (b Buffer) Len() int { return len(b) } +func (b Buffer) CanRead() bool { + return b.Len() > 0 +} func (b Buffer) Cap() int { return cap(b) } @@ -86,23 +88,20 @@ func (b *Buffer) Glow(n int) { } // SizeOfBuffers 计算Buffers的内容长度 -func SizeOfBuffers(buf net.Buffers) (size int) { +func SizeOfBuffers[T ~[]byte](buf []T) (size int) { for _, b := range buf { size += len(b) } return } -func CutBuffers(buf net.Buffers, size int) { - -} // SplitBuffers 按照一定大小分割 Buffers -func SplitBuffers(buf net.Buffers, size int) (result []net.Buffers) { +func SplitBuffers[T ~[]byte](buf []T, size int) (result [][]T) { for total := SizeOfBuffers(buf); total > 0; { if total <= size { return append(result, buf) } else { - var before net.Buffers + var before []T sizeOfBefore := 0 for _, b := range buf { need := size - sizeOfBefore @@ -123,5 +122,3 @@ func SplitBuffers(buf net.Buffers, size int) (result []net.Buffers) { } return } - - diff --git a/util/bytes_pool.go b/util/bytes_pool.go deleted file mode 100644 index 2bb5c4a..0000000 --- a/util/bytes_pool.go +++ /dev/null @@ -1,17 +0,0 @@ -package util - -type BytesPool [][]byte - -func (pool *BytesPool) Get(size int) (result []byte) { - if l := len(*pool); l > 0 { - result = (*pool)[l-1] - *pool = (*pool)[:l-1] - } else { - result = make([]byte, size, 10) - } - return -} - -func (pool *BytesPool) Put(b []byte) { - *pool = append(*pool, b) -} diff --git a/util/index.go b/util/index.go index 1828bf8..d5c9570 100644 --- a/util/index.go +++ b/util/index.go @@ -29,3 +29,8 @@ func Exist(filename string) bool { func ConvertNum[F constraints.Integer, T constraints.Integer](from F, to T) T { return T(from) } + +// Bit1 检查字节中的某一位是否为1 |0|1|2|3|4|5|6|7| +func Bit1(b byte, index int) bool { + return b&(1<<(7-index)) != 0 +} diff --git a/util/slice.go b/util/slice.go index 10c4255..c1e9666 100644 --- a/util/slice.go +++ b/util/slice.go @@ -30,3 +30,7 @@ func (s *Slice[T]) ResetAppend(first T) { s.Reset() s.Add(first) } + +func LastElement[T any](s []T) T { + return s[len(s)-1] +}