开始升级

This commit is contained in:
李宇翔
2020-04-21 01:38:49 +08:00
parent 556d69efa2
commit 6dfe62fd4c
3 changed files with 93 additions and 132 deletions

29
circle.go Normal file
View File

@@ -0,0 +1,29 @@
package engine
import (
"sync"
"github.com/Monibuca/engine/avformat"
)
const CIRCLE_SIZE = 512
type CircleItem struct {
*avformat.AVPacket
next *CircleItem
pre *CircleItem
index int
*sync.RWMutex
}
func CreateCircle() (p *CircleItem) {
p = &CircleItem{AVPacket: new(avformat.AVPacket), RWMutex: new(sync.RWMutex)}
first := p
for i := 0; i < CIRCLE_SIZE; i++ {
p.next = &CircleItem{pre: p, index: i, AVPacket: new(avformat.AVPacket), RWMutex: new(sync.RWMutex)}
p = p.next
}
first.pre = p
p.next = first
return
}

77
room.go
View File

@@ -25,8 +25,7 @@ func (c *Collection) Get(name string) (result *Room) {
item, loaded := AllRoom.LoadOrStore(name, &Room{ item, loaded := AllRoom.LoadOrStore(name, &Room{
Subscribers: make(map[string]*OutputStream), Subscribers: make(map[string]*OutputStream),
Control: make(chan interface{}), Control: make(chan interface{}),
VideoChan: make(chan *avformat.AVPacket, 1), AVCircle: CreateCircle(),
AudioChan: make(chan *avformat.AVPacket, 1),
}) })
result = item.(*Room) result = item.(*Room)
if !loaded { if !loaded {
@@ -47,9 +46,8 @@ type Room struct {
Subscribers map[string]*OutputStream // 订阅者 Subscribers map[string]*OutputStream // 订阅者
VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据 VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据
AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据 AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据
FirstScreen []*avformat.AVPacket FirstScreen *CircleItem
AudioChan chan *avformat.AVPacket AVCircle *CircleItem
VideoChan chan *avformat.AVPacket
UseTimestamp bool //是否采用数据包中的时间戳 UseTimestamp bool //是否采用数据包中的时间戳
} }
@@ -90,7 +88,7 @@ type ChangeRoomCmd struct {
} }
func (r *Room) onClosed() { func (r *Room) onClosed() {
Print(Yellow("room destoryed :"), BgBrightCyan(r.StreamPath)) Print(Yellow("room destoryed :"), BrightCyan(r.StreamPath))
AllRoom.Delete(r.StreamPath) AllRoom.Delete(r.StreamPath)
OnRoomClosedHooks.Trigger(r) OnRoomClosedHooks.Trigger(r)
if r.Publisher != nil { if r.Publisher != nil {
@@ -103,8 +101,8 @@ func (r *Room) Subscribe(s *OutputStream) {
s.Room = r s.Room = r
if r.Err() == nil { if r.Err() == nil {
s.SubscribeTime = time.Now() s.SubscribeTime = time.Now()
log.Printf("subscribe :%s %s,to room %s", s.Type, s.ID, r.StreamPath) Print(Sprintf(Yellow("subscribe :%s %s,to room %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath)))
s.packetQueue = make(chan *avformat.SendPacket, 1024) //s.packetQueue = make(chan *avformat.SendPacket, 1024)
s.Context, s.Cancel = context.WithCancel(r) s.Context, s.Cancel = context.WithCancel(r)
s.Control <- &SubscribeCmd{s} s.Control <- &SubscribeCmd{s}
} }
@@ -119,7 +117,7 @@ func (r *Room) UnSubscribe(s *OutputStream) {
// Run 房间运行,转发逻辑 // Run 房间运行,转发逻辑
func (r *Room) Run() { func (r *Room) Run() {
Print(Green("room create:"), BgBrightCyan(r.StreamPath)) Print(Green("room create:"), BrightCyan(r.StreamPath))
defer r.onClosed() defer r.onClosed()
update := time.NewTicker(time.Second) update := time.NewTicker(time.Second)
defer update.Stop() defer update.Stop()
@@ -141,14 +139,14 @@ func (r *Room) Run() {
case *UnSubscribeCmd: case *UnSubscribeCmd:
delete(r.Subscribers, v.ID) delete(r.Subscribers, v.ID)
OnUnSubscribeHooks.Trigger(v.OutputStream) OnUnSubscribeHooks.Trigger(v.OutputStream)
log.Printf("%s subscriber %s removed remains:%d", r.StreamPath, v.ID, len(r.Subscribers)) Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.Subscribers))))
if len(r.Subscribers) == 0 && r.Publisher == nil { if len(r.Subscribers) == 0 && r.Publisher == nil {
r.Cancel() r.Cancel()
} }
case *SubscribeCmd: case *SubscribeCmd:
if _, ok := r.Subscribers[v.ID]; !ok { if _, ok := r.Subscribers[v.ID]; !ok {
r.Subscribers[v.ID] = v.OutputStream r.Subscribers[v.ID] = v.OutputStream
log.Printf("%s subscriber %s added remains:%d", r.StreamPath, v.ID, len(r.Subscribers)) Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.Subscribers))))
OnSubscribeHooks.Trigger(v.OutputStream) OnSubscribeHooks.Trigger(v.OutputStream)
} }
case *ChangeRoomCmd: case *ChangeRoomCmd:
@@ -160,20 +158,24 @@ func (r *Room) Run() {
} }
} }
} }
case audio := <-r.AudioChan: // case audio := <-r.AudioChan:
for _, v := range r.Subscribers { // for _, v := range r.Subscribers {
v.sendAudio(audio) // v.sendAudio(audio)
} // }
case video := <-r.VideoChan: // case video := <-r.VideoChan:
for _, v := range r.Subscribers { // for _, v := range r.Subscribers {
v.sendVideo(video) // v.sendVideo(video)
} // }
} }
} }
} }
// PushAudio 来自发布者推送的音频 // PushAudio 来自发布者推送的音频
func (r *Room) PushAudio(audio *avformat.AVPacket) { func (r *Room) PushAudio(timestamp uint32, payload []byte) {
audio := r.AVCircle
audio.Timestamp = timestamp
audio.Payload = payload
audio.IsAACSequence = false
if len(audio.Payload) < 4 { if len(audio.Payload) < 4 {
return return
} }
@@ -188,7 +190,7 @@ func (r *Room) PushAudio(audio *avformat.AVPacket) {
if len(audio.Payload) < 5 { if len(audio.Payload) < 5 {
return return
} }
r.AudioTag = audio r.AudioTag = audio.AVPacket
tmp := audio.Payload[0] // 第一个字节保存着音频的相关信息 tmp := audio.Payload[0] // 第一个字节保存着音频的相关信息
if r.AudioInfo.SoundFormat = tmp >> 4; r.AudioInfo.SoundFormat == 10 { //真的是AAC的话后面有一个字节的详细信息 if r.AudioInfo.SoundFormat = tmp >> 4; r.AudioInfo.SoundFormat == 10 { //真的是AAC的话后面有一个字节的详细信息
//0 = AAC sequence header1 = AAC raw。 //0 = AAC sequence header1 = AAC raw。
@@ -211,17 +213,21 @@ func (r *Room) PushAudio(audio *avformat.AVPacket) {
r.AudioInfo.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples r.AudioInfo.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples
r.AudioInfo.SoundType = tmp & 0x01 // 0 单声道1立体声 r.AudioInfo.SoundType = tmp & 0x01 // 0 单声道1立体声
} }
audio.AVPacket = avformat.NewAVPacket(audio.Type)
return return
} }
audio.RefCount = len(r.Subscribers) //audio.RefCount = len(r.Subscribers)
if !r.UseTimestamp { if !r.UseTimestamp {
audio.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond) audio.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond)
} }
r.AudioInfo.PacketCount++ r.AudioInfo.PacketCount++
r.AudioChan <- audio r.AVCircle = r.AVCircle.next
r.AVCircle.Lock()
audio.Unlock()
//r.AudioChan <- audio
} }
func (r *Room) setH264Info(video *avformat.AVPacket) { func (r *Room) setH264Info(video *CircleItem) {
r.VideoTag = video r.VideoTag = video.AVPacket
if r.VideoInfo.CodecID != 7 { if r.VideoInfo.CodecID != 7 {
return return
} }
@@ -230,10 +236,14 @@ func (r *Room) setH264Info(video *avformat.AVPacket) {
if _, err := info.Unmarshal(video.Payload[5:]); err == nil { if _, err := info.Unmarshal(video.Payload[5:]); err == nil {
r.VideoInfo.SPSInfo, err = avformat.ParseSPS(info.SequenceParameterSetNALUnit) r.VideoInfo.SPSInfo, err = avformat.ParseSPS(info.SequenceParameterSetNALUnit)
} }
video.AVPacket = avformat.NewAVPacket(video.Type)
} }
// PushVideo 来自发布者推送的视频 // PushVideo 来自发布者推送的视频
func (r *Room) PushVideo(video *avformat.AVPacket) { func (r *Room) PushVideo(timestamp uint32, payload []byte) {
video := r.AVCircle
video.Timestamp = timestamp
video.Payload = payload
if len(video.Payload) < 3 { if len(video.Payload) < 3 {
return return
} }
@@ -251,22 +261,15 @@ func (r *Room) PushVideo(video *avformat.AVPacket) {
if video.IsAVCSequence { if video.IsAVCSequence {
r.setH264Info(video) r.setH264Info(video)
} }
if r.FirstScreen != nil {
if video.IsKeyFrame() { if video.IsKeyFrame() {
for _, cache := range r.FirstScreen { //清空队列 r.FirstScreen = video
cache.Recycle()
}
r.FirstScreen = r.FirstScreen[:0]
}
r.FirstScreen = append(r.FirstScreen, video)
video.RefCount = len(r.Subscribers) + 1
} else {
video.RefCount = len(r.Subscribers)
} }
if !r.UseTimestamp { if !r.UseTimestamp {
video.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond) video.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond)
} }
r.VideoInfo.PacketCount++ r.VideoInfo.PacketCount++
r.VideoChan <- video r.AVCircle = r.AVCircle.next
r.AVCircle.Lock()
video.Unlock()
} }
} }

