fix: rtsp add send rtcp

This commit is contained in:
langhuihui
2024-11-29 12:08:14 +08:00
parent 42ba757aef
commit 6eef325fa6
7 changed files with 36 additions and 37 deletions

View File

@@ -3,7 +3,6 @@ package m7s
import (
"context"
"fmt"
"log/slog"
"net"
"net/http"
"net/url"
@@ -366,7 +365,7 @@ func (p *Plugin) OnPublish(pub *Publisher) {
if p.Meta.Recorder != nil {
for r, recConf := range onPublish.Record {
if recConf.FilePath = r.Replace(pub.StreamPath, recConf.FilePath); recConf.FilePath != "" {
p.Record(pub, recConf)
p.Record(pub, recConf, nil)
}
}
}
@@ -496,9 +495,9 @@ func (p *Plugin) Push(pub *Publisher, conf config.Push) {
job.Depend(pub)
}
func (p *Plugin) Record(pub *Publisher, conf config.Record) {
func (p *Plugin) Record(pub *Publisher, conf config.Record, subConf *config.Subscribe) {
recorder := p.Meta.Recorder()
job := recorder.GetRecordJob().Init(recorder, p, pub.StreamPath, conf)
job := recorder.GetRecordJob().Init(recorder, p, pub.StreamPath, conf, subConf)
job.Depend(pub)
}
@@ -558,10 +557,6 @@ func (p *Plugin) handle(pattern string, handler http.Handler) {
p.Server.apiList = append(p.Server.apiList, pattern)
}
func (p *Plugin) AddLogHandler(handler slog.Handler) {
p.Server.LogHandler.Add(handler)
}
func (p *Plugin) SaveConfig() (err error) {
return Servers.AddTask(&SaveConfig{Plugin: p}).WaitStopped()
}

View File

@@ -51,9 +51,10 @@ func (h *LogRotatePlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
func (l *LogRotatePlugin) API_tail(w http.ResponseWriter, r *http.Request) {
func (l *LogRotatePlugin) API_trail(w http.ResponseWriter, r *http.Request) {
writer := util.NewSSE(w, r.Context())
h := console.NewHandler(writer, &console.HandlerOptions{NoColor: true})
l.Server.AddLogHandler(h)
l.Server.LogHandler.Add(h)
<-r.Context().Done()
l.Server.LogHandler.Remove(h)
}

View File

@@ -32,7 +32,7 @@ func (config *LogRotatePlugin) OnInit() (err error) {
}
config.handler, err = rotoslog.NewHandler(rotoslog.LogHandlerBuilder(builder), rotoslog.LogDir(config.Path), rotoslog.MaxFileSize(config.Size), rotoslog.DateTimeLayout(config.Formatter), rotoslog.MaxRotatedFiles(config.MaxFiles))
if err == nil {
config.AddLogHandler(config.handler)
config.Server.LogHandler.Add(config.handler)
}
return
}

View File

@@ -2,7 +2,6 @@ package rtp
import (
"encoding/base64"
"errors"
"fmt"
"io"
"slices"
@@ -22,7 +21,7 @@ import (
type (
H26xCtx struct {
RTPCtx
seq uint16
seq uint16
dtsEst util.DTSEstimator
}
H264Ctx struct {
@@ -410,7 +409,7 @@ func (r *Video) Demux(ictx codec.ICodecCtx) (any, error) {
if nalu.Size > 0 {
nalu.AppendOne(packet.Payload[offset:])
} else {
return nil, errors.New("fu have no start")
continue
}
if util.Bit1(b1, 1) {
gotNalu()

View File

@@ -165,6 +165,22 @@ func (r *Receiver) Receive() (err error) {
},
}
return r.NetConnection.Receive(false, func(channelID byte, buf []byte) error {
if time.Since(rtcpTS) > 5*time.Second {
rtcpTS = time.Now()
// Serialize RTCP packets
rawRR, err := rr.Marshal()
if err != nil {
return err
}
rawSDES, err := sdes.Marshal()
if err != nil {
return err
}
// Send RTCP packets
if _, err = r.NetConnection.Write(append(rawRR, rawSDES...)); err != nil {
return err
}
}
switch int(channelID) {
case r.AudioChannelID:
if !r.PubAudio {
@@ -224,25 +240,6 @@ func (r *Receiver) Receive() (err error) {
default:
}
if time.Now().After(rtcpTS) {
rtcpTS = time.Now().Add(5 * time.Second)
// Serialize RTCP packets
rawRR, err := rr.Marshal()
if err != nil {
return err
}
rawSDES, err := sdes.Marshal()
if err != nil {
return err
}
// Send RTCP packets
if _, err = r.NetConnection.Write(append(rawRR, rawSDES...)); err != nil {
return err
}
}
return pkg.ErrUnsupportCodec
}, func(channelID byte, buf []byte) error {
msg := &RTCP{Channel: channelID}

View File

@@ -196,7 +196,7 @@ func (p *Publisher) Start() (err error) {
if device.Status == DeviceStatusOnline {
device.ChangeStatus(DeviceStatusPulling)
if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && device.FilePath != "" {
mp4Plugin.Record(p, device.Record)
mp4Plugin.Record(p, device.Record, nil)
}
}
}

View File

@@ -1,9 +1,10 @@
package m7s
import (
"gorm.io/gorm"
"time"
"gorm.io/gorm"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
@@ -22,6 +23,7 @@ type (
StreamPath string // 对应本地流
Plugin *Plugin
Subscriber *Subscriber
SubConf *config.Subscribe
Fragment time.Duration
Append bool
FilePath string
@@ -63,19 +65,24 @@ func (p *RecordJob) GetKey() string {
}
func (p *RecordJob) Subscribe() (err error) {
p.Subscriber, err = p.Plugin.Subscribe(p.recorder.GetTask().Context, p.StreamPath)
if p.SubConf != nil {
p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.recorder.GetTask().Context, p.StreamPath, *p.SubConf)
} else {
p.Subscriber, err = p.Plugin.Subscribe(p.recorder.GetTask().Context, p.StreamPath)
}
if p.Subscriber != nil {
p.Subscriber.Internal = true
}
return
}
func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, conf config.Record) *RecordJob {
func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, conf config.Record, subConf *config.Subscribe) *RecordJob {
p.Plugin = plugin
p.Fragment = conf.Fragment
p.Append = conf.Append
p.FilePath = conf.FilePath
p.StreamPath = streamPath
p.SubConf = subConf
p.recorder = recorder
p.SetDescriptions(task.Description{
"plugin": plugin.Meta.Name,