This commit is contained in:
dexter
2024-03-31 23:23:11 +08:00
parent 9936fe980d
commit ae2a18aff6
5 changed files with 16 additions and 11 deletions

View File

@@ -1,5 +1,5 @@
global: global:
loglevel: info loglevel: debug
rtmp: rtmp:
publish: publish:
# pubvideo: false # pubvideo: false

View File

@@ -38,7 +38,7 @@ func (r *AVRingReader) DecConfChanged() bool {
} }
func NewAVRingReader(t *AVTrack) *AVRingReader { func NewAVRingReader(t *AVTrack) *AVRingReader {
// t.Debug("reader +1", zap.Int32("count", t.ReaderCount.Add(1))) t.Debug("reader +1", "count", t.ReaderCount.Add(1))
return &AVRingReader{ return &AVRingReader{
Track: t, Track: t,
} }
@@ -51,7 +51,7 @@ func (r *AVRingReader) readFrame() (err error) {
} }
// 超过一半的缓冲区大小说明Reader太慢需要丢帧 // 超过一半的缓冲区大小说明Reader太慢需要丢帧
if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Value.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Value.Sequence { if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Value.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Value.Sequence {
// r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Value.Sequence)) r.Warn("reader too slow", "lastSeq", r.Track.LastValue.Sequence, "seq", r.Value.Sequence)
return r.Read(r.Track.IDRing) return r.Read(r.Track.IDRing)
} }
return return
@@ -61,12 +61,12 @@ func (r *AVRingReader) ReadFrame(mode int) (err error) {
r.mode = mode r.mode = mode
switch r.State { switch r.State {
case READSTATE_INIT: case READSTATE_INIT:
// r.Info("start read", zap.Int("mode", mode)) r.Info("start read", "mode", mode)
startRing := r.Track.Ring startRing := r.Track.Ring
if r.Track.IDRing != nil { if r.Track.IDRing != nil {
startRing = r.Track.IDRing startRing = r.Track.IDRing
} else { } else {
// r.Warn("no IDRring") r.Warn("no IDRring")
} }
switch mode { switch mode {
case SUBMODE_REAL: case SUBMODE_REAL:
@@ -92,14 +92,14 @@ func (r *AVRingReader) ReadFrame(mode int) (err error) {
} }
r.SkipTs = r.FirstTs - r.StartTs r.SkipTs = r.FirstTs - r.StartTs
r.FirstSeq = r.Value.Sequence r.FirstSeq = r.Value.Sequence
// r.Info("first frame read", zap.Duration("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq)) r.Info("first frame read", "firstTs", r.FirstTs, "firstSeq", r.FirstSeq)
case READSTATE_FIRST: case READSTATE_FIRST:
if r.Track.IDRing.Value.Sequence != r.FirstSeq { if r.Track.IDRing.Value.Sequence != r.FirstSeq {
if err = r.Read(r.Track.IDRing); err != nil { if err = r.Read(r.Track.IDRing); err != nil {
return return
} }
r.SkipTs = r.Value.Timestamp - r.beforeJump - r.StartTs - 10*time.Millisecond r.SkipTs = r.Value.Timestamp - r.beforeJump - r.StartTs - 10*time.Millisecond
// r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Duration("skipTs", r.SkipTs)) r.Info("jump", "skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq, "skipTs", r.SkipTs)
r.State = READSTATE_NORMAL r.State = READSTATE_NORMAL
} else { } else {
if err = r.readFrame(); err != nil { if err = r.readFrame(); err != nil {

View File

@@ -79,6 +79,7 @@ func (p *Publisher) RemoveSubscriber(subscriber *Subscriber) (err error) {
func (p *Publisher) AddSubscriber(subscriber *Subscriber) (err error) { func (p *Publisher) AddSubscriber(subscriber *Subscriber) (err error) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
subscriber.Publisher = p
p.Subscribers[subscriber] = struct{}{} p.Subscribers[subscriber] = struct{}{}
switch p.State { switch p.State {
case PublisherStateTrackAdded, PublisherStateWaitSubscriber: case PublisherStateTrackAdded, PublisherStateWaitSubscriber:

View File

@@ -54,7 +54,7 @@ func Run(ctx context.Context, conf any) error {
} }
func (s *Server) Run(ctx context.Context, conf any) (err error) { func (s *Server) Run(ctx context.Context, conf any) (err error) {
s.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})).With("server", serverIndexG.Add(1)) s.Logger = slog.With("server", serverIndexG.Add(1))
s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx) s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx)
s.config.HTTP.ListenAddrTLS = ":8443" s.config.HTTP.ListenAddrTLS = ":8443"
s.config.HTTP.ListenAddr = ":8080" s.config.HTTP.ListenAddr = ":8080"
@@ -179,7 +179,9 @@ func (s *Server) eventLoop() {
} }
func (s *Server) onUnsubscribe(subscriber *Subscriber) { func (s *Server) onUnsubscribe(subscriber *Subscriber) {
if subscriber.Publisher != nil {
subscriber.Publisher.RemoveSubscriber(subscriber) subscriber.Publisher.RemoveSubscriber(subscriber)
}
} }
func (s *Server) onUnpublish(publisher *Publisher) { func (s *Server) onUnpublish(publisher *Publisher) {

View File

@@ -83,7 +83,8 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
} }
if at := s.Publisher.GetAudioTrack(a1); at != nil { if at := s.Publisher.GetAudioTrack(a1); at != nil {
ar = NewAVRingReader(at) ar = NewAVRingReader(at)
ar.Logger = s.Logger.With("reader", a1.Name()) ar.Logger = s.Logger.With("reader", a1.String())
ar.Info("start read")
ah = reflect.ValueOf(audioHandler) ah = reflect.ValueOf(audioHandler)
} }
} }
@@ -93,7 +94,8 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
} }
if vt := s.Publisher.GetVideoTrack(v1); vt != nil { if vt := s.Publisher.GetVideoTrack(v1); vt != nil {
vr = NewAVRingReader(vt) vr = NewAVRingReader(vt)
vr.Logger = s.Logger.With("reader", v1.Name()) vr.Logger = s.Logger.With("reader", v1.String())
vr.Info("start read")
vh = reflect.ValueOf(videoHandler) vh = reflect.ValueOf(videoHandler)
} }
} }