update cache

This commit is contained in:
notch
2020-12-25 09:09:02 +08:00
parent 05aa59409b
commit ebdbc74770
10 changed files with 57 additions and 356 deletions

10
media/cache/cache.go vendored
View File

@@ -4,6 +4,8 @@
package cache
import "github.com/cnotch/queue"
// Pack 表示流媒体包
type Pack interface {
Size() int // 包内数据的长度
@@ -12,16 +14,16 @@ type Pack interface {
// PackCache 媒体包缓存接口
type PackCache interface {
CachePack(pack Pack)
EnqueueTo(q *PackQueue) int
PushTo(q *queue.SyncQueue) int
Reset()
}
type emptyCache struct {
}
func (emptyCache) CachePack(Pack) {}
func (emptyCache) EnqueueTo(q *PackQueue) int { return 0 }
func (emptyCache) Reset() {}
func (emptyCache) CachePack(Pack) {}
func (emptyCache) PushTo(q *queue.SyncQueue) int { return 0 }
func (emptyCache) Reset() {}
// NewEmptyCache .
func NewEmptyCache() PackCache {

View File

@@ -8,13 +8,14 @@ import (
"sync"
"github.com/cnotch/ipchub/protos/flv"
"github.com/cnotch/queue"
)
// FlvCache Flv包缓存.
type FlvCache struct {
cacheGop bool
l sync.RWMutex
gop PackBuffer
gop queue.Queue
// cached meta data
metaData *flv.Tag
// cached video sequence header
@@ -53,9 +54,9 @@ func (cache *FlvCache) CachePack(pack Pack) {
if cache.cacheGop { // 如果启用 FlvCache
if tag.IsH264KeyFrame() { // 关键帧重置GOP
cache.gop.Reset()
cache.gop.WritePack(pack)
cache.gop.Push(pack)
} else if cache.gop.Len() > 0 { // 必须关键帧作为cache的第一个包
cache.gop.WritePack(pack)
cache.gop.Push(pack)
}
}
}
@@ -70,14 +71,14 @@ func (cache *FlvCache) Reset() {
cache.audioSequenceHeader = nil
}
// EnqueueTo 入列到指定的队列
func (cache *FlvCache) EnqueueTo(q *PackQueue) int {
// PushTo 入列到指定的队列
func (cache *FlvCache) PushTo(q *queue.SyncQueue) int {
cache.l.RLock()
defer cache.l.RUnlock()
bytes := 0
gop := cache.gop.Packs()
gop := cache.gop.Elems()
initTimestamp := uint32(0)
if len(gop) > 0 {
tag := gop[0].(*flv.Tag)
@@ -87,28 +88,28 @@ func (cache *FlvCache) EnqueueTo(q *PackQueue) int {
//write meta data
if nil != cache.metaData {
cache.metaData.Timestamp = initTimestamp
q.Buffer().WritePack(cache.metaData)
q.Queue().Push(cache.metaData)
bytes += cache.metaData.Size()
}
//write video data
if nil != cache.videoSequenceHeader {
cache.videoSequenceHeader.Timestamp = initTimestamp
q.Buffer().WritePack(cache.videoSequenceHeader)
q.Queue().Push(cache.videoSequenceHeader)
bytes += cache.videoSequenceHeader.Size()
}
//write audio data
if nil != cache.audioSequenceHeader {
cache.audioSequenceHeader.Timestamp = initTimestamp
q.Buffer().WritePack(cache.audioSequenceHeader)
q.Queue().Push(cache.audioSequenceHeader)
bytes += cache.audioSequenceHeader.Size()
}
// write gop
q.Buffer().Write(gop) // 启动阶段调用,无需加锁
q.Queue().PushN(gop) // 启动阶段调用,无需加锁
for _, p := range gop {
bytes += p.Size()
bytes += p.(Pack).Size()
}
return bytes

View File

@@ -9,13 +9,14 @@ import (
"github.com/cnotch/ipchub/av/h264"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/queue"
)
// H264Cache 画面组缓存(Group of Pictures).
type H264Cache struct {
cacheGop bool
l sync.RWMutex
gop PackBuffer
gop queue.Queue
spsPack Pack // 序列参数集包
ppsPack Pack // 图像参数集包
}
@@ -54,9 +55,9 @@ func (cache *H264Cache) CachePack(pack Pack) {
if cache.cacheGop { // 需要缓存 GOP
if islice { // 关键帧
cache.gop.Reset()
cache.gop.WritePack(pack)
cache.gop.Push(pack)
} else if cache.gop.Len() > 0 { // 必须关键帧作为cache的第一个包
cache.gop.WritePack(pack)
cache.gop.Push(pack)
}
}
}
@@ -71,28 +72,28 @@ func (cache *H264Cache) Reset() {
cache.gop.Reset()
}
// EnqueueTo 入列到指定的队列
func (cache *H264Cache) EnqueueTo(q *PackQueue) int {
// PushTo 入列到指定的队列
func (cache *H264Cache) PushTo(q *queue.SyncQueue) int {
bytes := 0
cache.l.RLock()
defer cache.l.RUnlock()
// 写参数包
if cache.spsPack != nil {
q.Buffer().WritePack(cache.spsPack)
q.Queue().Push(cache.spsPack)
bytes += cache.spsPack.Size()
}
if cache.ppsPack != nil {
q.Buffer().WritePack(cache.ppsPack)
q.Queue().Push(cache.ppsPack)
bytes += cache.ppsPack.Size()
}
// 如果必要,写 GopCache
if cache.cacheGop {
packs := cache.gop.Packs()
q.Buffer().Write(packs) // 启动阶段调用,无需加锁
packs := cache.gop.Elems()
q.Queue().PushN(packs) // 启动阶段调用,无需加锁
for _, p := range packs {
bytes += p.Size()
bytes += p.(Pack).Size()
}
}

View File

@@ -9,13 +9,14 @@ import (
"github.com/cnotch/ipchub/av/hevc"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/queue"
)
// HevcCache 画面组缓存(Group of Pictures).
type HevcCache struct {
cacheGop bool
l sync.RWMutex
gop PackBuffer
gop queue.Queue
vpsPack Pack // 视频参数集包
spsPack Pack // 序列参数集包
ppsPack Pack // 图像参数集包
@@ -60,9 +61,9 @@ func (cache *HevcCache) CachePack(pack Pack) {
if cache.cacheGop { // 需要缓存 GOP
if islice { // 关键帧
cache.gop.Reset()
cache.gop.WritePack(pack)
cache.gop.Push(pack)
} else if cache.gop.Len() > 0 {
cache.gop.WritePack(pack)
cache.gop.Push(pack)
}
}
}
@@ -78,34 +79,34 @@ func (cache *HevcCache) Reset() {
cache.gop.Reset()
}
// EnqueueTo 入列到指定的队列
func (cache *HevcCache) EnqueueTo(q *PackQueue) int {
// PushTo 入列到指定的队列
func (cache *HevcCache) PushTo(q *queue.SyncQueue) int {
bytes := 0
cache.l.RLock()
defer cache.l.RUnlock()
// 写参数包
if cache.vpsPack != nil {
q.Buffer().WritePack(cache.vpsPack)
q.Queue().Push(cache.vpsPack)
bytes += cache.vpsPack.Size()
}
if cache.spsPack != nil {
q.Buffer().WritePack(cache.spsPack)
q.Queue().Push(cache.spsPack)
bytes += cache.spsPack.Size()
}
if cache.ppsPack != nil {
q.Buffer().WritePack(cache.ppsPack)
q.Queue().Push(cache.ppsPack)
bytes += cache.ppsPack.Size()
}
// 如果必要,写 GopCache
if cache.cacheGop {
packs := cache.gop.Packs()
q.Buffer().Write(packs) // 启动阶段调用,无需加锁
packs := cache.gop.Elems()
q.Queue().PushN(packs) // 启动阶段调用,无需加锁
for _, p := range packs {
bytes += p.Size()
bytes += p.(Pack).Size()
}
}

View File

@@ -1,188 +0,0 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package cache
import (
"errors"
"io"
)
// PackBuffer 数据包缓冲
type PackBuffer struct {
buf []Pack // contents are the Packs buf[off : len(buf)]
off int // read at &buf[off], write at &buf[len(buf)]
}
// NewPackBuffer 创建新的缓冲
func NewPackBuffer(buf []Pack) *PackBuffer {
return &PackBuffer{buf: buf}
}
func makeSlice(n int) []Pack {
// If the make fails, give a known error.
defer func() {
if recover() != nil {
panic(ErrTooLarge)
}
}()
return make([]Pack, n)
}
// 避免内存泄露,重置指针引用
func resetSlice(packs []Pack) {
for i := 0; i < len(packs); i++ {
packs[i] = nil
}
}
func (b *PackBuffer) empty() bool { return len(b.buf) <= b.off }
// Packs 获取所有的包
func (b *PackBuffer) Packs() []Pack { return b.buf[b.off:] }
// Len 缓冲包长度
func (b *PackBuffer) Len() int { return len(b.buf) - b.off }
// Cap 缓冲容量
func (b *PackBuffer) Cap() int { return cap(b.buf) }
// Reset 重置缓冲
func (b *PackBuffer) Reset() {
resetSlice(b.buf[b.off:])
b.buf = b.buf[:0]
b.off = 0
}
const maxLen = int(^uint(0) >> 16)
// ErrTooLarge buf太长了
var ErrTooLarge = errors.New("stream.PackBuffer: too large")
// 尝试在cap范围内扩展buf
func (b *PackBuffer) tryGrowByReslice(n int) (int, bool) {
if l := len(b.buf); n <= cap(b.buf)-l {
b.buf = b.buf[:l+n]
return l, true
}
return 0, false
}
// 扩展buf
func (b *PackBuffer) grow(n int) int {
m := b.Len()
// If PackBuffer is empty, reset to recover space.
if m == 0 && b.off != 0 {
b.Reset()
}
// Try to grow by means of a reslice.
if i, ok := b.tryGrowByReslice(n); ok {
return i
}
// // Check if we can make use of bootstrap array.
// if b.buf == nil && n <= len(b.bootstrap) {
// b.buf = b.bootstrap[:n]
// return 0
// }
c := cap(b.buf)
if n <= c/2-m {
// We can slide things down instead of allocating a new
// slice. We only need m+n <= c to slide, but
// we instead let capacity get twice as large so we
// don't spend all our time copying.
copy(b.buf, b.buf[b.off:])
resetSlice(b.buf[m:]) // 释放移动copy后剩余的指针引用
} else if c > maxLen-c-n {
panic(ErrTooLarge)
} else {
// Not enough space anywhere, we need to allocate.
buf := makeSlice(2*c + n)
copy(buf, b.buf[b.off:])
resetSlice(b.buf[b.off:]) // 重新分配了缓冲,释放旧缓冲的指针引用
b.buf = buf
}
// Restore b.off and len(b.buf).
b.off = 0
b.buf = b.buf[:m+n]
return m
}
// Grow 扩展包长度能容纳接下来n个包数
func (b *PackBuffer) Grow(n int) {
if n < 0 {
panic("stream.PackBuffer.Grow: negative count")
}
m := b.grow(n)
b.buf = b.buf[:m]
}
// Write 写入包数组
func (b *PackBuffer) Write(p []Pack) (n int, err error) {
m, ok := b.tryGrowByReslice(len(p))
if !ok {
m = b.grow(len(p))
}
return copy(b.buf[m:], p), nil
}
// WritePack 写入单个包
func (b *PackBuffer) WritePack(c Pack) error {
m, ok := b.tryGrowByReslice(1)
if !ok {
m = b.grow(1)
}
b.buf[m] = c
return nil
}
// Skip 跳过指定个数的包
func (b *PackBuffer) Skip(size int) int {
if b.empty() {
// PackBuffer is empty, reset to recover space.
b.Reset()
return 0
}
len := b.Len()
if size > len {
size = len
}
resetSlice(b.buf[b.off : b.off+size]) // 释放已经读取的指针引用
b.off += size
return size
}
// Read 读包Slice
func (b *PackBuffer) Read(p []Pack) (n int, err error) {
if b.empty() {
// PackBuffer is empty, reset to recover space.
b.Reset()
if len(p) == 0 {
return 0, nil
}
return 0, io.EOF
}
n = copy(p, b.buf[b.off:])
resetSlice(b.buf[b.off : b.off+n]) // 释放已经读取的指针引用
b.off += n
return n, nil
}
// ReadPack 读一个包
func (b *PackBuffer) ReadPack() (Pack, error) {
if b.empty() {
// PackBuffer is empty, reset to recover space.
b.Reset()
return nil, io.EOF
}
p := b.buf[b.off]
b.buf[b.off] = nil // 释放指针引用,避免内存泄露
b.off++
return p, nil
}

View File

@@ -1,75 +0,0 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package cache
import (
"sync"
)
// PackQueue 媒体包的队列,它并发安全
type PackQueue struct {
cond *sync.Cond
packs PackBuffer
}
// NewPackQueue 创建包队列
func NewPackQueue() *PackQueue {
return &PackQueue{
cond: sync.NewCond(&sync.Mutex{}),
}
}
// Buffer 返回内部缓存
func (pq *PackQueue) Buffer() *PackBuffer {
return &pq.packs
}
// Enqueue 入列包并发送信号
func (pq *PackQueue) Enqueue(pack Pack) {
pq.cond.L.Lock()
pq.packs.WritePack(pack)
pq.cond.Signal()
pq.cond.L.Unlock()
}
// Dequeue 出列包,如果没有等待信号做一次重试
func (pq *PackQueue) Dequeue() Pack {
var pack Pack
pq.cond.L.Lock()
if pq.packs.Len() <= 0 {
pq.cond.Wait()
}
pack, _ = pq.packs.ReadPack()
pq.cond.L.Unlock()
return pack
}
// Signal 发送信号,以便结束等待
func (pq *PackQueue) Signal() {
pq.cond.Signal()
}
// Broadcast 广播信号,以释放所有出列的阻塞等待
func (pq *PackQueue) Broadcast() {
pq.cond.Broadcast()
}
// Len 队列长度
func (pq *PackQueue) Len() int {
pq.cond.L.Lock()
defer pq.cond.L.Unlock()
return pq.packs.Len()
}
// Clear 清空队列
func (pq *PackQueue) Clear() {
pq.cond.L.Lock()
defer pq.cond.L.Unlock()
pq.packs.Reset()
}

View File

@@ -1,43 +0,0 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package cache
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestPackQueue(t *testing.T) {
t.Run("PackQueue", func(t *testing.T) {
closed := false
nilC := 0
pq := NewPackQueue()
go func() {
for !closed {
pack := pq.Dequeue()
if pack == nil {
nilC++
continue
}
}
}()
for i := 0; i < 10000; i++ {
p := emptyPack{}
pq.Enqueue(p)
}
<-time.After(time.Millisecond * 100)
closed = true
pq.Signal()
<-time.After(time.Millisecond * 10)
assert.Equal(t, 1, nilC, "need = 1")
})
}
type emptyPack struct {
}
func (p emptyPack) Size() int { return 100 }

View File

@@ -11,6 +11,7 @@ import (
"github.com/cnotch/ipchub/media/cache"
"github.com/cnotch/ipchub/stats"
"github.com/cnotch/queue"
"github.com/cnotch/xlog"
)
@@ -28,7 +29,7 @@ type consumption struct {
consumer Consumer // 消费者
packetType PacketType // 消费的包类型
extra string // 消费者额外信息
recvQueue *cache.PackQueue // 接收媒体源数据的队列
recvQueue *queue.SyncQueue // 接收媒体源数据的队列
closed bool // 消费者是否关闭
Flow stats.Flow // 流量统计
logger *xlog.Logger // 日志对象
@@ -51,13 +52,13 @@ func (c *consumption) Close() error {
// 向消费者发送媒体包
func (c *consumption) send(pack cache.Pack) {
c.recvQueue.Enqueue(pack)
c.recvQueue.Push(pack)
c.Flow.AddIn(int64(pack.Size()))
}
// 向消费者发送一个图像组
func (c *consumption) sendGop(packCache cache.PackCache) int {
bytes := packCache.EnqueueTo(c.recvQueue)
bytes := packCache.PushTo(c.recvQueue)
c.Flow.AddIn(int64(bytes))
return bytes
}
@@ -77,19 +78,19 @@ func (c *consumption) consume() {
c.consumer.Close()
// 尽早通知GC回收内存
c.recvQueue.Clear()
c.recvQueue.Reset()
c.stream = nil
}()
for !c.closed {
pack := c.recvQueue.Dequeue()
if pack == nil {
e := c.recvQueue.Pop()
if e == nil {
if !c.closed {
c.logger.Warn("receive nil pack")
}
continue
}
pack := e.(cache.Pack)
c.consumer.Consume(pack)
c.Flow.AddOut(int64(pack.Size()))
}

View File

@@ -11,10 +11,10 @@ import (
"github.com/cnotch/ipchub/av"
"github.com/cnotch/ipchub/av/h264"
"github.com/cnotch/ipchub/media/cache"
"github.com/cnotch/ipchub/protos/amf"
"github.com/cnotch/ipchub/protos/flv"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/queue"
"github.com/cnotch/xlog"
)
@@ -27,7 +27,7 @@ type flvMuxer struct {
audioMeta av.AudioMeta
typeFlags byte
audioDataTemplate *flv.AudioData
recvQueue *cache.PackQueue
recvQueue *queue.SyncQueue
extractFuncs [4]func(packet *rtp.Packet) error
tagWriter flv.TagWriter
closed bool
@@ -39,7 +39,7 @@ type flvMuxer struct {
func newFlvMuxer(videoMeta av.VideoMeta, audioMeta av.AudioMeta, tagWriter flv.TagWriter, logger *xlog.Logger) FlvMuxer {
muxer := &flvMuxer{
recvQueue: cache.NewPackQueue(),
recvQueue: queue.NewSyncQueue(),
videoMeta: videoMeta,
audioMeta: audioMeta,
typeFlags: byte(flv.TypeFlagsVideo),
@@ -317,14 +317,14 @@ func (muxer *flvMuxer) consume() {
}
// 尽早通知GC回收内存
muxer.recvQueue.Clear()
muxer.recvQueue.Reset()
}()
muxer.muxMetadataTag()
muxer.muxSequenceHeaderTag()
for !muxer.closed {
pack := muxer.recvQueue.Dequeue()
pack := muxer.recvQueue.Pop()
if pack == nil {
if !muxer.closed {
muxer.logger.Warn("flvmuxer:receive nil packet")
@@ -351,7 +351,7 @@ func (muxer *flvMuxer) Close() error {
}
func (muxer *flvMuxer) WritePacket(packet *rtp.Packet) error {
muxer.recvQueue.Enqueue(packet)
muxer.recvQueue.Push(packet)
return nil
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/ipchub/stats"
"github.com/cnotch/ipchub/utils"
"github.com/cnotch/queue"
"github.com/cnotch/xlog"
)
@@ -190,7 +191,7 @@ func (s *Stream) startConsume(consumer Consumer, packetType PacketType, extra st
startOn: time.Now(),
stream: s,
cid: NewCID(packetType, &s.consumerSequenceSeed),
recvQueue: cache.NewPackQueue(),
recvQueue: queue.NewSyncQueue(),
consumer: consumer,
packetType: packetType,
extra: extra,