fix: pull rtsp

This commit is contained in:
langhuihui
2024-10-06 17:55:48 +08:00
parent c57200178d
commit fc790cfe06
30 changed files with 371 additions and 108 deletions

View File

@@ -26,7 +26,8 @@ func main() {
|-----------|-------------|
| disable_rm | Disables the memory pool |
| sqlite | Enables the sqlite DB |
| duckdb | Enables the duckdb DB |
| taskpanic | Throws panic, for testing |
## More Example

View File

@@ -24,7 +24,8 @@ func main() {
|-----------|-------------|
| disable_rm | 禁用内存池 |
| sqlite | 启用 sqlite |
| duckdb | 启用 duckdb |
| taskpanic | 抛出 panic用于测试 |
## 更多示例
查看 example 目录

132
doc/pull.md Normal file
View File

@@ -0,0 +1,132 @@
# sequence
```mermaid
sequenceDiagram
participant P as Plugin
participant M as PluginMeta
participant PJ as PullJob
participant S as Server
participant IPuller as IPuller
P->>P: Pull(streamPath, conf)
P->>M: Meta.Puller(conf)
M-->>P: puller (IPuller)
P->>PJ: GetPullJob()
PJ-->>P: pullJob
P->>PJ: Init(puller, p, streamPath, conf)
PJ->>S: Server.Pulls.Add(p, logger)
S->>PJ: Start()
PJ->>IPuller: SetRetry(conf.MaxRetry, conf.RetryInterval)
PJ->>PJ: Description = {...}
Note over PJ: Set description with plugin info, streamPath, URL, etc.
PJ->>IPuller: Start()
```
# simple config
## flv plugin
### local file
```yaml
flv:
pull:
live/test: /Users/dexter/Movies/jb-demo.flv
```
### remote file
```yaml
flv:
pull:
live/test: http://192.168.1.100/live/stream.flv
```
## mp4 plugin
### local file
```yaml
mp4:
pull:
live/test: /Users/dexter/Movies/jb-demo.mp4
```
### remote file
```yaml
mp4:
pull:
live/test: http://192.168.1.100/live/stream.mp4
```
## srt plugin
### local file
```yaml
srt:
pull:
live/test: srt://127.0.0.1:6000?streamid=subscribe:/live/stream&passphrase=foobarfoobar
```
## rtmp plugin
```yaml
rtmp:
pull:
live/test: rtmp://127.0.0.1/live/stream
```
## rtsp plugin
```yaml
rtsp:
pull:
live/test: rtsp://127.0.0.1/live/stream
```
## hls plugin
```yaml
hls:
pull:
live/test: http://127.0.0.1/live/stream.m3u8
```
## gb28181 plugin
deivceID/channelID
```yaml
gb28181:
pull:
live/test: 34020000002000000001/34020000002000000001
```
# full config
## pull on subscribe
```yaml
xxx:
onsub:
pull:
.*: $0
```
## config retry
```yaml
xxx:
pull:
live/test: xxxx
maxRetry: 3
retryInterval: 5s
```
## config proxy
```yaml
xxx:
pull:
live/test: xxxx
proxy: http://127.0.0.1:8080
```
## config header
```yaml
xxx:
pull:
live/test: xxxx
header:
User-Agent: xxx
```
## config args
```yaml
xxx:
pull:
live/test: xxxx
args:
user: xxx
password: xxx
```

View File

@@ -0,0 +1,23 @@
global:
loglevel: debug
http: :8081
tcp: :50052
rtsp:
pull:
live/test: rtsp://127.0.0.1:8554/live/test
#transcode:
# onpub:
# transform:
# ^live.+:
# output:
# - target: rtmp://localhost/trans/$0/small
# conf: -loglevel debug -c:a aac -c:v h264 -vf scale=320:240
mp4:
onpub:
record:
^live/.+:
fragment: 30s
filepath: record/mp4

42
example/custom/main.go Normal file
View File

@@ -0,0 +1,42 @@
package main
import (
"context"
"flag"
"fmt"
"m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/console"
_ "m7s.live/m7s/v5/plugin/debug"
_ "m7s.live/m7s/v5/plugin/flv"
_ "m7s.live/m7s/v5/plugin/gb28181"
_ "m7s.live/m7s/v5/plugin/logrotate"
_ "m7s.live/m7s/v5/plugin/monitor"
_ "m7s.live/m7s/v5/plugin/mp4"
mp4 "m7s.live/m7s/v5/plugin/mp4/pkg"
_ "m7s.live/m7s/v5/plugin/preview"
_ "m7s.live/m7s/v5/plugin/rtmp"
_ "m7s.live/m7s/v5/plugin/rtsp"
_ "m7s.live/m7s/v5/plugin/sei"
_ "m7s.live/m7s/v5/plugin/srt"
_ "m7s.live/m7s/v5/plugin/stress"
_ "m7s.live/m7s/v5/plugin/transcode"
_ "m7s.live/m7s/v5/plugin/webrtc"
"path/filepath"
"strings"
"time"
)
func main() {
conf := flag.String("c", "config.yaml", "config file")
flag.Parse()
mp4.CustomFileName = func(job *m7s.RecordJob) string {
if job.Fragment == 0 {
return job.FilePath + ".mp4"
}
ss := strings.Split(job.StreamPath, "/")
lastPart := ss[len(ss)-1]
return filepath.Join(job.FilePath, fmt.Sprintf("%s_%s%s", lastPart, time.Now().Local().Format("2006-01-02-15-04-05"), ".mp4"))
}
// ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100))
m7s.Run(context.Background(), *conf)
}

View File

@@ -3,7 +3,7 @@ global:
disableall: true
rtsp:
enable: true
listenaddr: :554
tcp: :8554
flv:
enable: true
pull:

View File

@@ -1,25 +1,16 @@
global:
loglevel: debug
#rtsp:
# tcp:
# listenaddr: :10554
loglevel: trace
http: :8081
tcp: :50052
rtsp:
pull:
live/test: rtsp://127.0.0.1:8554/live/test
debug:
profile: cpu.prof
transcode:
onpub:
transform:
# .+:
# output:
# - target: rtmp://localhost/$0/h265
# conf: -loglevel debug -c:a aac -c:v hevc_videotoolbox
live/.*:
^live.+:
output:
- target: rtmp://localhost/$0/h264
overlay: rtmp://localhost/overlay/test
# filter: drawtext=fontfile=/usr/share/fonts/truetype/noto/NotoSans-Regular.ttf:fontsize=24:fontcolor=white:box=1:boxcolor=black@0.5:boxborderw=5:x=(w-tw)/2:y=h-th-10:text='%{localtime\:%Y-%m-%d %H.%M.%S}'
filter: |
[1:v]crop=400:300:10:10[overlay];
[0:v][overlay]overlay=600:100[base];
[base]drawtext=fontfile=/usr/share/fonts/truetype/noto/NotoSans-Regular.ttf:fontsize=24:fontcolor=white:box=1:boxcolor=black@0.5:boxborderw=5:x=(w-tw)/2:y=h-th-10:text='%{localtime\:%Y-%m-%d %H.%M.%S}'[out]
conf: -loglevel debug -c:a copy -c:v h264
- target: rtmp://localhost/trans/$0/small
conf: -loglevel debug -c:a aac -c:v h264 -vf scale=320:240

View File

@@ -39,7 +39,7 @@ type (
}
Pull struct {
URL string `desc:"拉流地址"`
MaxRetry int `desc:"断开后自动重试次数,0:不重试,-1:无限重试"` // 断开后自动重拉,0 表示不自动重拉,-1 表示无限重拉高于0 的数代表最大重拉次数
MaxRetry int `default:"-1" desc:"断开后自动重试次数,0:不重试,-1:无限重试"` // 断开后自动重拉,0 表示不自动重拉,-1 表示无限重拉高于0 的数代表最大重拉次数
RetryInterval time.Duration `default:"5s" desc:"重试间隔"` // 重试间隔
Proxy string `desc:"代理地址"` // 代理地址
Header map[string][]string

View File

@@ -150,6 +150,7 @@ func (rb *RingWriter) Step() (normal bool) {
// do not remove only idr
if next == rb.IDRingList.Back().Value {
if rb.Size < rb.SizeRange[1] {
rb.SLogger.Debug("only idr")
rb.glow(5)
next = rb.Next()
}
@@ -160,7 +161,7 @@ func (rb *RingWriter) Step() (normal bool) {
rb.IDRingList.Remove(oldIDR)
rb.Unlock()
} else {
rb.SLogger.Log(nil, task.TraceLevel, "not enough buffer")
rb.SLogger.Debug("not enough buffer")
rb.glow(5)
next = rb.Next()
}

View File

@@ -1,3 +1,6 @@
//go:build !taskpanic
// +build !taskpanic
package task
var ThrowPanic = false

View File

@@ -228,7 +228,7 @@ func (task *Task) GetSignal() any {
}
func (task *Task) checkRetry(err error) bool {
if errors.Is(err, ErrTaskComplete) {
if errors.Is(err, ErrTaskComplete) || errors.Is(err, ErrExit) {
return false
}
if task.retry.MaxRetry < 0 || task.retry.RetryCount < task.retry.MaxRetry {
@@ -349,9 +349,21 @@ func (task *Task) ResetRetryCount() {
}
func (task *Task) run(handler func() error) {
if err := handler(); err == nil {
var err error
defer func() {
if !ThrowPanic {
if r := recover(); r != nil {
err = errors.New(fmt.Sprint(r))
if task.Logger != nil {
task.Error("panic", "error", err, "stack", string(debug.Stack()))
}
}
}
if err == nil {
task.Stop(ErrTaskComplete)
} else {
task.Stop(err)
}
}()
err = handler()
}

View File

@@ -41,8 +41,6 @@ func NewBufReaderWithBufLen(reader io.Reader, bufLen int) (r *BufReader) {
func NewBufReaderBuffersChan(feedChan chan net.Buffers) (r *BufReader) {
r = &BufReader{
Allocator: NewScalableMemoryAllocator(defaultBufSize),
BufLen: defaultBufSize,
feedData: func() error {
data, ok := <-feedChan
if !ok {
@@ -63,8 +61,6 @@ func NewBufReaderBuffersChan(feedChan chan net.Buffers) (r *BufReader) {
func NewBufReaderChan(feedChan chan []byte) (r *BufReader) {
r = &BufReader{
Allocator: NewScalableMemoryAllocator(defaultBufSize),
BufLen: defaultBufSize,
feedData: func() error {
data, ok := <-feedChan
if !ok {
@@ -87,7 +83,9 @@ func NewBufReader(reader io.Reader) (r *BufReader) {
func (r *BufReader) Recycle() {
r.buf = MemoryReader{}
if r.Allocator != nil {
r.Allocator.Recycle()
}
}
func (r *BufReader) Buffered() int {

View File

@@ -1,17 +1,17 @@
package plugin_debug
import (
myproc "github.com/cloudwego/goref/pkg/proc"
"github.com/go-delve/delve/pkg/config"
"github.com/go-delve/delve/service/debugger"
"io"
"m7s.live/m7s/v5"
"net/http"
"net/http/pprof"
"os"
runtimePPROF "runtime/pprof"
"strings"
"time"
myproc "github.com/cloudwego/goref/pkg/proc"
"m7s.live/m7s/v5"
)
var _ = m7s.InstallPlugin[DebugPlugin]()
@@ -19,6 +19,8 @@ var conf, _ = config.LoadConfig()
type DebugPlugin struct {
m7s.Plugin
ProfileDuration time.Duration `default:"10s" desc:"profile持续时间"`
Profile string `desc:"采集profile存储文件"`
ChartPeriod time.Duration `default:"1s" desc:"图表更新周期"`
Grfout string `default:"grf.out" desc:"grf输出文件"`
}
@@ -41,6 +43,24 @@ func (w *WriteToFile) WriteHeader(statusCode int) {
// w.w.WriteHeader(statusCode)
}
func (p *DebugPlugin) OnInit() error {
if p.Profile != "" {
go func() {
file, err := os.Create(p.Profile)
if err != nil {
return
}
defer file.Close()
p.Info("cpu profile start")
err = runtimePPROF.StartCPUProfile(file)
time.Sleep(p.ProfileDuration)
runtimePPROF.StopCPUProfile()
p.Info("cpu profile done")
}()
}
return nil
}
func (p *DebugPlugin) Pprof_Trace(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/debug" + r.URL.Path
pprof.Trace(w, r)

View File

@@ -6,7 +6,6 @@ import (
"os"
"path/filepath"
"slices"
"strings"
"time"
"m7s.live/m7s/v5"
@@ -145,6 +144,13 @@ type Recorder struct {
m7s.DefaultRecorder
}
var CustomFileName = func(job *m7s.RecordJob) string {
if job.Fragment == 0 || job.Append {
return fmt.Sprintf("%s.flv", job.FilePath)
}
return filepath.Join(job.FilePath, time.Now().Local().Format("2006-01-02T15:04:05")+".flv")
}
func (r *Recorder) Run() (err error) {
var file *os.File
var filepositions []uint64
@@ -155,11 +161,7 @@ func (r *Recorder) Run() (err error) {
suber := ctx.Subscriber
noFragment := ctx.Fragment == 0 || ctx.Append
if noFragment {
filePath := ctx.FilePath
if !strings.HasSuffix(filePath, ".flv") {
filePath += ".flv"
}
if file, err = os.OpenFile(filePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.Append, os.O_APPEND, os.O_TRUNC), 0666); err != nil {
if file, err = os.OpenFile(CustomFileName(ctx), os.O_CREATE|os.O_RDWR|util.Conditional(ctx.Append, os.O_APPEND, os.O_TRUNC), 0666); err != nil {
return
}
defer writeMetaTag(file, suber, filepositions, times, &duration)
@@ -194,7 +196,7 @@ func (r *Recorder) Run() (err error) {
} else if ctx.Fragment == 0 {
_, err = file.Write(FLVHead)
} else {
if file, err = os.OpenFile(filepath.Join(ctx.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix())), os.O_CREATE|os.O_RDWR, 0666); err != nil {
if file, err = os.OpenFile(CustomFileName(ctx), os.O_CREATE|os.O_RDWR, 0666); err != nil {
return
}
_, err = file.Write(FLVHead)
@@ -206,7 +208,7 @@ func (r *Recorder) Run() (err error) {
filepositions = []uint64{0}
times = []float64{0}
offset = 0
if file, err = os.OpenFile(filepath.Join(ctx.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix())), os.O_CREATE|os.O_RDWR, 0666); err != nil {
if file, err = os.OpenFile(CustomFileName(ctx), os.O_CREATE|os.O_RDWR, 0666); err != nil {
return
}
_, err = file.Write(FLVHead)

View File

@@ -47,7 +47,7 @@ func (p *MP4Plugin) List(ctx context.Context, req *pb.ReqRecordList) (resp *pb.R
if req.FilePath == "" {
p.DB.Find(&streams, "end_time>? AND start_time<?", startTime, endTime)
} else {
p.DB.Find(&streams, "end_time>? AND start_time<? AND file_path=?", startTime, endTime, req.FilePath)
p.DB.Find(&streams, "end_time>? AND start_time<? AND file_path like ?", startTime, endTime, req.FilePath+"%")
}
resp = &pb.ResponseList{}
for _, stream := range streams {

View File

@@ -84,8 +84,9 @@ type RecordFile struct {
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
FilePath string `protobuf:"bytes,2,opt,name=filePath,proto3" json:"filePath,omitempty"`
StartTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=startTime,proto3" json:"startTime,omitempty"`
EndTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=endTime,proto3" json:"endTime,omitempty"`
StreamPath string `protobuf:"bytes,3,opt,name=streamPath,proto3" json:"streamPath,omitempty"`
StartTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=startTime,proto3" json:"startTime,omitempty"`
EndTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=endTime,proto3" json:"endTime,omitempty"`
}
func (x *RecordFile) Reset() {
@@ -134,6 +135,13 @@ func (x *RecordFile) GetFilePath() string {
return ""
}
func (x *RecordFile) GetStreamPath() string {
if x != nil {
return x.StreamPath
}
return ""
}
func (x *RecordFile) GetStartTime() *timestamppb.Timestamp {
if x != nil {
return x.StartTime
@@ -223,15 +231,17 @@ var file_mp4_proto_rawDesc = []byte{
0x12, 0x1a, 0x0a, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x14, 0x0a, 0x05,
0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x72, 0x61, 0x6e,
0x67, 0x65, 0x22, 0xa8, 0x01, 0x0a, 0x0a, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x46, 0x69, 0x6c,
0x67, 0x65, 0x22, 0xc8, 0x01, 0x0a, 0x0a, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x46, 0x69, 0x6c,
0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69,
0x64, 0x12, 0x1a, 0x0a, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x50, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x38, 0x0a,
0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1e, 0x0a,
0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x38, 0x0a,
0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74,
0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69,
0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x61, 0x0a,
0x0c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x12, 0x0a,

View File

@@ -21,8 +21,9 @@ message ReqRecordList {
message RecordFile {
uint32 id = 1;
string filePath = 2;
google.protobuf.Timestamp startTime = 3;
google.protobuf.Timestamp endTime = 4;
string streamPath = 3;
google.protobuf.Timestamp startTime = 4;
google.protobuf.Timestamp endTime = 5;
}
message ResponseList {

View File

@@ -21,10 +21,6 @@ type SampleSizeBox struct {
EntrySizelist []uint32
}
func NewSampleSizeBox() *SampleSizeBox {
return &SampleSizeBox{}
}
func (stsz *SampleSizeBox) Size() uint64 {
if stsz.SampleSize == 0 {
return FullBoxLen + 8 + 4*uint64(stsz.SampleCount)

View File

@@ -2,6 +2,7 @@ package mp4
import (
"encoding/binary"
"errors"
"io"
"os"
@@ -164,6 +165,9 @@ func (m *FileMuxer) WriteSample(t *Track, sample Sample) (err error) {
}
func (m *Muxer) WriteSample(w io.Writer, t *Track, sample Sample) (err error) {
if len(sample.Data) == 0 {
return errors.New("sample data is empty")
}
sample.Offset = m.CurrentOffset
sample.Size, err = w.Write(sample.Data)
if err != nil {

View File

@@ -2,10 +2,8 @@ package mp4
import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
@@ -147,7 +145,7 @@ func (p *RecordReader) Run() (err error) {
}
for i, stream := range p.Streams {
tsOffset = ts
p.File, err = os.Open(filepath.Join(pullJob.RemoteURL, fmt.Sprintf("%d.mp4", stream.ID)))
p.File, err = os.Open(stream.FilePath)
if err != nil {
return
}

View File

@@ -2,7 +2,7 @@ package mp4
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"io"
"os"
"path/filepath"
"time"
@@ -44,9 +44,11 @@ func (task *writeTrailerTask) Start() (err error) {
task.Error("rewrite with moov", "err", err)
return
}
_, err = task.muxer.File.Seek(0, io.SeekStart)
_, err = temp.Seek(0, io.SeekStart)
_, err = io.Copy(task.muxer.File, temp)
err = task.muxer.File.Close()
err = temp.Close()
fs.MustCopyFile(temp.Name(), task.muxer.File.Name())
return os.Remove(temp.Name())
}
}
@@ -67,45 +69,55 @@ type Recorder struct {
func (r *Recorder) writeTailer(end time.Time) {
r.stream.EndTime = end
if r.RecordJob.Plugin.DB != nil {
r.RecordJob.Plugin.DB.Save(&r.stream)
}
writeTrailerQueueTask.AddTask(&writeTrailerTask{
muxer: r.muxer,
}, r.Logger)
}
var CustomFileName = func(job *m7s.RecordJob) string {
if job.Fragment == 0 {
return fmt.Sprintf("%s.mp4", job.FilePath)
}
return filepath.Join(job.FilePath, time.Now().Local().Format("2006-01-02T15:04:05")+".mp4")
}
func (r *Recorder) createStream(start time.Time) (err error) {
recordJob := &r.RecordJob
sub := recordJob.Subscriber
r.stream = m7s.RecordStream{
StartTime: start,
FilePath: recordJob.FilePath,
var file *os.File
r.stream.FilePath = CustomFileName(&r.RecordJob)
dir := filepath.Dir(r.stream.FilePath)
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
if file, err = os.Create(r.stream.FilePath); err != nil {
return
}
r.muxer, err = NewFileMuxer(file)
r.stream.StreamPath = sub.StreamPath
r.stream.StartTime = start
if sub.Publisher.HasAudioTrack() {
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.FourCC().String()
}
if sub.Publisher.HasVideoTrack() {
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.FourCC().String()
}
if recordJob.Plugin.DB != nil {
recordJob.Plugin.DB.Save(&r.stream)
var file *os.File
if r.RecordJob.Fragment == 0 {
if file, err = os.Create(fmt.Sprintf("%d.mp4", r.RecordJob.FilePath)); err != nil {
return
}
} else {
if file, err = os.Create(filepath.Join(r.RecordJob.FilePath, fmt.Sprintf("%d.mp4", r.stream.ID))); err != nil {
return
}
}
r.muxer, err = NewFileMuxer(file)
return
}
func (r *Recorder) Start() (err error) {
if r.RecordJob.Plugin.DB != nil {
err = r.RecordJob.Plugin.DB.AutoMigrate(&r.stream)
if err != nil {
return
}
}
return r.DefaultRecorder.Start()
}

View File

@@ -199,11 +199,11 @@ func (r *RTPData) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, seq IAVF
return
}
type RTPAudio struct {
type Audio struct {
RTPData
}
func (r *RTPAudio) Parse(t *AVTrack) (err error) {
func (r *Audio) Parse(t *AVTrack) (err error) {
switch r.MimeType {
case webrtc.MimeTypeOpus:
var ctx OPUSCtx
@@ -278,7 +278,7 @@ func payloadLengthInfoDecode(buf []byte) (int, int, error) {
return l, n, nil
}
func (r *RTPAudio) Demux(codexCtx codec.ICodecCtx) (any, error) {
func (r *Audio) Demux(codexCtx codec.ICodecCtx) (any, error) {
var data AudioData
switch r.MimeType {
case "audio/MP4A-LATM":
@@ -389,7 +389,7 @@ func (r *RTPAudio) Demux(codexCtx codec.ICodecCtx) (any, error) {
return data, nil
}
func (r *RTPAudio) Mux(codexCtx codec.ICodecCtx, from *AVFrame) {
func (r *Audio) Mux(codexCtx codec.ICodecCtx, from *AVFrame) {
data := from.Raw.(AudioData)
var ctx *RTPCtx
var lastPacket *rtp.Packet
@@ -435,7 +435,7 @@ func (r *RTPAudio) Mux(codexCtx codec.ICodecCtx, from *AVFrame) {
lastPacket.Header.Marker = true
}
func (r *RTPAudio) readAUHeaders(ctx *AACCtx, buf []byte, headersLen int) ([]uint64, error) {
func (r *Audio) readAUHeaders(ctx *AACCtx, buf []byte, headersLen int) ([]uint64, error) {
firstRead := false
count := 0

View File

@@ -24,7 +24,7 @@ type Receiver struct {
func (s *Sender) GetMedia() (medias []*Media, err error) {
if s.SubAudio && s.Publisher.PubAudio && s.Publisher.HasAudioTrack() {
audioTrack := s.Publisher.GetAudioTrack(reflect.TypeOf((*mrtp.RTPAudio)(nil)))
audioTrack := s.Publisher.GetAudioTrack(reflect.TypeOf((*mrtp.Audio)(nil)))
if err = audioTrack.WaitReady(); err != nil {
return
}
@@ -88,7 +88,7 @@ func (s *Sender) sendRTP(pack *mrtp.RTPData, channel int) (err error) {
}
func (s *Sender) Send() (err error) {
s.Stream.AddTask(m7s.CreatePlayTask(s.Subscriber, func(audio *mrtp.RTPAudio) error {
s.Stream.AddTask(m7s.CreatePlayTask(s.Subscriber, func(audio *mrtp.Audio) error {
return s.sendRTP(&audio.RTPData, s.AudioChannelID)
}, func(video *mrtp.Video) error {
return s.sendRTP(&video.RTPData, s.VideoChannelID)
@@ -135,7 +135,7 @@ func (r *Receiver) SetMedia(medias []*Media) (err error) {
}
func (r *Receiver) Receive() (err error) {
audioFrame, videoFrame := &mrtp.RTPAudio{}, &mrtp.Video{}
audioFrame, videoFrame := &mrtp.Audio{}, &mrtp.Video{}
audioFrame.SetAllocator(r.MemoryAllocator)
audioFrame.RTPCodecParameters = r.AudioCodecParameters
videoFrame.SetAllocator(r.MemoryAllocator)
@@ -168,10 +168,10 @@ func (r *Receiver) Receive() (err error) {
if err = r.WriteAudio(audioFrame); err != nil {
return
}
audioFrame = &mrtp.RTPAudio{}
audioFrame = &mrtp.Audio{}
audioFrame.AddRecycleBytes(buf)
audioFrame.Packets = []*rtp.Packet{packet}
audioFrame.RTPCodecParameters = r.VideoCodecParameters
audioFrame.RTPCodecParameters = r.AudioCodecParameters
audioFrame.SetAllocator(r.MemoryAllocator)
}
case r.VideoChannelID:

View File

@@ -23,7 +23,7 @@ type SRTPlugin struct {
const defaultConfig = m7s.DefaultYaml(`listenaddr: :6000`)
var _ = m7s.InstallPlugin[SRTPlugin](defaultConfig,pkg.NewPuller, pkg.NewPusher)
var _ = m7s.InstallPlugin[SRTPlugin](defaultConfig, pkg.NewPuller, pkg.NewPusher)
func (p *SRTPlugin) OnInit() error {
var t SRTServer
@@ -69,6 +69,10 @@ func (t *SRTServer) Start() error {
return nil
}
func (t *SRTServer) OnStop() {
t.server.Shutdown()
}
func (t *SRTServer) Run() error {
return t.server.ListenAndServe()
}

View File

@@ -118,6 +118,8 @@ func (t *Transformer) Start() (err error) {
args = append(args, "-f", "flv", to.Target)
case "rtsp":
args = append(args, "-f", "rtsp", to.Target)
case "srt":
args = append(args, "-f", "mpegts", to.Target)
default:
args = append(args, to.Target)
}

View File

@@ -186,7 +186,7 @@ func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) {
if videoSender == nil {
suber.SubVideo = false
}
conn.AddTask(m7s.CreatePlayTask(suber, func(frame *mrtp.RTPAudio) (err error) {
conn.AddTask(m7s.CreatePlayTask(suber, func(frame *mrtp.Audio) (err error) {
for _, p := range frame.Packets {
if err = audioTLSRTP.WriteRTP(p); err != nil {
return

View File

@@ -69,7 +69,7 @@ func (IO *Connection) Receive() {
}
mem := util.NewScalableMemoryAllocator(1 << 12)
defer mem.Recycle()
frame := &mrtp.RTPAudio{}
frame := &mrtp.Audio{}
frame.RTPCodecParameters = &codecP
frame.SetAllocator(mem)
for {
@@ -91,7 +91,7 @@ func (IO *Connection) Receive() {
frame.Packets = append(frame.Packets, &packet)
} else {
err = IO.Publisher.WriteAudio(frame)
frame = &mrtp.RTPAudio{}
frame = &mrtp.Audio{}
frame.AddRecycleBytes(buf)
frame.Packets = []*rtp.Packet{&packet}
frame.RTPCodecParameters = &codecP

View File

@@ -186,7 +186,7 @@ func (p *RecordFilePuller) Start() (err error) {
return
}
tx := p.PullJob.Plugin.DB.Find(&p.Streams, "end_time>? AND file_path=?", p.PullStartTime, p.PullJob.RemoteURL)
tx := p.PullJob.Plugin.DB.Find(&p.Streams, "end_time>? AND stream_path=?", p.PullStartTime, p.PullJob.RemoteURL)
if tx.Error != nil {
return tx.Error
}

View File

@@ -35,6 +35,7 @@ type (
ID uint `gorm:"primarykey"`
StartTime, EndTime time.Time
FilePath string
StreamPath string
AudioCodec, VideoCodec string
}
)

View File

@@ -13,6 +13,8 @@ import (
"strings"
"time"
"github.com/shirou/gopsutil/v3/cpu"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/config"
@@ -121,7 +123,7 @@ func NewServer(conf any) (s *Server) {
"arch": sysruntime.GOARCH,
"cpus": int32(sysruntime.NumCPU()),
}
s.Transforms.PublishEvent = make(chan *Publisher, 1)
s.Transforms.PublishEvent = make(chan *Publisher, 10)
s.prometheusDesc.init()
return
}
@@ -356,6 +358,13 @@ func (c *CheckSubWaitTimeout) GetTickInterval() time.Duration {
}
func (c *CheckSubWaitTimeout) Tick(any) {
percents, err := cpu.Percent(time.Second, false)
if err == nil {
for _, cpu := range percents {
c.Info("tick", "cpu", cpu, "streams", c.s.Streams.Length, "subscribers", c.s.Subscribers.Length, "waits", c.s.Waiting.Length)
}
}
for waits := range c.s.Waiting.Range {
for sub := range waits.Range {
select {