fix:移除Second类型,优化订阅者代码

This commit is contained in:
dexter
2022-04-16 17:03:48 +08:00
parent 5d5f92bdce
commit 4c1e39afa1
12 changed files with 177 additions and 136 deletions

View File

@@ -126,7 +126,7 @@ type AVFrame[T RawSlice] struct {
FLV net.Buffers // 打包好的FLV Tag FLV net.Buffers // 打包好的FLV Tag
AVCC net.Buffers // 打包好的AVCC格式 AVCC net.Buffers // 打包好的AVCC格式
RTP []*RTPFrame RTP []*RTPFrame
Raw []T //裸数据 Raw []T // 裸数据
canRead bool canRead bool
} }

View File

@@ -9,6 +9,7 @@ import (
type Track interface { type Track interface {
GetName() string GetName() string
LastWriteTime() time.Time LastWriteTime() time.Time
} }
type AVTrack interface { type AVTrack interface {

View File

@@ -5,7 +5,6 @@ import (
"net/http" "net/http"
"reflect" "reflect"
"strings" "strings"
"time"
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4/log" "m7s.live/engine/v4/log"
@@ -13,12 +12,6 @@ import (
type Config map[string]any type Config map[string]any
type Second int
func (s Second) Duration() time.Duration {
return time.Duration(s) * time.Second
}
type Plugin interface { type Plugin interface {
// 可能的入参类型FirstConfig 第一次初始化配置Config 后续配置更新SE系列StateEvent流状态变化事件 // 可能的入参类型FirstConfig 第一次初始化配置Config 后续配置更新SE系列StateEvent流状态变化事件
OnEvent(any) OnEvent(any)

View File

@@ -32,8 +32,8 @@ type Publish struct {
PubAudio bool PubAudio bool
PubVideo bool PubVideo bool
KickExist bool // 是否踢掉已经存在的发布者 KickExist bool // 是否踢掉已经存在的发布者
PublishTimeout Second // 发布无数据超时 PublishTimeout int // 发布无数据超时
WaitCloseTimeout Second // 延迟自动关闭(无订阅时) WaitCloseTimeout int // 延迟自动关闭(无订阅时)
} }
func (c *Publish) GetPublishConfig() *Publish { func (c *Publish) GetPublishConfig() *Publish {
@@ -44,7 +44,7 @@ type Subscribe struct {
SubAudio bool SubAudio bool
SubVideo bool SubVideo bool
IFrameOnly bool // 只要关键帧 IFrameOnly bool // 只要关键帧
WaitTimeout Second // 等待流超时 WaitTimeout int // 等待流超时
} }
func (c *Subscribe) GetSubscribeConfig() *Subscribe { func (c *Subscribe) GetSubscribeConfig() *Subscribe {

6
io.go
View File

@@ -111,7 +111,7 @@ func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) error {
wt := time.Second * 5 wt := time.Second * 5
var c any = conf var c any = conf
if v, ok := c.(*config.Subscribe); ok { if v, ok := c.(*config.Subscribe); ok {
wt = v.WaitTimeout.Duration() wt = util.Second2Duration(v.WaitTimeout)
} }
if io.Context == nil { if io.Context == nil {
io.Context, io.CancelFunc = context.WithCancel(Engine) io.Context, io.CancelFunc = context.WithCancel(Engine)
@@ -134,8 +134,8 @@ func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) error {
return BadNameErr return BadNameErr
} }
} }
s.PublishTimeout = v.PublishTimeout.Duration() s.PublishTimeout = util.Second2Duration(v.PublishTimeout)
s.WaitCloseTimeout = v.WaitCloseTimeout.Duration() s.WaitCloseTimeout = util.Second2Duration(v.WaitCloseTimeout)
} else if create { } else if create {
EventBus <- s //通知发布者按需拉流 EventBus <- s //通知发布者按需拉流
} }

View File

@@ -301,7 +301,7 @@ func (s *Stream) run() {
io.Spesic = suber io.Spesic = suber
s.Subscribers = append(s.Subscribers, suber) s.Subscribers = append(s.Subscribers, suber)
sbConfig := io.Config sbConfig := io.Config
if wt := sbConfig.WaitTimeout.Duration(); wt > s.WaitTimeout { if wt := util.Second2Duration(sbConfig.WaitTimeout); wt > s.WaitTimeout {
s.WaitTimeout = wt s.WaitTimeout = wt
} }
io.Stream = s io.Stream = s

View File

@@ -71,13 +71,29 @@ type ISubscriber interface {
PlayBlock() PlayBlock()
Stop() Stop()
} }
type PlayContext[T interface {
GetDecConfSeq() int
ReadRing() *AVRing[R]
}, R RawSlice] struct {
Track T
ring *AVRing[R]
confSeq int
First AVFrame[R]
}
func (p *PlayContext[T, R]) init(t T) {
p.Track = t
p.ring = t.ReadRing()
}
func (p *PlayContext[T, R]) decConfChanged() bool {
return p.confSeq != p.Track.GetDecConfSeq()
}
type TrackPlayer struct { type TrackPlayer struct {
context.Context context.Context
context.CancelFunc context.CancelFunc
AudioTrack *track.Audio Audio PlayContext[*track.Audio, AudioSlice]
VideoTrack *track.Video Video PlayContext[*track.Video, NALUSlice]
vr *AVRing[NALUSlice]
ar *AVRing[AudioSlice]
} }
// Subscriber 订阅者实体定义 // Subscriber 订阅者实体定义
@@ -89,10 +105,10 @@ type Subscriber struct {
func (s *Subscriber) OnEvent(event any) { func (s *Subscriber) OnEvent(event any) {
switch v := event.(type) { switch v := event.(type) {
case TrackRemoved: case TrackRemoved:
if a, ok := v.Track.(*track.Audio); ok && a == s.AudioTrack { if a, ok := v.Track.(*track.Audio); ok && a == s.Audio.Track {
s.ar = nil s.Audio.ring = nil
} else if v, ok := v.Track.(*track.Video); ok && v == s.VideoTrack { } else if v, ok := v.Track.(*track.Video); ok && v == s.Video.Track {
s.vr = nil s.Video.ring = nil
} }
case Track: //默认接受所有track case Track: //默认接受所有track
s.AddTrack(v) s.AddTrack(v)
@@ -104,17 +120,15 @@ func (s *Subscriber) OnEvent(event any) {
func (s *Subscriber) AddTrack(t Track) bool { func (s *Subscriber) AddTrack(t Track) bool {
switch v := t.(type) { switch v := t.(type) {
case *track.Video: case *track.Video:
if s.VideoTrack != nil || !s.Config.SubVideo { if s.Video.Track != nil || !s.Config.SubVideo {
return false return false
} }
s.VideoTrack = v s.Video.init(v)
s.vr = v.ReadRing()
case *track.Audio: case *track.Audio:
if s.AudioTrack != nil || !s.Config.SubAudio { if s.Audio.Track != nil || !s.Config.SubAudio {
return false return false
} }
s.AudioTrack = v s.Audio.init(v)
s.ar = v.ReadRing()
case *track.Data: case *track.Data:
default: default:
return false return false
@@ -127,86 +141,84 @@ func (s *Subscriber) IsPlaying() bool {
return s.TrackPlayer.Context != nil && s.TrackPlayer.Err() == nil return s.TrackPlayer.Context != nil && s.TrackPlayer.Err() == nil
} }
// func (s *Subscriber) Stop() { // 非阻塞式读取通过反复调用返回的函数可以尝试读取数据读取到数据后会调用OnEvent这种模式自由的在不同的goroutine中调用
// if s.IsPlaying() { // func (s *Subscriber) Play(spesic ISubscriber) func() error {
// s.TrackPlayer.CancelFunc() // s.Info("play")
// var confSeqa, confSeqv int
// var t time.Time
// var startTime time.Time //读到第一个关键帧的时间
// var firstIFrame *VideoFrame //起始关键帧
// var audioSent bool //音频是否发送过
// s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
// ctx := s.TrackPlayer.Context
// var nextRoundReadAudio bool //下一次读取音频
// return func() error {
// if ctx.Err() != nil {
// return ctx.Err()
// }
// if !nextRoundReadAudio || s.ar == nil {
// if s.Video.ring != nil {
// if startTime.IsZero() {
// startTime = time.Now()
// firstIFrame = (*VideoFrame)(s.Video.ring.Read(ctx)) // 这里阻塞读取为0耗时
// s.FirstVideo = *firstIFrame
// s.Debug("firstIFrame", zap.Uint32("seq", firstIFrame.Sequence))
// if ctx.Err() != nil {
// return ctx.Err()
// }
// confSeqv = s.VideoTrack.DecoderConfiguration.Seq
// spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration))
// spesic.OnEvent(firstIFrame)
// s.Video.ring.MoveNext()
// if firstIFrame.Timestamp.After(t) {
// t = firstIFrame.Timestamp
// }
// return nil
// } else if firstIFrame == nil {
// if vp := (s.Video.ring.TryRead()); vp != nil {
// if vp.IFrame && confSeqv != s.VideoTrack.DecoderConfiguration.Seq {
// confSeqv = s.VideoTrack.DecoderConfiguration.Seq
// spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration))
// }
// spesic.OnEvent((*VideoFrame)(vp))
// s.Video.ring.MoveNext()
// // 如果本次读取的视频时间戳比较大,下次给音频一个机会
// if nextRoundReadAudio = vp.Timestamp.After(t); nextRoundReadAudio {
// t = vp.Timestamp
// }
// return nil
// }
// }
// } else if s.Config.SubVideo && (s.Stream == nil || s.Stream.Publisher == nil || s.Stream.Publisher.GetConfig().PubVideo) {
// // 如果订阅了视频需要等待视频轨道
// // TODO: 如果发布配置了视频,订阅配置了视频,但是实际上没有视频,需要处理播放纯音频
// return nil
// }
// }
// // 正常模式下或者纯音频模式下,音频开始播放
// if s.ar != nil && firstIFrame == nil {
// if !audioSent {
// confSeqa = s.AudioTrack.DecoderConfiguration.Seq
// spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration))
// audioSent = true
// }
// if ap := s.ar.TryRead(); ap != nil {
// if s.AudioTrack.DecoderConfiguration.Raw != nil && confSeqa != s.AudioTrack.DecoderConfiguration.Seq {
// spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration))
// }
// spesic.OnEvent(ap)
// s.ar.MoveNext()
// // 这次如果音频比较大,则下次读取给视频一个机会
// if nextRoundReadAudio = !ap.Timestamp.After(t); !nextRoundReadAudio {
// t = ap.Timestamp
// }
// return nil
// }
// }
// return nil
// } // }
// } // }
// 非阻塞式读取通过反复调用返回的函数可以尝试读取数据读取到数据后会调用OnEvent这种模式自由的在不同的goroutine中调用
func (s *Subscriber) Play(spesic ISubscriber) func() error {
s.Info("play")
var confSeqa, confSeqv int
var t time.Time
var startTime time.Time //读到第一个关键帧的时间
var firstIFrame *VideoFrame //起始关键帧
var audioSent bool //音频是否发送过
s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
ctx := s.TrackPlayer.Context
var nextRoundReadAudio bool //下一次读取音频
return func() error {
if ctx.Err() != nil {
return ctx.Err()
}
if !nextRoundReadAudio || s.ar == nil {
if s.vr != nil {
if startTime.IsZero() {
startTime = time.Now()
firstIFrame = (*VideoFrame)(s.vr.Read(ctx)) // 这里阻塞读取为0耗时
s.Debug("firstIFrame", zap.Uint32("seq", firstIFrame.Sequence))
if ctx.Err() != nil {
return ctx.Err()
}
confSeqv = s.VideoTrack.DecoderConfiguration.Seq
spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration))
spesic.OnEvent(firstIFrame)
s.vr.MoveNext()
if firstIFrame.Timestamp.After(t) {
t = firstIFrame.Timestamp
}
return nil
} else if firstIFrame == nil {
if vp := (s.vr.TryRead()); vp != nil {
if vp.IFrame && confSeqv != s.VideoTrack.DecoderConfiguration.Seq {
confSeqv = s.VideoTrack.DecoderConfiguration.Seq
spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration))
}
spesic.OnEvent((*VideoFrame)(vp))
s.vr.MoveNext()
// 如果本次读取的视频时间戳比较大,下次给音频一个机会
if nextRoundReadAudio = vp.Timestamp.After(t); nextRoundReadAudio {
t = vp.Timestamp
}
return nil
}
}
} else if s.Config.SubVideo && (s.Stream == nil || s.Stream.Publisher == nil || s.Stream.Publisher.GetConfig().PubVideo) {
// 如果订阅了视频需要等待视频轨道
// TODO: 如果发布配置了视频,订阅配置了视频,但是实际上没有视频,需要处理播放纯音频
return nil
}
}
// 正常模式下或者纯音频模式下,音频开始播放
if s.ar != nil && firstIFrame == nil {
if !audioSent {
confSeqa = s.AudioTrack.DecoderConfiguration.Seq
spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration))
audioSent = true
}
if ap := s.ar.TryRead(); ap != nil {
spesic.OnEvent(ap)
s.ar.MoveNext()
// 这次如果音频比较大,则下次读取给视频一个机会
if nextRoundReadAudio = !ap.Timestamp.After(t); !nextRoundReadAudio {
t = ap.Timestamp
}
return nil
}
}
return nil
}
}
//PlayBlock 阻塞式读取数据 //PlayBlock 阻塞式读取数据
func (s *Subscriber) PlayBlock() { func (s *Subscriber) PlayBlock() {
spesic := s.Spesic spesic := s.Spesic
@@ -216,49 +228,52 @@ func (s *Subscriber) PlayBlock() {
} }
s.Info("playblock") s.Info("playblock")
var t time.Time var t time.Time
var startTime time.Time //读到第一个关键帧的时间 var startTime time.Time //读到第一个关键帧的时间
var firstIFrame *VideoFrame //起始关键帧 var normal bool //正常模式——已追上正常的进度
var audioSent bool //音频是否发送过 var audioSent bool //音频是否发送过
s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO) s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
ctx := s.TrackPlayer.Context ctx := s.TrackPlayer.Context
defer s.Info("stop") defer s.Info("stop")
for ctx.Err() == nil { for ctx.Err() == nil {
if s.vr != nil { if s.Video.ring != nil {
if startTime.IsZero() { if startTime.IsZero() {
startTime = time.Now() startTime = time.Now()
firstIFrame = (*VideoFrame)(s.vr.Read(ctx)) s.Video.First = *(s.Video.ring.Read(ctx)) //不使用指针存储,为了永久保留数据
s.Debug("firstIFrame", zap.Uint32("seq", firstIFrame.Sequence)) s.Debug("firstIFrame", zap.Uint32("seq", s.Video.First.Sequence))
if ctx.Err() != nil { if ctx.Err() != nil {
return return
} }
spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration)) s.sendVideoDecConf()
} }
for { for {
var vp *VideoFrame var vp *VideoFrame
// 如果进入正常模式 // 如果进入正常模式
if firstIFrame == nil { if normal {
vp = (*VideoFrame)(s.vr.Read(ctx)) vp = (*VideoFrame)(s.Video.ring.Read(ctx))
if ctx.Err() != nil { if ctx.Err() != nil {
return return
} }
if vp.IFrame && s.Video.decConfChanged() {
s.sendVideoDecConf()
}
spesic.OnEvent(vp) spesic.OnEvent(vp)
s.vr.MoveNext() s.Video.ring.MoveNext()
} else { } else {
if s.VideoTrack.IDRing.Value.Sequence != firstIFrame.Sequence { if s.Video.Track.IDRing.Value.Sequence != s.Video.First.Sequence {
firstIFrame = nil normal = true
s.vr = s.VideoTrack.ReadRing() s.Video.ring = s.Video.Track.ReadRing()
s.Debug("skip to latest key frame", zap.Uint32("seq", s.VideoTrack.IDRing.Value.Sequence)) s.Debug("skip to latest key frame", zap.Uint32("seq", s.Video.Track.IDRing.Value.Sequence))
continue continue
} else { } else {
vp = (*VideoFrame)(s.vr.Read(ctx)) vp = (*VideoFrame)(s.Video.ring.Read(ctx))
if ctx.Err() != nil { if ctx.Err() != nil {
return return
} }
spesic.OnEvent(vp) spesic.OnEvent(vp)
if fast := time.Duration(vp.AbsTime-firstIFrame.AbsTime)*time.Millisecond - time.Since(startTime); fast > 0 { if fast := time.Duration(vp.AbsTime-s.Video.First.AbsTime)*time.Millisecond - time.Since(startTime); fast > 0 {
time.Sleep(fast) time.Sleep(fast)
} }
s.vr.MoveNext() s.Video.ring.MoveNext()
} }
} }
if vp.Timestamp.After(t) { if vp.Timestamp.After(t) {
@@ -272,20 +287,23 @@ func (s *Subscriber) PlayBlock() {
continue continue
} }
// 正常模式下或者纯音频模式下,音频开始播放 // 正常模式下或者纯音频模式下,音频开始播放
if s.ar != nil && firstIFrame == nil { if s.Audio.ring != nil && normal {
if !audioSent { if !audioSent {
if s.AudioTrack.IsAAC() { if s.Audio.Track.IsAAC() {
spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration)) s.sendAudioDecConf()
} }
audioSent = true audioSent = true
} }
for { for {
ap := (*AudioFrame)(s.ar.Read(ctx)) ap := (*AudioFrame)(s.Audio.ring.Read(ctx))
if ctx.Err() != nil { if ctx.Err() != nil {
return return
} }
if s.Audio.Track.IsAAC() && s.Audio.decConfChanged() {
s.sendAudioDecConf()
}
spesic.OnEvent(ap) spesic.OnEvent(ap)
s.ar.MoveNext() s.Audio.ring.MoveNext()
if ap.Timestamp.After(t) { if ap.Timestamp.After(t) {
t = ap.Timestamp t = ap.Timestamp
break break
@@ -295,6 +313,15 @@ func (s *Subscriber) PlayBlock() {
} }
} }
func (s *Subscriber) sendVideoDecConf() {
s.Video.confSeq = s.Video.Track.DecoderConfiguration.Seq
s.Spesic.OnEvent(VideoDeConf(s.Video.Track.DecoderConfiguration))
}
func (s *Subscriber) sendAudioDecConf() {
s.Audio.confSeq = s.Audio.Track.DecoderConfiguration.Seq
s.Spesic.OnEvent(AudioDeConf(s.Audio.Track.DecoderConfiguration))
}
type IPusher interface { type IPusher interface {
ISubscriber ISubscriber
Push() Push()

View File

@@ -59,11 +59,7 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
if frame.IsSequence() { if frame.IsSequence() {
aac.DecoderConfiguration.AVCC = net.Buffers{frame} aac.DecoderConfiguration.AVCC = net.Buffers{frame}
config1, config2 := frame[2], frame[3] config1, config2 := frame[2], frame[3]
//audioObjectType = (config1 & 0xF8) >> 3 aac.Profile = (config1 & 0xF8) >> 3
// 1 AAC MAIN ISO/IEC 14496-3 subpart 4
// 2 AAC LC ISO/IEC 14496-3 subpart 4
// 3 AAC SSR ISO/IEC 14496-3 subpart 4
// 4 AAC LTP ISO/IEC 14496-3 subpart 4
aac.Channels = ((config2 >> 3) & 0x0F) //声道 aac.Channels = ((config2 >> 3) & 0x0F) //声道
aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]) aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)])
aac.DecoderConfiguration.Raw = AudioSlice(frame[2:]) aac.DecoderConfiguration.Raw = AudioSlice(frame[2:])

View File

@@ -17,12 +17,20 @@ type Audio struct {
CodecID codec.AudioCodecID CodecID codec.AudioCodecID
Channels byte Channels byte
AVCCHead []byte // 音频包在AVCC格式中AAC会有两个字节其他的只有一个字节 AVCCHead []byte // 音频包在AVCC格式中AAC会有两个字节其他的只有一个字节
// Profile:
// 0: Main profile
// 1: Low Complexity profile(LC)
// 2: Scalable Sampling Rate profile(SSR)
// 3: Reserved
Profile byte
} }
func (a *Audio) IsAAC() bool { func (a *Audio) IsAAC() bool {
return a.CodecID == codec.CodecID_AAC return a.CodecID == codec.CodecID_AAC
} }
func (a *Audio) GetDecConfSeq() int {
return a.DecoderConfiguration.Seq
}
func (a *Audio) Attach() { func (a *Audio) Attach() {
a.Stream.AddTrack(a) a.Stream.AddTrack(a)
} }
@@ -41,10 +49,10 @@ func (a *Audio) GetInfo() *Audio {
} }
func (a *Audio) WriteADTS(adts []byte) { func (a *Audio) WriteADTS(adts []byte) {
profile := ((adts[2] & 0xc0) >> 6) + 1 a.Profile = ((adts[2] & 0xc0) >> 6) + 1
sampleRate := (adts[2] & 0x3c) >> 2 sampleRate := (adts[2] & 0x3c) >> 2
channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6) channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6)
config1 := (profile << 3) | ((sampleRate & 0xe) >> 1) config1 := (a.Profile << 3) | ((sampleRate & 0xe) >> 1)
config2 := ((sampleRate & 0x1) << 7) | (channel << 3) config2 := ((sampleRate & 0x1) << 7) | (channel << 3)
a.SampleRate = uint32(codec.SamplingFrequencies[sampleRate]) a.SampleRate = uint32(codec.SamplingFrequencies[sampleRate])
a.Channels = channel a.Channels = channel

View File

@@ -21,6 +21,9 @@ type Video struct {
idrCount int //缓存中包含的idr数量 idrCount int //缓存中包含的idr数量
} }
func (t *Video) GetDecConfSeq() int {
return t.DecoderConfiguration.Seq
}
func (t *Video) Attach() { func (t *Video) Attach() {
t.Stream.AddTrack(t) t.Stream.AddTrack(t)
} }

View File

@@ -87,6 +87,14 @@ func (b *Buffer) Glow(n int) {
*b = b.SubBuf(0, l) *b = b.SubBuf(0, l)
} }
// ConcatBuffers 合并碎片内存为一个完整内存
func ConcatBuffers[T ~[]byte](input []T) (out []byte) {
for _, v := range input {
out = append(out, v...)
}
return
}
// SizeOfBuffers 计算Buffers的内容长度 // SizeOfBuffers 计算Buffers的内容长度
func SizeOfBuffers[T ~[]byte](buf []T) (size int) { func SizeOfBuffers[T ~[]byte](buf []T) (size int) {
for _, b := range buf { for _, b := range buf {

View File

@@ -3,8 +3,13 @@ package util
import ( import (
"errors" "errors"
"io" "io"
"time"
) )
func Second2Duration(s int) time.Duration {
return time.Duration(s) * time.Second
}
/* /*
func ReadByteToUintX(r io.Reader, l int) (data uint64, err error) { func ReadByteToUintX(r io.Reader, l int) (data uint64, err error) {
if l%8 != 0 || l > 64 { if l%8 != 0 || l > 64 {