diff --git a/codec/amf.go b/codec/amf.go new file mode 100644 index 0000000..b4ae9ad --- /dev/null +++ b/codec/amf.go @@ -0,0 +1,283 @@ +package codec + +import ( + "fmt" + "io" + "reflect" + + "m7s.live/engine/v4/util" +) + +// Action Message Format -- AMF 0 +// Action Message Format -- AMF 3 +// http://download.macromedia.com/pub/labs/amf/amf0_spec_121207.pdf +// http://wwwimages.adobe.com/www.adobe.com/content/dam/Adobe/en/devnet/amf/pdf/amf-file-format-spec.pdf + +// AMF Object == AMF Object Type(1 byte) + AMF Object Value +// +// AMF Object Value : +// AMF0_STRING : 2 bytes(datasize,记录string的长度) + data(string) +// AMF0_OBJECT : AMF0_STRING + AMF Object +// AMF0_NULL : 0 byte +// AMF0_NUMBER : 8 bytes +// AMF0_DATE : 10 bytes +// AMF0_BOOLEAN : 1 byte +// AMF0_ECMA_ARRAY : 4 bytes(arraysize,记录数组的长度) + AMF0_OBJECT +// AMF0_STRICT_ARRAY : 4 bytes(arraysize,记录数组的长度) + AMF Object + +// 实际测试时,AMF0_ECMA_ARRAY数据如下: +// 8 0 0 0 13 0 8 100 117 114 97 116 105 111 110 0 0 0 0 0 0 0 0 0 0 5 119 105 100 116 104 0 64 158 0 0 0 0 0 0 0 6 104 101 105 103 104 116 0 64 144 224 0 0 0 0 0 +// 8 0 0 0 13 | { 0 8 100 117 114 97 116 105 111 110 --- 0 0 0 0 0 0 0 0 0 } | { 0 5 119 105 100 116 104 --- 0 64 158 0 0 0 0 0 0 } | { 0 6 104 101 105 103 104 116 --- 0 64 144 224 0 0 0 0 0 } |... +// 13 | {AMF0_STRING --- AMF0_NUMBER} | {AMF0_STRING --- AMF0_NUMBER} | {AMF0_STRING --- AMF0_NUMBER} | ... +// 13 | {AMF0_OBJECT} | {AMF0_OBJECT} | {AMF0_OBJECT} | ... +// 13 | {duration --- 0} | {width --- 1920} | {height --- 1080} | ... + +const ( + AMF0_NUMBER = iota // 浮点数 + AMF0_BOOLEAN + AMF0_STRING + AMF0_OBJECT + AMF0_MOVIECLIP + AMF0_NULL + AMF0_UNDEFINED + AMF0_REFERENCE + AMF0_ECMA_ARRAY + AMF0_END_OBJECT + AMF0_STRICT_ARRAY + AMF0_DATE + AMF0_LONG_STRING + AMF0_UNSUPPORTED + AMF0_RECORDSET + AMF0_XML_DOCUMENT + AMF0_TYPED_OBJECT + AMF0_AVMPLUS_OBJECT +) +const ( + AMF3_UNDEFINED = iota + AMF3_NULL + AMF3_FALSE + AMF3_TRUE + AMF3_INTEGER + AMF3_DOUBLE + AMF3_STRING + AMF3_XML_DOC + AMF3_DATE + AMF3_ARRAY + AMF3_OBJECT + AMF3_XML + AMF3_BYTE_ARRAY + AMF3_VECTOR_INT + AMF3_VECTOR_UINT + AMF3_VECTOR_DOUBLE + AMF3_VECTOR_OBJECT + AMF3_DICTIONARY +) + +var ( + END_OBJ = []byte{0, 0, AMF0_END_OBJECT} + ObjectEnd = &struct{}{} + Undefined = &struct{}{} +) + +type EcmaArray map[string]any + +type AMF struct { + util.Buffer +} + +func (amf *AMF) ReadShortString() string { + value, _ := amf.Unmarshal() + return value.(string) +} + +func (amf *AMF) ReadNumber() float64 { + value, _ := amf.Unmarshal() + return value.(float64) +} + +func (amf *AMF) ReadObject() map[string]any { + value, _ := amf.Unmarshal() + if value == nil { + return nil + } + return value.(map[string]any) +} + +func (amf *AMF) ReadBool() bool { + value, _ := amf.Unmarshal() + return value.(bool) +} + +func (amf *AMF) readKey() (string, error) { + if !amf.CanReadN(2) { + return "", io.ErrUnexpectedEOF + } + l := int(amf.ReadUint16()) + if !amf.CanReadN(l) { + return "", io.ErrUnexpectedEOF + } + return string(amf.ReadN(l)), nil +} + +func (amf *AMF) readProperty(m map[string]any) (obj any, err error) { + var k string + var v any + if k, err = amf.readKey(); err == nil { + if v, err = amf.Unmarshal(); k == "" && v == ObjectEnd { + obj = m + } else if err == nil { + m[k] = v + } + } + return +} + +func (amf *AMF) Unmarshal() (obj any, err error) { + if !amf.CanRead() { + return nil, io.ErrUnexpectedEOF + } + defer func(b util.Buffer) { + if err != nil { + amf.Buffer = b + } + }(amf.Buffer) + switch t := amf.ReadByte(); t { + case AMF0_NUMBER: + if !amf.CanReadN(8) { + return 0, io.ErrUnexpectedEOF + } + obj = amf.ReadFloat64() + case AMF0_BOOLEAN: + if !amf.CanRead() { + return false, io.ErrUnexpectedEOF + } + obj = amf.ReadByte() == 1 + case AMF0_STRING: + obj, err = amf.readKey() + case AMF0_OBJECT: + m := make(map[string]any) + for err == nil && obj == nil { + obj, err = amf.readProperty(m) + } + case AMF0_NULL: + return nil, nil + case AMF0_UNDEFINED: + return Undefined, nil + case AMF0_ECMA_ARRAY: + size := amf.ReadUint32() + m := make(EcmaArray) + for i := uint32(0); i < size && err == nil && obj == nil; i++ { + obj, err = amf.readProperty(m) + } + case AMF0_END_OBJECT: + return ObjectEnd, nil + case AMF0_STRICT_ARRAY: + size := amf.ReadUint32() + var list []any + for i := uint32(0); i < size; i++ { + v, err := amf.Unmarshal() + if err != nil { + return nil, err + } + list = append(list, v) + } + obj = list + case AMF0_DATE: + if !amf.CanReadN(10) { + return 0, io.ErrUnexpectedEOF + } + obj = amf.ReadFloat64() + amf.ReadN(2) + case AMF0_LONG_STRING, + AMF0_XML_DOCUMENT: + if !amf.CanReadN(4) { + return "", io.ErrUnexpectedEOF + } + l := int(amf.ReadUint32()) + if !amf.CanReadN(l) { + return "", io.ErrUnexpectedEOF + } + obj = string(amf.ReadN(l)) + default: + err = fmt.Errorf("unsupported type:%d", t) + } + return +} + +func (amf *AMF) writeProperty(key string, v any) { + amf.WriteUint16(uint16(len(key))) + amf.WriteString(key) + amf.Marshal(v) +} + +func MarshalAMFs(v ...any) []byte { + var amf AMF + return amf.Marshals(v...) +} + +func (amf *AMF) Marshals(v ...any) []byte { + for _, vv := range v { + amf.Marshal(vv) + } + return amf.Buffer +} + +func (amf *AMF) Marshal(v any) []byte { + if v == nil { + amf.WriteByte(AMF0_NULL) + return amf.Buffer + } + switch vv := v.(type) { + case string: + if l := len(vv); l > 0xFFFF { + amf.WriteByte(AMF0_LONG_STRING) + amf.WriteUint32(uint32(l)) + } else { + amf.WriteByte(AMF0_STRING) + amf.WriteUint16(uint16(l)) + } + amf.WriteString(vv) + case float64, uint, float32, int, int16, int32, int64, uint16, uint32, uint64, uint8, int8: + amf.WriteByte(AMF0_NUMBER) + amf.WriteFloat64(util.ToFloat64(vv)) + case bool: + amf.WriteByte(AMF0_BOOLEAN) + if vv { + amf.WriteByte(1) + } else { + amf.WriteByte(0) + } + case EcmaArray: + amf.WriteByte(AMF0_ECMA_ARRAY) + amf.WriteUint32(uint32(len(vv))) + for k, v := range vv { + amf.writeProperty(k, v) + } + amf.Write(END_OBJ) + case map[string]any: + amf.WriteByte(AMF0_OBJECT) + for k, v := range vv { + amf.writeProperty(k, v) + } + amf.Write(END_OBJ) + default: + v := reflect.ValueOf(vv) + if !v.IsValid() { + amf.WriteByte(AMF0_NULL) + return amf.Buffer + } + switch v.Kind() { + case reflect.Slice, reflect.Array: + amf.WriteByte(AMF0_STRICT_ARRAY) + size := v.Len() + amf.WriteUint32(uint32(size)) + for i := 0; i < size; i++ { + amf.Marshal(v.Index(i).Interface()) + } + amf.Write(END_OBJ) + default: + panic("amf Marshal faild") + } + } + return amf.Buffer +} diff --git a/codec/codec.go b/codec/codec.go index be52bb1..abed589 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -217,19 +217,3 @@ func AudioSpecificConfigToADTS(asc *AudioSpecificConfig, rawDataLength int) (adt return } - -// func ParseRTPAAC(payload []byte) (result [][]byte) { -// auHeaderLen := util.ReadBE[int](payload[:2]) >> 3 -// var auLenArray []int -// for iIndex := 2; iIndex <= auHeaderLen; iIndex += 2 { -// auLen := util.ReadBE[int](payload[iIndex:iIndex+2]) >> 3 -// auLenArray = append(auLenArray, auLen) -// } -// startOffset := 2 + auHeaderLen -// for _, auLen := range auLenArray { -// endOffset := startOffset + auLen -// result = append(result, payload[startOffset:endOffset]) -// startOffset = startOffset + auLen -// } -// return -// } diff --git a/codec/flv.go b/codec/flv.go index 48d9e8d..906bfbd 100644 --- a/codec/flv.go +++ b/codec/flv.go @@ -78,23 +78,11 @@ var ErrInvalidFLV = errors.New("invalid flv") var FLVHeader = []byte{'F', 'L', 'V', 0x01, 0x05, 0, 0, 0, 9, 0, 0, 0, 0} func WriteFLVTag(w io.Writer, t byte, timestamp uint32, payload net.Buffers) (err error) { - head := make([]byte, 11) - tail := make([]byte, 4) - head[0] = t - dataSize := uint32(util.SizeOfBuffers(payload)) - util.PutBE(tail, dataSize+11) - util.PutBE(head[1:4], dataSize) - head[4] = byte(timestamp >> 16) - head[5] = byte(timestamp >> 8) - head[6] = byte(timestamp) - head[7] = byte(timestamp >> 24) - var tag = net.Buffers{head} - tag = append(tag, payload...) - tag = append(tag, tail) - // Tag Data - _, err = tag.WriteTo(w) + payload = AVCC2FLV(t, payload, timestamp) + _, err = payload.WriteTo(w) return } + func ReadFLVTag(r io.Reader) (t byte, timestamp uint32, payload []byte, err error) { head := make([]byte, 11) if _, err = io.ReadFull(r, head); err != nil { @@ -110,20 +98,17 @@ func ReadFLVTag(r io.Reader) (t byte, timestamp uint32, payload []byte, err erro return } -func AudioAVCC2FLV(avcc net.Buffers, ts uint32) (flv net.Buffers) { - b := util.Buffer(make([]byte, 0, 15)) - b.WriteByte(FLV_TAG_TYPE_AUDIO) - dataSize := util.SizeOfBuffers(avcc) - b.WriteUint24(uint32(dataSize)) - b.WriteUint24(ts) - b.WriteByte(byte(ts >> 24)) - b.WriteUint24(0) - return append(append(append(flv, b), avcc...), util.PutBE(b.Malloc(4), dataSize+11)) +func AudioAVCC2FLV(avcc net.Buffers, ts uint32) net.Buffers { + return AVCC2FLV(FLV_TAG_TYPE_AUDIO, avcc, ts) } -func VideoAVCC2FLV(avcc net.Buffers, ts uint32) (flv net.Buffers) { +func VideoAVCC2FLV(avcc net.Buffers, ts uint32) net.Buffers { + return AVCC2FLV(FLV_TAG_TYPE_VIDEO, avcc, ts) +} + +func AVCC2FLV(t byte, avcc net.Buffers, ts uint32) (flv net.Buffers) { b := util.Buffer(make([]byte, 0, 15)) - b.WriteByte(FLV_TAG_TYPE_VIDEO) + b.WriteByte(t) dataSize := util.SizeOfBuffers(avcc) b.WriteUint24(uint32(dataSize)) b.WriteUint24(ts) diff --git a/codec/mpegts/mpegts.go b/codec/mpegts/mpegts.go index 4094512..d1f7555 100644 --- a/codec/mpegts/mpegts.go +++ b/codec/mpegts/mpegts.go @@ -526,15 +526,13 @@ func (s *MpegTsStream) Feed(ts io.Reader) error { s.tsPktBuffer = s.tsPktBuffer[:0] }() for { - bufferLen := len(s.tsPktBuffer) - if cap(s.tsPktBuffer) > bufferLen { - if s.tsPktBuffer = s.tsPktBuffer[:bufferLen+1]; s.tsPktBuffer[bufferLen] == nil { - s.tsPktBuffer[bufferLen] = make([]byte, TS_PACKET_SIZE) - } + var tsData []byte + if tsDataP := util.MallocSlice(&s.tsPktBuffer); tsDataP == nil { + tsData = make([]byte, TS_PACKET_SIZE) + s.tsPktBuffer = append(s.tsPktBuffer, tsData) } else { - s.tsPktBuffer = append(s.tsPktBuffer, make([]byte, TS_PACKET_SIZE)) + tsData = *tsDataP } - tsData := s.tsPktBuffer[bufferLen] _, err := io.ReadFull(ts, tsData) reader.Reset(tsData) if err == io.EOF { diff --git a/common/frame.go b/common/frame.go index b069bb6..bac25a7 100644 --- a/common/frame.go +++ b/common/frame.go @@ -27,9 +27,6 @@ type RawSlice interface { ~[][]byte | ~[]byte } -// func (nalu *H264NALU) Append(slice ...NALUSlice) { -// *nalu = append(*nalu, slice...) -// } func (nalu NALUSlice) H264Type() (naluType codec.H264NALUType) { return naluType.Parse(nalu[0][0]) } diff --git a/publisher.go b/publisher.go index a9741d1..5123321 100644 --- a/publisher.go +++ b/publisher.go @@ -54,6 +54,9 @@ func (p *Publisher) OnEvent(event any) { } func (p *Publisher) WriteAVCCVideo(ts uint32, frame common.AVCCFrame) { + if len(frame) < 6 { + return + } if p.VideoTrack == nil { if frame.IsSequence() { ts = 0 @@ -77,11 +80,14 @@ func (p *Publisher) WriteAVCCVideo(ts uint32, frame common.AVCCFrame) { } func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) { + if len(frame) < 4 { + return + } if p.AudioTrack == nil { codecID := frame.AudioCodecID() switch codecID { case codec.CodecID_AAC: - if !frame.IsSequence() || len(frame) < 4 { + if !frame.IsSequence() { return } a := track.NewAAC(p.Stream) diff --git a/track/video.go b/track/video.go index 7a74753..b9f7622 100644 --- a/track/video.go +++ b/track/video.go @@ -138,6 +138,8 @@ func (vt *Video) WriteAnnexB(frame AnnexBFrame) (s []NALUSlice) { } return } + + func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) { vt.Media.WriteAVCC(ts, frame) for nalus := frame[5:]; len(nalus) > vt.nalulenSize; { @@ -147,7 +149,12 @@ func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) { return } if end := nalulen + vt.nalulenSize; len(nalus) >= end { - vt.AVRing.RingBuffer.Value.AppendRaw(NALUSlice{nalus[vt.nalulenSize:end]}) + slice := nalus[vt.nalulenSize:end] + if _rawSlice := util.MallocSlice(&vt.AVRing.Value.Raw); _rawSlice == nil { + vt.Value.AppendRaw(NALUSlice{slice}) + } else { + _rawSlice.Reset().Append(slice) + } nalus = nalus[end:] } else { vt.Stream.Error("WriteAVCC", zap.Int("len", len(nalus)), zap.Int("naluLenSize", vt.nalulenSize), zap.Int("end", end)) diff --git a/util/buffer.go b/util/buffer.go index 636f9f0..bb66ee5 100644 --- a/util/buffer.go +++ b/util/buffer.go @@ -56,8 +56,13 @@ func (b *Buffer) Write(a []byte) (n int, err error) { func (b Buffer) Len() int { return len(b) } + func (b Buffer) CanRead() bool { - return b.Len() > 0 + return b.CanReadN(1) +} + +func (b Buffer) CanReadN(n int) bool { + return b.Len() >= n } func (b Buffer) Cap() int { return cap(b) @@ -87,6 +92,16 @@ func (b *Buffer) Glow(n int) { *b = b.SubBuf(0, l) } +// MallocSlice 用来对容量够的slice进行长度扩展+1,并返回新的位置的指针,用于写入 +func MallocSlice[T any](slice *[]T) *T { + oslice := *slice + if rawLen := len(oslice); cap(oslice) > rawLen { + *slice = oslice[:rawLen+1] + return &(*slice)[rawLen] + } + return nil +} + // ConcatBuffers 合并碎片内存为一个完整内存 func ConcatBuffers[T ~[]byte](input []T) (out []byte) { for _, v := range input { diff --git a/util/buffer_test.go b/util/buffer_test.go index 383d71e..6a42421 100644 --- a/util/buffer_test.go +++ b/util/buffer_test.go @@ -16,3 +16,15 @@ func TestBuffer(t *testing.T) { } }) } + +func TestMallocSlice(t *testing.T) { + t.Run(t.Name(), func(t *testing.T) { + var a [][]byte = [][]byte{} + b := MallocSlice(&a) + if *b != nil { + t.Fail() + } else if *b = []byte{1}; a[0][0] != 1 { + t.Fail() + } + }) +}