重构媒体轨道等待逻辑

This commit is contained in:
langhuihui
2021-06-14 22:15:25 +08:00
parent 96c29bbfa6
commit a0048a9c9d
11 changed files with 764 additions and 494 deletions

View File

@@ -1,74 +1,138 @@
package engine package engine
import ( import (
"github.com/Monibuca/utils/v3" "io"
"github.com/Monibuca/utils/v3/codec" "github.com/Monibuca/utils/v3/codec"
"github.com/pion/rtp"
) )
type AudioPack struct { type AudioPack struct {
Timestamp uint32 Timestamp uint32
Payload []byte Payload []byte
Raw []byte
SequenceNumber uint16 SequenceNumber uint16
} }
type AudioTrack struct { type AudioTrack struct {
Track_Audio Track_Audio
SoundFormat byte //4bit SoundRate int //2bit
SoundRate int //2bit SoundSize byte //1bit
SoundSize byte //1bit Channels byte //1bit
Channels byte //1bit ExtraData []byte `json:"-"` //rtmp协议需要先发这个帧
RtmpTag []byte //rtmp协议需要先发这个帧 PushByteStream func(pack AudioPack)
PushRaw func(pack AudioPack)
WriteByteStream func(writer io.Writer, pack AudioPack) //使用函数写入,避免申请内存
} }
func (at *AudioPack) ToRTMPTag(aac byte) []byte { func (at *AudioTrack) pushByteStream(pack AudioPack) {
audio := at.Payload at.CodecID = pack.Payload[0] >> 4
l := len(audio) + 1 at.WriteByteStream = func(writer io.Writer, pack AudioPack) {
isAAC := 0 writer.Write(pack.Payload)
if aac>>4 == 10 {
isAAC = 1
} }
payload := utils.GetSlice(l + isAAC) switch at.CodecID {
payload[0] = aac case 10:
if isAAC == 1 { at.Stream.AudioTracks.AddTrack("aac", at)
payload[1] = 1 case 7:
copy(payload[2:], audio) at.Stream.AudioTracks.AddTrack("pcma", at)
} else { case 8:
copy(payload[1:], audio) at.Stream.AudioTracks.AddTrack("pcmu", at)
} }
return payload switch at.CodecID {
case 10:
if pack.Payload[1] != 0 {
return
} else {
config1, config2 := pack.Payload[2], pack.Payload[3]
//audioObjectType = (config1 & 0xF8) >> 3
// 1 AAC MAIN ISO/IEC 14496-3 subpart 4
// 2 AAC LC ISO/IEC 14496-3 subpart 4
// 3 AAC SSR ISO/IEC 14496-3 subpart 4
// 4 AAC LTP ISO/IEC 14496-3 subpart 4
at.SoundRate = codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]
at.Channels = ((config2 >> 3) & 0x0F) //声道
//frameLengthFlag = (config2 >> 2) & 0x01
//dependsOnCoreCoder = (config2 >> 1) & 0x01
//extensionFlag = config2 & 0x01
at.ExtraData = pack.Payload
at.PushByteStream = func(pack AudioPack) {
pack.Raw = pack.Payload[2:]
at.push(pack)
}
}
default:
at.SoundRate = codec.SoundRate[(pack.Payload[0]&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz
at.SoundSize = (pack.Payload[0] & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples
at.Channels = pack.Payload[0]&0x01 + 1
at.ExtraData = pack.Payload[:1]
at.PushByteStream = func(pack AudioPack) {
payloadLen := len(pack.Payload)
if payloadLen < 4 {
return
}
at.GetBPS(payloadLen)
pack.Raw = pack.Payload[1:]
at.push(pack)
}
at.PushByteStream(pack)
}
}
func (at *AudioTrack) pushRaw(pack AudioPack) {
switch at.CodecID {
case 10:
at.WriteByteStream = func(writer io.Writer, pack AudioPack) {
writer.Write([]byte{at.ExtraData[0], 1})
writer.Write(pack.Raw)
}
default:
at.WriteByteStream = func(writer io.Writer, pack AudioPack) {
writer.Write([]byte{at.ExtraData[0]})
writer.Write(pack.Raw)
}
}
at.PushRaw = at.push
at.push(pack)
} }
// Push 来自发布者推送的音频 // Push 来自发布者推送的音频
func (at *AudioTrack) Push(timestamp uint32, payload []byte) { func (at *AudioTrack) push(pack AudioPack) {
if at.Stream != nil { if at.Stream != nil {
at.Stream.Update() at.Stream.Update()
} }
payloadLen := len(payload) abr := at.Buffer
if payloadLen < 4 { audio := abr.Current
return audio.AudioPack = pack
if at.Stream.prePayload > 0 && len(pack.Payload) == 0 {
buffer := abr.GetBuffer()
at.WriteByteStream(buffer, pack)
audio.AudioPack = pack
audio.AudioPack.Payload = buffer.Bytes()
} else {
audio.AudioPack = pack
} }
audio := at.Buffer abr.NextW()
audio.Current.Timestamp = timestamp
audio.Current.Payload = payload
at.Track_Audio.GetBPS(payloadLen)
audio.NextW()
} }
func (at *AudioTrack) PushRTP(pack rtp.Packet) {
t := pack.Timestamp / 90 func (s *Stream) NewAudioTrack(codec byte) (at *AudioTrack) {
for _, payload := range codec.ParseRTPAAC(pack.Payload) { at = &AudioTrack{}
at.Push(t, payload) at.PushByteStream = at.pushByteStream
at.PushRaw = at.pushRaw
at.Stream = s
at.Buffer = NewRing_Audio()
switch codec {
case 10:
s.AudioTracks.AddTrack("aac", at)
case 7:
s.AudioTracks.AddTrack("pcma", at)
case 8:
s.AudioTracks.AddTrack("pcmu", at)
} }
} return
func NewAudioTrack() *AudioTrack {
var result AudioTrack
result.Buffer = NewRing_Audio()
return &result
} }
func (at *AudioTrack) SetASC(asc []byte) { func (at *AudioTrack) SetASC(asc []byte) {
at.RtmpTag = append([]byte{0xAF, 0}, asc...) at.ExtraData = append([]byte{0xAF, 0}, asc...)
config1 := asc[0] config1 := asc[0]
config2 := asc[1] config2 := asc[1]
at.SoundFormat = 10 at.CodecID = 10
//audioObjectType = (config1 & 0xF8) >> 3 //audioObjectType = (config1 & 0xF8) >> 3
// 1 AAC MAIN ISO/IEC 14496-3 subpart 4 // 1 AAC MAIN ISO/IEC 14496-3 subpart 4
// 2 AAC LC ISO/IEC 14496-3 subpart 4 // 2 AAC LC ISO/IEC 14496-3 subpart 4

View File

@@ -1,22 +1,16 @@
package engine package engine
import "github.com/pion/rtp" import (
"context"
"sync"
"time"
)
type Track interface { type Track interface {
PushRTP(rtp.Packet)
GetBPS(int) GetBPS(int)
Dispose() Dispose()
} }
// 一定要在写入Track的协程中调用该函数这个函数的作用是防止订阅者无限等待
func DisposeTracks(tracks ...Track) {
for _, track := range tracks {
if track != nil {
track.Dispose()
}
}
}
type Track_Audio struct { type Track_Audio struct {
Buffer *Ring_Audio `json:"-"` Buffer *Ring_Audio `json:"-"`
Stream *Stream `json:"-"` Stream *Stream `json:"-"`
@@ -56,3 +50,103 @@ func (t *Track_Video) GetBPS(payloadLen int) {
func (t *Track_Video) Dispose() { func (t *Track_Video) Dispose() {
t.Buffer.Dispose() t.Buffer.Dispose()
} }
type TrackWaiter struct {
Track
*sync.Cond `json:"-"`
}
func (tw *TrackWaiter) Ok(t Track) {
tw.Track = t
tw.Broadcast()
}
func (tw *TrackWaiter) Dispose() {
if tw.Cond != nil {
tw.Broadcast()
}
if tw.Track != nil {
tw.Track.Dispose()
}
}
func (tw *TrackWaiter) Wait(c chan<- Track) {
tw.L.Lock()
tw.Cond.Wait()
tw.L.Unlock()
c <- tw.Track
}
type Tracks struct {
m map[string]*TrackWaiter
sync.RWMutex
context.Context
}
func (ts *Tracks) Codecs() (result []string) {
ts.RLock()
defer ts.RUnlock()
for codec := range ts.m {
result = append(result, codec)
}
return
}
func (ts *Tracks) Init() {
ts.m = make(map[string]*TrackWaiter)
ts.Context, _ = context.WithTimeout(context.Background(), time.Second*5)
}
func (ts *Tracks) Dispose() {
ts.RLock()
defer ts.RUnlock()
for _, t := range ts.m {
t.Dispose()
}
}
func (ts *Tracks) AddTrack(codec string, t Track) {
ts.Lock()
if tw, ok := ts.m[codec]; ok {
ts.Unlock()
tw.Ok(t)
} else {
ts.m[codec] = &TrackWaiter{Track: t}
ts.Unlock()
}
}
func (ts *Tracks) GetTrack(codec string) (tw *TrackWaiter, ok bool) {
ts.Lock()
if tw, ok = ts.m[codec]; ok {
ts.Unlock()
ok = tw.Track != nil
} else {
tw = &TrackWaiter{Cond: sync.NewCond(new(sync.Mutex))}
ts.m[codec] = tw
ts.Unlock()
}
return
}
func (ts *Tracks) WaitTrack(codecs ...string) Track {
if len(codecs) == 0 {
codecs = ts.Codecs()
}
var tws []*TrackWaiter
for _, codec := range codecs {
if tw, ok := ts.GetTrack(codec); ok {
return tw.Track
} else {
tws = append(tws, tw)
}
}
if ts.Err() != nil {
return nil
}
c := make(chan Track, len(tws))
for _, tw := range tws {
go tw.Wait(c)
}
select {
case <-ts.Done():
return nil
case t := <-c:
return t
}
}

4
go.mod
View File

@@ -4,8 +4,8 @@ go 1.13
require ( require (
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/Monibuca/utils/v3 v3.0.0-alpha5 github.com/Monibuca/utils/v3 v3.0.0-beta
github.com/logrusorgru/aurora v2.0.3+incompatible github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/pion/rtp v1.6.2 github.com/pion/rtp v1.6.5
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
) )

12
go.sum
View File

@@ -1,9 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Monibuca/utils/v3 v3.0.0-alpha4 h1:pecYA89kWmtGOeY6R99d4T1epPJ1wc+jFrrJY13VD04= github.com/Monibuca/utils/v3 v3.0.0-beta h1:z4p/BSH5J9Ja/gwoDmj1RyN+b0q28Nmn/fqXiwq2hGY=
github.com/Monibuca/utils/v3 v3.0.0-alpha4/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= github.com/Monibuca/utils/v3 v3.0.0-beta/go.mod h1:mQYP/OMox1tkWP6Qut7pBfARr1TXSRkK662dexQl6kI=
github.com/Monibuca/utils/v3 v3.0.0-alpha5 h1:IOyW/KJSRdRg+TPcgwkHLBynqfNQOV6p3iP7LgXEMFc=
github.com/Monibuca/utils/v3 v3.0.0-alpha5/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo=
github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg= github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg=
@@ -16,10 +14,12 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U= github.com/pion/rtp v1.6.5 h1:o2cZf8OascA5HF/b0PAbTxRKvOWxTQxWYt7SlToxFGI=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/rtp v1.6.5/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY=
github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@@ -1,53 +0,0 @@
package engine
import (
"context"
"time"
)
// Publisher 发布者实体定义
type Publisher struct {
context.Context
cancel context.CancelFunc
AutoUnPublish bool // 当无人订阅时自动停止发布
*Stream
Type string //类型,用来区分不同的发布者
timeout *time.Timer //更新时间用来做超时处理
}
// Close 关闭发布者
func (p *Publisher) Close() {
if p.Running() {
p.Stream.Close()
}
}
func (p *Publisher) Update() {
if p.timeout != nil {
p.timeout.Reset(config.PublishTimeout)
} else {
p.timeout = time.AfterFunc(config.PublishTimeout, p.Close)
}
}
// Running 发布者是否正在发布
func (p *Publisher) Running() bool {
return p.Stream != nil && p.Err() == nil
}
// Publish 发布者进行发布操作
func (p *Publisher) Publish(streamPath string) bool {
p.Stream = GetStream(streamPath)
//检查是否已存在发布者
if p.Publisher != nil {
return false
}
p.Context, p.cancel = context.WithCancel(p.Stream)
p.Publisher = p
p.Stream.Type = p.Type
p.StartTime = time.Now()
p.Update()
//触发钩子
TriggerHook(Hook{HOOK_PUBLISH, p.Stream})
return true
}

View File

@@ -97,14 +97,16 @@ func (r *Ring_Audio) Dispose() {
} }
// NextR 读下一个 // NextR 读下一个
func (r *Ring_Audio) NextR() { func (r *Ring_Audio) NextR() bool {
r.GoNext() r.GoNext()
r.Current.Wait() r.Current.Wait()
return r.Flag != 2 // 2代表已经销毁
} }
func (r *Ring_Audio) GetBuffer() *bytes.Buffer { func (r *Ring_Audio) GetBuffer() *bytes.Buffer {
if r.Current.Buffer == nil { if r.Current.Buffer == nil {
r.Current.Buffer = bytes.NewBuffer([]byte{}) r.Current.Payload = []byte{}
r.Current.Buffer = bytes.NewBuffer(r.Current.Payload)
} else { } else {
r.Current.Reset() r.Current.Reset()
} }

View File

@@ -81,6 +81,7 @@ func (r *Ring_Video) NextW() {
r.GoNext() r.GoNext()
r.Current.Add(1) r.Current.Add(1)
item.Done() item.Done()
//Flag不为1代表被Dispose了但尚未处理Done
if !atomic.CompareAndSwapInt32(&r.Flag, 1, 0) { if !atomic.CompareAndSwapInt32(&r.Flag, 1, 0) {
r.Current.Done() r.Current.Done()
} }
@@ -91,20 +92,23 @@ func (r *Ring_Video) Dispose() {
if atomic.CompareAndSwapInt32(&r.Flag, 0, 2) { if atomic.CompareAndSwapInt32(&r.Flag, 0, 2) {
r.Current.Done() r.Current.Done()
} else if atomic.CompareAndSwapInt32(&r.Flag, 1, 2) { } else if atomic.CompareAndSwapInt32(&r.Flag, 1, 2) {
//当前是1代表正在写入此时变成2但是Done的任务得交给NextW来处理
} else if atomic.CompareAndSwapInt32(&r.Flag, 0, 2) { } else if atomic.CompareAndSwapInt32(&r.Flag, 0, 2) {
r.Current.Done() r.Current.Done()
} }
} }
// NextR 读下一个 // NextR 读下一个
func (r *Ring_Video) NextR() { func (r *Ring_Video) NextR() bool{
r.GoNext() r.GoNext()
r.Current.Wait() r.Current.Wait()
return r.Flag != 2 // 2代表已经销毁
} }
func (r *Ring_Video) GetBuffer() *bytes.Buffer { func (r *Ring_Video) GetBuffer() *bytes.Buffer {
if r.Current.Buffer == nil { if r.Current.Buffer == nil {
r.Current.Buffer = bytes.NewBuffer([]byte{}) r.Current.Payload = []byte{}
r.Current.Buffer = bytes.NewBuffer(r.Current.Payload)
} else { } else {
r.Current.Reset() r.Current.Reset()
} }

127
rtp.go Normal file
View File

@@ -0,0 +1,127 @@
package engine
import (
"sort"
"github.com/Monibuca/utils/v3/codec"
"github.com/pion/rtp"
)
type RTPPublisher struct {
rtp.Packet
Push func(payload []byte)
}
type RTPAudio struct {
RTPPublisher
*AudioTrack
}
type RTPVideo struct {
RTPPublisher
*VideoTrack
}
func (s *Stream) NewRTPVideo(codec byte) (r *RTPVideo) {
r = &RTPVideo{
VideoTrack: s.NewVideoTrack(codec),
}
r.Push = r.push
return
}
func (s *Stream) NewRTPAudio(codec byte) (r *RTPAudio) {
r = &RTPAudio{
AudioTrack: s.NewAudioTrack(codec),
}
r.Push = r.push
return
}
func (v *RTPVideo) push(payload []byte) {
vt := v.VideoTrack
if err := v.Unmarshal(payload); err != nil {
return
}
t := v.Timestamp / 90
if t < vt.Buffer.GetLast().Timestamp {
if vt.WaitIDR.Err() == nil {
return
}
//有B帧
var tmpVT VideoTrack
tmpVT.Buffer = NewRing_Video()
tmpVT.revIDR = func() {
tmpVT.IDRIndex = tmpVT.Buffer.Index
}
// tmpVT.pushRTP = func(p rtp.Packet) {
// tmpVT.Push(VideoPack{Timestamp:p.Timestamp/90,Payload:p.Payload})
// }
gopBuffer := tmpVT.Buffer //缓存一个GOP用来计算dts
var gopFirst byte
var tsSlice TSSlice
for i := vt.IDRIndex; vt.Buffer.Index != i; i++ {
t := vt.Buffer.GetAt(i)
c := gopBuffer.Current
c.VideoPack = t.VideoPack.Clone()
tsSlice = append(tsSlice, gopBuffer.Current.Timestamp)
gopBuffer.NextW()
}
v.Push = func(payload []byte) {
if err := v.Unmarshal(payload); err != nil {
return
}
t := v.Timestamp / 90
c := gopBuffer.Current
vp := VideoPack{Timestamp: t, NALUs: [][]byte{v.Payload}}
tmpVT.PushNalu(vp)
if c != gopBuffer.Current {
if c.IDR {
sort.Sort(tsSlice) //排序后相当于DTS列表
var offset uint32
for i := 0; i < len(tsSlice); i++ {
j := gopFirst + byte(i)
f := gopBuffer.GetAt(j)
if f.Timestamp+offset < tsSlice[i] {
offset = tsSlice[i] - f.Timestamp
}
}
for i := 0; i < len(tsSlice); i++ {
f := gopBuffer.GetAt(gopFirst + byte(i))
f.CompositionTime = f.Timestamp + offset - tsSlice[i]
f.Timestamp = tsSlice[i]
vt.PushNalu(f.VideoPack)
}
gopFirst = gopBuffer.Index - 1
tsSlice = nil
}
tsSlice = append(tsSlice, t)
}
}
v.Push(payload)
return
}
vt.PushNalu(VideoPack{Timestamp: t, NALUs: [][]byte{v.Payload}})
}
func (v *RTPAudio) push(payload []byte) {
at := v.AudioTrack
if err := v.Unmarshal(payload); err != nil {
return
}
switch at.CodecID {
case 10:
v.Push = func(payload []byte) {
if err := v.Unmarshal(payload); err != nil {
return
}
for _, payload = range codec.ParseRTPAAC(v.Payload) {
at.PushRaw(AudioPack{Timestamp: v.Timestamp / 90, Raw: payload})
}
}
case 7, 8:
v.Push = func(payload []byte) {
if err := v.Unmarshal(payload); err != nil {
return
}
at.PushRaw(AudioPack{Timestamp: v.Timestamp / 8, Raw: v.Payload})
}
}
}

189
stream.go
View File

@@ -9,117 +9,122 @@ import (
. "github.com/logrusorgru/aurora" . "github.com/logrusorgru/aurora"
) )
// Streams 所有的流集合 type StreamCollection struct {
var Streams sync.Map sync.RWMutex
m map[string]*Stream
}
//FindStream 根据流路径查找流 func (sc *StreamCollection) GetStream(streamPath string) *Stream {
func FindStream(streamPath string) *Stream { sc.RLock()
if s, ok := Streams.Load(streamPath); ok { defer sc.RUnlock()
return s.(*Stream) if s, ok := sc.m[streamPath]; ok {
return s
} }
return nil return nil
} }
func (sc *StreamCollection) Delete(streamPath string) {
sc.Lock()
delete(sc.m, streamPath)
sc.Unlock()
}
//GetStream 根据流路径获取流,如果不存在则创建一个新的 func (sc *StreamCollection) ToList() (r []*Stream) {
func GetStream(streamPath string) (result *Stream) { sc.RLock()
item, loaded := Streams.LoadOrStore(streamPath, &Stream{ defer sc.RUnlock()
StreamPath: streamPath, for _, s := range sc.m {
}) r = append(r, s)
result = item.(*Stream)
if !loaded {
result.Context, result.cancel = context.WithCancel(context.Background())
utils.Print(Green("Stream create:"), BrightCyan(streamPath))
} }
return return
} }
type TrackWaiter struct { func init() {
Track Streams.m = make(map[string]*Stream)
*sync.Cond
} }
func (tw *TrackWaiter) Ok(t Track) { // Streams 所有的流集合
tw.Track = t var Streams StreamCollection
tw.Broadcast()
//FindStream 根据流路径查找流
func FindStream(streamPath string) *Stream {
return Streams.GetStream(streamPath)
} }
// Stream 流定义 // Stream 流定义
type Stream struct { type Stream struct {
context.Context context.Context
cancel context.CancelFunc StreamPath string
StreamPath string Type string //流类型,来自发布者
Type string //流类型,来自发布者 StartTime time.Time //流的创建时间
StartTime time.Time //流的创建时间 Subscribers []*Subscriber // 订阅者
*Publisher `json:"-"` VideoTracks Tracks
Subscribers []*Subscriber // 订阅者 AudioTracks Tracks
VideoTracks sync.Map AutoUnPublish bool // 当无人订阅时自动停止发布
AudioTracks sync.Map Transcoding map[string]string //转码配置key目标编码value发布者提供的编码
OriginVideoTrack *VideoTrack //原始视频轨 subscribeMutex sync.Mutex
OriginAudioTrack *AudioTrack //原始音频轨 timeout *time.Timer //更新时间用来做超时处理
subscribeMutex sync.Mutex Close func() `json:"-"`
prePayload uint32 //需要预拼装ByteStream格式的数据的订阅者数量
} }
func (r *Stream) SetOriginVT(vt *VideoTrack) { func (r *Stream) Update() {
r.OriginVideoTrack = vt r.timeout.Reset(config.PublishTimeout)
switch vt.CodecID {
case 7:
r.AddVideoTrack("h264", vt)
case 12:
r.AddVideoTrack("h265", vt)
}
}
func (r *Stream) SetOriginAT(at *AudioTrack) {
r.OriginAudioTrack = at
switch at.SoundFormat {
case 10:
r.AddAudioTrack("aac", at)
case 7:
r.AddAudioTrack("pcma", at)
case 8:
r.AddAudioTrack("pcmu", at)
}
}
func (r *Stream) AddVideoTrack(codec string, vt *VideoTrack) *VideoTrack {
vt.Stream = r
if actual, loaded := r.VideoTracks.LoadOrStore(codec, &TrackWaiter{vt, sync.NewCond(new(sync.Mutex))}); loaded {
actual.(*TrackWaiter).Ok(vt)
}
return vt
} }
func (r *Stream) AddAudioTrack(codec string, at *AudioTrack) *AudioTrack { // Publish 发布者进行发布操作
at.Stream = r func (r *Stream) Publish() bool {
if actual, loaded := r.AudioTracks.LoadOrStore(codec, &TrackWaiter{at, sync.NewCond(new(sync.Mutex))}); loaded { Streams.Lock()
actual.(*TrackWaiter).Ok(at) defer Streams.Unlock()
if _, ok := Streams.m[r.StreamPath]; ok {
return false
} }
return at r.VideoTracks.Init()
r.AudioTracks.Init()
var cancel context.CancelFunc
customClose := r.Close
r.Context, cancel = context.WithCancel(context.Background())
var closeOnce sync.Once
r.Close = func() {
closeOnce.Do(func() {
r.timeout.Stop()
if customClose != nil {
customClose()
}
cancel()
r.VideoTracks.Dispose()
r.AudioTracks.Dispose()
utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath))
Streams.Delete(r.StreamPath)
TriggerHook(Hook{HOOK_STREAMCLOSE, r})
})
}
r.StartTime = time.Now()
Streams.m[r.StreamPath] = r
utils.Print(Green("Stream publish:"), BrightCyan(r.StreamPath))
r.timeout = time.AfterFunc(config.PublishTimeout, r.Close)
//触发钩子
TriggerHook(Hook{HOOK_PUBLISH, r})
return true
} }
func (r *Stream) Close() { func (r *Stream) WaitVideoTrack(codecs ...string) *VideoTrack {
r.cancel() if !config.EnableVideo {
// if r.OriginVideoTrack != nil { return nil
// r.OriginVideoTrack.Buffer.Current.Done() }
// } if track := r.VideoTracks.WaitTrack(codecs...); track != nil {
// if r.OriginAudioTrack != nil { return track.(*VideoTrack)
// r.OriginAudioTrack.Buffer.Current.Done() }
// } return nil
r.VideoTracks.Range(func(k, v interface{}) bool { }
v.(*TrackWaiter).Broadcast()
if v.(*TrackWaiter).Track != nil { // TODO: 触发转码逻辑
v.(*TrackWaiter).Track.Dispose() func (r *Stream) WaitAudioTrack(codecs ...string) *AudioTrack {
} if !config.EnableAudio {
return true return nil
}) }
r.AudioTracks.Range(func(k, v interface{}) bool { if track := r.AudioTracks.WaitTrack(codecs...); track != nil {
v.(*TrackWaiter).Broadcast() return track.(*AudioTrack)
if v.(*TrackWaiter).Track != nil { }
v.(*TrackWaiter).Track.Dispose() return nil
}
return true
})
utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath))
Streams.Delete(r.StreamPath)
TriggerHook(Hook{HOOK_STREAMCLOSE, r})
} }
//Subscribe 订阅流 //Subscribe 订阅流
@@ -129,6 +134,9 @@ func (r *Stream) Subscribe(s *Subscriber) {
utils.Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) utils.Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath)))
s.Context, s.cancel = context.WithCancel(r) s.Context, s.cancel = context.WithCancel(r)
r.subscribeMutex.Lock() r.subscribeMutex.Lock()
if s.ByteStreamFormat {
r.prePayload++
}
r.Subscribers = append(r.Subscribers, s) r.Subscribers = append(r.Subscribers, s)
r.subscribeMutex.Unlock() r.subscribeMutex.Unlock()
utils.Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) utils.Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers))))
@@ -141,12 +149,15 @@ func (r *Stream) UnSubscribe(s *Subscriber) {
if r.Err() == nil { if r.Err() == nil {
var deleted bool var deleted bool
r.subscribeMutex.Lock() r.subscribeMutex.Lock()
if s.ByteStreamFormat {
r.prePayload--
}
r.Subscribers, deleted = DeleteSliceItem_Subscriber(r.Subscribers, s) r.Subscribers, deleted = DeleteSliceItem_Subscriber(r.Subscribers, s)
r.subscribeMutex.Unlock() r.subscribeMutex.Unlock()
if deleted { if deleted {
utils.Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) utils.Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers))))
TriggerHook(Hook{HOOK_UNSUBSCRIBE, s}) TriggerHook(Hook{HOOK_UNSUBSCRIBE, s})
if len(r.Subscribers) == 0 && (r.Publisher == nil || r.Publisher.AutoUnPublish) { if len(r.Subscribers) == 0 && r.AutoUnPublish {
r.Close() r.Close()
} }
} }

