diff --git a/rtmp/rtmp_publisher.go b/rtmp/rtmp_publisher.go index 81621d3..157f8ba 100644 --- a/rtmp/rtmp_publisher.go +++ b/rtmp/rtmp_publisher.go @@ -11,15 +11,13 @@ type Publisher struct { deMuxer libflv.DeMuxer audioMemoryPool stream.MemoryPool videoMemoryPool stream.MemoryPool - audioPacket []byte - videoPacket []byte audioUnmark bool videoUnmark bool } func NewPublisher(sourceId string) *Publisher { - publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}} + publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}, audioUnmark: false, videoUnmark: false} publisher.deMuxer = libflv.DeMuxer{} //设置回调,从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl publisher.deMuxer.SetHandler(publisher) @@ -36,15 +34,17 @@ func NewPublisher(sourceId string) *Publisher { } func (p *Publisher) OnDeMuxStream(stream_ utils.AVStream) { + //AVStream的Data单独拷贝出来 + //释放掉内存池中最新分配的内存 tmp := stream_.Extra() bytes := make([]byte, len(tmp)) copy(bytes, tmp) stream_.SetExtraData(bytes) if utils.AVMediaTypeAudio == stream_.Type() { - p.audioMemoryPool.FreeTail(len(p.audioPacket)) + p.audioMemoryPool.FreeTail() } else if utils.AVMediaTypeVideo == stream_.Type() { - p.videoMemoryPool.FreeTail(len(p.videoPacket)) + p.videoMemoryPool.FreeTail() } p.SourceImpl.OnDeMuxStream(stream_) @@ -62,9 +62,9 @@ func (p *Publisher) OnDeMuxPacket(index int, packet utils.AVPacket) { } if utils.AVMediaTypeAudio == packet.MediaType() { - p.audioMemoryPool.FreeHead(len(packet.Data())) + p.audioMemoryPool.FreeHead() } else if utils.AVMediaTypeVideo == packet.MediaType() { - p.videoMemoryPool.FreeHead(len(packet.Data())) + p.videoMemoryPool.FreeHead() } } @@ -79,7 +79,6 @@ func (p *Publisher) OnVideo(data []byte, ts uint32) { p.videoUnmark = false } - p.videoPacket = data _ = p.deMuxer.InputVideo(data, ts) } @@ -89,7 +88,6 @@ func (p *Publisher) OnAudio(data []byte, ts uint32) { p.audioUnmark = false } - p.audioPacket = data _ = p.deMuxer.InputAudio(data, ts) } @@ -97,7 +95,7 @@ func (p *Publisher) OnAudio(data []byte, ts uint32) { func (p *Publisher) OnPartPacket(index int, data []byte, first bool) { //audio if index == 0 { - if p.audioUnmark { + if !p.audioUnmark { p.audioMemoryPool.Mark() p.audioUnmark = true } @@ -105,7 +103,7 @@ func (p *Publisher) OnPartPacket(index int, data []byte, first bool) { p.audioMemoryPool.Write(data) //video } else if index == 1 { - if p.videoUnmark { + if !p.videoUnmark { p.videoMemoryPool.Mark() p.videoUnmark = true } diff --git a/rtmp/rtmp_transtream.go b/rtmp/rtmp_transtream.go index 3ced4f6..c5120f0 100644 --- a/rtmp/rtmp_transtream.go +++ b/rtmp/rtmp_transtream.go @@ -99,8 +99,8 @@ func (t *TransStream) AddSink(sink stream.ISink) { } func (t *TransStream) onDiscardPacket(pkt interface{}) { - bytes := pkt.([]byte) - t.memoryPool.FreeHead(len(bytes)) + //bytes := pkt.([]byte) + t.memoryPool.FreeHead() } func (t *TransStream) WriteHeader() error { diff --git a/stream/memory_pool.go b/stream/memory_pool.go index 41b3f3a..8942514 100644 --- a/stream/memory_pool.go +++ b/stream/memory_pool.go @@ -21,54 +21,55 @@ type MemoryPool interface { Reset() // FreeHead 从头部释放指定大小内存 - FreeHead(size int) + FreeHead() // FreeTail 从尾部释放指定大小内存 - FreeTail(size int) + FreeTail() } func NewMemoryPool(capacity int) MemoryPool { pool := &memoryPool{ - data: make([]byte, capacity), - capacity: capacity, + data: make([]byte, capacity), + capacity: capacity, + blockQueue: NewQueue(128), } return pool } type memoryPool struct { - data []byte - ptrStart uintptr - ptrEnd uintptr - //剩余的可用内存空间不足以为此次write + data []byte + //实际的可用容量,当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用. capacity int head int tail int //保存开始索引 - mark int + markIndex int + mark bool + blockQueue *Queue } // 根据head和tail计算出可用的内存地址 func (m *memoryPool) allocate(size int) []byte { if m.capacity-m.tail < size { //使用从头释放的内存 - if m.tail-m.mark+size <= m.head { - copy(m.data, m.data[m.mark:m.tail]) - m.capacity = m.mark - m.tail = m.tail - m.mark - m.mark = 0 + if m.tail-m.markIndex+size <= m.head { + copy(m.data, m.data[m.markIndex:m.tail]) + m.capacity = m.markIndex + m.tail = m.tail - m.markIndex + m.markIndex = 0 } else { //扩容 - capacity := (cap(m.data) + m.tail - m.mark + size) * 3 / 2 + capacity := (cap(m.data) + m.tail - m.markIndex + size) * 3 / 2 bytes := make([]byte, capacity) //不对之前的内存进行复制, 已经被AVPacket引用, 自行GC - copy(bytes, m.data[m.mark:m.tail]) + copy(bytes, m.data[m.markIndex:m.tail]) m.data = bytes m.capacity = capacity - m.tail = m.tail - m.mark - m.mark = 0 + m.tail = m.tail - m.markIndex + m.markIndex = 0 m.head = 0 } @@ -80,28 +81,42 @@ func (m *memoryPool) allocate(size int) []byte { } func (m *memoryPool) Mark() { - m.mark = m.tail + m.markIndex = m.tail + m.mark = true } func (m *memoryPool) Write(data []byte) { + utils.Assert(m.mark) + allocate := m.allocate(len(data)) copy(allocate, data) } func (m *memoryPool) Allocate(size int) []byte { + utils.Assert(m.mark) return m.allocate(size) } func (m *memoryPool) Fetch() []byte { - return m.data[m.mark:m.tail] + utils.Assert(m.mark) + + m.mark = false + size := m.tail - m.markIndex + m.blockQueue.Push(size) + return m.data[m.markIndex:m.tail] } func (m *memoryPool) Reset() { - m.tail = m.mark + m.mark = false + m.tail = m.markIndex } -func (m *memoryPool) FreeHead(size int) { +func (m *memoryPool) FreeHead() { + utils.Assert(!m.blockQueue.IsEmpty()) + + size := m.blockQueue.Pop().(int) m.head += size + if m.head == m.tail { m.head = 0 m.tail = 0 @@ -110,7 +125,13 @@ func (m *memoryPool) FreeHead(size int) { } } -func (m *memoryPool) FreeTail(size int) { +func (m *memoryPool) FreeTail() { + utils.Assert(!m.blockQueue.IsEmpty()) + + size := m.blockQueue.PopBack().(int) m.tail -= size - utils.Assert(m.tail >= 0) + + if m.tail == 0 && !m.blockQueue.IsEmpty() { + m.tail = m.capacity + } } diff --git a/stream/memory_pool_test.go b/stream/memory_pool_test.go index c3025c0..ef2a254 100644 --- a/stream/memory_pool_test.go +++ b/stream/memory_pool_test.go @@ -2,7 +2,9 @@ package stream import ( "encoding/hex" + "github.com/yangjiechina/avformat/utils" "testing" + "unsafe" ) func TestMemoryPool(t *testing.T) { @@ -12,14 +14,19 @@ func TestMemoryPool(t *testing.T) { } pool := NewMemoryPool(5) + last := uintptr(0) for i := 0; i < 10; i++ { pool.Mark() pool.Write(bytes) fetch := pool.Fetch() + addr := *(*uintptr)(unsafe.Pointer(&fetch)) + if last != 0 { + utils.Assert(last == addr) + } + last = addr + println(hex.Dump(fetch)) - if i%2 == 0 { - pool.FreeHead(len(fetch)) - } + pool.FreeTail() } } diff --git a/stream/queue.go b/stream/queue.go new file mode 100644 index 0000000..f9a612c --- /dev/null +++ b/stream/queue.go @@ -0,0 +1,48 @@ +package stream + +import "github.com/yangjiechina/avformat/utils" + +type Queue struct { + *ringBuffer +} + +func NewQueue(capacity int) *Queue { + utils.Assert(capacity > 0) + + return &Queue{ringBuffer: &ringBuffer{ + data: make([]interface{}, capacity), + head: 0, + tail: 0, + size: 0, + }} +} + +func (q *Queue) Push(value interface{}) { + if q.ringBuffer.IsFull() { + newArray := make([]interface{}, q.ringBuffer.Size()*2) + head, tail := q.ringBuffer.All() + copy(newArray, head) + if tail != nil { + copy(newArray[len(head):], tail) + } + + q.data = newArray + q.head = 0 + q.tail = q.size + } + + q.data[q.tail] = value + q.tail = (q.tail + 1) % cap(q.data) + + q.size++ +} + +func (q *Queue) PopBack() interface{} { + utils.Assert(q.size > 0) + + value := q.ringBuffer.Tail() + q.size-- + q.tail = utils.MaxInt(0, q.tail-1) + + return value +} diff --git a/stream/queue_test.go b/stream/queue_test.go new file mode 100644 index 0000000..ac5cb95 --- /dev/null +++ b/stream/queue_test.go @@ -0,0 +1,19 @@ +package stream + +import ( + "fmt" + "testing" +) + +func TestQueue(t *testing.T) { + queue := NewQueue(1) + + for i := 0; i < 100; i++ { + queue.Push(i) + } + + for i := 0; i < 100; i++ { + pop := queue.PopBack() + println(fmt.Sprintf("element:%d", pop.(int))) + } +} diff --git a/stream/ring_buffer.go b/stream/ring_buffer.go index 5c0fe15..fe9c3ba 100644 --- a/stream/ring_buffer.go +++ b/stream/ring_buffer.go @@ -1,5 +1,7 @@ package stream +import "github.com/yangjiechina/avformat/utils" + type RingBuffer interface { IsEmpty() bool @@ -19,6 +21,7 @@ type RingBuffer interface { } func NewRingBuffer(capacity int) RingBuffer { + utils.Assert(capacity > 0) r := &ringBuffer{ data: make([]interface{}, capacity), head: 0, @@ -71,7 +74,7 @@ func (r *ringBuffer) Head() interface{} { } func (r *ringBuffer) Tail() interface{} { - return r.data[r.tail] + return r.data[utils.MaxInt(0, r.tail-1)] } func (r *ringBuffer) Size() int { @@ -80,8 +83,8 @@ func (r *ringBuffer) Size() int { func (r *ringBuffer) All() ([]interface{}, []interface{}) { if r.head < r.tail { - return r.data[r.head:r.tail], nil - } else { return r.data[r.head:], r.data[:r.tail] + } else { + return r.data[r.head:], nil } }