mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-26 19:21:14 +08:00
统一解析处理1078和28181流
This commit is contained in:
@@ -19,6 +19,8 @@ type MemoryPool interface {
|
||||
// Mark 标记内存块起始位置
|
||||
Mark()
|
||||
|
||||
TryMark()
|
||||
|
||||
// Write 向内存块中写入数据, 必须先调用Mark函数
|
||||
Write(data []byte)
|
||||
|
||||
@@ -113,6 +115,13 @@ func (m *memoryPool) Mark() {
|
||||
m.marked = true
|
||||
}
|
||||
|
||||
func (m *memoryPool) TryMark() {
|
||||
if !m.marked {
|
||||
m.markIndex = m.tail
|
||||
m.marked = true
|
||||
}
|
||||
}
|
||||
|
||||
func (m *memoryPool) Write(data []byte) {
|
||||
utils.Assert(m.marked)
|
||||
|
||||
@@ -157,11 +166,11 @@ func (m *memoryPool) freeOldBlocks() bool {
|
||||
}
|
||||
|
||||
func (m *memoryPool) FreeHead() {
|
||||
if m.freeOldBlocks() {
|
||||
if m.freeOldBlocks() || m.blockQueue.IsEmpty() {
|
||||
return
|
||||
}
|
||||
|
||||
utils.Assert(!m.blockQueue.IsEmpty())
|
||||
//utils.Assert(!m.blockQueue.IsEmpty())
|
||||
size := m.blockQueue.Pop().(int)
|
||||
m.head += size
|
||||
|
||||
@@ -174,11 +183,11 @@ func (m *memoryPool) FreeHead() {
|
||||
}
|
||||
|
||||
func (m *memoryPool) FreeTail() {
|
||||
if m.freeOldBlocks() {
|
||||
if m.freeOldBlocks() || m.blockQueue.IsEmpty() {
|
||||
return
|
||||
}
|
||||
|
||||
utils.Assert(!m.blockQueue.IsEmpty())
|
||||
//utils.Assert(!m.blockQueue.IsEmpty())
|
||||
size := m.blockQueue.PopBack().(int)
|
||||
m.tail -= size
|
||||
|
||||
|
@@ -1,10 +1,7 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/lkmio/avformat/libavc"
|
||||
"github.com/lkmio/avformat/libhevc"
|
||||
"github.com/lkmio/avformat/libmpeg"
|
||||
"github.com/lkmio/avformat/transport"
|
||||
"github.com/lkmio/avformat/utils"
|
||||
@@ -103,105 +100,51 @@ func (source *BaseGBSource) OnLossPacket(index int, mediaType utils.AVMediaType,
|
||||
// OnCompletePacket 完整帧回调
|
||||
func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID, dts int64, pts int64, key bool) error {
|
||||
buffer := source.FindOrCreatePacketBuffer(index, mediaType)
|
||||
|
||||
data := buffer.Fetch()
|
||||
|
||||
var packet utils.AVPacket
|
||||
var stream_ utils.AVStream
|
||||
var err error
|
||||
|
||||
defer func() {
|
||||
if packet == nil {
|
||||
buffer.FreeTail()
|
||||
}
|
||||
}()
|
||||
|
||||
if utils.AVCodecIdH264 == codec {
|
||||
//从关键帧中解析出sps和pps
|
||||
if source.videoStream == nil {
|
||||
sps, pps, err := libavc.ParseExtraDataFromKeyNALU(data)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("从关键帧中解析sps pps失败 source:%s data:%s", source.Id_, hex.EncodeToString(data))
|
||||
return err
|
||||
}
|
||||
|
||||
codecData, err := utils.NewAVCCodecData(sps, pps)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("解析sps pps失败 source:%s data:%s sps:%s, pps:%s", source.Id_, hex.EncodeToString(data), hex.EncodeToString(sps), hex.EncodeToString(pps))
|
||||
return err
|
||||
}
|
||||
|
||||
source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData)
|
||||
stream_ = source.videoStream
|
||||
if source.IsCompleted() && source.NotTrackAdded(index) {
|
||||
if !source.IsTimeoutTrack(index) {
|
||||
source.SetTimeoutTrack(index)
|
||||
log.Sugar.Errorf("添加track超时 source:%s", source.Id())
|
||||
}
|
||||
|
||||
packet = utils.NewVideoPacket(data, dts, pts, key, utils.PacketTypeAnnexB, codec, index, 90000)
|
||||
} else if utils.AVCodecIdH265 == codec {
|
||||
if source.videoStream == nil {
|
||||
vps, sps, pps, err := libhevc.ParseExtraDataFromKeyNALU(data)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("从关键帧中解析vps sps pps失败 source:%s data:%s", source.Id_, hex.EncodeToString(data))
|
||||
return err
|
||||
}
|
||||
|
||||
codecData, err := utils.NewHEVCCodecData(vps, sps, pps)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("解析sps pps失败 source:%s data:%s vps:%s sps:%s, pps:%s", source.Id_, hex.EncodeToString(data), hex.EncodeToString(vps), hex.EncodeToString(sps), hex.EncodeToString(pps))
|
||||
return err
|
||||
}
|
||||
|
||||
source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData)
|
||||
stream_ = source.videoStream
|
||||
}
|
||||
|
||||
packet = utils.NewVideoPacket(data, dts, pts, key, utils.PacketTypeAnnexB, codec, index, 90000)
|
||||
} else if utils.AVCodecIdAAC == codec {
|
||||
//必须包含ADTSHeader
|
||||
if len(data) < 7 {
|
||||
log.Sugar.Warnf("need more data...")
|
||||
return nil
|
||||
}
|
||||
|
||||
var skip int
|
||||
header, err := utils.ReadADtsFixedHeader(data)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("读取ADTSHeader失败 suorce:%s data:%s", source.Id_, hex.EncodeToString(data[:7]))
|
||||
return nil
|
||||
} else {
|
||||
skip = 7
|
||||
//跳过ADtsHeader长度
|
||||
if header.ProtectionAbsent() == 0 {
|
||||
skip += 2
|
||||
}
|
||||
}
|
||||
|
||||
if source.audioStream == nil {
|
||||
if source.IsCompleted() {
|
||||
return nil
|
||||
}
|
||||
|
||||
configData, err := utils.ADtsHeader2MpegAudioConfigData(header)
|
||||
config, err := utils.ParseMpeg4AudioConfig(configData)
|
||||
println(config)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("adt头转m4ac失败 suorce:%s data:%s", source.Id_, hex.EncodeToString(data[:7]))
|
||||
return nil
|
||||
}
|
||||
|
||||
source.audioStream = utils.NewAVStream(utils.AVMediaTypeAudio, index, codec, configData, nil)
|
||||
stream_ = source.audioStream
|
||||
}
|
||||
|
||||
packet = utils.NewAudioPacket(data[skip:], dts, pts, codec, index, 90000)
|
||||
} else if utils.AVCodecIdPCMALAW == codec || utils.AVCodecIdPCMMULAW == codec {
|
||||
if source.audioStream == nil {
|
||||
source.audioStream = utils.NewAVStream(utils.AVMediaTypeAudio, index, codec, nil, nil)
|
||||
stream_ = source.audioStream
|
||||
}
|
||||
|
||||
packet = utils.NewAudioPacket(data, dts, pts, codec, index, 90000)
|
||||
} else {
|
||||
log.Sugar.Errorf("the codec %d is not implemented.", codec)
|
||||
return nil
|
||||
}
|
||||
|
||||
if utils.AVMediaTypeAudio == mediaType {
|
||||
stream_, packet, err = stream.ExtractAudioPacket(codec, source.audioStream == nil, data, pts, dts, index, 90000)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if stream_ != nil {
|
||||
source.audioStream = stream_
|
||||
}
|
||||
} else {
|
||||
if source.videoStream == nil && !key {
|
||||
log.Sugar.Errorf("skip non keyframes conn:%s", source.Conn.RemoteAddr())
|
||||
return nil
|
||||
}
|
||||
|
||||
stream_, packet, err = stream.ExtractVideoPacket(codec, key, source.videoStream == nil, data, pts, dts, index, 90000)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stream_ != nil {
|
||||
source.videoStream = stream_
|
||||
}
|
||||
}
|
||||
|
||||
if stream_ != nil {
|
||||
source.OnDeMuxStream(stream_)
|
||||
if len(source.OriginStreams()) >= source.deMuxerCtx.TrackCount() {
|
||||
@@ -210,7 +153,6 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT
|
||||
}
|
||||
|
||||
source.OnDeMuxPacket(packet)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -58,6 +58,190 @@ type RtpPacket struct {
|
||||
payload []byte
|
||||
}
|
||||
|
||||
func (s *Session) OnJtPTPPacket(data []byte) {
|
||||
packet, err := read1078RTPPacket(data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
//过滤空数据
|
||||
if len(packet.payload) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
//首包处理, hook通知
|
||||
if s.rtpPacket == nil {
|
||||
s.Id_ = packet.simNumber
|
||||
s.rtpPacket = &RtpPacket{}
|
||||
*s.rtpPacket = packet
|
||||
|
||||
go func() {
|
||||
_, state := stream.PreparePublishSource(s, true)
|
||||
if utils.HookStateOK != state {
|
||||
log.Sugar.Errorf("1078推流失败 source:%s", s.phone)
|
||||
|
||||
if s.Conn != nil {
|
||||
s.Conn.Close()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
//完整包/最后一个分包, 创建AVPacket
|
||||
//参考时间戳, 遇到不同的时间戳, 处理前一包. 分包标记可能不靠谱
|
||||
if s.rtpPacket.ts != packet.ts || s.rtpPacket.pt != packet.pt {
|
||||
if s.rtpPacket.packetType == AudioFrameMark && s.audioBuffer != nil {
|
||||
if err := s.processAudioPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.audioBuffer.Fetch(), s.audioIndex); err != nil {
|
||||
log.Sugar.Errorf("处理音频包失败 phone:%s err:%s", s.phone, err.Error())
|
||||
s.audioBuffer.FreeTail()
|
||||
}
|
||||
|
||||
*s.rtpPacket = packet
|
||||
} else if s.rtpPacket.packetType < AudioFrameMark && s.videoBuffer != nil {
|
||||
if err := s.processVideoPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.videoBuffer.Fetch(), s.videoIndex); err != nil {
|
||||
log.Sugar.Errorf("处理视频包失败 phone:%s err:%s", s.phone, err.Error())
|
||||
s.videoBuffer.FreeTail()
|
||||
}
|
||||
|
||||
*s.rtpPacket = packet
|
||||
}
|
||||
}
|
||||
|
||||
if packet.packetType == AudioFrameMark {
|
||||
if s.audioBuffer == nil {
|
||||
if s.videoIndex == 0 && s.audioIndex == 0 {
|
||||
s.videoIndex = 1
|
||||
}
|
||||
|
||||
if s.IsCompleted() {
|
||||
if !s.IsTimeoutTrack(s.audioIndex) {
|
||||
s.SetTimeoutTrack(s.audioIndex)
|
||||
log.Sugar.Errorf("添加audiotrack超时")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
s.audioBuffer = s.FindOrCreatePacketBuffer(s.audioIndex, utils.AVMediaTypeAudio)
|
||||
}
|
||||
|
||||
s.audioBuffer.TryMark()
|
||||
s.audioBuffer.Write(packet.payload)
|
||||
} else {
|
||||
if s.videoBuffer == nil {
|
||||
if s.videoIndex == 0 && s.audioIndex == 0 {
|
||||
s.audioIndex = 1
|
||||
}
|
||||
|
||||
if s.IsCompleted() {
|
||||
if !s.IsTimeoutTrack(s.videoIndex) {
|
||||
s.SetTimeoutTrack(s.videoIndex)
|
||||
log.Sugar.Errorf("添加videotrack超时")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
s.videoBuffer = s.FindOrCreatePacketBuffer(s.videoIndex, utils.AVMediaTypeVideo)
|
||||
}
|
||||
|
||||
s.videoBuffer.TryMark()
|
||||
s.videoBuffer.Write(packet.payload)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) Input(data []byte) error {
|
||||
return s.decoder.Input(data)
|
||||
}
|
||||
|
||||
func (s *Session) Close() {
|
||||
log.Sugar.Infof("1078推流结束 phone number:%s %s", s.phone, s.PublishSource.PrintInfo())
|
||||
|
||||
if s.audioBuffer != nil {
|
||||
s.audioBuffer.Clear()
|
||||
}
|
||||
|
||||
if s.videoBuffer != nil {
|
||||
s.videoBuffer.Clear()
|
||||
}
|
||||
|
||||
if s.Conn != nil {
|
||||
s.Conn.Close()
|
||||
s.Conn = nil
|
||||
}
|
||||
|
||||
if s.decoder != nil {
|
||||
s.decoder.Close()
|
||||
s.decoder = nil
|
||||
}
|
||||
|
||||
s.PublishSource.Close()
|
||||
}
|
||||
|
||||
func (s *Session) processVideoPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error {
|
||||
var codecId utils.AVCodecID
|
||||
|
||||
if PTVideoH264 == pt {
|
||||
if s.videoStream == nil && VideoIFrameMark != pktType {
|
||||
log.Sugar.Errorf("skip non keyframes conn:%s", s.Conn.RemoteAddr())
|
||||
return nil
|
||||
}
|
||||
codecId = utils.AVCodecIdH264
|
||||
} else if PTVideoH265 == pt {
|
||||
if s.videoStream == nil && VideoIFrameMark != pktType {
|
||||
log.Sugar.Errorf("skip non keyframes conn:%s", s.Conn.RemoteAddr())
|
||||
return nil
|
||||
}
|
||||
codecId = utils.AVCodecIdH265
|
||||
} else {
|
||||
return fmt.Errorf("the codec %d is not implemented", pt)
|
||||
}
|
||||
|
||||
videoStream, videoPacket, err := stream.ExtractVideoPacket(codecId, VideoIFrameMark == pktType, s.videoStream == nil, data, int64(ts), int64(ts), index, 1000)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if videoStream != nil {
|
||||
s.videoStream = videoStream
|
||||
s.OnDeMuxStream(videoStream)
|
||||
if s.videoStream != nil && s.audioStream != nil {
|
||||
s.OnDeMuxStreamDone()
|
||||
}
|
||||
}
|
||||
|
||||
s.OnDeMuxPacket(videoPacket)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Session) processAudioPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error {
|
||||
var codecId utils.AVCodecID
|
||||
|
||||
if PTAudioG711A == pt {
|
||||
codecId = utils.AVCodecIdPCMALAW
|
||||
} else if PTAudioG711U == pt {
|
||||
codecId = utils.AVCodecIdPCMMULAW
|
||||
} else if PTAudioAAC == pt {
|
||||
codecId = utils.AVCodecIdAAC
|
||||
} else {
|
||||
return fmt.Errorf("the codec %d is not implemented", pt)
|
||||
}
|
||||
|
||||
audioStream, audioPacket, err := stream.ExtractAudioPacket(codecId, s.audioStream == nil, data, int64(ts), int64(ts), index, 1000)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if audioStream != nil {
|
||||
s.audioStream = audioStream
|
||||
s.OnDeMuxStream(audioStream)
|
||||
if s.videoStream != nil && s.audioStream != nil {
|
||||
s.OnDeMuxStreamDone()
|
||||
}
|
||||
}
|
||||
|
||||
s.OnDeMuxPacket(audioPacket)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 读取1078的rtp包, 返回数据类型, 负载类型、时间戳、负载数据
|
||||
func read1078RTPPacket(data []byte) (RtpPacket, error) {
|
||||
if len(data) < 12 {
|
||||
@@ -114,193 +298,6 @@ func read1078RTPPacket(data []byte) (RtpPacket, error) {
|
||||
return RtpPacket{pt: pt, packetType: packetType, ts: ts, simNumber: simNumber, subMark: subMark, payload: data[n:]}, nil
|
||||
}
|
||||
|
||||
func (s *Session) processVideoPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error {
|
||||
var packet utils.AVPacket
|
||||
var stream_ utils.AVStream
|
||||
|
||||
if PTVideoH264 == pt {
|
||||
if s.videoStream == nil {
|
||||
if VideoIFrameMark != pktType {
|
||||
return fmt.Errorf("skip non keyframes")
|
||||
}
|
||||
|
||||
videoStream, err := utils.CreateAVCStreamFromKeyFrame(data, 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stream_ = videoStream
|
||||
}
|
||||
|
||||
packet = utils.NewVideoPacket(data, int64(ts), int64(ts), VideoIFrameMark == pktType, utils.PacketTypeAnnexB, utils.AVCodecIdH265, index, 1000)
|
||||
} else if PTVideoH265 == pt {
|
||||
if s.videoStream == nil {
|
||||
if VideoIFrameMark != pktType {
|
||||
return fmt.Errorf("skip non keyframes")
|
||||
}
|
||||
|
||||
videoStream, err := utils.CreateHevcStreamFromKeyFrame(data, 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stream_ = videoStream
|
||||
}
|
||||
|
||||
packet = utils.NewVideoPacket(data, int64(ts), int64(ts), VideoIFrameMark == pktType, utils.PacketTypeAnnexB, utils.AVCodecIdH265, index, 1000)
|
||||
} else {
|
||||
return fmt.Errorf("the codec %d is not implemented", pt)
|
||||
}
|
||||
|
||||
if stream_ != nil {
|
||||
s.videoStream = stream_
|
||||
s.OnDeMuxStream(stream_)
|
||||
if s.videoStream != nil && s.audioStream != nil {
|
||||
s.OnDeMuxStreamDone()
|
||||
}
|
||||
}
|
||||
|
||||
s.OnDeMuxPacket(packet)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Session) processAudioPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error {
|
||||
var packet utils.AVPacket
|
||||
var stream_ utils.AVStream
|
||||
|
||||
if PTAudioG711A == pt {
|
||||
if s.audioStream == nil {
|
||||
stream_ = utils.NewAVStream(utils.AVMediaTypeAudio, 0, utils.AVCodecIdPCMALAW, nil, nil)
|
||||
}
|
||||
|
||||
packet = utils.NewAudioPacket(data, int64(ts), int64(ts), utils.AVCodecIdPCMALAW, index, 1000)
|
||||
} else if PTAudioG711U == pt {
|
||||
if s.audioStream == nil {
|
||||
stream_ = utils.NewAVStream(utils.AVMediaTypeAudio, 0, utils.AVCodecIdPCMMULAW, nil, nil)
|
||||
}
|
||||
|
||||
packet = utils.NewAudioPacket(data, int64(ts), int64(ts), utils.AVCodecIdPCMMULAW, index, 1000)
|
||||
} else if PTAudioAAC == pt {
|
||||
|
||||
} else {
|
||||
return fmt.Errorf("the codec %d is not implemented", pt)
|
||||
}
|
||||
|
||||
if stream_ != nil {
|
||||
s.audioStream = stream_
|
||||
s.OnDeMuxStream(stream_)
|
||||
if s.videoStream != nil && s.audioStream != nil {
|
||||
s.OnDeMuxStreamDone()
|
||||
}
|
||||
}
|
||||
|
||||
s.OnDeMuxPacket(packet)
|
||||
return nil
|
||||
}
|
||||
func (s *Session) OnJtPTPPacket(data []byte) {
|
||||
packet, err := read1078RTPPacket(data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
//过滤空数据
|
||||
if len(packet.payload) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
//首包处理, hook通知
|
||||
if s.rtpPacket == nil {
|
||||
s.Id_ = packet.simNumber
|
||||
s.rtpPacket = &RtpPacket{}
|
||||
*s.rtpPacket = packet
|
||||
|
||||
go func() {
|
||||
_, state := stream.PreparePublishSource(s, true)
|
||||
if utils.HookStateOK != state {
|
||||
log.Sugar.Errorf("1078推流失败 source:%s", s.phone)
|
||||
|
||||
if s.Conn != nil {
|
||||
s.Conn.Close()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
//完整包/最后一个分包, 创建AVPacket
|
||||
//或者参考时间戳, 推流的分包标记为可能不靠谱
|
||||
if s.rtpPacket.ts != packet.ts || s.rtpPacket.pt != packet.pt {
|
||||
if s.rtpPacket.packetType == AudioFrameMark {
|
||||
if err := s.processAudioPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.audioBuffer.Fetch(), s.audioIndex); err != nil {
|
||||
log.Sugar.Errorf("处理音频包失败 phone:%s err:%s", s.phone, err.Error())
|
||||
s.audioBuffer.FreeTail()
|
||||
}
|
||||
|
||||
*s.rtpPacket = packet
|
||||
s.audioBuffer.Mark()
|
||||
} else {
|
||||
if err := s.processVideoPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.videoBuffer.Fetch(), s.videoIndex); err != nil {
|
||||
log.Sugar.Errorf("处理视频包失败 phone:%s err:%s", s.phone, err.Error())
|
||||
s.videoBuffer.FreeTail()
|
||||
}
|
||||
|
||||
*s.rtpPacket = packet
|
||||
s.videoBuffer.Mark()
|
||||
}
|
||||
}
|
||||
|
||||
if packet.packetType == AudioFrameMark {
|
||||
if s.audioBuffer == nil {
|
||||
if s.videoIndex == 0 && s.audioIndex == 0 {
|
||||
s.videoIndex = 1
|
||||
}
|
||||
|
||||
s.audioBuffer = s.FindOrCreatePacketBuffer(s.audioIndex, utils.AVMediaTypeAudio)
|
||||
s.audioBuffer.Mark()
|
||||
}
|
||||
|
||||
s.audioBuffer.Write(packet.payload)
|
||||
} else {
|
||||
if s.videoBuffer == nil {
|
||||
if s.videoIndex == 0 && s.audioIndex == 0 {
|
||||
s.audioIndex = 1
|
||||
}
|
||||
|
||||
s.videoBuffer = s.FindOrCreatePacketBuffer(s.videoIndex, utils.AVMediaTypeVideo)
|
||||
s.videoBuffer.Mark()
|
||||
}
|
||||
|
||||
s.videoBuffer.Write(packet.payload)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) Input(data []byte) error {
|
||||
return s.decoder.Input(data)
|
||||
}
|
||||
|
||||
func (s *Session) Close() {
|
||||
log.Sugar.Infof("1078推流结束 phone number:%s %s", s.phone, s.PublishSource.PrintInfo())
|
||||
|
||||
if s.audioBuffer != nil {
|
||||
s.audioBuffer.Clear()
|
||||
}
|
||||
|
||||
if s.videoBuffer != nil {
|
||||
s.videoBuffer.Clear()
|
||||
}
|
||||
|
||||
if s.Conn != nil {
|
||||
s.Conn.Close()
|
||||
s.Conn = nil
|
||||
}
|
||||
|
||||
if s.decoder != nil {
|
||||
s.decoder.Close()
|
||||
s.decoder = nil
|
||||
}
|
||||
|
||||
s.PublishSource.Close()
|
||||
}
|
||||
|
||||
func NewSession(conn net.Conn) *Session {
|
||||
session := Session{
|
||||
PublishSource: stream.PublishSource{
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/lkmio/avformat/utils"
|
||||
"github.com/lkmio/lkm/log"
|
||||
"net/http"
|
||||
@@ -31,6 +32,10 @@ func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookS
|
||||
source.StartIdleTimer()
|
||||
}
|
||||
|
||||
urls := GetStreamPlayUrls(source.Id())
|
||||
indent, _ := json.MarshalIndent(urls, "", "\t")
|
||||
log.Sugar.Infof("%s准备推流 source:%s 拉流地址:\r\n%s", source.Type().ToString(), source.Id(), indent)
|
||||
|
||||
return response, utils.HookStateOK
|
||||
}
|
||||
|
||||
|
@@ -1,7 +1,6 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/lkmio/lkm/collections"
|
||||
"github.com/lkmio/lkm/log"
|
||||
@@ -175,8 +174,8 @@ type PublishSource struct {
|
||||
idleTimer *time.Timer
|
||||
sinkCount int //拉流计数
|
||||
closed bool //是否已经被关闭
|
||||
firstPacket bool //是否第一包
|
||||
urlValues url.Values
|
||||
timeoutTracks []int
|
||||
}
|
||||
|
||||
func (s *PublishSource) Id() string {
|
||||
@@ -236,7 +235,7 @@ func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMe
|
||||
|
||||
if s.pktBuffers[index] == nil {
|
||||
if utils.AVMediaTypeAudio == mediaType {
|
||||
s.pktBuffers[index] = collections.NewRbMemoryPool(48000 * 64)
|
||||
s.pktBuffers[index] = collections.NewRbMemoryPool(48000 * 12)
|
||||
} else if AppConfig.GOPCache {
|
||||
//开启GOP缓存
|
||||
s.pktBuffers[index] = collections.NewRbMemoryPool(AppConfig.GOPBufferSize)
|
||||
@@ -258,13 +257,6 @@ func (s *PublishSource) LoopEvent() {
|
||||
break
|
||||
}
|
||||
|
||||
if !s.firstPacket {
|
||||
urls := GetStreamPlayUrls(s.Id_)
|
||||
indent, _ := json.MarshalIndent(urls, "", "\t")
|
||||
log.Sugar.Infof("%s 开始推流 source:%s 拉流地址:\r\n%s", s.Type_.ToString(), s.Id_, indent)
|
||||
s.firstPacket = true
|
||||
}
|
||||
|
||||
if AppConfig.ReceiveTimeout > 0 {
|
||||
s.lastPacketTime = time.Now()
|
||||
}
|
||||
@@ -583,11 +575,16 @@ func (s *PublishSource) writeHeader() {
|
||||
}
|
||||
|
||||
s.completed = true
|
||||
|
||||
if s.probeTimer != nil {
|
||||
s.probeTimer.Stop()
|
||||
}
|
||||
|
||||
if len(s.originStreams.All()) == 0 {
|
||||
log.Sugar.Errorf("没有一路流, 删除source:%s", s.Id_)
|
||||
s.doClose()
|
||||
return
|
||||
}
|
||||
|
||||
//创建录制流和HLS
|
||||
s.CreateDefaultOutStreams()
|
||||
|
||||
@@ -603,6 +600,30 @@ func (s *PublishSource) IsCompleted() bool {
|
||||
return s.completed
|
||||
}
|
||||
|
||||
func (s *PublishSource) NotTrackAdded(index int) bool {
|
||||
for _, avStream := range s.originStreams.All() {
|
||||
if avStream.Index() == index {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *PublishSource) IsTimeoutTrack(index int) bool {
|
||||
for _, i := range s.timeoutTracks {
|
||||
if i == index {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *PublishSource) SetTimeoutTrack(index int) {
|
||||
s.timeoutTracks = append(s.timeoutTracks, index)
|
||||
}
|
||||
|
||||
func (s *PublishSource) OnDeMuxStreamDone() {
|
||||
s.writeHeader()
|
||||
}
|
||||
|
@@ -1,7 +1,11 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/lkmio/avformat/libavc"
|
||||
"github.com/lkmio/avformat/libhevc"
|
||||
"github.com/lkmio/avformat/utils"
|
||||
"github.com/lkmio/lkm/log"
|
||||
"net/url"
|
||||
"strings"
|
||||
@@ -69,3 +73,93 @@ func ParseUrl(name string) (string, url.Values) {
|
||||
|
||||
return name, nil
|
||||
}
|
||||
|
||||
func ExtractVideoPacket(codec utils.AVCodecID, key, extractStream bool, data []byte, pts, dts int64, index, timebase int) (utils.AVStream, utils.AVPacket, error) {
|
||||
var stream utils.AVStream
|
||||
|
||||
if utils.AVCodecIdH264 == codec {
|
||||
//从关键帧中解析出sps和pps
|
||||
if key && extractStream {
|
||||
sps, pps, err := libavc.ParseExtraDataFromKeyNALU(data)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("从关键帧中解析sps pps失败 data:%s", hex.EncodeToString(data))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
codecData, err := utils.NewAVCCodecData(sps, pps)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("解析sps pps失败 data:%s sps:%s, pps:%s", hex.EncodeToString(data), hex.EncodeToString(sps), hex.EncodeToString(pps))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
stream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData)
|
||||
}
|
||||
|
||||
} else if utils.AVCodecIdH265 == codec {
|
||||
if key && extractStream {
|
||||
vps, sps, pps, err := libhevc.ParseExtraDataFromKeyNALU(data)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("从关键帧中解析vps sps pps失败 data:%s", hex.EncodeToString(data))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
codecData, err := utils.NewHEVCCodecData(vps, sps, pps)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("解析sps pps失败 data:%s vps:%s sps:%s, pps:%s", hex.EncodeToString(data), hex.EncodeToString(vps), hex.EncodeToString(sps), hex.EncodeToString(pps))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
stream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
packet := utils.NewVideoPacket(data, dts, pts, key, utils.PacketTypeAnnexB, codec, index, timebase)
|
||||
return stream, packet, nil
|
||||
}
|
||||
|
||||
func ExtractAudioPacket(codec utils.AVCodecID, extractStream bool, data []byte, pts, dts int64, index, timebase int) (utils.AVStream, utils.AVPacket, error) {
|
||||
var stream utils.AVStream
|
||||
var packet utils.AVPacket
|
||||
if utils.AVCodecIdAAC == codec {
|
||||
//必须包含ADTSHeader
|
||||
if len(data) < 7 {
|
||||
return nil, nil, fmt.Errorf("need more data")
|
||||
}
|
||||
|
||||
var skip int
|
||||
header, err := utils.ReadADtsFixedHeader(data)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("读取ADTSHeader失败 data:%s", hex.EncodeToString(data[:7]))
|
||||
return nil, nil, err
|
||||
} else {
|
||||
skip = 7
|
||||
//跳过ADtsHeader长度
|
||||
if header.ProtectionAbsent() == 0 {
|
||||
skip += 2
|
||||
}
|
||||
}
|
||||
|
||||
if extractStream {
|
||||
configData, err := utils.ADtsHeader2MpegAudioConfigData(header)
|
||||
config, err := utils.ParseMpeg4AudioConfig(configData)
|
||||
println(config)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("adt头转m4ac失败 data:%s", hex.EncodeToString(data[:7]))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
stream = utils.NewAVStream(utils.AVMediaTypeAudio, index, codec, configData, nil)
|
||||
}
|
||||
|
||||
packet = utils.NewAudioPacket(data[skip:], dts, pts, codec, index, timebase)
|
||||
} else if utils.AVCodecIdPCMALAW == codec || utils.AVCodecIdPCMMULAW == codec {
|
||||
if extractStream {
|
||||
stream = utils.NewAVStream(utils.AVMediaTypeAudio, index, codec, nil, nil)
|
||||
}
|
||||
|
||||
packet = utils.NewAudioPacket(data, dts, pts, codec, index, timebase)
|
||||
}
|
||||
|
||||
return stream, packet, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user