handle the consumption queue overflow

This commit is contained in:
notch
2021-01-19 09:50:04 +08:00
parent be4998938c
commit b8d148ea16
7 changed files with 47 additions and 29 deletions

View File

@@ -32,7 +32,7 @@ func NewFlvCache(cacheGop bool) *FlvCache {
}
// CachePack 向FlvCache中缓存包
func (cache *FlvCache) CachePack(pack Pack) {
func (cache *FlvCache) CachePack(pack Pack) bool {
tag := pack.(*flv.Tag)
cache.l.Lock()
@@ -40,25 +40,27 @@ func (cache *FlvCache) CachePack(pack Pack) {
if tag.IsMetadata() {
cache.metaData = tag
return
return false
}
if tag.IsH2645SequenceHeader() {
cache.videoSequenceHeader = tag
return
return false
}
if tag.IsAACSequenceHeader() {
cache.audioSequenceHeader = tag
return
return false
}
keyframe := tag.IsH2645KeyFrame()
if cache.cacheGop { // 如果启用 FlvCache
if tag.IsH2645KeyFrame() { // 关键帧重置GOP
if keyframe { // 关键帧重置GOP
cache.gop.Reset()
cache.gop.Push(pack)
} else if cache.gop.Len() > 0 { // 必须关键帧作为cache的第一个包
cache.gop.Push(pack)
}
}
return keyframe
}
// Reset 重置FlvCache缓存

View File

@@ -29,11 +29,11 @@ func NewH264Cache(cacheGop bool) *H264Cache {
}
// CachePack 向H264Cache中缓存包
func (cache *H264Cache) CachePack(pack Pack) {
func (cache *H264Cache) CachePack(pack Pack) bool {
rtppack := pack.(*rtp.Packet)
if rtppack.Channel != rtp.ChannelVideo {
return
return false
}
// 判断是否是参数和关键帧包
@@ -44,12 +44,12 @@ func (cache *H264Cache) CachePack(pack Pack) {
if sps { // 新序列参数,重置图像参数和 GopCache
cache.sps = rtppack
return
return false
}
if pps { // 新图像参数,重置 GopCahce
cache.pps = rtppack
return
return false
}
if cache.cacheGop { // 需要缓存 GOP
@@ -60,6 +60,7 @@ func (cache *H264Cache) CachePack(pack Pack) {
cache.gop.Push(rtppack)
}
}
return islice
}
// Reset 重置H264Cache缓存

View File

@@ -30,11 +30,11 @@ func NewHevcCache(cacheGop bool) *HevcCache {
}
// CachePack 向HevcCache中缓存包
func (cache *HevcCache) CachePack(pack Pack) {
func (cache *HevcCache) CachePack(pack Pack) bool {
rtppack := pack.(*rtp.Packet)
if rtppack.Channel != rtp.ChannelVideo {
return
return false
}
// 判断是否是参数和关键帧包
@@ -45,17 +45,17 @@ func (cache *HevcCache) CachePack(pack Pack) {
if vps { // 视频参数
cache.vps = rtppack
return
return false
}
if sps { // 序列头参数
cache.sps = rtppack
return
return false
}
if pps { // 图像参数
cache.pps = rtppack
return
return false
}
if cache.cacheGop { // 需要缓存 GOP
@@ -66,6 +66,7 @@ func (cache *HevcCache) CachePack(pack Pack) {
cache.gop.Push(rtppack)
}
}
return islice
}
// Reset 重置HevcCache缓存

View File

@@ -32,6 +32,8 @@ type consumption struct {
closed bool // 消费者是否关闭
Flow stats.Flow // 流量统计
logger *xlog.Logger // 日志对象
discarding bool // 媒体包丢弃中
maxQLen int
}
func (c *consumption) ID() CID {
@@ -50,9 +52,20 @@ func (c *consumption) Close() error {
}
// 向消费者发送媒体包
func (c *consumption) send(pack Pack) {
func (c *consumption) send(pack Pack, keyframe bool) {
if keyframe { // 是 key frame
n := c.recvQueue.Len()
if c.discarding && n < c.maxQLen {
c.discarding = false
} else if !c.discarding && n > c.maxQLen {
c.discarding = true
}
}
if !c.discarding {
c.recvQueue.Push(pack)
c.Flow.AddIn(int64(pack.Size()))
}
}
// 向消费者发送一个图像组

View File

@@ -15,10 +15,10 @@ type consumptions struct {
count int32
}
func (m *consumptions) SendToAll(p Pack) {
func (m *consumptions) SendToAll(p Pack, keyframe bool) {
m.Range(func(key, value interface{}) bool {
c := value.(*consumption)
c.send(p)
c.send(p, keyframe)
return true
})
}

View File

@@ -17,7 +17,7 @@ import (
type Pack = format.Packet
type packCache interface {
CachePack(pack Pack)
CachePack(pack Pack) (keyframe bool) // 返回是否是关键帧
PushTo(q *queue.SyncQueue) int
Reset()
}
@@ -27,7 +27,7 @@ var _ packCache = emptyCache{}
type emptyCache struct {
}
func (emptyCache) CachePack(Pack) {}
func (emptyCache) CachePack(Pack) bool { return false }
func (emptyCache) PushTo(q *queue.SyncQueue) int { return 0 }
func (emptyCache) Reset() {}

View File

@@ -207,8 +207,8 @@ func (s *Stream) WriteRtpPacket(packet *rtp.Packet) error {
atomic.AddUint64(&s.size, uint64(packet.Size()))
s.cache.CachePack(packet)
s.consumptions.SendToAll(packet)
keyframe := s.cache.CachePack(packet)
s.consumptions.SendToAll(packet, keyframe)
s.rtpDemuxer.WriteRtpPacket(packet)
return nil
@@ -234,8 +234,8 @@ func (s *Stream) WriteFlvTag(tag *flv.Tag) error {
return statusErrors[status]
}
s.flvCache.CachePack(tag)
s.flvConsumptions.SendToAll(tag)
keyframe := s.flvCache.CachePack(tag)
s.flvConsumptions.SendToAll(tag, keyframe)
return nil
}
@@ -263,6 +263,7 @@ func (s *Stream) startConsume(consumer Consumer, packetType PacketType, extra st
packetType: packetType,
extra: extra,
Flow: stats.NewFlow(),
maxQLen: 1000,
}
c.logger = s.logger.With(xlog.Fields(