1.修复了config文件解析错误无法提示的问题

2.改成了goland支持的语法
This commit is contained in:
charlestamz
2022-05-31 17:27:48 +08:00
parent 8ba12de307
commit 524a68cfbe
9 changed files with 65 additions and 57 deletions

View File

@@ -22,21 +22,21 @@ func (lr *LockRing[T]) Init(n int) *LockRing[T] {
}
lr.RingBuffer.Init(n)
lr.Flag = &flag
lr.Value.Lock()
lr.RingBuffer.Value.Lock()
return lr
}
func (rb *LockRing[T]) Read() *DataFrame[T] {
current := &rb.Value
current := &rb.RingBuffer.Value
current.RLock()
defer current.RUnlock()
return &current.DataFrame
}
func (rb *LockRing[T]) Step() {
last := &rb.Value
last := &rb.RingBuffer.Value
if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) {
current := rb.MoveNext()
current := rb.RingBuffer.MoveNext()
current.Lock()
last.Unlock()
//Flag不为1代表被Dispose了但尚未处理Done
@@ -47,10 +47,10 @@ func (rb *LockRing[T]) Step() {
}
func (rb *LockRing[T]) Write(value T) {
last := &rb.Value
last := &rb.RingBuffer.Value
last.Value = value
if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) {
current := rb.MoveNext()
current := rb.RingBuffer.MoveNext()
current.Lock()
last.Unlock()
//Flag不为1代表被Dispose了但尚未处理Done
@@ -61,7 +61,7 @@ func (rb *LockRing[T]) Write(value T) {
}
func (rb *LockRing[T]) Dispose() {
current := &rb.Value
current := &rb.RingBuffer.Value
if atomic.CompareAndSwapInt32(rb.Flag, 0, 2) {
current.Unlock()
} else if atomic.CompareAndSwapInt32(rb.Flag, 1, 2) {

View File

@@ -28,6 +28,11 @@ type HTTPPlugin interface {
}
func (config Config) Unmarshal(s any) {
defer func() {
if err := recover(); err != nil {
log.Error("Unmarshal error:", err)
}
}()
if s == nil {
return
}

View File

@@ -60,6 +60,8 @@ func Run(ctx context.Context, configFile string) (err error) {
Engine.RawConfig = cg.GetChild("global")
//将配置信息同步到结构体
Engine.RawConfig.Unmarshal(config.Global)
} else {
log.Error("parsing yml error:", err)
}
}
Engine.Logger = log.With(zap.Bool("engine", true))

View File

@@ -86,7 +86,7 @@ func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) {
}
a := track.NewAAC(p.Stream)
p.AudioTrack = a
a.SampleSize = 16
a.Audio.SampleSize = 16
a.AVCCHead = []byte{frame[0], 1}
a.WriteAVCC(0, frame)
case codec.CodecID_PCMA,
@@ -97,10 +97,10 @@ func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) {
}
a := track.NewG711(p.Stream, alaw)
p.AudioTrack = a
a.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2])
a.SampleSize = 16
a.Audio.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2])
a.Audio.SampleSize = 16
if frame[0]&0x02 == 0 {
a.SampleSize = 8
a.Audio.SampleSize = 8
}
a.Channels = frame[0]&0x01 + 1
a.AVCCHead = frame[:1]
@@ -160,15 +160,15 @@ func (t *TSPublisher) OnPmtStream(s mpegts.MpegTsPmtStream) {
switch s.StreamType {
case mpegts.STREAM_TYPE_H264:
if t.VideoTrack == nil {
t.VideoTrack = track.NewH264(t.Stream)
t.VideoTrack = track.NewH264(t.Publisher.Stream)
}
case mpegts.STREAM_TYPE_H265:
if t.VideoTrack == nil {
t.VideoTrack = track.NewH265(t.Stream)
t.VideoTrack = track.NewH265(t.Publisher.Stream)
}
case mpegts.STREAM_TYPE_AAC:
if t.AudioTrack == nil {
t.AudioTrack = track.NewAAC(t.Stream)
t.AudioTrack = track.NewAAC(t.Publisher.Stream)
}
default:
t.Warn("unsupport stream type:", zap.Uint8("type", s.StreamType))

View File

@@ -3,13 +3,12 @@ package engine
import (
"context"
"encoding/json"
"net"
"time"
"go.uber.org/zap"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/track"
"net"
"time"
)
type HaveFLV interface {
@@ -254,7 +253,7 @@ func (s *Subscriber) PlayBlock() {
var vp *VideoFrame
// 如果进入正常模式
if normal {
vp = (*VideoFrame)(s.Video.ring.Read(ctx))
vp = (*VideoFrame)(&(*s.Video.ring.Read(ctx)))
if ctx.Err() != nil {
return
}
@@ -270,7 +269,7 @@ func (s *Subscriber) PlayBlock() {
s.Debug("skip to latest key frame", zap.Uint32("seq", s.Video.Track.IDRing.Value.Sequence))
continue
} else {
vp = (*VideoFrame)(s.Video.ring.Read(ctx))
vp = (*VideoFrame)(&(*s.Video.ring.Read(ctx)))
if ctx.Err() != nil {
return
}
@@ -300,7 +299,7 @@ func (s *Subscriber) PlayBlock() {
audioSent = true
}
for {
ap := (*AudioFrame)(s.Audio.ring.Read(ctx))
ap := (*AudioFrame)(&(*s.Audio.ring.Read(ctx)))
if ctx.Err() != nil {
return
}

View File

@@ -14,16 +14,16 @@ import (
func NewAAC(stream IStream) (aac *AAC) {
aac = &AAC{}
aac.Name = "aac"
aac.Stream = stream
aac.Audio.Name = "aac"
aac.Audio.Stream = stream
aac.CodecID = codec.CodecID_AAC
aac.Init(32)
aac.Poll = time.Millisecond * 10
aac.Audio.Media.Poll = time.Millisecond * 10
aac.AVCCHead = []byte{0xAF, 1}
aac.SampleSize = 16
aac.DecoderConfiguration.PayloadType = 97
aac.Audio.SampleSize = 16
aac.Audio.DecoderConfiguration.PayloadType = 97
if config.Global.RTPReorder {
aac.orderQueue = make([]*RTPFrame, 20)
aac.Audio.orderQueue = make([]*RTPFrame, 20)
}
return
}
@@ -50,7 +50,7 @@ func (aac *AAC) writeRTPFrame(frame *RTPFrame) {
for _, payload := range codec.ParseRTPAAC(frame.Payload) {
aac.WriteSlice(payload)
}
aac.Value.AppendRTP(frame)
aac.Audio.Media.AVRing.RingBuffer.Value.AppendRTP(frame)
if frame.Marker {
aac.generateTimestamp()
aac.Flush()
@@ -60,16 +60,16 @@ func (aac *AAC) writeRTPFrame(frame *RTPFrame) {
func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
if frame.IsSequence() {
if len(frame) < 2 {
aac.Stream.Error("AVCC sequence header too short", zap.ByteString("data", frame))
aac.Audio.Stream.Error("AVCC sequence header too short", zap.ByteString("data", frame))
return
}
aac.DecoderConfiguration.AVCC = net.Buffers{frame}
aac.Audio.DecoderConfiguration.AVCC = net.Buffers{frame}
config1, config2 := frame[2], frame[3]
aac.Profile = (config1 & 0xF8) >> 3
aac.Channels = ((config2 >> 3) & 0x0F) //声道
aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)])
aac.DecoderConfiguration.Raw = AudioSlice(frame[2:])
aac.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2}
aac.Audio.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)])
aac.Audio.DecoderConfiguration.Raw = AudioSlice(frame[2:])
aac.Audio.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2}
aac.Attach()
} else {
aac.Audio.WriteAVCC(ts, frame)
@@ -80,8 +80,9 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
func (aac *AAC) Flush() {
// RTP格式补完
// TODO: MTU 分割
if aac.Value.RTP == nil && config.Global.EnableRTP {
l := util.SizeOfBuffers(aac.Value.Raw)
value := aac.Audio.Media.RingBuffer.Value
if value.RTP == nil && config.Global.EnableRTP {
l := util.SizeOfBuffers(value.Raw)
o := make([]byte, 4, l+4)
//AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度又因为单个auheader字节长度2字节所以再除以2就是auheader的个数。
o[0] = 0x00 //高位
@@ -89,7 +90,7 @@ func (aac *AAC) Flush() {
//AU_HEADER
o[2] = (byte)((l & 0x1fe0) >> 5) //高位
o[3] = (byte)((l & 0x1f) << 3) //低位
for _, raw := range aac.Value.Raw {
for _, raw := range value.Raw {
o = append(o, raw...)
}
aac.PacketizeRTP(o)

View File

@@ -68,19 +68,20 @@ func (a *Audio) WriteADTS(adts []byte) {
func (a *Audio) Flush() {
// AVCC 格式补完
if len(a.Value.AVCC) == 0 && (config.Global.EnableAVCC || config.Global.EnableFLV) {
a.Value.AppendAVCC(a.AVCCHead)
for _, raw := range a.Value.Raw {
a.Value.AppendAVCC(raw)
value := a.Media.RingBuffer.Value
if len(value.AVCC) == 0 && (config.Global.EnableAVCC || config.Global.EnableFLV) {
value.AppendAVCC(a.AVCCHead)
for _, raw := range value.Raw {
value.AppendAVCC(raw)
}
}
// FLV tag 补完
if len(a.Value.FLV) == 0 && config.Global.EnableFLV {
a.Value.FillFLV(codec.FLV_TAG_TYPE_AUDIO, a.Value.AbsTime)
if len(value.FLV) == 0 && config.Global.EnableFLV {
value.FillFLV(codec.FLV_TAG_TYPE_AUDIO, value.AbsTime)
}
if a.Value.RTP == nil && config.Global.EnableRTP {
if value.RTP == nil && config.Global.EnableRTP {
var o []byte
for _, raw := range a.Value.Raw {
for _, raw := range value.Raw {
o = append(o, raw...)
}
a.PacketizeRTP(o)

View File

@@ -20,7 +20,7 @@ func (d *Data) ReadRing() *LockRing[any] {
}
func (d *Data) LastWriteTime() time.Time {
return d.LockRing.PreValue().Timestamp
return d.LockRing.RingBuffer.PreValue().Timestamp
}
func (dt *Data) Push(data any) {

View File

@@ -12,25 +12,25 @@ import (
func NewG711(stream IStream, alaw bool) (g711 *G711) {
g711 = &G711{}
if alaw {
g711.Name = "pcma"
g711.Audio.Name = "pcma"
} else {
g711.Name = "pcmu"
g711.Audio.Name = "pcmu"
}
g711.Stream = stream
g711.Audio.Stream = stream
if alaw {
g711.CodecID = codec.CodecID_PCMA
g711.Audio.CodecID = codec.CodecID_PCMA
} else {
g711.CodecID = codec.CodecID_PCMU
g711.Audio.CodecID = codec.CodecID_PCMU
}
g711.Init(32)
g711.Poll = time.Millisecond * 10
g711.DecoderConfiguration.PayloadType = 97
g711.Audio.Init(32)
g711.Audio.Media.Poll = time.Millisecond * 10
g711.Audio.DecoderConfiguration.PayloadType = 97
if config.Global.RTPReorder {
g711.orderQueue = make([]*RTPFrame, 20)
g711.Audio.orderQueue = make([]*RTPFrame, 20)
}
g711.SampleSize = 8
g711.SampleRate = 8000
g711.Attach()
g711.Audio.SampleSize = 8
g711.Audio.SampleRate = 8000
g711.Audio.Attach()
return
}
@@ -60,7 +60,7 @@ func (g711 *G711) WriteAVCC(ts uint32, frame AVCCFrame) {
func (g711 *G711) writeRTPFrame(frame *RTPFrame) {
g711.WriteSlice(frame.Payload)
g711.Value.AppendRTP(frame)
g711.Audio.Media.AVRing.RingBuffer.Value.AppendRTP(frame)
if frame.Marker {
g711.generateTimestamp()
g711.Flush()