From a0048a9c9decf17613f45bb9c901bddf53af92fe Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Mon, 14 Jun 2021 22:15:25 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E5=AA=92=E4=BD=93=E8=BD=A8?= =?UTF-8?q?=E9=81=93=E7=AD=89=E5=BE=85=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- audio_track.go | 146 +++++++++++----- base_track.go | 116 +++++++++++-- go.mod | 4 +- go.sum | 12 +- publisher.go | 53 ------ ring_audio.go | 6 +- ring_video.go | 8 +- rtp.go | 127 ++++++++++++++ stream.go | 189 +++++++++++---------- subscriber.go | 152 +++++------------ video_track.go | 445 +++++++++++++++++++++++++++++-------------------- 11 files changed, 764 insertions(+), 494 deletions(-) delete mode 100644 publisher.go create mode 100644 rtp.go diff --git a/audio_track.go b/audio_track.go index 94f7ddf..e9356fa 100644 --- a/audio_track.go +++ b/audio_track.go @@ -1,74 +1,138 @@ package engine import ( - "github.com/Monibuca/utils/v3" + "io" + "github.com/Monibuca/utils/v3/codec" - "github.com/pion/rtp" ) type AudioPack struct { Timestamp uint32 Payload []byte + Raw []byte SequenceNumber uint16 } type AudioTrack struct { Track_Audio - SoundFormat byte //4bit - SoundRate int //2bit - SoundSize byte //1bit - Channels byte //1bit - RtmpTag []byte //rtmp协议需要先发这个帧 + SoundRate int //2bit + SoundSize byte //1bit + Channels byte //1bit + ExtraData []byte `json:"-"` //rtmp协议需要先发这个帧 + PushByteStream func(pack AudioPack) + PushRaw func(pack AudioPack) + WriteByteStream func(writer io.Writer, pack AudioPack) //使用函数写入,避免申请内存 } -func (at *AudioPack) ToRTMPTag(aac byte) []byte { - audio := at.Payload - l := len(audio) + 1 - isAAC := 0 - if aac>>4 == 10 { - isAAC = 1 +func (at *AudioTrack) pushByteStream(pack AudioPack) { + at.CodecID = pack.Payload[0] >> 4 + at.WriteByteStream = func(writer io.Writer, pack AudioPack) { + writer.Write(pack.Payload) } - payload := utils.GetSlice(l + isAAC) - payload[0] = aac - if isAAC == 1 { - payload[1] = 1 - copy(payload[2:], audio) - } else { - copy(payload[1:], audio) + switch at.CodecID { + case 10: + at.Stream.AudioTracks.AddTrack("aac", at) + case 7: + at.Stream.AudioTracks.AddTrack("pcma", at) + case 8: + at.Stream.AudioTracks.AddTrack("pcmu", at) } - return payload + switch at.CodecID { + case 10: + if pack.Payload[1] != 0 { + return + } else { + config1, config2 := pack.Payload[2], pack.Payload[3] + //audioObjectType = (config1 & 0xF8) >> 3 + // 1 AAC MAIN ISO/IEC 14496-3 subpart 4 + // 2 AAC LC ISO/IEC 14496-3 subpart 4 + // 3 AAC SSR ISO/IEC 14496-3 subpart 4 + // 4 AAC LTP ISO/IEC 14496-3 subpart 4 + at.SoundRate = codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)] + at.Channels = ((config2 >> 3) & 0x0F) //声道 + //frameLengthFlag = (config2 >> 2) & 0x01 + //dependsOnCoreCoder = (config2 >> 1) & 0x01 + //extensionFlag = config2 & 0x01 + at.ExtraData = pack.Payload + at.PushByteStream = func(pack AudioPack) { + pack.Raw = pack.Payload[2:] + at.push(pack) + } + } + default: + at.SoundRate = codec.SoundRate[(pack.Payload[0]&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz + at.SoundSize = (pack.Payload[0] & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples + at.Channels = pack.Payload[0]&0x01 + 1 + at.ExtraData = pack.Payload[:1] + at.PushByteStream = func(pack AudioPack) { + payloadLen := len(pack.Payload) + if payloadLen < 4 { + return + } + at.GetBPS(payloadLen) + pack.Raw = pack.Payload[1:] + at.push(pack) + } + at.PushByteStream(pack) + } + +} +func (at *AudioTrack) pushRaw(pack AudioPack) { + switch at.CodecID { + case 10: + at.WriteByteStream = func(writer io.Writer, pack AudioPack) { + writer.Write([]byte{at.ExtraData[0], 1}) + writer.Write(pack.Raw) + } + default: + at.WriteByteStream = func(writer io.Writer, pack AudioPack) { + writer.Write([]byte{at.ExtraData[0]}) + writer.Write(pack.Raw) + } + } + at.PushRaw = at.push + at.push(pack) } // Push 来自发布者推送的音频 -func (at *AudioTrack) Push(timestamp uint32, payload []byte) { +func (at *AudioTrack) push(pack AudioPack) { if at.Stream != nil { at.Stream.Update() } - payloadLen := len(payload) - if payloadLen < 4 { - return + abr := at.Buffer + audio := abr.Current + audio.AudioPack = pack + if at.Stream.prePayload > 0 && len(pack.Payload) == 0 { + buffer := abr.GetBuffer() + at.WriteByteStream(buffer, pack) + audio.AudioPack = pack + audio.AudioPack.Payload = buffer.Bytes() + } else { + audio.AudioPack = pack } - audio := at.Buffer - audio.Current.Timestamp = timestamp - audio.Current.Payload = payload - at.Track_Audio.GetBPS(payloadLen) - audio.NextW() + abr.NextW() } -func (at *AudioTrack) PushRTP(pack rtp.Packet) { - t := pack.Timestamp / 90 - for _, payload := range codec.ParseRTPAAC(pack.Payload) { - at.Push(t, payload) + +func (s *Stream) NewAudioTrack(codec byte) (at *AudioTrack) { + at = &AudioTrack{} + at.PushByteStream = at.pushByteStream + at.PushRaw = at.pushRaw + at.Stream = s + at.Buffer = NewRing_Audio() + switch codec { + case 10: + s.AudioTracks.AddTrack("aac", at) + case 7: + s.AudioTracks.AddTrack("pcma", at) + case 8: + s.AudioTracks.AddTrack("pcmu", at) } -} -func NewAudioTrack() *AudioTrack { - var result AudioTrack - result.Buffer = NewRing_Audio() - return &result + return } func (at *AudioTrack) SetASC(asc []byte) { - at.RtmpTag = append([]byte{0xAF, 0}, asc...) + at.ExtraData = append([]byte{0xAF, 0}, asc...) config1 := asc[0] config2 := asc[1] - at.SoundFormat = 10 + at.CodecID = 10 //audioObjectType = (config1 & 0xF8) >> 3 // 1 AAC MAIN ISO/IEC 14496-3 subpart 4 // 2 AAC LC ISO/IEC 14496-3 subpart 4 diff --git a/base_track.go b/base_track.go index 1f534be..0ddc28f 100644 --- a/base_track.go +++ b/base_track.go @@ -1,22 +1,16 @@ package engine -import "github.com/pion/rtp" +import ( + "context" + "sync" + "time" +) type Track interface { - PushRTP(rtp.Packet) GetBPS(int) Dispose() } -// 一定要在写入Track的协程中调用该函数,这个函数的作用是防止订阅者无限等待 -func DisposeTracks(tracks ...Track) { - for _, track := range tracks { - if track != nil { - track.Dispose() - } - } -} - type Track_Audio struct { Buffer *Ring_Audio `json:"-"` Stream *Stream `json:"-"` @@ -56,3 +50,103 @@ func (t *Track_Video) GetBPS(payloadLen int) { func (t *Track_Video) Dispose() { t.Buffer.Dispose() } + +type TrackWaiter struct { + Track + *sync.Cond `json:"-"` +} + +func (tw *TrackWaiter) Ok(t Track) { + tw.Track = t + tw.Broadcast() +} +func (tw *TrackWaiter) Dispose() { + if tw.Cond != nil { + tw.Broadcast() + } + if tw.Track != nil { + tw.Track.Dispose() + } +} +func (tw *TrackWaiter) Wait(c chan<- Track) { + tw.L.Lock() + tw.Cond.Wait() + tw.L.Unlock() + c <- tw.Track +} + +type Tracks struct { + m map[string]*TrackWaiter + sync.RWMutex + context.Context +} + +func (ts *Tracks) Codecs() (result []string) { + ts.RLock() + defer ts.RUnlock() + for codec := range ts.m { + result = append(result, codec) + } + return +} + +func (ts *Tracks) Init() { + ts.m = make(map[string]*TrackWaiter) + ts.Context, _ = context.WithTimeout(context.Background(), time.Second*5) +} + +func (ts *Tracks) Dispose() { + ts.RLock() + defer ts.RUnlock() + for _, t := range ts.m { + t.Dispose() + } +} +func (ts *Tracks) AddTrack(codec string, t Track) { + ts.Lock() + if tw, ok := ts.m[codec]; ok { + ts.Unlock() + tw.Ok(t) + } else { + ts.m[codec] = &TrackWaiter{Track: t} + ts.Unlock() + } +} +func (ts *Tracks) GetTrack(codec string) (tw *TrackWaiter, ok bool) { + ts.Lock() + if tw, ok = ts.m[codec]; ok { + ts.Unlock() + ok = tw.Track != nil + } else { + tw = &TrackWaiter{Cond: sync.NewCond(new(sync.Mutex))} + ts.m[codec] = tw + ts.Unlock() + } + return +} +func (ts *Tracks) WaitTrack(codecs ...string) Track { + if len(codecs) == 0 { + codecs = ts.Codecs() + } + var tws []*TrackWaiter + for _, codec := range codecs { + if tw, ok := ts.GetTrack(codec); ok { + return tw.Track + } else { + tws = append(tws, tw) + } + } + if ts.Err() != nil { + return nil + } + c := make(chan Track, len(tws)) + for _, tw := range tws { + go tw.Wait(c) + } + select { + case <-ts.Done(): + return nil + case t := <-c: + return t + } +} diff --git a/go.mod b/go.mod index eeadfdb..1ff2673 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.13 require ( github.com/BurntSushi/toml v0.3.1 - github.com/Monibuca/utils/v3 v3.0.0-alpha5 + github.com/Monibuca/utils/v3 v3.0.0-beta github.com/logrusorgru/aurora v2.0.3+incompatible - github.com/pion/rtp v1.6.2 + github.com/pion/rtp v1.6.5 github.com/pkg/errors v0.9.1 ) diff --git a/go.sum b/go.sum index cd50c52..aac6578 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Monibuca/utils/v3 v3.0.0-alpha4 h1:pecYA89kWmtGOeY6R99d4T1epPJ1wc+jFrrJY13VD04= -github.com/Monibuca/utils/v3 v3.0.0-alpha4/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= -github.com/Monibuca/utils/v3 v3.0.0-alpha5 h1:IOyW/KJSRdRg+TPcgwkHLBynqfNQOV6p3iP7LgXEMFc= -github.com/Monibuca/utils/v3 v3.0.0-alpha5/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= +github.com/Monibuca/utils/v3 v3.0.0-beta h1:z4p/BSH5J9Ja/gwoDmj1RyN+b0q28Nmn/fqXiwq2hGY= +github.com/Monibuca/utils/v3 v3.0.0-beta/go.mod h1:mQYP/OMox1tkWP6Qut7pBfARr1TXSRkK662dexQl6kI= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg= @@ -16,10 +14,12 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= -github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U= -github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/rtp v1.6.5 h1:o2cZf8OascA5HF/b0PAbTxRKvOWxTQxWYt7SlToxFGI= +github.com/pion/rtp v1.6.5/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY= +github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/publisher.go b/publisher.go deleted file mode 100644 index 05bc979..0000000 --- a/publisher.go +++ /dev/null @@ -1,53 +0,0 @@ -package engine - -import ( - "context" - "time" -) - -// Publisher 发布者实体定义 -type Publisher struct { - context.Context - cancel context.CancelFunc - AutoUnPublish bool // 当无人订阅时自动停止发布 - *Stream - Type string //类型,用来区分不同的发布者 - timeout *time.Timer //更新时间用来做超时处理 -} - -// Close 关闭发布者 -func (p *Publisher) Close() { - if p.Running() { - p.Stream.Close() - } -} - -func (p *Publisher) Update() { - if p.timeout != nil { - p.timeout.Reset(config.PublishTimeout) - } else { - p.timeout = time.AfterFunc(config.PublishTimeout, p.Close) - } -} - -// Running 发布者是否正在发布 -func (p *Publisher) Running() bool { - return p.Stream != nil && p.Err() == nil -} - -// Publish 发布者进行发布操作 -func (p *Publisher) Publish(streamPath string) bool { - p.Stream = GetStream(streamPath) - //检查是否已存在发布者 - if p.Publisher != nil { - return false - } - p.Context, p.cancel = context.WithCancel(p.Stream) - p.Publisher = p - p.Stream.Type = p.Type - p.StartTime = time.Now() - p.Update() - //触发钩子 - TriggerHook(Hook{HOOK_PUBLISH, p.Stream}) - return true -} diff --git a/ring_audio.go b/ring_audio.go index 453db59..93eec35 100644 --- a/ring_audio.go +++ b/ring_audio.go @@ -97,14 +97,16 @@ func (r *Ring_Audio) Dispose() { } // NextR 读下一个 -func (r *Ring_Audio) NextR() { +func (r *Ring_Audio) NextR() bool { r.GoNext() r.Current.Wait() + return r.Flag != 2 // 2代表已经销毁 } func (r *Ring_Audio) GetBuffer() *bytes.Buffer { if r.Current.Buffer == nil { - r.Current.Buffer = bytes.NewBuffer([]byte{}) + r.Current.Payload = []byte{} + r.Current.Buffer = bytes.NewBuffer(r.Current.Payload) } else { r.Current.Reset() } diff --git a/ring_video.go b/ring_video.go index 12211d4..8fdb1fa 100644 --- a/ring_video.go +++ b/ring_video.go @@ -81,6 +81,7 @@ func (r *Ring_Video) NextW() { r.GoNext() r.Current.Add(1) item.Done() + //Flag不为1代表被Dispose了,但尚未处理Done if !atomic.CompareAndSwapInt32(&r.Flag, 1, 0) { r.Current.Done() } @@ -91,20 +92,23 @@ func (r *Ring_Video) Dispose() { if atomic.CompareAndSwapInt32(&r.Flag, 0, 2) { r.Current.Done() } else if atomic.CompareAndSwapInt32(&r.Flag, 1, 2) { + //当前是1代表正在写入,此时变成2,但是Done的任务得交给NextW来处理 } else if atomic.CompareAndSwapInt32(&r.Flag, 0, 2) { r.Current.Done() } } // NextR 读下一个 -func (r *Ring_Video) NextR() { +func (r *Ring_Video) NextR() bool{ r.GoNext() r.Current.Wait() + return r.Flag != 2 // 2代表已经销毁 } func (r *Ring_Video) GetBuffer() *bytes.Buffer { if r.Current.Buffer == nil { - r.Current.Buffer = bytes.NewBuffer([]byte{}) + r.Current.Payload = []byte{} + r.Current.Buffer = bytes.NewBuffer(r.Current.Payload) } else { r.Current.Reset() } diff --git a/rtp.go b/rtp.go new file mode 100644 index 0000000..852c59a --- /dev/null +++ b/rtp.go @@ -0,0 +1,127 @@ +package engine + +import ( + "sort" + + "github.com/Monibuca/utils/v3/codec" + "github.com/pion/rtp" +) + +type RTPPublisher struct { + rtp.Packet + Push func(payload []byte) +} +type RTPAudio struct { + RTPPublisher + *AudioTrack +} +type RTPVideo struct { + RTPPublisher + *VideoTrack +} + +func (s *Stream) NewRTPVideo(codec byte) (r *RTPVideo) { + r = &RTPVideo{ + VideoTrack: s.NewVideoTrack(codec), + } + r.Push = r.push + return +} + +func (s *Stream) NewRTPAudio(codec byte) (r *RTPAudio) { + r = &RTPAudio{ + AudioTrack: s.NewAudioTrack(codec), + } + r.Push = r.push + return +} + +func (v *RTPVideo) push(payload []byte) { + vt := v.VideoTrack + if err := v.Unmarshal(payload); err != nil { + return + } + t := v.Timestamp / 90 + if t < vt.Buffer.GetLast().Timestamp { + if vt.WaitIDR.Err() == nil { + return + } + //有B帧 + var tmpVT VideoTrack + tmpVT.Buffer = NewRing_Video() + tmpVT.revIDR = func() { + tmpVT.IDRIndex = tmpVT.Buffer.Index + } + // tmpVT.pushRTP = func(p rtp.Packet) { + // tmpVT.Push(VideoPack{Timestamp:p.Timestamp/90,Payload:p.Payload}) + // } + gopBuffer := tmpVT.Buffer //缓存一个GOP用来计算dts + var gopFirst byte + var tsSlice TSSlice + for i := vt.IDRIndex; vt.Buffer.Index != i; i++ { + t := vt.Buffer.GetAt(i) + c := gopBuffer.Current + c.VideoPack = t.VideoPack.Clone() + tsSlice = append(tsSlice, gopBuffer.Current.Timestamp) + gopBuffer.NextW() + } + v.Push = func(payload []byte) { + if err := v.Unmarshal(payload); err != nil { + return + } + t := v.Timestamp / 90 + c := gopBuffer.Current + vp := VideoPack{Timestamp: t, NALUs: [][]byte{v.Payload}} + tmpVT.PushNalu(vp) + if c != gopBuffer.Current { + if c.IDR { + sort.Sort(tsSlice) //排序后相当于DTS列表 + var offset uint32 + for i := 0; i < len(tsSlice); i++ { + j := gopFirst + byte(i) + f := gopBuffer.GetAt(j) + if f.Timestamp+offset < tsSlice[i] { + offset = tsSlice[i] - f.Timestamp + } + } + for i := 0; i < len(tsSlice); i++ { + f := gopBuffer.GetAt(gopFirst + byte(i)) + f.CompositionTime = f.Timestamp + offset - tsSlice[i] + f.Timestamp = tsSlice[i] + vt.PushNalu(f.VideoPack) + } + gopFirst = gopBuffer.Index - 1 + tsSlice = nil + } + tsSlice = append(tsSlice, t) + } + } + v.Push(payload) + return + } + vt.PushNalu(VideoPack{Timestamp: t, NALUs: [][]byte{v.Payload}}) +} +func (v *RTPAudio) push(payload []byte) { + at := v.AudioTrack + if err := v.Unmarshal(payload); err != nil { + return + } + switch at.CodecID { + case 10: + v.Push = func(payload []byte) { + if err := v.Unmarshal(payload); err != nil { + return + } + for _, payload = range codec.ParseRTPAAC(v.Payload) { + at.PushRaw(AudioPack{Timestamp: v.Timestamp / 90, Raw: payload}) + } + } + case 7, 8: + v.Push = func(payload []byte) { + if err := v.Unmarshal(payload); err != nil { + return + } + at.PushRaw(AudioPack{Timestamp: v.Timestamp / 8, Raw: v.Payload}) + } + } +} diff --git a/stream.go b/stream.go index 74ca809..153d410 100644 --- a/stream.go +++ b/stream.go @@ -9,117 +9,122 @@ import ( . "github.com/logrusorgru/aurora" ) -// Streams 所有的流集合 -var Streams sync.Map +type StreamCollection struct { + sync.RWMutex + m map[string]*Stream +} -//FindStream 根据流路径查找流 -func FindStream(streamPath string) *Stream { - if s, ok := Streams.Load(streamPath); ok { - return s.(*Stream) +func (sc *StreamCollection) GetStream(streamPath string) *Stream { + sc.RLock() + defer sc.RUnlock() + if s, ok := sc.m[streamPath]; ok { + return s } return nil } +func (sc *StreamCollection) Delete(streamPath string) { + sc.Lock() + delete(sc.m, streamPath) + sc.Unlock() +} -//GetStream 根据流路径获取流,如果不存在则创建一个新的 -func GetStream(streamPath string) (result *Stream) { - item, loaded := Streams.LoadOrStore(streamPath, &Stream{ - StreamPath: streamPath, - }) - result = item.(*Stream) - if !loaded { - result.Context, result.cancel = context.WithCancel(context.Background()) - utils.Print(Green("Stream create:"), BrightCyan(streamPath)) +func (sc *StreamCollection) ToList() (r []*Stream) { + sc.RLock() + defer sc.RUnlock() + for _, s := range sc.m { + r = append(r, s) } return } -type TrackWaiter struct { - Track - *sync.Cond +func init() { + Streams.m = make(map[string]*Stream) } -func (tw *TrackWaiter) Ok(t Track) { - tw.Track = t - tw.Broadcast() +// Streams 所有的流集合 +var Streams StreamCollection + +//FindStream 根据流路径查找流 +func FindStream(streamPath string) *Stream { + return Streams.GetStream(streamPath) } // Stream 流定义 type Stream struct { context.Context - cancel context.CancelFunc - StreamPath string - Type string //流类型,来自发布者 - StartTime time.Time //流的创建时间 - *Publisher `json:"-"` - Subscribers []*Subscriber // 订阅者 - VideoTracks sync.Map - AudioTracks sync.Map - OriginVideoTrack *VideoTrack //原始视频轨 - OriginAudioTrack *AudioTrack //原始音频轨 - subscribeMutex sync.Mutex + StreamPath string + Type string //流类型,来自发布者 + StartTime time.Time //流的创建时间 + Subscribers []*Subscriber // 订阅者 + VideoTracks Tracks + AudioTracks Tracks + AutoUnPublish bool // 当无人订阅时自动停止发布 + Transcoding map[string]string //转码配置,key:目标编码,value:发布者提供的编码 + subscribeMutex sync.Mutex + timeout *time.Timer //更新时间用来做超时处理 + Close func() `json:"-"` + prePayload uint32 //需要预拼装ByteStream格式的数据的订阅者数量 } -func (r *Stream) SetOriginVT(vt *VideoTrack) { - r.OriginVideoTrack = vt - switch vt.CodecID { - case 7: - r.AddVideoTrack("h264", vt) - case 12: - r.AddVideoTrack("h265", vt) - } -} -func (r *Stream) SetOriginAT(at *AudioTrack) { - r.OriginAudioTrack = at - switch at.SoundFormat { - case 10: - r.AddAudioTrack("aac", at) - case 7: - r.AddAudioTrack("pcma", at) - case 8: - r.AddAudioTrack("pcmu", at) - } -} -func (r *Stream) AddVideoTrack(codec string, vt *VideoTrack) *VideoTrack { - vt.Stream = r - if actual, loaded := r.VideoTracks.LoadOrStore(codec, &TrackWaiter{vt, sync.NewCond(new(sync.Mutex))}); loaded { - actual.(*TrackWaiter).Ok(vt) - } - return vt +func (r *Stream) Update() { + r.timeout.Reset(config.PublishTimeout) } -func (r *Stream) AddAudioTrack(codec string, at *AudioTrack) *AudioTrack { - at.Stream = r - if actual, loaded := r.AudioTracks.LoadOrStore(codec, &TrackWaiter{at, sync.NewCond(new(sync.Mutex))}); loaded { - actual.(*TrackWaiter).Ok(at) +// Publish 发布者进行发布操作 +func (r *Stream) Publish() bool { + Streams.Lock() + defer Streams.Unlock() + if _, ok := Streams.m[r.StreamPath]; ok { + return false } - return at + r.VideoTracks.Init() + r.AudioTracks.Init() + var cancel context.CancelFunc + customClose := r.Close + r.Context, cancel = context.WithCancel(context.Background()) + var closeOnce sync.Once + r.Close = func() { + closeOnce.Do(func() { + r.timeout.Stop() + if customClose != nil { + customClose() + } + cancel() + r.VideoTracks.Dispose() + r.AudioTracks.Dispose() + utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath)) + Streams.Delete(r.StreamPath) + TriggerHook(Hook{HOOK_STREAMCLOSE, r}) + }) + } + r.StartTime = time.Now() + Streams.m[r.StreamPath] = r + utils.Print(Green("Stream publish:"), BrightCyan(r.StreamPath)) + r.timeout = time.AfterFunc(config.PublishTimeout, r.Close) + //触发钩子 + TriggerHook(Hook{HOOK_PUBLISH, r}) + return true } -func (r *Stream) Close() { - r.cancel() - // if r.OriginVideoTrack != nil { - // r.OriginVideoTrack.Buffer.Current.Done() - // } - // if r.OriginAudioTrack != nil { - // r.OriginAudioTrack.Buffer.Current.Done() - // } - r.VideoTracks.Range(func(k, v interface{}) bool { - v.(*TrackWaiter).Broadcast() - if v.(*TrackWaiter).Track != nil { - v.(*TrackWaiter).Track.Dispose() - } - return true - }) - r.AudioTracks.Range(func(k, v interface{}) bool { - v.(*TrackWaiter).Broadcast() - if v.(*TrackWaiter).Track != nil { - v.(*TrackWaiter).Track.Dispose() - } - return true - }) - utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath)) - Streams.Delete(r.StreamPath) - TriggerHook(Hook{HOOK_STREAMCLOSE, r}) +func (r *Stream) WaitVideoTrack(codecs ...string) *VideoTrack { + if !config.EnableVideo { + return nil + } + if track := r.VideoTracks.WaitTrack(codecs...); track != nil { + return track.(*VideoTrack) + } + return nil +} + +// TODO: 触发转码逻辑 +func (r *Stream) WaitAudioTrack(codecs ...string) *AudioTrack { + if !config.EnableAudio { + return nil + } + if track := r.AudioTracks.WaitTrack(codecs...); track != nil { + return track.(*AudioTrack) + } + return nil } //Subscribe 订阅流 @@ -129,6 +134,9 @@ func (r *Stream) Subscribe(s *Subscriber) { utils.Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) s.Context, s.cancel = context.WithCancel(r) r.subscribeMutex.Lock() + if s.ByteStreamFormat { + r.prePayload++ + } r.Subscribers = append(r.Subscribers, s) r.subscribeMutex.Unlock() utils.Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) @@ -141,12 +149,15 @@ func (r *Stream) UnSubscribe(s *Subscriber) { if r.Err() == nil { var deleted bool r.subscribeMutex.Lock() + if s.ByteStreamFormat { + r.prePayload-- + } r.Subscribers, deleted = DeleteSliceItem_Subscriber(r.Subscribers, s) r.subscribeMutex.Unlock() if deleted { utils.Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) TriggerHook(Hook{HOOK_UNSUBSCRIBE, s}) - if len(r.Subscribers) == 0 && (r.Publisher == nil || r.Publisher.AutoUnPublish) { + if len(r.Subscribers) == 0 && r.AutoUnPublish { r.Close() } } diff --git a/subscriber.go b/subscriber.go index 89b7705..053372c 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,128 +2,54 @@ package engine import ( "context" + "net/url" "sync" "time" - "github.com/Monibuca/utils/v3/codec" "github.com/pkg/errors" ) // Subscriber 订阅者实体定义 type Subscriber struct { - context.Context `json:"-"` - cancel context.CancelFunc - Ctx2 context.Context `json:"-"` - *Stream `json:"-"` - ID string - TotalDrop int //总丢帧 - TotalPacket int - Type string - BufferLength int - Delay uint32 - SubscribeTime time.Time - Sign string - OnAudio func(pack AudioPack) `json:"-"` - OnVideo func(pack VideoPack) `json:"-"` + context.Context `json:"-"` + cancel context.CancelFunc + Ctx2 context.Context `json:"-"` + *Stream `json:"-"` + ID string + TotalDrop int //总丢帧 + TotalPacket int + Type string + BufferLength int + Delay uint32 + SubscribeTime time.Time + SubscribeArgs url.Values + OnAudio func(pack AudioPack) `json:"-"` + OnVideo func(pack VideoPack) `json:"-"` + ByteStreamFormat bool + closeOnce sync.Once } -// IsClosed 检查订阅者是否已经关闭 -func (s *Subscriber) IsClosed() bool { - return s.Context != nil && s.Err() != nil +func (s *Subscriber) close() { + s.UnSubscribe(s) + s.cancel() } // Close 关闭订阅者 func (s *Subscriber) Close() { - if s.cancel != nil { - s.UnSubscribe(s) - s.cancel() - } -} -func (s *Subscriber) WaitVideoTrack(codec string) *VideoTrack { - if !config.EnableVideo { - return nil - } - waiter, ok := s.VideoTracks.LoadOrStore(codec, &TrackWaiter{nil, sync.NewCond(new(sync.Mutex))}) - tw := waiter.(*TrackWaiter) - if !ok { - tw.L.Lock() - tw.Wait() - tw.L.Unlock() - } - if tw.Track == nil { - return nil - } - return tw.Track.(*VideoTrack) -} -func (s *Subscriber) WaitAudioTrack(codecs ...string) *AudioTrack { - if !config.EnableAudio { - return nil - } - - for _, codec := range codecs { - if at, ok := s.AudioTracks.Load(codec); ok && at.(*TrackWaiter).Track != nil { - return at.(*TrackWaiter).Track.(*AudioTrack) - } - } - if HasTranscoder && s.OriginAudioTrack != nil { - at := s.AddAudioTrack(codecs[0], NewAudioTrack()) - at.SoundFormat = codec.Codec2SoundFormat[codecs[0]] - TriggerHook(Hook{HOOK_REQUEST_TRANSAUDIO, &TransCodeReq{s, codecs[0]}}) - return at - } - var once sync.Once - c := make(chan *TrackWaiter) - for _, codec := range codecs { - at, _ := s.AudioTracks.LoadOrStore(codec, &TrackWaiter{nil, sync.NewCond(new(sync.Mutex))}) - go func(tw *TrackWaiter) { - tw.L.Lock() - tw.Wait() - tw.L.Unlock() - once.Do(func() { - c <- tw - }) - }(at.(*TrackWaiter)) - } - tw := <-c - if tw.Track == nil { - return nil - } - return tw.Track.(*AudioTrack) -} -func (s *Subscriber) GetVideoTrack(codec string) *VideoTrack { - if !config.EnableVideo { - return nil - } - if waiter, ok := s.VideoTracks.Load(codec); ok && waiter.(*TrackWaiter).Track != nil { - return waiter.(*TrackWaiter).Track.(*VideoTrack) - } - return nil -} -func (s *Subscriber) GetAudioTrack(codecs ...string) (at *AudioTrack) { - if !config.EnableAudio { - return nil - } - for _, codec := range codecs { - if at, ok := s.AudioTracks.Load(codec); ok && at.(*TrackWaiter).Track != nil { - return at.(*TrackWaiter).Track.(*AudioTrack) - } - } - if HasTranscoder && s.OriginAudioTrack != nil { - at = s.AddAudioTrack(codecs[0], NewAudioTrack()) - at.SoundFormat = codec.Codec2SoundFormat[codecs[0]] - TriggerHook(Hook{HOOK_REQUEST_TRANSAUDIO, &TransCodeReq{s, codecs[0]}}) - } - return + s.closeOnce.Do(s.close) } //Subscribe 开始订阅 将Subscriber与Stream关联 func (s *Subscriber) Subscribe(streamPath string) error { - if FindStream(streamPath) == nil { + u, _ := url.Parse(streamPath) + s.SubscribeArgs, _ = url.ParseQuery(u.RawQuery) + streamPath = u.Path + if stream := FindStream(streamPath); stream == nil { return errors.Errorf("Stream not found:%s", streamPath) - } - GetStream(streamPath).Subscribe(s) - if s.Context == nil { - return errors.Errorf("stream not exist:%s", streamPath) + } else { + if stream.Subscribe(s); s.Context == nil { + return errors.Errorf("stream not exist:%s", streamPath) + } } return nil } @@ -153,7 +79,7 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { return } vr := vt.Buffer.SubRing(vt.IDRIndex) //从关键帧开始读取,首屏秒开 - vr.Current.Wait() //等到RingBuffer可读 + vr.Current.Wait() //等到RingBuffer可读 ar := at.Buffer.SubRing(at.Buffer.Index) ar.Current.Wait() dropping := false //是否处于丢帧中 @@ -164,10 +90,12 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { if vt.Buffer.Index-vr.Index > 128 { dropping = true } - } else if vr.Current.NalType == codec.NALU_IDR_Picture { + } else if vr.Current.IDR { dropping = false } - vr.NextR() + if !vr.NextR() { + return + } } else { if !dropping { s.OnAudio(ar.Current.AudioPack) @@ -175,7 +103,9 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { dropping = true } } - ar.NextR() + if !ar.NextR() { + return + } } } } @@ -204,7 +134,8 @@ func (s *Subscriber) PlayAudio(at *AudioTrack) { if ctx2 == nil { ctx2 = context.TODO() } - for action = send; ctx2.Err() == nil && s.Context.Err() == nil; ring.NextR() { + action = send + for running := true; ctx2.Err() == nil && s.Context.Err() == nil && running; running = ring.NextR() { action() } } @@ -226,7 +157,7 @@ func (s *Subscriber) PlayVideo(vt *VideoTrack) { droped := 0 var action, send func() drop := func() { - if ring.Current.NalType == codec.NALU_IDR_Picture { + if ring.Current.IDR { action = send } else { droped++ @@ -241,7 +172,8 @@ func (s *Subscriber) PlayVideo(vt *VideoTrack) { action = drop } } - for action = send; ctx2.Err() == nil && s.Context.Err() == nil; ring.NextR() { + action = send + for running := true; ctx2.Err() == nil && s.Context.Err() == nil && running; running = ring.NextR() { action() } } diff --git a/video_track.go b/video_track.go index c2f7ded..efce602 100644 --- a/video_track.go +++ b/video_track.go @@ -1,13 +1,13 @@ package engine import ( + "bytes" "context" "encoding/binary" - "sort" + "io" "github.com/Monibuca/utils/v3" "github.com/Monibuca/utils/v3/codec" - "github.com/pion/rtp" ) const ( @@ -15,10 +15,11 @@ const ( stapaHeaderSize = 1 stapaNALULengthSize = 2 - naluTypeBitmask = 0x1F - naluRefIdcBitmask = 0x60 - fuaStartBitmask = 0x80 //1000 0000 - fuaEndBitmask = 0x40 //0100 0000 + naluTypeBitmask = 0x1F + naluTypeBitmask_hevc = 0x7E + naluRefIdcBitmask = 0x60 + fuaStartBitmask = 0x80 //1000 0000 + fuaEndBitmask = 0x40 //0100 0000 ) type TSSlice []uint32 @@ -32,200 +33,288 @@ func (s TSSlice) Less(i, j int) bool { return s[i] < s[j] } type VideoPack struct { Timestamp uint32 CompositionTime uint32 - Payload []byte //NALU - NalType byte + Payload []byte + NALUs [][]byte + IDR bool // 是否关键帧 Sequence int } -func (vp *VideoPack) ToRTMPTag() []byte { - nalu := vp.Payload - cts := vp.CompositionTime - payload := utils.GetSlice(9 + len(nalu)) - if nalu[0]&31 == codec.NALU_IDR_Picture { - payload[0] = 0x17 - } else { - payload[0] = 0x27 - } - payload[1] = 0x01 - utils.BigEndian.PutUint24(payload[2:], cts) - utils.BigEndian.PutUint32(payload[5:], uint32(len(nalu))) - copy(payload[9:], nalu) - return payload +func (vp VideoPack) Clone() VideoPack { + return vp } type VideoTrack struct { IDRIndex byte //最近的关键帧位置,首屏渲染 Track_Video - SPS []byte `json:"-"` - PPS []byte `json:"-"` - SPSInfo codec.SPSInfo - GOP byte //关键帧间隔 - RtmpTag []byte `json:"-"` //rtmp需要先发送一个序列帧,包含SPS和PPS - WaitIDR context.Context `json:"-"` - revIDR func() - pushRTP func(rtp.Packet) + SPSInfo codec.SPSInfo + GOP byte //关键帧间隔 + ExtraData *VideoPack `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) + WaitIDR context.Context `json:"-"` + revIDR func() + PushByteStream func(pack VideoPack) + PushNalu func(pack VideoPack) + WriteByteStream func(writer io.Writer, pack VideoPack) //使用函数写入,避免申请内存 + } -func (vt *VideoTrack) PushRTP(pack rtp.Packet) { - vt.pushRTP(pack) -} - -func NewVideoTrack() *VideoTrack { - var result VideoTrack +func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) { var cancel context.CancelFunc - result.Buffer = NewRing_Video() - result.WaitIDR, cancel = context.WithCancel(context.Background()) - result.revIDR = func() { - if result.RtmpTag == nil { - return + vt = &VideoTrack{ + revIDR: func() { + vt.IDRIndex = vt.Buffer.Index + cancel() + vt.revIDR = func() { + vt.GOP = vt.Buffer.Index - vt.IDRIndex + vt.IDRIndex = vt.Buffer.Index + } + }, + } + vt.PushByteStream = vt.pushByteStream + vt.PushNalu = vt.pushNalu + vt.Stream = s + vt.CodecID = codec + vt.Buffer = NewRing_Video() + vt.WaitIDR, cancel = context.WithCancel(context.Background()) + switch codec { + case 7: + s.VideoTracks.AddTrack("h264", vt) + case 12: + s.VideoTracks.AddTrack("h265", vt) + } + return +} + +func (vt *VideoTrack) PushAnnexB(pack VideoPack) { + for _, payload := range codec.SplitH264(pack.Payload) { + pack.NALUs = append(pack.NALUs, payload) + } + vt.PushNalu(pack) +} + +func (vt *VideoTrack) pushNalu(pack VideoPack) { + // 缓冲中只包含Nalu数据所以写入rtmp格式时需要按照ByteStream格式写入 + vt.WriteByteStream = func(writer io.Writer, pack VideoPack) { + tmp := utils.GetSlice(4) + defer utils.RecycleSlice(tmp) + if pack.IDR { + tmp[0] = 0x10 | vt.CodecID + } else { + tmp[0] = 0x20 | vt.CodecID } - result.IDRIndex = result.Buffer.Index - cancel() - result.revIDR = func() { - result.GOP = result.Buffer.Index - result.IDRIndex - result.IDRIndex = result.Buffer.Index + tmp[1] = 1 + writer.Write(tmp[:2]) + cts := pack.CompositionTime + utils.BigEndian.PutUint24(tmp, cts) + writer.Write(tmp[:3]) + for _, nalu := range pack.NALUs { + utils.BigEndian.PutUint32(tmp, uint32(len(nalu))) + writer.Write(tmp) + writer.Write(nalu) + } + } + switch vt.CodecID { + case 7: + { + var info codec.AVCDecoderConfigurationRecord + vt.PushNalu = func(pack VideoPack) { + // 等待接收SPS和PPS数据 + for _, nalu := range pack.NALUs { + switch nalu[0] & naluTypeBitmask { + case codec.NALU_SPS: + info.SequenceParameterSetNALUnit = nalu + info.SequenceParameterSetLength = uint16(len(nalu)) + vt.SPSInfo, _ = codec.ParseSPS(nalu) + case codec.NALU_PPS: + info.PictureParameterSetNALUnit = nalu + info.PictureParameterSetLength = uint16(len(nalu)) + } + } + if info.SequenceParameterSetNALUnit != nil && info.PictureParameterSetNALUnit != nil { + vt.ExtraData = &VideoPack{ + Payload: codec.BuildH264SeqHeaderFromSpsPps(info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit), + NALUs: [][]byte{info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit}, + } + fuaBuffer := bytes.NewBuffer([]byte{}) + //已完成SPS和PPS 组装,重置push函数,接收视频数据 + vt.PushNalu = func(pack VideoPack) { + var nonIDRs [][]byte + for _, nalu := range pack.NALUs { + naluType := nalu[0] & naluTypeBitmask + switch naluType { + case codec.NALU_STAPA: + var nalus [][]byte + for currOffset, naluSize := stapaHeaderSize, 0; currOffset < len(nalu); currOffset += naluSize { + naluSize = int(binary.BigEndian.Uint16(nalu[currOffset:])) + currOffset += stapaNALULengthSize + if currOffset+len(nalu) < currOffset+naluSize { + utils.Printf("STAP-A declared size(%d) is larger then buffer(%d)", naluSize, len(nalu)-currOffset) + return + } + nalus = append(nalus, nalu[currOffset:currOffset+naluSize]) + } + p := pack.Clone() + p.NALUs = nalus + vt.PushNalu(p) + case codec.NALU_FUA: + if len(nalu) < fuaHeaderSize { + utils.Printf("Payload is not large enough to be FU-A") + return + } + if nalu[1]&fuaStartBitmask != 0 { + naluRefIdc := nalu[0] & naluRefIdcBitmask + fragmentedNaluType := nalu[1] & naluTypeBitmask + nalu[fuaHeaderSize-1] = naluRefIdc | fragmentedNaluType + fuaBuffer.Write(nalu) + } else if nalu[1]&fuaEndBitmask != 0 { + p := pack.Clone() + p.NALUs = [][]byte{fuaBuffer.Bytes()[fuaHeaderSize-1:]} + fuaBuffer = bytes.NewBuffer([]byte{}) + vt.PushNalu(p) + } else { + fuaBuffer.Write(nalu[fuaHeaderSize:]) + } + case codec.NALU_Access_Unit_Delimiter: + case codec.NALU_IDR_Picture: + p := pack.Clone() + p.IDR = true + p.NALUs = [][]byte{nalu} + vt.push(p) + case codec.NALU_Non_IDR_Picture: + nonIDRs = append(nonIDRs, nalu) + case codec.NALU_SEI: + case codec.NALU_Filler_Data: + default: + utils.Printf("nalType not support yet:%d", naluType) + } + if len(nonIDRs) > 0 { + pack.NALUs = nonIDRs + vt.push(pack) + } + } + } + } + } + } + case 12: + var vps, sps, pps []byte + vt.PushNalu = func(pack VideoPack) { + // 等待接收SPS和PPS数据 + for _, nalu := range pack.NALUs { + switch nalu[0] & naluTypeBitmask_hevc >> 1 { + case codec.NAL_UNIT_VPS: + vps = nalu + case codec.NAL_UNIT_SPS: + sps = nalu + vt.SPSInfo, _ = codec.ParseSPS(nalu) + case codec.NAL_UNIT_PPS: + pps = nalu + } + } + if vps != nil && sps != nil && pps != nil { + extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vps, sps, pps) + if err != nil { + return + } + vt.ExtraData = &VideoPack{ + Payload: extraData, + NALUs: [][]byte{vps, sps, pps}, + } + vt.PushNalu = func(pack VideoPack) { + var nonIDRs [][]byte + for _, nalu := range pack.NALUs { + naluType := nalu[0] & naluTypeBitmask_hevc >> 1 + switch naluType { + case codec.NAL_UNIT_CODED_SLICE_BLA, + codec.NAL_UNIT_CODED_SLICE_BLANT, + codec.NAL_UNIT_CODED_SLICE_BLA_N_LP, + codec.NAL_UNIT_CODED_SLICE_IDR, + codec.NAL_UNIT_CODED_SLICE_IDR_N_LP, + codec.NAL_UNIT_CODED_SLICE_CRA: + p := pack.Clone() + p.IDR = true + p.NALUs = [][]byte{nalu} + vt.push(p) + case 0, 1, 2, 3, 4, 5, 6, 7, 9: + nonIDRs = append(nonIDRs, nalu) + } + } + if len(nonIDRs) > 0 { + pack.NALUs = nonIDRs + vt.push(pack) + } + } + } } } - result.pushRTP = result.pushRTP0 - return &result } -// Push 来自发布者推送的视频 -func (vt *VideoTrack) Push(pack VideoPack) { +func (vt *VideoTrack) pushByteStream(pack VideoPack) { + if pack.Payload[1] != 0 { + return + } else { + vt.CodecID = pack.Payload[0] & 0x0F + var nalulenSize int + switch vt.CodecID { + case 7: + var info codec.AVCDecoderConfigurationRecord + if _, err := info.Unmarshal(pack.Payload[5:]); err == nil { + vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit) + pack.NALUs = append(pack.NALUs, info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit) + nalulenSize = int(info.LengthSizeMinusOne&3 + 1) + vt.ExtraData = &pack + vt.Stream.VideoTracks.AddTrack("h264", vt) + } + case 12: + if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(pack.Payload); err == nil { + pack.NALUs = append(pack.NALUs, vps, sps, pps) + vt.SPSInfo, _ = codec.ParseSPS(sps) + nalulenSize = int(pack.Payload[26]) & 0x03 + vt.ExtraData = &pack + vt.Stream.VideoTracks.AddTrack("h265", vt) + } + } + vt.WriteByteStream = func(writer io.Writer, pack VideoPack) { + writer.Write(pack.Payload) + } + // 已完成序列帧组装,重置Push函数,从Payload中提取Nalu供非bytestream格式使用 + vt.PushByteStream = func(pack VideoPack) { + if len(pack.Payload) < 4 { + return + } + vt.GetBPS(len(pack.Payload)) + pack.IDR = pack.Payload[0]>>4 == 1 + pack.CompositionTime = utils.BigEndian.Uint24(pack.Payload[2:]) + nalus := pack.Payload[5:] + for len(nalus) > nalulenSize { + nalulen := 0 + for i := 0; i < nalulenSize; i++ { + nalulen += int(nalus[i]) << (8 * (nalulenSize - i - 1)) + } + pack.NALUs = append(pack.NALUs, nalus[nalulenSize:nalulen+nalulenSize]) + nalus = nalus[nalulen+nalulenSize:] + } + vt.push(pack) + } + } +} + +func (vt *VideoTrack) push(pack VideoPack) { if vt.Stream != nil { vt.Stream.Update() } - payload := pack.Payload - payloadLen := len(payload) - if payloadLen == 0 { - return - } vbr := vt.Buffer video := vbr.Current - video.NalType = payload[0] & naluTypeBitmask - video.Timestamp = pack.Timestamp - video.CompositionTime = pack.CompositionTime + if vt.Stream.prePayload > 0 && len(pack.Payload) == 0 { + buffer := vbr.GetBuffer() + vt.WriteByteStream(buffer, pack) + video.VideoPack = pack + video.VideoPack.Payload = buffer.Bytes() + } else { + video.VideoPack = pack + } video.Sequence = vt.PacketCount - switch video.NalType { - case codec.NALU_STAPA: - for currOffset, naluSize := stapaHeaderSize, 0; currOffset < len(payload); currOffset += naluSize { - naluSize = int(binary.BigEndian.Uint16(payload[currOffset:])) - currOffset += stapaNALULengthSize - if currOffset+len(payload) < currOffset+naluSize { - utils.Printf("STAP-A declared size(%d) is larger then buffer(%d)", naluSize, len(payload)-currOffset) - return - } - pack.Payload = payload[currOffset : currOffset+naluSize] - vt.Push(pack) - } - case codec.NALU_FUA: - if len(payload) < fuaHeaderSize { - utils.Printf("Payload is not large enough to be FU-A") - return - } - if payload[1]&fuaStartBitmask != 0 { - naluRefIdc := payload[0] & naluRefIdcBitmask - fragmentedNaluType := payload[1] & naluTypeBitmask - buffer := vbr.GetBuffer() - payload[fuaHeaderSize-1] = naluRefIdc | fragmentedNaluType - buffer.Write(payload) - } else if payload[1]&fuaEndBitmask != 0 { - pack.Payload = video.Bytes()[fuaHeaderSize-1:] - vt.Push(pack) - } else { - video.Write(payload[fuaHeaderSize:]) - } - - case codec.NALU_SPS: - vt.SPS = payload - vt.SPSInfo, _ = codec.ParseSPS(payload) - case codec.NALU_PPS: - vt.PPS = payload - if vt.RtmpTag == nil { - vt.SetRtmpTag() - } - case codec.NALU_Access_Unit_Delimiter: - - case codec.NALU_IDR_Picture: + if pack.IDR { vt.revIDR() - fallthrough - case codec.NALU_Non_IDR_Picture: - video.Payload = payload - vt.Track_Video.GetBPS(payloadLen) - vbr.NextW() - case codec.NALU_SEI: - case codec.NALU_Filler_Data: - default: - utils.Printf("nalType not support yet:%d", video.NalType) } -} - -func (vt *VideoTrack) SetRtmpTag() { - lenSPS, lenPPS := len(vt.SPS), len(vt.PPS) - vt.RtmpTag = append([]byte{}, codec.RTMP_AVC_HEAD...) - copy(vt.RtmpTag[6:], vt.SPS[1:4]) - vt.RtmpTag = append(vt.RtmpTag, 0xE1, byte(lenSPS>>8), byte(lenSPS)) - vt.RtmpTag = append(vt.RtmpTag, vt.SPS...) - vt.RtmpTag = append(append(vt.RtmpTag, 0x01, byte(lenPPS>>8), byte(lenPPS)), vt.PPS...) -} -func (vt *VideoTrack) pushRTP0(pack rtp.Packet) { - t := pack.Timestamp / 90 - if t < vt.Buffer.GetLast().Timestamp { - if vt.WaitIDR.Err() == nil { - return - } - //有B帧 - var tmpVT VideoTrack - tmpVT.Buffer = NewRing_Video() - tmpVT.revIDR = func() { - tmpVT.IDRIndex = tmpVT.Buffer.Index - } - // tmpVT.pushRTP = func(p rtp.Packet) { - // tmpVT.Push(VideoPack{Timestamp:p.Timestamp/90,Payload:p.Payload}) - // } - gopBuffer := tmpVT.Buffer //缓存一个GOP用来计算dts - var gopFirst byte - var tsSlice TSSlice - for i := vt.IDRIndex; vt.Buffer.Index != i; i++ { - t := vt.Buffer.GetAt(i) - c := gopBuffer.Current - c.Payload = append(c.Payload, t.Payload...) - c.Timestamp = t.Timestamp - c.NalType = t.NalType - tsSlice = append(tsSlice, gopBuffer.Current.Timestamp) - gopBuffer.NextW() - } - vt.pushRTP = func(pack rtp.Packet) { - t := pack.Timestamp / 90 - c := gopBuffer.Current - vp := VideoPack{Timestamp: t} - vp.Payload = append(vp.Payload, pack.Payload...) - tmpVT.Push(vp) - if c != gopBuffer.Current { - if c.NalType == codec.NALU_IDR_Picture { - sort.Sort(tsSlice) //排序后相当于DTS列表 - var offset uint32 - for i := 0; i < len(tsSlice); i++ { - j := gopFirst + byte(i) - f := gopBuffer.GetAt(j) - if f.Timestamp+offset < tsSlice[i] { - offset = tsSlice[i] - f.Timestamp - } - } - for i := 0; i < len(tsSlice); i++ { - f := gopBuffer.GetAt(gopFirst + byte(i)) - f.CompositionTime = f.Timestamp + offset - tsSlice[i] - f.Timestamp = tsSlice[i] - vt.Push(f.VideoPack) - } - gopFirst = gopBuffer.Index - 1 - tsSlice = nil - } - tsSlice = append(tsSlice, t) - } - } - vt.pushRTP(pack) - return - } - vt.Push(VideoPack{Timestamp: t, Payload: pack.Payload}) + vbr.NextW() }