将track中最新的数据可序列化

This commit is contained in:
dexter
2022-07-09 04:59:19 +08:00
parent 59db5595d9
commit f4bf54d746
8 changed files with 69 additions and 22 deletions

View File

@@ -112,9 +112,9 @@ type AVFrame[T RawSlice] struct {
IFrame bool IFrame bool
PTS uint32 PTS uint32
DTS uint32 DTS uint32
AVCC net.Buffers // 打包好的AVCC格式 AVCC net.Buffers `json:"-"` // 打包好的AVCC格式
RTP []*RTPFrame RTP []*RTPFrame `json:"-"`
Raw []T // 裸数据 Raw []T `json:"-"` // 裸数据
canRead bool canRead bool
} }

View File

@@ -2,6 +2,7 @@ package common
import ( import (
"context" "context"
"encoding/json"
"runtime" "runtime"
"time" "time"
) )
@@ -11,8 +12,11 @@ type AVRing[T RawSlice] struct {
Poll time.Duration Poll time.Duration
} }
func (r *AVRing[T]) Step() *AVFrame[T] { func (av *AVRing[T]) MarshalJSON() ([]byte, error) {
return json.Marshal(av.PreValue())
}
func (r *AVRing[T]) Step() *AVFrame[T] {
last := &r.RingBuffer.Value last := &r.RingBuffer.Value
current := r.RingBuffer.MoveNext() current := r.RingBuffer.MoveNext()
current.Sequence = r.MoveCount current.Sequence = r.MoveCount

View File

@@ -83,22 +83,28 @@ func (p *Push) AddPush(streamPath string, url string) {
p.PushList[streamPath] = url p.PushList[streamPath] = url
} }
type Console struct {
Server string //远程控制台地址
Secret string //远程控制台密钥
PublicAddr string //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址
PublicAddrTLS string
}
type Engine struct { type Engine struct {
Publish Publish
Subscribe Subscribe
HTTP HTTP
RTPReorder bool RTPReorder bool
EnableAVCC bool //启用AVCC格式rtmp协议使用 EnableAVCC bool //启用AVCC格式rtmp协议使用
EnableRTP bool //启用RTP格式rtsp、gb18181等协议使用 EnableRTP bool //启用RTP格式rtsp、gb18181等协议使用
ConsoleURL string //远程控制台地址 Console
Secret string //远程控制台密钥
} }
type myResponseWriter struct { type myResponseWriter struct {
*websocket.Conn *websocket.Conn
} }
func (w *myResponseWriter) Write(b []byte) (int, error) { func (w *myResponseWriter) Write(b []byte) (int, error) {
return len(b), websocket.Message.Send(w.Conn,b) return len(b), websocket.Message.Send(w.Conn, b)
} }
func (w *myResponseWriter) Header() http.Header { func (w *myResponseWriter) Header() http.Header {
@@ -111,10 +117,10 @@ func (cfg *Engine) OnEvent(event any) {
case context.Context: case context.Context:
go func() { go func() {
for { for {
conn, err := websocket.Dial(cfg.ConsoleURL, "", "https://console.monibuca.com") conn, err := websocket.Dial(cfg.Server, "", "https://console.monibuca.com")
wr := &myResponseWriter{conn} wr := &myResponseWriter{conn}
if err != nil { if err != nil {
log.Error("connect to console server ", cfg.ConsoleURL, " ", err) log.Error("connect to console server ", cfg.Server, " ", err)
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
continue continue
} }
@@ -125,10 +131,10 @@ func (cfg *Engine) OnEvent(event any) {
var rMessage map[string]interface{} var rMessage map[string]interface{}
if err := websocket.JSON.Receive(conn, &rMessage); err == nil { if err := websocket.JSON.Receive(conn, &rMessage); err == nil {
if rMessage["code"].(float64) != 0 { if rMessage["code"].(float64) != 0 {
log.Error("connect to console server ", cfg.ConsoleURL, " ", rMessage["msg"]) log.Error("connect to console server ", cfg.Server, " ", rMessage["msg"])
return return
} else { } else {
log.Info("connect to console server ", cfg.ConsoleURL, " success") log.Info("connect to console server ", cfg.Server, " success")
} }
} }
for { for {
@@ -171,5 +177,7 @@ var Global = &Engine{
Publish{true, true, false, 10, 0}, Publish{true, true, false, 10, 0},
Subscribe{true, true, true, false, 10}, Subscribe{true, true, true, false, 10},
HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux}, HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux},
false, true, true, "wss://console.monibuca.com:9999/ws/v1", "", false, true, true, Console{
"wss://console.monibuca.com:9999/ws/v1", "", "", "",
},
} }

4
io.go
View File

@@ -28,8 +28,8 @@ type IO[C IOConfig, S IIO] struct {
context.Context `json:"-"` //不要直接设置应当通过OnEvent传入父级Context context.Context `json:"-"` //不要直接设置应当通过OnEvent传入父级Context
context.CancelFunc `json:"-"` //流关闭是关闭发布者或者订阅者 context.CancelFunc `json:"-"` //流关闭是关闭发布者或者订阅者
*zap.Logger `json:"-"` *zap.Logger `json:"-"`
StartTime uint32 //创建时间 StartTime time.Time //创建时间
Stream *Stream `json:"-"` Stream *Stream `json:"-"`
io.Reader `json:"-"` io.Reader `json:"-"`
io.Writer `json:"-"` io.Writer `json:"-"`
io.Closer `json:"-"` io.Closer `json:"-"`

View File

@@ -143,7 +143,7 @@ type StreamSummay struct {
State StreamState State StreamState
Subscribers int Subscribers int
Tracks []string Tracks []string
StartTime uint32 StartTime time.Time
Type string Type string
BPS int BPS int
} }
@@ -161,7 +161,7 @@ func (s *Stream) Summary() (r StreamSummay) {
r.Path = s.Path r.Path = s.Path
r.State = s.State r.State = s.State
r.Subscribers = len(s.Subscribers) r.Subscribers = len(s.Subscribers)
r.StartTime = uint32(s.StartTime.Unix()) r.StartTime = s.StartTime
return return
} }
@@ -309,7 +309,7 @@ func (s *Stream) run() {
io := v.Value.GetIO() io := v.Value.GetIO()
io.Spesic = v.Value io.Spesic = v.Value
io.Stream = s io.Stream = s
io.StartTime = uint32(time.Now().Unix()) io.StartTime = time.Now()
io.Logger = s.With(zap.String("type", io.Type)) io.Logger = s.With(zap.String("type", io.Type))
if io.ID != "" { if io.ID != "" {
io.Logger = io.Logger.With(zap.String("ID", io.ID)) io.Logger = io.Logger.With(zap.String("ID", io.ID))
@@ -332,7 +332,7 @@ func (s *Stream) run() {
s.WaitTimeout = wt s.WaitTimeout = wt
} }
io.Stream = s io.Stream = s
io.StartTime = uint32(time.Now().Unix()) io.StartTime = time.Now()
io.Logger = s.With(zap.String("type", io.Type)) io.Logger = s.With(zap.String("type", io.Type))
if io.ID != "" { if io.ID != "" {
io.Logger = io.Logger.With(zap.String("ID", io.ID)) io.Logger = io.Logger.With(zap.String("ID", io.ID))

View File

@@ -1,6 +1,7 @@
package track package track
import ( import (
"encoding/json"
"net" "net"
"m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec"
@@ -24,6 +25,20 @@ type Audio struct {
Profile byte Profile byte
} }
func (a *Audio) MarshalJSON() ([]byte, error) {
v := a.PreValue()
if a.RawPart != nil {
a.RawPart = a.RawPart[:0]
}
a.RawSize = 0
for i := 0; i < len(v.Raw) && i < 10; i++ {
a.RawSize += len(v.Raw[i])
}
for i := 0; i < len(v.Raw[0]) && i < 10; i++ {
a.RawPart = append(a.RawPart, int(v.Raw[0][i]))
}
return json.Marshal(v)
}
func (a *Audio) IsAAC() bool { func (a *Audio) IsAAC() bool {
return a.CodecID == codec.CodecID_AAC return a.CodecID == codec.CodecID_AAC
} }

View File

@@ -32,7 +32,7 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) {
if 过快毫秒 := 数据时间差 - 实际时间差; 过快毫秒 > time.Millisecond*100 { if 过快毫秒 := 数据时间差 - 实际时间差; 过快毫秒 > time.Millisecond*100 {
// println("休息", 过快毫秒/time.Millisecond, 绝对时间戳, p.起始时间戳) // println("休息", 过快毫秒/time.Millisecond, 绝对时间戳, p.起始时间戳)
if 过快毫秒 > time.Millisecond*500 { if 过快毫秒 > time.Millisecond*500 {
time.Sleep(time.Millisecond*500) time.Sleep(time.Millisecond * 500)
} else { } else {
time.Sleep(过快毫秒) time.Sleep(过快毫秒)
} }
@@ -42,7 +42,9 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) {
// Media 基础媒体Track类 // Media 基础媒体Track类
type Media[T RawSlice] struct { type Media[T RawSlice] struct {
Base Base
AVRing[T] `json:"-"` AVRing[T]
RawPart []int // 裸数据片段用于UI上显示
RawSize int //裸数据长度
SampleRate uint32 SampleRate uint32
SampleSize byte SampleSize byte
DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)

View File

@@ -2,6 +2,7 @@ package track
import ( import (
"bytes" "bytes"
"encoding/json"
. "github.com/logrusorgru/aurora" . "github.com/logrusorgru/aurora"
"go.uber.org/zap" "go.uber.org/zap"
@@ -23,6 +24,23 @@ type Video struct {
dtsEst *DTSEstimator dtsEst *DTSEstimator
} }
func (vt *Video) MarshalJSON() ([]byte, error) {
v := vt.PreValue()
if vt.RawPart != nil {
vt.RawPart = vt.RawPart[:0]
}
size := 0
for i := 0; i < len(v.Raw); i++ {
for j := 0; j < len(v.Raw[i]); j++ {
size += len(v.Raw[i][j])
}
}
vt.RawSize = size
for i := 0; i < len(v.Raw[0][0]) && i < 10; i++ {
vt.RawPart = append(vt.RawPart, int(v.Raw[0][0][i]))
}
return json.Marshal(v)
}
func (vt *Video) GetDecConfSeq() int { func (vt *Video) GetDecConfSeq() int {
return vt.DecoderConfiguration.Seq return vt.DecoderConfiguration.Seq
} }