mirror of
https://github.com/Monibuca/engine.git
synced 2025-10-04 16:22:41 +08:00
feat: sei track use channel to replace data track
This commit is contained in:
15
stream.go
15
stream.go
@@ -128,7 +128,7 @@ type Tracks struct {
|
|||||||
Data []common.Track
|
Data []common.Track
|
||||||
MainVideo *track.Video
|
MainVideo *track.Video
|
||||||
MainAudio *track.Audio
|
MainAudio *track.Audio
|
||||||
SEI *track.Data[[]byte]
|
SEI *track.Channel[[]byte]
|
||||||
marshalLock sync.Mutex
|
marshalLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -147,8 +147,7 @@ func (tracks *Tracks) Add(name string, t Track) bool {
|
|||||||
tracks.SetIDR(v)
|
tracks.SetIDR(v)
|
||||||
}
|
}
|
||||||
if tracks.SEI != nil {
|
if tracks.SEI != nil {
|
||||||
v.SEIReader = &track.DataReader[[]byte]{}
|
v.SEIReader = tracks.SEI.CreateReader(100)
|
||||||
v.SEIReader.Ring = tracks.SEI.Ring
|
|
||||||
}
|
}
|
||||||
case *track.Audio:
|
case *track.Audio:
|
||||||
if tracks.MainAudio == nil {
|
if tracks.MainAudio == nil {
|
||||||
@@ -194,7 +193,7 @@ func (tracks *Tracks) AddSEI(t byte, data []byte) bool {
|
|||||||
buffer.WriteByte(byte(l))
|
buffer.WriteByte(byte(l))
|
||||||
buffer.Write(data)
|
buffer.Write(data)
|
||||||
buffer.WriteByte(0x80)
|
buffer.WriteByte(0x80)
|
||||||
tracks.SEI.Push(buffer)
|
tracks.SEI.Write(buffer)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
@@ -577,12 +576,8 @@ func (s *Stream) run() {
|
|||||||
}
|
}
|
||||||
if conf.InsertSEI {
|
if conf.InsertSEI {
|
||||||
if s.Tracks.SEI == nil {
|
if s.Tracks.SEI == nil {
|
||||||
s.Tracks.SEI = track.NewDataTrack[[]byte]("sei")
|
s.Tracks.SEI = &track.Channel[[]byte]{}
|
||||||
s.Tracks.SEI.Locker = &sync.Mutex{}
|
s.Info("sei track added")
|
||||||
s.Tracks.SEI.SetStuff(s)
|
|
||||||
if s.Tracks.Add("sei", s.Tracks.SEI) {
|
|
||||||
s.Info("sei track added")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
v.Resolve()
|
v.Resolve()
|
||||||
|
@@ -125,9 +125,14 @@ func (s *Subscribers) Find(id string) ISubscriber {
|
|||||||
|
|
||||||
func (s *Subscribers) Delete(suber ISubscriber) {
|
func (s *Subscribers) Delete(suber ISubscriber) {
|
||||||
io := suber.GetSubscriber()
|
io := suber.GetSubscriber()
|
||||||
|
io.TrackPlayer.Audio = nil
|
||||||
|
io.TrackPlayer.AudioReader = nil
|
||||||
|
io.TrackPlayer.Video = nil
|
||||||
|
io.TrackPlayer.VideoReader = nil
|
||||||
for _, reader := range io.readers {
|
for _, reader := range io.readers {
|
||||||
reader.Track.Debug("reader -1", zap.Int32("count", reader.Track.ReaderCount.Add(-1)))
|
reader.Track.Debug("reader -1", zap.Int32("count", reader.Track.ReaderCount.Add(-1)))
|
||||||
}
|
}
|
||||||
|
io.readers = nil
|
||||||
if _, ok := s.public[suber]; ok {
|
if _, ok := s.public[suber]; ok {
|
||||||
delete(s.public, suber)
|
delete(s.public, suber)
|
||||||
io.Info("suber -1", zap.Int("remains", s.Len()))
|
io.Info("suber -1", zap.Int("remains", s.Len()))
|
||||||
|
24
track/channel.go
Normal file
24
track/channel.go
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
package track
|
||||||
|
|
||||||
|
type Channel[T any] struct {
|
||||||
|
listeners []chan T
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Channel[T]) CreateReader(l int) chan T {
|
||||||
|
c := make(chan T, l)
|
||||||
|
r.listeners = append(r.listeners, c)
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Channel[T]) AddListener(c chan T) {
|
||||||
|
r.listeners = append(r.listeners, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Channel[T]) Write(data T) {
|
||||||
|
for _, listener := range r.listeners {
|
||||||
|
if len(listener) == cap(listener) {
|
||||||
|
<-listener
|
||||||
|
}
|
||||||
|
listener <- data
|
||||||
|
}
|
||||||
|
}
|
@@ -129,7 +129,7 @@ func (r *AVRingReader) ReadFrame(mode int) (err error) {
|
|||||||
// r.Delay = uint32((r.Track.LastValue.Timestamp - r.Value.Timestamp).Milliseconds())
|
// r.Delay = uint32((r.Track.LastValue.Timestamp - r.Value.Timestamp).Milliseconds())
|
||||||
r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence)
|
r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence)
|
||||||
// fmt.Println(r.Track.Name, r.Delay)
|
// fmt.Println(r.Track.Name, r.Delay)
|
||||||
// fmt.Println(r.Track.Name, r.State, r.Value.Timestamp, r.SkipTs, r.AbsTime)
|
// fmt.Println(r.Track.Name, r.Value.Sequence, r.Delay, r.AbsTime)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (r *AVRingReader) GetPTS32() uint32 {
|
func (r *AVRingReader) GetPTS32() uint32 {
|
||||||
|
@@ -22,9 +22,9 @@ type Video struct {
|
|||||||
lostFlag bool // 是否丢帧
|
lostFlag bool // 是否丢帧
|
||||||
codec.SPSInfo
|
codec.SPSInfo
|
||||||
ParamaterSets `json:"-" yaml:"-"`
|
ParamaterSets `json:"-" yaml:"-"`
|
||||||
SPS []byte `json:"-" yaml:"-"`
|
SPS []byte `json:"-" yaml:"-"`
|
||||||
PPS []byte `json:"-" yaml:"-"`
|
PPS []byte `json:"-" yaml:"-"`
|
||||||
SEIReader *DataReader[[]byte] `json:"-" yaml:"-"`
|
SEIReader chan []byte `json:"-" yaml:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Video) Attach() {
|
func (v *Video) Attach() {
|
||||||
@@ -249,15 +249,16 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) {
|
|||||||
|
|
||||||
func (vt *Video) Flush() {
|
func (vt *Video) Flush() {
|
||||||
rv := vt.Value
|
rv := vt.Value
|
||||||
if vt.SEIReader != nil {
|
if vt.SEIReader != nil && len(vt.SEIReader) > 0 {
|
||||||
if seiFrame, err := vt.SEIReader.TryRead(); seiFrame != nil {
|
for seiFrame := range vt.SEIReader {
|
||||||
var au util.BLL
|
var au util.BLL
|
||||||
au.Push(vt.SpesificTrack.GetNALU_SEI())
|
au.Push(vt.SpesificTrack.GetNALU_SEI())
|
||||||
au.Push(vt.BytesPool.GetShell(seiFrame.Data))
|
au.Push(vt.BytesPool.GetShell(seiFrame))
|
||||||
vt.Info("sei", zap.Int("len", len(seiFrame.Data)))
|
vt.Info("sei", zap.Int("len", len(seiFrame)))
|
||||||
vt.Value.AUList.UnshiftValue(&au)
|
vt.Value.AUList.UnshiftValue(&au)
|
||||||
} else if err != nil {
|
if len(vt.SEIReader) == 0 {
|
||||||
vt.SEIReader = nil
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if rv.IFrame {
|
if rv.IFrame {
|
||||||
|
Reference in New Issue
Block a user