diff --git a/codec/codec.go b/codec/codec.go index 5e9072d..e027b07 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -2,18 +2,42 @@ package codec import ( "errors" - ) +type AudioCodecID byte +type VideoCodecID byte + const ( - ADTS_HEADER_SIZE = 7 - CodecID_AAC = 0xA - CodecID_PCMA = 7 - CodecID_PCMU = 8 - CodecID_H264 = 7 - CodecID_H265 = 0xC + ADTS_HEADER_SIZE = 7 + CodecID_AAC AudioCodecID = 0xA + CodecID_PCMA AudioCodecID = 7 + CodecID_PCMU AudioCodecID = 8 + CodecID_H264 VideoCodecID = 7 + CodecID_H265 VideoCodecID = 0xC ) +func (codecId AudioCodecID) String() string { + switch codecId { + case CodecID_AAC: + return "aac" + case CodecID_PCMA: + return "pcma" + case CodecID_PCMU: + return "pcmu" + } + return "unknow" +} + +func (codecId VideoCodecID) String() string { + switch codecId { + case CodecID_H264: + return "h264" + case CodecID_H265: + return "h265" + } + return "unknow" +} + // ISO/IEC 14496-3 38(52)/page // // Audio @@ -212,5 +236,3 @@ func ParseRTPAAC(payload []byte) (result [][]byte) { } return } - - diff --git a/common/frame.go b/common/frame.go index 1d95dfe..b8585e6 100644 --- a/common/frame.go +++ b/common/frame.go @@ -46,7 +46,7 @@ func (nalu NALUSlice) Bytes() (b []byte) { return } -func (nalu *NALUSlice) Reset() *NALUSlice{ +func (nalu *NALUSlice) Reset() *NALUSlice { if len(*nalu) > 0 { *nalu = (*nalu)[:0] } @@ -94,7 +94,7 @@ func (rtp *RTPFrame) Marshal() *RTPFrame { func (rtp *RTPFrame) Unmarshal(raw []byte) *RTPFrame { rtp.Raw = raw if err := rtp.Packet.Unmarshal(raw); err != nil { - logrus.Errorln(err) + logrus.Error(err) return nil } return rtp @@ -154,11 +154,11 @@ func (avcc AVCCFrame) IsSequence() bool { func (avcc AVCCFrame) CTS() uint32 { return uint32(avcc[2])<<24 | uint32(avcc[3])<<8 | uint32(avcc[4]) } -func (avcc AVCCFrame) VideoCodecID() byte { - return avcc[0] & 0x0F +func (avcc AVCCFrame) VideoCodecID() codec.VideoCodecID { + return codec.VideoCodecID(avcc[0] & 0x0F) } -func (avcc AVCCFrame) AudioCodecID() byte { - return avcc[0] >> 4 +func (avcc AVCCFrame) AudioCodecID() codec.AudioCodecID { + return codec.AudioCodecID(avcc[0] >> 4) } // func (annexb AnnexBFrame) ToSlices() (ret []NALUSlice) { diff --git a/common/index.go b/common/index.go index 34ea78a..5272257 100644 --- a/common/index.go +++ b/common/index.go @@ -8,6 +8,7 @@ type Track interface { type AVTrack interface { Track + Attach() WriteAVCC(ts uint32, frame AVCCFrame) //写入AVCC格式的数据 Flush() } diff --git a/config/config.go b/config/config.go index b942644..2231bdb 100644 --- a/config/config.go +++ b/config/config.go @@ -6,6 +6,8 @@ import ( "reflect" "strings" "time" + + "github.com/sirupsen/logrus" ) type Config map[string]any @@ -57,8 +59,13 @@ func (config Config) Unmarshal(s any) { nameMap[strings.ToLower(name)] = name } for k, v := range config { + name, ok := nameMap[k] + if !ok { + logrus.Error("no config named:", k) + continue + } // 需要被写入的字段 - fv := el.FieldByName(nameMap[k]) + fv := el.FieldByName(name) if value := reflect.ValueOf(v); value.Kind() == reflect.Slice { l := value.Len() s := reflect.MakeSlice(fv.Type(), l, value.Cap()) @@ -110,8 +117,17 @@ func (config Config) Merge(source Config) { } } -func (config Config) Set(key string, value any) { - config[strings.ToLower(key)] = value +func (config *Config) Set(key string, value any) { + if *config == nil { + *config = Config{strings.ToLower(key): value} + } else { + (*config)[strings.ToLower(key)] = value + } +} + +func (config Config) Get(key string) any { + v, _ := config[strings.ToLower(key)] + return v } func (config Config) Has(key string) (ok bool) { @@ -148,13 +164,14 @@ func Struct2Config(s any) (config Config) { } for i, j := 0, t.NumField(); i < j; i++ { ft := t.Field(i) + name := strings.ToLower(ft.Name) switch ft.Type.Kind() { case reflect.Struct: - config[ft.Name] = Struct2Config(v.Field(i)) + config[name] = Struct2Config(v.Field(i)) case reflect.Slice: fallthrough default: - reflect.ValueOf(config).SetMapIndex(reflect.ValueOf(strings.ToLower(ft.Name)), v.Field(i)) + reflect.ValueOf(config).SetMapIndex(reflect.ValueOf(name), v.Field(i)) } } return diff --git a/config/types.go b/config/types.go index 41c6738..e4de83f 100644 --- a/config/types.go +++ b/config/types.go @@ -33,6 +33,13 @@ type Push struct { PushList map[string]string // 自动推流列表 } +func (p *Push) AddPush(streamPath string, url string) { + if p.PushList == nil { + p.PushList = make(map[string]string) + } + p.PushList[streamPath] = url +} + type Engine struct { Publish Subscribe @@ -44,7 +51,7 @@ type Engine struct { } var Global = &Engine{ - Publish{true, true, false, 10, 10}, + Publish{true, true, false, 10, 0}, Subscribe{true, true, false, 10}, HTTP{ListenAddr: ":8080", CORS: true}, false, true, true, true, diff --git a/http.go b/http.go index 832f570..ebc5790 100644 --- a/http.go +++ b/http.go @@ -3,9 +3,10 @@ package engine import ( "encoding/json" "net/http" - log "github.com/sirupsen/logrus" + "github.com/Monibuca/engine/v4/config" . "github.com/logrusorgru/aurora" + log "github.com/sirupsen/logrus" ) type GlobalConfig struct { @@ -14,8 +15,9 @@ type GlobalConfig struct { } func (cfg *GlobalConfig) Update(override config.Config) { + // 使得RawConfig具备全量配置信息,用于合并到插件配置中 Engine.RawConfig = config.Struct2Config(cfg.Engine) - log.Infoln(Green("api server start at"), BrightBlue(cfg.ListenAddr), BrightBlue(cfg.ListenAddrTLS)) + log.Info(Green("api server start at"), BrightBlue(cfg.ListenAddr), BrightBlue(cfg.ListenAddrTLS)) cfg.Listen(Engine, cfg) } diff --git a/log.go b/log.go index b55e1d0..a8241fb 100644 --- a/log.go +++ b/log.go @@ -2,24 +2,31 @@ package engine import ( "io" - "github.com/mattn/go-colorable" + "strings" + "github.com/Monibuca/engine/v4/util" + . "github.com/logrusorgru/aurora" + "github.com/mattn/go-colorable" log "github.com/sirupsen/logrus" ) +var levelColors = []func(any) Value{Red, Red, Red, Yellow, Blue, Green, White} + // MultiLogWriter 可动态增减输出的多端写日志类 type MultiLogWriter struct { writers util.Slice[io.Writer] io.Writer } + var colorableStdout = colorable.NewColorableStdout() -var LogWriter = MultiLogWriter{ +var LogWriter = &MultiLogWriter{ writers: util.Slice[io.Writer]{colorableStdout}, Writer: colorableStdout, } func init() { log.SetOutput(LogWriter) + log.SetFormatter(LogWriter) } func (ml *MultiLogWriter) Add(w io.Writer) { @@ -31,3 +38,22 @@ func (ml *MultiLogWriter) Delete(w io.Writer) { ml.writers.Delete(w) ml.Writer = io.MultiWriter(ml.writers...) } + +func (ml *MultiLogWriter) Format(entry *log.Entry) (b []byte, err error) { + pl := entry.Data["plugin"] + if pl == nil { + pl = "Engine" + } + l := strings.ToUpper(entry.Level.String())[:1] + var props string + if stream := entry.Data["stream"]; stream != nil { + props = Sprintf("[s:%s] ", stream) + } + if puber := entry.Data["puber"]; puber != nil { + props += Sprintf("[pub:%s] ", puber) + } + if suber := entry.Data["suber"]; suber != nil { + props += Sprintf("[sub:%s] ", suber) + } + return []byte(Sprintf(levelColors[entry.Level]("%s [%s] [%s]\t %s%s\n"), l, entry.Time.Format("15:04:05"), pl, props, entry.Message)), nil +} diff --git a/main.go b/main.go index 24c099c..235d93a 100644 --- a/main.go +++ b/main.go @@ -49,27 +49,29 @@ func (p PullOnSubscribe) Pull(streamPath string) { func Run(ctx context.Context, configFile string) (err error) { Engine.Context = ctx if err := util.CreateShutdownScript(); err != nil { - log.Errorln("create shutdown script error:", err) + log.Error("create shutdown script error:", err) } StartTime = time.Now() if ConfigRaw, err = ioutil.ReadFile(configFile); err != nil { - log.Errorln("read config file error:", err) + log.Error("read config file error:", err) } settingDir = filepath.Join(filepath.Dir(configFile), ".m7s") if err = os.MkdirAll(settingDir, 0755); err != nil { - log.Errorln("create dir .m7s error:", err) + log.Error("create dir .m7s error:", err) return } - log.Infoln(BgGreen(White("Ⓜ starting m7s v4"))) + log.Info(Blink("Ⓜ starting m7s v4")) var cg config.Config if ConfigRaw != nil { if err = yaml.Unmarshal(ConfigRaw, &cg); err == nil { Engine.RawConfig = cg.GetChild("global") + //将配置信息同步到结构体 Engine.RawConfig.Unmarshal(config.Global) } } else { - log.Warnln("no config file found , use default config values") + log.Warn("no config file found , use default config values") } + Engine.Entry = log.WithContext(Engine) Engine.registerHandler() go EngineConfig.Update(Engine.RawConfig) for name, plugin := range Plugins { diff --git a/plugin.go b/plugin.go index f78b68e..c8f4e51 100644 --- a/plugin.go +++ b/plugin.go @@ -35,7 +35,7 @@ func InstallPlugin(config config.Plugin) *Plugin { if config != EngineConfig { plugin.Entry = log.WithField("plugin", name) Plugins[name] = plugin - plugin.Infoln(Green("install"), BrightBlue(plugin.Version)) + plugin.Info(Green("install"), BrightBlue(plugin.Version)) } return plugin } @@ -72,12 +72,12 @@ func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter, if opt != Engine { pattern = "/" + strings.ToLower(opt.Name) + pattern } - opt.Infoln("http handle added:", pattern) + opt.Info("http handle added:", pattern) EngineConfig.HandleFunc(pattern, func(rw http.ResponseWriter, r *http.Request) { if cors { util.CORS(rw, r) } - opt.Debugln(r.RemoteAddr, " -> ", pattern) + opt.Debug(r.RemoteAddr, " -> ", pattern) handler(rw, r) }) } @@ -99,9 +99,9 @@ func (opt *Plugin) assign() { // 用全局配置覆盖没有设置的配置 for _, fname := range MergeConfigs { if _, ok := t.FieldByName(fname); ok { - if Engine.RawConfig.Has(fname) { + if v, ok := Engine.RawConfig[strings.ToLower(fname)]; ok { if !opt.RawConfig.Has(fname) { - opt.RawConfig.Set(fname, Engine.RawConfig[fname]) + opt.RawConfig.Set(fname, v) } else if opt.RawConfig.HasChild(fname) { opt.RawConfig.GetChild(fname).Merge(Engine.RawConfig.GetChild(fname)) } diff --git a/publisher.go b/publisher.go index ab0a166..b240c4f 100644 --- a/publisher.go +++ b/publisher.go @@ -13,7 +13,9 @@ type IPublisher interface { Close() // 流关闭时或者被踢时触发 OnStateChange(oldState StreamState, newState StreamState) bool OnStateChanged(oldState StreamState, newState StreamState) + Publish(streamPath string, specific IPublisher, config config.Publish) bool } + type IPuller interface { IPublisher Pull(int) @@ -35,10 +37,10 @@ func (pub *Publisher) Publish(streamPath string, specific IPublisher, config con } if s.Publisher != nil { if config.KickExsit { - s.Warnln("kick", s.Publisher) + s.Warn("kick", s.Publisher) s.Publisher.Close() } else { - s.Warnln("publisher exsit", s.Publisher) + s.Warn("publisher exsit", s.Publisher) return false } } diff --git a/stream.go b/stream.go index c68affd..6573998 100644 --- a/stream.go +++ b/stream.go @@ -62,6 +62,9 @@ var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{ { ACTION_TIMEOUT: STATE_DESTROYED, }, + { + + }, } // Streams 所有的流集合 @@ -124,11 +127,11 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream } p := strings.Split(u.Path, "/") if len(p) < 2 { - log.Warnln(Red("Stream Path Format Error:"), streamPath) + log.Warn(Red("Stream Path Format Error:"), streamPath) return nil, false } if s, ok := Streams.Map[u.Path]; ok { - s.Debugln(Green("Stream Found")) + s.Debug(Green("Stream Found")) return s, false } else { p := strings.Split(u.Path, "/") @@ -138,7 +141,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream StreamName: util.LastElement(p), Entry: log.WithField("stream", u.Path), } - s.Infoln("created:", streamPath) + s.Info("created") s.WaitTimeout = waitTimeout Streams.Map[u.Path] = s s.actionChan = make(chan any, 1) @@ -152,43 +155,48 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream } func (r *Stream) action(action StreamAction) bool { + r.Tracef("action:%d", action) if next, ok := StreamFSM[r.State][action]; ok { - if r.Publisher == nil || r.Publisher.OnStateChange(r.State, next) { + if r.Publisher != nil { + // 给Publisher状态变更的回调,方便进行远程拉流等操作 defer r.Publisher.OnStateChanged(r.State, next) - r.Debugln(action, " :", r.State, "->", next) - r.State = next - switch next { - case STATE_WAITPUBLISH: - r.Publisher = nil - Bus.Publish(Event_REQUEST_PUBLISH, r) - r.timeout.Reset(r.WaitTimeout) - if _, ok = PullOnSubscribeList[r.Path]; ok { - PullOnSubscribeList[r.Path].Pull(r.Path) - } - case STATE_WAITTRACK: - r.timeout.Reset(time.Second * 5) - case STATE_PUBLISHING: - r.WaitDone() - r.timeout.Reset(r.PublishTimeout) - Bus.Publish(Event_PUBLISH, r) - case STATE_WAITCLOSE: - r.timeout.Reset(r.WaitCloseTimeout) - case STATE_CLOSED: - r.cancel() - if r.Publisher != nil { - r.Publisher.Close() - } - r.WaitDone() - Bus.Publish(Event_STREAMCLOSE, r) - Streams.Delete(r.Path) - r.timeout.Reset(time.Second) // 延迟1秒钟销毁,防止访问到已关闭的channel - case STATE_DESTROYED: - close(r.actionChan) - fallthrough - default: - r.timeout.Stop() + if !r.Publisher.OnStateChange(r.State, next) { + return false } } + r.Debug(action, " :", r.State, "->", next) + r.State = next + switch next { + case STATE_WAITPUBLISH: + r.Publisher = nil + Bus.Publish(Event_REQUEST_PUBLISH, r) + r.timeout.Reset(r.WaitTimeout) + if _, ok = PullOnSubscribeList[r.Path]; ok { + PullOnSubscribeList[r.Path].Pull(r.Path) + } + case STATE_WAITTRACK: + r.timeout.Reset(time.Second * 5) + case STATE_PUBLISHING: + r.WaitDone() + r.timeout.Reset(r.PublishTimeout) + Bus.Publish(Event_PUBLISH, r) + case STATE_WAITCLOSE: + r.timeout.Reset(r.WaitCloseTimeout) + case STATE_CLOSED: + r.cancel() + if r.Publisher != nil { + r.Publisher.Close() + } + r.WaitDone() + Bus.Publish(Event_STREAMCLOSE, r) + Streams.Delete(r.Path) + r.timeout.Reset(time.Second) // 延迟1秒钟销毁,防止访问到已关闭的channel + case STATE_DESTROYED: + close(r.actionChan) + fallthrough + default: + r.timeout.Stop() + } return true } return false @@ -197,7 +205,7 @@ func (r *Stream) IsClosed() bool { if r == nil { return true } - return r.State == STATE_CLOSED + return r.State >= STATE_CLOSED } func (r *Stream) Close() { @@ -207,13 +215,13 @@ func (r *Stream) Close() { } func (r *Stream) UnSubscribe(sub *Subscriber) { - r.Debugln("unsubscribe", sub.ID) + r.Debug("unsubscribe", sub.ID) if !r.IsClosed() { r.actionChan <- UnSubscibeAction(sub) } } func (r *Stream) Subscribe(sub *Subscriber) { - r.Debugln("subscribe", sub.ID) + r.Debug("subscribe", sub.ID) if !r.IsClosed() { sub.Stream = r sub.Context, sub.cancel = context.WithCancel(r) @@ -223,14 +231,15 @@ func (r *Stream) Subscribe(sub *Subscriber) { // 流状态处理中枢,包括接收订阅发布指令等 func (r *Stream) run() { + var done = r.Done() for { select { case <-r.timeout.C: - r.Debugln(r.State, "timeout") + r.Debugf("%v timeout", r.State) r.action(ACTION_TIMEOUT) - case <-r.Done(): + case <-done: r.action(ACTION_CLOSE) - + done = nil case action, ok := <-r.actionChan: if ok { switch v := action.(type) { @@ -243,14 +252,14 @@ func (r *Stream) run() { case *Subscriber: r.Subscribers.Add(v) Bus.Publish(Event_SUBSCRIBE, v) - v.Infoln(Sprintf(Yellow("added remains:%d"), Cyan(v.ID), Blue(len(r.Subscribers)))) + v.Info(Sprintf(Yellow("added remains:%d") ,len(r.Subscribers))) if r.Subscribers.Len() == 1 { r.action(ACTION_FIRSTENTER) } case UnSubscibeAction: if r.Subscribers.Delete(v) { Bus.Publish(Event_UNSUBSCRIBE, v) - (*Subscriber)(v).Infoln(Sprintf(Yellow("removed remains:%d"), Cyan(v.ID), Blue(len(r.Subscribers)))) + (*Subscriber)(v).Info(Sprintf(Yellow("removed remains:%d"), len(r.Subscribers))) if r.Subscribers.Len() == 0 && r.WaitCloseTimeout > 0 { r.action(ACTION_LASTLEAVE) } @@ -266,7 +275,7 @@ func (r *Stream) run() { // Update 更新数据重置超时定时器 func (r *Stream) Update() uint32 { if r.State == STATE_PUBLISHING { - r.Traceln("update") + r.Trace("update") r.timeout.Reset(r.PublishTimeout) } return atomic.AddUint32(&r.FrameCount, 1) @@ -274,31 +283,29 @@ func (r *Stream) Update() uint32 { // 如果暂时不知道编码格式可以用这个 func (r *Stream) NewVideoTrack() (vt *track.UnknowVideo) { - r.Debugln("create unknow video track") - vt = &track.UnknowVideo{ - Stream: r, - } + r.Debug("create unknow video track") + vt = &track.UnknowVideo{} + vt.Stream = r return } func (r *Stream) NewAudioTrack() (at *track.UnknowAudio) { - r.Debugln("create unknow audio track") - at = &track.UnknowAudio{ - Stream: r, - } + r.Debug("create unknow audio track") + at = &track.UnknowAudio{} + at.Stream = r return } func (r *Stream) NewH264Track() *track.H264 { - r.Debugln("create h264 track") + r.Debug("create h264 track") return track.NewH264(r) } func (r *Stream) NewH265Track() *track.H265 { - r.Debugln("create h265 track") + r.Debug("create h265 track") return track.NewH265(r) } func (r *Stream) NewAACTrack() *track.AAC { - r.Debugln("create aac track") + r.Debug("create aac track") return track.NewAAC(r) } diff --git a/subscriber.go b/subscriber.go index 764f1cb..8734906 100644 --- a/subscriber.go +++ b/subscriber.go @@ -48,7 +48,7 @@ func (sub *Subscriber) Subscribe(streamPath string, config config.Subscribe) boo log.Info(sub.ID, "try to subscribe", streamPath) s, created := findOrCreateStream(streamPath, config.WaitTimeout.Duration()) if s.IsClosed() { - log.Warnln("stream is closed") + log.Warn("stream is closed") return false } sub.Entry = s.Entry.WithField("suber", sub.ID) diff --git a/track/audio.go b/track/audio.go index 58f11e0..fe64d65 100644 --- a/track/audio.go +++ b/track/audio.go @@ -2,7 +2,6 @@ package track import ( "net" - "strings" "github.com/Monibuca/engine/v4/codec" . "github.com/Monibuca/engine/v4/common" @@ -15,15 +14,20 @@ var adcflv2 = []byte{0, 0, 0, 15} type Audio struct { Media[AudioSlice] + CodecID codec.AudioCodecID Channels byte avccHead []byte } -func (av *Audio) GetName() string { - if av.Name == "" { - return strings.ToLower(codec.SoundFormat[av.CodecID]) +func (at *Audio) Attach() { + at.Stream.AddTrack(at) +} + +func (at *Audio) GetName() string { + if at.Name == "" { + return at.CodecID.String() } - return av.Name + return at.Name } func (at *Audio) GetInfo() *Audio { return at @@ -81,16 +85,15 @@ func (at *Audio) Flush() { } type UnknowAudio struct { - Name string - Stream IStream - Know AVTrack + Base + AudioTrack } func (at *UnknowAudio) WriteAVCC(ts uint32, frame AVCCFrame) { - if at.Know == nil { + if at.AudioTrack == nil { codecID := frame.AudioCodecID() if at.Name == "" { - at.Name = strings.ToLower(codec.SoundFormat[codecID]) + at.Name = codecID.String() } switch codecID { case codec.CodecID_AAC: @@ -98,11 +101,11 @@ func (at *UnknowAudio) WriteAVCC(ts uint32, frame AVCCFrame) { return } a := NewAAC(at.Stream) - at.Know = a + at.AudioTrack = a a.SampleSize = 16 a.avccHead = []byte{frame[0], 1} a.WriteAVCC(0, frame) - a.Stream.AddTrack(&a.Audio) + a.Attach() case codec.CodecID_PCMA, codec.CodecID_PCMU: alaw := true @@ -110,7 +113,7 @@ func (at *UnknowAudio) WriteAVCC(ts uint32, frame AVCCFrame) { alaw = false } a := NewG711(at.Stream, alaw) - at.Know = a + at.AudioTrack = a a.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2]) a.SampleSize = 16 if frame[0]&0x02 == 0 { @@ -118,9 +121,12 @@ func (at *UnknowAudio) WriteAVCC(ts uint32, frame AVCCFrame) { } a.Channels = frame[0]&0x01 + 1 a.avccHead = frame[:1] - a.Stream.AddTrack(&a.Audio) + a.Attach() + at.AudioTrack.WriteAVCC(ts, frame) + default: + at.Stream.Errorf("audio codec not support yet:", codecID) } } else { - at.Know.WriteAVCC(ts, frame) + at.AudioTrack.WriteAVCC(ts, frame) } } diff --git a/track/base.go b/track/base.go index 1b55591..fd5b643 100644 --- a/track/base.go +++ b/track/base.go @@ -26,7 +26,6 @@ func (bt *Base) Flush(bf *BaseFrame) { type Media[T RawSlice] struct { Base AVRing[T] `json:"-"` - CodecID byte SampleRate uint32 SampleSize byte DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) @@ -110,7 +109,7 @@ func (av *Media[T]) UnmarshalRTP(raw []byte) (frame *RTPFrame) { av.lastSeq2 = av.lastSeq av.lastSeq = frame.SequenceNumber if av.lastSeq != av.lastSeq2+1 { //序号不连续 - av.Stream.Warnln("RTP SequenceNumber error", av.lastSeq2, av.lastSeq) + av.Stream.Warn("RTP SequenceNumber error", av.lastSeq2, av.lastSeq) return } } diff --git a/track/h264.go b/track/h264.go index 19ceb5d..a682313 100644 --- a/track/h264.go +++ b/track/h264.go @@ -113,7 +113,7 @@ func (vt *H264) WriteRTP(raw []byte) { func (vt *H264) Flush() { if vt.Value.IFrame { if vt.IDRing == nil { - defer vt.Stream.AddTrack(&vt.Video) + defer vt.Attach() } vt.Video.ComputeGOP() } diff --git a/track/h265.go b/track/h265.go index cdf91ab..d417543 100644 --- a/track/h265.go +++ b/track/h265.go @@ -116,7 +116,7 @@ func (vt *H265) WriteRTP(raw []byte) { func (vt *H265) Flush() { if vt.Value.IFrame { if vt.IDRing == nil { - defer vt.Stream.AddTrack(&vt.Video) + defer vt.Attach() } vt.Video.ComputeGOP() } diff --git a/track/video.go b/track/video.go index 7a1fe5d..543b939 100644 --- a/track/video.go +++ b/track/video.go @@ -3,7 +3,6 @@ package track import ( "bytes" "net" - "strings" "github.com/Monibuca/engine/v4/codec" . "github.com/Monibuca/engine/v4/common" @@ -13,6 +12,7 @@ import ( type Video struct { Media[NALUSlice] + CodecID codec.VideoCodecID IDRing *util.Ring[AVFrame[NALUSlice]] `json:"-"` //最近的关键帧位置,首屏渲染 SPSInfo codec.SPSInfo GOP int //关键帧间隔 @@ -20,9 +20,13 @@ type Video struct { idrCount int //缓存中包含的idr数量 } +func (t *Video) Attach() { + t.Stream.AddTrack(t) +} + func (t *Video) GetName() string { if t.Name == "" { - return strings.ToLower(codec.CodecID[t.CodecID]) + return t.CodecID.String() } return t.Name } @@ -86,7 +90,7 @@ func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) { vt.Value.AppendRaw(NALUSlice{nalus[vt.nalulenSize:end]}) nalus = nalus[end:] } else { - vt.Stream.Errorln("WriteAVCC error,len %d,nalulenSize:%d,end:%d", len(nalus), vt.nalulenSize, end) + vt.Stream.Error("WriteAVCC error,len %d,nalulenSize:%d,end:%d", len(nalus), vt.nalulenSize, end) break } } @@ -100,7 +104,7 @@ func (vt *Video) Flush() { } // AVCC格式补完 if vt.Value.AVCC == nil && (config.Global.EnableAVCC || config.Global.EnableFLV) { - b := []byte{vt.CodecID, 1, 0, 0, 0} + b := []byte{byte(vt.CodecID), 1, 0, 0, 0} if vt.Value.IFrame { b[0] |= 0x10 } else { @@ -147,9 +151,8 @@ func (vt *Video) Play(onVideo func(*AVFrame[NALUSlice]) error) { } type UnknowVideo struct { - Name string - Stream IStream - Know AVTrack + Base + VideoTrack } /* @@ -172,24 +175,27 @@ func (vt *UnknowVideo) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { } func (vt *UnknowVideo) WriteAVCC(ts uint32, frame AVCCFrame) { - if vt.Know == nil { + if vt.VideoTrack == nil { if frame.IsSequence() { + ts = 0 codecID := frame.VideoCodecID() if vt.Name == "" { - vt.Name = strings.ToLower(codec.CodecID[codecID]) + vt.Name = codecID.String() } switch codecID { case codec.CodecID_H264: - v := NewH264(vt.Stream) - vt.Know = v - v.WriteAVCC(0, frame) + vt.VideoTrack = NewH264(vt.Stream) case codec.CodecID_H265: - v := NewH265(vt.Stream) - vt.Know = v - v.WriteAVCC(0, frame) + vt.VideoTrack = NewH265(vt.Stream) + default: + vt.Stream.Error("video codecID not support: ", codecID) + return } + vt.VideoTrack.WriteAVCC(ts, frame) + } else { + vt.Stream.Warnf("need sequence frame") } } else { - vt.Know.WriteAVCC(ts, frame) + vt.VideoTrack.WriteAVCC(ts, frame) } } diff --git a/tracks.go b/tracks.go index 66f10c9..c87aeac 100644 --- a/tracks.go +++ b/tracks.go @@ -32,7 +32,7 @@ func (s *Stream) AddTrack(t Track) { defer s.Tracks.Unlock() name := t.GetName() if _, ok := s.Tracks.m[name]; !ok { - s.Infoln("Track", name, "added") + s.Infof("Track '%s' added", name) if s.Tracks.m[name] = t; s.Tracks.Err() == nil { for _, ch := range s.Tracks.waiters[name] { if *ch != nil {