修改等待关键帧逻辑

This commit is contained in:
langhuihui
2021-02-10 22:19:48 +08:00
parent c0f821e56e
commit 70706fc8da
8 changed files with 56 additions and 35 deletions

View File

@@ -57,7 +57,7 @@ rec_video = func(msg *Chunk) {
nalus = nalus[nalulen+nalulenSize:]
}
}
close(stream.WaitPub)
close(vt.WaitFirst)
}
```
在填充数据之前需要获取到SPS和PPS然后设置好因为订阅者需要先发送这个数据

View File

@@ -16,7 +16,6 @@ type AudioTrack struct {
SoundSize byte //1bit
SoundType byte //1bit
RtmpTag []byte //rtmp协议需要先发这个帧
ASC []byte //audio special configure
}
// Push 来自发布者推送的音频

View File

@@ -47,6 +47,11 @@ type TrackCP struct {
}
func (tcp *TrackCP) Play(ctx context.Context, cba func(AudioPack), cbv func(VideoPack)) {
select {
case <-tcp.Video.WaitFirst:
case <-ctx.Done():
return
}
vr := tcp.Video.Buffer.SubRing(tcp.Video.FirstScreen)
ar := tcp.Audio.Buffer.SubRing(tcp.Audio.Buffer.Index)
vr.Current.Wait()

44
hook.go
View File

@@ -6,26 +6,35 @@ import (
"sync"
"time"
)
type Hook struct {
Name string
Payload interface{}
}
const (
HOOK_SUBSCRIBE = "Subscribe"
HOOK_UNSUBSCRIBE = "UnSubscibe"
HOOK_STREAMCLOSE = "StreamClose"
HOOK_PUBLISH = "Publish"
)
var Hooks = NewRing_Hook()
func AddHook(name string,channel chan interface{}) {
for hooks:= Hooks.SubRing(Hooks.Index);;hooks.GoNext(){
func AddHook(name string, channel chan interface{}) {
for hooks := Hooks.SubRing(Hooks.Index); ; hooks.GoNext() {
hooks.Current.Wait()
if name == hooks.Current.Name {
channel<-hooks.Current.Payload
channel <- hooks.Current.Payload
}
}
}
func AddHookWithContext(name string,channel chan interface{},ctx context.Context) {
for hooks:= Hooks.SubRing(Hooks.Index);ctx.Err()==nil;hooks.GoNext(){
func AddHookWithContext(name string, channel chan interface{}, ctx context.Context) {
for hooks := Hooks.SubRing(Hooks.Index); ctx.Err() == nil; hooks.GoNext() {
hooks.Current.Wait()
if name == hooks.Current.Name && ctx.Err()==nil{
channel<-hooks.Current.Payload
if name == hooks.Current.Name && ctx.Err() == nil {
channel <- hooks.Current.Payload
}
}
}
@@ -35,7 +44,6 @@ func TriggerHook(hook Hook) {
Hooks.NextW()
}
type RingItem_Hook struct {
Hook
sync.WaitGroup
@@ -49,17 +57,19 @@ type Ring_Hook struct {
buffer []RingItem_Hook
Index byte
}
func (r *Ring_Hook) SubRing(index byte) *Ring_Hook{
result:= &Ring_Hook{
buffer:r.buffer,
func (r *Ring_Hook) SubRing(index byte) *Ring_Hook {
result := &Ring_Hook{
buffer: r.buffer,
}
result.GoTo(index)
return result
}
// NewRing 创建Ring
func NewRing_Hook() (r *Ring_Hook) {
r = &Ring_Hook{
buffer : make([]RingItem_Hook, 256),
buffer: make([]RingItem_Hook, 256),
}
r.GoTo(0)
r.Current.Add(1)
@@ -92,13 +102,13 @@ func (r *Ring_Hook) GetLast() *RingItem_Hook {
// GoNext 移动到下一个位置
func (r *Ring_Hook) GoNext() {
r.Index = r.Index+1
r.Index = r.Index + 1
r.Current = &r.buffer[r.Index]
}
// GoBack 移动到上一个位置
func (r *Ring_Hook) GoBack() {
r.Index = r.Index-1
r.Index = r.Index - 1
r.Current = &r.buffer[r.Index]
}
@@ -112,7 +122,7 @@ func (r *Ring_Hook) NextW() {
}
// NextR 读下一个
func (r *Ring_Hook) NextR(){
func (r *Ring_Hook) NextR() {
r.Current.Wait()
r.GoNext()
}
@@ -129,8 +139,8 @@ func (r *Ring_Hook) GetBuffer() *bytes.Buffer {
// Timeout 发布者是否超时了
func (r *Ring_Hook) Timeout(t time.Duration) bool {
// 如果设置为0则表示永不超时
if t==0 {
if t == 0 {
return false
}
return time.Since(r.Current.UpdateTime) >t
return time.Since(r.Current.UpdateTime) > t
}

View File

@@ -47,6 +47,6 @@ func (p *Publisher) Publish(streamPath string) bool {
p.Publisher = p
p.StartTime = time.Now()
//触发钩子
TriggerHook(Hook{"Publish", p.Stream})
TriggerHook(Hook{HOOK_PUBLISH, p.Stream})
return true
}

View File

@@ -28,7 +28,6 @@ func GetStream(streamPath string) (result *Stream) {
HasAudio: true,
EnableAudio: &config.EnableAudio,
EnableVideo: &config.EnableVideo,
WaitPub: make(chan struct{}),
})
result = item.(*Stream)
if !loaded {
@@ -56,7 +55,6 @@ type Stream struct {
Subscribers []*Subscriber // 订阅者
VideoTracks []*VideoTrack
AudioTracks []*AudioTrack
WaitPub chan struct{} `json:"-"` //用于订阅和等待发布者
HasAudio bool
HasVideo bool
EnableVideo *bool
@@ -66,6 +64,7 @@ type Stream struct {
func (r *Stream) AddVideoTrack() (vt *VideoTrack) {
vt = new(VideoTrack)
vt.WaitFirst = make(chan struct{})
vt.Buffer = NewRing_Video()
r.VideoTracks = append(r.VideoTracks, vt)
return
@@ -80,7 +79,7 @@ func (r *Stream) Close() {
r.cancel()
utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath))
Streams.Delete(r.StreamPath)
TriggerHook(Hook{"StreamClose", r})
TriggerHook(Hook{HOOK_STREAMCLOSE, r})
}
//Subscribe 订阅流
@@ -93,7 +92,7 @@ func (r *Stream) Subscribe(s *Subscriber) {
r.Subscribers = append(r.Subscribers, s)
r.subscribeMutex.Unlock()
utils.Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers))))
TriggerHook(Hook{"Subscribe", s})
TriggerHook(Hook{HOOK_SUBSCRIBE, s})
}
}
@@ -104,7 +103,7 @@ func (r *Stream) UnSubscribe(s *Subscriber) {
r.Subscribers = DeleteSliceItem_Subscriber(r.Subscribers, s)
r.subscribeMutex.Unlock()
utils.Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers))))
TriggerHook(Hook{"UnSubscribe", s})
TriggerHook(Hook{HOOK_UNSUBSCRIBE, s})
if len(r.Subscribers) == 0 && (r.Publisher == nil || r.Publisher.AutoUnPublish) {
r.Close()
}

View File

@@ -58,6 +58,5 @@ func (s *Subscriber) Subscribe(streamPath string) error {
if s.Context == nil {
return errors.Errorf("stream not exist:%s", streamPath)
}
<-s.WaitPub
return nil
}

View File

@@ -3,6 +3,7 @@ package engine
import (
"context"
"encoding/binary"
"github.com/Monibuca/utils/v3"
"github.com/Monibuca/utils/v3/codec"
)
@@ -32,6 +33,7 @@ type VideoTrack struct {
SPSInfo codec.SPSInfo
GOP byte //关键帧间隔
RtmpTag []byte //rtmp需要先发送一个序列帧包含SPS和PPS
WaitFirst chan struct{}
}
// Push 来自发布者推送的视频
@@ -82,11 +84,13 @@ func (vt *VideoTrack) Push(timestamp uint32, payload []byte) {
case codec.NALU_IDR_Picture:
if vt.RtmpTag == nil {
vt.FirstScreen = vbr.Index
vt.setRtmpTag()
close(vt.WaitFirst)
} else {
vt.GOP = vbr.Index - vt.FirstScreen
}
vt.FirstScreen = vbr.Index
}
fallthrough
case codec.NALU_Non_IDR_Picture:
video.Payload = payload
@@ -109,6 +113,11 @@ func (vt *VideoTrack) setRtmpTag() {
}
func (vt *VideoTrack) Play(ctx context.Context, callback func(VideoPack)) {
select {
case <-vt.WaitFirst:
case <-ctx.Done():
return
}
ring := vt.Buffer.SubRing(vt.FirstScreen)
ring.Current.Wait()
droped := 0