mirror of
https://github.com/Monibuca/engine.git
synced 2025-10-05 08:36:56 +08:00
198 lines
4.2 KiB
Go
198 lines
4.2 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"net/url"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// Subscriber 订阅者实体定义
|
|
type Subscriber struct {
|
|
context.Context `json:"-"`
|
|
cancel context.CancelFunc
|
|
Ctx2 context.Context `json:"-"`
|
|
*Stream `json:"-"`
|
|
ID string
|
|
TotalDrop int //总丢帧
|
|
TotalPacket int
|
|
Type string
|
|
BufferLength int
|
|
Delay uint32
|
|
SubscribeTime time.Time
|
|
SubscribeArgs url.Values
|
|
OnAudio func(pack AudioPack) `json:"-"`
|
|
OnVideo func(pack VideoPack) `json:"-"`
|
|
ByteStreamFormat bool
|
|
closeOnce sync.Once
|
|
}
|
|
|
|
func (s *Subscriber) close() {
|
|
if s.Stream != nil {
|
|
s.UnSubscribe(s)
|
|
}
|
|
if s.cancel != nil {
|
|
s.cancel()
|
|
}
|
|
}
|
|
|
|
// Close 关闭订阅者
|
|
func (s *Subscriber) Close() {
|
|
s.closeOnce.Do(s.close)
|
|
}
|
|
|
|
//Subscribe 开始订阅 将Subscriber与Stream关联
|
|
func (s *Subscriber) Subscribe(streamPath string) error {
|
|
u, _ := url.Parse(streamPath)
|
|
s.SubscribeArgs, _ = url.ParseQuery(u.RawQuery)
|
|
streamPath = u.Path
|
|
if stream := FindStream(streamPath); stream == nil {
|
|
return errors.Errorf("Stream not found:%s", streamPath)
|
|
} else {
|
|
if stream.Subscribe(s); s.Context == nil {
|
|
return errors.Errorf("stream not exist:%s", streamPath)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
//Play 开始播放
|
|
func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) {
|
|
defer s.Close()
|
|
if vt == nil && at == nil {
|
|
return
|
|
}
|
|
if vt == nil {
|
|
s.PlayAudio(at)
|
|
return
|
|
} else if at == nil {
|
|
s.PlayVideo(vt)
|
|
return
|
|
}
|
|
var extraExit <-chan struct{}
|
|
if s.Ctx2 != nil {
|
|
extraExit = s.Ctx2.Done()
|
|
}
|
|
streamExit := s.Context.Done()
|
|
select {
|
|
case <-vt.WaitIDR.Done(): //等待获取到第一个关键帧
|
|
case <-streamExit: //可能等不到关键帧就退出了
|
|
return
|
|
case <-extraExit: //可能等不到关键帧就退出了
|
|
return
|
|
}
|
|
vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开
|
|
ar := at.Clone()
|
|
dropping := false //是否处于丢帧中
|
|
vp := vr.Read().(*VideoPack)
|
|
ap := ar.Read().(*AudioPack)
|
|
startTimestamp := vp.Timestamp
|
|
for vt.Flag != 2 {
|
|
select {
|
|
case <-extraExit:
|
|
return
|
|
case <-streamExit:
|
|
return
|
|
default:
|
|
if ap.Timestamp > vp.Timestamp || ap.Timestamp == 0 {
|
|
if !dropping {
|
|
s.OnVideo(vp.Copy(startTimestamp))
|
|
if vt.CurrentValue().(AVPack).Since(vp.Timestamp) > 1000 {
|
|
dropping = true
|
|
}
|
|
} else if vp.IDR {
|
|
dropping = false
|
|
}
|
|
vr.MoveNext()
|
|
vp = vr.Read().(*VideoPack)
|
|
} else {
|
|
if !dropping {
|
|
s.OnAudio(ap.Copy(startTimestamp))
|
|
if at.CurrentValue().(AVPack).Since(ap.Timestamp) > 1000 {
|
|
dropping = true
|
|
}
|
|
}
|
|
ar.MoveNext()
|
|
ap = ar.Read().(*AudioPack)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
func (s *Subscriber) PlayAudio(at *AudioTrack) {
|
|
streamExit := s.Context.Done()
|
|
ar := at.Clone()
|
|
ap := ar.Read().(*AudioPack)
|
|
startTimestamp := ap.Timestamp
|
|
droped := 0
|
|
var action, send func()
|
|
drop := func() {
|
|
if at.CurrentValue().(AVPack).Distance(ap.Sequence) < 4 {
|
|
action = send
|
|
} else {
|
|
droped++
|
|
}
|
|
}
|
|
send = func() {
|
|
if s.OnAudio(ap.Copy(startTimestamp)); at.CurrentValue().(AVPack).Since(ap.Timestamp) > 1000 {
|
|
action = drop
|
|
}
|
|
}
|
|
var extraExit <-chan struct{}
|
|
if s.Ctx2 != nil {
|
|
extraExit = s.Ctx2.Done()
|
|
}
|
|
for action = send; at.Flag != 2; ap = ar.Read().(*AudioPack) {
|
|
select {
|
|
case <-extraExit:
|
|
return
|
|
case <-streamExit:
|
|
return
|
|
default:
|
|
action()
|
|
ar.MoveNext()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Subscriber) PlayVideo(vt *VideoTrack) {
|
|
var extraExit <-chan struct{}
|
|
if s.Ctx2 != nil {
|
|
extraExit = s.Ctx2.Done()
|
|
}
|
|
streamExit := s.Context.Done()
|
|
select {
|
|
case <-vt.WaitIDR.Done():
|
|
case <-streamExit:
|
|
return
|
|
case <-extraExit: //可能等不到关键帧就退出了
|
|
return
|
|
}
|
|
vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开
|
|
vp := vr.Read().(*VideoPack)
|
|
startTimestamp := vp.Timestamp
|
|
var action, send func()
|
|
drop := func() {
|
|
if vp.IDR {
|
|
action = send
|
|
}
|
|
}
|
|
send = func() {
|
|
if s.OnVideo(vp.Copy(startTimestamp)); vt.CurrentValue().(AVPack).Since(vp.Timestamp) > 1000 {
|
|
action = drop
|
|
}
|
|
}
|
|
for action = send; vt.Flag != 2; vp = vr.Read().(*VideoPack) {
|
|
select {
|
|
case <-extraExit:
|
|
return
|
|
case <-streamExit:
|
|
return
|
|
default:
|
|
action()
|
|
vr.MoveNext()
|
|
}
|
|
}
|
|
}
|