diff --git a/common/ring_av.go b/common/ring_av.go index 1165bd4..727b8af 100644 --- a/common/ring_av.go +++ b/common/ring_av.go @@ -35,9 +35,9 @@ func (r *AVRing[T]) Read(ctx context.Context) (item *AVFrame[T]) { return } -// func (r *AVRing[T]) TryRead(ctx context.Context) (item *AVFrame[T]) { -// if item = &r.Value; ctx.Err() == nil && !item.canRead { -// return nil -// } -// return -// } +func (r *AVRing[T]) TryRead() (item *AVFrame[T]) { + if item = &r.Value; item.canRead { + return + } + return nil +} diff --git a/io.go b/io.go index debeca0..62fa20a 100644 --- a/io.go +++ b/io.go @@ -66,6 +66,10 @@ func (io *IO[C, S]) getType() string { return io.Type } +func (io *IO[C, S]) GetConfig() *C { + return io.Config +} + type IIO interface { IsClosed() bool OnEvent(any) diff --git a/plugin.go b/plugin.go index dd261b7..359c1f7 100644 --- a/plugin.go +++ b/plugin.go @@ -186,12 +186,7 @@ func (opt *Plugin) Publish(streamPath string, pub IPublisher) bool { if !ok { conf = EngineConfig } - if ok = pub.receive(streamPath, pub, conf.GetPublishConfig()); ok { - p := pub.GetPublisher() - p.AudioTrack = p.Stream.NewAudioTrack() - p.VideoTrack = p.Stream.NewVideoTrack() - } - return ok + return pub.receive(streamPath, pub, conf.GetPublishConfig()) } func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) bool { diff --git a/publisher.go b/publisher.go index 5cf3d4e..91456ed 100644 --- a/publisher.go +++ b/publisher.go @@ -7,7 +7,7 @@ import ( type IPublisher interface { IIO - GetPublisher() *Publisher + GetConfig() *config.Publish receive(string, IPublisher, *config.Publish) bool } @@ -17,8 +17,13 @@ type Publisher struct { common.VideoTrack } -func (p *Publisher) GetPublisher() *Publisher { - return p +func (p *Publisher) OnEvent(event any) { + switch v := event.(type) { + case *Stream: + p.AudioTrack = v.NewAudioTrack() + p.VideoTrack = v.NewVideoTrack() + } + p.IO.OnEvent(event) } type PullEvent int diff --git a/stream.go b/stream.go index d6e9c56..9718393 100644 --- a/stream.go +++ b/stream.go @@ -276,7 +276,7 @@ func (s *Stream) run() { } suber := v.Value s.Subscribers = append(s.Subscribers, suber) - sbConfig := suber.GetSubscribeConfig() + sbConfig := suber.GetConfig() if wt := sbConfig.WaitTimeout.Duration(); wt > s.WaitTimeout { s.WaitTimeout = wt } diff --git a/subscriber.go b/subscriber.go index 8e9e4d6..0c40b42 100644 --- a/subscriber.go +++ b/subscriber.go @@ -17,10 +17,10 @@ type VideoDeConf DecoderConfiguration[NALUSlice] type ISubscriber interface { IIO receive(string, ISubscriber, *config.Subscribe) bool - config.SubscribeConfig - GetSubscriber() *Subscriber + GetConfig() *config.Subscribe IsPlaying() bool - Play(ISubscriber) + Play(ISubscriber) func() error + PlayBlock(ISubscriber) Stop() } type TrackPlayer struct { @@ -38,14 +38,6 @@ type Subscriber struct { TrackPlayer } -func (p *Subscriber) GetSubscriber() *Subscriber { - return p -} - -func (s *Subscriber) GetSubscribeConfig() *config.Subscribe { - return s.Config -} - func (s *Subscriber) OnEvent(event any) { switch v := event.(type) { case TrackRemoved: @@ -97,8 +89,8 @@ func (s *Subscriber) Stop() { } } -//Play 开始播放 -func (s *Subscriber) Play(spesic ISubscriber) { +// 非阻塞式读取,通过反复调用返回的函数可以尝试读取数据,读取到数据后会调用OnEvent,这种模式自由的在不同的goroutine中调用 +func (s *Subscriber) Play(spesic ISubscriber) func() error { s.Info("play") var t time.Time var startTime time.Time //读到第一个关键帧的时间 @@ -106,6 +98,73 @@ func (s *Subscriber) Play(spesic ISubscriber) { var audioSent bool //音频是否发送过 s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO) ctx := s.TrackPlayer.Context + var nextRoundReadAudio bool //下一次读取音频 + return func() error { + if ctx.Err() != nil { + return ctx.Err() + } + if !nextRoundReadAudio || s.ar == nil { + if s.vr != nil { + if startTime.IsZero() { + startTime = time.Now() + firstIFrame = (VideoFrame)(s.vr.Read(ctx)) // 这里阻塞读取为0耗时 + s.Debug("firstIFrame", zap.Uint32("seq", firstIFrame.Sequence)) + if ctx.Err() != nil { + return ctx.Err() + } + spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration)) + spesic.OnEvent(firstIFrame) + s.vr.MoveNext() + if firstIFrame.Timestamp.After(t) { + t = firstIFrame.Timestamp + } + return nil + } else if firstIFrame == nil { + if vp := VideoFrame(s.vr.TryRead()); vp != nil { + spesic.OnEvent(vp) + s.vr.MoveNext() + // 如果本次读取的视频时间戳比较大,下次给音频一个机会 + if nextRoundReadAudio = vp.Timestamp.After(t); nextRoundReadAudio { + t = vp.Timestamp + } + return nil + } + } + } else if s.Config.SubVideo && (s.Stream == nil || s.Stream.Publisher == nil || s.Stream.Publisher.GetConfig().PubVideo) { + // 如果订阅了视频需要等待视频轨道 + // TODO: 如果发布配置了视频,订阅配置了视频,但是实际上没有视频,需要处理播放纯音频 + return nil + } + } + // 正常模式下或者纯音频模式下,音频开始播放 + if s.ar != nil && firstIFrame == nil { + if !audioSent { + spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration)) + audioSent = true + } + if ap := AudioFrame(s.ar.TryRead()); ap != nil { + spesic.OnEvent(ap) + s.ar.MoveNext() + // 这次如果音频比较大,则下次读取给视频一个机会 + if nextRoundReadAudio = !ap.Timestamp.After(t); !nextRoundReadAudio { + t = ap.Timestamp + } + return nil + } + } + return nil + } +} + +//PlayBlock 阻塞式读取数据 +func (s *Subscriber) PlayBlock(spesic ISubscriber) { + s.Info("playblock") + var t time.Time + var startTime time.Time //读到第一个关键帧的时间 + var firstIFrame VideoFrame //起始关键帧 + var audioSent bool //音频是否发送过 + s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO) + ctx := s.TrackPlayer.Context defer s.Info("stop") for ctx.Err() == nil { if s.vr != nil { @@ -151,7 +210,12 @@ func (s *Subscriber) Play(spesic ISubscriber) { break } } + } else if s.Config.SubVideo && (s.Stream == nil || s.Stream.Publisher == nil || s.Stream.Publisher.GetConfig().PubVideo) { + // 如果订阅了视频需要等待视频轨道 + time.Sleep(time.Second) + continue } + // 正常模式下或者纯音频模式下,音频开始播放 if s.ar != nil && firstIFrame == nil { if !audioSent { spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration))