From 2b7672cdc2eda8fc7f692bd375a409d4e4e1b0d4 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Fri, 14 Jun 2024 17:13:02 +0800 Subject: [PATCH] memory leak --- pkg/annexb.go | 8 +- pkg/avframe.go | 6 +- pkg/util/buf-reader.go | 89 ++++++----- pkg/util/buf-reader_test.go | 4 +- pkg/util/buffers.go | 247 ++++++++++++++---------------- pkg/util/mem.go | 63 ++++---- plugin/hdl/pkg/pull.go | 16 +- plugin/rtmp/pkg/audio.go | 2 +- plugin/rtmp/pkg/handshake.go | 7 +- plugin/rtmp/pkg/net-connection.go | 24 +-- plugin/rtmp/pkg/video.go | 6 +- publisher.go | 21 +-- server.go | 41 ++--- subscriber.go | 25 ++- 14 files changed, 282 insertions(+), 277 deletions(-) diff --git a/pkg/annexb.go b/pkg/annexb.go index 4268f66..2495fa9 100644 --- a/pkg/annexb.go +++ b/pkg/annexb.go @@ -63,16 +63,16 @@ type Annexb265Ctx struct { func (a *Annexb264Ctx) CreateFrame(frame *AVFrame) (IAVFrame, error) { var annexb AnnexB // annexb.RecyclableBuffers.ScalableMemoryAllocator = frame.Wraps[0].GetScalableMemoryAllocator() - annexb.ReadFromBytes(codec.NALU_Delimiter2) + annexb.Append(codec.NALU_Delimiter2) if frame.IDR { - annexb.ReadFromBytes(a.SPS[0], codec.NALU_Delimiter2, a.PPS[0], codec.NALU_Delimiter2) + annexb.Append(a.SPS[0], codec.NALU_Delimiter2, a.PPS[0], codec.NALU_Delimiter2) } var nalus = frame.Raw.(Nalus) for i, nalu := range nalus.Nalus { if i > 0 { - annexb.ReadFromBytes(codec.NALU_Delimiter1) + annexb.Append(codec.NALU_Delimiter1) } - annexb.ReadFromBytes(nalu...) + annexb.Append(nalu...) } return &annexb, nil } diff --git a/pkg/avframe.go b/pkg/avframe.go index 6516f3e..d29d887 100644 --- a/pkg/avframe.go +++ b/pkg/avframe.go @@ -107,11 +107,9 @@ func (nalus *Nalus) ParseAVCC(reader *util.MemoryReader, naluSizeLen int) error if err != nil { return err } - nalu, err := reader.ReadBytes(int(l)) - if err != nil { - return err + for nalu := range reader.RangeN(l) { + nalus.Append(nalu) } - nalus.Append(nalu) } return nil } diff --git a/pkg/util/buf-reader.go b/pkg/util/buf-reader.go index 3ac25d9..3e4f301 100644 --- a/pkg/util/buf-reader.go +++ b/pkg/util/buf-reader.go @@ -4,29 +4,29 @@ import ( "io" ) -const defaultBufSize = 1 << 16 +const defaultBufSize = 1 << 14 type BufReader struct { reader io.Reader allocator *ScalableMemoryAllocator buf MemoryReader BufLen int + Err error } func NewBufReaderWithBufLen(reader io.Reader, bufLen int) (r *BufReader) { - r = &BufReader{} - r.reader = reader - r.allocator = NewScalableMemoryAllocator(bufLen) - r.BufLen = bufLen + r = &BufReader{ + reader: reader, + allocator: NewScalableMemoryAllocator(bufLen), + BufLen: bufLen, + } + r.buf.Memory = &Memory{} + //fmt.Println("NewBufReaderWithBufLen", uintptr(unsafe.Pointer(r.allocator))) return } func NewBufReader(reader io.Reader) (r *BufReader) { - r = &BufReader{} - r.reader = reader - r.allocator = NewScalableMemoryAllocator(defaultBufSize) - r.BufLen = defaultBufSize - return + return NewBufReaderWithBufLen(reader, defaultBufSize) } func (r *BufReader) Recycle() { @@ -39,25 +39,27 @@ func (r *BufReader) eat() error { buf := r.allocator.Malloc(r.BufLen) if n, err := r.reader.Read(buf); err != nil { r.allocator.Free(buf) + r.Err = err return err } else { if n < r.BufLen { r.allocator.Free(buf[n:]) buf = buf[:n] } - r.buf.ReadFromBytes(buf) + r.buf.Buffers = append(r.buf.Buffers, buf) + r.buf.Size += n + r.buf.Length += n } return nil } func (r *BufReader) ReadByte() (b byte, err error) { - for ; r.buf.Length == 0 && err == nil; err = r.eat() { - + for r.buf.Length == 0 { + if err = r.eat(); err != nil { + return + } } - if err == nil { - b, err = r.buf.ReadByte() - } - return + return r.buf.ReadByte() } func (r *BufReader) ReadBE(n int) (num int, err error) { @@ -93,30 +95,43 @@ func (r *BufReader) ReadBE32(n int) (num uint32, err error) { return } -func (r *BufReader) ReadBytes(n int) (mem RecyclableMemory, err error) { - mem.ScalableMemoryAllocator = r.allocator - defer func() { - if err != nil { - mem.Recycle() - mem = RecyclableMemory{} - } - }() - for r.recycleFront(); n > 0 && err == nil; err = r.eat() { - if r.buf.Length > 0 { - if r.buf.Length >= n { - mem.AddRecycleBytes(r.buf.ClipN(n)...) - return - } - n -= r.buf.Length - mem.AddRecycleBytes(r.buf.Buffers...) - r.buf = MemoryReader{} +func (r *BufReader) Skip(n int) (err error) { + r.recycleFront() + for r.buf.Length < n { + if err = r.eat(); err != nil { + return err } } + r.buf.RangeN(n)(nil) return } -func (r *BufReader) recycleFront() { - for _, buf := range r.buf.ClipFront() { - r.allocator.Free(buf) +func (r *BufReader) ReadRange(n int) func(func([]byte) bool) { + return func(yield func([]byte) bool) { + for r.recycleFront(); n > 0 && r.Err == nil; r.eat() { + if r.buf.Length > 0 { + if r.buf.Length >= n { + r.buf.RangeN(n)(yield) + return + } + n -= r.buf.Length + for _, buf := range r.buf.Buffers { + yield(buf) + } + r.buf.MoveToEnd() + } + } } } + +func (r *BufReader) ReadBytes(n int) (mem Memory, err error) { + for buf := range r.ReadRange(n) { + mem.Buffers = append(mem.Buffers, buf) + } + mem.Size = n + return mem, r.Err +} + +func (r *BufReader) recycleFront() { + r.buf.ClipFront(r.allocator.Free) +} diff --git a/pkg/util/buf-reader_test.go b/pkg/util/buf-reader_test.go index 26bf742..382e73d 100644 --- a/pkg/util/buf-reader_test.go +++ b/pkg/util/buf-reader_test.go @@ -114,20 +114,18 @@ func BenchmarkBufRead(b *testing.B) { b.RunParallel(func(pb *testing.PB) { var testData = make([]byte, 10*1024*1024) var err error - var mem RecyclableMemory for pb.Next() { rand.Read(testData) testReader := bytes.NewReader(testData) reader := NewBufReaderWithBufLen(testReader, 1024) for err == nil { - mem.Recycle() r := rand.Intn(10) if r < 4 { _, err = reader.ReadByte() } else if r < 7 { _, err = reader.ReadBE(4) } else { - mem, err = reader.ReadBytes(rand.Intn(4096)) + _, err = reader.ReadBytes(rand.Intn(4096)) } } } diff --git a/pkg/util/buffers.go b/pkg/util/buffers.go index 5bcfb3c..3dbf34e 100644 --- a/pkg/util/buffers.go +++ b/pkg/util/buffers.go @@ -12,7 +12,7 @@ type Memory struct { } type MemoryReader struct { - Memory + *Memory Length int offset0 int offset1 int @@ -24,7 +24,7 @@ func NewMemoryFromBytes(b ...[]byte) *Memory { func NewReadableBuffersFromBytes(b ...[]byte) *MemoryReader { buf := NewMemory(b) - return &MemoryReader{Memory: *buf, Length: buf.Size} + return &MemoryReader{Memory: NewMemory(b), Length: buf.Size} } func NewMemory(buffers net.Buffers) *Memory { @@ -35,75 +35,74 @@ func NewMemory(buffers net.Buffers) *Memory { return ret } -func (buffers *Memory) UpdateBuffer(index int, buf []byte) { +func (m *Memory) UpdateBuffer(index int, buf []byte) { if index < 0 { - index = len(buffers.Buffers) + index + index = len(m.Buffers) + index } - buffers.Size = len(buf) - len(buffers.Buffers[index]) - buffers.Buffers[index] = buf + m.Size = len(buf) - len(m.Buffers[index]) + m.Buffers[index] = buf } -func (buffers *Memory) CopyFrom(b Memory) { +func (m *Memory) CopyFrom(b *Memory) { buf := make([]byte, b.Size) - bufs := slices.Clone(b.Buffers) - bufs.Read(buf) - buffers.ReadFromBytes(buf) + b.CopyTo(buf) + m.Append(buf) } -func (buffers *Memory) ReadFromBytes(b ...[]byte) { - buffers.Buffers = append(buffers.Buffers, b...) - for _, level0 := range b { - buffers.Size += len(level0) +func (m *Memory) CopyTo(buf []byte) { + for _, b := range m.Buffers { + l := len(b) + copy(buf, b) + buf = buf[l:] } } -func (buffers *Memory) Count() int { - return len(buffers.Buffers) +func (m *Memory) Append(b ...[]byte) { + m.Buffers = append(m.Buffers, b...) + for _, level0 := range b { + m.Size += len(level0) + } } -func (r Memory) NewReader() *MemoryReader { +func (m *Memory) Count() int { + return len(m.Buffers) +} + +func (m *Memory) NewReader() *MemoryReader { var reader MemoryReader - reader.Memory = r - reader.Length = r.Size + reader.Memory = m + reader.Length = m.Size return &reader } -func (buffers *MemoryReader) ReadFromBytes(b ...[]byte) { - buffers.Memory.Buffers = append(buffers.Memory.Buffers, b...) - for _, level0 := range b { - buffers.Size += len(level0) - buffers.Length += len(level0) - } -} - -func (buffers *MemoryReader) Pop() []byte { +func (r *MemoryReader) Pop() []byte { panic("ReadableBuffers Pop not allowed") } -func (buffers *MemoryReader) GetCurrent() []byte { - return buffers.Memory.Buffers[buffers.offset0][buffers.offset1:] +func (r *MemoryReader) GetCurrent() []byte { + return r.Memory.Buffers[r.offset0][r.offset1:] } -func (buffers *MemoryReader) MoveToEnd() { - buffers.offset0 = buffers.Count() - buffers.offset1 = 0 - buffers.Length = 0 +func (r *MemoryReader) MoveToEnd() { + r.offset0 = r.Count() + r.offset1 = 0 + r.Length = 0 } -func (buffers *MemoryReader) ReadBytesTo(buf []byte) (actual int) { +func (r *MemoryReader) ReadBytesTo(buf []byte) (actual int) { n := len(buf) - curBuf := buffers.GetCurrent() + curBuf := r.GetCurrent() curBufLen := len(curBuf) - if n > buffers.Length { + if n > r.Length { if curBufLen > 0 { actual += copy(buf, curBuf) - buffers.offset0++ - buffers.offset1 = 0 + r.offset0++ + r.offset1 = 0 } - for _, b := range buffers.Memory.Buffers[buffers.offset0:] { + for _, b := range r.Memory.Buffers[r.offset0:] { actual += copy(buf[actual:], b) } - buffers.MoveToEnd() + r.MoveToEnd() return } l := n @@ -111,25 +110,25 @@ func (buffers *MemoryReader) ReadBytesTo(buf []byte) (actual int) { if n < curBufLen { actual += n copy(buf[l-n:], curBuf[:n]) - buffers.forward(n) + r.forward(n) break } copy(buf[l-n:], curBuf) n -= curBufLen actual += curBufLen - buffers.skipBuf() - if buffers.Length == 0 && n > 0 { + r.skipBuf() + if r.Length == 0 && n > 0 { return } } return } -func (reader *MemoryReader) ReadByteTo(b ...*byte) (err error) { +func (r *MemoryReader) ReadByteTo(b ...*byte) (err error) { for i := range b { - if reader.Length == 0 { + if r.Length == 0 { return io.EOF } - *b[i], err = reader.ReadByte() + *b[i], err = r.ReadByte() if err != nil { return } @@ -137,37 +136,37 @@ func (reader *MemoryReader) ReadByteTo(b ...*byte) (err error) { return } -func (reader *MemoryReader) ReadByteMask(mask byte) (byte, error) { - b, err := reader.ReadByte() +func (r *MemoryReader) ReadByteMask(mask byte) (byte, error) { + b, err := r.ReadByte() if err != nil { return 0, err } return b & mask, nil } -func (reader *MemoryReader) ReadByte() (b byte, err error) { - if reader.Length == 0 { +func (r *MemoryReader) ReadByte() (b byte, err error) { + if r.Length == 0 { return 0, io.EOF } - curBuf := reader.GetCurrent() + curBuf := r.GetCurrent() b = curBuf[0] if len(curBuf) == 1 { - reader.skipBuf() + r.skipBuf() } else { - reader.forward(1) + r.forward(1) } return } -func (reader *MemoryReader) LEB128Unmarshal() (uint, int, error) { +func (r *MemoryReader) LEB128Unmarshal() (uint, int, error) { v := uint(0) n := 0 for i := 0; i < 8; i++ { - b, err := reader.ReadByte() + b, err := r.ReadByte() if err != nil { return 0, 0, err } - v |= (uint(b&0b01111111) << (i * 7)) + v |= uint(b&0b01111111) << (i * 7) n++ if (b & 0b10000000) == 0 { @@ -177,82 +176,52 @@ func (reader *MemoryReader) LEB128Unmarshal() (uint, int, error) { return v, n, nil } -func (reader *MemoryReader) getCurrentBufLen() int { - return len(reader.Memory.Buffers[reader.offset0]) - reader.offset1 +func (r *MemoryReader) getCurrentBufLen() int { + return len(r.Memory.Buffers[r.offset0]) - r.offset1 } -func (reader *MemoryReader) Skip(n int) error { - if n > reader.Length { +func (r *MemoryReader) Skip(n int) error { + if n > r.Length { return io.EOF } - curBufLen := reader.getCurrentBufLen() + curBufLen := r.getCurrentBufLen() for n > 0 { if n < curBufLen { - reader.forward(n) + r.forward(n) break } n -= curBufLen - reader.skipBuf() - if reader.Length == 0 && n > 0 { + r.skipBuf() + if r.Length == 0 && n > 0 { return io.EOF } } return nil } -func (reader *MemoryReader) forward(n int) { - reader.Length -= n - reader.offset1 += n +func (r *MemoryReader) forward(n int) { + r.Length -= n + r.offset1 += n } -func (buffers *MemoryReader) skipBuf() { - curBufLen := buffers.getCurrentBufLen() - buffers.Length -= curBufLen - buffers.offset0++ - buffers.offset1 = 0 +func (r *MemoryReader) skipBuf() { + curBufLen := r.getCurrentBufLen() + r.Length -= curBufLen + r.offset0++ + r.offset1 = 0 } -func (reader *MemoryReader) ReadBytes(n int) ([]byte, error) { - if n > reader.Length { +func (r *MemoryReader) ReadBytes(n int) ([]byte, error) { + if n > r.Length { return nil, io.EOF } b := make([]byte, n) - actual := reader.ReadBytesTo(b) + actual := r.ReadBytesTo(b) return b[:actual], nil } -// func (buffers *ReadableBuffers) WriteTo(w io.Writer) (n int64, err error) { -// var buf net.Buffers -// if buffers.Count() > buffers.offset1 { -// buf = append(buf, buffers.Buffers[buffers.offset:]...) -// } -// if buffers.curBufLen > 0 { -// buf[0] = buffers.curBuf -// } -// buffers.MoveToEnd() -// return buf.WriteTo(w) -// } - -func (reader *MemoryReader) WriteNTo(n int, result *net.Buffers) (actual int) { - for actual = n; reader.Length > 0 && n > 0; reader.skipBuf() { - curBuf := reader.GetCurrent() - if len(curBuf) > n { - if result != nil { - *result = append(*result, curBuf[:n]) - } - reader.forward(n) - return actual - } - if result != nil { - *result = append(*result, curBuf) - } - n -= len(curBuf) - } - return actual - n -} - -func (reader *MemoryReader) ReadBE(n int) (num int, err error) { +func (r *MemoryReader) ReadBE(n int) (num int, err error) { for i := range n { - b, err := reader.ReadByte() + b, err := r.ReadByte() if err != nil { return -1, err } @@ -261,39 +230,45 @@ func (reader *MemoryReader) ReadBE(n int) (num int, err error) { return } -func (reader *MemoryReader) ClipN(n int) (r net.Buffers) { - reader.WriteNTo(n, nil) - return reader.ClipFront() +func (r *MemoryReader) RangeN(n int) func(yield func([]byte) bool) { + return func(yield func([]byte) bool) { + for good := yield != nil; r.Length > 0 && n > 0; r.skipBuf() { + curBuf := r.GetCurrent() + if curBufLen := len(curBuf); curBufLen > n { + if r.forward(n); good { + good = yield(curBuf[:n]) + } + return + } else if n -= curBufLen; good { + good = yield(curBuf) + } + } + } } -func (reader *MemoryReader) ClipFront() (r net.Buffers) { - offset := reader.Size - reader.Length +func (r *MemoryReader) ClipFront(yield func([]byte) bool) { + offset := r.Size - r.Length if offset == 0 { return } - buffers := &reader.Memory - if reader.Length == 0 { - r = slices.Clone(buffers.Buffers) - buffers.Buffers = buffers.Buffers[:0] - } else { - r = slices.Clone(buffers.Buffers[:reader.offset0]) - if reader.offset1 > 0 { - r = append(r, buffers.Buffers[reader.offset0][:reader.offset1]) - buffers.Buffers[reader.offset0] = reader.GetCurrent() + if m := r.Memory; r.Length == 0 { + for _, buf := range m.Buffers { + yield(buf) } - if reader.offset0 > 0 { - buffers.Buffers = slices.Delete(buffers.Buffers, 0, reader.offset0) + m.Buffers = m.Buffers[:0] + } else { + for _, buf := range m.Buffers[:r.offset0] { + yield(buf) + } + if r.offset1 > 0 { + yield(m.Buffers[r.offset0][:r.offset1]) + m.Buffers[r.offset0] = r.GetCurrent() + } + if r.offset0 > 0 { + m.Buffers = slices.Delete(m.Buffers, 0, r.offset0) } } - // bs := 0 - // for _, b := range r { - // bs += len(b) - // } - // if bs != offset { - // panic("ClipFront error") - // } - reader.Size -= offset - reader.offset0 = 0 - reader.offset1 = 0 - return -} \ No newline at end of file + r.Size -= offset + r.offset0 = 0 + r.offset1 = 0 +} diff --git a/pkg/util/mem.go b/pkg/util/mem.go index 9f40863..e2af407 100644 --- a/pkg/util/mem.go +++ b/pkg/util/mem.go @@ -1,6 +1,7 @@ package util import ( + "fmt" "slices" "sync" "unsafe" @@ -8,16 +9,16 @@ import ( const ( MaxBlockSize = 1 << 22 - BuddySize = MaxBlockSize << 4 + BuddySize = MaxBlockSize << 7 MinPowerOf2 = 10 ) var ( - memoryPool [BuddySize]byte - buddy = NewBuddy(BuddySize >> MinPowerOf2) - lock sync.Mutex - poolStart = int64(uintptr(unsafe.Pointer(&memoryPool[0]))) - //EnableCheckSize bool = false + memoryPool [BuddySize]byte + buddy = NewBuddy(BuddySize >> MinPowerOf2) + lock sync.Mutex + poolStart = int64(uintptr(unsafe.Pointer(&memoryPool[0]))) + EnableCheckSize bool = false ) type MemoryAllocator struct { @@ -50,13 +51,14 @@ func NewMemoryAllocator(size int) (ret *MemoryAllocator) { memory: make([]byte, size), allocator: NewAllocator(size), } + fmt.Println(size) ret.start = int64(uintptr(unsafe.Pointer(&ret.memory[0]))) return } func (ma *MemoryAllocator) Recycle() { lock.Lock() - _ = buddy.Free(int((poolStart - ma.start) >> 10)) + _ = buddy.Free(int((poolStart - ma.start) >> MinPowerOf2)) lock.Unlock() } @@ -101,8 +103,12 @@ func (sma *ScalableMemoryAllocator) checkSize() { for _, child := range sma.children { totalFree += child.allocator.GetFreeSize() } - if totalFree != sma.size-(int(sma.totalMalloc)-int(sma.totalFree)) { + if inUse := sma.totalMalloc - sma.totalFree; totalFree != sma.size-int(inUse) { panic("CheckSize") + } else { + if inUse > 3000000 { + fmt.Println(uintptr(unsafe.Pointer(sma)), inUse) + } } } @@ -136,9 +142,9 @@ func (sma *ScalableMemoryAllocator) Malloc(size int) (memory []byte) { if sma == nil { return make([]byte, size) } - //if EnableCheckSize { - // defer sma.checkSize() - //} + if EnableCheckSize { + defer sma.checkSize() + } defer sma.addMallocCount(size) var child *MemoryAllocator for _, child = range sma.children { @@ -146,7 +152,7 @@ func (sma *ScalableMemoryAllocator) Malloc(size int) (memory []byte) { return } } - for sma.childSize <= MaxBlockSize { + for sma.childSize < MaxBlockSize { sma.childSize = sma.childSize << 1 if sma.childSize >= size { break @@ -167,19 +173,19 @@ func (sma *ScalableMemoryAllocator) Free(mem []byte) bool { if sma == nil { return false } - //if EnableCheckSize { - // defer sma.checkSize() - //} + if EnableCheckSize { + defer sma.checkSize() + } ptr := int64(uintptr(unsafe.Pointer(&mem[0]))) size := len(mem) - for i, child := range sma.children { + for _, child := range sma.children { if start := int(ptr - child.start); start >= 0 && start < child.Size && child.free(start, size) { sma.addFreeCount(size) - if len(sma.children) > 1 && child.allocator.sizeTree.End-child.allocator.sizeTree.Start == child.Size { - child.Recycle() - sma.children = slices.Delete(sma.children, i, i+1) - sma.size -= child.Size - } + //if len(sma.children) > 1 && child.allocator.sizeTree.End-child.allocator.sizeTree.Start == child.Size { + // child.Recycle() + // sma.children = slices.Delete(sma.children, i, i+1) + // sma.size -= child.Size + //} return true } } @@ -195,25 +201,22 @@ type RecyclableMemory struct { func (r *RecyclableMemory) NextN(size int) (memory []byte) { memory = r.ScalableMemoryAllocator.Malloc(size) if r.RecycleIndexes != nil { - r.RecycleIndexes = append(r.RecycleIndexes, len(r.Buffers)) + r.RecycleIndexes = append(r.RecycleIndexes, r.Count()) } - r.ReadFromBytes(memory) + r.Append(memory) return } -func (r *RecyclableMemory) AddRecycleBytes(b ...[]byte) { +func (r *RecyclableMemory) AddRecycleBytes(b []byte) { if r.RecycleIndexes != nil { - start := len(r.Buffers) - for i := range b { - r.RecycleIndexes = append(r.RecycleIndexes, start+i) - } + r.RecycleIndexes = append(r.RecycleIndexes, r.Count()) } - r.ReadFromBytes(b...) + r.Append(b) } func (r *RecyclableMemory) RemoveRecycleBytes(index int) (buf []byte) { if index < 0 { - index = len(r.Buffers) + index + index = r.Count() + index } buf = r.Buffers[index] if r.RecycleIndexes != nil { diff --git a/plugin/hdl/pkg/pull.go b/plugin/hdl/pkg/pull.go index 13ac1b0..0f3ce84 100644 --- a/plugin/hdl/pkg/pull.go +++ b/plugin/hdl/pkg/pull.go @@ -15,13 +15,16 @@ import ( type HDLPuller struct { *util.BufReader + *util.ScalableMemoryAllocator hasAudio bool hasVideo bool absTS uint32 //绝对时间戳 } func NewHDLPuller() *HDLPuller { - return &HDLPuller{} + return &HDLPuller{ + ScalableMemoryAllocator: util.NewScalableMemoryAllocator(1024), + } } func (puller *HDLPuller) Connect(p *m7s.Client) (err error) { @@ -51,9 +54,8 @@ func (puller *HDLPuller) Connect(p *m7s.Client) (err error) { } } if err == nil { - var head util.RecyclableMemory + var head util.Memory head, err = puller.BufReader.ReadBytes(13) - defer head.Recycle() if err == nil { var flvHead [3]byte var version, flag byte @@ -102,10 +104,15 @@ func (puller *HDLPuller) Pull(p *m7s.Puller) (err error) { } puller.ReadBE(3) // stream id always 0 var frame rtmp.RTMPData - frame.RecyclableMemory, err = puller.ReadBytes(int(dataSize)) + frame.ScalableMemoryAllocator = puller.ScalableMemoryAllocator + mem, err := puller.ReadBytes(int(dataSize)) if err != nil { return err } + switch t { + case FLV_TAG_TYPE_AUDIO, FLV_TAG_TYPE_VIDEO: + mem.CopyTo(frame.NextN(mem.Size)) + } puller.absTS = offsetTs + (timestamp - startTs) frame.Timestamp = puller.absTS // fmt.Println(t, offsetTs, timestamp, startTs, puller.absTS) @@ -116,7 +123,6 @@ func (puller *HDLPuller) Pull(p *m7s.Puller) (err error) { p.WriteVideo(frame.WrapVideo()) case FLV_TAG_TYPE_SCRIPT: p.Info("script") - frame.Recycle() } } return diff --git a/plugin/rtmp/pkg/audio.go b/plugin/rtmp/pkg/audio.go index 66b38c7..7487491 100644 --- a/plugin/rtmp/pkg/audio.go +++ b/plugin/rtmp/pkg/audio.go @@ -50,7 +50,7 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) return } var cloneFrame RTMPAudio - cloneFrame.CopyFrom(avcc.Memory) + cloneFrame.CopyFrom(&avcc.Memory) ctx.AudioObjectType = b0 >> 3 ctx.SamplingFrequencyIndex = (b0 & 0x07 << 1) | (b1 >> 7) ctx.ChannelConfiguration = (b1 >> 3) & 0x0F diff --git a/plugin/rtmp/pkg/handshake.go b/plugin/rtmp/pkg/handshake.go index ad925ce..134dfb0 100644 --- a/plugin/rtmp/pkg/handshake.go +++ b/plugin/rtmp/pkg/handshake.go @@ -119,10 +119,9 @@ func (nc *NetConnection) simple_handshake(C1 []byte, checkC2 bool) error { if err != nil { return err } - C2.Recycle() if checkC2 { buf := nc.mediaDataPool.NextN(C2.Size) - _, err = C2.Read(buf) + C2.CopyTo(buf) if !bytes.Equal(buf[8:], S0S1[9:]) { return errors.New("C2 Error") } @@ -179,9 +178,7 @@ func (nc *NetConnection) complex_handshake(C1 []byte) error { buffer := net.Buffers{[]byte{RTMP_HANDSHAKE_VERSION}, S1, S2_Random, S2_Digest} _, err = buffer.WriteTo(nc) - b, _ := nc.ReadBytes(1536) - b.Recycle() - return err + return nc.Skip(1536) } func validateClient(C1 []byte) (scheme int, challenge []byte, digest []byte, ok bool, err error) { diff --git a/plugin/rtmp/pkg/net-connection.go b/plugin/rtmp/pkg/net-connection.go index f956a9f..16d89d8 100644 --- a/plugin/rtmp/pkg/net-connection.go +++ b/plugin/rtmp/pkg/net-connection.go @@ -146,16 +146,13 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { if msgLen == 0 { return nil, nil } - var mem util.RecyclableMemory + var bufSize = 0 if unRead := msgLen - chunk.bufLen; unRead < conn.readChunkSize { - mem, err = conn.ReadBytes(unRead) + bufSize = unRead } else { - mem, err = conn.ReadBytes(conn.readChunkSize) + bufSize = conn.readChunkSize } - if err != nil { - return nil, err - } - conn.readSeqNum += uint32(mem.Size) + conn.readSeqNum += uint32(bufSize) if chunk.bufLen == 0 { chunk.AVData.RecyclableMemory = util.RecyclableMemory{ ScalableMemoryAllocator: conn.mediaDataPool.ScalableMemoryAllocator, @@ -163,9 +160,12 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { chunk.AVData.NextN(msgLen) } buffer := chunk.AVData.Buffers[0] - for _, b := range mem.Buffers { - copy(buffer[chunk.bufLen:], b) - chunk.bufLen += len(b) + for buf := range conn.ReadRange(bufSize) { + copy(buffer[chunk.bufLen:], buf) + chunk.bufLen += len(buf) + } + if conn.Err != nil { + return nil, conn.Err } if chunk.bufLen == msgLen { msg = chunk @@ -332,7 +332,9 @@ func (conn *NetConnection) sendChunk(data net.Buffers, head *ChunkHeader, headTy head.WriteTo(RTMP_CHUNK_HEAD_1, &chunk3) r := util.NewReadableBuffersFromBytes(data...) for { - r.WriteNTo(conn.WriteChunkSize, &chunks) + for buf := range r.RangeN(conn.WriteChunkSize) { + chunks = append(chunks, buf) + } if r.Length <= 0 { break } diff --git a/plugin/rtmp/pkg/video.go b/plugin/rtmp/pkg/video.go index 4c72d7f..3d39ceb 100644 --- a/plugin/rtmp/pkg/video.go +++ b/plugin/rtmp/pkg/video.go @@ -31,7 +31,7 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) isSeq = true isIDR = false var cloneFrame RTMPVideo - cloneFrame.CopyFrom(avcc.Memory) + cloneFrame.CopyFrom(&avcc.Memory) switch fourCC { case codec.FourCC_H264: var ctx H264Ctx @@ -125,7 +125,7 @@ func (avcc *RTMPVideo) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { b.Write(h264ctx.PPS[0]) t.ICodecCtx = &ctx var seqFrame RTMPData - seqFrame.ReadFromBytes(b) + seqFrame.Append(b) t.SequenceFrame = seqFrame.WrapVideo() if t.Enabled(context.TODO(), TraceLevel) { c := t.FourCC().String() @@ -241,7 +241,7 @@ func createH26xFrame(from *AVFrame, codecID VideoCodecID) (frame IAVFrame, err e naluLenM := rtmpVideo.NextN(4) naluLen := uint32(util.LenOfBuffers(nalu)) binary.BigEndian.PutUint32(naluLenM, naluLen) - rtmpVideo.ReadFromBytes(nalu...) + rtmpVideo.Append(nalu...) } frame = &rtmpVideo return diff --git a/publisher.go b/publisher.go index 674a603..a0dd9b9 100644 --- a/publisher.go +++ b/publisher.go @@ -1,7 +1,6 @@ package m7s import ( - "errors" "reflect" "sync" "time" @@ -182,7 +181,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { } t := p.VideoTrack.AVTrack if t == nil { - t = NewAVTrack(data, p.Logger.With("track", "video"), 100) + t = NewAVTrack(data, p.Logger.With("track", "video"), 20) p.Lock() p.VideoTrack.AVTrack = t p.VideoTrack.Add(t) @@ -293,7 +292,7 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { } t := p.AudioTrack.AVTrack if t == nil { - t = NewAVTrack(data, p.Logger.With("track", "audio"), 256) + t = NewAVTrack(data, p.Logger.With("track", "audio"), 20) p.Lock() p.AudioTrack.AVTrack = t p.AudioTrack.Add(t) @@ -363,7 +362,7 @@ func (p *Publisher) Dispose(err error) { if p.State == PublisherStateDisposed { return } - if !errors.Is(p.StopReason(), ErrKick) && p.IsStopped() { + if p.IsStopped() { if !p.AudioTrack.IsEmpty() { p.AudioTrack.Dispose() } @@ -378,18 +377,12 @@ func (p *Publisher) Dispose(err error) { func (p *Publisher) TakeOver(old *Publisher) { p.baseTs = old.lastTs - p.VideoTrack = old.VideoTrack - p.VideoTrack.ICodecCtx = nil - p.VideoTrack.Logger = p.Logger.With("track", "video") - p.AudioTrack = old.AudioTrack - p.AudioTrack.ICodecCtx = nil - p.AudioTrack.Logger = p.Logger.With("track", "audio") - p.DataTrack = old.DataTrack + p.Info("takeOver", "old", old.ID) for subscriber := range old.SubscriberRange { p.AddSubscriber(subscriber) } + if old.Plugin != nil { + old.Dispose(nil) + } old.Subscribers = util.Collection[int, *Subscriber]{} - // for _, track := range p.TransTrack { - // track.ICodecCtx = nil - // } } diff --git a/server.go b/server.go index 3b02579..c98810e 100644 --- a/server.go +++ b/server.go @@ -276,9 +276,7 @@ func (s *Server) eventLoop() { if publisher.Plugin != nil { if err := publisher.checkTimeout(); err != nil { publisher.Dispose(err) - newPublisher := &Publisher{} - newPublisher.StreamPath = publisher.StreamPath - s.Waiting.Set(newPublisher) + s.createWait(publisher.StreamPath) } } for sub := range publisher.SubscriberRange { @@ -365,7 +363,7 @@ func (s *Server) eventLoop() { func (s *Server) onUnsubscribe(subscriber *Subscriber) { s.Subscribers.Remove(subscriber) - s.Info("unsubscribe", "streamPath", subscriber.StreamPath) + s.Info("unsubscribe", "streamPath", subscriber.StreamPath, "reason", subscriber.StopReason()) if subscriber.Closer != nil { subscriber.Close() } @@ -383,9 +381,13 @@ func (s *Server) onUnsubscribe(subscriber *Subscriber) { func (s *Server) onUnpublish(publisher *Publisher) { s.Streams.Remove(publisher) s.Waiting.Add(publisher) - s.Info("unpublish", "streamPath", publisher.StreamPath, "count", s.Streams.Length) + s.Info("unpublish", "streamPath", publisher.StreamPath, "count", s.Streams.Length, "reason", publisher.StopReason()) for subscriber := range publisher.SubscriberRange { - subscriber.TimeoutTimer.Reset(publisher.WaitCloseTimeout) + waitCloseTimeout := publisher.WaitCloseTimeout + if waitCloseTimeout == 0 { + waitCloseTimeout = subscriber.WaitTimeout + } + subscriber.TimeoutTimer.Reset(waitCloseTimeout) } if publisher.Closer != nil { _ = publisher.Close() @@ -407,26 +409,31 @@ func (s *Server) OnPublish(publisher *Publisher) error { s.pidG++ p := publisher.Plugin publisher.ID = s.pidG - publisher.Logger = p.With("streamPath", publisher.StreamPath, "puber", publisher.ID) + publisher.Logger = p.With("streamPath", publisher.StreamPath, "pubID", publisher.ID) publisher.TimeoutTimer = time.NewTimer(p.config.PublishTimeout) publisher.Info("publish") if waiting, ok := s.Waiting.Get(publisher.StreamPath); ok { - if waiting.Plugin != nil { - publisher.TakeOver(waiting) - } else { - for subscriber := range waiting.SubscriberRange { - publisher.AddSubscriber(subscriber) - } - } + publisher.TakeOver(waiting) s.Waiting.Remove(waiting) } return nil } +func (s *Server) createWait(streamPath string) *Publisher { + newPublisher := &Publisher{} + s.pidG++ + newPublisher.ID = s.pidG + newPublisher.Logger = s.Logger.With("pubID", newPublisher.ID, "streamPath", streamPath) + s.Info("createWait") + newPublisher.StreamPath = streamPath + s.Waiting.Set(newPublisher) + return newPublisher +} + func (s *Server) OnSubscribe(subscriber *Subscriber) error { s.sidG++ subscriber.ID = s.sidG - subscriber.Logger = subscriber.Plugin.With("streamPath", subscriber.StreamPath, "suber", subscriber.ID) + subscriber.Logger = subscriber.Plugin.With("streamPath", subscriber.StreamPath, "subID", subscriber.ID) subscriber.TimeoutTimer = time.NewTimer(subscriber.Plugin.config.Subscribe.WaitTimeout) s.Subscribers.Add(subscriber) subscriber.Info("subscribe") @@ -435,9 +442,7 @@ func (s *Server) OnSubscribe(subscriber *Subscriber) error { } else if publisher, ok = s.Waiting.Get(subscriber.StreamPath); ok { publisher.AddSubscriber(subscriber) } else { - newPublisher := &Publisher{} - newPublisher.StreamPath = subscriber.StreamPath - newPublisher.AddSubscriber(subscriber) + s.createWait(subscriber.StreamPath).AddSubscriber(subscriber) } return nil } diff --git a/subscriber.go b/subscriber.go index 122a4b1..3fe5923 100644 --- a/subscriber.go +++ b/subscriber.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "reflect" + "runtime" "strconv" "time" @@ -239,10 +240,14 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( }() sendAudioFrame := func() (err error) { if awi >= 0 { - if s.Enabled(s, TraceLevel) { - s.Trace("send audio frame", "seq", audioFrame.Sequence) + if len(audioFrame.Wraps) > awi { + if s.Enabled(s, TraceLevel) { + s.Trace("send audio frame", "seq", audioFrame.Sequence) + } + err = onAudio(audioFrame.Wraps[awi].(A)) + } else { + ar.StopRead() } - err = onAudio(audioFrame.Wraps[awi].(A)) } else { err = onAudio(any(audioFrame).(A)) } @@ -254,10 +259,14 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( } sendVideoFrame := func() (err error) { if vwi >= 0 { - if s.Enabled(s, TraceLevel) { - s.Trace("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wraps[vwi].String(), "size", videoFrame.Wraps[vwi].GetSize()) + if len(videoFrame.Wraps) > vwi { + if s.Enabled(s, TraceLevel) { + s.Trace("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wraps[vwi].String(), "size", videoFrame.Wraps[vwi].GetSize()) + } + err = onVideo(videoFrame.Wraps[vwi].(V)) + } else { + vr.StopRead() } - err = onVideo(videoFrame.Wraps[vwi].(V)) } else { err = onVideo(any(videoFrame).(V)) } @@ -269,13 +278,16 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( } checkPublisherChange := func() { if prePublisher != s.Publisher { + s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID) if ar != nil { startAudioTs = time.Duration(ar.AbsTime) * time.Millisecond ar.StopRead() + ar = nil } if vr != nil { startVideoTs = time.Duration(vr.AbsTime) * time.Millisecond vr.StopRead() + vr = nil } createAudioReader() createVideoReader() @@ -374,5 +386,6 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( createAudioReader() } checkPublisherChange() + runtime.Gosched() } }