mirror of
https://github.com/Monibuca/engine.git
synced 2025-10-30 19:36:25 +08:00
1、当发布者离线时没有订阅者时,流的超时时间从1s改为10ms。
2、Track增加离线状态,当发布者离线时,Track状态改为离线状态,当发布者重新上线时,Track状态改为在线状态。 3、Track在恢复在线后,记录时间戳的差值,保持后续时间戳和之前的连续。 4、进一步优化订阅者读取音视频同步逻辑。
This commit is contained in:
@@ -5,6 +5,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtp"
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4/log"
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
@@ -12,19 +14,27 @@ type TimelineData[T any] struct {
|
||||
Timestamp time.Time
|
||||
Value T
|
||||
}
|
||||
type TrackState byte
|
||||
|
||||
const (
|
||||
TrackStateOnline TrackState = iota // 上线
|
||||
TrackStateOffline // 下线
|
||||
)
|
||||
|
||||
// Base 基础Track类
|
||||
type Base struct {
|
||||
Name string
|
||||
log.Zap `json:"-"`
|
||||
Stream IStream `json:"-"`
|
||||
Attached atomic.Bool `json:"-"`
|
||||
State TrackState
|
||||
ts time.Time
|
||||
bytes int
|
||||
frames int
|
||||
BPS int
|
||||
FPS int
|
||||
RawPart []int // 裸数据片段用于UI上显示
|
||||
RawSize int // 裸数据长度
|
||||
RawPart []int // 裸数据片段用于UI上显示
|
||||
BPSs []TimelineData[int] // 10s码率统计
|
||||
FPSs []TimelineData[int] // 10s帧率统计
|
||||
}
|
||||
@@ -39,7 +49,7 @@ func (bt *Base) ComputeBPS(bytes int) {
|
||||
bt.frames = 0
|
||||
bt.ts = time.Now()
|
||||
bt.BPSs = append(bt.BPSs, TimelineData[int]{Timestamp: bt.ts, Value: bt.BPS})
|
||||
if len(bt.BPSs) > 10 {
|
||||
if len(bt.BPSs) > 9 {
|
||||
copy(bt.BPSs, bt.BPSs[1:])
|
||||
bt.BPSs = bt.BPSs[:10]
|
||||
}
|
||||
@@ -61,6 +71,17 @@ func (bt *Base) Flush(bf *BaseFrame) {
|
||||
bf.Timestamp = time.Now()
|
||||
}
|
||||
func (bt *Base) SetStuff(stuff ...any) {
|
||||
for _, s := range stuff {
|
||||
switch v := s.(type) {
|
||||
case IStream:
|
||||
bt.Stream = v
|
||||
bt.Zap = v.With(zap.String("track", bt.Name))
|
||||
case TrackState:
|
||||
bt.State = v
|
||||
case string:
|
||||
bt.Name = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Track interface {
|
||||
|
||||
18
stream.go
18
stream.go
@@ -161,8 +161,16 @@ type Tracks struct {
|
||||
}
|
||||
|
||||
func (tracks *Tracks) Add(name string, t Track) bool {
|
||||
if v, ok := t.(*track.Video); ok && tracks.MainVideo == nil {
|
||||
switch v := t.(type) {
|
||||
case *track.Video:
|
||||
if tracks.MainVideo == nil {
|
||||
tracks.MainVideo = v
|
||||
tracks.SetIDR(v)
|
||||
}
|
||||
case *track.Audio:
|
||||
if tracks.MainVideo != nil {
|
||||
v.Narrow()
|
||||
}
|
||||
}
|
||||
return tracks.Map.Add(name, t)
|
||||
}
|
||||
@@ -281,16 +289,18 @@ func (r *Stream) action(action StreamAction) (ok bool) {
|
||||
waitTime := time.Duration(0)
|
||||
if r.Publisher != nil {
|
||||
waitTime = r.Publisher.GetPublisher().Config.WaitCloseTimeout
|
||||
r.Tracks.Range(func(name string, t Track) {
|
||||
t.SetStuff(TrackStateOffline)
|
||||
})
|
||||
}
|
||||
r.Subscribers.OnPublisherLost(event)
|
||||
suber := r.Subscribers.Pick()
|
||||
if suber != nil {
|
||||
if suber := r.Subscribers.Pick(); suber != nil {
|
||||
r.Subscribers.Broadcast(stateEvent)
|
||||
if waitTime == 0 {
|
||||
waitTime = suber.GetSubscriber().Config.WaitTimeout
|
||||
}
|
||||
} else if waitTime == 0 {
|
||||
waitTime = time.Second //没有订阅者也没有配置发布者等待重连时间,默认1秒后关闭流
|
||||
waitTime = time.Millisecond * 10 //没有订阅者也没有配置发布者等待重连时间,默认10ms后关闭流
|
||||
}
|
||||
r.timeout.Reset(waitTime)
|
||||
case STATE_PUBLISHING:
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4/codec"
|
||||
@@ -168,25 +167,26 @@ func (s *Subscriber) PlayBlock(subType byte) {
|
||||
return
|
||||
}
|
||||
s.Info("playblock")
|
||||
var lastPlayAbsTime uint32 //最新的音频或者视频的时间戳,用于音视频同步
|
||||
s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
|
||||
ctx := s.TrackPlayer.Context
|
||||
sendVideoDecConf := func() {
|
||||
s.Debug("sendVideoDecConf")
|
||||
spesic.OnEvent(s.Video.ParamaterSets)
|
||||
spesic.OnEvent(VideoDeConf(s.VideoReader.Track.SequenceHead))
|
||||
}
|
||||
sendAudioDecConf := func() {
|
||||
s.Debug("sendAudioDecConf")
|
||||
spesic.OnEvent(AudioDeConf(s.AudioReader.Track.SequenceHead))
|
||||
}
|
||||
var sendAudioFrame, sendVideoFrame func(*AVFrame)
|
||||
switch subType {
|
||||
case SUBTYPE_RAW:
|
||||
sendVideoFrame = func(frame *AVFrame) {
|
||||
// println("v", frame.Sequence, s.VideoReader.AbsTime, frame.IFrame)
|
||||
// println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
|
||||
spesic.OnEvent(VideoFrame{frame, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipTs*90, frame.DTS - s.VideoReader.SkipTs*90})
|
||||
}
|
||||
sendAudioFrame = func(frame *AVFrame) {
|
||||
// println("a", frame.Sequence, s.AudioReader.AbsTime)
|
||||
// println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime)
|
||||
spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, s.AudioReader.AbsTime * 90, s.AudioReader.AbsTime * 90})
|
||||
}
|
||||
case SUBTYPE_RTP:
|
||||
@@ -254,54 +254,46 @@ func (s *Subscriber) PlayBlock(subType byte) {
|
||||
subMode, _ = strconv.Atoi(s.Args.Get(s.Config.SubModeArgName))
|
||||
}
|
||||
var videoFrame, audioFrame *AVFrame
|
||||
for ctx.Err() == nil {
|
||||
var lastAbsTime uint32
|
||||
hasVideo, hasAudio := s.VideoReader.Track != nil && s.Config.SubVideo, s.AudioReader.Track != nil && s.Config.SubAudio
|
||||
if !hasAudio && !hasVideo {
|
||||
s.Error("play neither video nor audio")
|
||||
return
|
||||
}
|
||||
for ctx.Err() == nil {
|
||||
if hasVideo {
|
||||
if videoFrame != nil {
|
||||
if videoFrame.CanRead {
|
||||
sendVideoFrame(videoFrame)
|
||||
}
|
||||
videoFrame = nil
|
||||
}
|
||||
for ctx.Err() == nil {
|
||||
s.VideoReader.Read(ctx, subMode)
|
||||
frame := s.VideoReader.Frame
|
||||
// println("video", frame.Sequence, frame.AbsTime)
|
||||
if frame == nil || ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if frame.IFrame && s.VideoReader.DecConfChanged() {
|
||||
s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq
|
||||
sendVideoDecConf()
|
||||
}
|
||||
if audioFrame != nil {
|
||||
if frame.AbsTime > audioFrame.AbsTime {
|
||||
if frame.AbsTime > lastAbsTime {
|
||||
if audioFrame.CanRead {
|
||||
sendAudioFrame(audioFrame)
|
||||
}
|
||||
audioFrame = nil
|
||||
videoFrame = frame
|
||||
lastAbsTime = frame.AbsTime
|
||||
break
|
||||
}
|
||||
}
|
||||
if frame.IFrame && s.VideoReader.DecConfChanged() {
|
||||
s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq
|
||||
// println(s.Video.confSeq, s.Video.Track.SPSInfo.Width, s.Video.Track.SPSInfo.Height)
|
||||
sendVideoDecConf()
|
||||
}
|
||||
if !s.Config.IFrameOnly || frame.IFrame {
|
||||
if frame.AbsTime > lastPlayAbsTime {
|
||||
lastPlayAbsTime = frame.AbsTime
|
||||
} else if lastAbsTime == 0 {
|
||||
if lastAbsTime = frame.AbsTime; lastAbsTime != 0 {
|
||||
videoFrame = frame
|
||||
break
|
||||
} else {
|
||||
sendVideoFrame(frame)
|
||||
}
|
||||
}
|
||||
if !s.Config.IFrameOnly || frame.IFrame {
|
||||
sendVideoFrame(frame)
|
||||
}
|
||||
}
|
||||
}
|
||||
// 正常模式下或者纯音频模式下,音频开始播放
|
||||
if hasAudio {
|
||||
if audioFrame != nil {
|
||||
if audioFrame.CanRead {
|
||||
sendAudioFrame(audioFrame)
|
||||
}
|
||||
audioFrame = nil
|
||||
}
|
||||
for ctx.Err() == nil {
|
||||
switch s.AudioReader.State {
|
||||
case track.READSTATE_INIT:
|
||||
@@ -319,30 +311,25 @@ func (s *Subscriber) PlayBlock(subType byte) {
|
||||
if frame == nil || ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if videoFrame != nil {
|
||||
if frame.AbsTime > videoFrame.AbsTime {
|
||||
if videoFrame.CanRead {
|
||||
sendVideoFrame(videoFrame)
|
||||
}
|
||||
videoFrame = nil
|
||||
}
|
||||
}
|
||||
if s.AudioReader.DecConfChanged() {
|
||||
s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq
|
||||
sendAudioDecConf()
|
||||
}
|
||||
if frame.AbsTime > lastPlayAbsTime {
|
||||
lastPlayAbsTime = frame.AbsTime
|
||||
if videoFrame != nil {
|
||||
if frame.AbsTime > lastAbsTime {
|
||||
if videoFrame.CanRead {
|
||||
sendVideoFrame(videoFrame)
|
||||
}
|
||||
audioFrame = frame
|
||||
lastAbsTime = frame.AbsTime
|
||||
break
|
||||
} else {
|
||||
}
|
||||
}
|
||||
if frame.AbsTime >= s.AudioReader.SkipTs {
|
||||
sendAudioFrame(frame)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !hasVideo && !hasAudio {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -58,7 +58,7 @@ func (aac *AAC) WriteRTPFrame(frame *RTPFrame) {
|
||||
if aac.lack > 0 {
|
||||
rawLen := aac.Value.AUList.ByteLength
|
||||
if rawLen == 0 {
|
||||
aac.Stream.Error("lack >0 but rawlen=0")
|
||||
aac.Error("lack >0 but rawlen=0")
|
||||
}
|
||||
last := aac.Value.AUList.Pre
|
||||
auLen := len(frame.Payload) - startOffset
|
||||
@@ -67,7 +67,7 @@ func (aac *AAC) WriteRTPFrame(frame *RTPFrame) {
|
||||
aac.lack -= auLen
|
||||
return
|
||||
} else if aac.lack < auLen {
|
||||
aac.Stream.Warn("lack < auLen", zap.Int("lack", aac.lack), zap.Int("auLen", auLen))
|
||||
aac.Warn("lack < auLen", zap.Int("lack", aac.lack), zap.Int("auLen", auLen))
|
||||
}
|
||||
last.Value.Push(aac.BytesPool.GetShell(frame.Payload[startOffset : startOffset+aac.lack]))
|
||||
aac.lack = 0
|
||||
@@ -99,7 +99,7 @@ func (aac *AAC) WriteSequenceHead(sh []byte) {
|
||||
|
||||
func (aac *AAC) WriteAVCC(ts uint32, frame util.BLL) error {
|
||||
if l := frame.ByteLength; l < 4 {
|
||||
aac.Stream.Error("AVCC data too short", zap.Int("len", l))
|
||||
aac.Error("AVCC data too short", zap.Int("len", l))
|
||||
return io.ErrShortWrite
|
||||
}
|
||||
if frame.GetByte(1) == 0 {
|
||||
|
||||
@@ -84,6 +84,7 @@ type Media struct {
|
||||
RTPMuxer
|
||||
RTPDemuxer
|
||||
SpesificTrack `json:"-"`
|
||||
deltaTs int64 //用于接续发布后时间戳连续
|
||||
流速控制
|
||||
}
|
||||
|
||||
@@ -106,10 +107,9 @@ func (av *Media) SetSpeedLimit(value time.Duration) {
|
||||
}
|
||||
|
||||
func (av *Media) SetStuff(stuff ...any) {
|
||||
// 代表发布者已经离线,该Track成为遗留Track,等待下一任发布者接续发布
|
||||
for _, s := range stuff {
|
||||
switch v := s.(type) {
|
||||
case string:
|
||||
av.Name = v
|
||||
case int:
|
||||
av.Init(v)
|
||||
av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
|
||||
@@ -118,12 +118,12 @@ func (av *Media) SetStuff(stuff ...any) {
|
||||
av.SampleRate = v
|
||||
case byte:
|
||||
av.PayloadType = v
|
||||
case IStream:
|
||||
av.Stream = v
|
||||
case util.BytesPool:
|
||||
av.BytesPool = v
|
||||
case SpesificTrack:
|
||||
av.SpesificTrack = v
|
||||
default:
|
||||
av.Base.SetStuff(v)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -189,6 +189,16 @@ func (av *Media) AddIDR() {
|
||||
|
||||
func (av *Media) Flush() {
|
||||
curValue, preValue, nextValue := &av.Value, av.LastValue, av.Next()
|
||||
if av.State == TrackStateOffline {
|
||||
av.State = TrackStateOnline
|
||||
av.deltaTs = int64(preValue.AbsTime) - int64(curValue.AbsTime) + int64(preValue.DeltaTime)
|
||||
av.Info("track back online")
|
||||
}
|
||||
if av.deltaTs != 0 {
|
||||
curValue.DTS = uint32(int64(curValue.DTS) + av.deltaTs*90)
|
||||
curValue.PTS = uint32(int64(curValue.PTS) + av.deltaTs*90)
|
||||
curValue.AbsTime = 0
|
||||
}
|
||||
bufferTime := av.Stream.GetPublisherConfig().BufferTime
|
||||
if bufferTime > 0 && av.IDRingList.Length > 1 && time.Duration(curValue.AbsTime-av.IDRingList.Next.Next.Value.Value.AbsTime)*time.Millisecond > bufferTime {
|
||||
av.ShiftIDR()
|
||||
|
||||
@@ -39,7 +39,7 @@ type G711 struct {
|
||||
|
||||
func (g711 *G711) WriteAVCC(ts uint32, frame util.BLL) error {
|
||||
if l := frame.ByteLength; l < 2 {
|
||||
g711.Stream.Error("AVCC data too short", zap.Int("len", l))
|
||||
g711.Error("AVCC data too short", zap.Int("len", l))
|
||||
return io.ErrShortWrite
|
||||
}
|
||||
g711.Value.AUList.Push(g711.BytesPool.GetShell(frame.Next.Value[1:]))
|
||||
|
||||
@@ -68,13 +68,13 @@ func (vt *H264) WriteSliceBytes(slice []byte) {
|
||||
case codec.NALU_Access_Unit_Delimiter:
|
||||
case codec.NALU_Filler_Data:
|
||||
default:
|
||||
vt.Stream.Error("H264 WriteSliceBytes naluType not support", zap.Int("naluType", int(naluType)))
|
||||
vt.Error("WriteSliceBytes naluType not support", zap.Int("naluType", int(naluType)))
|
||||
}
|
||||
}
|
||||
|
||||
func (vt *H264) WriteAVCC(ts uint32, frame util.BLL) (err error) {
|
||||
if l := frame.ByteLength; l < 6 {
|
||||
vt.Stream.Error("AVCC data too short", zap.Int("len", l))
|
||||
vt.Error("AVCC data too short", zap.Int("len", l))
|
||||
return io.ErrShortWrite
|
||||
}
|
||||
if frame.GetByte(1) == 0 {
|
||||
@@ -110,7 +110,7 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) {
|
||||
if buffer.Len() >= nextSize {
|
||||
vt.WriteSliceBytes(buffer.ReadN(nextSize))
|
||||
} else {
|
||||
vt.Stream.Error("invalid nalu size", zap.Int("naluType", int(naluType)))
|
||||
vt.Error("invalid nalu size", zap.Int("naluType", int(naluType)))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,13 +58,13 @@ func (vt *H265) WriteSliceBytes(slice []byte) {
|
||||
case codec.NAL_UNIT_SEI:
|
||||
vt.AppendAuBytes(slice)
|
||||
default:
|
||||
vt.Video.Stream.Warn("h265 slice type not supported", zap.Uint("type", uint(t)))
|
||||
vt.Warn("h265 slice type not supported", zap.Uint("type", uint(t)))
|
||||
}
|
||||
}
|
||||
|
||||
func (vt *H265) WriteAVCC(ts uint32, frame util.BLL) (err error) {
|
||||
if l := frame.ByteLength; l < 6 {
|
||||
vt.Stream.Error("AVCC data too short", zap.Int("len", l))
|
||||
vt.Error("AVCC data too short", zap.Int("len", l))
|
||||
return io.ErrShortWrite
|
||||
}
|
||||
if frame.GetByte(1) == 0 {
|
||||
@@ -74,7 +74,7 @@ func (vt *H265) WriteAVCC(ts uint32, frame util.BLL) (err error) {
|
||||
vt.SPSInfo, _ = codec.ParseHevcSPS(vt.SequenceHead)
|
||||
vt.nalulenSize = (int(vt.SequenceHead[26]) & 0x03) + 1
|
||||
} else {
|
||||
vt.Stream.Error("H265 ParseVpsSpsPps Error")
|
||||
vt.Error("H265 ParseVpsSpsPps Error")
|
||||
vt.Stream.Close()
|
||||
}
|
||||
return
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4/common"
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
@@ -64,9 +65,12 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
|
||||
r.ctx = ctx
|
||||
switch r.State {
|
||||
case READSTATE_INIT:
|
||||
r.Track.Info("start read", zap.Int("mode", mode))
|
||||
startRing := r.Track.Ring
|
||||
if r.Track.IDRing != nil {
|
||||
startRing = r.Track.IDRing
|
||||
} else {
|
||||
r.Track.Warn("no IDRring")
|
||||
}
|
||||
switch mode {
|
||||
case 0:
|
||||
@@ -94,6 +98,7 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
|
||||
}
|
||||
r.SkipTs = r.FirstTs
|
||||
r.FirstSeq = r.Frame.Sequence
|
||||
r.Track.Info("first frame read", zap.Uint32("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq))
|
||||
case READSTATE_FIRST:
|
||||
if r.Track.IDRing.Value.Sequence != r.FirstSeq {
|
||||
r.Ring = r.Track.IDRing
|
||||
@@ -102,6 +107,7 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
|
||||
return
|
||||
}
|
||||
r.SkipTs = frame.AbsTime - r.beforeJump
|
||||
r.Track.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Uint32("skipTs", r.SkipTs))
|
||||
r.State = READSTATE_NORMAL
|
||||
} else {
|
||||
r.MoveNext()
|
||||
@@ -117,5 +123,6 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
|
||||
r.ReadFrame()
|
||||
}
|
||||
r.AbsTime = r.Frame.AbsTime - r.SkipTs
|
||||
// println(r.Track.Name, r.State, r.Frame.AbsTime, r.SkipTs, r.AbsTime)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ const RTPMTU = 1400
|
||||
|
||||
func (av *Media) UnmarshalRTPPacket(p *rtp.Packet) (frame *RTPFrame) {
|
||||
if av.PayloadType != p.PayloadType {
|
||||
av.Stream.Warn("RTP PayloadType error", zap.Uint8("want", av.PayloadType), zap.Uint8("got", p.PayloadType))
|
||||
av.Warn("RTP PayloadType error", zap.Uint8("want", av.PayloadType), zap.Uint8("got", p.PayloadType))
|
||||
return
|
||||
}
|
||||
frame = &RTPFrame{
|
||||
@@ -26,7 +26,7 @@ func (av *Media) UnmarshalRTP(raw []byte) (frame *RTPFrame) {
|
||||
var p rtp.Packet
|
||||
err := p.Unmarshal(raw)
|
||||
if err != nil {
|
||||
av.Stream.Warn("RTP Unmarshal error", zap.Error(err))
|
||||
av.Warn("RTP Unmarshal error", zap.Error(err))
|
||||
return
|
||||
}
|
||||
return av.UnmarshalRTPPacket(&p)
|
||||
|
||||
@@ -282,9 +282,10 @@ func (p BytesPool) Get(size int) (item *ListItem[Buffer]) {
|
||||
type Pool[T any] List[T]
|
||||
|
||||
func (p *Pool[T]) Get() (item *ListItem[T]) {
|
||||
item = (*List[T])(p).PoolShift()
|
||||
if item == nil {
|
||||
item = &ListItem[T]{}
|
||||
if item = (*List[T])(p).PoolShift(); item == nil {
|
||||
item = &ListItem[T]{
|
||||
Pool: (*List[T])(p),
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user