View File

@@ -2,128 +2,54 @@ package engine
import ( import (
"context" "context"
"net/url"
"sync" "sync"
"time" "time"
"github.com/Monibuca/utils/v3/codec"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// Subscriber 订阅者实体定义 // Subscriber 订阅者实体定义
type Subscriber struct { type Subscriber struct {
context.Context `json:"-"` context.Context `json:"-"`
cancel context.CancelFunc cancel context.CancelFunc
Ctx2 context.Context `json:"-"` Ctx2 context.Context `json:"-"`
*Stream `json:"-"` *Stream `json:"-"`
ID string ID string
TotalDrop int //总丢帧 TotalDrop int //总丢帧
TotalPacket int TotalPacket int
Type string Type string
BufferLength int BufferLength int
Delay uint32 Delay uint32
SubscribeTime time.Time SubscribeTime time.Time
Sign string SubscribeArgs url.Values
OnAudio func(pack AudioPack) `json:"-"` OnAudio func(pack AudioPack) `json:"-"`
OnVideo func(pack VideoPack) `json:"-"` OnVideo func(pack VideoPack) `json:"-"`
ByteStreamFormat bool
closeOnce sync.Once
} }
// IsClosed 检查订阅者是否已经关闭 func (s *Subscriber) close() {
func (s *Subscriber) IsClosed() bool { s.UnSubscribe(s)
return s.Context != nil && s.Err() != nil s.cancel()
} }
// Close 关闭订阅者 // Close 关闭订阅者
func (s *Subscriber) Close() { func (s *Subscriber) Close() {
if s.cancel != nil { s.closeOnce.Do(s.close)
s.UnSubscribe(s)
s.cancel()
}
}
func (s *Subscriber) WaitVideoTrack(codec string) *VideoTrack {
if !config.EnableVideo {
return nil
}
waiter, ok := s.VideoTracks.LoadOrStore(codec, &TrackWaiter{nil, sync.NewCond(new(sync.Mutex))})
tw := waiter.(*TrackWaiter)
if !ok {
tw.L.Lock()
tw.Wait()
tw.L.Unlock()
}
if tw.Track == nil {
return nil
}
return tw.Track.(*VideoTrack)
}
func (s *Subscriber) WaitAudioTrack(codecs ...string) *AudioTrack {
if !config.EnableAudio {
return nil
}
for _, codec := range codecs {
if at, ok := s.AudioTracks.Load(codec); ok && at.(*TrackWaiter).Track != nil {
return at.(*TrackWaiter).Track.(*AudioTrack)
}
}
if HasTranscoder && s.OriginAudioTrack != nil {
at := s.AddAudioTrack(codecs[0], NewAudioTrack())
at.SoundFormat = codec.Codec2SoundFormat[codecs[0]]
TriggerHook(Hook{HOOK_REQUEST_TRANSAUDIO, &TransCodeReq{s, codecs[0]}})
return at
}
var once sync.Once
c := make(chan *TrackWaiter)
for _, codec := range codecs {
at, _ := s.AudioTracks.LoadOrStore(codec, &TrackWaiter{nil, sync.NewCond(new(sync.Mutex))})
go func(tw *TrackWaiter) {
tw.L.Lock()
tw.Wait()
tw.L.Unlock()
once.Do(func() {
c <- tw
})
}(at.(*TrackWaiter))
}
tw := <-c
if tw.Track == nil {
return nil
}
return tw.Track.(*AudioTrack)
}
func (s *Subscriber) GetVideoTrack(codec string) *VideoTrack {
if !config.EnableVideo {
return nil
}
if waiter, ok := s.VideoTracks.Load(codec); ok && waiter.(*TrackWaiter).Track != nil {
return waiter.(*TrackWaiter).Track.(*VideoTrack)
}
return nil
}
func (s *Subscriber) GetAudioTrack(codecs ...string) (at *AudioTrack) {
if !config.EnableAudio {
return nil
}
for _, codec := range codecs {
if at, ok := s.AudioTracks.Load(codec); ok && at.(*TrackWaiter).Track != nil {
return at.(*TrackWaiter).Track.(*AudioTrack)
}
}
if HasTranscoder && s.OriginAudioTrack != nil {
at = s.AddAudioTrack(codecs[0], NewAudioTrack())
at.SoundFormat = codec.Codec2SoundFormat[codecs[0]]
TriggerHook(Hook{HOOK_REQUEST_TRANSAUDIO, &TransCodeReq{s, codecs[0]}})
}
return
} }
//Subscribe 开始订阅 将Subscriber与Stream关联 //Subscribe 开始订阅 将Subscriber与Stream关联
func (s *Subscriber) Subscribe(streamPath string) error { func (s *Subscriber) Subscribe(streamPath string) error {
if FindStream(streamPath) == nil { u, _ := url.Parse(streamPath)
s.SubscribeArgs, _ = url.ParseQuery(u.RawQuery)
streamPath = u.Path
if stream := FindStream(streamPath); stream == nil {
return errors.Errorf("Stream not found:%s", streamPath) return errors.Errorf("Stream not found:%s", streamPath)
} } else {
GetStream(streamPath).Subscribe(s) if stream.Subscribe(s); s.Context == nil {
if s.Context == nil { return errors.Errorf("stream not exist:%s", streamPath)
return errors.Errorf("stream not exist:%s", streamPath) }
} }
return nil return nil
} }
@@ -153,7 +79,7 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) {
return return
} }
vr := vt.Buffer.SubRing(vt.IDRIndex) //从关键帧开始读取,首屏秒开 vr := vt.Buffer.SubRing(vt.IDRIndex) //从关键帧开始读取,首屏秒开
vr.Current.Wait() //等到RingBuffer可读 vr.Current.Wait() //等到RingBuffer可读
ar := at.Buffer.SubRing(at.Buffer.Index) ar := at.Buffer.SubRing(at.Buffer.Index)
ar.Current.Wait() ar.Current.Wait()
dropping := false //是否处于丢帧中 dropping := false //是否处于丢帧中
@@ -164,10 +90,12 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) {
if vt.Buffer.Index-vr.Index > 128 { if vt.Buffer.Index-vr.Index > 128 {
dropping = true dropping = true
} }
} else if vr.Current.NalType == codec.NALU_IDR_Picture { } else if vr.Current.IDR {
dropping = false dropping = false
} }
vr.NextR() if !vr.NextR() {
return
}
} else { } else {
if !dropping { if !dropping {
s.OnAudio(ar.Current.AudioPack) s.OnAudio(ar.Current.AudioPack)
@@ -175,7 +103,9 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) {
dropping = true dropping = true
} }
} }
ar.NextR() if !ar.NextR() {
return
}
} }
} }
} }
@@ -204,7 +134,8 @@ func (s *Subscriber) PlayAudio(at *AudioTrack) {
if ctx2 == nil { if ctx2 == nil {
ctx2 = context.TODO() ctx2 = context.TODO()
} }
for action = send; ctx2.Err() == nil && s.Context.Err() == nil; ring.NextR() { action = send
for running := true; ctx2.Err() == nil && s.Context.Err() == nil && running; running = ring.NextR() {
action() action()
} }
} }
@@ -226,7 +157,7 @@ func (s *Subscriber) PlayVideo(vt *VideoTrack) {
droped := 0 droped := 0
var action, send func() var action, send func()
drop := func() { drop := func() {
if ring.Current.NalType == codec.NALU_IDR_Picture { if ring.Current.IDR {
action = send action = send
} else { } else {
droped++ droped++
@@ -241,7 +172,8 @@ func (s *Subscriber) PlayVideo(vt *VideoTrack) {
action = drop action = drop
} }
} }
for action = send; ctx2.Err() == nil && s.Context.Err() == nil; ring.NextR() { action = send
for running := true; ctx2.Err() == nil && s.Context.Err() == nil && running; running = ring.NextR() {
action() action()
} }
} }

