diff --git a/gb28181/source.go b/gb28181/source.go index f8af39b..640c3dc 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -122,15 +122,6 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT } }() - if source.IsCompleted() && source.NotTrackAdded(index) { - if !source.IsTimeoutTrack(index) { - source.SetTimeoutTrack(index) - log.Sugar.Errorf("添加track超时 source:%s", source.GetID()) - } - - return nil - } - if utils.AVMediaTypeAudio == mediaType { stream_, packet, err = stream.ExtractAudioPacket(codec, source.audioStream == nil, data, pts, dts, index, 90000) if err != nil { diff --git a/jt1078/jt_session.go b/jt1078/jt_session.go index 8cb7bf4..9beeee0 100644 --- a/jt1078/jt_session.go +++ b/jt1078/jt_session.go @@ -64,12 +64,12 @@ func (s *Session) OnJtPTPPacket(data []byte) { return } - //过滤空数据 + // 过滤空数据 if len(packet.payload) == 0 { return } - //首包处理, hook通知 + // 首包处理, hook通知 if s.rtpPacket == nil { s.SetID(packet.simNumber) s.rtpPacket = &RtpPacket{} @@ -87,8 +87,8 @@ func (s *Session) OnJtPTPPacket(data []byte) { }() } - // 完整包/最后一个分包, 创建AVPacket - // 参考时间戳, 遇到不同的时间戳, 处理前一包. 分包标记可能不靠谱 + // 如果时间戳或者负载类型发生变化, 认为是新的音视频帧,处理前一包,创建AVPacket,回调给PublishSource。 + // 分包标记可能不靠谱 if s.rtpPacket.ts != packet.ts || s.rtpPacket.pt != packet.pt { if s.rtpPacket.packetType == AudioFrameMark && s.audioBuffer != nil { if err := s.processAudioPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.audioBuffer.Fetch(), s.audioIndex); err != nil { @@ -107,23 +107,18 @@ func (s *Session) OnJtPTPPacket(data []byte) { } } + // 部分音视频帧 if packet.packetType == AudioFrameMark { if s.audioBuffer == nil { if s.videoIndex == 0 && s.audioIndex == 0 { s.videoIndex = 1 } - if s.IsCompleted() { - if !s.IsTimeoutTrack(s.audioIndex) { - s.SetTimeoutTrack(s.audioIndex) - log.Sugar.Errorf("添加audiotrack超时") - } - return - } - + // 创建音频的AVPacket缓冲区 s.audioBuffer = s.FindOrCreatePacketBuffer(s.audioIndex, utils.AVMediaTypeAudio) } + // 将部分音频帧写入缓冲区 s.audioBuffer.TryMark() s.audioBuffer.Write(packet.payload) } else { @@ -132,17 +127,11 @@ func (s *Session) OnJtPTPPacket(data []byte) { s.audioIndex = 1 } - if s.IsCompleted() { - if !s.IsTimeoutTrack(s.videoIndex) { - s.SetTimeoutTrack(s.videoIndex) - log.Sugar.Errorf("添加videotrack超时") - } - return - } - + // 创建视频的AVPacket缓冲区 s.videoBuffer = s.FindOrCreatePacketBuffer(s.videoIndex, utils.AVMediaTypeVideo) } + // 将部分视频帧写入缓冲区 s.videoBuffer.TryMark() s.videoBuffer.Write(packet.payload) } @@ -176,6 +165,7 @@ func (s *Session) Close() { s.PublishSource.Close() } +// 从视频帧中提取AVPacket和AVStream, 回调给PublishSource func (s *Session) processVideoPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error { var codecId utils.AVCodecID @@ -212,6 +202,7 @@ func (s *Session) processVideoPacket(pt byte, pktType byte, ts uint64, data []by return nil } +// 从音频帧中提取AVPacket和AVStream, 回调给PublishSource func (s *Session) processAudioPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error { var codecId utils.AVCodecID @@ -249,20 +240,20 @@ func read1078RTPPacket(data []byte) (RtpPacket, error) { } packetType := data[11] >> 4 & 0x0F - //忽略透传数据 + // 忽略透传数据 if TransmissionDataMark == packetType { return RtpPacket{}, fmt.Errorf("invaild data") } - //忽略低于最低长度的数据包 + // 忽略低于最低长度的数据包 if (AudioFrameMark == packetType && len(data) < 26) || (AudioFrameMark == packetType && len(data) < 22) { return RtpPacket{}, fmt.Errorf("invaild data") } - //x扩展位,固定为0 + // x扩展位,固定为0 _ = data[0] >> 4 & 0x1 pt := data[1] & 0x7F - //seq + // seq _ = binary.BigEndian.Uint16(data[2:]) var simNumber string @@ -270,11 +261,11 @@ func read1078RTPPacket(data []byte) (RtpPacket, error) { simNumber += fmt.Sprintf("%02d", data[i]) } - //channel + // channel _ = data[10] - //subMark + // subMark subMark := data[11] & 0x0F - //单位ms + // 时间戳,单位ms var ts uint64 n := 12 if TransmissionDataMark != packetType { @@ -283,15 +274,15 @@ func read1078RTPPacket(data []byte) (RtpPacket, error) { } if AudioFrameMark > packetType { - //iFrameInterval + // iFrameInterval _ = binary.BigEndian.Uint16(data[n:]) n += 2 - //lastFrameInterval + // lastFrameInterval _ = binary.BigEndian.Uint16(data[n:]) n += 2 } - //size + // size _ = binary.BigEndian.Uint16(data[n:]) n += 2 diff --git a/rtmp/rtmp_publisher.go b/rtmp/rtmp_publisher.go index 3e95082..42e5461 100644 --- a/rtmp/rtmp_publisher.go +++ b/rtmp/rtmp_publisher.go @@ -4,7 +4,6 @@ import ( "github.com/lkmio/avformat/libflv" "github.com/lkmio/avformat/librtmp" "github.com/lkmio/avformat/utils" - "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" "net" ) @@ -23,12 +22,7 @@ func (p *Publisher) Input(data []byte) error { func (p *Publisher) OnDeMuxStream(stream utils.AVStream) { // AVStream的ExtraData已经拷贝, 释放掉内存池中最新分配的内存 p.FindOrCreatePacketBuffer(stream.Index(), stream.Type()).FreeTail() - if !p.IsCompleted() { - p.PublishSource.OnDeMuxStream(stream) - } else if !p.IsTimeoutTrack(stream.Index()) { - p.SetTimeoutTrack(stream.Index()) - log.Sugar.Errorf("添加 %s track超时", stream.Type().ToString()) - } + p.PublishSource.OnDeMuxStream(stream) } // OnVideo 解析出来的完整视频包 diff --git a/stream/source.go b/stream/source.go index 5d0e46f..3c3da4b 100644 --- a/stream/source.go +++ b/stream/source.go @@ -151,11 +151,10 @@ type PublishSource struct { streamPipe chan []byte // 推流数据管道 mainContextEvents chan func() // 切换到主协程执行函数的事件管道 - lastPacketTime time.Time // 最近收到推流包的时间 - lastStreamEndTime time.Time // 最近拉流端结束拉流的时间 - sinkCount int // 拉流端计数 - urlValues url.Values // 推流url携带的参数 - timeoutTracks []int + lastPacketTime time.Time // 最近收到推流包的时间 + lastStreamEndTime time.Time // 最近拉流端结束拉流的时间 + sinkCount int // 拉流端计数 + urlValues url.Values // 推流url携带的参数 createTime time.Time // source创建时间 statistics *BitrateStatistics // 码流统计 } @@ -676,7 +675,10 @@ func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) { func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) { if s.completed { - log.Sugar.Warnf("添加Stream失败 Source: %s已经WriteHeader", s.ID) + log.Sugar.Warnf("添加track失败,已经WriteHeader. source: %s", s.ID) + return + } else if !s.NotTrackAdded(stream.Index()) { + log.Sugar.Warnf("添加track失败,已经添加索引为%d的track. source: %s", stream.Index(), s.ID) return } @@ -742,6 +744,7 @@ func (s *PublishSource) IsCompleted() bool { return s.completed } +// NotTrackAdded 是否没有添加该index对应的track func (s *PublishSource) NotTrackAdded(index int) bool { for _, avStream := range s.originStreams.All() { if avStream.Index() == index { @@ -752,25 +755,17 @@ func (s *PublishSource) NotTrackAdded(index int) bool { return true } -func (s *PublishSource) IsTimeoutTrack(index int) bool { - for _, i := range s.timeoutTracks { - if i == index { - return true - } - } - - return false -} - -func (s *PublishSource) SetTimeoutTrack(index int) { - s.timeoutTracks = append(s.timeoutTracks, index) -} - func (s *PublishSource) OnDeMuxStreamDone() { s.writeHeader() } func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) { + // track超时,忽略推流数据 + if s.NotTrackAdded(packet.Index()) { + s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail() + return + } + if AppConfig.GOPCache && s.existVideo { s.gopBuffer.AddPacket(packet) }