mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-27 03:26:01 +08:00
修复rtmp推拉流内存泄漏问题
This commit is contained in:
5
api.go
5
api.go
@@ -16,6 +16,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -73,6 +74,10 @@ func startApiServer(addr string) {
|
|||||||
//TCP主动,设置连接地址
|
//TCP主动,设置连接地址
|
||||||
apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", apiServer.connectGBSource)
|
apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", apiServer.connectGBSource)
|
||||||
apiServer.router.HandleFunc("/api/v1/gb28181/source/close", apiServer.closeGBSource)
|
apiServer.router.HandleFunc("/api/v1/gb28181/source/close", apiServer.closeGBSource)
|
||||||
|
apiServer.router.HandleFunc("/api/v1/gb28181/gc/force", func(writer http.ResponseWriter, request *http.Request) {
|
||||||
|
runtime.GC()
|
||||||
|
writer.WriteHeader(http.StatusOK)
|
||||||
|
})
|
||||||
|
|
||||||
apiServer.router.HandleFunc("/rtc.html", func(writer http.ResponseWriter, request *http.Request) {
|
apiServer.router.HandleFunc("/rtc.html", func(writer http.ResponseWriter, request *http.Request) {
|
||||||
http.ServeFile(writer, request, "./rtc.html")
|
http.ServeFile(writer, request, "./rtc.html")
|
||||||
|
@@ -104,8 +104,8 @@ func (t *httpTransStream) AddTrack(stream utils.AVStream) error {
|
|||||||
} else if utils.AVMediaTypeVideo == stream.Type() {
|
} else if utils.AVMediaTypeVideo == stream.Type() {
|
||||||
t.muxer.AddVideoTrack(stream.CodecId())
|
t.muxer.AddVideoTrack(stream.CodecId())
|
||||||
|
|
||||||
t.muxer.AddProperty("width", stream.CodecParameters().SPSInfo().Width())
|
t.muxer.AddProperty("width", stream.CodecParameters().Width())
|
||||||
t.muxer.AddProperty("height", stream.CodecParameters().SPSInfo().Height())
|
t.muxer.AddProperty("height", stream.CodecParameters().Height())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -187,7 +187,7 @@ func (t *httpTransStream) WriteHeader() error {
|
|||||||
if utils.AVMediaTypeAudio == track.Type() {
|
if utils.AVMediaTypeAudio == track.Type() {
|
||||||
data = track.Extra()
|
data = track.Extra()
|
||||||
} else if utils.AVMediaTypeVideo == track.Type() {
|
} else if utils.AVMediaTypeVideo == track.Type() {
|
||||||
data = track.CodecParameters().DecoderConfRecord().ToMP4VC()
|
data = track.CodecParameters().MP4ExtraData()
|
||||||
}
|
}
|
||||||
|
|
||||||
n := t.muxer.Input(t.header[t.headerSize:], track.Type(), len(data), 0, 0, false, true)
|
n := t.muxer.Input(t.header[t.headerSize:], track.Type(), len(data), 0, 0, false, true)
|
||||||
|
@@ -80,17 +80,15 @@ func connectSource(source string, addr string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createSource(source, transport, setup string, ssrc uint32) int {
|
func createSource(source, setup string, ssrc uint32) (string, uint16) {
|
||||||
v := struct {
|
v := struct {
|
||||||
Source string `json:"source"` //SourceId
|
Source string `json:"source"` //SourceId
|
||||||
Transport string `json:"transport,omitempty"`
|
Setup string `json:"setup"` //active/passive
|
||||||
Setup string `json:"setup"` //active/passive
|
SSRC uint32 `json:"ssrc,omitempty"`
|
||||||
SSRC uint32 `json:"ssrc,omitempty"`
|
|
||||||
}{
|
}{
|
||||||
Source: source,
|
Source: source,
|
||||||
Transport: transport,
|
Setup: setup,
|
||||||
Setup: setup,
|
SSRC: ssrc,
|
||||||
SSRC: ssrc,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
marshal, err := json.Marshal(v)
|
marshal, err := json.Marshal(v)
|
||||||
@@ -98,7 +96,7 @@ func createSource(source, transport, setup string, ssrc uint32) int {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
request, err := http.NewRequest("POST", "http://localhost:8080/v1/gb28181/source/create", bytes.NewBuffer(marshal))
|
request, err := http.NewRequest("POST", "http://localhost:8080/api/v1/gb28181/source/create", bytes.NewBuffer(marshal))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@@ -108,46 +106,46 @@ func createSource(source, transport, setup string, ssrc uint32) int {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
if response.StatusCode != http.StatusOK {
|
||||||
|
panic("")
|
||||||
|
}
|
||||||
|
|
||||||
all, err := io.ReadAll(response.Body)
|
all, err := io.ReadAll(response.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resposne := &struct {
|
connectInfo := &struct {
|
||||||
Code int `json:"code"`
|
Code int `json:"code"`
|
||||||
Msg string `json:"msg"`
|
Msg string `json:"msg"`
|
||||||
Data struct {
|
Data struct {
|
||||||
Port int `json:"port"`
|
IP string `json:"ip"`
|
||||||
} `json:"data"`
|
Port uint16 `json:"port,omitempty"`
|
||||||
|
}
|
||||||
}{}
|
}{}
|
||||||
|
|
||||||
err = json.Unmarshal(all, resposne)
|
err = json.Unmarshal(all, connectInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if resposne.Code != http.StatusOK {
|
return connectInfo.Data.IP, connectInfo.Data.Port
|
||||||
panic("")
|
|
||||||
}
|
|
||||||
|
|
||||||
return resposne.Data.Port
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 使用wireshark直接导出udp流
|
||||||
|
// 根据ssrc来查找每个rtp包, rtp不要带扩展字段
|
||||||
func TestUDPRecv(t *testing.T) {
|
func TestUDPRecv(t *testing.T) {
|
||||||
path := "D:\\GOProjects\\avformat\\gb28181_h264.rtp"
|
path := "D:\\GOProjects\\avformat\\gb28181_h265.rtp"
|
||||||
ssrc := 0xBEBC201
|
ssrc := 0xBEBC202
|
||||||
ip := "192.168.2.148"
|
|
||||||
localAddr := "0.0.0.0:20001"
|
localAddr := "0.0.0.0:20001"
|
||||||
network := "tcp"
|
setup := "udp" //udp/passive/active
|
||||||
setup := "passive"
|
|
||||||
id := "hls_mystream"
|
id := "hls_mystream"
|
||||||
|
|
||||||
port := createSource(id, network, setup, uint32(ssrc))
|
ip, port := createSource(id, setup, uint32(ssrc))
|
||||||
|
|
||||||
if network == "udp" {
|
if setup == "udp" {
|
||||||
addr, _ := net.ResolveUDPAddr(network, localAddr)
|
addr, _ := net.ResolveUDPAddr("udp", localAddr)
|
||||||
remoteAddr, _ := net.ResolveUDPAddr(network, fmt.Sprintf("%s:%d", ip, port))
|
remoteAddr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", ip, port))
|
||||||
|
|
||||||
client := &transport.UDPClient{}
|
client := &transport.UDPClient{}
|
||||||
err := client.Connect(addr, remoteAddr)
|
err := client.Connect(addr, remoteAddr)
|
||||||
@@ -160,8 +158,8 @@ func TestUDPRecv(t *testing.T) {
|
|||||||
time.Sleep(1 * time.Millisecond)
|
time.Sleep(1 * time.Millisecond)
|
||||||
})
|
})
|
||||||
} else if !(setup == "active") {
|
} else if !(setup == "active") {
|
||||||
addr, _ := net.ResolveTCPAddr(network, localAddr)
|
addr, _ := net.ResolveTCPAddr("tcp", localAddr)
|
||||||
remoteAddr, _ := net.ResolveTCPAddr(network, fmt.Sprintf("%s:%d", ip, port))
|
remoteAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ip, port))
|
||||||
|
|
||||||
client := transport.TCPClient{}
|
client := transport.TCPClient{}
|
||||||
err := client.Connect(addr, remoteAddr)
|
err := client.Connect(addr, remoteAddr)
|
||||||
@@ -175,14 +173,16 @@ func TestUDPRecv(t *testing.T) {
|
|||||||
time.Sleep(1 * time.Millisecond)
|
time.Sleep(1 * time.Millisecond)
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
addr, _ := net.ResolveTCPAddr(network, localAddr)
|
addr, _ := net.ResolveTCPAddr("tcp", localAddr)
|
||||||
server := transport.TCPServer{}
|
server := transport.TCPServer{}
|
||||||
|
|
||||||
server.SetHandler2(func(conn net.Conn) {
|
server.SetHandler2(func(conn net.Conn) []byte {
|
||||||
readRtp(path, uint32(ssrc), true, func(data []byte) {
|
readRtp(path, uint32(ssrc), true, func(data []byte) {
|
||||||
conn.Write(data)
|
conn.Write(data)
|
||||||
time.Sleep(1 * time.Millisecond)
|
time.Sleep(1 * time.Millisecond)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
}, nil, nil)
|
}, nil, nil)
|
||||||
|
|
||||||
err := server.Bind(addr)
|
err := server.Bind(addr)
|
||||||
@@ -190,8 +190,7 @@ func TestUDPRecv(t *testing.T) {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
connectSource(id, "192.168.2.148:20001")
|
connectSource(id, fmt.Sprintf("%s:%d", ip, port))
|
||||||
//
|
|
||||||
}
|
}
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
|
@@ -126,7 +126,7 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.Record(), codecData)
|
source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData)
|
||||||
stream_ = source.videoStream
|
stream_ = source.videoStream
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -139,13 +139,13 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
codecData, err := utils.NewHevcCodecData(vps, sps, pps)
|
codecData, err := utils.NewHEVCCodecData(vps, sps, pps)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Sugar.Errorf("解析sps pps失败 source:%s data:%s vps:%s sps:%s, pps:%s", source.Id_, hex.EncodeToString(data), hex.EncodeToString(vps), hex.EncodeToString(sps), hex.EncodeToString(pps))
|
log.Sugar.Errorf("解析sps pps失败 source:%s data:%s vps:%s sps:%s, pps:%s", source.Id_, hex.EncodeToString(data), hex.EncodeToString(vps), hex.EncodeToString(sps), hex.EncodeToString(pps))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.Record(), codecData)
|
source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData)
|
||||||
stream_ = source.videoStream
|
stream_ = source.videoStream
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -75,7 +75,7 @@ func (t *transStream) AddTrack(stream utils.AVStream) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if stream.CodecId() == utils.AVCodecIdH264 {
|
if stream.CodecId() == utils.AVCodecIdH264 {
|
||||||
data := stream.CodecParameters().DecoderConfRecord().ToAnnexB()
|
data := stream.CodecParameters().AnnexBExtraData()
|
||||||
_, err = t.muxer.AddTrack(stream.Type(), stream.CodecId(), data)
|
_, err = t.muxer.AddTrack(stream.Type(), stream.CodecId(), data)
|
||||||
} else {
|
} else {
|
||||||
_, err = t.muxer.AddTrack(stream.Type(), stream.CodecId(), stream.Extra())
|
_, err = t.muxer.AddTrack(stream.Type(), stream.CodecId(), stream.Extra())
|
||||||
@@ -178,6 +178,11 @@ func (t *transStream) createSegment() error {
|
|||||||
func (t *transStream) Close() error {
|
func (t *transStream) Close() error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
if t.muxer != nil {
|
||||||
|
t.muxer.Close()
|
||||||
|
t.muxer = nil
|
||||||
|
}
|
||||||
|
|
||||||
if t.context.file != nil {
|
if t.context.file != nil {
|
||||||
err = t.flushSegment()
|
err = t.flushSegment()
|
||||||
err = t.context.file.Close()
|
err = t.context.file.Close()
|
||||||
|
2
main.go
2
main.go
@@ -79,7 +79,7 @@ func NewDefaultAppConfig() stream.AppConfig_ {
|
|||||||
},
|
},
|
||||||
|
|
||||||
Hook: stream.HookConfig{
|
Hook: stream.HookConfig{
|
||||||
Enable: true,
|
Enable: false,
|
||||||
Timeout: int64(60 * time.Second),
|
Timeout: int64(60 * time.Second),
|
||||||
OnPublishUrl: "http://localhost:9000/api/v1/hook/on_publish",
|
OnPublishUrl: "http://localhost:9000/api/v1/hook/on_publish",
|
||||||
OnPublishDoneUrl: "http://localhost:9000/api/v1/hook/on_publish_done",
|
OnPublishDoneUrl: "http://localhost:9000/api/v1/hook/on_publish_done",
|
||||||
|
@@ -31,7 +31,7 @@ func (t *transStream) Input(packet utils.AVPacket) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if packet.KeyFrame() {
|
if packet.KeyFrame() {
|
||||||
extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().DecoderConfRecord().ToAnnexB()
|
extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().AnnexBExtraData()
|
||||||
sink_.input(packet.Index(), extra, 0)
|
sink_.input(packet.Index(), extra, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -56,3 +56,8 @@ func (p *Publisher) OnPartPacket(index int, mediaType utils.AVMediaType, data []
|
|||||||
|
|
||||||
buffer.Write(data)
|
buffer.Write(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Publisher) Close() {
|
||||||
|
p.PublishSource.Close()
|
||||||
|
p.stack = nil
|
||||||
|
}
|
||||||
|
@@ -56,10 +56,7 @@ func (s *server) OnPacket(conn net.Conn, data []byte) []byte {
|
|||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Sugar.Errorf("处理rtmp包失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String())
|
log.Sugar.Errorf("处理rtmp包失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String())
|
||||||
|
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
t.Data.(*Session).Close()
|
|
||||||
t.Data = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if session.isPublisher {
|
if session.isPublisher {
|
||||||
|
@@ -10,10 +10,8 @@ import (
|
|||||||
|
|
||||||
// Session 负责除连接和断开以外的所有RTMP生命周期处理
|
// Session 负责除连接和断开以外的所有RTMP生命周期处理
|
||||||
type Session struct {
|
type Session struct {
|
||||||
//解析rtmp协议栈
|
stack *librtmp.Stack //rtmp协议栈
|
||||||
stack *librtmp.Stack
|
handle interface{} //Publisher/sink, 在publish或play成功后赋值
|
||||||
//Publisher/sink, 在publish或play成功后赋值
|
|
||||||
handle interface{}
|
|
||||||
isPublisher bool
|
isPublisher bool
|
||||||
|
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
@@ -89,9 +87,12 @@ func (s *Session) Input(conn net.Conn, data []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Session) Close() {
|
func (s *Session) Close() {
|
||||||
|
//session/conn/stack相关引用, go释放不了...手动赋值为nil
|
||||||
|
s.conn = nil
|
||||||
//释放协议栈
|
//释放协议栈
|
||||||
if s.stack != nil {
|
if s.stack != nil {
|
||||||
s.stack.Close()
|
s.stack.Close()
|
||||||
|
s.stack = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//还没到publish/play
|
//还没到publish/play
|
||||||
@@ -105,6 +106,7 @@ func (s *Session) Close() {
|
|||||||
|
|
||||||
if s.isPublisher {
|
if s.isPublisher {
|
||||||
s.handle.(*Publisher).Close()
|
s.handle.(*Publisher).Close()
|
||||||
|
s.receiveBuffer = nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
sink := s.handle.(stream.Sink)
|
sink := s.handle.(stream.Sink)
|
||||||
|
@@ -158,7 +158,7 @@ func (t *transStream) WriteHeader() error {
|
|||||||
if videoStream != nil {
|
if videoStream != nil {
|
||||||
tmp := n
|
tmp := n
|
||||||
n += t.muxer.WriteVideoData(t.header[n+12:], 0, false, true)
|
n += t.muxer.WriteVideoData(t.header[n+12:], 0, false, true)
|
||||||
extra := videoStream.CodecParameters().DecoderConfRecord().ToMP4VC()
|
extra := videoStream.CodecParameters().MP4ExtraData()
|
||||||
copy(t.header[n+12:], extra)
|
copy(t.header[n+12:], extra)
|
||||||
n += len(extra)
|
n += len(extra)
|
||||||
|
|
||||||
@@ -179,6 +179,7 @@ func (t *transStream) Close() error {
|
|||||||
if len(segment) > 0 {
|
if len(segment) > 0 {
|
||||||
t.SendPacket(segment)
|
t.SendPacket(segment)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -4,7 +4,6 @@ import (
|
|||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/yangjiechina/avformat/libavc"
|
"github.com/yangjiechina/avformat/libavc"
|
||||||
"github.com/yangjiechina/avformat/libhevc"
|
|
||||||
"github.com/yangjiechina/avformat/librtp"
|
"github.com/yangjiechina/avformat/librtp"
|
||||||
"github.com/yangjiechina/avformat/librtsp/sdp"
|
"github.com/yangjiechina/avformat/librtsp/sdp"
|
||||||
"github.com/yangjiechina/avformat/utils"
|
"github.com/yangjiechina/avformat/utils"
|
||||||
@@ -109,12 +108,12 @@ func (t *tranStream) Input(packet utils.AVPacket) error {
|
|||||||
parameters := t.BaseTransStream.Tracks[packet.Index()].CodecParameters()
|
parameters := t.BaseTransStream.Tracks[packet.Index()].CodecParameters()
|
||||||
|
|
||||||
if utils.AVCodecIdH265 == packet.CodecId() {
|
if utils.AVCodecIdH265 == packet.CodecId() {
|
||||||
bytes := parameters.DecoderConfRecord().(*libhevc.HEVCDecoderConfRecord).VPS
|
bytes := parameters.(*utils.HEVCCodecData).VPS()
|
||||||
stream_.muxer.Input(bytes[0], uint32(packet.ConvertPts(stream_.rate)))
|
stream_.muxer.Input(bytes[0], uint32(packet.ConvertPts(stream_.rate)))
|
||||||
}
|
}
|
||||||
|
|
||||||
spsBytes := parameters.DecoderConfRecord().SPSBytes()
|
spsBytes := parameters.SPS()
|
||||||
ppsBytes := parameters.DecoderConfRecord().PPSBytes()
|
ppsBytes := parameters.PPS()
|
||||||
|
|
||||||
stream_.muxer.Input(spsBytes[0], uint32(packet.ConvertPts(stream_.rate)))
|
stream_.muxer.Input(spsBytes[0], uint32(packet.ConvertPts(stream_.rate)))
|
||||||
stream_.muxer.Input(ppsBytes[0], uint32(packet.ConvertPts(stream_.rate)))
|
stream_.muxer.Input(ppsBytes[0], uint32(packet.ConvertPts(stream_.rate)))
|
||||||
|
@@ -18,6 +18,8 @@ type GOPBuffer interface {
|
|||||||
Size() int
|
Size() int
|
||||||
|
|
||||||
Clear()
|
Clear()
|
||||||
|
|
||||||
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
type streamBuffer struct {
|
type streamBuffer struct {
|
||||||
@@ -107,3 +109,7 @@ func (s *streamBuffer) Size() int {
|
|||||||
func (s *streamBuffer) Clear() {
|
func (s *streamBuffer) Clear() {
|
||||||
s.discard()
|
s.discard()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *streamBuffer) Close() {
|
||||||
|
s.discardHandler = nil
|
||||||
|
}
|
||||||
|
@@ -25,41 +25,36 @@ type MemoryPool interface {
|
|||||||
// Fetch 获取当前内存块,必须先调用Mark函数
|
// Fetch 获取当前内存块,必须先调用Mark函数
|
||||||
Fetch() []byte
|
Fetch() []byte
|
||||||
|
|
||||||
// Reset 清空本次写入的数据,本次缓存的数据无效
|
// Reset 清空本次流程写入的还未生效内存块
|
||||||
Reset()
|
Reset()
|
||||||
|
|
||||||
// Reserve 预留指定大小的内存空间
|
// Reserve 预留指定大小的内存块
|
||||||
//主要是为了和实现和Write相似功能,但是不拷贝, 所以使用流程和Write一样.
|
//主要是为了和实现和Write相似功能,但是不拷贝, 所以使用流程和Write一样.
|
||||||
Reserve(size int)
|
Reserve(size int)
|
||||||
|
|
||||||
// FreeHead 从头部释放一块内存
|
// FreeHead 释放头部一块内存
|
||||||
FreeHead()
|
FreeHead()
|
||||||
|
|
||||||
// FreeTail 从尾部释放一块内存
|
// FreeTail 释放尾部一块内存
|
||||||
FreeTail()
|
FreeTail()
|
||||||
|
|
||||||
|
// Data 返回头尾已使用的内存块
|
||||||
Data() ([]byte, []byte)
|
Data() ([]byte, []byte)
|
||||||
|
|
||||||
// Clear 清空所有内存块
|
// Clear 清空所有内存块
|
||||||
Clear()
|
Clear()
|
||||||
|
|
||||||
Empty() bool
|
|
||||||
|
|
||||||
Capacity() int
|
|
||||||
|
|
||||||
Size() int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type memoryPool struct {
|
type memoryPool struct {
|
||||||
data []byte
|
data []byte
|
||||||
capacity int //实际的可用容量,当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用.
|
capacity int //实际的可用容量,当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用.
|
||||||
head int
|
head int //起始索引
|
||||||
tail int
|
tail int //末尾索引, 当形成回环时, 会小于起始索引
|
||||||
|
|
||||||
markIndex int //保存开始索引
|
markIndex int //分配内存块的起始索引, 一定小于末尾索引, data[markIndex:tail]此次分配的内存块
|
||||||
marked bool
|
marked bool
|
||||||
blockQueue *Queue
|
blockQueue *Queue
|
||||||
discardBlockCount int
|
discardBlockCount int //扩容时, 丢弃之前的内存块数量
|
||||||
recopy bool //扩容时,是否拷贝旧数据. 缓存AVPacket时, 内存已经被Data引用,所以不需要再拷贝旧数据. 用作合并写缓存时, 流还没有发送使用, 需要拷贝旧数据.
|
recopy bool //扩容时,是否拷贝旧数据. 缓存AVPacket时, 内存已经被Data引用,所以不需要再拷贝旧数据. 用作合并写缓存时, 流还没有发送使用, 需要拷贝旧数据.
|
||||||
isFull func(int) bool
|
isFull func(int) bool
|
||||||
}
|
}
|
||||||
@@ -144,11 +139,19 @@ func (m *memoryPool) Reset() {
|
|||||||
m.tail = m.markIndex
|
m.tail = m.markIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryPool) FreeHead() {
|
func (m *memoryPool) freeOldBlocks() bool {
|
||||||
utils.Assert(!m.marked)
|
utils.Assert(!m.marked)
|
||||||
|
|
||||||
if m.discardBlockCount > 0 {
|
if m.discardBlockCount > 0 {
|
||||||
m.discardBlockCount--
|
m.discardBlockCount--
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryPool) FreeHead() {
|
||||||
|
if m.freeOldBlocks() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -156,35 +159,29 @@ func (m *memoryPool) FreeHead() {
|
|||||||
size := m.blockQueue.Pop().(int)
|
size := m.blockQueue.Pop().(int)
|
||||||
m.head += size
|
m.head += size
|
||||||
|
|
||||||
if m.head == m.tail {
|
|
||||||
m.head = 0
|
|
||||||
m.tail = 0
|
|
||||||
} else if m.head >= m.capacity {
|
|
||||||
m.head = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
if m.blockQueue.IsEmpty() {
|
if m.blockQueue.IsEmpty() {
|
||||||
m.markIndex = 0
|
m.Clear()
|
||||||
|
} else if m.head >= m.capacity {
|
||||||
|
//清空末尾, 从头开始
|
||||||
|
m.head = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryPool) FreeTail() {
|
func (m *memoryPool) FreeTail() {
|
||||||
utils.Assert(!m.marked)
|
if m.freeOldBlocks() {
|
||||||
|
|
||||||
if m.discardBlockCount > 0 {
|
|
||||||
m.discardBlockCount--
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
utils.Assert(!m.blockQueue.IsEmpty())
|
utils.Assert(!m.blockQueue.IsEmpty())
|
||||||
size := m.blockQueue.PopBack().(int)
|
size := m.blockQueue.PopBack().(int)
|
||||||
m.tail -= size
|
m.tail -= size
|
||||||
if m.tail == 0 && !m.blockQueue.IsEmpty() {
|
|
||||||
m.tail = m.capacity
|
|
||||||
}
|
|
||||||
|
|
||||||
if m.blockQueue.IsEmpty() {
|
if m.blockQueue.IsEmpty() {
|
||||||
m.markIndex = 0
|
m.Clear()
|
||||||
|
} else if m.tail == 0 {
|
||||||
|
//回环回到线性
|
||||||
|
m.tail = m.capacity
|
||||||
|
m.capacity = cap(m.data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -207,17 +204,3 @@ func (m *memoryPool) Clear() {
|
|||||||
m.blockQueue.Clear()
|
m.blockQueue.Clear()
|
||||||
m.discardBlockCount = 0
|
m.discardBlockCount = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryPool) Empty() bool {
|
|
||||||
utils.Assert(!m.marked)
|
|
||||||
return m.blockQueue.Size() < 1
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *memoryPool) Capacity() int {
|
|
||||||
return m.capacity
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *memoryPool) Size() int {
|
|
||||||
head, tail := m.Data()
|
|
||||||
return len(head) + len(tail)
|
|
||||||
}
|
|
||||||
|
@@ -14,7 +14,7 @@ func NewDirectMemoryPool(capacity int) MemoryPool {
|
|||||||
pool.memoryPool = &memoryPool{
|
pool.memoryPool = &memoryPool{
|
||||||
data: make([]byte, capacity),
|
data: make([]byte, capacity),
|
||||||
capacity: capacity,
|
capacity: capacity,
|
||||||
blockQueue: NewQueue(capacity),
|
blockQueue: NewQueue(2048),
|
||||||
recopy: true,
|
recopy: true,
|
||||||
isFull: pool.isFull,
|
isFull: pool.isFull,
|
||||||
}
|
}
|
||||||
|
@@ -11,6 +11,16 @@ func (m *rbMemoryPool) isFull(size int) bool {
|
|||||||
//头部有大小合适的内存空间
|
//头部有大小合适的内存空间
|
||||||
} else if !over && m.capacity-m.tail >= size {
|
} else if !over && m.capacity-m.tail >= size {
|
||||||
//尾部有大小合适的内存空间
|
//尾部有大小合适的内存空间
|
||||||
|
} else if !over && m.head > size {
|
||||||
|
//形成回环
|
||||||
|
|
||||||
|
//修改有效内存容量大小
|
||||||
|
m.capacity = m.markIndex
|
||||||
|
//拷贝之前的数据
|
||||||
|
incompleteBlockSize := m.tail - m.markIndex
|
||||||
|
copy(m.data, m.data[m.markIndex:m.tail])
|
||||||
|
m.markIndex = 0
|
||||||
|
m.tail = incompleteBlockSize
|
||||||
} else {
|
} else {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@@ -23,7 +33,7 @@ func NewRbMemoryPool(capacity int) MemoryPool {
|
|||||||
pool.memoryPool = &memoryPool{
|
pool.memoryPool = &memoryPool{
|
||||||
data: make([]byte, capacity),
|
data: make([]byte, capacity),
|
||||||
capacity: capacity,
|
capacity: capacity,
|
||||||
blockQueue: NewQueue(capacity),
|
blockQueue: NewQueue(2048),
|
||||||
recopy: false,
|
recopy: false,
|
||||||
isFull: pool.isFull,
|
isFull: pool.isFull,
|
||||||
}
|
}
|
||||||
|
@@ -213,6 +213,7 @@ func (s *BaseSink) Close() {
|
|||||||
go HookPlayDoneEvent(s)
|
go HookPlayDoneEvent(s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BaseSink) PrintInfo() string {
|
func (s *BaseSink) PrintInfo() string {
|
||||||
return fmt.Sprintf("%s-%v source:%s", s.Protocol().ToString(), s.Id_, s.SourceId_)
|
return fmt.Sprintf("%s-%v source:%s", s.Protocol().ToString(), s.Id_, s.SourceId_)
|
||||||
}
|
}
|
||||||
@@ -228,6 +229,7 @@ func (s *BaseSink) RemoteAddr() string {
|
|||||||
func (s *BaseSink) UrlValues() url.Values {
|
func (s *BaseSink) UrlValues() url.Values {
|
||||||
return s.urlValues
|
return s.urlValues
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BaseSink) SetUrlValues(values url.Values) {
|
func (s *BaseSink) SetUrlValues(values url.Values) {
|
||||||
s.urlValues = values
|
s.urlValues = values
|
||||||
}
|
}
|
||||||
|
@@ -441,6 +441,12 @@ func (s *PublishSource) doClose() {
|
|||||||
|
|
||||||
if s.Conn != nil {
|
if s.Conn != nil {
|
||||||
s.Conn.Close()
|
s.Conn.Close()
|
||||||
|
s.Conn = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.TransDeMuxer != nil {
|
||||||
|
s.TransDeMuxer.Close()
|
||||||
|
s.TransDeMuxer = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//清空未写完的buffer
|
//清空未写完的buffer
|
||||||
@@ -453,13 +459,18 @@ func (s *PublishSource) doClose() {
|
|||||||
//释放GOP缓存
|
//释放GOP缓存
|
||||||
if s.gopBuffer != nil {
|
if s.gopBuffer != nil {
|
||||||
s.gopBuffer.Clear()
|
s.gopBuffer.Clear()
|
||||||
|
s.gopBuffer.Close()
|
||||||
|
s.gopBuffer = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.probeTimer != nil {
|
if s.probeTimer != nil {
|
||||||
s.probeTimer.Stop()
|
s.probeTimer.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.receiveDataTimer != nil {
|
if s.receiveDataTimer != nil {
|
||||||
s.receiveDataTimer.Stop()
|
s.receiveDataTimer.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.idleTimer != nil {
|
if s.idleTimer != nil {
|
||||||
s.idleTimer.Stop()
|
s.idleTimer.Stop()
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user