From f4bf54d74616a6fb5ab5b99671fee0d244433aa7 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Sat, 9 Jul 2022 04:59:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86track=E4=B8=AD=E6=9C=80=E6=96=B0?= =?UTF-8?q?=E7=9A=84=E6=95=B0=E6=8D=AE=E5=8F=AF=E5=BA=8F=E5=88=97=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/frame.go | 6 +++--- common/ring_av.go | 6 +++++- config/types.go | 28 ++++++++++++++++++---------- io.go | 4 ++-- stream.go | 8 ++++---- track/audio.go | 15 +++++++++++++++ track/base.go | 6 ++++-- track/video.go | 18 ++++++++++++++++++ 8 files changed, 69 insertions(+), 22 deletions(-) diff --git a/common/frame.go b/common/frame.go index 328125c..365204b 100644 --- a/common/frame.go +++ b/common/frame.go @@ -112,9 +112,9 @@ type AVFrame[T RawSlice] struct { IFrame bool PTS uint32 DTS uint32 - AVCC net.Buffers // 打包好的AVCC格式 - RTP []*RTPFrame - Raw []T // 裸数据 + AVCC net.Buffers `json:"-"` // 打包好的AVCC格式 + RTP []*RTPFrame `json:"-"` + Raw []T `json:"-"` // 裸数据 canRead bool } diff --git a/common/ring_av.go b/common/ring_av.go index bb728f6..afb1c7c 100644 --- a/common/ring_av.go +++ b/common/ring_av.go @@ -2,6 +2,7 @@ package common import ( "context" + "encoding/json" "runtime" "time" ) @@ -11,8 +12,11 @@ type AVRing[T RawSlice] struct { Poll time.Duration } -func (r *AVRing[T]) Step() *AVFrame[T] { +func (av *AVRing[T]) MarshalJSON() ([]byte, error) { + return json.Marshal(av.PreValue()) +} +func (r *AVRing[T]) Step() *AVFrame[T] { last := &r.RingBuffer.Value current := r.RingBuffer.MoveNext() current.Sequence = r.MoveCount diff --git a/config/types.go b/config/types.go index 06e9551..53e0251 100755 --- a/config/types.go +++ b/config/types.go @@ -83,22 +83,28 @@ func (p *Push) AddPush(streamPath string, url string) { p.PushList[streamPath] = url } +type Console struct { + Server string //远程控制台地址 + Secret string //远程控制台密钥 + PublicAddr string //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址 + PublicAddrTLS string +} + type Engine struct { Publish Subscribe HTTP RTPReorder bool - EnableAVCC bool //启用AVCC格式,rtmp协议使用 - EnableRTP bool //启用RTP格式,rtsp、gb18181等协议使用 - ConsoleURL string //远程控制台地址 - Secret string //远程控制台密钥 + EnableAVCC bool //启用AVCC格式,rtmp协议使用 + EnableRTP bool //启用RTP格式,rtsp、gb18181等协议使用 + Console } type myResponseWriter struct { *websocket.Conn } func (w *myResponseWriter) Write(b []byte) (int, error) { - return len(b), websocket.Message.Send(w.Conn,b) + return len(b), websocket.Message.Send(w.Conn, b) } func (w *myResponseWriter) Header() http.Header { @@ -111,10 +117,10 @@ func (cfg *Engine) OnEvent(event any) { case context.Context: go func() { for { - conn, err := websocket.Dial(cfg.ConsoleURL, "", "https://console.monibuca.com") + conn, err := websocket.Dial(cfg.Server, "", "https://console.monibuca.com") wr := &myResponseWriter{conn} if err != nil { - log.Error("connect to console server ", cfg.ConsoleURL, " ", err) + log.Error("connect to console server ", cfg.Server, " ", err) time.Sleep(time.Second * 5) continue } @@ -125,10 +131,10 @@ func (cfg *Engine) OnEvent(event any) { var rMessage map[string]interface{} if err := websocket.JSON.Receive(conn, &rMessage); err == nil { if rMessage["code"].(float64) != 0 { - log.Error("connect to console server ", cfg.ConsoleURL, " ", rMessage["msg"]) + log.Error("connect to console server ", cfg.Server, " ", rMessage["msg"]) return } else { - log.Info("connect to console server ", cfg.ConsoleURL, " success") + log.Info("connect to console server ", cfg.Server, " success") } } for { @@ -171,5 +177,7 @@ var Global = &Engine{ Publish{true, true, false, 10, 0}, Subscribe{true, true, true, false, 10}, HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux}, - false, true, true, "wss://console.monibuca.com:9999/ws/v1", "", + false, true, true, Console{ + "wss://console.monibuca.com:9999/ws/v1", "", "", "", + }, } diff --git a/io.go b/io.go index 9070dfb..1c4d663 100644 --- a/io.go +++ b/io.go @@ -28,8 +28,8 @@ type IO[C IOConfig, S IIO] struct { context.Context `json:"-"` //不要直接设置,应当通过OnEvent传入父级Context context.CancelFunc `json:"-"` //流关闭是关闭发布者或者订阅者 *zap.Logger `json:"-"` - StartTime uint32 //创建时间 - Stream *Stream `json:"-"` + StartTime time.Time //创建时间 + Stream *Stream `json:"-"` io.Reader `json:"-"` io.Writer `json:"-"` io.Closer `json:"-"` diff --git a/stream.go b/stream.go index d078adf..1ea0482 100644 --- a/stream.go +++ b/stream.go @@ -143,7 +143,7 @@ type StreamSummay struct { State StreamState Subscribers int Tracks []string - StartTime uint32 + StartTime time.Time Type string BPS int } @@ -161,7 +161,7 @@ func (s *Stream) Summary() (r StreamSummay) { r.Path = s.Path r.State = s.State r.Subscribers = len(s.Subscribers) - r.StartTime = uint32(s.StartTime.Unix()) + r.StartTime = s.StartTime return } @@ -309,7 +309,7 @@ func (s *Stream) run() { io := v.Value.GetIO() io.Spesic = v.Value io.Stream = s - io.StartTime = uint32(time.Now().Unix()) + io.StartTime = time.Now() io.Logger = s.With(zap.String("type", io.Type)) if io.ID != "" { io.Logger = io.Logger.With(zap.String("ID", io.ID)) @@ -332,7 +332,7 @@ func (s *Stream) run() { s.WaitTimeout = wt } io.Stream = s - io.StartTime = uint32(time.Now().Unix()) + io.StartTime = time.Now() io.Logger = s.With(zap.String("type", io.Type)) if io.ID != "" { io.Logger = io.Logger.With(zap.String("ID", io.ID)) diff --git a/track/audio.go b/track/audio.go index ad692c7..12e65d7 100644 --- a/track/audio.go +++ b/track/audio.go @@ -1,6 +1,7 @@ package track import ( + "encoding/json" "net" "m7s.live/engine/v4/codec" @@ -24,6 +25,20 @@ type Audio struct { Profile byte } +func (a *Audio) MarshalJSON() ([]byte, error) { + v := a.PreValue() + if a.RawPart != nil { + a.RawPart = a.RawPart[:0] + } + a.RawSize = 0 + for i := 0; i < len(v.Raw) && i < 10; i++ { + a.RawSize += len(v.Raw[i]) + } + for i := 0; i < len(v.Raw[0]) && i < 10; i++ { + a.RawPart = append(a.RawPart, int(v.Raw[0][i])) + } + return json.Marshal(v) +} func (a *Audio) IsAAC() bool { return a.CodecID == codec.CodecID_AAC } diff --git a/track/base.go b/track/base.go index 5b30c9b..6eac09f 100644 --- a/track/base.go +++ b/track/base.go @@ -32,7 +32,7 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) { if 过快毫秒 := 数据时间差 - 实际时间差; 过快毫秒 > time.Millisecond*100 { // println("休息", 过快毫秒/time.Millisecond, 绝对时间戳, p.起始时间戳) if 过快毫秒 > time.Millisecond*500 { - time.Sleep(time.Millisecond*500) + time.Sleep(time.Millisecond * 500) } else { time.Sleep(过快毫秒) } @@ -42,7 +42,9 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) { // Media 基础媒体Track类 type Media[T RawSlice] struct { Base - AVRing[T] `json:"-"` + AVRing[T] + RawPart []int // 裸数据片段用于UI上显示 + RawSize int //裸数据长度 SampleRate uint32 SampleSize byte DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) diff --git a/track/video.go b/track/video.go index d096025..4cd240d 100644 --- a/track/video.go +++ b/track/video.go @@ -2,6 +2,7 @@ package track import ( "bytes" + "encoding/json" . "github.com/logrusorgru/aurora" "go.uber.org/zap" @@ -23,6 +24,23 @@ type Video struct { dtsEst *DTSEstimator } +func (vt *Video) MarshalJSON() ([]byte, error) { + v := vt.PreValue() + if vt.RawPart != nil { + vt.RawPart = vt.RawPart[:0] + } + size := 0 + for i := 0; i < len(v.Raw); i++ { + for j := 0; j < len(v.Raw[i]); j++ { + size += len(v.Raw[i][j]) + } + } + vt.RawSize = size + for i := 0; i < len(v.Raw[0][0]) && i < 10; i++ { + vt.RawPart = append(vt.RawPart, int(v.Raw[0][0][i])) + } + return json.Marshal(v) +} func (vt *Video) GetDecConfSeq() int { return vt.DecoderConfiguration.Seq }