取消统计超时track索引

This commit is contained in:
yangjiechina
2024-11-21 19:46:36 +08:00
parent 823dc19b29
commit 99b9c7871a
4 changed files with 37 additions and 66 deletions

View File

@@ -122,15 +122,6 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT
}
}()
if source.IsCompleted() && source.NotTrackAdded(index) {
if !source.IsTimeoutTrack(index) {
source.SetTimeoutTrack(index)
log.Sugar.Errorf("添加track超时 source:%s", source.GetID())
}
return nil
}
if utils.AVMediaTypeAudio == mediaType {
stream_, packet, err = stream.ExtractAudioPacket(codec, source.audioStream == nil, data, pts, dts, index, 90000)
if err != nil {

View File

@@ -64,12 +64,12 @@ func (s *Session) OnJtPTPPacket(data []byte) {
return
}
//过滤空数据
// 过滤空数据
if len(packet.payload) == 0 {
return
}
//首包处理, hook通知
// 首包处理, hook通知
if s.rtpPacket == nil {
s.SetID(packet.simNumber)
s.rtpPacket = &RtpPacket{}
@@ -87,8 +87,8 @@ func (s *Session) OnJtPTPPacket(data []byte) {
}()
}
// 完整包/最后一个分包, 创建AVPacket
// 参考时间戳, 遇到不同的时间戳, 处理前一包. 分包标记可能不靠谱
// 如果时间戳或者负载类型发生变化, 认为是新的音视频帧处理前一包创建AVPacket回调给PublishSource。
// 分包标记可能不靠谱
if s.rtpPacket.ts != packet.ts || s.rtpPacket.pt != packet.pt {
if s.rtpPacket.packetType == AudioFrameMark && s.audioBuffer != nil {
if err := s.processAudioPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.audioBuffer.Fetch(), s.audioIndex); err != nil {
@@ -107,23 +107,18 @@ func (s *Session) OnJtPTPPacket(data []byte) {
}
}
// 部分音视频帧
if packet.packetType == AudioFrameMark {
if s.audioBuffer == nil {
if s.videoIndex == 0 && s.audioIndex == 0 {
s.videoIndex = 1
}
if s.IsCompleted() {
if !s.IsTimeoutTrack(s.audioIndex) {
s.SetTimeoutTrack(s.audioIndex)
log.Sugar.Errorf("添加audiotrack超时")
}
return
}
// 创建音频的AVPacket缓冲区
s.audioBuffer = s.FindOrCreatePacketBuffer(s.audioIndex, utils.AVMediaTypeAudio)
}
// 将部分音频帧写入缓冲区
s.audioBuffer.TryMark()
s.audioBuffer.Write(packet.payload)
} else {
@@ -132,17 +127,11 @@ func (s *Session) OnJtPTPPacket(data []byte) {
s.audioIndex = 1
}
if s.IsCompleted() {
if !s.IsTimeoutTrack(s.videoIndex) {
s.SetTimeoutTrack(s.videoIndex)
log.Sugar.Errorf("添加videotrack超时")
}
return
}
// 创建视频的AVPacket缓冲区
s.videoBuffer = s.FindOrCreatePacketBuffer(s.videoIndex, utils.AVMediaTypeVideo)
}
// 将部分视频帧写入缓冲区
s.videoBuffer.TryMark()
s.videoBuffer.Write(packet.payload)
}
@@ -176,6 +165,7 @@ func (s *Session) Close() {
s.PublishSource.Close()
}
// 从视频帧中提取AVPacket和AVStream, 回调给PublishSource
func (s *Session) processVideoPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error {
var codecId utils.AVCodecID
@@ -212,6 +202,7 @@ func (s *Session) processVideoPacket(pt byte, pktType byte, ts uint64, data []by
return nil
}
// 从音频帧中提取AVPacket和AVStream, 回调给PublishSource
func (s *Session) processAudioPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error {
var codecId utils.AVCodecID
@@ -249,20 +240,20 @@ func read1078RTPPacket(data []byte) (RtpPacket, error) {
}
packetType := data[11] >> 4 & 0x0F
//忽略透传数据
// 忽略透传数据
if TransmissionDataMark == packetType {
return RtpPacket{}, fmt.Errorf("invaild data")
}
//忽略低于最低长度的数据包
// 忽略低于最低长度的数据包
if (AudioFrameMark == packetType && len(data) < 26) || (AudioFrameMark == packetType && len(data) < 22) {
return RtpPacket{}, fmt.Errorf("invaild data")
}
//x扩展位,固定为0
// x扩展位,固定为0
_ = data[0] >> 4 & 0x1
pt := data[1] & 0x7F
//seq
// seq
_ = binary.BigEndian.Uint16(data[2:])
var simNumber string
@@ -270,11 +261,11 @@ func read1078RTPPacket(data []byte) (RtpPacket, error) {
simNumber += fmt.Sprintf("%02d", data[i])
}
//channel
// channel
_ = data[10]
//subMark
// subMark
subMark := data[11] & 0x0F
//单位ms
// 时间戳,单位ms
var ts uint64
n := 12
if TransmissionDataMark != packetType {
@@ -283,15 +274,15 @@ func read1078RTPPacket(data []byte) (RtpPacket, error) {
}
if AudioFrameMark > packetType {
//iFrameInterval
// iFrameInterval
_ = binary.BigEndian.Uint16(data[n:])
n += 2
//lastFrameInterval
// lastFrameInterval
_ = binary.BigEndian.Uint16(data[n:])
n += 2
}
//size
// size
_ = binary.BigEndian.Uint16(data[n:])
n += 2

View File

@@ -4,7 +4,6 @@ import (
"github.com/lkmio/avformat/libflv"
"github.com/lkmio/avformat/librtmp"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"net"
)
@@ -23,12 +22,7 @@ func (p *Publisher) Input(data []byte) error {
func (p *Publisher) OnDeMuxStream(stream utils.AVStream) {
// AVStream的ExtraData已经拷贝, 释放掉内存池中最新分配的内存
p.FindOrCreatePacketBuffer(stream.Index(), stream.Type()).FreeTail()
if !p.IsCompleted() {
p.PublishSource.OnDeMuxStream(stream)
} else if !p.IsTimeoutTrack(stream.Index()) {
p.SetTimeoutTrack(stream.Index())
log.Sugar.Errorf("添加 %s track超时", stream.Type().ToString())
}
p.PublishSource.OnDeMuxStream(stream)
}
// OnVideo 解析出来的完整视频包

View File

@@ -151,11 +151,10 @@ type PublishSource struct {
streamPipe chan []byte // 推流数据管道
mainContextEvents chan func() // 切换到主协程执行函数的事件管道
lastPacketTime time.Time // 最近收到推流包的时间
lastStreamEndTime time.Time // 最近拉流端结束拉流的时间
sinkCount int // 拉流端计数
urlValues url.Values // 推流url携带的参数
timeoutTracks []int
lastPacketTime time.Time // 最近收到推流包的时间
lastStreamEndTime time.Time // 最近拉流端结束拉流的时间
sinkCount int // 拉流端计数
urlValues url.Values // 推流url携带的参数
createTime time.Time // source创建时间
statistics *BitrateStatistics // 码流统计
}
@@ -676,7 +675,10 @@ func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) {
func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) {
if s.completed {
log.Sugar.Warnf("添加Stream失败 Source: %s已经WriteHeader", s.ID)
log.Sugar.Warnf("添加track失败,已经WriteHeader. source: %s", s.ID)
return
} else if !s.NotTrackAdded(stream.Index()) {
log.Sugar.Warnf("添加track失败,已经添加索引为%d的track. source: %s", stream.Index(), s.ID)
return
}
@@ -742,6 +744,7 @@ func (s *PublishSource) IsCompleted() bool {
return s.completed
}
// NotTrackAdded 是否没有添加该index对应的track
func (s *PublishSource) NotTrackAdded(index int) bool {
for _, avStream := range s.originStreams.All() {
if avStream.Index() == index {
@@ -752,25 +755,17 @@ func (s *PublishSource) NotTrackAdded(index int) bool {
return true
}
func (s *PublishSource) IsTimeoutTrack(index int) bool {
for _, i := range s.timeoutTracks {
if i == index {
return true
}
}
return false
}
func (s *PublishSource) SetTimeoutTrack(index int) {
s.timeoutTracks = append(s.timeoutTracks, index)
}
func (s *PublishSource) OnDeMuxStreamDone() {
s.writeHeader()
}
func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
// track超时忽略推流数据
if s.NotTrackAdded(packet.Index()) {
s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail()
return
}
if AppConfig.GOPCache && s.existVideo {
s.gopBuffer.AddPacket(packet)
}