From 1a347b5a0bbb14e23db125221a1f0222ea22897b Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Sun, 6 Aug 2023 14:16:06 +0800 Subject: [PATCH] feat: add stop subscribe api, show reasons for subscriber closure --- README.md | 3 +- common/frame.go | 1 + config/types.go | 1 - go.mod | 16 +++---- go.sum | 8 ++-- http.go | 57 ++++++++++++------------ io.go | 11 +++-- lang/zh.yaml | 6 ++- memory-ts.go | 3 ++ plugin.go | 104 ++++++++++++++++++++++++------------------- publisher-rtpdump.go | 12 ++--- publisher.go | 6 ++- subscriber.go | 8 +++- subscribers.go | 9 ++++ summary.go | 91 +++++++++++++------------------------ track/aac.go | 4 +- track/base.go | 16 ++++--- track/g711.go | 13 ++++++ track/h264.go | 16 ++++--- track/h265.go | 14 +++--- track/rtp.go | 10 ++++- util/buffer.go | 3 +- util/index.go | 8 ++++ util/socket.go | 6 +++ 24 files changed, 238 insertions(+), 188 deletions(-) diff --git a/README.md b/README.md index 3aad07b..4a36b78 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,8 @@ - 热更新配置信息 `/api/updateconfig?name=xxx` 热更新xxx插件的配置信息,如果不带参数或参数为空则热更新全局配置 - 获取所有远端拉流信息 `/api/list/pull` 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""} - 获取所有向远端推流信息 `/api/list/push` 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""} -- 停止推流 `/api/stoppush?url=xxx` 停止向xxx推流 ,成功返回ok +- 停止推流 `/api/stop/push?url=xxx` 停止向xxx推流 ,成功返回ok +- 停止某个订阅者 `/api/stop/subscribe?streamPath=xxx&id=xxx` 停止xxx流的xxx订阅者 ,成功返回ok - 插入SEI帧 `/api/insertsei?streamPath=xxx&type=5` 向xxx流内插入SEI帧 ,成功返回ok。type为SEI类型,可选,默认是5 # 引擎默认配置 ```yaml diff --git a/common/frame.go b/common/frame.go index 4c9dd79..cb1e49e 100644 --- a/common/frame.go +++ b/common/frame.go @@ -177,6 +177,7 @@ func (av *AVFrame) Reset() { av.ADTS = nil } av.Timestamp = 0 + av.IFrame = false av.DataFrame.Reset() } diff --git a/config/types.go b/config/types.go index 36c3933..b594d55 100755 --- a/config/types.go +++ b/config/types.go @@ -135,7 +135,6 @@ type Engine struct { LogLang string `default:"zh"` //日志语言 LogLevel string `default:"info"` //日志级别 RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲长度 - SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间 EventBusSize int `default:"10"` //事件总线大小 PulseInterval time.Duration `default:"5s"` //心跳事件间隔 DisableAll bool `default:"false"` //禁用所有插件 diff --git a/go.mod b/go.mod index 72aeeae..0ad6a9c 100644 --- a/go.mod +++ b/go.mod @@ -3,18 +3,18 @@ module m7s.live/engine/v4 go 1.19 require ( - github.com/aler9/gortsplib/v2 v2.2.2 + github.com/bluenviron/mediacommon v0.7.0 github.com/cnotch/ipchub v1.1.0 github.com/google/uuid v1.3.0 github.com/logrusorgru/aurora v2.0.3+incompatible github.com/mcuadros/go-defaults v1.2.0 - github.com/pion/rtp v1.7.13 - github.com/pion/webrtc/v3 v3.1.49 + github.com/pion/rtp v1.8.0 + github.com/pion/webrtc/v3 v3.1.56 github.com/q191201771/naza v0.30.8 github.com/quic-go/quic-go v0.32.0 - github.com/shirou/gopsutil/v3 v3.22.10 - go.uber.org/zap v1.23.0 - golang.org/x/net v0.8.0 + github.com/shirou/gopsutil/v3 v3.22.11 + go.uber.org/zap v1.24.0 + golang.org/x/net v0.12.0 golang.org/x/sync v0.1.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -43,9 +43,9 @@ require ( github.com/tklauser/numcpus v0.6.0 // indirect github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274 github.com/yusufpapurcu/wmi v1.2.2 // indirect - golang.org/x/crypto v0.4.0 // indirect + golang.org/x/crypto v0.11.0 // indirect golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect golang.org/x/mod v0.7.0 // indirect - golang.org/x/sys v0.6.0 // indirect + golang.org/x/sys v0.10.0 // indirect golang.org/x/tools v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index 6a11735..bfdff85 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/aler9/gortsplib/v2 v2.2.2 h1:tTw8pdKSOEjlZjjE1S4ftXPHJkYOqjNNv3hjQ0Nto9M= -github.com/aler9/gortsplib/v2 v2.2.2/go.mod h1:k6uBVHGwsIc/0L5SLLqWwi6bSJUb4VR0HfvncyHlKQI= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/bluenviron/mediacommon v0.7.0 h1:dJWLLL9oDbAqfK8KuNfnDUQwNbeMAtGeRjZc9Vo95js= +github.com/bluenviron/mediacommon v0.7.0/go.mod h1:wuLJdxcITiSPgY1MvQqrX+qPlKmNfeV9wNvXth5M98I= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -141,15 +141,13 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms= github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4= -github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33 h1:uyZY++dluUg7iTSsNzuOVln/mC2U2KXwgKLfKLCJ74Y= -github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33/go.mod h1:WSZ59bidJOO40JSJmLqlkBJrjZCtjbKKkygEMfzY/kc= github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274 h1:cj4I+bvWX9I+Hg6tnZ7DAiOVxzhyLhdvYVKp+WpM/2c= github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274/go.mod h1:WSZ59bidJOO40JSJmLqlkBJrjZCtjbKKkygEMfzY/kc= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/http.go b/http.go index 5b163c6..72ceb81 100644 --- a/http.go +++ b/http.go @@ -2,6 +2,7 @@ package engine import ( "encoding/json" + "fmt" "io" "net/http" "os" @@ -9,6 +10,7 @@ import ( "strings" "time" + "go.uber.org/zap" "gopkg.in/yaml.v3" "m7s.live/engine/v4/codec" "m7s.live/engine/v4/config" @@ -40,33 +42,12 @@ func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } } -func fetchSummary() *Summary { - return &summary -} - func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) { y := ShouldYaml(r) - if r.Header.Get("Accept") == "text/event-stream" { - summary.Add() - defer summary.Done() - if y { - util.ReturnYaml(fetchSummary, time.Second, rw, r) - } else { - util.ReturnJson(fetchSummary, time.Second, rw, r) - } + if y { + util.ReturnYaml(util.FetchValue(&summary), time.Second, rw, r) } else { - if !summary.Running() { - summary.collect() - } - summary.rw.RLock() - defer summary.rw.RUnlock() - if y { - if err := yaml.NewEncoder(rw).Encode(&summary); err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - } - } else if err := json.NewEncoder(rw).Encode(&summary); err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - } + util.ReturnJson(util.FetchValue(&summary), time.Second, rw, r) } } @@ -84,9 +65,9 @@ func (conf *GlobalConfig) API_stream(rw http.ResponseWriter, r *http.Request) { if streamPath := r.URL.Query().Get("streamPath"); streamPath != "" { if s := Streams.Get(streamPath); s != nil { if ShouldYaml(r) { - util.ReturnYaml(func() *Stream { return s }, time.Second, rw, r) + util.ReturnYaml(util.FetchValue(s), time.Second, rw, r) } else { - util.ReturnJson(func() *Stream { return s }, time.Second, rw, r) + util.ReturnJson(util.FetchValue(s), time.Second, rw, r) } } else { http.Error(rw, NO_SUCH_STREAM, http.StatusNotFound) @@ -218,17 +199,35 @@ func (conf *GlobalConfig) API_list_push(w http.ResponseWriter, r *http.Request) }, time.Second, w, r) } -func (conf *GlobalConfig) API_stopPush(w http.ResponseWriter, r *http.Request) { +func (conf *GlobalConfig) API_stop_push(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() pusher, ok := Pushers.Load(q.Get("url")) if ok { pusher.(IPusher).Stop() - w.Write([]byte("ok")) + fmt.Fprintln(w, "ok") } else { http.Error(w, "no such pusher", http.StatusNotFound) } } +func (conf *GlobalConfig) API_stop_subscribe(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + streamPath := q.Get("streamPath") + id := q.Get("id") + s := Streams.Get(streamPath) + if s == nil { + http.Error(w, NO_SUCH_STREAM, http.StatusNotFound) + return + } + suber := s.Subscribers.Find(id) + if suber == nil { + http.Error(w, "no such subscriber", http.StatusNotFound) + return + } + suber.Stop(zap.String("reason", "stop by api")) + fmt.Fprintln(w, "ok") +} + func (conf *GlobalConfig) API_replay_rtpdump(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() streamPath := q.Get("streamPath") @@ -314,7 +313,7 @@ func (conf *GlobalConfig) API_replay_ts(w http.ResponseWriter, r *http.Request) } else { tsReader := NewTSReader(&pub) pub.SetIO(f) - go func(){ + go func() { tsReader.Feed(f) tsReader.Close() }() diff --git a/io.go b/io.go index d444346..7a6c24a 100644 --- a/io.go +++ b/io.go @@ -12,6 +12,7 @@ import ( "time" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "m7s.live/engine/v4/config" "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" @@ -36,6 +37,7 @@ type AuthPub interface { type IO struct { ID string Type string + RemoteAddr string context.Context `json:"-" yaml:"-"` //不要直接设置,应当通过OnEvent传入父级Context context.CancelFunc `json:"-" yaml:"-"` //流关闭是关闭发布者或者订阅者 *log.Logger `json:"-" yaml:"-"` @@ -92,7 +94,7 @@ type IIO interface { receive(string, IIO) error IsClosed() bool OnEvent(any) - Stop() + Stop(reason ...zapcore.Field) SetIO(any) SetParentCtx(context.Context) SetLogger(*log.Logger) @@ -113,9 +115,11 @@ func (i *IO) close() bool { } // Stop 停止订阅或者发布,由订阅者或者发布者调用 -func (io *IO) Stop() { +func (io *IO) Stop(reason ...zapcore.Field) { if io.close() { - io.Debug("stop", zap.Stack("stack")) + io.Info("stop", reason...) + } else { + io.Warn("already stopped", reason...) } } @@ -194,6 +198,7 @@ func (io *IO) receive(streamPath string, specific IIO) error { } else if oldPublisher == specific { //断线重连 } else { + s.Warn("duplicate publish", zap.String("type", oldPublisher.GetPublisher().Type)) return ErrDuplicatePublish } } diff --git a/lang/zh.yaml b/lang/zh.yaml index 7447d16..efaacf9 100644 --- a/lang/zh.yaml +++ b/lang/zh.yaml @@ -19,6 +19,7 @@ state: 状态 initialize: 初始化 "start read": 开始读取 "start pull": 开始从远端拉流 +"restart pull": 重新拉流 "pull failed": 拉取失败 "wait publisher": 等待发布者发布 "wait timeout": 等待超时 @@ -45,6 +46,7 @@ reamins: 剩余 "video track attached": 视频轨道已附加 "audio track attached": 音频轨道已附加 "data track attached": 数据轨道已附加 +"track back online": 轨道已恢复 "first frame read": 第一帧已读取 "fu have no start": rtp的FU起始包丢了 "disabled by env": 被环境变量禁用 @@ -54,4 +56,6 @@ reamins: 剩余 firstTs: 第一帧时间戳 firstSeq: 第一帧序列号 skipSeq: 跳过序列号 -skipTs: 跳过时间戳 \ No newline at end of file +skipTs: 跳过时间戳 +"nalu type not supported": nalu类型不支持 +"create file": 创建文件 diff --git a/memory-ts.go b/memory-ts.go index 43578d6..19a65aa 100644 --- a/memory-ts.go +++ b/memory-ts.go @@ -50,6 +50,9 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M var tsHeaderLength int for i := 0; len(pesBuffers) > 0; i++ { if bigLen < mpegts.TS_PACKET_SIZE { + if i == 0 { + ts.Recycle() + } headerItem := ts.Get(mpegts.TS_PACKET_SIZE) ts.BLL.Push(headerItem) bwTsHeader = &headerItem.Value diff --git a/plugin.go b/plugin.go index e9cd6c1..6c5e1df 100644 --- a/plugin.go +++ b/plugin.go @@ -294,7 +294,9 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error { copyConfig := *conf.GetSubscribeConfig() suber.Config = ©Config } - suber.ID = fmt.Sprintf("%s_%d", suber.ID, uintptr(unsafe.Pointer(suber))) + if suber.ID == "" { + suber.ID = fmt.Sprintf("%d", uintptr(unsafe.Pointer(suber))) + } return sub.Subscribe(streamPath, sub) } @@ -310,60 +312,66 @@ var ErrNoPullConfig = errors.New("no pull config") var Pullers sync.Map func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int) (err error) { - zurl := zap.String("url", url) - zpath := zap.String("path", streamPath) - opt.Info("pull", zpath, zurl) - defer func() { - if err != nil { - opt.Error("pull failed", zurl, zap.Error(err)) - } - }() conf, ok := opt.Config.(config.PullConfig) if !ok { return ErrNoPullConfig } pullConf := conf.GetPullConfig() - - puller.init(streamPath, url, pullConf) - puller.SetLogger(opt.Logger.With(zpath, zurl)) - go func() { - Pullers.Store(puller, url) - defer Pullers.Delete(puller) - for opt.Info("start pull", zurl); puller.Reconnect(); opt.Warn("restart pull", zurl) { - if err = puller.Connect(); err != nil { - if err == io.EOF { - puller.GetPublisher().Stream.Close() - opt.Info("pull complete", zurl) - return - } - opt.Error("pull connect", zurl, zap.Error(err)) - time.Sleep(time.Second * 5) - } else { - if err = opt.Publish(streamPath, puller); err != nil { - if stream := Streams.Get(streamPath); stream != nil { - if stream.Publisher != puller && stream.Publisher != nil { - io := stream.Publisher.GetPublisher() - opt.Error("puller is not publisher", zap.String("ID", io.ID), zap.String("Type", io.Type), zap.Error(err)) - return - } else { - opt.Warn("pull publish", zurl, zap.Error(err)) - } - } else { - opt.Error("pull publish", zurl, zap.Error(err)) + if save < 2 { + zurl := zap.String("url", url) + zpath := zap.String("path", streamPath) + opt.Info("pull", zpath, zurl) + defer func() { + if err != nil { + opt.Error("pull failed", zurl, zap.Error(err)) + } + }() + puller.init(streamPath, url, pullConf) + puller.SetLogger(opt.Logger.With(zpath, zurl)) + badPuller := true + go func() { + Pullers.Store(puller, url) + defer Pullers.Delete(puller) + for opt.Info("start pull", zurl); puller.Reconnect(); opt.Warn("restart pull", zurl) { + if err = puller.Connect(); err != nil { + if err == io.EOF { + puller.GetPublisher().Stream.Close() + opt.Info("pull complete", zurl) return } + opt.Error("pull connect", zurl, zap.Error(err)) + if badPuller { + return + } + time.Sleep(time.Second * 5) + } else { + if err = opt.Publish(streamPath, puller); err != nil { + if stream := Streams.Get(streamPath); stream != nil { + if stream.Publisher != puller && stream.Publisher != nil { + io := stream.Publisher.GetPublisher() + opt.Error("puller is not publisher", zap.String("ID", io.ID), zap.String("Type", io.Type), zap.Error(err)) + return + } else { + opt.Warn("pull publish", zurl, zap.Error(err)) + } + } else { + opt.Error("pull publish", zurl, zap.Error(err)) + return + } + } + badPuller = false + if err = puller.Pull(); err != nil && !puller.IsShutdown() { + opt.Error("pull", zurl, zap.Error(err)) + } } - if err = puller.Pull(); err != nil && !puller.IsShutdown() { - opt.Error("pull", zurl, zap.Error(err)) + if puller.IsShutdown() { + opt.Info("stop pull shutdown", zurl) + return } } - if puller.IsShutdown() { - opt.Info("stop pull shutdown", zurl) - return - } - } - opt.Warn("stop pull stop reconnect", zurl) - }() + opt.Warn("stop pull stop reconnect", zurl) + }() + } switch save { case 1: pullConf.AddPullOnStart(streamPath, url) @@ -400,7 +408,7 @@ func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool pushConfig := conf.GetPushConfig() pusher.init(streamPath, url, pushConfig) - + badPusher := true go func() { Pushers.Store(url, pusher) defer Pushers.Delete(url) @@ -418,10 +426,14 @@ func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool opt.Error("push connect", zp, zu, zap.Error(err)) time.Sleep(time.Second * 5) stream.Receive(pusher) // 通知stream移除订阅者 + if badPusher { + return + } } else if err = pusher.Push(); err != nil && !stream.IsClosed() { opt.Error("push", zp, zu, zap.Error(err)) pusher.Stop() } + badPusher = false if stream.IsClosed() { opt.Info("stop push closed", zp, zu) return diff --git a/publisher-rtpdump.go b/publisher-rtpdump.go index a9772fd..2da014c 100644 --- a/publisher-rtpdump.go +++ b/publisher-rtpdump.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/aler9/gortsplib/v2/pkg/codecs/mpeg4audio" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/pion/webrtc/v3/pkg/media/rtpdump" "go.uber.org/zap" "m7s.live/engine/v4/codec" @@ -20,7 +20,7 @@ type RTPDumpPublisher struct { ACodec codec.AudioCodecID VPayloadType uint8 APayloadType uint8 - other *rtpdump.Packet + other rtpdump.Packet sync.Mutex } @@ -77,15 +77,15 @@ func (t *RTPDumpPublisher) Feed(file *os.File) { if needLock { t.Lock() } - if t.other == nil { - t.other = &packet + if t.other.Payload == nil { + t.other = packet t.Unlock() needLock = true continue } - if packet.Offset > t.other.Offset { + if packet.Offset >= t.other.Offset { t.WriteRTP(t.other.Payload) - t.other = &packet + t.other = packet t.Unlock() needLock = true continue diff --git a/publisher.go b/publisher.go index 148a540..41af08f 100644 --- a/publisher.go +++ b/publisher.go @@ -2,6 +2,7 @@ package engine import ( "go.uber.org/zap" + "go.uber.org/zap/zapcore" "m7s.live/engine/v4/codec" "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" @@ -34,10 +35,11 @@ func (p *Publisher) GetPublisher() *Publisher { return p } -func (p *Publisher) Stop() { - p.IO.Stop() +func (p *Publisher) Stop(reason ...zapcore.Field) { + p.IO.Stop(reason...) p.Stream.Receive(ACTION_PUBLISHLOST) } + func (p *Publisher) getAudioTrack() common.AudioTrack { return p.AudioTrack } diff --git a/subscriber.go b/subscriber.go index 83ded5d..8792676 100644 --- a/subscriber.go +++ b/subscriber.go @@ -105,7 +105,7 @@ type ISubscriber interface { PlayRaw() PlayBlock(byte) PlayFLV() - Stop() + Stop(reason ...zapcore.Field) Subscribe(streamPath string, sub ISubscriber) error } @@ -316,6 +316,9 @@ func (s *Subscriber) PlayBlock(subType byte) { if hasVideo { for ctx.Err() == nil { err := s.VideoReader.ReadFrame(subMode) + if err == nil { + err = ctx.Err() + } if err != nil { stopReason = zap.Error(err) return @@ -361,6 +364,9 @@ func (s *Subscriber) PlayBlock(subType byte) { } } err := s.AudioReader.ReadFrame(subMode) + if err == nil { + err = ctx.Err() + } if err != nil { stopReason = zap.Error(err) return diff --git a/subscribers.go b/subscribers.go index 96dc5af..a678b31 100644 --- a/subscribers.go +++ b/subscribers.go @@ -94,6 +94,15 @@ func (s *Subscribers) AbortWait() { } } +func (s *Subscribers) Find(id string) ISubscriber { + for sub := range s.public { + if sub.GetSubscriber().ID == id { + return sub + } + } + return nil +} + func (s *Subscribers) Delete(suber ISubscriber) { delete(s.public, suber) io := suber.GetSubscriber() diff --git a/summary.go b/summary.go index c61b7e6..8c1e24b 100644 --- a/summary.go +++ b/summary.go @@ -1,25 +1,23 @@ package engine import ( + "encoding/json" "sync" - "sync/atomic" "time" "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" "github.com/shirou/gopsutil/v3/mem" "github.com/shirou/gopsutil/v3/net" - "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" ) -var summary Summary -var children util.Map[string, *Summary] - -func init() { - go summary.Start() -} - +var ( + summary SummaryUtil + lastSummary Summary + children util.Map[string, *Summary] + collectLock sync.Mutex +) // ServerSummary 系统摘要定义 type Summary struct { Address string @@ -36,11 +34,9 @@ type Summary struct { Used uint64 Usage float64 } - NetWork []NetWorkInfo - Streams []StreamSummay - lastNetWork []net.IOCountersStat - ref atomic.Int32 - rw sync.RWMutex + NetWork []NetWorkInfo + Streams []StreamSummay + ts time.Time //上次更新时间 } // NetWorkInfo 网速信息 @@ -51,51 +47,28 @@ type NetWorkInfo struct { ReceiveSpeed uint64 SentSpeed uint64 } - -// StartSummary 开始定时采集数据,每秒一次 -func (s *Summary) Start() { - for range time.Tick(time.Second) { - if s.Running() { - summary.collect() - } - } -} -func (s *Summary) Point() *Summary { - return s -} - -// Running 是否正在采集数据 -func (s *Summary) Running() bool { - return s.ref.Load() > 0 -} - -// Add 增加订阅者 -func (s *Summary) Add() { - if count := s.ref.Add(1); count == 1 { - log.Info("start report summary") - } else { - log.Info("summary count", count) - } -} - -// Done 删除订阅者 -func (s *Summary) Done() { - if count := s.ref.Add(-1); count == 0 { - log.Info("stop report summary") - s.lastNetWork = nil - } else { - log.Info("summary count", count) - } -} - +type SummaryUtil Summary // Report 上报数据 func (s *Summary) Report(slave *Summary) { children.Set(slave.Address, slave) } -func (s *Summary) collect() *Summary { - s.rw.Lock() - defer s.rw.Unlock() +func (s *SummaryUtil) MarshalJSON() ([]byte, error) { + return json.Marshal(s.collect()) +} + +func (s *SummaryUtil) MarshalYAML() (any, error) { + return s.collect(), nil +} + +func (s *SummaryUtil) collect() *Summary { + collectLock.Lock() + defer collectLock.Unlock() + dur := time.Since(s.ts) + if dur < time.Second { + return &lastSummary + } + s.ts = time.Now() v, _ := mem.VirtualMemory() d, _ := disk.Usage("/") nv, _ := net.IOCounters(true) @@ -119,16 +92,16 @@ func (s *Summary) collect() *Summary { Receive: n.BytesRecv, Sent: n.BytesSent, } - if s.lastNetWork != nil && len(s.lastNetWork) > i { - info.ReceiveSpeed = n.BytesRecv - s.lastNetWork[i].BytesRecv - info.SentSpeed = n.BytesSent - s.lastNetWork[i].BytesSent + if len(lastSummary.NetWork) > i { + info.ReceiveSpeed = (n.BytesRecv - lastSummary.NetWork[i].Receive) / uint64(dur.Seconds()) + info.SentSpeed = (n.BytesSent - lastSummary.NetWork[i].Sent) / uint64(dur.Seconds()) } netWorks = append(netWorks, info) } s.NetWork = netWorks - s.lastNetWork = nv s.Streams = util.MapList(&Streams, func(name string, ss *Stream) StreamSummay { return ss.Summary() }) - return s + lastSummary = Summary(*s) + return &lastSummary } diff --git a/track/aac.go b/track/aac.go index 78802cd..993c6cf 100644 --- a/track/aac.go +++ b/track/aac.go @@ -5,7 +5,7 @@ import ( "io" "net" - "github.com/aler9/gortsplib/v2/pkg/bits" + "github.com/bluenviron/mediacommon/pkg/bits" "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" @@ -171,7 +171,7 @@ func (aac *AAC) WriteSequenceHead(sh []byte) { aac.Channels = ((config2 >> 3) & 0x0F) //声道 aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]) aac.Parse(aac.SequenceHead[2:]) - aac.Attach() + go aac.Attach() } func (aac *AAC) WriteAVCC(ts uint32, frame *util.BLL) error { diff --git a/track/base.go b/track/base.go index 2deaace..f556737 100644 --- a/track/base.go +++ b/track/base.go @@ -32,7 +32,7 @@ func (p *流速控制) 根据起始DTS计算绝对时间戳(dts time.Duration) t return ((dts-p.起始dts)*time.Millisecond + p.起始时间戳*90) / 90 } -func (p *流速控制) 控制流速(绝对时间戳 time.Duration, dts time.Duration) { +func (p *流速控制) 控制流速(绝对时间戳 time.Duration, dts time.Duration) (等待了 time.Duration) { 数据时间差, 实际时间差 := 绝对时间戳-p.起始时间戳, time.Since(p.起始时间) // println("数据时间差", 数据时间差, "实际时间差", 实际时间差, "绝对时间戳", 绝对时间戳, "起始时间戳", p.起始时间戳, "起始时间", p.起始时间.Format("2006-01-02 15:04:05")) // if 实际时间差 > 数据时间差 { @@ -43,19 +43,18 @@ func (p *流速控制) 控制流速(绝对时间戳 time.Duration, dts time.Dura if 过快 := (数据时间差 - 实际时间差); 过快 > 100*time.Millisecond { // fmt.Println("过快毫秒", 过快.Milliseconds()) // println("过快毫秒", p.name, 过快.Milliseconds()) - // if log.Trace { - // log.Trace("sleep", zap.Duration("sleep", 过快)) - // } if 过快 > p.等待上限 { - time.Sleep(p.等待上限) + 等待了 = p.等待上限 } else { - time.Sleep(过快) + 等待了 = 过快 } + time.Sleep(等待了) } else if 过快 < -100*time.Millisecond { // fmt.Println("过慢毫秒", 过快.Milliseconds()) // p.重置(绝对时间戳, dts) // println("过慢毫秒", p.name, 过快.Milliseconds()) } + return } type SpesificTrack interface { @@ -305,7 +304,10 @@ func (av *Media) Flush() { av.ComputeBPS(curValue.BytesIn) av.Step() if av.等待上限 > 0 { - av.控制流速(curValue.Timestamp, curValue.DTS) + 等待了 := av.控制流速(curValue.Timestamp, curValue.DTS) + if log.Trace && 等待了 > 0 { + av.Trace("speed control", zap.Duration("sleep", 等待了)) + } } } diff --git a/track/g711.go b/track/g711.go index f24d8f0..dfd5167 100644 --- a/track/g711.go +++ b/track/g711.go @@ -66,3 +66,16 @@ func (g711 *G711) WriteRTPFrame(frame *RTPFrame) { g711.AppendAuBytes(frame.Payload) g711.Flush() } + +func (g711 *G711) CompleteRTP(value *AVFrame) { + if value.AUList.ByteLength > RTPMTU { + var packets [][][]byte + r := value.AUList.Next.Value.NewReader() + for bufs := r.ReadN(RTPMTU); len(bufs) > 0; bufs = r.ReadN(RTPMTU) { + packets = append(packets, bufs) + } + g711.PacketizeRTP(packets...) + } else { + g711.Audio.CompleteRTP(value) + } +} \ No newline at end of file diff --git a/track/h264.go b/track/h264.go index ee5e87c..5f9e3a1 100644 --- a/track/h264.go +++ b/track/h264.go @@ -35,6 +35,10 @@ func (vt *H264) WriteSliceBytes(slice []byte) { if log.Trace { vt.Trace("naluType", zap.Uint8("naluType", naluType.Byte())) } + if vt.Value.IFrame { + vt.AppendAuBytes(slice) + return + } switch naluType { case codec.NALU_SPS: spsInfo, _ := codec.ParseSPS(slice) @@ -78,7 +82,7 @@ func (vt *H264) WriteSliceBytes(slice []byte) { case codec.NALU_Access_Unit_Delimiter: case codec.NALU_Filler_Data: default: - vt.Error("WriteSliceBytes naluType not support", zap.Int("naluType", int(naluType))) + vt.Error("nalu type not support", zap.Int("type", int(naluType))) } } @@ -155,6 +159,7 @@ func (vt *H264) CompleteRTP(value *AVFrame) { if value.IFrame { out = append(out, [][]byte{vt.SPS}, [][]byte{vt.PPS}) } + startIndex := len(out) vt.Value.AUList.Range(func(au *util.BLL) bool { if au.ByteLength < RTPMTU { out = append(out, au.ToBuffers()) @@ -164,14 +169,11 @@ func (vt *H264) CompleteRTP(value *AVFrame) { b0, _ := r.ReadByte() naluType = naluType.Parse(b0) b0 = codec.NALU_FUA.Or(b0 & 0x60) - buf := [][]byte{{b0, naluType.Or(1 << 7)}} - buf = append(buf, r.ReadN(RTPMTU-2)...) - out = append(out, buf) for bufs := r.ReadN(RTPMTU); len(bufs) > 0; bufs = r.ReadN(RTPMTU) { - buf = append([][]byte{{b0, naluType.Byte()}}, bufs...) - out = append(out, buf) + out = append(out, append([][]byte{{b0, naluType.Byte()}}, bufs...)) } - buf[0][1] |= 1 << 6 // set end bit + out[startIndex][0][1] |= 1 << 7 // set start bit + out[len(out)-1][0][1] |= 1 << 6 // set end bit } return true }) diff --git a/track/h265.go b/track/h265.go index f93d486..7718d0b 100644 --- a/track/h265.go +++ b/track/h265.go @@ -69,10 +69,10 @@ func (vt *H265) WriteSliceBytes(slice []byte) { case 0, 1, 2, 3, 4, 5, 6, 7, 8, 9: vt.Value.IFrame = false vt.AppendAuBytes(slice) - case codec.NAL_UNIT_SEI: + case codec.NAL_UNIT_SEI, codec.NAL_UNIT_SEI_SUFFIX: vt.AppendAuBytes(slice) default: - vt.Warn("h265 slice type not supported", zap.Uint("type", uint(t))) + vt.Warn("nalu type not supported", zap.Uint("type", uint(t))) } } func (vt *H265) writeSequenceHead(head []byte) (err error) { @@ -189,6 +189,7 @@ func (vt *H265) CompleteRTP(value *AVFrame) { if value.IFrame { out = append(out, [][]byte{vt.VPS}, [][]byte{vt.SPS}, [][]byte{vt.PPS}) } + startIndex := len(out) vt.Value.AUList.Range(func(au *util.BLL) bool { if au.ByteLength < RTPMTU { out = append(out, au.ToBuffers()) @@ -199,14 +200,11 @@ func (vt *H265) CompleteRTP(value *AVFrame) { b1, _ := r.ReadByte() naluType = naluType.Parse(b0) b0 = (byte(codec.NAL_UNIT_RTP_FU) << 1) | (b0 & 0b10000001) - buf := [][]byte{{b0, b1, (1 << 7) | byte(naluType)}} - buf = append(buf, r.ReadN(RTPMTU-3)...) - out = append(out, buf) for bufs := r.ReadN(RTPMTU); len(bufs) > 0; bufs = r.ReadN(RTPMTU) { - buf = append([][]byte{{b0, b1, byte(naluType)}}, bufs...) - out = append(out, buf) + out = append(out, append([][]byte{{b0, b1, byte(naluType)}}, bufs...)) } - buf[0][2] |= 1 << 6 // set end bit + out[startIndex][0][2] |= 1 << 7 // set start bit + out[len(out)-1][0][2] |= 1 << 6 // set end bit } return true }) diff --git a/track/rtp.go b/track/rtp.go index b894bbd..16350b6 100644 --- a/track/rtp.go +++ b/track/rtp.go @@ -60,7 +60,15 @@ func (av *Media) PacketizeRTP(payloads ...[][]byte) { } packet.Marker = false for _, p := range pp { - br.Write(p) + if _, err := br.Write(p); err != nil { + av.Error("rtp payload write error", zap.Error(err)) + for i, pp := range payloads { + for j, p := range pp { + av.Error("rtp payload", zap.Int("i", i), zap.Int("j", j), zap.Int("len", len(p))) + } + } + return + } } packet.Payload = br.Bytes() av.Value.RTP.Push(rtpItem) diff --git a/util/buffer.go b/util/buffer.go index e04639b..3e749bd 100644 --- a/util/buffer.go +++ b/util/buffer.go @@ -55,7 +55,8 @@ func (b *LimitBuffer) Write(a []byte) (n int, err error) { l := b.Len() newL := l + len(a) if c := b.Cap(); newL > c { - panic(fmt.Sprintf("LimitBuffer Write %d > %d", newL, c)) + return 0, fmt.Errorf("LimitBuffer Write %d > %d", newL, c) + // panic(fmt.Sprintf("LimitBuffer Write %d > %d", newL, c)) } else { b.Buffer = b.Buffer.SubBuf(0, newL) copy(b.Buffer[l:], a) diff --git a/util/index.go b/util/index.go index c8c77f6..d6411c5 100644 --- a/util/index.go +++ b/util/index.go @@ -56,3 +56,11 @@ func IsSubdir(baseDir, joinedDir string) bool { } return !strings.HasPrefix(rel, "..") && !strings.HasPrefix(rel, "/") } + +func Conditoinal[T any](cond bool, t, f T) T { + if cond { + return t + } else { + return f + } +} \ No newline at end of file diff --git a/util/socket.go b/util/socket.go index 636285e..7985b6b 100644 --- a/util/socket.go +++ b/util/socket.go @@ -11,6 +11,12 @@ import ( "gopkg.in/yaml.v3" ) +func FetchValue[T any](t T) func() T { + return func() T { + return t + } +} + func ReturnJson[T any](fetch func() T, tickDur time.Duration, rw http.ResponseWriter, r *http.Request) { if r.Header.Get("Accept") == "text/event-stream" { sse := NewSSE(rw, r.Context())