mirror of
https://github.com/Monibuca/engine.git
synced 2025-10-06 00:56:58 +08:00
190 lines
4.0 KiB
Go
190 lines
4.0 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net/url"
|
|
"time"
|
|
|
|
. "github.com/Monibuca/engine/v4/common"
|
|
"github.com/Monibuca/engine/v4/config"
|
|
"github.com/Monibuca/engine/v4/track"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type AudioFrame AVFrame[AudioSlice]
|
|
type VideoFrame AVFrame[NALUSlice]
|
|
|
|
// Subscriber 订阅者实体定义
|
|
type Subscriber struct {
|
|
context.Context `json:"-"`
|
|
cancel context.CancelFunc
|
|
Config config.Subscribe
|
|
Stream *Stream `json:"-"`
|
|
ID string
|
|
TotalDrop int //总丢帧
|
|
TotalPacket int
|
|
Type string
|
|
BufferLength int
|
|
Delay uint32
|
|
SubscribeTime time.Time
|
|
SubscribeArgs url.Values
|
|
OnAudio func(*AudioFrame) error `json:"-"`
|
|
OnVideo func(*VideoFrame) error `json:"-"`
|
|
*log.Entry
|
|
}
|
|
|
|
// Close 关闭订阅者
|
|
func (s *Subscriber) Close() {
|
|
s.Stream.UnSubscribe(s)
|
|
if s.cancel != nil {
|
|
s.cancel()
|
|
}
|
|
}
|
|
|
|
//Subscribe 开始订阅 将Subscriber与Stream关联
|
|
func (sub *Subscriber) Subscribe(streamPath string, config config.Subscribe) bool {
|
|
Streams.Lock()
|
|
defer Streams.Unlock()
|
|
log.Info(sub.ID, "try to subscribe", streamPath)
|
|
s, created := findOrCreateStream(streamPath, config.WaitTimeout.Duration())
|
|
if s.IsClosed() {
|
|
log.Warn("stream is closed")
|
|
return false
|
|
}
|
|
sub.Entry = s.Entry.WithField("suber", sub.ID)
|
|
if created {
|
|
Bus.Publish(Event_REQUEST_PUBLISH, s)
|
|
}
|
|
if s.Subscribe(sub); sub.Stream != nil {
|
|
sub.Config = config
|
|
}
|
|
return true
|
|
}
|
|
|
|
//Play 开始播放
|
|
func (s *Subscriber) Play(at *track.Audio, vt *track.Video) {
|
|
defer s.Close()
|
|
if vt == nil && at == nil {
|
|
return
|
|
}
|
|
if vt == nil {
|
|
s.PlayAudio(at)
|
|
return
|
|
} else if at == nil {
|
|
s.PlayVideo(vt)
|
|
return
|
|
}
|
|
vr := vt.ReadRing() //从关键帧开始读取,首屏秒开
|
|
ar := at.ReadRing()
|
|
vp := vr.Read()
|
|
ap := ar.TryRead()
|
|
// chase := true
|
|
for s.Err() == nil {
|
|
if ap == nil && vp == nil {
|
|
time.Sleep(time.Millisecond * 10)
|
|
} else if ap != nil && (vp == nil || vp.SeqInStream > ap.SeqInStream) {
|
|
if s.onAudio(ap) != nil {
|
|
return
|
|
}
|
|
ar.MoveNext()
|
|
} else if vp != nil && (ap == nil || ap.SeqInStream > vp.SeqInStream) {
|
|
if s.onVideo(vp) != nil {
|
|
return
|
|
}
|
|
// if chase {
|
|
// if add10 := vst.Add(time.Millisecond * 10); realSt.After(add10) {
|
|
// vst = add10
|
|
// } else {
|
|
// vst = realSt
|
|
// chase = false
|
|
// }
|
|
// }
|
|
vr.MoveNext()
|
|
}
|
|
ap = ar.TryRead()
|
|
vp = vr.TryRead()
|
|
}
|
|
}
|
|
func (s *Subscriber) onAudio(af *AVFrame[AudioSlice]) error {
|
|
return s.OnAudio((*AudioFrame)(af))
|
|
}
|
|
func (s *Subscriber) onVideo(vf *AVFrame[NALUSlice]) error {
|
|
return s.OnVideo((*VideoFrame)(vf))
|
|
}
|
|
func (s *Subscriber) PlayAudio(at *track.Audio) {
|
|
at.Play(s.onAudio)
|
|
}
|
|
func (s *Subscriber) PlayVideo(vt *track.Video) {
|
|
vt.Play(s.onVideo)
|
|
}
|
|
func (r *Subscriber) WaitVideoTrack(names ...string) *track.Video {
|
|
if !r.Config.SubVideo {
|
|
return nil
|
|
}
|
|
if len(names) == 0 {
|
|
names = []string{"h264", "h265"}
|
|
}
|
|
if t := <-r.Stream.WaitTrack(names...); t == nil {
|
|
return nil
|
|
} else {
|
|
return t.(*track.Video)
|
|
}
|
|
}
|
|
|
|
func (r *Subscriber) WaitAudioTrack(names ...string) *track.Audio {
|
|
if !r.Config.SubAudio {
|
|
return nil
|
|
}
|
|
if len(names) == 0 {
|
|
names = []string{"aac", "pcma", "pcmu"}
|
|
}
|
|
if t := <-r.Stream.WaitTrack(names...); t == nil {
|
|
return nil
|
|
} else {
|
|
return t.(*track.Audio)
|
|
}
|
|
}
|
|
|
|
type IPusher interface {
|
|
Push(int)
|
|
Close()
|
|
}
|
|
type Pusher struct {
|
|
Subscriber
|
|
specific IPusher
|
|
Config *config.Push
|
|
RemoteURL string
|
|
io.Writer
|
|
io.Closer
|
|
pushCount int
|
|
}
|
|
|
|
// 是否需要重连
|
|
func (pub *Pusher) reconnect() bool {
|
|
return pub.Config.RePush == -1 || pub.pushCount <= pub.Config.RePush
|
|
}
|
|
|
|
func (pub *Pusher) push() {
|
|
pub.specific.Push(pub.pushCount)
|
|
pub.pushCount++
|
|
pub.specific.Close()
|
|
pub.Subscriber.Stream.Subscribe(&pub.Subscriber)
|
|
if !pub.Subscriber.Stream.IsClosed() {
|
|
go pub.push()
|
|
}
|
|
}
|
|
|
|
func (pub *Pusher) Push(specific IPusher, config config.Push) {
|
|
pub.specific = specific
|
|
pub.Config = &config
|
|
go pub.push()
|
|
}
|
|
|
|
func (p *Pusher) Close() {
|
|
if p.Closer != nil {
|
|
p.Closer.Close()
|
|
}
|
|
p.Subscriber.Close()
|
|
}
|