feat: add wait track conf to subscribe

This commit is contained in:
langhuihui
2025-06-21 21:55:17 +08:00
parent 10f4fe3fc6
commit 8f5a829900
3 changed files with 33 additions and 9 deletions

View File

@@ -70,6 +70,7 @@ type (
SyncMode int `default:"1" desc:"同步模式" enum:"0:采用时间戳同步,1:采用写入时间同步"` // 0采用时间戳同步1采用写入时间同步
IFrameOnly bool `desc:"只要关键帧"` // 只要关键帧
WaitTimeout time.Duration `default:"10s" desc:"等待流超时时间"` // 等待流超时
WaitTrack string `default:"" desc:"等待轨道" enum:"audio:等待音频,video:等待视频,all:等待全部"`
WriteBufferSize int `desc:"写缓冲大小"` // 写缓冲大小
Key string `desc:"订阅鉴权key"` // 订阅鉴权key
SubType string `desc:"订阅类型"` // 订阅类型

View File

@@ -613,7 +613,9 @@ func (p *Plugin) SubscribeWithConfig(ctx context.Context, streamPath string, con
if err == nil {
select {
case <-subscriber.waitPublishDone:
err = subscriber.Publisher.WaitTrack()
waitAudio := conf.WaitTrack == "all" || strings.Contains(conf.WaitTrack, "audio")
waitVideo := conf.WaitTrack == "all" || strings.Contains(conf.WaitTrack, "video")
err = subscriber.Publisher.WaitTrack(waitAudio, waitVideo)
case <-subscriber.Done():
err = subscriber.StopReason()
}
@@ -716,10 +718,11 @@ func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) {
streamPath := r.PathValue("streamPath")
t := r.PathValue("type")
expire := r.URL.Query().Get("expire")
if t == "publish" {
switch t {
case "publish":
secret := md5.Sum([]byte(p.config.Publish.Key + streamPath + expire))
rw.Write([]byte(hex.EncodeToString(secret[:])))
} else if t == "subscribe" {
case "subscribe":
secret := md5.Sum([]byte(p.config.Subscribe.Key + streamPath + expire))
rw.Write([]byte(hex.EncodeToString(secret[:])))
}

View File

@@ -1,6 +1,8 @@
package m7s
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@@ -652,13 +654,31 @@ func (p *Publisher) takeOver(old *Publisher) {
old.Subscribers = SubscriberCollection{}
}
func (p *Publisher) WaitTrack() (err error) {
func (p *Publisher) WaitTrack(audio, video bool) (err error) {
var v, a = pkg.ErrNoTrack, pkg.ErrNoTrack
if p.PubVideo {
v = p.videoReady.Await()
}
if p.PubAudio {
a = p.audioReady.Await()
// wait any track
if p.PubAudio && p.PubVideo && !audio && !video {
select {
case <-p.videoReady.Done():
err = context.Cause(p.videoReady.Context)
if errors.Is(err, util.ErrResolve) {
err = nil
}
case <-p.audioReady.Done():
err = context.Cause(p.audioReady.Context)
if errors.Is(err, util.ErrResolve) {
err = nil
}
}
} else {
// need wait video
if p.PubVideo && video {
v = p.videoReady.Await()
}
// need wait audio
if p.PubAudio && audio {
a = p.audioReady.Await()
}
}
if v != nil && a != nil {
return ErrNoTrack