From ac39deea337a06e13fe1c177f35a3e634c31984f Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Sun, 14 Jul 2024 17:22:03 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E4=B8=80=E8=A7=A3=E6=9E=90=E5=A4=84?= =?UTF-8?q?=E7=90=861078=E5=92=8C28181=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- collections/memory_pool.go | 17 +- gb28181/source.go | 120 ++++-------- jt1078/jt_session.go | 371 ++++++++++++++++++------------------- stream/hook_source.go | 5 + stream/source.go | 43 +++-- stream/source_utils.go | 94 ++++++++++ 6 files changed, 359 insertions(+), 291 deletions(-) diff --git a/collections/memory_pool.go b/collections/memory_pool.go index 4009a14..d65e20f 100644 --- a/collections/memory_pool.go +++ b/collections/memory_pool.go @@ -19,6 +19,8 @@ type MemoryPool interface { // Mark 标记内存块起始位置 Mark() + TryMark() + // Write 向内存块中写入数据, 必须先调用Mark函数 Write(data []byte) @@ -113,6 +115,13 @@ func (m *memoryPool) Mark() { m.marked = true } +func (m *memoryPool) TryMark() { + if !m.marked { + m.markIndex = m.tail + m.marked = true + } +} + func (m *memoryPool) Write(data []byte) { utils.Assert(m.marked) @@ -157,11 +166,11 @@ func (m *memoryPool) freeOldBlocks() bool { } func (m *memoryPool) FreeHead() { - if m.freeOldBlocks() { + if m.freeOldBlocks() || m.blockQueue.IsEmpty() { return } - utils.Assert(!m.blockQueue.IsEmpty()) + //utils.Assert(!m.blockQueue.IsEmpty()) size := m.blockQueue.Pop().(int) m.head += size @@ -174,11 +183,11 @@ func (m *memoryPool) FreeHead() { } func (m *memoryPool) FreeTail() { - if m.freeOldBlocks() { + if m.freeOldBlocks() || m.blockQueue.IsEmpty() { return } - utils.Assert(!m.blockQueue.IsEmpty()) + //utils.Assert(!m.blockQueue.IsEmpty()) size := m.blockQueue.PopBack().(int) m.tail -= size diff --git a/gb28181/source.go b/gb28181/source.go index be4504a..12dba52 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -1,10 +1,7 @@ package gb28181 import ( - "encoding/hex" "fmt" - "github.com/lkmio/avformat/libavc" - "github.com/lkmio/avformat/libhevc" "github.com/lkmio/avformat/libmpeg" "github.com/lkmio/avformat/transport" "github.com/lkmio/avformat/utils" @@ -103,105 +100,51 @@ func (source *BaseGBSource) OnLossPacket(index int, mediaType utils.AVMediaType, // OnCompletePacket 完整帧回调 func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID, dts int64, pts int64, key bool) error { buffer := source.FindOrCreatePacketBuffer(index, mediaType) - data := buffer.Fetch() + var packet utils.AVPacket var stream_ utils.AVStream + var err error + defer func() { if packet == nil { buffer.FreeTail() } }() - if utils.AVCodecIdH264 == codec { - //从关键帧中解析出sps和pps - if source.videoStream == nil { - sps, pps, err := libavc.ParseExtraDataFromKeyNALU(data) - if err != nil { - log.Sugar.Errorf("从关键帧中解析sps pps失败 source:%s data:%s", source.Id_, hex.EncodeToString(data)) - return err - } - - codecData, err := utils.NewAVCCodecData(sps, pps) - if err != nil { - log.Sugar.Errorf("解析sps pps失败 source:%s data:%s sps:%s, pps:%s", source.Id_, hex.EncodeToString(data), hex.EncodeToString(sps), hex.EncodeToString(pps)) - return err - } - - source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData) - stream_ = source.videoStream + if source.IsCompleted() && source.NotTrackAdded(index) { + if !source.IsTimeoutTrack(index) { + source.SetTimeoutTrack(index) + log.Sugar.Errorf("添加track超时 source:%s", source.Id()) } - packet = utils.NewVideoPacket(data, dts, pts, key, utils.PacketTypeAnnexB, codec, index, 90000) - } else if utils.AVCodecIdH265 == codec { - if source.videoStream == nil { - vps, sps, pps, err := libhevc.ParseExtraDataFromKeyNALU(data) - if err != nil { - log.Sugar.Errorf("从关键帧中解析vps sps pps失败 source:%s data:%s", source.Id_, hex.EncodeToString(data)) - return err - } - - codecData, err := utils.NewHEVCCodecData(vps, sps, pps) - if err != nil { - log.Sugar.Errorf("解析sps pps失败 source:%s data:%s vps:%s sps:%s, pps:%s", source.Id_, hex.EncodeToString(data), hex.EncodeToString(vps), hex.EncodeToString(sps), hex.EncodeToString(pps)) - return err - } - - source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData) - stream_ = source.videoStream - } - - packet = utils.NewVideoPacket(data, dts, pts, key, utils.PacketTypeAnnexB, codec, index, 90000) - } else if utils.AVCodecIdAAC == codec { - //必须包含ADTSHeader - if len(data) < 7 { - log.Sugar.Warnf("need more data...") - return nil - } - - var skip int - header, err := utils.ReadADtsFixedHeader(data) - if err != nil { - log.Sugar.Errorf("读取ADTSHeader失败 suorce:%s data:%s", source.Id_, hex.EncodeToString(data[:7])) - return nil - } else { - skip = 7 - //跳过ADtsHeader长度 - if header.ProtectionAbsent() == 0 { - skip += 2 - } - } - - if source.audioStream == nil { - if source.IsCompleted() { - return nil - } - - configData, err := utils.ADtsHeader2MpegAudioConfigData(header) - config, err := utils.ParseMpeg4AudioConfig(configData) - println(config) - if err != nil { - log.Sugar.Errorf("adt头转m4ac失败 suorce:%s data:%s", source.Id_, hex.EncodeToString(data[:7])) - return nil - } - - source.audioStream = utils.NewAVStream(utils.AVMediaTypeAudio, index, codec, configData, nil) - stream_ = source.audioStream - } - - packet = utils.NewAudioPacket(data[skip:], dts, pts, codec, index, 90000) - } else if utils.AVCodecIdPCMALAW == codec || utils.AVCodecIdPCMMULAW == codec { - if source.audioStream == nil { - source.audioStream = utils.NewAVStream(utils.AVMediaTypeAudio, index, codec, nil, nil) - stream_ = source.audioStream - } - - packet = utils.NewAudioPacket(data, dts, pts, codec, index, 90000) - } else { - log.Sugar.Errorf("the codec %d is not implemented.", codec) return nil } + if utils.AVMediaTypeAudio == mediaType { + stream_, packet, err = stream.ExtractAudioPacket(codec, source.audioStream == nil, data, pts, dts, index, 90000) + if err != nil { + return err + } + + if stream_ != nil { + source.audioStream = stream_ + } + } else { + if source.videoStream == nil && !key { + log.Sugar.Errorf("skip non keyframes conn:%s", source.Conn.RemoteAddr()) + return nil + } + + stream_, packet, err = stream.ExtractVideoPacket(codec, key, source.videoStream == nil, data, pts, dts, index, 90000) + if err != nil { + return err + } + if stream_ != nil { + source.videoStream = stream_ + } + } + if stream_ != nil { source.OnDeMuxStream(stream_) if len(source.OriginStreams()) >= source.deMuxerCtx.TrackCount() { @@ -210,7 +153,6 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT } source.OnDeMuxPacket(packet) - return nil } diff --git a/jt1078/jt_session.go b/jt1078/jt_session.go index e18a36c..051aa39 100644 --- a/jt1078/jt_session.go +++ b/jt1078/jt_session.go @@ -58,6 +58,190 @@ type RtpPacket struct { payload []byte } +func (s *Session) OnJtPTPPacket(data []byte) { + packet, err := read1078RTPPacket(data) + if err != nil { + return + } + + //过滤空数据 + if len(packet.payload) == 0 { + return + } + + //首包处理, hook通知 + if s.rtpPacket == nil { + s.Id_ = packet.simNumber + s.rtpPacket = &RtpPacket{} + *s.rtpPacket = packet + + go func() { + _, state := stream.PreparePublishSource(s, true) + if utils.HookStateOK != state { + log.Sugar.Errorf("1078推流失败 source:%s", s.phone) + + if s.Conn != nil { + s.Conn.Close() + } + } + }() + } + + //完整包/最后一个分包, 创建AVPacket + //参考时间戳, 遇到不同的时间戳, 处理前一包. 分包标记可能不靠谱 + if s.rtpPacket.ts != packet.ts || s.rtpPacket.pt != packet.pt { + if s.rtpPacket.packetType == AudioFrameMark && s.audioBuffer != nil { + if err := s.processAudioPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.audioBuffer.Fetch(), s.audioIndex); err != nil { + log.Sugar.Errorf("处理音频包失败 phone:%s err:%s", s.phone, err.Error()) + s.audioBuffer.FreeTail() + } + + *s.rtpPacket = packet + } else if s.rtpPacket.packetType < AudioFrameMark && s.videoBuffer != nil { + if err := s.processVideoPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.videoBuffer.Fetch(), s.videoIndex); err != nil { + log.Sugar.Errorf("处理视频包失败 phone:%s err:%s", s.phone, err.Error()) + s.videoBuffer.FreeTail() + } + + *s.rtpPacket = packet + } + } + + if packet.packetType == AudioFrameMark { + if s.audioBuffer == nil { + if s.videoIndex == 0 && s.audioIndex == 0 { + s.videoIndex = 1 + } + + if s.IsCompleted() { + if !s.IsTimeoutTrack(s.audioIndex) { + s.SetTimeoutTrack(s.audioIndex) + log.Sugar.Errorf("添加audiotrack超时") + } + return + } + + s.audioBuffer = s.FindOrCreatePacketBuffer(s.audioIndex, utils.AVMediaTypeAudio) + } + + s.audioBuffer.TryMark() + s.audioBuffer.Write(packet.payload) + } else { + if s.videoBuffer == nil { + if s.videoIndex == 0 && s.audioIndex == 0 { + s.audioIndex = 1 + } + + if s.IsCompleted() { + if !s.IsTimeoutTrack(s.videoIndex) { + s.SetTimeoutTrack(s.videoIndex) + log.Sugar.Errorf("添加videotrack超时") + } + return + } + + s.videoBuffer = s.FindOrCreatePacketBuffer(s.videoIndex, utils.AVMediaTypeVideo) + } + + s.videoBuffer.TryMark() + s.videoBuffer.Write(packet.payload) + } +} + +func (s *Session) Input(data []byte) error { + return s.decoder.Input(data) +} + +func (s *Session) Close() { + log.Sugar.Infof("1078推流结束 phone number:%s %s", s.phone, s.PublishSource.PrintInfo()) + + if s.audioBuffer != nil { + s.audioBuffer.Clear() + } + + if s.videoBuffer != nil { + s.videoBuffer.Clear() + } + + if s.Conn != nil { + s.Conn.Close() + s.Conn = nil + } + + if s.decoder != nil { + s.decoder.Close() + s.decoder = nil + } + + s.PublishSource.Close() +} + +func (s *Session) processVideoPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error { + var codecId utils.AVCodecID + + if PTVideoH264 == pt { + if s.videoStream == nil && VideoIFrameMark != pktType { + log.Sugar.Errorf("skip non keyframes conn:%s", s.Conn.RemoteAddr()) + return nil + } + codecId = utils.AVCodecIdH264 + } else if PTVideoH265 == pt { + if s.videoStream == nil && VideoIFrameMark != pktType { + log.Sugar.Errorf("skip non keyframes conn:%s", s.Conn.RemoteAddr()) + return nil + } + codecId = utils.AVCodecIdH265 + } else { + return fmt.Errorf("the codec %d is not implemented", pt) + } + + videoStream, videoPacket, err := stream.ExtractVideoPacket(codecId, VideoIFrameMark == pktType, s.videoStream == nil, data, int64(ts), int64(ts), index, 1000) + if err != nil { + return err + } + + if videoStream != nil { + s.videoStream = videoStream + s.OnDeMuxStream(videoStream) + if s.videoStream != nil && s.audioStream != nil { + s.OnDeMuxStreamDone() + } + } + + s.OnDeMuxPacket(videoPacket) + return nil +} + +func (s *Session) processAudioPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error { + var codecId utils.AVCodecID + + if PTAudioG711A == pt { + codecId = utils.AVCodecIdPCMALAW + } else if PTAudioG711U == pt { + codecId = utils.AVCodecIdPCMMULAW + } else if PTAudioAAC == pt { + codecId = utils.AVCodecIdAAC + } else { + return fmt.Errorf("the codec %d is not implemented", pt) + } + + audioStream, audioPacket, err := stream.ExtractAudioPacket(codecId, s.audioStream == nil, data, int64(ts), int64(ts), index, 1000) + if err != nil { + return err + } + + if audioStream != nil { + s.audioStream = audioStream + s.OnDeMuxStream(audioStream) + if s.videoStream != nil && s.audioStream != nil { + s.OnDeMuxStreamDone() + } + } + + s.OnDeMuxPacket(audioPacket) + return nil +} + // 读取1078的rtp包, 返回数据类型, 负载类型、时间戳、负载数据 func read1078RTPPacket(data []byte) (RtpPacket, error) { if len(data) < 12 { @@ -114,193 +298,6 @@ func read1078RTPPacket(data []byte) (RtpPacket, error) { return RtpPacket{pt: pt, packetType: packetType, ts: ts, simNumber: simNumber, subMark: subMark, payload: data[n:]}, nil } -func (s *Session) processVideoPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error { - var packet utils.AVPacket - var stream_ utils.AVStream - - if PTVideoH264 == pt { - if s.videoStream == nil { - if VideoIFrameMark != pktType { - return fmt.Errorf("skip non keyframes") - } - - videoStream, err := utils.CreateAVCStreamFromKeyFrame(data, 1) - if err != nil { - return err - } - - stream_ = videoStream - } - - packet = utils.NewVideoPacket(data, int64(ts), int64(ts), VideoIFrameMark == pktType, utils.PacketTypeAnnexB, utils.AVCodecIdH265, index, 1000) - } else if PTVideoH265 == pt { - if s.videoStream == nil { - if VideoIFrameMark != pktType { - return fmt.Errorf("skip non keyframes") - } - - videoStream, err := utils.CreateHevcStreamFromKeyFrame(data, 1) - if err != nil { - return err - } - - stream_ = videoStream - } - - packet = utils.NewVideoPacket(data, int64(ts), int64(ts), VideoIFrameMark == pktType, utils.PacketTypeAnnexB, utils.AVCodecIdH265, index, 1000) - } else { - return fmt.Errorf("the codec %d is not implemented", pt) - } - - if stream_ != nil { - s.videoStream = stream_ - s.OnDeMuxStream(stream_) - if s.videoStream != nil && s.audioStream != nil { - s.OnDeMuxStreamDone() - } - } - - s.OnDeMuxPacket(packet) - return nil -} - -func (s *Session) processAudioPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error { - var packet utils.AVPacket - var stream_ utils.AVStream - - if PTAudioG711A == pt { - if s.audioStream == nil { - stream_ = utils.NewAVStream(utils.AVMediaTypeAudio, 0, utils.AVCodecIdPCMALAW, nil, nil) - } - - packet = utils.NewAudioPacket(data, int64(ts), int64(ts), utils.AVCodecIdPCMALAW, index, 1000) - } else if PTAudioG711U == pt { - if s.audioStream == nil { - stream_ = utils.NewAVStream(utils.AVMediaTypeAudio, 0, utils.AVCodecIdPCMMULAW, nil, nil) - } - - packet = utils.NewAudioPacket(data, int64(ts), int64(ts), utils.AVCodecIdPCMMULAW, index, 1000) - } else if PTAudioAAC == pt { - - } else { - return fmt.Errorf("the codec %d is not implemented", pt) - } - - if stream_ != nil { - s.audioStream = stream_ - s.OnDeMuxStream(stream_) - if s.videoStream != nil && s.audioStream != nil { - s.OnDeMuxStreamDone() - } - } - - s.OnDeMuxPacket(packet) - return nil -} -func (s *Session) OnJtPTPPacket(data []byte) { - packet, err := read1078RTPPacket(data) - if err != nil { - return - } - - //过滤空数据 - if len(packet.payload) == 0 { - return - } - - //首包处理, hook通知 - if s.rtpPacket == nil { - s.Id_ = packet.simNumber - s.rtpPacket = &RtpPacket{} - *s.rtpPacket = packet - - go func() { - _, state := stream.PreparePublishSource(s, true) - if utils.HookStateOK != state { - log.Sugar.Errorf("1078推流失败 source:%s", s.phone) - - if s.Conn != nil { - s.Conn.Close() - } - } - }() - } - - //完整包/最后一个分包, 创建AVPacket - //或者参考时间戳, 推流的分包标记为可能不靠谱 - if s.rtpPacket.ts != packet.ts || s.rtpPacket.pt != packet.pt { - if s.rtpPacket.packetType == AudioFrameMark { - if err := s.processAudioPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.audioBuffer.Fetch(), s.audioIndex); err != nil { - log.Sugar.Errorf("处理音频包失败 phone:%s err:%s", s.phone, err.Error()) - s.audioBuffer.FreeTail() - } - - *s.rtpPacket = packet - s.audioBuffer.Mark() - } else { - if err := s.processVideoPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.videoBuffer.Fetch(), s.videoIndex); err != nil { - log.Sugar.Errorf("处理视频包失败 phone:%s err:%s", s.phone, err.Error()) - s.videoBuffer.FreeTail() - } - - *s.rtpPacket = packet - s.videoBuffer.Mark() - } - } - - if packet.packetType == AudioFrameMark { - if s.audioBuffer == nil { - if s.videoIndex == 0 && s.audioIndex == 0 { - s.videoIndex = 1 - } - - s.audioBuffer = s.FindOrCreatePacketBuffer(s.audioIndex, utils.AVMediaTypeAudio) - s.audioBuffer.Mark() - } - - s.audioBuffer.Write(packet.payload) - } else { - if s.videoBuffer == nil { - if s.videoIndex == 0 && s.audioIndex == 0 { - s.audioIndex = 1 - } - - s.videoBuffer = s.FindOrCreatePacketBuffer(s.videoIndex, utils.AVMediaTypeVideo) - s.videoBuffer.Mark() - } - - s.videoBuffer.Write(packet.payload) - } -} - -func (s *Session) Input(data []byte) error { - return s.decoder.Input(data) -} - -func (s *Session) Close() { - log.Sugar.Infof("1078推流结束 phone number:%s %s", s.phone, s.PublishSource.PrintInfo()) - - if s.audioBuffer != nil { - s.audioBuffer.Clear() - } - - if s.videoBuffer != nil { - s.videoBuffer.Clear() - } - - if s.Conn != nil { - s.Conn.Close() - s.Conn = nil - } - - if s.decoder != nil { - s.decoder.Close() - s.decoder = nil - } - - s.PublishSource.Close() -} - func NewSession(conn net.Conn) *Session { session := Session{ PublishSource: stream.PublishSource{ diff --git a/stream/hook_source.go b/stream/hook_source.go index 88d4447..9f14e6e 100644 --- a/stream/hook_source.go +++ b/stream/hook_source.go @@ -1,6 +1,7 @@ package stream import ( + "encoding/json" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/log" "net/http" @@ -31,6 +32,10 @@ func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookS source.StartIdleTimer() } + urls := GetStreamPlayUrls(source.Id()) + indent, _ := json.MarshalIndent(urls, "", "\t") + log.Sugar.Infof("%s准备推流 source:%s 拉流地址:\r\n%s", source.Type().ToString(), source.Id(), indent) + return response, utils.HookStateOK } diff --git a/stream/source.go b/stream/source.go index d1253c6..68cacf1 100644 --- a/stream/source.go +++ b/stream/source.go @@ -1,7 +1,6 @@ package stream import ( - "encoding/json" "fmt" "github.com/lkmio/lkm/collections" "github.com/lkmio/lkm/log" @@ -175,8 +174,8 @@ type PublishSource struct { idleTimer *time.Timer sinkCount int //拉流计数 closed bool //是否已经被关闭 - firstPacket bool //是否第一包 urlValues url.Values + timeoutTracks []int } func (s *PublishSource) Id() string { @@ -236,7 +235,7 @@ func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMe if s.pktBuffers[index] == nil { if utils.AVMediaTypeAudio == mediaType { - s.pktBuffers[index] = collections.NewRbMemoryPool(48000 * 64) + s.pktBuffers[index] = collections.NewRbMemoryPool(48000 * 12) } else if AppConfig.GOPCache { //开启GOP缓存 s.pktBuffers[index] = collections.NewRbMemoryPool(AppConfig.GOPBufferSize) @@ -258,13 +257,6 @@ func (s *PublishSource) LoopEvent() { break } - if !s.firstPacket { - urls := GetStreamPlayUrls(s.Id_) - indent, _ := json.MarshalIndent(urls, "", "\t") - log.Sugar.Infof("%s 开始推流 source:%s 拉流地址:\r\n%s", s.Type_.ToString(), s.Id_, indent) - s.firstPacket = true - } - if AppConfig.ReceiveTimeout > 0 { s.lastPacketTime = time.Now() } @@ -583,11 +575,16 @@ func (s *PublishSource) writeHeader() { } s.completed = true - if s.probeTimer != nil { s.probeTimer.Stop() } + if len(s.originStreams.All()) == 0 { + log.Sugar.Errorf("没有一路流, 删除source:%s", s.Id_) + s.doClose() + return + } + //创建录制流和HLS s.CreateDefaultOutStreams() @@ -603,6 +600,30 @@ func (s *PublishSource) IsCompleted() bool { return s.completed } +func (s *PublishSource) NotTrackAdded(index int) bool { + for _, avStream := range s.originStreams.All() { + if avStream.Index() == index { + return false + } + } + + return true +} + +func (s *PublishSource) IsTimeoutTrack(index int) bool { + for _, i := range s.timeoutTracks { + if i == index { + return true + } + } + + return false +} + +func (s *PublishSource) SetTimeoutTrack(index int) { + s.timeoutTracks = append(s.timeoutTracks, index) +} + func (s *PublishSource) OnDeMuxStreamDone() { s.writeHeader() } diff --git a/stream/source_utils.go b/stream/source_utils.go index 204afed..7c044bd 100644 --- a/stream/source_utils.go +++ b/stream/source_utils.go @@ -1,7 +1,11 @@ package stream import ( + "encoding/hex" "fmt" + "github.com/lkmio/avformat/libavc" + "github.com/lkmio/avformat/libhevc" + "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/log" "net/url" "strings" @@ -69,3 +73,93 @@ func ParseUrl(name string) (string, url.Values) { return name, nil } + +func ExtractVideoPacket(codec utils.AVCodecID, key, extractStream bool, data []byte, pts, dts int64, index, timebase int) (utils.AVStream, utils.AVPacket, error) { + var stream utils.AVStream + + if utils.AVCodecIdH264 == codec { + //从关键帧中解析出sps和pps + if key && extractStream { + sps, pps, err := libavc.ParseExtraDataFromKeyNALU(data) + if err != nil { + log.Sugar.Errorf("从关键帧中解析sps pps失败 data:%s", hex.EncodeToString(data)) + return nil, nil, err + } + + codecData, err := utils.NewAVCCodecData(sps, pps) + if err != nil { + log.Sugar.Errorf("解析sps pps失败 data:%s sps:%s, pps:%s", hex.EncodeToString(data), hex.EncodeToString(sps), hex.EncodeToString(pps)) + return nil, nil, err + } + + stream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData) + } + + } else if utils.AVCodecIdH265 == codec { + if key && extractStream { + vps, sps, pps, err := libhevc.ParseExtraDataFromKeyNALU(data) + if err != nil { + log.Sugar.Errorf("从关键帧中解析vps sps pps失败 data:%s", hex.EncodeToString(data)) + return nil, nil, err + } + + codecData, err := utils.NewHEVCCodecData(vps, sps, pps) + if err != nil { + log.Sugar.Errorf("解析sps pps失败 data:%s vps:%s sps:%s, pps:%s", hex.EncodeToString(data), hex.EncodeToString(vps), hex.EncodeToString(sps), hex.EncodeToString(pps)) + return nil, nil, err + } + + stream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData) + } + + } + + packet := utils.NewVideoPacket(data, dts, pts, key, utils.PacketTypeAnnexB, codec, index, timebase) + return stream, packet, nil +} + +func ExtractAudioPacket(codec utils.AVCodecID, extractStream bool, data []byte, pts, dts int64, index, timebase int) (utils.AVStream, utils.AVPacket, error) { + var stream utils.AVStream + var packet utils.AVPacket + if utils.AVCodecIdAAC == codec { + //必须包含ADTSHeader + if len(data) < 7 { + return nil, nil, fmt.Errorf("need more data") + } + + var skip int + header, err := utils.ReadADtsFixedHeader(data) + if err != nil { + log.Sugar.Errorf("读取ADTSHeader失败 data:%s", hex.EncodeToString(data[:7])) + return nil, nil, err + } else { + skip = 7 + //跳过ADtsHeader长度 + if header.ProtectionAbsent() == 0 { + skip += 2 + } + } + + if extractStream { + configData, err := utils.ADtsHeader2MpegAudioConfigData(header) + config, err := utils.ParseMpeg4AudioConfig(configData) + println(config) + if err != nil { + log.Sugar.Errorf("adt头转m4ac失败 data:%s", hex.EncodeToString(data[:7])) + return nil, nil, err + } + + stream = utils.NewAVStream(utils.AVMediaTypeAudio, index, codec, configData, nil) + } + + packet = utils.NewAudioPacket(data[skip:], dts, pts, codec, index, timebase) + } else if utils.AVCodecIdPCMALAW == codec || utils.AVCodecIdPCMMULAW == codec { + if extractStream { + stream = utils.NewAVStream(utils.AVMediaTypeAudio, index, codec, nil, nil) + } + + packet = utils.NewAudioPacket(data, dts, pts, codec, index, timebase) + } + + return stream, packet, nil +}