优化amf读取

This commit is contained in:
dexter
2023-01-30 10:35:43 +08:00
parent 092719a61d
commit db6f8de3a7
10 changed files with 118 additions and 52 deletions

View File

@@ -76,7 +76,7 @@ type AVTrack interface {
CurrentFrame() *AVFrame
Attach()
Detach()
WriteAVCC(ts uint32, frame util.BLL) //写入AVCC格式的数据
WriteAVCC(ts uint32, frame util.BLL) error //写入AVCC格式的数据
WriteRTP([]byte)
WriteRTPPack(*rtp.Packet)
Flush()

View File

@@ -27,12 +27,16 @@ const (
SUBSTATE_NORMAL
)
// AVCC 格式的序列帧
type VideoDeConf []byte
// AVCC 格式的序列帧
type AudioDeConf []byte
type AudioFrame struct {
*AVFrame
AbsTime uint32
PTS uint32
DTS uint32
}
type VideoFrame struct {
*AVFrame
@@ -168,13 +172,13 @@ func (s *Subscriber) PlayBlock(subType byte) {
s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
ctx := s.TrackPlayer.Context
sendVideoDecConf := func() {
spesic.OnEvent(s.Video.ParamaterSets)
spesic.OnEvent(VideoDeConf(s.VideoReader.Track.SequenceHead))
}
sendAudioDecConf := func() {
spesic.OnEvent(AudioDeConf(s.AudioReader.Track.SequenceHead))
}
var sendVideoFrame func(*AVFrame)
var sendAudioFrame func(*AVFrame)
var sendAudioFrame, sendVideoFrame func(*AVFrame)
switch subType {
case SUBTYPE_RAW:
sendVideoFrame = func(frame *AVFrame) {
@@ -183,7 +187,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
}
sendAudioFrame = func(frame *AVFrame) {
// println("a", frame.Sequence, s.AudioReader.AbsTime)
spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, frame.DTS - s.VideoReader.SkipTs*90})
spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, s.AudioReader.AbsTime * 90, s.AudioReader.AbsTime * 90})
}
case SUBTYPE_RTP:
var videoSeq, audioSeq uint16
@@ -201,7 +205,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
audioSeq++
vp := *p
vp.Header.SequenceNumber = audioSeq
vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipTs*90
vp.Header.Timestamp = vp.Header.Timestamp - s.AudioReader.SkipTs*90
spesic.OnEvent((AudioRTP)(vp))
}
}
@@ -225,10 +229,11 @@ func (s *Subscriber) PlayBlock(subType byte) {
// spesic.OnEvent(FLVFrame(copyBuffers(s.Audio.Track.DecoderConfiguration.FLV)))
}
sendVideoFrame = func(frame *AVFrame) {
// println(frame.Sequence, frame.AbsTime, frame.DeltaTime, frame.IFrame)
// println(frame.Sequence, s.VideoReader.AbsTime, frame.DeltaTime, frame.IFrame)
sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, frame.AVCC.ToBuffers()...)
}
sendAudioFrame = func(frame *AVFrame) {
// println(frame.Sequence, s.AudioReader.AbsTime, frame.DeltaTime)
sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, frame.AVCC.ToBuffers()...)
}
}
@@ -237,32 +242,49 @@ func (s *Subscriber) PlayBlock(subType byte) {
if s.Args.Has(s.Config.SubModeArgName) {
subMode, _ = strconv.Atoi(s.Args.Get(s.Config.SubModeArgName))
}
var videoFrame, audioFrame *AVFrame
for ctx.Err() == nil {
hasVideo, hasAudio := s.VideoReader.Track != nil && s.Config.SubVideo, s.AudioReader.Track != nil && s.Config.SubAudio
if hasVideo {
if videoFrame != nil {
sendVideoFrame(videoFrame)
videoFrame = nil
}
for ctx.Err() == nil {
s.VideoReader.Read(ctx, subMode)
frame := s.VideoReader.Frame
// println("video", frame.Sequence, frame.AbsTime)
if frame == nil {
if frame == nil || ctx.Err() != nil {
return
}
if audioFrame != nil {
if frame.AbsTime > audioFrame.AbsTime {
sendAudioFrame(audioFrame)
audioFrame = nil
}
}
if frame.IFrame && s.VideoReader.DecConfChanged() {
s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq
// println(s.Video.confSeq, s.Video.Track.SPSInfo.Width, s.Video.Track.SPSInfo.Height)
sendVideoDecConf()
}
if !s.Config.IFrameOnly || frame.IFrame {
sendVideoFrame(frame)
}
if frame.AbsTime > lastPlayAbsTime {
lastPlayAbsTime = frame.AbsTime
videoFrame = frame
break
} else {
sendVideoFrame(frame)
}
}
}
}
// 正常模式下或者纯音频模式下,音频开始播放
if hasAudio {
if audioFrame != nil {
sendAudioFrame(audioFrame)
audioFrame = nil
}
for ctx.Err() == nil {
switch s.AudioReader.State {
case track.READSTATE_INIT:
@@ -277,17 +299,25 @@ func (s *Subscriber) PlayBlock(subType byte) {
s.AudioReader.Read(ctx, subMode)
frame := s.AudioReader.Frame
// println("audio", frame.Sequence, frame.AbsTime)
if frame == nil {
if frame == nil || ctx.Err() != nil {
return
}
if videoFrame != nil {
if frame.AbsTime > videoFrame.AbsTime {
sendVideoFrame(videoFrame)
videoFrame = nil
}
}
if s.AudioReader.DecConfChanged() {
s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq
sendAudioDecConf()
}
sendAudioFrame(frame)
if frame.AbsTime > lastPlayAbsTime {
lastPlayAbsTime = frame.AbsTime
audioFrame = frame
break
} else {
sendAudioFrame(frame)
}
}
}

View File

@@ -1,6 +1,7 @@
package track
import (
"io"
"net"
"time"
@@ -100,10 +101,10 @@ func (aac *AAC) WriteAVCCSequenceHead(sh []byte) {
aac.WriteSequenceHead(append([]byte{0xAF, 0x00}, sh...))
}
func (aac *AAC) WriteAVCC(ts uint32, frame util.BLL) {
func (aac *AAC) WriteAVCC(ts uint32, frame util.BLL) error {
if l := frame.ByteLength; l < 4 {
aac.Stream.Error("AVCC data too short", zap.Int("len", l))
return
return io.ErrShortWrite
}
if frame.GetByte(1) == 0 {
aac.WriteSequenceHead(frame.ToBytes())
@@ -114,6 +115,7 @@ func (aac *AAC) WriteAVCC(ts uint32, frame util.BLL) {
aac.AppendAuBytes(au...)
aac.Audio.WriteAVCC(ts, frame)
}
return nil
}
func (aac *AAC) CompleteRTP(value *AVFrame) {

View File

@@ -93,7 +93,7 @@ func (av *Media) SnapForJson() {
av.RawPart = av.RawPart[:0]
}
av.RawSize = v.AUList.ByteLength
r := v.AUList.Next.Value.NewReader()
r := v.AUList.NewReader()
for b, err := r.ReadByte(); err == nil && len(av.RawPart) < 10; b, err = r.ReadByte() {
av.RawPart = append(av.RawPart, int(b))
}

View File

@@ -1,6 +1,7 @@
package track
import (
"io"
"time"
"go.uber.org/zap"
@@ -36,17 +37,18 @@ type G711 struct {
Audio
}
func (g711 *G711) WriteAVCC(ts uint32, frame util.BLL) {
func (g711 *G711) WriteAVCC(ts uint32, frame util.BLL) error {
if l := frame.ByteLength; l < 2 {
g711.Stream.Error("AVCC data too short", zap.Int("len", l))
return
return io.ErrShortWrite
}
g711.Value.AUList.Push(g711.BytesPool.GetShell(frame.Next.Value[1:]))
frame.Next.Range(func(v util.BLI) bool {
frame.Range(func(v util.BLI) bool {
g711.Value.AUList.Push(g711.BytesPool.GetShell(v))
return true
})
g711.Audio.WriteAVCC(ts, frame)
return nil
}
func (g711 *G711) WriteRTPFrame(frame *RTPFrame) {

View File

@@ -1,6 +1,7 @@
package track
import (
"io"
"time"
"go.uber.org/zap"
@@ -65,22 +66,23 @@ func (vt *H264) WriteSliceBytes(slice []byte) {
case codec.NALU_SEI:
vt.AppendAuBytes(slice)
case codec.NALU_Access_Unit_Delimiter:
case codec.NALU_Filler_Data:
default:
vt.Stream.Error("H264 WriteSliceBytes naluType not support", zap.Int("naluType", int(naluType)))
}
}
func (vt *H264) WriteAVCC(ts uint32, frame util.BLL) {
func (vt *H264) WriteAVCC(ts uint32, frame util.BLL) (err error) {
if l := frame.ByteLength; l < 6 {
vt.Stream.Error("AVCC data too short", zap.Int("len", l))
return
return io.ErrShortWrite
}
if frame.GetByte(1) == 0 {
vt.SequenceHead = frame.ToBytes()
frame.Recycle()
vt.updateSequeceHead()
var info codec.AVCDecoderConfigurationRecord
if _, err := info.Unmarshal(vt.SequenceHead[5:]); err == nil {
if _, err = info.Unmarshal(vt.SequenceHead[5:]); err == nil {
vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit)
vt.nalulenSize = int(info.LengthSizeMinusOne&3 + 1)
vt.SPS = info.SequenceParameterSetNALUnit
@@ -91,8 +93,9 @@ func (vt *H264) WriteAVCC(ts uint32, frame util.BLL) {
vt.Stream.Error("H264 ParseSpsPps Error")
vt.Stream.Close()
}
return
} else {
vt.Video.WriteAVCC(ts, frame)
return vt.Video.WriteAVCC(ts, frame)
}
}

View File

@@ -1,6 +1,7 @@
package track
import (
"io"
"time"
"go.uber.org/zap"
@@ -62,27 +63,25 @@ func (vt *H265) WriteSliceBytes(slice []byte) {
}
}
func (vt *H265) WriteAVCC(ts uint32, frame util.BLL) {
func (vt *H265) WriteAVCC(ts uint32, frame util.BLL) (err error) {
if l := frame.ByteLength; l < 6 {
vt.Stream.Error("AVCC data too short", zap.Int("len", l))
return
return io.ErrShortWrite
}
if frame.GetByte(1) == 0 {
vt.SequenceHead = frame.ToBytes()
frame.Recycle()
vt.updateSequeceHead()
if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(vt.SequenceHead); err == nil {
if vt.VPS, vt.SPS, vt.PPS, err = codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(vt.SequenceHead); err == nil {
vt.SPSInfo, _ = codec.ParseHevcSPS(vt.SequenceHead)
vt.nalulenSize = (int(vt.SequenceHead[26]) & 0x03) + 1
vt.VPS = vps
vt.SPS = sps
vt.PPS = pps
} else {
vt.Stream.Error("H265 ParseVpsSpsPps Error")
vt.Stream.Close()
}
return
} else {
vt.Video.WriteAVCC(ts, frame)
return vt.Video.WriteAVCC(ts, frame)
}
}

View File

@@ -101,11 +101,11 @@ func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
vt.Flush()
}
func (vt *Video) WriteAVCC(ts uint32, frame util.BLL) {
func (vt *Video) WriteAVCC(ts uint32, frame util.BLL) error {
r := frame.NewReader()
b, err := r.ReadByte()
if err != nil {
return
return err
}
b = b >> 4
vt.Value.IFrame = b == 1 || b == 4
@@ -113,13 +113,14 @@ func (vt *Video) WriteAVCC(ts uint32, frame util.BLL) {
vt.Value.WriteAVCC(ts, frame)
cts, err := r.ReadBE(3)
if err != nil {
return
return err
}
vt.Value.PTS = (ts + cts) * 90
for nalulen, err := r.ReadBE(vt.nalulenSize); err == nil; nalulen, err = r.ReadBE(vt.nalulenSize) {
vt.AppendAuBytes(r.ReadN(int(nalulen))...)
}
vt.Flush()
return nil
}
func (vt *Video) WriteSliceByte(b ...byte) {

View File

@@ -83,27 +83,29 @@ type AMF struct {
Buffer
}
func (amf *AMF) ReadShortString() string {
value, _ := amf.Unmarshal()
return value.(string)
}
func (amf *AMF) ReadNumber() float64 {
value, _ := amf.Unmarshal()
return value.(float64)
}
func (amf *AMF) ReadObject() map[string]any {
value, _ := amf.Unmarshal()
if value == nil {
return nil
func ReadAMF[T any](amf *AMF) (result T) {
value, err := amf.Unmarshal()
if err != nil {
return
}
return value.(map[string]any)
result, _ = value.(T)
return
}
func (amf *AMF) ReadBool() bool {
value, _ := amf.Unmarshal()
return value.(bool)
func (amf *AMF) ReadShortString() (result string) {
return ReadAMF[string](amf)
}
func (amf *AMF) ReadNumber() (result float64) {
return ReadAMF[float64](amf)
}
func (amf *AMF) ReadObject() (result map[string]any) {
return ReadAMF[map[string]any](amf)
}
func (amf *AMF) ReadBool() (result bool) {
return ReadAMF[bool](amf)
}
func (amf *AMF) readKey() (string, error) {

View File

@@ -69,6 +69,29 @@ func (r *BLLReader) ReadN(n int) (result net.Buffers) {
return
}
type BLLsReader struct {
*ListItem[*BLL]
BLLReader
}
func (r *BLLsReader) CanRead() bool {
return r.ListItem != nil && !r.IsRoot()
}
func (r *BLLsReader) ReadByte() (b byte, err error) {
if r.BLLReader.CanRead() {
b, err = r.BLLReader.ReadByte()
if err == nil {
return
}
}
r.ListItem = r.Next
if !r.CanRead() {
return 0, io.EOF
}
r.BLLReader = *r.Value.NewReader()
return r.BLLReader.ReadByte()
}
type BLLs struct {
List[*BLL]
ByteLength int
@@ -121,6 +144,10 @@ func (list *BLLs) Recycle() {
list.ByteLength = 0
}
func (list *BLLs) NewReader() *BLLsReader {
return &BLLsReader{list.Next, *list.Next.Value.NewReader()}
}
// ByteLinkList
type BLL struct {
List[BLI]