diff --git a/media/cache/flvcache.go b/media/cache/flvcache.go index 15bc265..b848426 100755 --- a/media/cache/flvcache.go +++ b/media/cache/flvcache.go @@ -32,7 +32,7 @@ func NewFlvCache(cacheGop bool) *FlvCache { } // CachePack 向FlvCache中缓存包 -func (cache *FlvCache) CachePack(pack Pack) { +func (cache *FlvCache) CachePack(pack Pack) bool { tag := pack.(*flv.Tag) cache.l.Lock() @@ -40,25 +40,27 @@ func (cache *FlvCache) CachePack(pack Pack) { if tag.IsMetadata() { cache.metaData = tag - return + return false } if tag.IsH2645SequenceHeader() { cache.videoSequenceHeader = tag - return + return false } if tag.IsAACSequenceHeader() { cache.audioSequenceHeader = tag - return + return false } + keyframe := tag.IsH2645KeyFrame() if cache.cacheGop { // 如果启用 FlvCache - if tag.IsH2645KeyFrame() { // 关键帧,重置GOP + if keyframe { // 关键帧,重置GOP cache.gop.Reset() cache.gop.Push(pack) } else if cache.gop.Len() > 0 { // 必须关键帧作为cache的第一个包 cache.gop.Push(pack) } } + return keyframe } // Reset 重置FlvCache缓存 diff --git a/media/cache/h264cache.go b/media/cache/h264cache.go index 30f99d0..00c1dc0 100755 --- a/media/cache/h264cache.go +++ b/media/cache/h264cache.go @@ -29,11 +29,11 @@ func NewH264Cache(cacheGop bool) *H264Cache { } // CachePack 向H264Cache中缓存包 -func (cache *H264Cache) CachePack(pack Pack) { +func (cache *H264Cache) CachePack(pack Pack) bool { rtppack := pack.(*rtp.Packet) if rtppack.Channel != rtp.ChannelVideo { - return + return false } // 判断是否是参数和关键帧包 @@ -44,12 +44,12 @@ func (cache *H264Cache) CachePack(pack Pack) { if sps { // 新序列参数,重置图像参数和 GopCache cache.sps = rtppack - return + return false } if pps { // 新图像参数,重置 GopCahce cache.pps = rtppack - return + return false } if cache.cacheGop { // 需要缓存 GOP @@ -60,6 +60,7 @@ func (cache *H264Cache) CachePack(pack Pack) { cache.gop.Push(rtppack) } } + return islice } // Reset 重置H264Cache缓存 diff --git a/media/cache/hevccache.go b/media/cache/hevccache.go index 01eee25..3335c4f 100755 --- a/media/cache/hevccache.go +++ b/media/cache/hevccache.go @@ -30,11 +30,11 @@ func NewHevcCache(cacheGop bool) *HevcCache { } // CachePack 向HevcCache中缓存包 -func (cache *HevcCache) CachePack(pack Pack) { +func (cache *HevcCache) CachePack(pack Pack) bool { rtppack := pack.(*rtp.Packet) if rtppack.Channel != rtp.ChannelVideo { - return + return false } // 判断是否是参数和关键帧包 @@ -45,17 +45,17 @@ func (cache *HevcCache) CachePack(pack Pack) { if vps { // 视频参数 cache.vps = rtppack - return + return false } if sps { // 序列头参数 cache.sps = rtppack - return + return false } if pps { // 图像参数 cache.pps = rtppack - return + return false } if cache.cacheGop { // 需要缓存 GOP @@ -66,6 +66,7 @@ func (cache *HevcCache) CachePack(pack Pack) { cache.gop.Push(rtppack) } } + return islice } // Reset 重置HevcCache缓存 diff --git a/media/consumption.go b/media/consumption.go index 64e46f3..de79f93 100755 --- a/media/consumption.go +++ b/media/consumption.go @@ -32,6 +32,8 @@ type consumption struct { closed bool // 消费者是否关闭 Flow stats.Flow // 流量统计 logger *xlog.Logger // 日志对象 + discarding bool // 媒体包丢弃中 + maxQLen int } func (c *consumption) ID() CID { @@ -50,9 +52,20 @@ func (c *consumption) Close() error { } // 向消费者发送媒体包 -func (c *consumption) send(pack Pack) { - c.recvQueue.Push(pack) - c.Flow.AddIn(int64(pack.Size())) +func (c *consumption) send(pack Pack, keyframe bool) { + if keyframe { // 是 key frame + n := c.recvQueue.Len() + if c.discarding && n < c.maxQLen { + c.discarding = false + } else if !c.discarding && n > c.maxQLen { + c.discarding = true + } + } + + if !c.discarding { + c.recvQueue.Push(pack) + c.Flow.AddIn(int64(pack.Size())) + } } // 向消费者发送一个图像组 diff --git a/media/consumptions.go b/media/consumptions.go index 21ca665..0e68a22 100755 --- a/media/consumptions.go +++ b/media/consumptions.go @@ -15,10 +15,10 @@ type consumptions struct { count int32 } -func (m *consumptions) SendToAll(p Pack) { +func (m *consumptions) SendToAll(p Pack, keyframe bool) { m.Range(func(key, value interface{}) bool { c := value.(*consumption) - c.send(p) + c.send(p, keyframe) return true }) } diff --git a/media/placehold.go b/media/placehold.go index 4c392f7..303c166 100644 --- a/media/placehold.go +++ b/media/placehold.go @@ -17,7 +17,7 @@ import ( type Pack = format.Packet type packCache interface { - CachePack(pack Pack) + CachePack(pack Pack) (keyframe bool) // 返回是否是关键帧 PushTo(q *queue.SyncQueue) int Reset() } @@ -27,7 +27,7 @@ var _ packCache = emptyCache{} type emptyCache struct { } -func (emptyCache) CachePack(Pack) {} +func (emptyCache) CachePack(Pack) bool { return false } func (emptyCache) PushTo(q *queue.SyncQueue) int { return 0 } func (emptyCache) Reset() {} @@ -41,9 +41,9 @@ var _ flvMuxer = emptyFlvMuxer{} type emptyFlvMuxer struct{} -func (emptyFlvMuxer) TypeFlags() byte { return 0 } +func (emptyFlvMuxer) TypeFlags() byte { return 0 } func (emptyFlvMuxer) WriteFrame(frame *codec.Frame) error { return nil } -func (emptyFlvMuxer) Close() error { return nil } +func (emptyFlvMuxer) Close() error { return nil } type rtpDemuxer interface { rtp.PacketWriter @@ -54,6 +54,6 @@ var _ rtpDemuxer = emptyRtpDemuxer{} type emptyRtpDemuxer struct{} -func (emptyRtpDemuxer) TypeFlags() byte { return 0 } +func (emptyRtpDemuxer) TypeFlags() byte { return 0 } func (emptyRtpDemuxer) WriteRtpPacket(*rtp.Packet) error { return nil } -func (emptyRtpDemuxer) Close() error { return nil } +func (emptyRtpDemuxer) Close() error { return nil } diff --git a/media/stream.go b/media/stream.go index 31e7b92..dacf82d 100755 --- a/media/stream.go +++ b/media/stream.go @@ -207,8 +207,8 @@ func (s *Stream) WriteRtpPacket(packet *rtp.Packet) error { atomic.AddUint64(&s.size, uint64(packet.Size())) - s.cache.CachePack(packet) - s.consumptions.SendToAll(packet) + keyframe := s.cache.CachePack(packet) + s.consumptions.SendToAll(packet, keyframe) s.rtpDemuxer.WriteRtpPacket(packet) return nil @@ -234,8 +234,8 @@ func (s *Stream) WriteFlvTag(tag *flv.Tag) error { return statusErrors[status] } - s.flvCache.CachePack(tag) - s.flvConsumptions.SendToAll(tag) + keyframe := s.flvCache.CachePack(tag) + s.flvConsumptions.SendToAll(tag, keyframe) return nil } @@ -263,6 +263,7 @@ func (s *Stream) startConsume(consumer Consumer, packetType PacketType, extra st packetType: packetType, extra: extra, Flow: stats.NewFlow(), + maxQLen: 1000, } c.logger = s.logger.With(xlog.Fields(