From 0280cfa65521ceff0eb76110c67a53acfe4f76d3 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Thu, 7 Mar 2024 20:48:20 +0800 Subject: [PATCH] feat: add amf3 protocol, audio track attach when get data --- http.go | 8 +- track/aac.go | 1 + track/audio.go | 4 + track/g711.go | 1 - track/media.go | 6 +- track/opus.go | 1 - track/video.go | 1 - util/amf.go | 36 +++--- util/amf3.go | 308 +++++++++++++++++++++++++++++++++++++++++++++++++ util/buffer.go | 32 ++++- 10 files changed, 365 insertions(+), 33 deletions(-) create mode 100644 util/amf3.go diff --git a/http.go b/http.go index c2066cf..06dbfa2 100644 --- a/http.go +++ b/http.go @@ -2,6 +2,7 @@ package engine import ( "encoding/json" + "fmt" "net/http" "os" "strconv" @@ -28,9 +29,12 @@ func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request) { http.ServeFile(rw, r, "favicon.ico") return } - rw.Write([]byte("Monibuca API Server\n")) + fmt.Fprintf(rw, "Monibuca Engine %s StartTime:%s\n", SysInfo.Version, SysInfo.StartTime) + for _, plugin := range Plugins { + fmt.Fprintf(rw, "Plugin %s Version:%s\n", plugin.Name, plugin.Version) + } for _, api := range apiList { - rw.Write([]byte(api + "\n")) + fmt.Fprintf(rw, "%s\n", api) } } diff --git a/track/aac.go b/track/aac.go index 3d4b061..e71c440 100644 --- a/track/aac.go +++ b/track/aac.go @@ -51,6 +51,7 @@ func (aac *AAC) WriteADTS(ts uint32, b util.IBytes) { aac.SampleRate = uint32(codec.SamplingFrequencies[sampleRate]) aac.Channels = channel aac.Parse(aac.SequenceHead[2:]) + aac.iframeReceived = true aac.Attach() } aac.generateTimestamp(ts) diff --git a/track/audio.go b/track/audio.go index c7dbd8e..68545ab 100644 --- a/track/audio.go +++ b/track/audio.go @@ -58,6 +58,10 @@ func (av *Audio) Flush() { av.Value.ADTS = item } av.Media.Flush() + if av.CodecID != codec.CodecID_AAC && !av.iframeReceived { + av.iframeReceived = true + av.Attach() + } } func (av *Audio) WriteRawBytes(pts uint32, raw util.IBytes) { diff --git a/track/g711.go b/track/g711.go index 6e43caa..fa44b36 100644 --- a/track/g711.go +++ b/track/g711.go @@ -32,7 +32,6 @@ func NewG711(puber IPuber, alaw bool, stuff ...any) (g711 *G711) { if g711.BytesPool == nil { g711.BytesPool = make(util.BytesPool, 17) } - go g711.Attach() return } diff --git a/track/media.go b/track/media.go index 0ac532c..6ad3f27 100644 --- a/track/media.go +++ b/track/media.go @@ -100,9 +100,9 @@ type Media struct { SequenceHead []byte `json:"-" yaml:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) SequenceHeadSeq int RTPDemuxer - SpesificTrack `json:"-" yaml:"-"` - deltaTs time.Duration //用于接续发布后时间戳连续 - + SpesificTrack `json:"-" yaml:"-"` + deltaTs time.Duration //用于接续发布后时间戳连续 + iframeReceived bool 流速控制 } diff --git a/track/opus.go b/track/opus.go index 1d5abfd..127f0bb 100644 --- a/track/opus.go +++ b/track/opus.go @@ -19,7 +19,6 @@ func NewOpus(puber IPuber, stuff ...any) (opus *Opus) { if opus.BytesPool == nil { opus.BytesPool = make(util.BytesPool, 17) } - go opus.Attach() return } diff --git a/track/video.go b/track/video.go index e55031f..36c3471 100644 --- a/track/video.go +++ b/track/video.go @@ -24,7 +24,6 @@ type Video struct { ParamaterSets `json:"-" yaml:"-"` SPS []byte `json:"-" yaml:"-"` PPS []byte `json:"-" yaml:"-"` - iframeReceived bool } func (v *Video) Attach() { diff --git a/util/amf.go b/util/amf.go index 6c61816..08157ed 100644 --- a/util/amf.go +++ b/util/amf.go @@ -50,26 +50,6 @@ const ( AMF0_TYPED_OBJECT AMF0_AVMPLUS_OBJECT ) -const ( - AMF3_UNDEFINED = iota - AMF3_NULL - AMF3_FALSE - AMF3_TRUE - AMF3_INTEGER - AMF3_DOUBLE - AMF3_STRING - AMF3_XML_DOC - AMF3_DATE - AMF3_ARRAY - AMF3_OBJECT - AMF3_XML - AMF3_BYTE_ARRAY - AMF3_VECTOR_INT - AMF3_VECTOR_UINT - AMF3_VECTOR_DOUBLE - AMF3_VECTOR_OBJECT - AMF3_DICTIONARY -) var ( END_OBJ = []byte{0, 0, AMF0_END_OBJECT} @@ -77,6 +57,13 @@ var ( Undefined = &struct{}{} ) +type IAMF interface { + IBuffer + Unmarshal() (any, error) + Marshal(any) []byte + Marshals(...any) []byte +} + type EcmaArray map[string]any type AMF struct { @@ -283,6 +270,15 @@ func (amf *AMF) Marshal(v any) []byte { amf.Marshal(v.Index(i).Interface()) } amf.Write(END_OBJ) + case reflect.Ptr: + vv := reflect.Indirect(v) + if vv.Kind() == reflect.Struct { + amf.WriteByte(AMF0_OBJECT) + for i := 0; i < vv.NumField(); i++ { + amf.writeProperty(vv.Type().Field(i).Name, vv.Field(i).Interface()) + } + amf.Write(END_OBJ) + } default: panic("amf Marshal faild") } diff --git a/util/amf3.go b/util/amf3.go new file mode 100644 index 0000000..971cbb5 --- /dev/null +++ b/util/amf3.go @@ -0,0 +1,308 @@ +package util + +import ( + "errors" + "reflect" + "strconv" + "unicode" +) + +const ( + AMF3_UNDEFINED = iota + AMF3_NULL + AMF3_FALSE + AMF3_TRUE + AMF3_INTEGER + AMF3_DOUBLE + AMF3_STRING + AMF3_XML_DOC + AMF3_DATE + AMF3_ARRAY + AMF3_OBJECT + AMF3_XML + AMF3_BYTE_ARRAY + AMF3_VECTOR_INT + AMF3_VECTOR_UINT + AMF3_VECTOR_DOUBLE + AMF3_VECTOR_OBJECT + AMF3_DICTIONARY +) + +type AMF3 struct { + AMF + scEnc map[string]int + scDec []string + ocEnc map[uintptr]int + ocDec []any + reservStruct bool +} + +func (amf *AMF3) readString() (string, error) { + index, err := amf.readU29() + if err != nil { + return "", err + } + ret := "" + if (index & 0x01) == 0 { + ret = amf.scDec[int(index>>1)] + } else { + index >>= 1 + ret = string(amf.ReadN(int(index))) + } + if ret != "" { + amf.scDec = append(amf.scDec, ret) + } + return ret, nil +} +func (amf *AMF3) Unmarshal() (obj any, err error) { + defer func() { + if e := recover(); e != nil { + err = errors.New("amf3 unmarshal error") + } + }() + switch amf.ReadByte() { + case AMF3_NULL: + return nil, nil + case AMF3_FALSE: + return false, nil + case AMF3_TRUE: + return true, nil + case AMF3_INTEGER: + return amf.readU29() + case AMF3_DOUBLE: + return amf.ReadFloat64(), nil + case AMF3_STRING: + return amf.readString() + case AMF3_OBJECT: + index, err := amf.readU29() + if err != nil { + return nil, err + } + if (index & 0x01) == 0 { + return amf.ocDec[int(index>>1)], nil + } + if index != 0x0b { + return nil, errors.New("invalid object type") + } + if amf.ReadByte() != 0x01 { + return nil, errors.New("type object not allowed") + } + ret := make(map[string]any) + amf.ocDec = append(amf.ocDec, ret) + for { + key, err := amf.readString() + if err != nil { + return nil, err + } + if key == "" { + break + } + ret[key], err = amf.Unmarshal() + if err != nil { + return nil, err + } + } + return ret, nil + } + return nil, errors.New("amf3 unmarshal error") +} + +func (amf *AMF3) writeString(s string) error { + index, ok := amf.scEnc[s] + if ok { + amf.writeU29(uint32(index << 1)) + return nil + } + + err := amf.writeU29(uint32((len(s) << 1) | 0x01)) + if err != nil { + return err + } + + if s != "" { + amf.scEnc[s] = len(amf.scEnc) + } + amf.WriteString(s) + return nil +} + +func (amf *AMF3) readU29() (uint32, error) { + var ret uint32 = 0 + for i := 0; i < 4; i++ { + b := amf.ReadByte() + if i != 3 { + ret = (ret << 7) | uint32(b&0x7f) + if (b & 0x80) == 0 { + break + } + } else { + ret = (ret << 8) | uint32(b) + } + } + + return ret, nil +} +func (amf *AMF3) writeU29(value uint32) error { + switch { + case value < 0x80: + amf.WriteByte(byte(value)) + case value < 0x4000: + amf.Write([]byte{byte((value >> 7) | 0x80), byte(value & 0x7f)}) + case value < 0x200000: + amf.Write([]byte{byte((value >> 14) | 0x80), byte((value >> 7) | 0x80), byte(value & 0x7f)}) + case value < 0x20000000: + amf.Write([]byte{byte((value >> 22) | 0x80), byte((value >> 15) | 0x80), byte((value >> 7) | 0x80), byte(value & 0xff)}) + default: + return errors.New("u29 over flow") + } + return nil +} + +func (amf *AMF3) Marshals(v ...any) []byte { + for _, vv := range v { + amf.Marshal(vv) + } + return amf.Buffer +} + +func MarshalAMF3s(v ...any) []byte { + var amf AMF3 + amf.ocEnc = make(map[uintptr]int) + amf.scEnc = make(map[string]int) + return amf.Marshals(v...) +} + +func (amf *AMF3) Marshal(v any) []byte { + if v == nil { + amf.WriteByte(AMF3_NULL) + return amf.Buffer + } + switch vv := v.(type) { + case string: + amf.WriteByte(AMF3_STRING) + amf.writeString(vv) + case bool: + if vv { + amf.WriteByte(AMF3_TRUE) + } else { + amf.WriteByte(AMF3_FALSE) + } + case int, int8, int16, int32, int64: + var value int64 + reflect.ValueOf(&value).Elem().Set(reflect.ValueOf(vv).Convert(reflect.TypeOf(value))) + if value < -0xfffffff { + if value > -0x7fffffff { + return amf.Marshal(float64(value)) + } + return amf.Marshal(strconv.FormatInt(value, 10)) + } + amf.WriteByte(AMF3_INTEGER) + amf.writeU29(uint32(value)) + case uint, uint8, uint16, uint32, uint64: + var value uint64 + reflect.ValueOf(&value).Elem().Set(reflect.ValueOf(vv).Convert(reflect.TypeOf(value))) + if value >= 0x20000000 { + if value <= 0xffffffff { + return amf.Marshal(float64(value)) + } + return amf.Marshal(strconv.FormatUint(value, 10)) + } + amf.WriteByte(AMF3_INTEGER) + amf.writeU29(uint32(value)) + case float32: + amf.Marshal(float64(vv)) + case float64: + amf.WriteByte(AMF3_DOUBLE) + amf.WriteFloat64(vv) + case map[string]any: + amf.WriteByte(AMF3_OBJECT) + index, ok := amf.ocEnc[reflect.ValueOf(vv).Pointer()] + if ok { + index <<= 1 + amf.writeU29(uint32(index << 1)) + return nil + } + amf.WriteByte(0x0b) + err := amf.writeString("") + if err != nil { + return nil + } + for k, v := range vv { + err = amf.writeString(k) + if err != nil { + return nil + } + amf.Marshal(v) + } + amf.writeString("") + + default: + v := reflect.ValueOf(vv) + if !v.IsValid() { + amf.WriteByte(AMF3_NULL) + return amf.Buffer + } + switch v.Kind() { + case reflect.Ptr: + if v.IsNil() { + amf.WriteByte(AMF3_NULL) + return amf.Buffer + } + vv := reflect.Indirect(v) + if vv.Kind() == reflect.Struct { + amf.WriteByte(AMF3_OBJECT) + index, ok := amf.ocEnc[v.Pointer()] + if ok { + index <<= 1 + amf.writeU29(uint32(index << 1)) + return nil + } + amf.WriteByte(0x0b) + err := amf.writeString("") + if err != nil { + return nil + } + t := vv.Type() + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + key := amf.getFieldName(f) + if key == "" { + continue + } + + err = amf.writeString(key) + if err != nil { + return nil + } + + fv := v.FieldByName(f.Name) + if fv.Kind() == reflect.Struct { + fv = fv.Addr() + } + amf.Marshal(fv.Interface()) + } + amf.writeString("") + } + } + } + return amf.Buffer +} + +func (amf *AMF3) getFieldName(f reflect.StructField) string { + chars := []rune(f.Name) + if unicode.IsLower(chars[0]) { + return "" + } + + name := f.Tag.Get("amf.name") + if name != "" { + return name + } + + if !amf.reservStruct { + chars[0] = unicode.ToLower(chars[0]) + return string(chars) + } + + return f.Name +} diff --git a/util/buffer.go b/util/buffer.go index 3e749bd..d37e1a6 100644 --- a/util/buffer.go +++ b/util/buffer.go @@ -70,7 +70,33 @@ type IBytes interface { Bytes() []byte Reuse() bool } - +type IBuffer interface { + Len() int + Bytes() []byte + Reuse() bool + SubBuf(start int, length int) Buffer + Malloc(count int) Buffer + Reset() + WriteUint32(v uint32) + WriteUint24(v uint32) + WriteUint16(v uint16) + WriteFloat64(v float64) + WriteByte(v byte) + WriteString(a string) + Write(a []byte) (n int, err error) + ReadN(n int) Buffer + ReadFloat64() float64 + ReadUint64() uint64 + ReadUint32() uint32 + ReadUint24() uint32 + ReadUint16() uint16 + ReadByte() byte + Read(buf []byte) (n int, err error) + Clone() Buffer + CanRead() bool + CanReadN(n int) bool + Cap() int +} func (Buffer) Reuse() bool { return false } @@ -202,10 +228,6 @@ func (b *Buffer) Split(n int) (result net.Buffers) { } } -func (b *Buffer) MarshalAMFs(v ...any) { - amf := AMF{*b} - *b = amf.Marshals(v...) -} // ConcatBuffers 合并碎片内存为一个完整内存 func ConcatBuffers[T ~[]byte](input []T) (out []byte) {