diff --git a/common/frame.go b/common/frame.go
index 05fe689..2cddab5 100644
--- a/common/frame.go
+++ b/common/frame.go
@@ -126,7 +126,7 @@ type AVFrame[T RawSlice] struct {
 	FLV     net.Buffers // packaged FLV tag
 	AVCC    net.Buffers // packaged AVCC format
 	RTP     []*RTPFrame
-	Raw     []T //raw data
+	Raw     []T // raw data
 	canRead bool
 }
diff --git a/common/index.go b/common/index.go
index 94dc234..de47902 100644
--- a/common/index.go
+++ b/common/index.go
@@ -9,6 +9,7 @@ import (
 type Track interface {
 	GetName() string
 	LastWriteTime() time.Time
+
 }
 
 type AVTrack interface {
diff --git a/config/config.go b/config/config.go
index 4f0c7f4..eaeb703 100644
--- a/config/config.go
+++ b/config/config.go
@@ -5,7 +5,6 @@ import (
 	"net/http"
 	"reflect"
 	"strings"
-	"time"
 
 	"go.uber.org/zap"
 	"m7s.live/engine/v4/log"
@@ -13,12 +12,6 @@ import (
 
 type Config map[string]any
 
-type Second int
-
-func (s Second) Duration() time.Duration {
-	return time.Duration(s) * time.Second
-}
-
 type Plugin interface {
 	// Possible argument types: FirstConfig for the first initialization, Config for subsequent config updates, and the SE (StateEvent) series for stream state change events
 	OnEvent(any)
diff --git a/config/types.go b/config/types.go
index f7df432..28b5eb0 100644
--- a/config/types.go
+++ b/config/types.go
@@ -32,8 +32,8 @@ type Publish struct {
 	PubAudio         bool
 	PubVideo         bool
 	KickExist        bool // whether to kick out an existing publisher
-	PublishTimeout   Second // timeout when the publisher produces no data
-	WaitCloseTimeout Second // delayed automatic close (when there are no subscribers)
+	PublishTimeout   int // timeout when the publisher produces no data
+	WaitCloseTimeout int // delayed automatic close (when there are no subscribers)
 }
 
 func (c *Publish) GetPublishConfig() *Publish {
@@ -44,7 +44,7 @@ type Subscribe struct {
 	SubAudio    bool
 	SubVideo    bool
 	IFrameOnly  bool // key frames only
-	WaitTimeout Second // timeout while waiting for the stream
+	WaitTimeout int // timeout while waiting for the stream
 }
 
 func (c *Subscribe) GetSubscribeConfig() *Subscribe {
diff --git a/io.go b/io.go
index 5b1ce40..d29644a 100644
--- a/io.go
+++ b/io.go
@@ -111,7 +111,7 @@ func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) error {
 	wt := time.Second * 5
 	var c any = conf
 	if v, ok := c.(*config.Subscribe); ok {
-		wt = v.WaitTimeout.Duration()
+		wt = util.Second2Duration(v.WaitTimeout)
 	}
 	if io.Context == nil {
 		io.Context, io.CancelFunc = context.WithCancel(Engine)
@@ -134,8 +134,8 @@ func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) error {
 				return BadNameErr
 			}
 		}
-		s.PublishTimeout = v.PublishTimeout.Duration()
-		s.WaitCloseTimeout = v.WaitCloseTimeout.Duration()
+		s.PublishTimeout = util.Second2Duration(v.PublishTimeout)
+		s.WaitCloseTimeout = util.Second2Duration(v.WaitCloseTimeout)
 	} else if create {
 		EventBus <- s // notify the publisher to pull the stream on demand
 	}
diff --git a/stream.go b/stream.go
index 4ba9da8..12fa0a4 100644
--- a/stream.go
+++ b/stream.go
@@ -301,7 +301,7 @@ func (s *Stream) run() {
 					io.Spesic = suber
 					s.Subscribers = append(s.Subscribers, suber)
 					sbConfig := io.Config
-					if wt := sbConfig.WaitTimeout.Duration(); wt > s.WaitTimeout {
+					if wt := util.Second2Duration(sbConfig.WaitTimeout); wt > s.WaitTimeout {
 						s.WaitTimeout = wt
 					}
 					io.Stream = s
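The timeout fields above change from the removed config.Second type to plain int seconds, and every call site now converts explicitly with util.Second2Duration (added in util/convert.go at the end of this diff). A minimal sketch of the new pattern; the main function and the zero-value guard are illustrative and not part of the diff:

package main

import (
	"fmt"
	"time"

	"m7s.live/engine/v4/config"
	"m7s.live/engine/v4/util"
)

func main() {
	// WaitTimeout is now a plain number of seconds instead of a config.Second.
	sub := config.Subscribe{SubAudio: true, SubVideo: true, WaitTimeout: 10}

	// Mirrors the call-site change in io.go: a 5s default, overridden by the
	// configured value through util.Second2Duration (previously
	// sub.WaitTimeout.Duration()). The zero-value guard is only illustrative.
	wt := 5 * time.Second
	if sub.WaitTimeout > 0 {
		wt = util.Second2Duration(sub.WaitTimeout)
	}
	fmt.Println(wt) // 10s
}

The behaviour is unchanged: a configured value of 10 still yields a 10-second wait; only the conversion has moved from a method on the config type into util.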
diff --git a/subscriber.go b/subscriber.go
index 15d45a2..38eeacb 100644
--- a/subscriber.go
+++ b/subscriber.go
@@ -71,13 +71,29 @@ type ISubscriber interface {
 	PlayBlock()
 	Stop()
 }
+type PlayContext[T interface {
+	GetDecConfSeq() int
+	ReadRing() *AVRing[R]
+}, R RawSlice] struct {
+	Track   T
+	ring    *AVRing[R]
+	confSeq int
+	First   AVFrame[R]
+}
+
+func (p *PlayContext[T, R]) init(t T) {
+	p.Track = t
+	p.ring = t.ReadRing()
+}
+func (p *PlayContext[T, R]) decConfChanged() bool {
+	return p.confSeq != p.Track.GetDecConfSeq()
+}
+
 type TrackPlayer struct {
 	context.Context
 	context.CancelFunc
-	AudioTrack *track.Audio
-	VideoTrack *track.Video
-	vr         *AVRing[NALUSlice]
-	ar         *AVRing[AudioSlice]
+	Audio PlayContext[*track.Audio, AudioSlice]
+	Video PlayContext[*track.Video, NALUSlice]
 }
 
 // Subscriber is the subscriber entity definition
@@ -89,10 +105,10 @@ type Subscriber struct {
 func (s *Subscriber) OnEvent(event any) {
 	switch v := event.(type) {
 	case TrackRemoved:
-		if a, ok := v.Track.(*track.Audio); ok && a == s.AudioTrack {
-			s.ar = nil
-		} else if v, ok := v.Track.(*track.Video); ok && v == s.VideoTrack {
-			s.vr = nil
+		if a, ok := v.Track.(*track.Audio); ok && a == s.Audio.Track {
+			s.Audio.ring = nil
+		} else if v, ok := v.Track.(*track.Video); ok && v == s.Video.Track {
+			s.Video.ring = nil
 		}
 	case Track: // accept all tracks by default
 		s.AddTrack(v)
@@ -104,17 +120,15 @@ func (s *Subscriber) OnEvent(event any) {
 func (s *Subscriber) AddTrack(t Track) bool {
 	switch v := t.(type) {
 	case *track.Video:
-		if s.VideoTrack != nil || !s.Config.SubVideo {
+		if s.Video.Track != nil || !s.Config.SubVideo {
 			return false
 		}
-		s.VideoTrack = v
-		s.vr = v.ReadRing()
+		s.Video.init(v)
 	case *track.Audio:
-		if s.AudioTrack != nil || !s.Config.SubAudio {
+		if s.Audio.Track != nil || !s.Config.SubAudio {
 			return false
 		}
-		s.AudioTrack = v
-		s.ar = v.ReadRing()
+		s.Audio.init(v)
 	case *track.Data:
 	default:
 		return false
@@ -127,86 +141,84 @@ func (s *Subscriber) IsPlaying() bool {
 	return s.TrackPlayer.Context != nil && s.TrackPlayer.Err() == nil
 }
 
-// func (s *Subscriber) Stop() {
-// 	if s.IsPlaying() {
-// 		s.TrackPlayer.CancelFunc()
+// Non-blocking read: calling the returned function repeatedly tries to read data, and OnEvent is invoked whenever data is read; this mode can be driven freely from different goroutines
+// func (s *Subscriber) Play(spesic ISubscriber) func() error {
+// 	s.Info("play")
+// 	var confSeqa, confSeqv int
+// 	var t time.Time
+// 	var startTime time.Time     // time when the first key frame was read
+// 	var firstIFrame *VideoFrame // the starting key frame
+// 	var audioSent bool          // whether audio has been sent
+// 	s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
+// 	ctx := s.TrackPlayer.Context
+// 	var nextRoundReadAudio bool // read audio on the next round
+// 	return func() error {
+// 		if ctx.Err() != nil {
+// 			return ctx.Err()
+// 		}
+// 		if !nextRoundReadAudio || s.ar == nil {
+// 			if s.Video.ring != nil {
+// 				if startTime.IsZero() {
+// 					startTime = time.Now()
+// 					firstIFrame = (*VideoFrame)(s.Video.ring.Read(ctx)) // the blocking read here costs 0 time
+// 					s.FirstVideo = *firstIFrame
+// 					s.Debug("firstIFrame", zap.Uint32("seq", firstIFrame.Sequence))
+// 					if ctx.Err() != nil {
+// 						return ctx.Err()
+// 					}
+// 					confSeqv = s.VideoTrack.DecoderConfiguration.Seq
+// 					spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration))
+// 					spesic.OnEvent(firstIFrame)
+// 					s.Video.ring.MoveNext()
+// 					if firstIFrame.Timestamp.After(t) {
+// 						t = firstIFrame.Timestamp
+// 					}
+// 					return nil
+// 				} else if firstIFrame == nil {
+// 					if vp := (s.Video.ring.TryRead()); vp != nil {
+// 						if vp.IFrame && confSeqv != s.VideoTrack.DecoderConfiguration.Seq {
+// 							confSeqv = s.VideoTrack.DecoderConfiguration.Seq
+// 							spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration))
+// 						}
+// 						spesic.OnEvent((*VideoFrame)(vp))
+// 						s.Video.ring.MoveNext()
+// 						// if the video timestamp read this time is larger, give audio a chance next round
+// 						if nextRoundReadAudio = vp.Timestamp.After(t); nextRoundReadAudio {
+// 							t = vp.Timestamp
+// 						}
+// 						return nil
+// 					}
+// 				}
+// 			} else if s.Config.SubVideo && (s.Stream == nil || s.Stream.Publisher == nil || s.Stream.Publisher.GetConfig().PubVideo) {
+// 				// if video is subscribed, wait for the video track
+// 				// TODO: if the publisher is configured for video and the subscription requests video but no video actually arrives, audio-only playback needs to be handled
+// 				return nil
+// 			}
+// 		}
+// 		// in normal mode or audio-only mode, audio playback starts
+// 		if s.ar != nil && firstIFrame == nil {
+// 			if !audioSent {
+// 				confSeqa = s.AudioTrack.DecoderConfiguration.Seq
+// 				spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration))
+// 				audioSent = true
+// 			}
+// 			if ap := s.ar.TryRead(); ap != nil {
+// 				if s.AudioTrack.DecoderConfiguration.Raw != nil && confSeqa != s.AudioTrack.DecoderConfiguration.Seq {
+// 					spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration))
+// 				}
+// 				spesic.OnEvent(ap)
+// 				s.ar.MoveNext()
+// 				// if the audio timestamp is larger this time, give video a chance on the next read
+// 				if nextRoundReadAudio = !ap.Timestamp.After(t); !nextRoundReadAudio {
+// 					t = ap.Timestamp
+// 				}
+// 				return nil
+// 			}
+// 		}
+// 		return nil
+// 	}
 // }
 // }
-
-// Non-blocking read: calling the returned function repeatedly tries to read data, and OnEvent is invoked whenever data is read; this mode can be driven freely from different goroutines
-func (s *Subscriber) Play(spesic ISubscriber) func() error {
-	s.Info("play")
-	var confSeqa, confSeqv int
-	var t time.Time
-	var startTime time.Time     // time when the first key frame was read
-	var firstIFrame *VideoFrame // the starting key frame
-	var audioSent bool          // whether audio has been sent
-	s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
-	ctx := s.TrackPlayer.Context
-	var nextRoundReadAudio bool // read audio on the next round
-	return func() error {
-		if ctx.Err() != nil {
-			return ctx.Err()
-		}
-		if !nextRoundReadAudio || s.ar == nil {
-			if s.vr != nil {
-				if startTime.IsZero() {
-					startTime = time.Now()
-					firstIFrame = (*VideoFrame)(s.vr.Read(ctx)) // the blocking read here costs 0 time
-					s.Debug("firstIFrame", zap.Uint32("seq", firstIFrame.Sequence))
-					if ctx.Err() != nil {
-						return ctx.Err()
-					}
-					confSeqv = s.VideoTrack.DecoderConfiguration.Seq
-					spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration))
-					spesic.OnEvent(firstIFrame)
-					s.vr.MoveNext()
-					if firstIFrame.Timestamp.After(t) {
-						t = firstIFrame.Timestamp
-					}
-					return nil
-				} else if firstIFrame == nil {
-					if vp := (s.vr.TryRead()); vp != nil {
-						if vp.IFrame && confSeqv != s.VideoTrack.DecoderConfiguration.Seq {
-							confSeqv = s.VideoTrack.DecoderConfiguration.Seq
-							spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration))
-						}
-						spesic.OnEvent((*VideoFrame)(vp))
-						s.vr.MoveNext()
-						// if the video timestamp read this time is larger, give audio a chance next round
-						if nextRoundReadAudio = vp.Timestamp.After(t); nextRoundReadAudio {
-							t = vp.Timestamp
-						}
-						return nil
-					}
-				}
-			} else if s.Config.SubVideo && (s.Stream == nil || s.Stream.Publisher == nil || s.Stream.Publisher.GetConfig().PubVideo) {
-				// if video is subscribed, wait for the video track
-				// TODO: if the publisher is configured for video and the subscription requests video but no video actually arrives, audio-only playback needs to be handled
-				return nil
-			}
-		}
-		// in normal mode or audio-only mode, audio playback starts
-		if s.ar != nil && firstIFrame == nil {
-			if !audioSent {
-				confSeqa = s.AudioTrack.DecoderConfiguration.Seq
-				spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration))
-				audioSent = true
-			}
-			if ap := s.ar.TryRead(); ap != nil {
-				spesic.OnEvent(ap)
-				s.ar.MoveNext()
-				// if the audio timestamp is larger this time, give video a chance on the next read
-				if nextRoundReadAudio = !ap.Timestamp.After(t); !nextRoundReadAudio {
-					t = ap.Timestamp
-				}
-				return nil
-			}
-		}
-		return nil
-	}
-}
-
 //PlayBlock reads data in blocking mode
 func (s *Subscriber) PlayBlock() {
 	spesic := s.Spesic
@@ -216,49 +228,52 @@
 	}
 	s.Info("playblock")
 	var t time.Time
-	var startTime time.Time     // time when the first key frame was read
-	var firstIFrame *VideoFrame // the starting key frame
-	var audioSent bool          // whether audio has been sent
+	var startTime time.Time // time when the first key frame was read
+	var normal bool         // normal mode: playback has caught up with the live progress
+	var audioSent bool      // whether audio has been sent
 	s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
 	ctx := s.TrackPlayer.Context
 	defer s.Info("stop")
 	for ctx.Err() == nil {
-		if s.vr != nil {
+		if s.Video.ring != nil {
 			if startTime.IsZero() {
 				startTime = time.Now()
-				firstIFrame = (*VideoFrame)(s.vr.Read(ctx))
-				s.Debug("firstIFrame", zap.Uint32("seq", firstIFrame.Sequence))
+				s.Video.First = *(s.Video.ring.Read(ctx)) // stored by value rather than by pointer so the data is kept permanently
+				s.Debug("firstIFrame", zap.Uint32("seq", s.Video.First.Sequence))
 				if ctx.Err() != nil {
 					return
 				}
-				spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration))
+				s.sendVideoDecConf()
 			}
 			for {
 				var vp *VideoFrame
 				// once normal mode has been entered
-				if firstIFrame == nil {
-					vp = (*VideoFrame)(s.vr.Read(ctx))
+				if normal {
+					vp = (*VideoFrame)(s.Video.ring.Read(ctx))
 					if ctx.Err() != nil {
 						return
 					}
+					if vp.IFrame && s.Video.decConfChanged() {
+						s.sendVideoDecConf()
+					}
 					spesic.OnEvent(vp)
-					s.vr.MoveNext()
+					s.Video.ring.MoveNext()
 				} else {
-					if s.VideoTrack.IDRing.Value.Sequence != firstIFrame.Sequence {
-						firstIFrame = nil
-						s.vr = s.VideoTrack.ReadRing()
-						s.Debug("skip to latest key frame", zap.Uint32("seq", s.VideoTrack.IDRing.Value.Sequence))
+					if s.Video.Track.IDRing.Value.Sequence != s.Video.First.Sequence {
+						normal = true
+						s.Video.ring = s.Video.Track.ReadRing()
+						s.Debug("skip to latest key frame", zap.Uint32("seq", s.Video.Track.IDRing.Value.Sequence))
 						continue
 					} else {
-						vp = (*VideoFrame)(s.vr.Read(ctx))
+						vp = (*VideoFrame)(s.Video.ring.Read(ctx))
 						if ctx.Err() != nil {
 							return
 						}
 						spesic.OnEvent(vp)
-						if fast := time.Duration(vp.AbsTime-firstIFrame.AbsTime)*time.Millisecond - time.Since(startTime); fast > 0 {
+						if fast := time.Duration(vp.AbsTime-s.Video.First.AbsTime)*time.Millisecond - time.Since(startTime); fast > 0 {
 							time.Sleep(fast)
 						}
-						s.vr.MoveNext()
+						s.Video.ring.MoveNext()
 					}
 				}
 				if vp.Timestamp.After(t) {
@@ -272,20 +287,23 @@
 			continue
 		}
 		// in normal mode or audio-only mode, audio playback starts
-		if s.ar != nil && firstIFrame == nil {
+		if s.Audio.ring != nil && normal {
 			if !audioSent {
-				if s.AudioTrack.IsAAC() {
-					spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration))
+				if s.Audio.Track.IsAAC() {
+					s.sendAudioDecConf()
 				}
 				audioSent = true
 			}
 			for {
-				ap := (*AudioFrame)(s.ar.Read(ctx))
+				ap := (*AudioFrame)(s.Audio.ring.Read(ctx))
 				if ctx.Err() != nil {
 					return
 				}
+				if s.Audio.Track.IsAAC() && s.Audio.decConfChanged() {
+					s.sendAudioDecConf()
+				}
 				spesic.OnEvent(ap)
-				s.ar.MoveNext()
+				s.Audio.ring.MoveNext()
 				if ap.Timestamp.After(t) {
 					t = ap.Timestamp
 					break
@@ -295,6 +313,15 @@
 		}
 	}
 }
 
+func (s *Subscriber) sendVideoDecConf() {
+	s.Video.confSeq = s.Video.Track.DecoderConfiguration.Seq
+	s.Spesic.OnEvent(VideoDeConf(s.Video.Track.DecoderConfiguration))
+}
+func (s *Subscriber) sendAudioDecConf() {
+	s.Audio.confSeq = s.Audio.Track.DecoderConfiguration.Seq
+	s.Spesic.OnEvent(AudioDeConf(s.Audio.Track.DecoderConfiguration))
+}
+
 type IPusher interface {
 	ISubscriber
 	Push()
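With the TrackPlayer refactor, a concrete subscriber reaches its tracks through s.Video.Track / s.Audio.Track instead of the removed VideoTrack/vr and AudioTrack/ar fields, and PlayBlock now resends the decoder configuration whenever decConfChanged reports a new sequence number. A hypothetical subscriber sketch under the assumption that the exported names below (Subscriber, VideoDeConf, AudioDeConf, VideoFrame, AudioFrame) live in the engine root package, where this diff uses them unqualified; the DemoSubscriber type and its handling are illustrative only:

package demo

import (
	engine "m7s.live/engine/v4"
)

// DemoSubscriber is a hypothetical subscriber built on the refactored TrackPlayer.
type DemoSubscriber struct {
	engine.Subscriber
}

func (s *DemoSubscriber) OnEvent(event any) {
	switch v := event.(type) {
	case engine.VideoDeConf:
		_ = v // video decoder configuration; PlayBlock resends it when decConfChanged() reports a new Seq
	case engine.AudioDeConf:
		_ = v // audio decoder configuration (AAC only)
	case *engine.VideoFrame:
		_ = v // forward the frame, e.g. via v.AVCC, v.FLV or v.Raw
	case *engine.AudioFrame:
		_ = v
	default:
		// keep the default behaviour (track acceptance into s.Audio / s.Video, etc.)
		s.Subscriber.OnEvent(event)
	}
}

Once the engine has attached tracks (the Track case handled by the embedded Subscriber.OnEvent), calling PlayBlock on the subscriber drives the loop above and delivers exactly these events.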
diff --git a/track/aac.go b/track/aac.go
index dd3473e..0e67ee7 100644
--- a/track/aac.go
+++ b/track/aac.go
@@ -59,11 +59,7 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
 	if frame.IsSequence() {
 		aac.DecoderConfiguration.AVCC = net.Buffers{frame}
 		config1, config2 := frame[2], frame[3]
-		//audioObjectType = (config1 & 0xF8) >> 3
-		// 1 AAC MAIN ISO/IEC 14496-3 subpart 4
-		// 2 AAC LC   ISO/IEC 14496-3 subpart 4
-		// 3 AAC SSR  ISO/IEC 14496-3 subpart 4
-		// 4 AAC LTP  ISO/IEC 14496-3 subpart 4
+		aac.Profile = (config1 & 0xF8) >> 3
 		aac.Channels = ((config2 >> 3) & 0x0F) // channels
 		aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)])
 		aac.DecoderConfiguration.Raw = AudioSlice(frame[2:])
diff --git a/track/audio.go b/track/audio.go
index cf002d3..9f0b9b3 100644
--- a/track/audio.go
+++ b/track/audio.go
@@ -17,12 +17,20 @@ type Audio struct {
 	CodecID  codec.AudioCodecID
 	Channels byte
 	AVCCHead []byte // in the AVCC format the audio packet header is two bytes for AAC and one byte for other codecs
+	// Profile:
+	// 0: Main profile
+	// 1: Low Complexity profile (LC)
+	// 2: Scalable Sampling Rate profile (SSR)
+	// 3: Reserved
+	Profile byte
 }
 
 func (a *Audio) IsAAC() bool {
 	return a.CodecID == codec.CodecID_AAC
 }
-
+func (a *Audio) GetDecConfSeq() int {
+	return a.DecoderConfiguration.Seq
+}
 func (a *Audio) Attach() {
 	a.Stream.AddTrack(a)
 }
@@ -41,10 +49,10 @@ func (a *Audio) GetInfo() *Audio {
 }
 
 func (a *Audio) WriteADTS(adts []byte) {
-	profile := ((adts[2] & 0xc0) >> 6) + 1
+	a.Profile = ((adts[2] & 0xc0) >> 6) + 1
 	sampleRate := (adts[2] & 0x3c) >> 2
 	channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6)
-	config1 := (profile << 3) | ((sampleRate & 0xe) >> 1)
+	config1 := (a.Profile << 3) | ((sampleRate & 0xe) >> 1)
 	config2 := ((sampleRate & 0x1) << 7) | (channel << 3)
 	a.SampleRate = uint32(codec.SamplingFrequencies[sampleRate])
 	a.Channels = channel
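For reference, the two bytes consumed by WriteAVCC are the AudioSpecificConfig: five bits of audioObjectType (1 Main, 2 LC, 3 SSR, 4 LTP, the table removed from aac.go), four bits of sampling-frequency index and four bits of channel configuration. WriteADTS goes the other way: the 2-bit ADTS profile field is the object type minus one, so it adds one back before packing the same two bytes. In both paths Profile therefore ends up holding the object type (2 for AAC-LC), while the 0-based list in the new comment describes the ADTS field itself. A standalone sketch of that bit arithmetic; the sample bytes are made up for illustration:

package main

import "fmt"

// The standard MPEG-4 sampling-frequency table that codec.SamplingFrequencies indexes into.
var samplingFrequencies = [...]uint32{96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350}

func main() {
	// Hypothetical AudioSpecificConfig bytes for AAC-LC, 44.1 kHz, stereo: 0x12 0x10.
	config1, config2 := byte(0x12), byte(0x10)

	objectType := (config1 & 0xF8) >> 3                  // 2 = AAC LC; this is what Profile now stores
	freqIndex := ((config1 & 0x7) << 1) | (config2 >> 7) // 4 -> 44100 Hz
	channels := (config2 >> 3) & 0x0F                    // 2

	fmt.Println(objectType, samplingFrequencies[freqIndex], channels) // 2 44100 2

	// WriteADTS reads the 2-bit ADTS profile (objectType-1), adds 1 back and
	// repacks the same two bytes:
	c1 := (objectType << 3) | ((freqIndex & 0xE) >> 1)
	c2 := ((freqIndex & 0x1) << 7) | (channels << 3)
	fmt.Printf("%#x %#x\n", c1, c2) // 0x12 0x10
}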
diff --git a/track/video.go b/track/video.go
index 2b6e969..a1c7647 100644
--- a/track/video.go
+++ b/track/video.go
@@ -21,6 +21,9 @@ type Video struct {
 	idrCount int // number of IDR frames contained in the buffer
 }
 
+func (t *Video) GetDecConfSeq() int {
+	return t.DecoderConfiguration.Seq
+}
 func (t *Video) Attach() {
 	t.Stream.AddTrack(t)
 }
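GetDecConfSeq on *track.Video, together with the matching method added to *track.Audio above, is what lets both pointer types instantiate the new PlayContext generic in subscriber.go. A compile-only sketch of that relationship, using a hypothetical reduced constraint (the full PlayContext constraint also requires ReadRing):

package demo

import "m7s.live/engine/v4/track"

// hasDecConfSeq is a hypothetical, reduced form of the PlayContext constraint:
// it only requires the decoder-configuration sequence number added in this diff.
type hasDecConfSeq interface {
	GetDecConfSeq() int
}

// decConfChanged mirrors PlayContext.decConfChanged: a cached sequence number is
// compared with the track's current one to decide whether the decoder
// configuration must be sent again.
func decConfChanged[T hasDecConfSeq](cached int, t T) bool {
	return cached != t.GetDecConfSeq()
}

// Both pointer types added in this diff satisfy the constraint.
var (
	_ = decConfChanged[*track.Video]
	_ = decConfChanged[*track.Audio]
)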
diff --git a/util/buffer.go b/util/buffer.go
index 3aa02cd..636f9f0 100644
--- a/util/buffer.go
+++ b/util/buffer.go
@@ -87,6 +87,14 @@ func (b *Buffer) Glow(n int) {
 	*b = b.SubBuf(0, l)
 }
 
+// ConcatBuffers merges fragmented buffers into one contiguous buffer
+func ConcatBuffers[T ~[]byte](input []T) (out []byte) {
+	for _, v := range input {
+		out = append(out, v...)
+	}
+	return
+}
+
 // SizeOfBuffers calculates the total content length of the buffers
 func SizeOfBuffers[T ~[]byte](buf []T) (size int) {
 	for _, b := range buf {
diff --git a/util/convert.go b/util/convert.go
index 2bf429b..9a90dab 100644
--- a/util/convert.go
+++ b/util/convert.go
@@ -3,8 +3,13 @@ package util
 import (
 	"errors"
 	"io"
+	"time"
 )
 
+func Second2Duration(s int) time.Duration {
+	return time.Duration(s) * time.Second
+}
+
 /*
 func ReadByteToUintX(r io.Reader, l int) (data uint64, err error) {
 	if l%8 != 0 || l > 64 {
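Both new util helpers are self-contained: Second2Duration re-homes the conversion that the removed config.Second.Duration method provided, and ConcatBuffers flattens a slice of byte slices into a single allocation. A short usage sketch of ConcatBuffers; the byte values are illustrative:

package main

import (
	"fmt"

	"m7s.live/engine/v4/util"
)

func main() {
	// ConcatBuffers copies a fragmented payload, such as the net.Buffers held in
	// AVFrame.FLV or AVFrame.AVCC, into one contiguous []byte.
	fragments := [][]byte{{0x17, 0x00}, {0x00, 0x00, 0x00}}
	whole := util.ConcatBuffers(fragments)
	fmt.Println(len(whole)) // 5
}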