View File

@@ -1,13 +1,13 @@
package engine package engine
import ( import (
"bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"sort" "io"
"github.com/Monibuca/utils/v3" "github.com/Monibuca/utils/v3"
"github.com/Monibuca/utils/v3/codec" "github.com/Monibuca/utils/v3/codec"
"github.com/pion/rtp"
) )
const ( const (
@@ -15,10 +15,11 @@ const (
stapaHeaderSize = 1 stapaHeaderSize = 1
stapaNALULengthSize = 2 stapaNALULengthSize = 2
naluTypeBitmask = 0x1F naluTypeBitmask = 0x1F
naluRefIdcBitmask = 0x60 naluTypeBitmask_hevc = 0x7E
fuaStartBitmask = 0x80 //1000 0000 naluRefIdcBitmask = 0x60
fuaEndBitmask = 0x40 //0100 0000 fuaStartBitmask = 0x80 //1000 0000
fuaEndBitmask = 0x40 //0100 0000
) )
type TSSlice []uint32 type TSSlice []uint32
@@ -32,200 +33,288 @@ func (s TSSlice) Less(i, j int) bool { return s[i] < s[j] }
type VideoPack struct { type VideoPack struct {
Timestamp uint32 Timestamp uint32
CompositionTime uint32 CompositionTime uint32
Payload []byte //NALU Payload []byte
NalType byte NALUs [][]byte
IDR bool // 是否关键帧
Sequence int Sequence int
} }
func (vp *VideoPack) ToRTMPTag() []byte { func (vp VideoPack) Clone() VideoPack {
nalu := vp.Payload return vp
cts := vp.CompositionTime
payload := utils.GetSlice(9 + len(nalu))
if nalu[0]&31 == codec.NALU_IDR_Picture {
payload[0] = 0x17
} else {
payload[0] = 0x27
}
payload[1] = 0x01
utils.BigEndian.PutUint24(payload[2:], cts)
utils.BigEndian.PutUint32(payload[5:], uint32(len(nalu)))
copy(payload[9:], nalu)
return payload
} }
type VideoTrack struct { type VideoTrack struct {
IDRIndex byte //最近的关键帧位置,首屏渲染 IDRIndex byte //最近的关键帧位置,首屏渲染
Track_Video Track_Video
SPS []byte `json:"-"` SPSInfo codec.SPSInfo
PPS []byte `json:"-"` GOP byte //关键帧间隔
SPSInfo codec.SPSInfo ExtraData *VideoPack `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS)
GOP byte //关键帧间隔 WaitIDR context.Context `json:"-"`
RtmpTag []byte `json:"-"` //rtmp需要先发送一个序列帧包含SPS和PPS revIDR func()
WaitIDR context.Context `json:"-"` PushByteStream func(pack VideoPack)
revIDR func() PushNalu func(pack VideoPack)
pushRTP func(rtp.Packet) WriteByteStream func(writer io.Writer, pack VideoPack) //使用函数写入,避免申请内存
} }
func (vt *VideoTrack) PushRTP(pack rtp.Packet) { func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) {
vt.pushRTP(pack)
}
func NewVideoTrack() *VideoTrack {
var result VideoTrack
var cancel context.CancelFunc var cancel context.CancelFunc
result.Buffer = NewRing_Video() vt = &VideoTrack{
result.WaitIDR, cancel = context.WithCancel(context.Background()) revIDR: func() {
result.revIDR = func() { vt.IDRIndex = vt.Buffer.Index
if result.RtmpTag == nil { cancel()
return vt.revIDR = func() {
vt.GOP = vt.Buffer.Index - vt.IDRIndex
vt.IDRIndex = vt.Buffer.Index
}
},
}
vt.PushByteStream = vt.pushByteStream
vt.PushNalu = vt.pushNalu
vt.Stream = s
vt.CodecID = codec
vt.Buffer = NewRing_Video()
vt.WaitIDR, cancel = context.WithCancel(context.Background())
switch codec {
case 7:
s.VideoTracks.AddTrack("h264", vt)
case 12:
s.VideoTracks.AddTrack("h265", vt)
}
return
}
func (vt *VideoTrack) PushAnnexB(pack VideoPack) {
for _, payload := range codec.SplitH264(pack.Payload) {
pack.NALUs = append(pack.NALUs, payload)
}
vt.PushNalu(pack)
}
func (vt *VideoTrack) pushNalu(pack VideoPack) {
// 缓冲中只包含Nalu数据所以写入rtmp格式时需要按照ByteStream格式写入
vt.WriteByteStream = func(writer io.Writer, pack VideoPack) {
tmp := utils.GetSlice(4)
defer utils.RecycleSlice(tmp)
if pack.IDR {
tmp[0] = 0x10 | vt.CodecID
} else {
tmp[0] = 0x20 | vt.CodecID
} }
result.IDRIndex = result.Buffer.Index tmp[1] = 1
cancel() writer.Write(tmp[:2])
result.revIDR = func() { cts := pack.CompositionTime
result.GOP = result.Buffer.Index - result.IDRIndex utils.BigEndian.PutUint24(tmp, cts)
result.IDRIndex = result.Buffer.Index writer.Write(tmp[:3])
for _, nalu := range pack.NALUs {
utils.BigEndian.PutUint32(tmp, uint32(len(nalu)))
writer.Write(tmp)
writer.Write(nalu)
}
}
switch vt.CodecID {
case 7:
{
var info codec.AVCDecoderConfigurationRecord
vt.PushNalu = func(pack VideoPack) {
// 等待接收SPS和PPS数据
for _, nalu := range pack.NALUs {
switch nalu[0] & naluTypeBitmask {
case codec.NALU_SPS:
info.SequenceParameterSetNALUnit = nalu
info.SequenceParameterSetLength = uint16(len(nalu))
vt.SPSInfo, _ = codec.ParseSPS(nalu)
case codec.NALU_PPS:
info.PictureParameterSetNALUnit = nalu
info.PictureParameterSetLength = uint16(len(nalu))
}
}
if info.SequenceParameterSetNALUnit != nil && info.PictureParameterSetNALUnit != nil {
vt.ExtraData = &VideoPack{
Payload: codec.BuildH264SeqHeaderFromSpsPps(info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit),
NALUs: [][]byte{info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit},
}
fuaBuffer := bytes.NewBuffer([]byte{})
//已完成SPS和PPS 组装重置push函数接收视频数据
vt.PushNalu = func(pack VideoPack) {
var nonIDRs [][]byte
for _, nalu := range pack.NALUs {
naluType := nalu[0] & naluTypeBitmask
switch naluType {
case codec.NALU_STAPA:
var nalus [][]byte
for currOffset, naluSize := stapaHeaderSize, 0; currOffset < len(nalu); currOffset += naluSize {
naluSize = int(binary.BigEndian.Uint16(nalu[currOffset:]))
currOffset += stapaNALULengthSize
if currOffset+len(nalu) < currOffset+naluSize {
utils.Printf("STAP-A declared size(%d) is larger then buffer(%d)", naluSize, len(nalu)-currOffset)
return
}
nalus = append(nalus, nalu[currOffset:currOffset+naluSize])
}
p := pack.Clone()
p.NALUs = nalus
vt.PushNalu(p)
case codec.NALU_FUA:
if len(nalu) < fuaHeaderSize {
utils.Printf("Payload is not large enough to be FU-A")
return
}
if nalu[1]&fuaStartBitmask != 0 {
naluRefIdc := nalu[0] & naluRefIdcBitmask
fragmentedNaluType := nalu[1] & naluTypeBitmask
nalu[fuaHeaderSize-1] = naluRefIdc | fragmentedNaluType
fuaBuffer.Write(nalu)
} else if nalu[1]&fuaEndBitmask != 0 {
p := pack.Clone()
p.NALUs = [][]byte{fuaBuffer.Bytes()[fuaHeaderSize-1:]}
fuaBuffer = bytes.NewBuffer([]byte{})
vt.PushNalu(p)
} else {
fuaBuffer.Write(nalu[fuaHeaderSize:])
}
case codec.NALU_Access_Unit_Delimiter:
case codec.NALU_IDR_Picture:
p := pack.Clone()
p.IDR = true
p.NALUs = [][]byte{nalu}
vt.push(p)
case codec.NALU_Non_IDR_Picture:
nonIDRs = append(nonIDRs, nalu)
case codec.NALU_SEI:
case codec.NALU_Filler_Data:
default:
utils.Printf("nalType not support yet:%d", naluType)
}
if len(nonIDRs) > 0 {
pack.NALUs = nonIDRs
vt.push(pack)
}
}
}
}
}
}
case 12:
var vps, sps, pps []byte
vt.PushNalu = func(pack VideoPack) {
// 等待接收SPS和PPS数据
for _, nalu := range pack.NALUs {
switch nalu[0] & naluTypeBitmask_hevc >> 1 {
case codec.NAL_UNIT_VPS:
vps = nalu
case codec.NAL_UNIT_SPS:
sps = nalu
vt.SPSInfo, _ = codec.ParseSPS(nalu)
case codec.NAL_UNIT_PPS:
pps = nalu
}
}
if vps != nil && sps != nil && pps != nil {
extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vps, sps, pps)
if err != nil {
return
}
vt.ExtraData = &VideoPack{
Payload: extraData,
NALUs: [][]byte{vps, sps, pps},
}
vt.PushNalu = func(pack VideoPack) {
var nonIDRs [][]byte
for _, nalu := range pack.NALUs {
naluType := nalu[0] & naluTypeBitmask_hevc >> 1
switch naluType {
case codec.NAL_UNIT_CODED_SLICE_BLA,
codec.NAL_UNIT_CODED_SLICE_BLANT,
codec.NAL_UNIT_CODED_SLICE_BLA_N_LP,
codec.NAL_UNIT_CODED_SLICE_IDR,
codec.NAL_UNIT_CODED_SLICE_IDR_N_LP,
codec.NAL_UNIT_CODED_SLICE_CRA:
p := pack.Clone()
p.IDR = true
p.NALUs = [][]byte{nalu}
vt.push(p)
case 0, 1, 2, 3, 4, 5, 6, 7, 9:
nonIDRs = append(nonIDRs, nalu)
}
}
if len(nonIDRs) > 0 {
pack.NALUs = nonIDRs
vt.push(pack)
}
}
}
} }
} }
result.pushRTP = result.pushRTP0
return &result
} }
// Push 来自发布者推送的视频 func (vt *VideoTrack) pushByteStream(pack VideoPack) {
func (vt *VideoTrack) Push(pack VideoPack) { if pack.Payload[1] != 0 {
return
} else {
vt.CodecID = pack.Payload[0] & 0x0F
var nalulenSize int
switch vt.CodecID {
case 7:
var info codec.AVCDecoderConfigurationRecord
if _, err := info.Unmarshal(pack.Payload[5:]); err == nil {
vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit)
pack.NALUs = append(pack.NALUs, info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit)
nalulenSize = int(info.LengthSizeMinusOne&3 + 1)
vt.ExtraData = &pack
vt.Stream.VideoTracks.AddTrack("h264", vt)
}
case 12:
if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(pack.Payload); err == nil {
pack.NALUs = append(pack.NALUs, vps, sps, pps)
vt.SPSInfo, _ = codec.ParseSPS(sps)
nalulenSize = int(pack.Payload[26]) & 0x03
vt.ExtraData = &pack
vt.Stream.VideoTracks.AddTrack("h265", vt)
}
}
vt.WriteByteStream = func(writer io.Writer, pack VideoPack) {
writer.Write(pack.Payload)
}
// 已完成序列帧组装重置Push函数从Payload中提取Nalu供非bytestream格式使用
vt.PushByteStream = func(pack VideoPack) {
if len(pack.Payload) < 4 {
return
}
vt.GetBPS(len(pack.Payload))
pack.IDR = pack.Payload[0]>>4 == 1
pack.CompositionTime = utils.BigEndian.Uint24(pack.Payload[2:])
nalus := pack.Payload[5:]
for len(nalus) > nalulenSize {
nalulen := 0
for i := 0; i < nalulenSize; i++ {
nalulen += int(nalus[i]) << (8 * (nalulenSize - i - 1))
}
pack.NALUs = append(pack.NALUs, nalus[nalulenSize:nalulen+nalulenSize])
nalus = nalus[nalulen+nalulenSize:]
}
vt.push(pack)
}
}
}
func (vt *VideoTrack) push(pack VideoPack) {
if vt.Stream != nil { if vt.Stream != nil {
vt.Stream.Update() vt.Stream.Update()
} }
payload := pack.Payload
payloadLen := len(payload)
if payloadLen == 0 {
return
}
vbr := vt.Buffer vbr := vt.Buffer
video := vbr.Current video := vbr.Current
video.NalType = payload[0] & naluTypeBitmask if vt.Stream.prePayload > 0 && len(pack.Payload) == 0 {
video.Timestamp = pack.Timestamp buffer := vbr.GetBuffer()
video.CompositionTime = pack.CompositionTime vt.WriteByteStream(buffer, pack)
video.VideoPack = pack
video.VideoPack.Payload = buffer.Bytes()
} else {
video.VideoPack = pack
}
video.Sequence = vt.PacketCount video.Sequence = vt.PacketCount
switch video.NalType { if pack.IDR {
case codec.NALU_STAPA:
for currOffset, naluSize := stapaHeaderSize, 0; currOffset < len(payload); currOffset += naluSize {
naluSize = int(binary.BigEndian.Uint16(payload[currOffset:]))
currOffset += stapaNALULengthSize
if currOffset+len(payload) < currOffset+naluSize {
utils.Printf("STAP-A declared size(%d) is larger then buffer(%d)", naluSize, len(payload)-currOffset)
return
}
pack.Payload = payload[currOffset : currOffset+naluSize]
vt.Push(pack)
}
case codec.NALU_FUA:
if len(payload) < fuaHeaderSize {
utils.Printf("Payload is not large enough to be FU-A")
return
}
if payload[1]&fuaStartBitmask != 0 {
naluRefIdc := payload[0] & naluRefIdcBitmask
fragmentedNaluType := payload[1] & naluTypeBitmask
buffer := vbr.GetBuffer()
payload[fuaHeaderSize-1] = naluRefIdc | fragmentedNaluType
buffer.Write(payload)
} else if payload[1]&fuaEndBitmask != 0 {
pack.Payload = video.Bytes()[fuaHeaderSize-1:]
vt.Push(pack)
} else {
video.Write(payload[fuaHeaderSize:])
}
case codec.NALU_SPS:
vt.SPS = payload
vt.SPSInfo, _ = codec.ParseSPS(payload)
case codec.NALU_PPS:
vt.PPS = payload
if vt.RtmpTag == nil {
vt.SetRtmpTag()
}
case codec.NALU_Access_Unit_Delimiter:
case codec.NALU_IDR_Picture:
vt.revIDR() vt.revIDR()
fallthrough
case codec.NALU_Non_IDR_Picture:
video.Payload = payload
vt.Track_Video.GetBPS(payloadLen)
vbr.NextW()
case codec.NALU_SEI:
case codec.NALU_Filler_Data:
default:
utils.Printf("nalType not support yet:%d", video.NalType)
} }
} vbr.NextW()
func (vt *VideoTrack) SetRtmpTag() {
lenSPS, lenPPS := len(vt.SPS), len(vt.PPS)
vt.RtmpTag = append([]byte{}, codec.RTMP_AVC_HEAD...)
copy(vt.RtmpTag[6:], vt.SPS[1:4])
vt.RtmpTag = append(vt.RtmpTag, 0xE1, byte(lenSPS>>8), byte(lenSPS))
vt.RtmpTag = append(vt.RtmpTag, vt.SPS...)
vt.RtmpTag = append(append(vt.RtmpTag, 0x01, byte(lenPPS>>8), byte(lenPPS)), vt.PPS...)
}
func (vt *VideoTrack) pushRTP0(pack rtp.Packet) {
t := pack.Timestamp / 90
if t < vt.Buffer.GetLast().Timestamp {
if vt.WaitIDR.Err() == nil {
return
}
//有B帧
var tmpVT VideoTrack
tmpVT.Buffer = NewRing_Video()
tmpVT.revIDR = func() {
tmpVT.IDRIndex = tmpVT.Buffer.Index
}
// tmpVT.pushRTP = func(p rtp.Packet) {
// tmpVT.Push(VideoPack{Timestamp:p.Timestamp/90,Payload:p.Payload})
// }
gopBuffer := tmpVT.Buffer //缓存一个GOP用来计算dts
var gopFirst byte
var tsSlice TSSlice
for i := vt.IDRIndex; vt.Buffer.Index != i; i++ {
t := vt.Buffer.GetAt(i)
c := gopBuffer.Current
c.Payload = append(c.Payload, t.Payload...)
c.Timestamp = t.Timestamp
c.NalType = t.NalType
tsSlice = append(tsSlice, gopBuffer.Current.Timestamp)
gopBuffer.NextW()
}
vt.pushRTP = func(pack rtp.Packet) {
t := pack.Timestamp / 90
c := gopBuffer.Current
vp := VideoPack{Timestamp: t}
vp.Payload = append(vp.Payload, pack.Payload...)
tmpVT.Push(vp)
if c != gopBuffer.Current {
if c.NalType == codec.NALU_IDR_Picture {
sort.Sort(tsSlice) //排序后相当于DTS列表
var offset uint32
for i := 0; i < len(tsSlice); i++ {
j := gopFirst + byte(i)
f := gopBuffer.GetAt(j)
if f.Timestamp+offset < tsSlice[i] {
offset = tsSlice[i] - f.Timestamp
}
}
for i := 0; i < len(tsSlice); i++ {
f := gopBuffer.GetAt(gopFirst + byte(i))
f.CompositionTime = f.Timestamp + offset - tsSlice[i]
f.Timestamp = tsSlice[i]
vt.Push(f.VideoPack)
}
gopFirst = gopBuffer.Index - 1
tsSlice = nil
}
tsSlice = append(tsSlice, t)
}
}
vt.pushRTP(pack)
return
}
vt.Push(VideoPack{Timestamp: t, Payload: pack.Payload})
} }