diff --git a/plugin.go b/plugin.go index a94293a..c3e0ce8 100644 --- a/plugin.go +++ b/plugin.go @@ -3,7 +3,6 @@ package m7s import ( "context" "fmt" - "log/slog" "net" "net/http" "net/url" @@ -366,7 +365,7 @@ func (p *Plugin) OnPublish(pub *Publisher) { if p.Meta.Recorder != nil { for r, recConf := range onPublish.Record { if recConf.FilePath = r.Replace(pub.StreamPath, recConf.FilePath); recConf.FilePath != "" { - p.Record(pub, recConf) + p.Record(pub, recConf, nil) } } } @@ -496,9 +495,9 @@ func (p *Plugin) Push(pub *Publisher, conf config.Push) { job.Depend(pub) } -func (p *Plugin) Record(pub *Publisher, conf config.Record) { +func (p *Plugin) Record(pub *Publisher, conf config.Record, subConf *config.Subscribe) { recorder := p.Meta.Recorder() - job := recorder.GetRecordJob().Init(recorder, p, pub.StreamPath, conf) + job := recorder.GetRecordJob().Init(recorder, p, pub.StreamPath, conf, subConf) job.Depend(pub) } @@ -558,10 +557,6 @@ func (p *Plugin) handle(pattern string, handler http.Handler) { p.Server.apiList = append(p.Server.apiList, pattern) } -func (p *Plugin) AddLogHandler(handler slog.Handler) { - p.Server.LogHandler.Add(handler) -} - func (p *Plugin) SaveConfig() (err error) { return Servers.AddTask(&SaveConfig{Plugin: p}).WaitStopped() } diff --git a/plugin/logrotate/api.go b/plugin/logrotate/api.go index 598a15e..96641ff 100644 --- a/plugin/logrotate/api.go +++ b/plugin/logrotate/api.go @@ -51,9 +51,10 @@ func (h *LogRotatePlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { } -func (l *LogRotatePlugin) API_tail(w http.ResponseWriter, r *http.Request) { +func (l *LogRotatePlugin) API_trail(w http.ResponseWriter, r *http.Request) { writer := util.NewSSE(w, r.Context()) h := console.NewHandler(writer, &console.HandlerOptions{NoColor: true}) - l.Server.AddLogHandler(h) + l.Server.LogHandler.Add(h) <-r.Context().Done() + l.Server.LogHandler.Remove(h) } diff --git a/plugin/logrotate/index.go b/plugin/logrotate/index.go index 1fc5cbb..187f2c8 100644 --- a/plugin/logrotate/index.go +++ b/plugin/logrotate/index.go @@ -32,7 +32,7 @@ func (config *LogRotatePlugin) OnInit() (err error) { } config.handler, err = rotoslog.NewHandler(rotoslog.LogHandlerBuilder(builder), rotoslog.LogDir(config.Path), rotoslog.MaxFileSize(config.Size), rotoslog.DateTimeLayout(config.Formatter), rotoslog.MaxRotatedFiles(config.MaxFiles)) if err == nil { - config.AddLogHandler(config.handler) + config.Server.LogHandler.Add(config.handler) } return } diff --git a/plugin/rtp/pkg/video.go b/plugin/rtp/pkg/video.go index be0fab3..4f61d8c 100644 --- a/plugin/rtp/pkg/video.go +++ b/plugin/rtp/pkg/video.go @@ -2,7 +2,6 @@ package rtp import ( "encoding/base64" - "errors" "fmt" "io" "slices" @@ -22,7 +21,7 @@ import ( type ( H26xCtx struct { RTPCtx - seq uint16 + seq uint16 dtsEst util.DTSEstimator } H264Ctx struct { @@ -410,7 +409,7 @@ func (r *Video) Demux(ictx codec.ICodecCtx) (any, error) { if nalu.Size > 0 { nalu.AppendOne(packet.Payload[offset:]) } else { - return nil, errors.New("fu have no start") + continue } if util.Bit1(b1, 1) { gotNalu() diff --git a/plugin/rtsp/pkg/transceiver.go b/plugin/rtsp/pkg/transceiver.go index daa93dd..877f950 100644 --- a/plugin/rtsp/pkg/transceiver.go +++ b/plugin/rtsp/pkg/transceiver.go @@ -165,6 +165,22 @@ func (r *Receiver) Receive() (err error) { }, } return r.NetConnection.Receive(false, func(channelID byte, buf []byte) error { + if time.Since(rtcpTS) > 5*time.Second { + rtcpTS = time.Now() + // Serialize RTCP packets + rawRR, err := rr.Marshal() + if err != nil { + return err + } + rawSDES, err := sdes.Marshal() + if err != nil { + return err + } + // Send RTCP packets + if _, err = r.NetConnection.Write(append(rawRR, rawSDES...)); err != nil { + return err + } + } switch int(channelID) { case r.AudioChannelID: if !r.PubAudio { @@ -224,25 +240,6 @@ func (r *Receiver) Receive() (err error) { default: } - - if time.Now().After(rtcpTS) { - rtcpTS = time.Now().Add(5 * time.Second) - // Serialize RTCP packets - rawRR, err := rr.Marshal() - if err != nil { - return err - } - rawSDES, err := sdes.Marshal() - if err != nil { - return err - } - // Send RTCP packets - if _, err = r.NetConnection.Write(append(rawRR, rawSDES...)); err != nil { - return err - } - - } - return pkg.ErrUnsupportCodec }, func(channelID byte, buf []byte) error { msg := &RTCP{Channel: channelID} diff --git a/publisher.go b/publisher.go index af8eb44..b95c2ca 100644 --- a/publisher.go +++ b/publisher.go @@ -196,7 +196,7 @@ func (p *Publisher) Start() (err error) { if device.Status == DeviceStatusOnline { device.ChangeStatus(DeviceStatusPulling) if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && device.FilePath != "" { - mp4Plugin.Record(p, device.Record) + mp4Plugin.Record(p, device.Record, nil) } } } diff --git a/recoder.go b/recoder.go index efcf7a6..ee32f6b 100644 --- a/recoder.go +++ b/recoder.go @@ -1,9 +1,10 @@ package m7s import ( - "gorm.io/gorm" "time" + "gorm.io/gorm" + "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" @@ -22,6 +23,7 @@ type ( StreamPath string // 对应本地流 Plugin *Plugin Subscriber *Subscriber + SubConf *config.Subscribe Fragment time.Duration Append bool FilePath string @@ -63,19 +65,24 @@ func (p *RecordJob) GetKey() string { } func (p *RecordJob) Subscribe() (err error) { - p.Subscriber, err = p.Plugin.Subscribe(p.recorder.GetTask().Context, p.StreamPath) + if p.SubConf != nil { + p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.recorder.GetTask().Context, p.StreamPath, *p.SubConf) + } else { + p.Subscriber, err = p.Plugin.Subscribe(p.recorder.GetTask().Context, p.StreamPath) + } if p.Subscriber != nil { p.Subscriber.Internal = true } return } -func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, conf config.Record) *RecordJob { +func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, conf config.Record, subConf *config.Subscribe) *RecordJob { p.Plugin = plugin p.Fragment = conf.Fragment p.Append = conf.Append p.FilePath = conf.FilePath p.StreamPath = streamPath + p.SubConf = subConf p.recorder = recorder p.SetDescriptions(task.Description{ "plugin": plugin.Meta.Name,