diff --git a/common/ring_lock.go b/common/ring_lock.go index 2ddb293..b437658 100644 --- a/common/ring_lock.go +++ b/common/ring_lock.go @@ -22,21 +22,21 @@ func (lr *LockRing[T]) Init(n int) *LockRing[T] { } lr.RingBuffer.Init(n) lr.Flag = &flag - lr.Value.Lock() + lr.RingBuffer.Value.Lock() return lr } func (rb *LockRing[T]) Read() *DataFrame[T] { - current := &rb.Value + current := &rb.RingBuffer.Value current.RLock() defer current.RUnlock() return ¤t.DataFrame } func (rb *LockRing[T]) Step() { - last := &rb.Value + last := &rb.RingBuffer.Value if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) { - current := rb.MoveNext() + current := rb.RingBuffer.MoveNext() current.Lock() last.Unlock() //Flag不为1代表被Dispose了,但尚未处理Done @@ -47,10 +47,10 @@ func (rb *LockRing[T]) Step() { } func (rb *LockRing[T]) Write(value T) { - last := &rb.Value + last := &rb.RingBuffer.Value last.Value = value if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) { - current := rb.MoveNext() + current := rb.RingBuffer.MoveNext() current.Lock() last.Unlock() //Flag不为1代表被Dispose了,但尚未处理Done @@ -61,7 +61,7 @@ func (rb *LockRing[T]) Write(value T) { } func (rb *LockRing[T]) Dispose() { - current := &rb.Value + current := &rb.RingBuffer.Value if atomic.CompareAndSwapInt32(rb.Flag, 0, 2) { current.Unlock() } else if atomic.CompareAndSwapInt32(rb.Flag, 1, 2) { diff --git a/config/config.go b/config/config.go index 32058d3..25665db 100644 --- a/config/config.go +++ b/config/config.go @@ -28,6 +28,11 @@ type HTTPPlugin interface { } func (config Config) Unmarshal(s any) { + defer func() { + if err := recover(); err != nil { + log.Error("Unmarshal error:", err) + } + }() if s == nil { return } diff --git a/main.go b/main.go index 56665f3..001f344 100755 --- a/main.go +++ b/main.go @@ -60,6 +60,8 @@ func Run(ctx context.Context, configFile string) (err error) { Engine.RawConfig = cg.GetChild("global") //将配置信息同步到结构体 Engine.RawConfig.Unmarshal(config.Global) + } else { + log.Error("parsing yml error:", err) } } Engine.Logger = log.With(zap.Bool("engine", true)) diff --git a/publisher.go b/publisher.go index fe8a70f..2e62e3b 100644 --- a/publisher.go +++ b/publisher.go @@ -86,7 +86,7 @@ func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) { } a := track.NewAAC(p.Stream) p.AudioTrack = a - a.SampleSize = 16 + a.Audio.SampleSize = 16 a.AVCCHead = []byte{frame[0], 1} a.WriteAVCC(0, frame) case codec.CodecID_PCMA, @@ -97,10 +97,10 @@ func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) { } a := track.NewG711(p.Stream, alaw) p.AudioTrack = a - a.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2]) - a.SampleSize = 16 + a.Audio.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2]) + a.Audio.SampleSize = 16 if frame[0]&0x02 == 0 { - a.SampleSize = 8 + a.Audio.SampleSize = 8 } a.Channels = frame[0]&0x01 + 1 a.AVCCHead = frame[:1] @@ -160,15 +160,15 @@ func (t *TSPublisher) OnPmtStream(s mpegts.MpegTsPmtStream) { switch s.StreamType { case mpegts.STREAM_TYPE_H264: if t.VideoTrack == nil { - t.VideoTrack = track.NewH264(t.Stream) + t.VideoTrack = track.NewH264(t.Publisher.Stream) } case mpegts.STREAM_TYPE_H265: if t.VideoTrack == nil { - t.VideoTrack = track.NewH265(t.Stream) + t.VideoTrack = track.NewH265(t.Publisher.Stream) } case mpegts.STREAM_TYPE_AAC: if t.AudioTrack == nil { - t.AudioTrack = track.NewAAC(t.Stream) + t.AudioTrack = track.NewAAC(t.Publisher.Stream) } default: t.Warn("unsupport stream type:", zap.Uint8("type", s.StreamType)) diff --git a/subscriber.go b/subscriber.go index 4c14523..be893f2 100644 --- a/subscriber.go +++ b/subscriber.go @@ -3,13 +3,12 @@ package engine import ( "context" "encoding/json" - "net" - "time" - "go.uber.org/zap" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" "m7s.live/engine/v4/track" + "net" + "time" ) type HaveFLV interface { @@ -254,7 +253,7 @@ func (s *Subscriber) PlayBlock() { var vp *VideoFrame // 如果进入正常模式 if normal { - vp = (*VideoFrame)(s.Video.ring.Read(ctx)) + vp = (*VideoFrame)(&(*s.Video.ring.Read(ctx))) if ctx.Err() != nil { return } @@ -270,7 +269,7 @@ func (s *Subscriber) PlayBlock() { s.Debug("skip to latest key frame", zap.Uint32("seq", s.Video.Track.IDRing.Value.Sequence)) continue } else { - vp = (*VideoFrame)(s.Video.ring.Read(ctx)) + vp = (*VideoFrame)(&(*s.Video.ring.Read(ctx))) if ctx.Err() != nil { return } @@ -300,7 +299,7 @@ func (s *Subscriber) PlayBlock() { audioSent = true } for { - ap := (*AudioFrame)(s.Audio.ring.Read(ctx)) + ap := (*AudioFrame)(&(*s.Audio.ring.Read(ctx))) if ctx.Err() != nil { return } diff --git a/track/aac.go b/track/aac.go index 0411444..82014db 100644 --- a/track/aac.go +++ b/track/aac.go @@ -14,16 +14,16 @@ import ( func NewAAC(stream IStream) (aac *AAC) { aac = &AAC{} - aac.Name = "aac" - aac.Stream = stream + aac.Audio.Name = "aac" + aac.Audio.Stream = stream aac.CodecID = codec.CodecID_AAC aac.Init(32) - aac.Poll = time.Millisecond * 10 + aac.Audio.Media.Poll = time.Millisecond * 10 aac.AVCCHead = []byte{0xAF, 1} - aac.SampleSize = 16 - aac.DecoderConfiguration.PayloadType = 97 + aac.Audio.SampleSize = 16 + aac.Audio.DecoderConfiguration.PayloadType = 97 if config.Global.RTPReorder { - aac.orderQueue = make([]*RTPFrame, 20) + aac.Audio.orderQueue = make([]*RTPFrame, 20) } return } @@ -50,7 +50,7 @@ func (aac *AAC) writeRTPFrame(frame *RTPFrame) { for _, payload := range codec.ParseRTPAAC(frame.Payload) { aac.WriteSlice(payload) } - aac.Value.AppendRTP(frame) + aac.Audio.Media.AVRing.RingBuffer.Value.AppendRTP(frame) if frame.Marker { aac.generateTimestamp() aac.Flush() @@ -60,16 +60,16 @@ func (aac *AAC) writeRTPFrame(frame *RTPFrame) { func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) { if frame.IsSequence() { if len(frame) < 2 { - aac.Stream.Error("AVCC sequence header too short", zap.ByteString("data", frame)) + aac.Audio.Stream.Error("AVCC sequence header too short", zap.ByteString("data", frame)) return } - aac.DecoderConfiguration.AVCC = net.Buffers{frame} + aac.Audio.DecoderConfiguration.AVCC = net.Buffers{frame} config1, config2 := frame[2], frame[3] aac.Profile = (config1 & 0xF8) >> 3 aac.Channels = ((config2 >> 3) & 0x0F) //声道 - aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]) - aac.DecoderConfiguration.Raw = AudioSlice(frame[2:]) - aac.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2} + aac.Audio.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]) + aac.Audio.DecoderConfiguration.Raw = AudioSlice(frame[2:]) + aac.Audio.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2} aac.Attach() } else { aac.Audio.WriteAVCC(ts, frame) @@ -80,8 +80,9 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) { func (aac *AAC) Flush() { // RTP格式补完 // TODO: MTU 分割 - if aac.Value.RTP == nil && config.Global.EnableRTP { - l := util.SizeOfBuffers(aac.Value.Raw) + value := aac.Audio.Media.RingBuffer.Value + if value.RTP == nil && config.Global.EnableRTP { + l := util.SizeOfBuffers(value.Raw) o := make([]byte, 4, l+4) //AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度;又因为单个auheader字节长度2字节,所以再除以2就是auheader的个数。 o[0] = 0x00 //高位 @@ -89,7 +90,7 @@ func (aac *AAC) Flush() { //AU_HEADER o[2] = (byte)((l & 0x1fe0) >> 5) //高位 o[3] = (byte)((l & 0x1f) << 3) //低位 - for _, raw := range aac.Value.Raw { + for _, raw := range value.Raw { o = append(o, raw...) } aac.PacketizeRTP(o) diff --git a/track/audio.go b/track/audio.go index 60742be..3b0a6b1 100644 --- a/track/audio.go +++ b/track/audio.go @@ -68,19 +68,20 @@ func (a *Audio) WriteADTS(adts []byte) { func (a *Audio) Flush() { // AVCC 格式补完 - if len(a.Value.AVCC) == 0 && (config.Global.EnableAVCC || config.Global.EnableFLV) { - a.Value.AppendAVCC(a.AVCCHead) - for _, raw := range a.Value.Raw { - a.Value.AppendAVCC(raw) + value := a.Media.RingBuffer.Value + if len(value.AVCC) == 0 && (config.Global.EnableAVCC || config.Global.EnableFLV) { + value.AppendAVCC(a.AVCCHead) + for _, raw := range value.Raw { + value.AppendAVCC(raw) } } // FLV tag 补完 - if len(a.Value.FLV) == 0 && config.Global.EnableFLV { - a.Value.FillFLV(codec.FLV_TAG_TYPE_AUDIO, a.Value.AbsTime) + if len(value.FLV) == 0 && config.Global.EnableFLV { + value.FillFLV(codec.FLV_TAG_TYPE_AUDIO, value.AbsTime) } - if a.Value.RTP == nil && config.Global.EnableRTP { + if value.RTP == nil && config.Global.EnableRTP { var o []byte - for _, raw := range a.Value.Raw { + for _, raw := range value.Raw { o = append(o, raw...) } a.PacketizeRTP(o) diff --git a/track/data.go b/track/data.go index cf45013..7831b25 100644 --- a/track/data.go +++ b/track/data.go @@ -20,7 +20,7 @@ func (d *Data) ReadRing() *LockRing[any] { } func (d *Data) LastWriteTime() time.Time { - return d.LockRing.PreValue().Timestamp + return d.LockRing.RingBuffer.PreValue().Timestamp } func (dt *Data) Push(data any) { diff --git a/track/g711.go b/track/g711.go index 9b64688..ed0f76a 100644 --- a/track/g711.go +++ b/track/g711.go @@ -12,25 +12,25 @@ import ( func NewG711(stream IStream, alaw bool) (g711 *G711) { g711 = &G711{} if alaw { - g711.Name = "pcma" + g711.Audio.Name = "pcma" } else { - g711.Name = "pcmu" + g711.Audio.Name = "pcmu" } - g711.Stream = stream + g711.Audio.Stream = stream if alaw { - g711.CodecID = codec.CodecID_PCMA + g711.Audio.CodecID = codec.CodecID_PCMA } else { - g711.CodecID = codec.CodecID_PCMU + g711.Audio.CodecID = codec.CodecID_PCMU } - g711.Init(32) - g711.Poll = time.Millisecond * 10 - g711.DecoderConfiguration.PayloadType = 97 + g711.Audio.Init(32) + g711.Audio.Media.Poll = time.Millisecond * 10 + g711.Audio.DecoderConfiguration.PayloadType = 97 if config.Global.RTPReorder { - g711.orderQueue = make([]*RTPFrame, 20) + g711.Audio.orderQueue = make([]*RTPFrame, 20) } - g711.SampleSize = 8 - g711.SampleRate = 8000 - g711.Attach() + g711.Audio.SampleSize = 8 + g711.Audio.SampleRate = 8000 + g711.Audio.Attach() return } @@ -60,7 +60,7 @@ func (g711 *G711) WriteAVCC(ts uint32, frame AVCCFrame) { func (g711 *G711) writeRTPFrame(frame *RTPFrame) { g711.WriteSlice(frame.Payload) - g711.Value.AppendRTP(frame) + g711.Audio.Media.AVRing.RingBuffer.Value.AppendRTP(frame) if frame.Marker { g711.generateTimestamp() g711.Flush()