diff --git a/README.md b/README.md index 0809340..3461f2e 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ rec_video = func(msg *Chunk) { nalus = nalus[nalulen+nalulenSize:] } } - close(stream.WaitPub) + close(vt.WaitFirst) } ``` 在填充数据之前,需要获取到SPS和PPS,然后设置好,因为订阅者需要先发送这个数据 diff --git a/audio_track.go b/audio_track.go index 5fad539..8ecf01b 100644 --- a/audio_track.go +++ b/audio_track.go @@ -16,7 +16,6 @@ type AudioTrack struct { SoundSize byte //1bit SoundType byte //1bit RtmpTag []byte //rtmp协议需要先发这个帧 - ASC []byte //audio special configure } // Push 来自发布者推送的音频 diff --git a/base_track.go b/base_track.go index 37088af..cf89dc2 100644 --- a/base_track.go +++ b/base_track.go @@ -47,6 +47,11 @@ type TrackCP struct { } func (tcp *TrackCP) Play(ctx context.Context, cba func(AudioPack), cbv func(VideoPack)) { + select { + case <-tcp.Video.WaitFirst: + case <-ctx.Done(): + return + } vr := tcp.Video.Buffer.SubRing(tcp.Video.FirstScreen) ar := tcp.Audio.Buffer.SubRing(tcp.Audio.Buffer.Index) vr.Current.Wait() diff --git a/hook.go b/hook.go index 83e0649..1ed1cd0 100644 --- a/hook.go +++ b/hook.go @@ -6,26 +6,35 @@ import ( "sync" "time" ) + type Hook struct { - Name string + Name string Payload interface{} } + +const ( + HOOK_SUBSCRIBE = "Subscribe" + HOOK_UNSUBSCRIBE = "UnSubscibe" + HOOK_STREAMCLOSE = "StreamClose" + HOOK_PUBLISH = "Publish" +) + var Hooks = NewRing_Hook() -func AddHook(name string,channel chan interface{}) { - for hooks:= Hooks.SubRing(Hooks.Index);;hooks.GoNext(){ +func AddHook(name string, channel chan interface{}) { + for hooks := Hooks.SubRing(Hooks.Index); ; hooks.GoNext() { hooks.Current.Wait() if name == hooks.Current.Name { - channel<-hooks.Current.Payload + channel <- hooks.Current.Payload } } } -func AddHookWithContext(name string,channel chan interface{},ctx context.Context) { - for hooks:= Hooks.SubRing(Hooks.Index);ctx.Err()==nil;hooks.GoNext(){ +func AddHookWithContext(name string, channel chan interface{}, ctx context.Context) { + for hooks := Hooks.SubRing(Hooks.Index); ctx.Err() == nil; hooks.GoNext() { hooks.Current.Wait() - if name == hooks.Current.Name && ctx.Err()==nil{ - channel<-hooks.Current.Payload + if name == hooks.Current.Name && ctx.Err() == nil { + channel <- hooks.Current.Payload } } } @@ -35,7 +44,6 @@ func TriggerHook(hook Hook) { Hooks.NextW() } - type RingItem_Hook struct { Hook sync.WaitGroup @@ -46,20 +54,22 @@ type RingItem_Hook struct { // Ring 环形缓冲,使用数组实现 type Ring_Hook struct { Current *RingItem_Hook - buffer []RingItem_Hook - Index byte + buffer []RingItem_Hook + Index byte } -func (r *Ring_Hook) SubRing(index byte) *Ring_Hook{ - result:= &Ring_Hook{ - buffer:r.buffer, + +func (r *Ring_Hook) SubRing(index byte) *Ring_Hook { + result := &Ring_Hook{ + buffer: r.buffer, } result.GoTo(index) return result } + // NewRing 创建Ring func NewRing_Hook() (r *Ring_Hook) { r = &Ring_Hook{ - buffer : make([]RingItem_Hook, 256), + buffer: make([]RingItem_Hook, 256), } r.GoTo(0) r.Current.Add(1) @@ -92,13 +102,13 @@ func (r *Ring_Hook) GetLast() *RingItem_Hook { // GoNext 移动到下一个位置 func (r *Ring_Hook) GoNext() { - r.Index = r.Index+1 + r.Index = r.Index + 1 r.Current = &r.buffer[r.Index] } // GoBack 移动到上一个位置 func (r *Ring_Hook) GoBack() { - r.Index = r.Index-1 + r.Index = r.Index - 1 r.Current = &r.buffer[r.Index] } @@ -112,7 +122,7 @@ func (r *Ring_Hook) NextW() { } // NextR 读下一个 -func (r *Ring_Hook) NextR(){ +func (r *Ring_Hook) NextR() { r.Current.Wait() r.GoNext() } @@ -129,8 +139,8 @@ func (r *Ring_Hook) GetBuffer() *bytes.Buffer { // Timeout 发布者是否超时了 func (r *Ring_Hook) Timeout(t time.Duration) bool { // 如果设置为0则表示永不超时 - if t==0 { + if t == 0 { return false } - return time.Since(r.Current.UpdateTime) >t + return time.Since(r.Current.UpdateTime) > t } diff --git a/publisher.go b/publisher.go index dd916bf..40559f4 100644 --- a/publisher.go +++ b/publisher.go @@ -47,6 +47,6 @@ func (p *Publisher) Publish(streamPath string) bool { p.Publisher = p p.StartTime = time.Now() //触发钩子 - TriggerHook(Hook{"Publish", p.Stream}) + TriggerHook(Hook{HOOK_PUBLISH, p.Stream}) return true } diff --git a/stream.go b/stream.go index 6c3b8bd..7282e5d 100644 --- a/stream.go +++ b/stream.go @@ -28,7 +28,6 @@ func GetStream(streamPath string) (result *Stream) { HasAudio: true, EnableAudio: &config.EnableAudio, EnableVideo: &config.EnableVideo, - WaitPub: make(chan struct{}), }) result = item.(*Stream) if !loaded { @@ -56,7 +55,6 @@ type Stream struct { Subscribers []*Subscriber // 订阅者 VideoTracks []*VideoTrack AudioTracks []*AudioTrack - WaitPub chan struct{} `json:"-"` //用于订阅和等待发布者 HasAudio bool HasVideo bool EnableVideo *bool @@ -66,6 +64,7 @@ type Stream struct { func (r *Stream) AddVideoTrack() (vt *VideoTrack) { vt = new(VideoTrack) + vt.WaitFirst = make(chan struct{}) vt.Buffer = NewRing_Video() r.VideoTracks = append(r.VideoTracks, vt) return @@ -80,7 +79,7 @@ func (r *Stream) Close() { r.cancel() utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath)) Streams.Delete(r.StreamPath) - TriggerHook(Hook{"StreamClose", r}) + TriggerHook(Hook{HOOK_STREAMCLOSE, r}) } //Subscribe 订阅流 @@ -93,7 +92,7 @@ func (r *Stream) Subscribe(s *Subscriber) { r.Subscribers = append(r.Subscribers, s) r.subscribeMutex.Unlock() utils.Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) - TriggerHook(Hook{"Subscribe", s}) + TriggerHook(Hook{HOOK_SUBSCRIBE, s}) } } @@ -104,7 +103,7 @@ func (r *Stream) UnSubscribe(s *Subscriber) { r.Subscribers = DeleteSliceItem_Subscriber(r.Subscribers, s) r.subscribeMutex.Unlock() utils.Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) - TriggerHook(Hook{"UnSubscribe", s}) + TriggerHook(Hook{HOOK_UNSUBSCRIBE, s}) if len(r.Subscribers) == 0 && (r.Publisher == nil || r.Publisher.AutoUnPublish) { r.Close() } diff --git a/subscriber.go b/subscriber.go index 657d279..e56c9fd 100644 --- a/subscriber.go +++ b/subscriber.go @@ -58,6 +58,5 @@ func (s *Subscriber) Subscribe(streamPath string) error { if s.Context == nil { return errors.Errorf("stream not exist:%s", streamPath) } - <-s.WaitPub return nil } diff --git a/video_track.go b/video_track.go index d9f0735..e96e602 100644 --- a/video_track.go +++ b/video_track.go @@ -3,6 +3,7 @@ package engine import ( "context" "encoding/binary" + "github.com/Monibuca/utils/v3" "github.com/Monibuca/utils/v3/codec" ) @@ -27,11 +28,12 @@ type VideoPack struct { type VideoTrack struct { FirstScreen byte //最近的关键帧位置,首屏渲染 Track_Video - SPS []byte - PPS []byte - SPSInfo codec.SPSInfo - GOP byte //关键帧间隔 - RtmpTag []byte //rtmp需要先发送一个序列帧,包含SPS和PPS + SPS []byte + PPS []byte + SPSInfo codec.SPSInfo + GOP byte //关键帧间隔 + RtmpTag []byte //rtmp需要先发送一个序列帧,包含SPS和PPS + WaitFirst chan struct{} } // Push 来自发布者推送的视频 @@ -82,11 +84,13 @@ func (vt *VideoTrack) Push(timestamp uint32, payload []byte) { case codec.NALU_IDR_Picture: if vt.RtmpTag == nil { + vt.FirstScreen = vbr.Index vt.setRtmpTag() + close(vt.WaitFirst) } else { vt.GOP = vbr.Index - vt.FirstScreen + vt.FirstScreen = vbr.Index } - vt.FirstScreen = vbr.Index fallthrough case codec.NALU_Non_IDR_Picture: video.Payload = payload @@ -109,6 +113,11 @@ func (vt *VideoTrack) setRtmpTag() { } func (vt *VideoTrack) Play(ctx context.Context, callback func(VideoPack)) { + select { + case <-vt.WaitFirst: + case <-ctx.Done(): + return + } ring := vt.Buffer.SubRing(vt.FirstScreen) ring.Current.Wait() droped := 0