改变avcc类型

This commit is contained in:
dexter
2023-01-19 20:33:20 +08:00
parent 5c5747e669
commit b77e57b1bb
11 changed files with 241 additions and 70 deletions

View File

@@ -10,6 +10,7 @@ import (
)
type NALUSlice net.Buffers
// 裸数据片段
type RawSlice interface {
~[][]byte | ~[]byte
@@ -61,7 +62,7 @@ func (nalu *NALUSlice) Append(b ...[]byte) {
// return false
// }
type AVCCFrame []byte // 一帧AVCC格式的数据
type AVCCFrame net.Buffers // 一帧AVCC格式的数据
type AnnexBFrame []byte // 一帧AnnexB格式数据
type RTPFrame struct {
rtp.Packet
@@ -110,16 +111,16 @@ type AVFrame[T RawSlice] struct {
canRead bool
}
func (av *AVFrame[T]) AppendRaw(raw ...T) {
av.Raw = append(av.Raw, raw...)
func (av *AVFrame[T]) AppendRaw(raw T) {
av.Raw = append(av.Raw, raw)
}
func (av *AVFrame[T]) AppendAVCC(avcc ...[]byte) {
func (av *AVFrame[T]) AppendAVCC(avcc AVCCFrame) {
av.AVCC = append(av.AVCC, avcc...)
}
func (av *AVFrame[T]) AppendRTP(rtp ...*RTPFrame) {
av.RTP = append(av.RTP, rtp...)
func (av *AVFrame[T]) AppendRTP(rtp *RTPFrame) {
av.RTP = append(av.RTP, rtp)
}
// Clear 清空数据 gc
@@ -147,20 +148,50 @@ func (av *AVFrame[T]) Reset() {
}
func (avcc AVCCFrame) IsIDR() bool {
v := avcc[0] >> 4
v := avcc[0][0] >> 4
return v == 1 || v == 4 //generated keyframe
}
func (avcc AVCCFrame) IsSequence() bool {
return avcc[1] == 0
return avcc[0][1] == 0
}
func (avcc AVCCFrame) CTS() uint32 {
return uint32(avcc[2])<<24 | uint32(avcc[3])<<8 | uint32(avcc[4])
return uint32(avcc[0][2])<<24 | uint32(avcc[0][3])<<8 | uint32(avcc[0][4])
}
func (avcc AVCCFrame) VideoCodecID() codec.VideoCodecID {
return codec.VideoCodecID(avcc[0] & 0x0F)
return codec.VideoCodecID(avcc[0][0] & 0x0F)
}
func (avcc AVCCFrame) AudioCodecID() codec.AudioCodecID {
return codec.AudioCodecID(avcc[0] >> 4)
return codec.AudioCodecID(avcc[0][0] >> 4)
}
func (avcc *AVCCFrame) ReadByte() (b byte) {
cur := *avcc
b = cur[0][0]
if len(cur[0]) == 1 {
*avcc = cur[1:]
} else {
cur[0] = cur[0][1:]
}
return
}
func (avcc *AVCCFrame) ReadN(n int) (result net.Buffers) {
require := n
cur := *avcc
for require > 0 && len(cur) > 0 {
firstLen := len(cur[0])
if firstLen > require {
result = append(result, cur[0][:require])
cur[0] = cur[0][require:]
return
} else {
result = append(result, cur[0])
require -= firstLen
cur = cur[1:]
*avcc = cur
}
}
return
}
// func (annexb AnnexBFrame) ToSlices() (ret []NALUSlice) {

View File

@@ -93,7 +93,7 @@ func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) {
a := track.NewAAC(p.Stream)
p.AudioTrack = a
a.Audio.SampleSize = 16
a.AVCCHead = []byte{frame[0], 1}
a.AVCCHead = []byte{frame[0][0], 1}
a.WriteAVCC(0, frame)
case codec.CodecID_PCMA,
codec.CodecID_PCMU:
@@ -103,13 +103,13 @@ func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) {
}
a := track.NewG711(p.Stream, alaw)
p.AudioTrack = a
a.Audio.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2])
a.Audio.SampleRate = uint32(codec.SoundRate[(frame[0][0]&0x0c)>>2])
a.Audio.SampleSize = 16
if frame[0]&0x02 == 0 {
if frame[0][0]&0x02 == 0 {
a.Audio.SampleSize = 8
}
a.Channels = frame[0]&0x01 + 1
a.AVCCHead = frame[:1]
a.Channels = frame[0][0]&0x01 + 1
a.AVCCHead = frame[0][:1]
p.AudioTrack.WriteAVCC(ts, frame)
default:
p.Stream.Error("audio codec not support yet", zap.Uint8("codecId", uint8(codecID)))

View File

@@ -74,20 +74,23 @@ func (aac *AAC) WriteRTPFrame(frame *RTPFrame) {
}
func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
if len(frame) < 4 {
aac.Audio.Stream.Error("AVCC data too short", zap.ByteString("data", frame))
if l := util.SizeOfBuffers(frame); l < 4 {
aac.Stream.Error("AVCC data too short", zap.Int("len", l))
return
}
if frame.IsSequence() {
aac.Audio.DecoderConfiguration.AVCC = net.Buffers{frame}
config1, config2 := frame[2], frame[3]
aac.Audio.DecoderConfiguration.AVCC = net.Buffers(frame)
config1, config2 := frame[0][2], frame[0][3]
aac.Profile = (config1 & 0xF8) >> 3
aac.Channels = ((config2 >> 3) & 0x0F) //声道
aac.Audio.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)])
aac.Audio.DecoderConfiguration.Raw = frame[2:]
aac.Audio.DecoderConfiguration.Raw = frame[0][2:]
aac.Attach()
} else {
aac.Value.AppendRaw(frame[2:])
aac.Value.AppendRaw(frame[0][2:])
for _, data := range frame[1:] {
aac.Value.AppendRaw(data)
}
aac.Audio.WriteAVCC(ts, frame)
}
}

View File

@@ -99,10 +99,7 @@ func (av *Audio) WriteAVCC(ts uint32, frame AVCCFrame) {
}
func (a *Audio) CompleteAVCC(value *AVFrame[[]byte]) {
value.AppendAVCC(a.AVCCHead)
for _, raw := range value.Raw {
value.AppendAVCC(raw)
}
value.AVCC = append(append(value.AVCC, a.AVCCHead), value.Raw...)
}
func (a *Audio) CompleteRTP(value *AVFrame[[]byte]) {

View File

@@ -6,6 +6,7 @@ import (
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/util"
)
func NewG711(stream IStream, alaw bool) (g711 *G711) {
@@ -33,11 +34,14 @@ type G711 struct {
}
func (g711 *G711) WriteAVCC(ts uint32, frame AVCCFrame) {
if len(frame) < 2 {
g711.Stream.Error("AVCC data too short", zap.ByteString("data", frame))
if l := util.SizeOfBuffers(frame); l < 2 {
g711.Stream.Error("AVCC data too short", zap.Int("len", l))
return
}
g711.Value.AppendRaw(frame[1:])
g711.Value.AppendRaw(frame[0][1:])
for _, data := range frame[1:] {
g711.Value.AppendRaw(data)
}
g711.Audio.WriteAVCC(ts, frame)
}

View File

@@ -65,16 +65,16 @@ func (vt *H264) WriteSliceBytes(slice []byte) {
}
func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) {
if len(frame) < 6 {
vt.Stream.Error("AVCC data too short", zap.ByteString("data", frame))
if l:=util.SizeOfBuffers(frame);l < 6 {
vt.Stream.Error("AVCC data too short", zap.Int("len", l))
return
}
if frame.IsSequence() {
vt.dcChanged = true
vt.Video.DecoderConfiguration.Seq++
vt.Video.DecoderConfiguration.AVCC = net.Buffers{frame}
vt.Video.DecoderConfiguration.AVCC = net.Buffers(frame)
var info codec.AVCDecoderConfigurationRecord
if _, err := info.Unmarshal(frame[5:]); err == nil {
if _, err := info.Unmarshal(frame[0][5:]); err == nil {
vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit)
vt.nalulenSize = int(info.LengthSizeMinusOne&3 + 1)
vt.Video.DecoderConfiguration.Raw[0] = info.SequenceParameterSetNALUnit

View File

@@ -60,17 +60,17 @@ func (vt *H265) WriteSliceBytes(slice []byte) {
}
func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) {
if len(frame) < 6 {
vt.Stream.Error("AVCC data too short", zap.ByteString("data", frame))
if l := util.SizeOfBuffers(frame); l < 6 {
vt.Stream.Error("AVCC data too short", zap.Int("len", l))
return
}
if frame.IsSequence() {
vt.Video.dcChanged = true
vt.Video.DecoderConfiguration.Seq++
vt.Video.DecoderConfiguration.AVCC = net.Buffers{frame}
if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(frame); err == nil {
vt.Video.SPSInfo, _ = codec.ParseHevcSPS(frame)
vt.Video.nalulenSize = (int(frame[26]) & 0x03) + 1
vt.Video.DecoderConfiguration.AVCC = net.Buffers(frame)
if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(frame[0]); err == nil {
vt.Video.SPSInfo, _ = codec.ParseHevcSPS(frame[0])
vt.Video.nalulenSize = (int(frame[0][26]) & 0x03) + 1
vt.Video.DecoderConfiguration.Raw[0] = vps
vt.Video.DecoderConfiguration.Raw[1] = sps
vt.Video.DecoderConfiguration.Raw[2] = pps

View File

@@ -89,10 +89,10 @@ func (vt *Video) PlayFullAnnexB(ctx context.Context, onMedia func(net.Buffers) e
func (vt *Video) computeGOP() {
vt.idrCount++
if vt.IDRing != nil {
vt.GOP = int(vt.AVRing.RingBuffer.Value.Sequence - vt.IDRing.Value.Sequence)
if l := vt.AVRing.RingBuffer.Size - vt.GOP - 5; l > 5 {
vt.AVRing.RingBuffer.Size -= l
vt.Stream.Debug("resize", zap.Int("before", vt.AVRing.RingBuffer.Size+l), zap.Int("after", vt.AVRing.RingBuffer.Size), zap.String("name", vt.Name))
vt.GOP = int(vt.Value.Sequence - vt.IDRing.Value.Sequence)
if l := vt.AVRing.Size - vt.GOP - 5; l > 5 {
vt.AVRing.Size -= l
vt.Stream.Debug("resize", zap.Int("before", vt.AVRing.Size+l), zap.Int("after", vt.AVRing.Size), zap.String("name", vt.Name))
//缩小缓冲环节省内存
vt.Unlink(l).Do(func(v AVFrame[NALUSlice]) {
if v.IFrame {
@@ -102,7 +102,7 @@ func (vt *Video) computeGOP() {
})
}
}
vt.IDRing = vt.AVRing.RingBuffer.Ring
vt.IDRing = vt.AVRing.Ring
}
func (vt *Video) writeAnnexBSlice(annexb AnnexBFrame) {
@@ -134,19 +134,17 @@ func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) {
vt.Media.WriteAVCC(ts, frame)
vt.Value.DTS = ts * 90
vt.Value.PTS = (ts + frame.CTS()) * 90
for nalus := frame[5:]; len(nalus) > vt.nalulenSize; {
nalulen := util.ReadBE[int](nalus[:vt.nalulenSize])
frame.ReadN(5)
for len(frame) > 0 {
nalulen := 0
for i, n := 0, vt.nalulenSize; i < n; i++ {
nalulen += int(frame.ReadByte()) << ((n - i - 1) << 3)
}
if nalulen == 0 {
vt.Stream.Warn("WriteAVCC with nalulen=0", zap.Int("len", len(nalus)))
vt.Stream.Warn("WriteAVCC with nalulen=0")
return
}
if end := nalulen + vt.nalulenSize; len(nalus) >= end {
vt.WriteRawBytes(nalus[vt.nalulenSize:end])
nalus = nalus[end:]
} else {
vt.Stream.Error("WriteAVCC", zap.Int("len", len(nalus)), zap.Int("naluLenSize", vt.nalulenSize), zap.Int("end", end))
break
}
vt.WriteSlice(NALUSlice(frame.ReadN(nalulen)))
}
vt.Flush()
}
@@ -155,8 +153,12 @@ func (vt *Video) WriteSliceByte(b ...byte) {
vt.WriteSliceBytes(b)
}
func (vt *Video) WriteSlice(slice NALUSlice) {
vt.Value.AppendRaw(slice)
}
func (vt *Video) WriteRawBytes(slice []byte) {
if naluSlice := util.MallocSlice(&vt.AVRing.Value.Raw); naluSlice == nil {
if naluSlice := util.MallocSlice(&vt.Value.Raw); naluSlice == nil {
vt.Value.AppendRaw(NALUSlice{slice})
} else {
naluSlice.Reset(slice)
@@ -221,10 +223,10 @@ func (vt *Video) CompleteAVCC(rv *AVFrame[NALUSlice]) {
// 写入CTS
util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90)
lengths := b.Malloc(len(rv.Raw) * 4) //每个slice的长度内存复用
rv.AppendAVCC(b.SubBuf(0, 5))
rv.AVCC = append(rv.AVCC, b.SubBuf(0, 5))
for i, nalu := range rv.Raw {
rv.AppendAVCC(util.PutBE(lengths.SubBuf(i*4, 4), util.SizeOfBuffers(nalu)))
rv.AppendAVCC(nalu...)
rv.AVCC = append(rv.AVCC, util.PutBE(lengths.SubBuf(i*4, 4), util.SizeOfBuffers(nalu)))
rv.AVCC = append(rv.AVCC, nalu...)
}
}
@@ -250,17 +252,19 @@ func (vt *Video) Flush() {
return
}
}
// 仅存一枚I帧
if vt.idrCount == 1 {
// 下一帧为I帧即将覆盖需要扩环
if vt.Next().Value.IFrame {
if vt.AVRing.RingBuffer.Size < 256 {
// 仅存一枚I帧
if vt.idrCount == 1 {
if vt.AVRing.Size < 256 {
vt.Stream.Debug("resize", zap.Int("before", vt.AVRing.Size), zap.Int("after", vt.AVRing.Size+5), zap.String("name", vt.Name))
vt.Link(util.NewRing[AVFrame[NALUSlice]](5)) // 扩大缓冲环
}
} else {
vt.idrCount--
}
}
vt.Media.Flush()
vt.dcChanged = false
}

View File

@@ -1,11 +1,9 @@
package codec
package util
import (
"fmt"
"io"
"reflect"
"m7s.live/engine/v4/util"
)
// Action Message Format -- AMF 0
@@ -82,7 +80,7 @@ var (
type EcmaArray map[string]any
type AMF struct {
util.Buffer
Buffer
}
func (amf *AMF) ReadShortString() string {
@@ -136,7 +134,7 @@ func (amf *AMF) Unmarshal() (obj any, err error) {
if !amf.CanRead() {
return nil, io.ErrUnexpectedEOF
}
defer func(b util.Buffer) {
defer func(b Buffer) {
if err != nil {
amf.Buffer = b
}
@@ -239,7 +237,7 @@ func (amf *AMF) Marshal(v any) []byte {
amf.WriteString(vv)
case float64, uint, float32, int, int16, int32, int64, uint16, uint32, uint64, uint8, int8:
amf.WriteByte(AMF0_NUMBER)
amf.WriteFloat64(util.ToFloat64(vv))
amf.WriteFloat64(ToFloat64(vv))
case bool:
amf.WriteByte(AMF0_BOOLEAN)
if vv {

View File

@@ -3,6 +3,7 @@ package util
import (
"encoding/binary"
"math"
"net"
)
type Buffer []byte
@@ -83,15 +84,35 @@ func (b *Buffer) Malloc(count int) Buffer {
}
return b.SubBuf(l, count)
}
func (b *Buffer) Reset() {
*b = b.SubBuf(0, 0)
}
func (b *Buffer) Glow(n int) {
l := b.Len()
b.Malloc(n)
*b = b.SubBuf(0, l)
}
func (b *Buffer) Split(n int) (result net.Buffers) {
origin := *b
for {
if b.CanReadN(n) {
result = append(result, b.ReadN(n))
} else {
result = append(result, *b)
*b = origin
return
}
}
}
func (b *Buffer) MarshalAMFs(v ...any) {
amf := AMF{*b}
*b = amf.Marshals(v...)
}
// MallocSlice 用来对容量够的slice进行长度扩展+1并返回新的位置的指针用于写入
func MallocSlice[T any](slice *[]T) *T {
oslice := *slice

113
util/pool.go Normal file
View File

@@ -0,0 +1,113 @@
package util
import "net"
type BytesLinkList struct {
Head *BytesLinkItem
Tail *BytesLinkItem
Length int
ByteLength int
}
func (list *BytesLinkList) Push(item *BytesLinkItem) {
if list == nil {
return
}
if list.Head == nil {
list.Head = item
list.Tail = item
list.Length = 1
list.ByteLength = item.Len()
return
}
list.Tail.Next = item
list.Tail = item
list.Length++
list.ByteLength += item.Len()
}
func (list *BytesLinkList) Shift() (item *BytesLinkItem) {
if list.Head == nil {
return nil
}
item = list.Head
list.Head = list.Head.Next
list.Length--
list.ByteLength -= item.Len()
return
}
func (list *BytesLinkList) ToBuffers() (result net.Buffers) {
for p := list.Head; p != nil; p = p.Next {
result = append(result, p.Bytes)
}
return
}
// 全部回收掉
func (list *BytesLinkList) Recycle() {
for p := list.Head; p != nil; p = p.Next {
p.Pool.Push(p)
}
list.Head = nil
list.Tail = nil
list.Length = 0
list.ByteLength = 0
}
type BytesLinkItem struct {
Next *BytesLinkItem
Bytes []byte
Pool *BytesLinkList
}
func (b *BytesLinkItem) Len() int {
return len(b.Bytes)
}
func (b *BytesLinkItem) Recycle() {
b.Pool.Push(b)
}
func (b *BytesLinkItem) ToBuffers() (result net.Buffers) {
for p := b; p != nil; p = p.Next {
result = append(result, p.Bytes)
}
return
}
type BytesPool []BytesLinkList
// 获取来自真实内存的切片的——假内存块,即只回收外壳
func (p BytesPool) GetFake() (item *BytesLinkItem) {
if p[0].Length > 0 {
return p[0].Shift()
} else {
return &BytesLinkItem{
Pool: &p[0],
}
}
}
func (p BytesPool) Get(size int) (item *BytesLinkItem) {
for i := 1; i < len(p); i++ {
level := 1 << i
if level >= size {
if p[i].Length > 0 {
item = p[i].Shift()
item.Bytes = item.Bytes[:size]
} else {
item = &BytesLinkItem{
Bytes: make([]byte, size, level),
Pool: &p[i],
}
}
}
}
if item == nil {
item = &BytesLinkItem{
Bytes: make([]byte, size),
}
}
return
}