diff --git a/circle.go b/circle.go new file mode 100644 index 0000000..ec41f00 --- /dev/null +++ b/circle.go @@ -0,0 +1,29 @@ +package engine + +import ( + "sync" + + "github.com/Monibuca/engine/avformat" +) + +const CIRCLE_SIZE = 512 + +type CircleItem struct { + *avformat.AVPacket + next *CircleItem + pre *CircleItem + index int + *sync.RWMutex +} + +func CreateCircle() (p *CircleItem) { + p = &CircleItem{AVPacket: new(avformat.AVPacket), RWMutex: new(sync.RWMutex)} + first := p + for i := 0; i < CIRCLE_SIZE; i++ { + p.next = &CircleItem{pre: p, index: i, AVPacket: new(avformat.AVPacket), RWMutex: new(sync.RWMutex)} + p = p.next + } + first.pre = p + p.next = first + return +} diff --git a/room.go b/room.go index 4746077..a529e40 100644 --- a/room.go +++ b/room.go @@ -25,8 +25,7 @@ func (c *Collection) Get(name string) (result *Room) { item, loaded := AllRoom.LoadOrStore(name, &Room{ Subscribers: make(map[string]*OutputStream), Control: make(chan interface{}), - VideoChan: make(chan *avformat.AVPacket, 1), - AudioChan: make(chan *avformat.AVPacket, 1), + AVCircle: CreateCircle(), }) result = item.(*Room) if !loaded { @@ -47,9 +46,8 @@ type Room struct { Subscribers map[string]*OutputStream // 订阅者 VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据 AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据 - FirstScreen []*avformat.AVPacket - AudioChan chan *avformat.AVPacket - VideoChan chan *avformat.AVPacket + FirstScreen *CircleItem + AVCircle *CircleItem UseTimestamp bool //是否采用数据包中的时间戳 } @@ -90,7 +88,7 @@ type ChangeRoomCmd struct { } func (r *Room) onClosed() { - Print(Yellow("room destoryed :"), BgBrightCyan(r.StreamPath)) + Print(Yellow("room destoryed :"), BrightCyan(r.StreamPath)) AllRoom.Delete(r.StreamPath) OnRoomClosedHooks.Trigger(r) if r.Publisher != nil { @@ -103,8 +101,8 @@ func (r *Room) Subscribe(s *OutputStream) { s.Room = r if r.Err() == nil { s.SubscribeTime = time.Now() - log.Printf("subscribe :%s %s,to room %s", s.Type, s.ID, r.StreamPath) - s.packetQueue = make(chan *avformat.SendPacket, 1024) + Print(Sprintf(Yellow("subscribe :%s %s,to room %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) + //s.packetQueue = make(chan *avformat.SendPacket, 1024) s.Context, s.Cancel = context.WithCancel(r) s.Control <- &SubscribeCmd{s} } @@ -119,7 +117,7 @@ func (r *Room) UnSubscribe(s *OutputStream) { // Run 房间运行,转发逻辑 func (r *Room) Run() { - Print(Green("room create:"), BgBrightCyan(r.StreamPath)) + Print(Green("room create:"), BrightCyan(r.StreamPath)) defer r.onClosed() update := time.NewTicker(time.Second) defer update.Stop() @@ -141,14 +139,14 @@ func (r *Room) Run() { case *UnSubscribeCmd: delete(r.Subscribers, v.ID) OnUnSubscribeHooks.Trigger(v.OutputStream) - log.Printf("%s subscriber %s removed remains:%d", r.StreamPath, v.ID, len(r.Subscribers)) + Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.Subscribers)))) if len(r.Subscribers) == 0 && r.Publisher == nil { r.Cancel() } case *SubscribeCmd: if _, ok := r.Subscribers[v.ID]; !ok { r.Subscribers[v.ID] = v.OutputStream - log.Printf("%s subscriber %s added remains:%d", r.StreamPath, v.ID, len(r.Subscribers)) + Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.Subscribers)))) OnSubscribeHooks.Trigger(v.OutputStream) } case *ChangeRoomCmd: @@ -160,20 +158,24 @@ func (r *Room) Run() { } } } - case audio := <-r.AudioChan: - for _, v := range r.Subscribers { - v.sendAudio(audio) - } - case video := <-r.VideoChan: - for _, v := range r.Subscribers { - v.sendVideo(video) - } + // case audio := <-r.AudioChan: + // for _, v := range r.Subscribers { + // v.sendAudio(audio) + // } + // case video := <-r.VideoChan: + // for _, v := range r.Subscribers { + // v.sendVideo(video) + // } } } } // PushAudio 来自发布者推送的音频 -func (r *Room) PushAudio(audio *avformat.AVPacket) { +func (r *Room) PushAudio(timestamp uint32, payload []byte) { + audio := r.AVCircle + audio.Timestamp = timestamp + audio.Payload = payload + audio.IsAACSequence = false if len(audio.Payload) < 4 { return } @@ -188,7 +190,7 @@ func (r *Room) PushAudio(audio *avformat.AVPacket) { if len(audio.Payload) < 5 { return } - r.AudioTag = audio + r.AudioTag = audio.AVPacket tmp := audio.Payload[0] // 第一个字节保存着音频的相关信息 if r.AudioInfo.SoundFormat = tmp >> 4; r.AudioInfo.SoundFormat == 10 { //真的是AAC的话,后面有一个字节的详细信息 //0 = AAC sequence header,1 = AAC raw。 @@ -211,17 +213,21 @@ func (r *Room) PushAudio(audio *avformat.AVPacket) { r.AudioInfo.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples r.AudioInfo.SoundType = tmp & 0x01 // 0 单声道,1立体声 } + audio.AVPacket = avformat.NewAVPacket(audio.Type) return } - audio.RefCount = len(r.Subscribers) + //audio.RefCount = len(r.Subscribers) if !r.UseTimestamp { audio.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond) } r.AudioInfo.PacketCount++ - r.AudioChan <- audio + r.AVCircle = r.AVCircle.next + r.AVCircle.Lock() + audio.Unlock() + //r.AudioChan <- audio } -func (r *Room) setH264Info(video *avformat.AVPacket) { - r.VideoTag = video +func (r *Room) setH264Info(video *CircleItem) { + r.VideoTag = video.AVPacket if r.VideoInfo.CodecID != 7 { return } @@ -230,10 +236,14 @@ func (r *Room) setH264Info(video *avformat.AVPacket) { if _, err := info.Unmarshal(video.Payload[5:]); err == nil { r.VideoInfo.SPSInfo, err = avformat.ParseSPS(info.SequenceParameterSetNALUnit) } + video.AVPacket = avformat.NewAVPacket(video.Type) } // PushVideo 来自发布者推送的视频 -func (r *Room) PushVideo(video *avformat.AVPacket) { +func (r *Room) PushVideo(timestamp uint32, payload []byte) { + video := r.AVCircle + video.Timestamp = timestamp + video.Payload = payload if len(video.Payload) < 3 { return } @@ -251,22 +261,15 @@ func (r *Room) PushVideo(video *avformat.AVPacket) { if video.IsAVCSequence { r.setH264Info(video) } - if r.FirstScreen != nil { - if video.IsKeyFrame() { - for _, cache := range r.FirstScreen { //清空队列 - cache.Recycle() - } - r.FirstScreen = r.FirstScreen[:0] - } - r.FirstScreen = append(r.FirstScreen, video) - video.RefCount = len(r.Subscribers) + 1 - } else { - video.RefCount = len(r.Subscribers) + if video.IsKeyFrame() { + r.FirstScreen = video } if !r.UseTimestamp { video.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond) } r.VideoInfo.PacketCount++ - r.VideoChan <- video + r.AVCircle = r.AVCircle.next + r.AVCircle.Lock() + video.Unlock() } } diff --git a/subscriber.go b/subscriber.go index 081b996..471b5b3 100644 --- a/subscriber.go +++ b/subscriber.go @@ -29,17 +29,16 @@ type OutputStream struct { context.Context *Room SubscriberInfo - SendHandler func(*avformat.SendPacket) error - Cancel context.CancelFunc - Sign string - VTSent bool - ATSent bool - VSentTime uint32 - ASentTime uint32 - packetQueue chan *avformat.SendPacket - dropCount int - OffsetTime uint32 - firstScreenIndex int + SendHandler func(*avformat.SendPacket) error + Cancel context.CancelFunc + Sign string + VTSent bool + ATSent bool + VSentTime uint32 + ASentTime uint32 + //packetQueue chan *avformat.SendPacket + dropCount int + OffsetTime uint32 } // IsClosed 检查订阅者是否已经关闭 @@ -63,93 +62,23 @@ func (s *OutputStream) Play(streamPath string) (err error) { } AllRoom.Get(streamPath).Subscribe(s) defer s.UnSubscribe(s) + p := avformat.NewSendPacket(s.VideoTag, 0) + s.SendHandler(p) + p = avformat.NewSendPacket(s.AudioTag, 0) + s.SendHandler(p) + packet := s.FirstScreen + s.VSentTime = packet.Timestamp + s.ASentTime = packet.Timestamp for { select { case <-s.Done(): return s.Err() - case p := <-s.packetQueue: - if err = s.SendHandler(p); err != nil { - s.Cancel() //此处为了使得IsClosed 返回true - return - } - p.Recycle() + default: + packet.RLock() + p = avformat.NewSendPacket(packet.AVPacket, packet.Timestamp-s.VSentTime) + s.SendHandler(p) + packet.RUnlock() + packet = packet.next } } } - -func (s *OutputStream) sendPacket(packet *avformat.AVPacket, timestamp uint32) { - if !packet.IsAVCSequence && timestamp == 0 { - timestamp = 1 //防止为0 - } - s.TotalPacket++ - s.BufferLength = len(s.packetQueue) - if s.dropCount > 0 { - if packet.IsKeyFrame() { - fmt.Printf("%s drop packet:%d\n", s.ID, s.dropCount) - s.dropCount = 0 //退出丢包 - } else { - s.dropCount++ - s.TotalDrop++ - return - } - } - if s.BufferLength == cap(s.packetQueue) { - s.dropCount++ - s.TotalDrop++ - packet.Recycle() - } else if !s.IsClosed() { - s.packetQueue <- avformat.NewSendPacket(packet, timestamp) - } -} - -func (s *OutputStream) sendVideo(video *avformat.AVPacket) error { - isKF := video.IsKeyFrame() - if s.VTSent { - if s.FirstScreen == nil || s.firstScreenIndex == -1 { - s.sendPacket(video, video.Timestamp-s.VSentTime+s.OffsetTime) - } else if !isKF && s.firstScreenIndex < len(s.FirstScreen) { - firstScreen := s.FirstScreen[s.firstScreenIndex] - firstScreen.RefCount++ - s.VSentTime = firstScreen.Timestamp - s.FirstScreen[0].Timestamp - s.sendPacket(firstScreen, s.VSentTime) - video.Recycle() //回收当前数据 - s.firstScreenIndex++ - } else { - s.firstScreenIndex = -1 //收到关键帧或者首屏缓冲已播完后退出首屏渲染模式 - s.OffsetTime += s.VSentTime - s.VSentTime = video.Timestamp - s.sendPacket(video, s.OffsetTime) - } - return nil - } - //非首屏渲染模式跳过开头的非关键帧 - if !isKF { - if s.FirstScreen == nil { - return nil - } - } else if s.FirstScreen != nil { - s.firstScreenIndex = -1 //跳过首屏渲染 - } - s.VTSent = true - s.sendPacket(s.VideoTag, 0) - s.VSentTime = video.Timestamp - return s.sendVideo(video) -} -func (s *OutputStream) sendAudio(audio *avformat.AVPacket) error { - if !s.VTSent { - audio.Recycle() - return nil - } - if s.ATSent { - if s.FirstScreen != nil && s.firstScreenIndex == -1 { - audio.Recycle() - return nil - } - s.sendPacket(audio, audio.Timestamp-s.ASentTime) - return nil - } - s.ATSent = true - s.sendPacket(s.AudioTag, 0) - s.ASentTime = audio.Timestamp - return s.sendAudio(audio) -}