View File

@@ -36,10 +36,9 @@ type OutputStream struct {
ATSent bool ATSent bool
VSentTime uint32 VSentTime uint32
ASentTime uint32 ASentTime uint32
packetQueue chan *avformat.SendPacket //packetQueue chan *avformat.SendPacket
dropCount int dropCount int
OffsetTime uint32 OffsetTime uint32
firstScreenIndex int
} }
// IsClosed 检查订阅者是否已经关闭 // IsClosed 检查订阅者是否已经关闭
@@ -63,93 +62,23 @@ func (s *OutputStream) Play(streamPath string) (err error) {
} }
AllRoom.Get(streamPath).Subscribe(s) AllRoom.Get(streamPath).Subscribe(s)
defer s.UnSubscribe(s) defer s.UnSubscribe(s)
p := avformat.NewSendPacket(s.VideoTag, 0)
s.SendHandler(p)
p = avformat.NewSendPacket(s.AudioTag, 0)
s.SendHandler(p)
packet := s.FirstScreen
s.VSentTime = packet.Timestamp
s.ASentTime = packet.Timestamp
for { for {
select { select {
case <-s.Done(): case <-s.Done():
return s.Err() return s.Err()
case p := <-s.packetQueue: default:
if err = s.SendHandler(p); err != nil { packet.RLock()
s.Cancel() //此处为了使得IsClosed 返回true p = avformat.NewSendPacket(packet.AVPacket, packet.Timestamp-s.VSentTime)
return s.SendHandler(p)
} packet.RUnlock()
p.Recycle() packet = packet.next
} }
} }
} }
func (s *OutputStream) sendPacket(packet *avformat.AVPacket, timestamp uint32) {
if !packet.IsAVCSequence && timestamp == 0 {
timestamp = 1 //防止为0
}
s.TotalPacket++
s.BufferLength = len(s.packetQueue)
if s.dropCount > 0 {
if packet.IsKeyFrame() {
fmt.Printf("%s drop packet:%d\n", s.ID, s.dropCount)
s.dropCount = 0 //退出丢包
} else {
s.dropCount++
s.TotalDrop++
return
}
}
if s.BufferLength == cap(s.packetQueue) {
s.dropCount++
s.TotalDrop++
packet.Recycle()
} else if !s.IsClosed() {
s.packetQueue <- avformat.NewSendPacket(packet, timestamp)
}
}
func (s *OutputStream) sendVideo(video *avformat.AVPacket) error {
isKF := video.IsKeyFrame()
if s.VTSent {
if s.FirstScreen == nil || s.firstScreenIndex == -1 {
s.sendPacket(video, video.Timestamp-s.VSentTime+s.OffsetTime)
} else if !isKF && s.firstScreenIndex < len(s.FirstScreen) {
firstScreen := s.FirstScreen[s.firstScreenIndex]
firstScreen.RefCount++
s.VSentTime = firstScreen.Timestamp - s.FirstScreen[0].Timestamp
s.sendPacket(firstScreen, s.VSentTime)
video.Recycle() //回收当前数据
s.firstScreenIndex++
} else {
s.firstScreenIndex = -1 //收到关键帧或者首屏缓冲已播完后退出首屏渲染模式
s.OffsetTime += s.VSentTime
s.VSentTime = video.Timestamp
s.sendPacket(video, s.OffsetTime)
}
return nil
}
//非首屏渲染模式跳过开头的非关键帧
if !isKF {
if s.FirstScreen == nil {
return nil
}
} else if s.FirstScreen != nil {
s.firstScreenIndex = -1 //跳过首屏渲染
}
s.VTSent = true
s.sendPacket(s.VideoTag, 0)
s.VSentTime = video.Timestamp
return s.sendVideo(video)
}
func (s *OutputStream) sendAudio(audio *avformat.AVPacket) error {
if !s.VTSent {
audio.Recycle()
return nil
}
if s.ATSent {
if s.FirstScreen != nil && s.firstScreenIndex == -1 {
audio.Recycle()
return nil
}
s.sendPacket(audio, audio.Timestamp-s.ASentTime)
return nil
}
s.ATSent = true
s.sendPacket(s.AudioTag, 0)
s.ASentTime = audio.Timestamp
return s.sendAudio(audio)
}