mirror of
https://github.com/Monibuca/engine.git
synced 2025-10-24 08:53:11 +08:00
将amf0编码器和解码器放到codec/amf.go中,增加MallocSlice函数用于减少内存复制,
对写入avcc音频进行长度判断
This commit is contained in:
283
codec/amf.go
Normal file
283
codec/amf.go
Normal file
@@ -0,0 +1,283 @@
|
||||
package codec
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
// Action Message Format -- AMF 0
|
||||
// Action Message Format -- AMF 3
|
||||
// http://download.macromedia.com/pub/labs/amf/amf0_spec_121207.pdf
|
||||
// http://wwwimages.adobe.com/www.adobe.com/content/dam/Adobe/en/devnet/amf/pdf/amf-file-format-spec.pdf
|
||||
|
||||
// AMF Object == AMF Object Type(1 byte) + AMF Object Value
|
||||
//
|
||||
// AMF Object Value :
|
||||
// AMF0_STRING : 2 bytes(datasize,记录string的长度) + data(string)
|
||||
// AMF0_OBJECT : AMF0_STRING + AMF Object
|
||||
// AMF0_NULL : 0 byte
|
||||
// AMF0_NUMBER : 8 bytes
|
||||
// AMF0_DATE : 10 bytes
|
||||
// AMF0_BOOLEAN : 1 byte
|
||||
// AMF0_ECMA_ARRAY : 4 bytes(arraysize,记录数组的长度) + AMF0_OBJECT
|
||||
// AMF0_STRICT_ARRAY : 4 bytes(arraysize,记录数组的长度) + AMF Object
|
||||
|
||||
// 实际测试时,AMF0_ECMA_ARRAY数据如下:
|
||||
// 8 0 0 0 13 0 8 100 117 114 97 116 105 111 110 0 0 0 0 0 0 0 0 0 0 5 119 105 100 116 104 0 64 158 0 0 0 0 0 0 0 6 104 101 105 103 104 116 0 64 144 224 0 0 0 0 0
|
||||
// 8 0 0 0 13 | { 0 8 100 117 114 97 116 105 111 110 --- 0 0 0 0 0 0 0 0 0 } | { 0 5 119 105 100 116 104 --- 0 64 158 0 0 0 0 0 0 } | { 0 6 104 101 105 103 104 116 --- 0 64 144 224 0 0 0 0 0 } |...
|
||||
// 13 | {AMF0_STRING --- AMF0_NUMBER} | {AMF0_STRING --- AMF0_NUMBER} | {AMF0_STRING --- AMF0_NUMBER} | ...
|
||||
// 13 | {AMF0_OBJECT} | {AMF0_OBJECT} | {AMF0_OBJECT} | ...
|
||||
// 13 | {duration --- 0} | {width --- 1920} | {height --- 1080} | ...
|
||||
|
||||
const (
|
||||
AMF0_NUMBER = iota // 浮点数
|
||||
AMF0_BOOLEAN
|
||||
AMF0_STRING
|
||||
AMF0_OBJECT
|
||||
AMF0_MOVIECLIP
|
||||
AMF0_NULL
|
||||
AMF0_UNDEFINED
|
||||
AMF0_REFERENCE
|
||||
AMF0_ECMA_ARRAY
|
||||
AMF0_END_OBJECT
|
||||
AMF0_STRICT_ARRAY
|
||||
AMF0_DATE
|
||||
AMF0_LONG_STRING
|
||||
AMF0_UNSUPPORTED
|
||||
AMF0_RECORDSET
|
||||
AMF0_XML_DOCUMENT
|
||||
AMF0_TYPED_OBJECT
|
||||
AMF0_AVMPLUS_OBJECT
|
||||
)
|
||||
const (
|
||||
AMF3_UNDEFINED = iota
|
||||
AMF3_NULL
|
||||
AMF3_FALSE
|
||||
AMF3_TRUE
|
||||
AMF3_INTEGER
|
||||
AMF3_DOUBLE
|
||||
AMF3_STRING
|
||||
AMF3_XML_DOC
|
||||
AMF3_DATE
|
||||
AMF3_ARRAY
|
||||
AMF3_OBJECT
|
||||
AMF3_XML
|
||||
AMF3_BYTE_ARRAY
|
||||
AMF3_VECTOR_INT
|
||||
AMF3_VECTOR_UINT
|
||||
AMF3_VECTOR_DOUBLE
|
||||
AMF3_VECTOR_OBJECT
|
||||
AMF3_DICTIONARY
|
||||
)
|
||||
|
||||
var (
|
||||
END_OBJ = []byte{0, 0, AMF0_END_OBJECT}
|
||||
ObjectEnd = &struct{}{}
|
||||
Undefined = &struct{}{}
|
||||
)
|
||||
|
||||
type EcmaArray map[string]any
|
||||
|
||||
type AMF struct {
|
||||
util.Buffer
|
||||
}
|
||||
|
||||
func (amf *AMF) ReadShortString() string {
|
||||
value, _ := amf.Unmarshal()
|
||||
return value.(string)
|
||||
}
|
||||
|
||||
func (amf *AMF) ReadNumber() float64 {
|
||||
value, _ := amf.Unmarshal()
|
||||
return value.(float64)
|
||||
}
|
||||
|
||||
func (amf *AMF) ReadObject() map[string]any {
|
||||
value, _ := amf.Unmarshal()
|
||||
if value == nil {
|
||||
return nil
|
||||
}
|
||||
return value.(map[string]any)
|
||||
}
|
||||
|
||||
func (amf *AMF) ReadBool() bool {
|
||||
value, _ := amf.Unmarshal()
|
||||
return value.(bool)
|
||||
}
|
||||
|
||||
func (amf *AMF) readKey() (string, error) {
|
||||
if !amf.CanReadN(2) {
|
||||
return "", io.ErrUnexpectedEOF
|
||||
}
|
||||
l := int(amf.ReadUint16())
|
||||
if !amf.CanReadN(l) {
|
||||
return "", io.ErrUnexpectedEOF
|
||||
}
|
||||
return string(amf.ReadN(l)), nil
|
||||
}
|
||||
|
||||
func (amf *AMF) readProperty(m map[string]any) (obj any, err error) {
|
||||
var k string
|
||||
var v any
|
||||
if k, err = amf.readKey(); err == nil {
|
||||
if v, err = amf.Unmarshal(); k == "" && v == ObjectEnd {
|
||||
obj = m
|
||||
} else if err == nil {
|
||||
m[k] = v
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (amf *AMF) Unmarshal() (obj any, err error) {
|
||||
if !amf.CanRead() {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
defer func(b util.Buffer) {
|
||||
if err != nil {
|
||||
amf.Buffer = b
|
||||
}
|
||||
}(amf.Buffer)
|
||||
switch t := amf.ReadByte(); t {
|
||||
case AMF0_NUMBER:
|
||||
if !amf.CanReadN(8) {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
obj = amf.ReadFloat64()
|
||||
case AMF0_BOOLEAN:
|
||||
if !amf.CanRead() {
|
||||
return false, io.ErrUnexpectedEOF
|
||||
}
|
||||
obj = amf.ReadByte() == 1
|
||||
case AMF0_STRING:
|
||||
obj, err = amf.readKey()
|
||||
case AMF0_OBJECT:
|
||||
m := make(map[string]any)
|
||||
for err == nil && obj == nil {
|
||||
obj, err = amf.readProperty(m)
|
||||
}
|
||||
case AMF0_NULL:
|
||||
return nil, nil
|
||||
case AMF0_UNDEFINED:
|
||||
return Undefined, nil
|
||||
case AMF0_ECMA_ARRAY:
|
||||
size := amf.ReadUint32()
|
||||
m := make(EcmaArray)
|
||||
for i := uint32(0); i < size && err == nil && obj == nil; i++ {
|
||||
obj, err = amf.readProperty(m)
|
||||
}
|
||||
case AMF0_END_OBJECT:
|
||||
return ObjectEnd, nil
|
||||
case AMF0_STRICT_ARRAY:
|
||||
size := amf.ReadUint32()
|
||||
var list []any
|
||||
for i := uint32(0); i < size; i++ {
|
||||
v, err := amf.Unmarshal()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
list = append(list, v)
|
||||
}
|
||||
obj = list
|
||||
case AMF0_DATE:
|
||||
if !amf.CanReadN(10) {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
obj = amf.ReadFloat64()
|
||||
amf.ReadN(2)
|
||||
case AMF0_LONG_STRING,
|
||||
AMF0_XML_DOCUMENT:
|
||||
if !amf.CanReadN(4) {
|
||||
return "", io.ErrUnexpectedEOF
|
||||
}
|
||||
l := int(amf.ReadUint32())
|
||||
if !amf.CanReadN(l) {
|
||||
return "", io.ErrUnexpectedEOF
|
||||
}
|
||||
obj = string(amf.ReadN(l))
|
||||
default:
|
||||
err = fmt.Errorf("unsupported type:%d", t)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (amf *AMF) writeProperty(key string, v any) {
|
||||
amf.WriteUint16(uint16(len(key)))
|
||||
amf.WriteString(key)
|
||||
amf.Marshal(v)
|
||||
}
|
||||
|
||||
func MarshalAMFs(v ...any) []byte {
|
||||
var amf AMF
|
||||
return amf.Marshals(v...)
|
||||
}
|
||||
|
||||
func (amf *AMF) Marshals(v ...any) []byte {
|
||||
for _, vv := range v {
|
||||
amf.Marshal(vv)
|
||||
}
|
||||
return amf.Buffer
|
||||
}
|
||||
|
||||
func (amf *AMF) Marshal(v any) []byte {
|
||||
if v == nil {
|
||||
amf.WriteByte(AMF0_NULL)
|
||||
return amf.Buffer
|
||||
}
|
||||
switch vv := v.(type) {
|
||||
case string:
|
||||
if l := len(vv); l > 0xFFFF {
|
||||
amf.WriteByte(AMF0_LONG_STRING)
|
||||
amf.WriteUint32(uint32(l))
|
||||
} else {
|
||||
amf.WriteByte(AMF0_STRING)
|
||||
amf.WriteUint16(uint16(l))
|
||||
}
|
||||
amf.WriteString(vv)
|
||||
case float64, uint, float32, int, int16, int32, int64, uint16, uint32, uint64, uint8, int8:
|
||||
amf.WriteByte(AMF0_NUMBER)
|
||||
amf.WriteFloat64(util.ToFloat64(vv))
|
||||
case bool:
|
||||
amf.WriteByte(AMF0_BOOLEAN)
|
||||
if vv {
|
||||
amf.WriteByte(1)
|
||||
} else {
|
||||
amf.WriteByte(0)
|
||||
}
|
||||
case EcmaArray:
|
||||
amf.WriteByte(AMF0_ECMA_ARRAY)
|
||||
amf.WriteUint32(uint32(len(vv)))
|
||||
for k, v := range vv {
|
||||
amf.writeProperty(k, v)
|
||||
}
|
||||
amf.Write(END_OBJ)
|
||||
case map[string]any:
|
||||
amf.WriteByte(AMF0_OBJECT)
|
||||
for k, v := range vv {
|
||||
amf.writeProperty(k, v)
|
||||
}
|
||||
amf.Write(END_OBJ)
|
||||
default:
|
||||
v := reflect.ValueOf(vv)
|
||||
if !v.IsValid() {
|
||||
amf.WriteByte(AMF0_NULL)
|
||||
return amf.Buffer
|
||||
}
|
||||
switch v.Kind() {
|
||||
case reflect.Slice, reflect.Array:
|
||||
amf.WriteByte(AMF0_STRICT_ARRAY)
|
||||
size := v.Len()
|
||||
amf.WriteUint32(uint32(size))
|
||||
for i := 0; i < size; i++ {
|
||||
amf.Marshal(v.Index(i).Interface())
|
||||
}
|
||||
amf.Write(END_OBJ)
|
||||
default:
|
||||
panic("amf Marshal faild")
|
||||
}
|
||||
}
|
||||
return amf.Buffer
|
||||
}
|
||||
@@ -217,19 +217,3 @@ func AudioSpecificConfigToADTS(asc *AudioSpecificConfig, rawDataLength int) (adt
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// func ParseRTPAAC(payload []byte) (result [][]byte) {
|
||||
// auHeaderLen := util.ReadBE[int](payload[:2]) >> 3
|
||||
// var auLenArray []int
|
||||
// for iIndex := 2; iIndex <= auHeaderLen; iIndex += 2 {
|
||||
// auLen := util.ReadBE[int](payload[iIndex:iIndex+2]) >> 3
|
||||
// auLenArray = append(auLenArray, auLen)
|
||||
// }
|
||||
// startOffset := 2 + auHeaderLen
|
||||
// for _, auLen := range auLenArray {
|
||||
// endOffset := startOffset + auLen
|
||||
// result = append(result, payload[startOffset:endOffset])
|
||||
// startOffset = startOffset + auLen
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
37
codec/flv.go
37
codec/flv.go
@@ -78,23 +78,11 @@ var ErrInvalidFLV = errors.New("invalid flv")
|
||||
var FLVHeader = []byte{'F', 'L', 'V', 0x01, 0x05, 0, 0, 0, 9, 0, 0, 0, 0}
|
||||
|
||||
func WriteFLVTag(w io.Writer, t byte, timestamp uint32, payload net.Buffers) (err error) {
|
||||
head := make([]byte, 11)
|
||||
tail := make([]byte, 4)
|
||||
head[0] = t
|
||||
dataSize := uint32(util.SizeOfBuffers(payload))
|
||||
util.PutBE(tail, dataSize+11)
|
||||
util.PutBE(head[1:4], dataSize)
|
||||
head[4] = byte(timestamp >> 16)
|
||||
head[5] = byte(timestamp >> 8)
|
||||
head[6] = byte(timestamp)
|
||||
head[7] = byte(timestamp >> 24)
|
||||
var tag = net.Buffers{head}
|
||||
tag = append(tag, payload...)
|
||||
tag = append(tag, tail)
|
||||
// Tag Data
|
||||
_, err = tag.WriteTo(w)
|
||||
payload = AVCC2FLV(t, payload, timestamp)
|
||||
_, err = payload.WriteTo(w)
|
||||
return
|
||||
}
|
||||
|
||||
func ReadFLVTag(r io.Reader) (t byte, timestamp uint32, payload []byte, err error) {
|
||||
head := make([]byte, 11)
|
||||
if _, err = io.ReadFull(r, head); err != nil {
|
||||
@@ -110,20 +98,17 @@ func ReadFLVTag(r io.Reader) (t byte, timestamp uint32, payload []byte, err erro
|
||||
return
|
||||
}
|
||||
|
||||
func AudioAVCC2FLV(avcc net.Buffers, ts uint32) (flv net.Buffers) {
|
||||
b := util.Buffer(make([]byte, 0, 15))
|
||||
b.WriteByte(FLV_TAG_TYPE_AUDIO)
|
||||
dataSize := util.SizeOfBuffers(avcc)
|
||||
b.WriteUint24(uint32(dataSize))
|
||||
b.WriteUint24(ts)
|
||||
b.WriteByte(byte(ts >> 24))
|
||||
b.WriteUint24(0)
|
||||
return append(append(append(flv, b), avcc...), util.PutBE(b.Malloc(4), dataSize+11))
|
||||
func AudioAVCC2FLV(avcc net.Buffers, ts uint32) net.Buffers {
|
||||
return AVCC2FLV(FLV_TAG_TYPE_AUDIO, avcc, ts)
|
||||
}
|
||||
|
||||
func VideoAVCC2FLV(avcc net.Buffers, ts uint32) (flv net.Buffers) {
|
||||
func VideoAVCC2FLV(avcc net.Buffers, ts uint32) net.Buffers {
|
||||
return AVCC2FLV(FLV_TAG_TYPE_VIDEO, avcc, ts)
|
||||
}
|
||||
|
||||
func AVCC2FLV(t byte, avcc net.Buffers, ts uint32) (flv net.Buffers) {
|
||||
b := util.Buffer(make([]byte, 0, 15))
|
||||
b.WriteByte(FLV_TAG_TYPE_VIDEO)
|
||||
b.WriteByte(t)
|
||||
dataSize := util.SizeOfBuffers(avcc)
|
||||
b.WriteUint24(uint32(dataSize))
|
||||
b.WriteUint24(ts)
|
||||
|
||||
@@ -526,15 +526,13 @@ func (s *MpegTsStream) Feed(ts io.Reader) error {
|
||||
s.tsPktBuffer = s.tsPktBuffer[:0]
|
||||
}()
|
||||
for {
|
||||
bufferLen := len(s.tsPktBuffer)
|
||||
if cap(s.tsPktBuffer) > bufferLen {
|
||||
if s.tsPktBuffer = s.tsPktBuffer[:bufferLen+1]; s.tsPktBuffer[bufferLen] == nil {
|
||||
s.tsPktBuffer[bufferLen] = make([]byte, TS_PACKET_SIZE)
|
||||
}
|
||||
var tsData []byte
|
||||
if tsDataP := util.MallocSlice(&s.tsPktBuffer); tsDataP == nil {
|
||||
tsData = make([]byte, TS_PACKET_SIZE)
|
||||
s.tsPktBuffer = append(s.tsPktBuffer, tsData)
|
||||
} else {
|
||||
s.tsPktBuffer = append(s.tsPktBuffer, make([]byte, TS_PACKET_SIZE))
|
||||
tsData = *tsDataP
|
||||
}
|
||||
tsData := s.tsPktBuffer[bufferLen]
|
||||
_, err := io.ReadFull(ts, tsData)
|
||||
reader.Reset(tsData)
|
||||
if err == io.EOF {
|
||||
|
||||
@@ -27,9 +27,6 @@ type RawSlice interface {
|
||||
~[][]byte | ~[]byte
|
||||
}
|
||||
|
||||
// func (nalu *H264NALU) Append(slice ...NALUSlice) {
|
||||
// *nalu = append(*nalu, slice...)
|
||||
// }
|
||||
func (nalu NALUSlice) H264Type() (naluType codec.H264NALUType) {
|
||||
return naluType.Parse(nalu[0][0])
|
||||
}
|
||||
|
||||
@@ -54,6 +54,9 @@ func (p *Publisher) OnEvent(event any) {
|
||||
}
|
||||
|
||||
func (p *Publisher) WriteAVCCVideo(ts uint32, frame common.AVCCFrame) {
|
||||
if len(frame) < 6 {
|
||||
return
|
||||
}
|
||||
if p.VideoTrack == nil {
|
||||
if frame.IsSequence() {
|
||||
ts = 0
|
||||
@@ -77,11 +80,14 @@ func (p *Publisher) WriteAVCCVideo(ts uint32, frame common.AVCCFrame) {
|
||||
}
|
||||
|
||||
func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) {
|
||||
if len(frame) < 4 {
|
||||
return
|
||||
}
|
||||
if p.AudioTrack == nil {
|
||||
codecID := frame.AudioCodecID()
|
||||
switch codecID {
|
||||
case codec.CodecID_AAC:
|
||||
if !frame.IsSequence() || len(frame) < 4 {
|
||||
if !frame.IsSequence() {
|
||||
return
|
||||
}
|
||||
a := track.NewAAC(p.Stream)
|
||||
|
||||
@@ -138,6 +138,8 @@ func (vt *Video) WriteAnnexB(frame AnnexBFrame) (s []NALUSlice) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
vt.Media.WriteAVCC(ts, frame)
|
||||
for nalus := frame[5:]; len(nalus) > vt.nalulenSize; {
|
||||
@@ -147,7 +149,12 @@ func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
return
|
||||
}
|
||||
if end := nalulen + vt.nalulenSize; len(nalus) >= end {
|
||||
vt.AVRing.RingBuffer.Value.AppendRaw(NALUSlice{nalus[vt.nalulenSize:end]})
|
||||
slice := nalus[vt.nalulenSize:end]
|
||||
if _rawSlice := util.MallocSlice(&vt.AVRing.Value.Raw); _rawSlice == nil {
|
||||
vt.Value.AppendRaw(NALUSlice{slice})
|
||||
} else {
|
||||
_rawSlice.Reset().Append(slice)
|
||||
}
|
||||
nalus = nalus[end:]
|
||||
} else {
|
||||
vt.Stream.Error("WriteAVCC", zap.Int("len", len(nalus)), zap.Int("naluLenSize", vt.nalulenSize), zap.Int("end", end))
|
||||
|
||||
@@ -56,8 +56,13 @@ func (b *Buffer) Write(a []byte) (n int, err error) {
|
||||
func (b Buffer) Len() int {
|
||||
return len(b)
|
||||
}
|
||||
|
||||
func (b Buffer) CanRead() bool {
|
||||
return b.Len() > 0
|
||||
return b.CanReadN(1)
|
||||
}
|
||||
|
||||
func (b Buffer) CanReadN(n int) bool {
|
||||
return b.Len() >= n
|
||||
}
|
||||
func (b Buffer) Cap() int {
|
||||
return cap(b)
|
||||
@@ -87,6 +92,16 @@ func (b *Buffer) Glow(n int) {
|
||||
*b = b.SubBuf(0, l)
|
||||
}
|
||||
|
||||
// MallocSlice 用来对容量够的slice进行长度扩展+1,并返回新的位置的指针,用于写入
|
||||
func MallocSlice[T any](slice *[]T) *T {
|
||||
oslice := *slice
|
||||
if rawLen := len(oslice); cap(oslice) > rawLen {
|
||||
*slice = oslice[:rawLen+1]
|
||||
return &(*slice)[rawLen]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConcatBuffers 合并碎片内存为一个完整内存
|
||||
func ConcatBuffers[T ~[]byte](input []T) (out []byte) {
|
||||
for _, v := range input {
|
||||
|
||||
@@ -16,3 +16,15 @@ func TestBuffer(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestMallocSlice(t *testing.T) {
|
||||
t.Run(t.Name(), func(t *testing.T) {
|
||||
var a [][]byte = [][]byte{}
|
||||
b := MallocSlice(&a)
|
||||
if *b != nil {
|
||||
t.Fail()
|
||||
} else if *b = []byte{1}; a[0][0] != 1 {
|
||||
t.Fail()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user