diff --git a/common/frame.go b/common/frame.go index 9469d6c..38a0b13 100644 --- a/common/frame.go +++ b/common/frame.go @@ -102,13 +102,13 @@ func (rtp *RTPFrame) Unmarshal(raw []byte) *RTPFrame { type BaseFrame struct { DeltaTime uint32 // 相对上一帧时间戳,毫秒 + AbsTime uint32 // 绝对时间戳,毫秒 Timestamp time.Time // 写入时间,可用于比较两个帧的先后 SeqInTrack uint32 // 在一个Track中的序号 BytesIn int // 输入字节数用于计算BPS } type DataFrame[T any] struct { - Timestamp time.Time // 写入时间 BaseFrame Value T } diff --git a/common/ring_av.go b/common/ring_av.go index 2cbdad1..9d62af2 100644 --- a/common/ring_av.go +++ b/common/ring_av.go @@ -4,7 +4,6 @@ import ( "context" "runtime" "time" - ) type AVRing[T RawSlice] struct { @@ -36,9 +35,9 @@ func (r *AVRing[T]) Read(ctx context.Context) (item *AVFrame[T]) { return } -func (r *AVRing[T]) TryRead(ctx context.Context) (item *AVFrame[T]) { - if item = &r.Value; ctx.Err() == nil && !item.canRead { - return nil - } - return -} +// func (r *AVRing[T]) TryRead(ctx context.Context) (item *AVFrame[T]) { +// if item = &r.Value; ctx.Err() == nil && !item.canRead { +// return nil +// } +// return +// } diff --git a/config/config.go b/config/config.go index 9492b16..f7ec4eb 100644 --- a/config/config.go +++ b/config/config.go @@ -1,12 +1,14 @@ package config import ( - "github.com/Monibuca/engine/v4/log" "net" "net/http" "reflect" "strings" "time" + + "github.com/Monibuca/engine/v4/log" + "go.uber.org/zap" ) type Config map[string]any @@ -110,8 +112,11 @@ func (config Config) Merge(source Config) { case Config: m.Merge(v.(Config)) default: + log.Debug("merge", zap.String("k", k), zap.Any("v", v)) config[k] = v } + } else { + log.Debug("exist", zap.String("k", k)) } } } diff --git a/http.go b/http.go index 372913e..ce06a09 100644 --- a/http.go +++ b/http.go @@ -14,8 +14,6 @@ type GlobalConfig struct { } func (cfg *GlobalConfig) Update(override config.Config) { - // 使得RawConfig具备全量配置信息,用于合并到插件配置中 - Engine.RawConfig = config.Struct2Config(cfg.Engine) log.Info(Green("api server start at"), BrightBlue(cfg.ListenAddr), BrightBlue(cfg.ListenAddrTLS)) cfg.Listen(Engine, cfg) } diff --git a/io.go b/io.go index 855d22f..2ab07e4 100644 --- a/io.go +++ b/io.go @@ -15,6 +15,9 @@ import ( type IOConfig interface { config.Publish | config.Subscribe } +type ClientConfig interface { + config.Pull | config.Push +} type IO[C IOConfig] struct { ID string @@ -40,7 +43,6 @@ func (io *IO[C]) OnEvent(event any) any { io.Context, io.CancelFunc = context.WithCancel(v) case *Stream: io.StartTime = time.Now() - io.Stream = v io.Logger = v.With(zap.String("type", io.Type)) if io.ID != "" { io.Logger = io.Logger.With(zap.String("ID", io.ID)) @@ -78,16 +80,18 @@ func (io *IO[C]) bye(specific any) { } } +// receive 用于接收发布或者订阅 func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool { Streams.Lock() defer Streams.Unlock() streamPath = strings.Trim(streamPath, "/") u, err := url.Parse(streamPath) if err != nil { + io.Error("receive streamPath wrong format", zap.String("streamPath", streamPath), zap.Error(err)) return false } io.Args = u.Query() - wt := time.Second + wt := time.Second*5 var c any = conf if v, ok := c.(*config.Subscribe); ok { wt = v.WaitTimeout.Duration() @@ -99,14 +103,16 @@ func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool { if s.IsClosed() { return false } + io.Config = conf + io.Stream = s if v, ok := c.(*config.Publish); ok { if s.Publisher != nil && !s.Publisher.IsClosed() { // 根据配置是否剔出原来的发布者 if v.KickExist { s.Warn("kick", zap.Any("publisher", s.Publisher)) - s.Publisher.OnEvent(SEKick{}) + s.Publisher.OnEvent(SEKick{specific.(IPublisher)}) } else { - s.Warn("publisher exist", zap.Any("publisher", s.Publisher)) + s.Warn("badName", zap.Any("publisher", s.Publisher)) return false } } @@ -120,9 +126,13 @@ func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool { if io.Type == "" { io.Type = reflect.TypeOf(specific).Elem().Name() } - if s.Receive(specific); io.Stream != nil { - io.Config = conf - return true - } - return false + s.Receive(specific) + return true +} + +type Client[C ClientConfig] struct { + Config *C + StreamPath string // 本地流标识 + RemoteURL string // 远程服务器地址(用于推拉) + ReConnectCount int //重连次数 } diff --git a/log/log.go b/log/log.go index e6c6795..3cdaa44 100644 --- a/log/log.go +++ b/log/log.go @@ -2,21 +2,32 @@ package log import ( // . "github.com/logrusorgru/aurora" - "github.com/mattn/go-colorable" + "io" + + // "github.com/mattn/go-colorable" + "gopkg.in/yaml.v3" // log "github.com/sirupsen/logrus" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) var logger *zap.SugaredLogger + // var levelColors = []func(any) Value{Red, Red, Red, Yellow, Blue, Green, White} // type LogWriter func(*log.Entry) string -var colorableStdout = colorable.NewColorableStdout() +// var colorableStdout = colorable.NewColorableStdout() func init() { - l, _ := zap.NewDevelopment() + config := zap.NewDevelopmentConfig() + config.EncoderConfig.NewReflectedEncoder = func(w io.Writer) zapcore.ReflectedEncoder { + return yaml.NewEncoder(w) + } + config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder + config.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("15:04:05") + l, _ := config.Build(zap.WithCaller(false)) logger = l.Sugar() // std.SetOutput(colorableStdout) // std.SetFormatter(LogWriter(defaultFormatter)) @@ -53,6 +64,7 @@ func Error(args ...any) { func Debugf(format string, args ...interface{}) { logger.Debugf(format, args...) } + // Infof logs a message at level Info on the standard logger. func Infof(format string, args ...interface{}) { logger.Infof(format, args...) diff --git a/main.go b/main.go index 2416313..fb0818f 100644 --- a/main.go +++ b/main.go @@ -83,8 +83,10 @@ func Run(ctx context.Context, configFile string) (err error) { } else { log.Warn("no config file found , use default config values") } - Engine.Logger = log.With(zap.String("plugin", "engine")) + Engine.Logger = log.With(zap.Bool("engine", true)) Engine.registerHandler() + // 使得RawConfig具备全量配置信息,用于合并到插件配置中 + Engine.RawConfig = config.Struct2Config(EngineConfig.Engine) go EngineConfig.Update(Engine.RawConfig) for name, plugin := range Plugins { plugin.RawConfig = cg.GetChild(name) diff --git a/plugin.go b/plugin.go index 4821ffa..7b43ac6 100644 --- a/plugin.go +++ b/plugin.go @@ -122,6 +122,7 @@ func (opt *Plugin) Update() { opt.Context, opt.CancelFunc = context.WithCancel(Engine) opt.RawConfig.Unmarshal(opt.Config) opt.autoPull() + opt.Debug("config", zap.Any("config", opt.Config)) go opt.Config.Update(opt.RawConfig) } @@ -134,16 +135,16 @@ func (opt *Plugin) autoPull() { reflect.ValueOf(&pullConfig).Elem().Set(v.Field(i)) for streamPath, url := range pullConfig.PullList { if pullConfig.PullOnStart { - opt.Config.(PullPlugin).PullStream(Puller{&pullConfig, streamPath, url, 0}) + opt.Config.(PullPlugin).PullStream(Puller{Client[config.Pull]{&pullConfig, streamPath, url, 0}}) } else if pullConfig.PullOnSubscribe { - PullOnSubscribeList[streamPath] = PullOnSubscribe{opt.Config.(PullPlugin), Puller{&pullConfig, streamPath, url, 0}} + PullOnSubscribeList[streamPath] = PullOnSubscribe{opt.Config.(PullPlugin), Puller{Client[config.Pull]{&pullConfig, streamPath, url, 0}}} } } } else if name == "Push" { var pushConfig config.Push reflect.ValueOf(&pushConfig).Elem().Set(v.Field(i)) for streamPath, url := range pushConfig.PushList { - PushOnPublishList[streamPath] = append(PushOnPublishList[streamPath], PushOnPublish{opt.Config.(PushPlugin), Pusher{&pushConfig, streamPath, url, 0}}) + PushOnPublishList[streamPath] = append(PushOnPublishList[streamPath], PushOnPublish{opt.Config.(PushPlugin), Pusher{Client[config.Push]{&pushConfig, streamPath, url, 0}}}) } } } diff --git a/publisher.go b/publisher.go index e5c809f..3a5a0b7 100644 --- a/publisher.go +++ b/publisher.go @@ -30,13 +30,10 @@ type PullEvent int // 用于远程拉流的发布者 type Puller struct { - Config *config.Pull - StreamPath string - RemoteURL string - PullCount int + Client[config.Pull] } // 是否需要重连 func (pub *Puller) Reconnect() bool { - return pub.Config.RePull == -1 || pub.PullCount <= pub.Config.RePull + return pub.Config.RePull == -1 || pub.ReConnectCount <= pub.Config.RePull } diff --git a/stream.go b/stream.go index 3e5b18b..2d09e8a 100644 --- a/stream.go +++ b/stream.go @@ -30,12 +30,18 @@ type SEwaitPublish struct { StateEvent Publisher IPublisher } - +type SEpublish struct { + StateEvent +} +type SEwaitClose struct { + StateEvent +} type SEclose struct { StateEvent } type SEKick struct { + Publisher IPublisher } const ( @@ -56,6 +62,8 @@ const ( ACTION_NOTRACKS // 轨道为空了 ) +var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴", "❌"} +var ActionNames = [...]string{"publish", "timeout", "publish lost", "close", "last leave", "first enter", "no tracks"} var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{ { ACTION_PUBLISH: STATE_PUBLISHING, @@ -148,15 +156,18 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream return s, true } } - -func (r *Stream) action(action StreamAction) bool { +func (r *Stream) broadcast(event any) { + for _, sub := range r.Subscribers { + sub.OnEvent(event) + } +} +func (r *Stream) action(action StreamAction) (ok bool) { event := StateEvent{From: r.State, Action: action} - if next, ok := event.Next(); ok { + if r.State, ok = event.Next(); ok { // 给Publisher状态变更的回调,方便进行远程拉流等操作 var stateEvent any - r.Debug("state change", zap.Uint8("action", uint8(action)), zap.Uint8("oldState", uint8(r.State)), zap.Uint8("newState", uint8(next))) - r.State = next - switch next { + r.Debug(Sprintf("%s%s%s", StateNames[event.From], Yellow("->"), StateNames[r.State]), zap.String("action", ActionNames[action])) + switch r.State { case STATE_WAITPUBLISH: stateEvent = SEwaitPublish{event, r.Publisher} Bus.Publish(Event_REQUEST_PUBLISH, r) @@ -165,7 +176,9 @@ func (r *Stream) action(action StreamAction) bool { PullOnSubscribeList[r.Path].Pull() } case STATE_PUBLISHING: - r.timeout.Reset(time.Second) // 秒级心跳,检测track的存活度 + stateEvent = SEpublish{event} + r.broadcast(stateEvent) + r.timeout.Reset(time.Second * 5) // 5秒心跳,检测track的存活度 Bus.Publish(Event_PUBLISH, r) if v, ok := PushOnPublishList[r.Path]; ok { for _, v := range v { @@ -173,12 +186,11 @@ func (r *Stream) action(action StreamAction) bool { } } case STATE_WAITCLOSE: + stateEvent = SEwaitClose{event} r.timeout.Reset(r.WaitCloseTimeout) case STATE_CLOSED: stateEvent = SEclose{event} - for _, sub := range r.Subscribers { - sub.OnEvent(stateEvent) - } + r.broadcast(stateEvent) r.Subscribers.Reset() Bus.Publish(Event_STREAMCLOSE, r) Streams.Delete(r.Path) @@ -192,9 +204,8 @@ func (r *Stream) action(action StreamAction) bool { if r.Publisher != nil { r.Publisher.OnEvent(stateEvent) } - return true } - return false + return } func (r *Stream) IsClosed() bool { if r == nil { @@ -218,27 +229,24 @@ func (s *Stream) run() { for { select { case <-s.timeout.C: - s.Debug("timeout", zap.Uint8("action", uint8(s.State))) if s.State == STATE_PUBLISHING { for name, t := range s.Tracks { // track 超过一定时间没有更新数据了 if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout { - s.Warn("track timeout", zap.String("name", name)) + s.Warn("track timeout", zap.String("name", name), zap.Time("lastWriteTime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout)) delete(s.Tracks, name) - for _, sub := range s.Subscribers { - sub.OnEvent(TrackRemoved(t)) // 通知Subscriber Track已被移除 - } + s.broadcast(TrackRemoved(t)) } } if len(s.Tracks) == 0 { s.action(ACTION_NOTRACKS) } else { - s.timeout.Reset(time.Second) + s.timeout.Reset(time.Second * 5) } } else { + s.Debug("timeout", zap.String("state", StateNames[s.State])) s.action(ACTION_TIMEOUT) } - case action, ok := <-s.actionChan: if ok { switch v := action.(type) { @@ -253,7 +261,7 @@ func (s *Stream) run() { name := v.GetName() if _, ok := s.Tracks[name]; !ok { s.Tracks[name] = v - s.Info("Track added", zap.String("name", name)) + s.Info("TrackAdd", zap.String("name", name)) for _, sub := range s.Subscribers { sub.OnEvent(v) // 通知Subscriber有新Track可用了 } diff --git a/subscriber.go b/subscriber.go index 18a10b2..cb69bf3 100644 --- a/subscriber.go +++ b/subscriber.go @@ -21,10 +21,12 @@ type ISubscriber interface { type TrackPlayer struct { context.Context context.CancelFunc - AudioTrack *track.Audio - VideoTrack *track.Video - vr *AVRing[NALUSlice] - ar *AVRing[AudioSlice] + AudioTrack *track.Audio + VideoTrack *track.Video + vr *AVRing[NALUSlice] + ar *AVRing[AudioSlice] + startTime time.Time //读到第一个关键帧的时间 + firstIFrame *VideoFrame //起始关键帧 } // Subscriber 订阅者实体定义 @@ -66,6 +68,7 @@ func (s *Subscriber) AddTrack(t Track) bool { } s.VideoTrack = v s.vr = v.ReadRing() + s.firstIFrame = (*VideoFrame)(s.vr.Read(s.TrackPlayer)) return true } } else if a, ok := t.(*track.Audio); ok { @@ -92,12 +95,31 @@ func (s *Subscriber) Play() { for s.TrackPlayer.Err() == nil { if s.vr != nil { for { - vp := s.vr.Read(s.TrackPlayer) - s.OnEvent((*VideoFrame)(vp)) - s.vr.MoveNext() - if vp.Timestamp.After(t) { - t = vp.Timestamp - break + // 如果进入正常模式 + if s.firstIFrame == nil { + vp := s.vr.Read(s.TrackPlayer) + s.OnEvent((*VideoFrame)(vp)) + s.vr.MoveNext() + if vp.Timestamp.After(t) { + t = vp.Timestamp + break + } + } else { + if s.startTime.IsZero() { + s.startTime = time.Now() + } + if &s.VideoTrack.IDRing.Value != (*AVFrame[NALUSlice])(s.firstIFrame) { + s.firstIFrame = nil + s.vr = s.VideoTrack.ReadRing() + } else { + vp := s.vr.Read(s.TrackPlayer) + s.OnEvent((*VideoFrame)(vp)) + fast := time.Duration(vp.AbsTime-s.firstIFrame.AbsTime)*time.Millisecond - time.Since(s.startTime) + if fast > 0 { + time.Sleep(fast) + } + s.vr.MoveNext() + } } } } @@ -118,13 +140,10 @@ func (s *Subscriber) Play() { type PushEvent int type Pusher struct { - Config *config.Push - StreamPath string - RemoteURL string - PushCount int + Client[config.Push] } // 是否需要重连 func (pub *Pusher) Reconnect() bool { - return pub.Config.RePush == -1 || pub.PushCount <= pub.Config.RePush + return pub.Config.RePush == -1 || pub.ReConnectCount <= pub.Config.RePush } diff --git a/track/aac.go b/track/aac.go index 5e8ade3..281c681 100644 --- a/track/aac.go +++ b/track/aac.go @@ -11,7 +11,6 @@ import ( ) func NewAAC(stream IStream) (aac *AAC) { - stream.Debug("create aac track") aac = &AAC{} aac.Name = "aac" aac.Stream = stream diff --git a/track/base.go b/track/base.go index d4c795c..210ab07 100644 --- a/track/base.go +++ b/track/base.go @@ -34,10 +34,11 @@ type Media[T RawSlice] struct { SampleSize byte DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) // util.BytesPool //无锁内存池,用于发布者(在同一个协程中)复用小块的内存,通常是解包时需要临时使用 - rtpSequence uint16 //用于生成下一个rtp包的序号 - orderQueue []*RTPFrame //rtp包的缓存队列,用于乱序重排 - lastSeq uint16 //上一个收到的序号,用于乱序重排 - lastSeq2 uint16 //记录上上一个收到的序列号 + rtpSequence uint16 //用于生成下一个rtp包的序号 + orderQueue []*RTPFrame //rtp包的缓存队列,用于乱序重排 + lastSeq uint16 //上一个收到的序号,用于乱序重排 + lastSeq2 uint16 //记录上上一个收到的序列号 + firstTimestamp time.Time //第一次写入的时间,用于计算总时间防止过快写入 } func (av *Media[T]) LastWriteTime() time.Time { @@ -154,10 +155,18 @@ func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) { } func (av *Media[T]) Flush() { - if av.Prev().Value.DTS != 0 { - av.Value.DeltaTime = (av.Value.DTS - av.Prev().Value.DTS) / 90 + preValue := av.PreValue() + if av.firstTimestamp.IsZero() { + av.firstTimestamp = time.Now() + } else { + av.Value.DeltaTime = (av.Value.DTS - preValue.DTS) / 90 + av.Value.AbsTime += av.Value.DeltaTime } av.Base.Flush(&av.Value.BaseFrame) + // 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下,100ms作为一个弹性区间防止频繁调用sleep + if fast := time.Duration(av.Value.AbsTime)*time.Millisecond - time.Since(av.firstTimestamp); fast > time.Millisecond*100 { + time.Sleep(fast) + } av.Step() } diff --git a/track/h264.go b/track/h264.go index 7ecb345..b713c26 100644 --- a/track/h264.go +++ b/track/h264.go @@ -16,7 +16,6 @@ type H264 struct { } func NewH264(stream IStream) (vt *H264) { - stream.Debug("create h264 track") vt = &H264{} vt.Name = "h264" vt.CodecID = codec.CodecID_H264 diff --git a/track/h265.go b/track/h265.go index 4a06a02..db669c3 100644 --- a/track/h265.go +++ b/track/h265.go @@ -15,7 +15,6 @@ type H265 struct { } func NewH265(stream IStream) (vt *H265) { - stream.Debug("create h265 track") vt = &H265{} vt.Name = "h265" vt.CodecID = codec.CodecID_H265