json序列化时显示每个track的前10个字节数据以及裸数据长度

This commit is contained in:
dexter
2022-07-10 13:45:13 +08:00
parent f4bf54d746
commit 374596f185
15 changed files with 128 additions and 96 deletions

View File

@@ -136,6 +136,7 @@ func (av *AVFrame[T]) Reset() {
}
av.RTP = nil
av.Raw = nil
av.BytesIn = 0
}
func (avcc AVCCFrame) IsIDR() bool {

View File

@@ -8,13 +8,15 @@ import (
// Base 基础Track类
type Base struct {
Name string
Stream IStream `json:"-"`
ts time.Time
bytes int
frames int
BPS int
FPS int
Name string
Stream IStream `json:"-"`
ts time.Time
bytes int
frames int
BPS int
FPS int
RawPart []int // 裸数据片段用于UI上显示
RawSize int // 裸数据长度
}
func (bt *Base) ComputeBPS(bytes int) {
@@ -32,7 +34,8 @@ func (bt *Base) ComputeBPS(bytes int) {
func (bt *Base) GetBase() *Base {
return bt
}
func (bt *Base) SnapForJson() {
}
func (bt *Base) Flush(bf *BaseFrame) {
bt.ComputeBPS(bf.BytesIn)
bf.Timestamp = time.Now()
@@ -41,6 +44,7 @@ func (bt *Base) Flush(bf *BaseFrame) {
type Track interface {
GetBase() *Base
LastWriteTime() time.Time
SnapForJson()
}
type AVTrack interface {

View File

@@ -5,9 +5,10 @@ import (
)
type RingBuffer[T any] struct {
*util.Ring[T]
Size int
MoveCount uint32
*util.Ring[T] `json:"-"`
Size int
MoveCount uint32
LastValue *T
}
func (rb *RingBuffer[T]) Init(n int) *RingBuffer[T] {
@@ -16,21 +17,13 @@ func (rb *RingBuffer[T]) Init(n int) *RingBuffer[T] {
}
rb.Ring = util.NewRing[T](n)
rb.Size = n
rb.LastValue = &rb.Value
return rb
}
func (rb RingBuffer[T]) SubRing(rr *util.Ring[T]) *RingBuffer[T] {
rb.Ring = rr
rb.MoveCount = 0
return &rb
}
func (rb *RingBuffer[T]) MoveNext() *T {
rb.LastValue = &rb.Value
rb.Ring = rb.Next()
rb.MoveCount++
return &rb.Value
}
func (rb *RingBuffer[T]) PreValue() *T {
return &rb.Prev().Value
}
}

View File

@@ -2,27 +2,22 @@ package common
import (
"context"
"encoding/json"
"runtime"
"time"
)
type AVRing[T RawSlice] struct {
RingBuffer[AVFrame[T]]
Poll time.Duration
}
func (av *AVRing[T]) MarshalJSON() ([]byte, error) {
return json.Marshal(av.PreValue())
}
func (r *AVRing[T]) Step() *AVFrame[T] {
last := &r.RingBuffer.Value
current := r.RingBuffer.MoveNext()
current.Sequence = r.MoveCount
current.canRead = false
current.Reset()
last.canRead = true
r.LastValue.canRead = true
return current
}

View File

@@ -47,12 +47,11 @@ func (rb *LockRing[T]) Step() {
}
func (rb *LockRing[T]) Write(value T) {
last := &rb.RingBuffer.Value
last.Value = value
rb.Value.Value = value
if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) {
current := rb.RingBuffer.MoveNext()
current.Lock()
last.Unlock()
rb.LastValue.Unlock()
//Flag不为1代表被Dispose了但尚未处理Done
if !atomic.CompareAndSwapInt32(rb.Flag, 1, 0) {
current.Unlock()

View File

@@ -186,15 +186,19 @@ func (t *TSPublisher) OnPES(pes mpegts.MpegTsPESPacket) {
t.adts = append(t.adts, pes.Payload[:7]...)
t.AudioTrack.WriteADTS(t.adts)
}
t.AudioTrack.CurrentFrame().PTS = uint32(pes.Header.Pts)
t.AudioTrack.CurrentFrame().DTS = uint32(pes.Header.Dts)
for remainLen := len(pes.Payload); remainLen > 0; {
current := t.AudioTrack.CurrentFrame()
current.PTS = uint32(pes.Header.Pts)
current.DTS = uint32(pes.Header.Dts)
remainLen := len(pes.Payload)
current.BytesIn += remainLen
for remainLen > 0 {
// AACFrameLength(13)
// xx xxxxxxxx xxx
frameLen := (int(pes.Payload[3]&3) << 11) | (int(pes.Payload[4]) << 3) | (int(pes.Payload[5]) >> 5)
if frameLen > remainLen {
break
}
t.AudioTrack.WriteSlice(pes.Payload[7:frameLen])
pes.Payload = pes.Payload[frameLen:remainLen]
remainLen -= frameLen

View File

@@ -1,6 +1,7 @@
package engine
import (
"encoding/json"
"errors"
"strings"
"sync"
@@ -122,6 +123,16 @@ type StreamTimeoutConfig struct {
PublishTimeout time.Duration
WaitCloseTimeout time.Duration
}
type Tracks struct {
util.Map[string, Track]
}
func (tracks *Tracks) MarshalJSON() ([]byte, error) {
return json.Marshal(util.MapList(&tracks.Map, func(_ string, t Track) Track {
t.SnapForJson()
return t
}))
}
// Stream 流定义
type Stream struct {
@@ -134,7 +145,7 @@ type Stream struct {
Publisher IPublisher
State StreamState
Subscribers []ISubscriber // 订阅者
Tracks map[string]Track
Tracks Tracks
AppName string
StreamName string
}
@@ -153,11 +164,11 @@ func (s *Stream) Summary() (r StreamSummay) {
if s.Publisher != nil {
r.Type = s.Publisher.GetIO().Type
}
//TODO: Lock
for _, t := range s.Tracks {
r.BPS += t.GetBase().BPS
r.Tracks = append(r.Tracks, t.GetBase().Name)
}
r.Tracks = util.MapList(&s.Tracks.Map, func(name string, t Track) string {
b := t.GetBase()
r.BPS += b.BPS
return name
})
r.Path = s.Path
r.State = s.State
r.Subscribers = len(s.Subscribers)
@@ -191,7 +202,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream
Streams.Map[streamPath] = s
s.actionChan.Init(1)
s.timeout = time.NewTimer(waitTimeout)
s.Tracks = make(map[string]Track)
s.Tracks.Init()
go s.run()
return s, true
}
@@ -262,14 +273,14 @@ func (s *Stream) run() {
select {
case <-s.timeout.C:
if s.State == STATE_PUBLISHING {
for name, t := range s.Tracks {
s.Tracks.ModifyRange(func(name string, t Track) {
// track 超过一定时间没有更新数据了
if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout {
s.Warn("track timeout", zap.String("name", name), zap.Time("lastWriteTime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout))
delete(s.Tracks, name)
delete(s.Tracks.Map.Map, name)
s.broadcast(TrackRemoved{t})
}
}
})
for l := len(s.Subscribers) - 1; l >= 0; l-- {
if sub := s.Subscribers[l]; sub.IsClosed() {
s.Subscribers = append(s.Subscribers[:l], s.Subscribers[l+1:]...)
@@ -282,7 +293,7 @@ func (s *Stream) run() {
}
}
}
if len(s.Tracks) == 0 || (s.Publisher != nil && s.Publisher.IsClosed()) {
if s.Tracks.Len() == 0 || (s.Publisher != nil && s.Publisher.IsClosed()) {
s.action(ACTION_PUBLISHLOST)
for p := range waitP {
p.Reject(errors.New("publisher lost"))
@@ -341,23 +352,23 @@ func (s *Stream) run() {
if s.Publisher != nil {
s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入在回调中可以去获取订阅者数量
needAudio, needVideo := sbConfig.SubAudio && s.Publisher.GetConfig().PubAudio, sbConfig.SubVideo && s.Publisher.GetConfig().PubVideo
for _, t := range s.Tracks {
s.Tracks.Range(func(name string, t Track) {
switch t.(type) {
case *track.Audio:
if needAudio {
needAudio = false
} else {
continue
return
}
case *track.Video:
if needVideo {
needVideo = false
} else {
continue
return
}
}
suber.OnEvent(t) // 把现有的Track发给订阅者
}
})
// 还需要等一下发布者的音频或者视频Track
if needAudio || needVideo {
waitP[v] = 0
@@ -378,8 +389,7 @@ func (s *Stream) run() {
}
case Track:
name := v.GetBase().Name
if _, ok := s.Tracks[name]; !ok {
s.Tracks[name] = v
if s.Tracks.Add(name, v) {
s.Info("track +1", zap.String("name", name))
s.broadcast(v)
for w, flag := range waitP {
@@ -399,11 +409,10 @@ func (s *Stream) run() {
}
case TrackRemoved:
name := v.GetBase().Name
if t, ok := s.Tracks[name]; ok {
if t, ok := s.Tracks.Delete(name); ok {
s.Info("track -1", zap.String("name", name))
delete(s.Tracks, name)
s.broadcast(v)
if len(s.Tracks) == 0 {
if s.Tracks.Len() == 0 {
s.action(ACTION_PUBLISHLOST)
}
if dt, ok := t.(*track.Data); ok {
@@ -419,11 +428,11 @@ func (s *Stream) run() {
for w := range waitP {
w.Reject(StreamIsClosedErr)
}
for _, t := range s.Tracks {
s.Tracks.Range(func(_ string, t Track) {
if dt, ok := t.(*track.Data); ok {
dt.Dispose()
}
}
})
return
}
}

View File

@@ -119,9 +119,8 @@ func (s *Summary) collect() *Summary {
s.NetWork = append(s.NetWork, info)
}
s.lastNetWork = nv
s.Streams = nil
Streams.Range(func(ss *Stream) {
s.Streams = append(s.Streams, ss.Summary())
s.Streams = util.MapList(&Streams, func(name string, ss *Stream) StreamSummay {
return ss.Summary()
})
return s
}

View File

@@ -74,6 +74,7 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
aac.Audio.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2}
aac.Attach()
} else {
aac.WriteSlice(AudioSlice(frame[2:]))
aac.Audio.WriteAVCC(ts, frame)
aac.Flush()
}

View File

@@ -1,7 +1,6 @@
package track
import (
"encoding/json"
"net"
"m7s.live/engine/v4/codec"
@@ -25,20 +24,24 @@ type Audio struct {
Profile byte
}
func (a *Audio) MarshalJSON() ([]byte, error) {
v := a.PreValue()
//为json序列化而计算的数据
func (a *Audio) SnapForJson() {
v := a.LastValue
if a.RawPart != nil {
a.RawPart = a.RawPart[:0]
}
a.RawSize = 0
for i := 0; i < len(v.Raw) && i < 10; i++ {
a.RawSize += len(v.Raw[i])
l := len(v.Raw[i])
a.RawSize += l
if sl := len(a.RawPart); sl < 10 {
for j := 0; i < l && j < 10-sl; j++ {
a.RawPart = append(a.RawPart, int(v.Raw[i][j]))
}
}
}
for i := 0; i < len(v.Raw[0]) && i < 10; i++ {
a.RawPart = append(a.RawPart, int(v.Raw[0][i]))
}
return json.Marshal(v)
}
func (a *Audio) IsAAC() bool {
return a.CodecID == codec.CodecID_AAC
}
@@ -49,8 +52,8 @@ func (a *Audio) Attach() {
a.Stream.AddTrack(a)
}
func (a *Audio) Detach() {
a.Stream = nil
a.Stream.RemoveTrack(a)
a.Stream = nil
}
func (a *Audio) GetName() string {
if a.Name == "" {

View File

@@ -43,8 +43,6 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) {
type Media[T RawSlice] struct {
Base
AVRing[T]
RawPart []int // 裸数据片段用于UI上显示
RawSize int //裸数据长度
SampleRate uint32
SampleSize byte
DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
@@ -57,7 +55,7 @@ type Media[T RawSlice] struct {
}
func (av *Media[T]) LastWriteTime() time.Time {
return av.AVRing.RingBuffer.PreValue().Timestamp
return av.AVRing.RingBuffer.LastValue.Timestamp
}
func (av *Media[T]) Play(ctx context.Context, onMedia func(*AVFrame[T]) error) error {
@@ -83,7 +81,7 @@ func (av *Media[T]) CurrentFrame() *AVFrame[T] {
return &av.AVRing.RingBuffer.Value
}
func (av *Media[T]) PreFrame() *AVFrame[T] {
return av.AVRing.RingBuffer.PreValue()
return av.AVRing.RingBuffer.LastValue
}
// 获取缓存中下一个rtpFrame
@@ -160,9 +158,11 @@ func (av *Media[T]) UnmarshalRTPPacket(p *rtp.Packet) (frame *RTPFrame) {
frame = &RTPFrame{
Packet: *p,
}
av.Value.BytesIn += len(p.Payload) + 12
return av.recorderRTP(frame)
}
func (av *Media[T]) UnmarshalRTP(raw []byte) (frame *RTPFrame) {
av.Value.BytesIn += len(raw)
if frame = new(RTPFrame); frame.Unmarshal(raw) == nil {
return
}
@@ -176,7 +176,7 @@ func (av *Media[T]) WriteSlice(slice T) {
func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) {
curValue := &av.AVRing.RingBuffer.Value
cts := frame.CTS()
curValue.BytesIn = len(frame)
curValue.BytesIn += len(frame)
curValue.AppendAVCC(frame)
curValue.DTS = ts * 90
curValue.PTS = (ts + cts) * 90
@@ -185,7 +185,7 @@ func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) {
func (av *Media[T]) Flush() {
curValue := &av.AVRing.RingBuffer.Value
preValue := av.AVRing.RingBuffer.PreValue()
preValue := av.AVRing.RingBuffer.LastValue
if av.起始时间.IsZero() {
av.重置(curValue.AbsTime)
} else {

View File

@@ -20,7 +20,7 @@ func (d *Data) ReadRing() *LockRing[any] {
}
func (d *Data) LastWriteTime() time.Time {
return d.LockRing.RingBuffer.PreValue().Timestamp
return d.LockRing.RingBuffer.LastValue.Timestamp
}
func (dt *Data) Push(data any) {

View File

@@ -109,8 +109,8 @@ func (vt *H264) writeRTPFrame(frame *RTPFrame) {
}
rv.Raw[lastIndex].Append(frame.Payload[naluType.Offset():])
if util.Bit1(frame.Payload[1], 1) {
complete := rv.Raw[lastIndex] //拼接完成
rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去
complete := rv.Raw[lastIndex] //拼接完成
rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去
vt.WriteSlice(complete)
}
}

View File

@@ -2,7 +2,6 @@ package track
import (
"bytes"
"encoding/json"
. "github.com/logrusorgru/aurora"
"go.uber.org/zap"
@@ -24,22 +23,24 @@ type Video struct {
dtsEst *DTSEstimator
}
func (vt *Video) MarshalJSON() ([]byte, error) {
v := vt.PreValue()
func (vt *Video) SnapForJson() {
v := vt.LastValue
if vt.RawPart != nil {
vt.RawPart = vt.RawPart[:0]
}
size := 0
for i := 0; i < len(v.Raw); i++ {
for j := 0; j < len(v.Raw[i]); j++ {
size += len(v.Raw[i][j])
l := len(v.Raw[i][j])
size += l
if sl := len(vt.RawPart); sl < 10 {
for k := 0; k < l && k < 10-sl; k++ {
vt.RawPart = append(vt.RawPart, int(v.Raw[i][j][k]))
}
}
}
}
vt.RawSize = size
for i := 0; i < len(v.Raw[0][0]) && i < 10; i++ {
vt.RawPart = append(vt.RawPart, int(v.Raw[0][0][i]))
}
return json.Marshal(v)
}
func (vt *Video) GetDecConfSeq() int {
return vt.DecoderConfiguration.Seq
@@ -48,8 +49,8 @@ func (vt *Video) Attach() {
vt.Stream.AddTrack(vt)
}
func (vt *Video) Detach() {
vt.Stream = nil
vt.Stream.RemoveTrack(vt)
vt.Stream = nil
}
func (vt *Video) GetName() string {
if vt.Name == "" {
@@ -93,6 +94,7 @@ func (vt *Video) writeAnnexBSlice(annexb AnnexBFrame, s *[]NALUSlice) {
func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) (s []NALUSlice) {
// vt.Stream.Tracef("WriteAnnexB:pts %d,dts %d,len %d", pts, dts, len(frame))
vt.Value.BytesIn += len(frame)
for len(frame) > 0 {
before, after, found := bytes.Cut(frame, codec.NALU_Delimiter2)
if !found {

View File

@@ -35,8 +35,6 @@ func (m *Map[K, V]) Has(k K) (ok bool) {
}
func (m *Map[K, V]) Len() int {
m.RLock()
defer m.RUnlock()
return len(m.Map)
}
@@ -46,10 +44,16 @@ func (m *Map[K, V]) Get(k K) V {
return m.Map[k]
}
func (m *Map[K, V]) Delete(k K) {
m.Lock()
delete(m.Map, k)
m.Unlock()
func (m *Map[K, V]) Delete(k K) (v V, ok bool) {
m.RLock()
v, ok = m.Map[k]
m.RUnlock()
if ok {
m.Lock()
delete(m.Map, k)
m.Unlock()
}
return
}
func (m *Map[K, V]) ToList() (r []V) {
@@ -61,10 +65,28 @@ func (m *Map[K, V]) ToList() (r []V) {
return
}
func (m *Map[K, V]) Range(f func(V)) {
func MapList[K comparable, V any, R any](m *Map[K, V], f func(K, V) R) (r []R) {
m.RLock()
defer m.RUnlock()
for _, s := range m.Map {
f(s)
for k, v := range m.Map {
r = append(r, f(k, v))
}
return
}
func (m *Map[K, V]) Range(f func(K, V)) {
m.RLock()
defer m.RUnlock()
for k, v := range m.Map {
f(k, v)
}
}
//遍历时有写入操作
func (m *Map[K, V]) ModifyRange(f func(K, V)) {
m.Lock()
defer m.Unlock()
for k, v := range m.Map {
f(k, v)
}
}