diff --git a/common/frame.go b/common/frame.go index 365204b..40e129d 100644 --- a/common/frame.go +++ b/common/frame.go @@ -136,6 +136,7 @@ func (av *AVFrame[T]) Reset() { } av.RTP = nil av.Raw = nil + av.BytesIn = 0 } func (avcc AVCCFrame) IsIDR() bool { diff --git a/common/index.go b/common/index.go index a09bf9d..89108b1 100644 --- a/common/index.go +++ b/common/index.go @@ -8,13 +8,15 @@ import ( // Base 基础Track类 type Base struct { - Name string - Stream IStream `json:"-"` - ts time.Time - bytes int - frames int - BPS int - FPS int + Name string + Stream IStream `json:"-"` + ts time.Time + bytes int + frames int + BPS int + FPS int + RawPart []int // 裸数据片段用于UI上显示 + RawSize int // 裸数据长度 } func (bt *Base) ComputeBPS(bytes int) { @@ -32,7 +34,8 @@ func (bt *Base) ComputeBPS(bytes int) { func (bt *Base) GetBase() *Base { return bt } - +func (bt *Base) SnapForJson() { +} func (bt *Base) Flush(bf *BaseFrame) { bt.ComputeBPS(bf.BytesIn) bf.Timestamp = time.Now() @@ -41,6 +44,7 @@ func (bt *Base) Flush(bf *BaseFrame) { type Track interface { GetBase() *Base LastWriteTime() time.Time + SnapForJson() } type AVTrack interface { diff --git a/common/ring.go b/common/ring.go index 49fd810..68b72d6 100644 --- a/common/ring.go +++ b/common/ring.go @@ -5,9 +5,10 @@ import ( ) type RingBuffer[T any] struct { - *util.Ring[T] - Size int - MoveCount uint32 + *util.Ring[T] `json:"-"` + Size int + MoveCount uint32 + LastValue *T } func (rb *RingBuffer[T]) Init(n int) *RingBuffer[T] { @@ -16,21 +17,13 @@ func (rb *RingBuffer[T]) Init(n int) *RingBuffer[T] { } rb.Ring = util.NewRing[T](n) rb.Size = n + rb.LastValue = &rb.Value return rb } -func (rb RingBuffer[T]) SubRing(rr *util.Ring[T]) *RingBuffer[T] { - rb.Ring = rr - rb.MoveCount = 0 - return &rb -} - func (rb *RingBuffer[T]) MoveNext() *T { + rb.LastValue = &rb.Value rb.Ring = rb.Next() rb.MoveCount++ return &rb.Value -} - -func (rb *RingBuffer[T]) PreValue() *T { - return &rb.Prev().Value -} +} \ No newline at end of file diff --git a/common/ring_av.go b/common/ring_av.go index afb1c7c..bfe39fb 100644 --- a/common/ring_av.go +++ b/common/ring_av.go @@ -2,27 +2,22 @@ package common import ( "context" - "encoding/json" "runtime" "time" ) + type AVRing[T RawSlice] struct { RingBuffer[AVFrame[T]] Poll time.Duration } -func (av *AVRing[T]) MarshalJSON() ([]byte, error) { - return json.Marshal(av.PreValue()) -} - func (r *AVRing[T]) Step() *AVFrame[T] { - last := &r.RingBuffer.Value current := r.RingBuffer.MoveNext() current.Sequence = r.MoveCount current.canRead = false current.Reset() - last.canRead = true + r.LastValue.canRead = true return current } diff --git a/common/ring_lock.go b/common/ring_lock.go index b437658..9f994d8 100644 --- a/common/ring_lock.go +++ b/common/ring_lock.go @@ -47,12 +47,11 @@ func (rb *LockRing[T]) Step() { } func (rb *LockRing[T]) Write(value T) { - last := &rb.RingBuffer.Value - last.Value = value + rb.Value.Value = value if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) { current := rb.RingBuffer.MoveNext() current.Lock() - last.Unlock() + rb.LastValue.Unlock() //Flag不为1代表被Dispose了,但尚未处理Done if !atomic.CompareAndSwapInt32(rb.Flag, 1, 0) { current.Unlock() diff --git a/publisher.go b/publisher.go index 2e62e3b..aced3be 100644 --- a/publisher.go +++ b/publisher.go @@ -186,15 +186,19 @@ func (t *TSPublisher) OnPES(pes mpegts.MpegTsPESPacket) { t.adts = append(t.adts, pes.Payload[:7]...) t.AudioTrack.WriteADTS(t.adts) } - t.AudioTrack.CurrentFrame().PTS = uint32(pes.Header.Pts) - t.AudioTrack.CurrentFrame().DTS = uint32(pes.Header.Dts) - for remainLen := len(pes.Payload); remainLen > 0; { + current := t.AudioTrack.CurrentFrame() + current.PTS = uint32(pes.Header.Pts) + current.DTS = uint32(pes.Header.Dts) + remainLen := len(pes.Payload) + current.BytesIn += remainLen + for remainLen > 0 { // AACFrameLength(13) // xx xxxxxxxx xxx frameLen := (int(pes.Payload[3]&3) << 11) | (int(pes.Payload[4]) << 3) | (int(pes.Payload[5]) >> 5) if frameLen > remainLen { break } + t.AudioTrack.WriteSlice(pes.Payload[7:frameLen]) pes.Payload = pes.Payload[frameLen:remainLen] remainLen -= frameLen diff --git a/stream.go b/stream.go index 1ea0482..a81d3fe 100644 --- a/stream.go +++ b/stream.go @@ -1,6 +1,7 @@ package engine import ( + "encoding/json" "errors" "strings" "sync" @@ -122,6 +123,16 @@ type StreamTimeoutConfig struct { PublishTimeout time.Duration WaitCloseTimeout time.Duration } +type Tracks struct { + util.Map[string, Track] +} + +func (tracks *Tracks) MarshalJSON() ([]byte, error) { + return json.Marshal(util.MapList(&tracks.Map, func(_ string, t Track) Track { + t.SnapForJson() + return t + })) +} // Stream 流定义 type Stream struct { @@ -134,7 +145,7 @@ type Stream struct { Publisher IPublisher State StreamState Subscribers []ISubscriber // 订阅者 - Tracks map[string]Track + Tracks Tracks AppName string StreamName string } @@ -153,11 +164,11 @@ func (s *Stream) Summary() (r StreamSummay) { if s.Publisher != nil { r.Type = s.Publisher.GetIO().Type } - //TODO: Lock - for _, t := range s.Tracks { - r.BPS += t.GetBase().BPS - r.Tracks = append(r.Tracks, t.GetBase().Name) - } + r.Tracks = util.MapList(&s.Tracks.Map, func(name string, t Track) string { + b := t.GetBase() + r.BPS += b.BPS + return name + }) r.Path = s.Path r.State = s.State r.Subscribers = len(s.Subscribers) @@ -191,7 +202,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream Streams.Map[streamPath] = s s.actionChan.Init(1) s.timeout = time.NewTimer(waitTimeout) - s.Tracks = make(map[string]Track) + s.Tracks.Init() go s.run() return s, true } @@ -262,14 +273,14 @@ func (s *Stream) run() { select { case <-s.timeout.C: if s.State == STATE_PUBLISHING { - for name, t := range s.Tracks { + s.Tracks.ModifyRange(func(name string, t Track) { // track 超过一定时间没有更新数据了 if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout { s.Warn("track timeout", zap.String("name", name), zap.Time("lastWriteTime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout)) - delete(s.Tracks, name) + delete(s.Tracks.Map.Map, name) s.broadcast(TrackRemoved{t}) } - } + }) for l := len(s.Subscribers) - 1; l >= 0; l-- { if sub := s.Subscribers[l]; sub.IsClosed() { s.Subscribers = append(s.Subscribers[:l], s.Subscribers[l+1:]...) @@ -282,7 +293,7 @@ func (s *Stream) run() { } } } - if len(s.Tracks) == 0 || (s.Publisher != nil && s.Publisher.IsClosed()) { + if s.Tracks.Len() == 0 || (s.Publisher != nil && s.Publisher.IsClosed()) { s.action(ACTION_PUBLISHLOST) for p := range waitP { p.Reject(errors.New("publisher lost")) @@ -341,23 +352,23 @@ func (s *Stream) run() { if s.Publisher != nil { s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入,在回调中可以去获取订阅者数量 needAudio, needVideo := sbConfig.SubAudio && s.Publisher.GetConfig().PubAudio, sbConfig.SubVideo && s.Publisher.GetConfig().PubVideo - for _, t := range s.Tracks { + s.Tracks.Range(func(name string, t Track) { switch t.(type) { case *track.Audio: if needAudio { needAudio = false } else { - continue + return } case *track.Video: if needVideo { needVideo = false } else { - continue + return } } suber.OnEvent(t) // 把现有的Track发给订阅者 - } + }) // 还需要等一下发布者的音频或者视频Track if needAudio || needVideo { waitP[v] = 0 @@ -378,8 +389,7 @@ func (s *Stream) run() { } case Track: name := v.GetBase().Name - if _, ok := s.Tracks[name]; !ok { - s.Tracks[name] = v + if s.Tracks.Add(name, v) { s.Info("track +1", zap.String("name", name)) s.broadcast(v) for w, flag := range waitP { @@ -399,11 +409,10 @@ func (s *Stream) run() { } case TrackRemoved: name := v.GetBase().Name - if t, ok := s.Tracks[name]; ok { + if t, ok := s.Tracks.Delete(name); ok { s.Info("track -1", zap.String("name", name)) - delete(s.Tracks, name) s.broadcast(v) - if len(s.Tracks) == 0 { + if s.Tracks.Len() == 0 { s.action(ACTION_PUBLISHLOST) } if dt, ok := t.(*track.Data); ok { @@ -419,11 +428,11 @@ func (s *Stream) run() { for w := range waitP { w.Reject(StreamIsClosedErr) } - for _, t := range s.Tracks { + s.Tracks.Range(func(_ string, t Track) { if dt, ok := t.(*track.Data); ok { dt.Dispose() } - } + }) return } } diff --git a/summary.go b/summary.go index 24df674..5fcea89 100644 --- a/summary.go +++ b/summary.go @@ -119,9 +119,8 @@ func (s *Summary) collect() *Summary { s.NetWork = append(s.NetWork, info) } s.lastNetWork = nv - s.Streams = nil - Streams.Range(func(ss *Stream) { - s.Streams = append(s.Streams, ss.Summary()) + s.Streams = util.MapList(&Streams, func(name string, ss *Stream) StreamSummay { + return ss.Summary() }) return s } diff --git a/track/aac.go b/track/aac.go index 38f207b..4decc16 100644 --- a/track/aac.go +++ b/track/aac.go @@ -74,6 +74,7 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) { aac.Audio.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2} aac.Attach() } else { + aac.WriteSlice(AudioSlice(frame[2:])) aac.Audio.WriteAVCC(ts, frame) aac.Flush() } diff --git a/track/audio.go b/track/audio.go index 12e65d7..9a4efe1 100644 --- a/track/audio.go +++ b/track/audio.go @@ -1,7 +1,6 @@ package track import ( - "encoding/json" "net" "m7s.live/engine/v4/codec" @@ -25,20 +24,24 @@ type Audio struct { Profile byte } -func (a *Audio) MarshalJSON() ([]byte, error) { - v := a.PreValue() +//为json序列化而计算的数据 +func (a *Audio) SnapForJson() { + v := a.LastValue if a.RawPart != nil { a.RawPart = a.RawPart[:0] } a.RawSize = 0 for i := 0; i < len(v.Raw) && i < 10; i++ { - a.RawSize += len(v.Raw[i]) + l := len(v.Raw[i]) + a.RawSize += l + if sl := len(a.RawPart); sl < 10 { + for j := 0; i < l && j < 10-sl; j++ { + a.RawPart = append(a.RawPart, int(v.Raw[i][j])) + } + } } - for i := 0; i < len(v.Raw[0]) && i < 10; i++ { - a.RawPart = append(a.RawPart, int(v.Raw[0][i])) - } - return json.Marshal(v) } + func (a *Audio) IsAAC() bool { return a.CodecID == codec.CodecID_AAC } @@ -49,8 +52,8 @@ func (a *Audio) Attach() { a.Stream.AddTrack(a) } func (a *Audio) Detach() { - a.Stream = nil a.Stream.RemoveTrack(a) + a.Stream = nil } func (a *Audio) GetName() string { if a.Name == "" { diff --git a/track/base.go b/track/base.go index 6eac09f..d3828b9 100644 --- a/track/base.go +++ b/track/base.go @@ -43,8 +43,6 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) { type Media[T RawSlice] struct { Base AVRing[T] - RawPart []int // 裸数据片段用于UI上显示 - RawSize int //裸数据长度 SampleRate uint32 SampleSize byte DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) @@ -57,7 +55,7 @@ type Media[T RawSlice] struct { } func (av *Media[T]) LastWriteTime() time.Time { - return av.AVRing.RingBuffer.PreValue().Timestamp + return av.AVRing.RingBuffer.LastValue.Timestamp } func (av *Media[T]) Play(ctx context.Context, onMedia func(*AVFrame[T]) error) error { @@ -83,7 +81,7 @@ func (av *Media[T]) CurrentFrame() *AVFrame[T] { return &av.AVRing.RingBuffer.Value } func (av *Media[T]) PreFrame() *AVFrame[T] { - return av.AVRing.RingBuffer.PreValue() + return av.AVRing.RingBuffer.LastValue } // 获取缓存中下一个rtpFrame @@ -160,9 +158,11 @@ func (av *Media[T]) UnmarshalRTPPacket(p *rtp.Packet) (frame *RTPFrame) { frame = &RTPFrame{ Packet: *p, } + av.Value.BytesIn += len(p.Payload) + 12 return av.recorderRTP(frame) } func (av *Media[T]) UnmarshalRTP(raw []byte) (frame *RTPFrame) { + av.Value.BytesIn += len(raw) if frame = new(RTPFrame); frame.Unmarshal(raw) == nil { return } @@ -176,7 +176,7 @@ func (av *Media[T]) WriteSlice(slice T) { func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) { curValue := &av.AVRing.RingBuffer.Value cts := frame.CTS() - curValue.BytesIn = len(frame) + curValue.BytesIn += len(frame) curValue.AppendAVCC(frame) curValue.DTS = ts * 90 curValue.PTS = (ts + cts) * 90 @@ -185,7 +185,7 @@ func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) { func (av *Media[T]) Flush() { curValue := &av.AVRing.RingBuffer.Value - preValue := av.AVRing.RingBuffer.PreValue() + preValue := av.AVRing.RingBuffer.LastValue if av.起始时间.IsZero() { av.重置(curValue.AbsTime) } else { diff --git a/track/data.go b/track/data.go index 7831b25..0abacdc 100644 --- a/track/data.go +++ b/track/data.go @@ -20,7 +20,7 @@ func (d *Data) ReadRing() *LockRing[any] { } func (d *Data) LastWriteTime() time.Time { - return d.LockRing.RingBuffer.PreValue().Timestamp + return d.LockRing.RingBuffer.LastValue.Timestamp } func (dt *Data) Push(data any) { diff --git a/track/h264.go b/track/h264.go index 9bdbcb9..9568740 100644 --- a/track/h264.go +++ b/track/h264.go @@ -109,8 +109,8 @@ func (vt *H264) writeRTPFrame(frame *RTPFrame) { } rv.Raw[lastIndex].Append(frame.Payload[naluType.Offset():]) if util.Bit1(frame.Payload[1], 1) { - complete := rv.Raw[lastIndex] //拼接完成 - rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去 + complete := rv.Raw[lastIndex] //拼接完成 + rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去 vt.WriteSlice(complete) } } diff --git a/track/video.go b/track/video.go index 4cd240d..8aa2be7 100644 --- a/track/video.go +++ b/track/video.go @@ -2,7 +2,6 @@ package track import ( "bytes" - "encoding/json" . "github.com/logrusorgru/aurora" "go.uber.org/zap" @@ -24,22 +23,24 @@ type Video struct { dtsEst *DTSEstimator } -func (vt *Video) MarshalJSON() ([]byte, error) { - v := vt.PreValue() +func (vt *Video) SnapForJson() { + v := vt.LastValue if vt.RawPart != nil { vt.RawPart = vt.RawPart[:0] } size := 0 for i := 0; i < len(v.Raw); i++ { for j := 0; j < len(v.Raw[i]); j++ { - size += len(v.Raw[i][j]) + l := len(v.Raw[i][j]) + size += l + if sl := len(vt.RawPart); sl < 10 { + for k := 0; k < l && k < 10-sl; k++ { + vt.RawPart = append(vt.RawPart, int(v.Raw[i][j][k])) + } + } } } vt.RawSize = size - for i := 0; i < len(v.Raw[0][0]) && i < 10; i++ { - vt.RawPart = append(vt.RawPart, int(v.Raw[0][0][i])) - } - return json.Marshal(v) } func (vt *Video) GetDecConfSeq() int { return vt.DecoderConfiguration.Seq @@ -48,8 +49,8 @@ func (vt *Video) Attach() { vt.Stream.AddTrack(vt) } func (vt *Video) Detach() { - vt.Stream = nil vt.Stream.RemoveTrack(vt) + vt.Stream = nil } func (vt *Video) GetName() string { if vt.Name == "" { @@ -93,6 +94,7 @@ func (vt *Video) writeAnnexBSlice(annexb AnnexBFrame, s *[]NALUSlice) { func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) (s []NALUSlice) { // vt.Stream.Tracef("WriteAnnexB:pts %d,dts %d,len %d", pts, dts, len(frame)) + vt.Value.BytesIn += len(frame) for len(frame) > 0 { before, after, found := bytes.Cut(frame, codec.NALU_Delimiter2) if !found { diff --git a/util/map.go b/util/map.go index 03ed1d8..4863361 100644 --- a/util/map.go +++ b/util/map.go @@ -35,8 +35,6 @@ func (m *Map[K, V]) Has(k K) (ok bool) { } func (m *Map[K, V]) Len() int { - m.RLock() - defer m.RUnlock() return len(m.Map) } @@ -46,10 +44,16 @@ func (m *Map[K, V]) Get(k K) V { return m.Map[k] } -func (m *Map[K, V]) Delete(k K) { - m.Lock() - delete(m.Map, k) - m.Unlock() +func (m *Map[K, V]) Delete(k K) (v V, ok bool) { + m.RLock() + v, ok = m.Map[k] + m.RUnlock() + if ok { + m.Lock() + delete(m.Map, k) + m.Unlock() + } + return } func (m *Map[K, V]) ToList() (r []V) { @@ -61,10 +65,28 @@ func (m *Map[K, V]) ToList() (r []V) { return } -func (m *Map[K, V]) Range(f func(V)) { +func MapList[K comparable, V any, R any](m *Map[K, V], f func(K, V) R) (r []R) { m.RLock() defer m.RUnlock() - for _, s := range m.Map { - f(s) + for k, v := range m.Map { + r = append(r, f(k, v)) + } + return +} + +func (m *Map[K, V]) Range(f func(K, V)) { + m.RLock() + defer m.RUnlock() + for k, v := range m.Map { + f(k, v) + } +} + +//遍历时有写入操作 +func (m *Map[K, V]) ModifyRange(f func(K, V)) { + m.Lock() + defer m.Unlock() + for k, v := range m.Map { + f(k, v) } }