diff --git a/api.go b/api.go index 191bf1a..9ba9d28 100644 --- a/api.go +++ b/api.go @@ -16,6 +16,7 @@ import ( "net" "net/http" "os" + "runtime" "strings" "sync" "time" @@ -73,6 +74,10 @@ func startApiServer(addr string) { //TCP主动,设置连接地址 apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", apiServer.connectGBSource) apiServer.router.HandleFunc("/api/v1/gb28181/source/close", apiServer.closeGBSource) + apiServer.router.HandleFunc("/api/v1/gb28181/gc/force", func(writer http.ResponseWriter, request *http.Request) { + runtime.GC() + writer.WriteHeader(http.StatusOK) + }) apiServer.router.HandleFunc("/rtc.html", func(writer http.ResponseWriter, request *http.Request) { http.ServeFile(writer, request, "./rtc.html") diff --git a/flv/http_flv.go b/flv/http_flv.go index fe54a08..ce07329 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -104,8 +104,8 @@ func (t *httpTransStream) AddTrack(stream utils.AVStream) error { } else if utils.AVMediaTypeVideo == stream.Type() { t.muxer.AddVideoTrack(stream.CodecId()) - t.muxer.AddProperty("width", stream.CodecParameters().SPSInfo().Width()) - t.muxer.AddProperty("height", stream.CodecParameters().SPSInfo().Height()) + t.muxer.AddProperty("width", stream.CodecParameters().Width()) + t.muxer.AddProperty("height", stream.CodecParameters().Height()) } return nil } @@ -187,7 +187,7 @@ func (t *httpTransStream) WriteHeader() error { if utils.AVMediaTypeAudio == track.Type() { data = track.Extra() } else if utils.AVMediaTypeVideo == track.Type() { - data = track.CodecParameters().DecoderConfRecord().ToMP4VC() + data = track.CodecParameters().MP4ExtraData() } n := t.muxer.Input(t.header[t.headerSize:], track.Type(), len(data), 0, 0, false, true) diff --git a/gb28181/gb28181_test.go b/gb28181/gb28181_test.go index 1a72ad2..e435937 100644 --- a/gb28181/gb28181_test.go +++ b/gb28181/gb28181_test.go @@ -80,17 +80,15 @@ func connectSource(source string, addr string) { } } -func createSource(source, transport, setup string, ssrc uint32) int { +func createSource(source, setup string, ssrc uint32) (string, uint16) { v := struct { - Source string `json:"source"` //SourceId - Transport string `json:"transport,omitempty"` - Setup string `json:"setup"` //active/passive - SSRC uint32 `json:"ssrc,omitempty"` + Source string `json:"source"` //SourceId + Setup string `json:"setup"` //active/passive + SSRC uint32 `json:"ssrc,omitempty"` }{ - Source: source, - Transport: transport, - Setup: setup, - SSRC: ssrc, + Source: source, + Setup: setup, + SSRC: ssrc, } marshal, err := json.Marshal(v) @@ -98,7 +96,7 @@ func createSource(source, transport, setup string, ssrc uint32) int { panic(err) } - request, err := http.NewRequest("POST", "http://localhost:8080/v1/gb28181/source/create", bytes.NewBuffer(marshal)) + request, err := http.NewRequest("POST", "http://localhost:8080/api/v1/gb28181/source/create", bytes.NewBuffer(marshal)) if err != nil { panic(err) } @@ -108,46 +106,46 @@ func createSource(source, transport, setup string, ssrc uint32) int { if err != nil { panic(err) } + if response.StatusCode != http.StatusOK { + panic("") + } all, err := io.ReadAll(response.Body) if err != nil { panic(err) } - resposne := &struct { + connectInfo := &struct { Code int `json:"code"` Msg string `json:"msg"` Data struct { - Port int `json:"port"` - } `json:"data"` + IP string `json:"ip"` + Port uint16 `json:"port,omitempty"` + } }{} - err = json.Unmarshal(all, resposne) + err = json.Unmarshal(all, connectInfo) if err != nil { panic(err) } - if resposne.Code != http.StatusOK { - panic("") - } - - return resposne.Data.Port + return connectInfo.Data.IP, connectInfo.Data.Port } +// 使用wireshark直接导出udp流 +// 根据ssrc来查找每个rtp包, rtp不要带扩展字段 func TestUDPRecv(t *testing.T) { - path := "D:\\GOProjects\\avformat\\gb28181_h264.rtp" - ssrc := 0xBEBC201 - ip := "192.168.2.148" + path := "D:\\GOProjects\\avformat\\gb28181_h265.rtp" + ssrc := 0xBEBC202 localAddr := "0.0.0.0:20001" - network := "tcp" - setup := "passive" + setup := "udp" //udp/passive/active id := "hls_mystream" - port := createSource(id, network, setup, uint32(ssrc)) + ip, port := createSource(id, setup, uint32(ssrc)) - if network == "udp" { - addr, _ := net.ResolveUDPAddr(network, localAddr) - remoteAddr, _ := net.ResolveUDPAddr(network, fmt.Sprintf("%s:%d", ip, port)) + if setup == "udp" { + addr, _ := net.ResolveUDPAddr("udp", localAddr) + remoteAddr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", ip, port)) client := &transport.UDPClient{} err := client.Connect(addr, remoteAddr) @@ -160,8 +158,8 @@ func TestUDPRecv(t *testing.T) { time.Sleep(1 * time.Millisecond) }) } else if !(setup == "active") { - addr, _ := net.ResolveTCPAddr(network, localAddr) - remoteAddr, _ := net.ResolveTCPAddr(network, fmt.Sprintf("%s:%d", ip, port)) + addr, _ := net.ResolveTCPAddr("tcp", localAddr) + remoteAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ip, port)) client := transport.TCPClient{} err := client.Connect(addr, remoteAddr) @@ -175,14 +173,16 @@ func TestUDPRecv(t *testing.T) { time.Sleep(1 * time.Millisecond) }) } else { - addr, _ := net.ResolveTCPAddr(network, localAddr) + addr, _ := net.ResolveTCPAddr("tcp", localAddr) server := transport.TCPServer{} - server.SetHandler2(func(conn net.Conn) { + server.SetHandler2(func(conn net.Conn) []byte { readRtp(path, uint32(ssrc), true, func(data []byte) { conn.Write(data) time.Sleep(1 * time.Millisecond) }) + + return nil }, nil, nil) err := server.Bind(addr) @@ -190,8 +190,7 @@ func TestUDPRecv(t *testing.T) { panic(err) } - connectSource(id, "192.168.2.148:20001") - // + connectSource(id, fmt.Sprintf("%s:%d", ip, port)) } select {} diff --git a/gb28181/source.go b/gb28181/source.go index fa8bb83..1792434 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -126,7 +126,7 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT return err } - source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.Record(), codecData) + source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData) stream_ = source.videoStream } @@ -139,13 +139,13 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT return err } - codecData, err := utils.NewHevcCodecData(vps, sps, pps) + codecData, err := utils.NewHEVCCodecData(vps, sps, pps) if err != nil { log.Sugar.Errorf("解析sps pps失败 source:%s data:%s vps:%s sps:%s, pps:%s", source.Id_, hex.EncodeToString(data), hex.EncodeToString(vps), hex.EncodeToString(sps), hex.EncodeToString(pps)) return err } - source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.Record(), codecData) + source.videoStream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData) stream_ = source.videoStream } diff --git a/hls/hls_stream.go b/hls/hls_stream.go index 5bf83d9..beb9921 100644 --- a/hls/hls_stream.go +++ b/hls/hls_stream.go @@ -75,7 +75,7 @@ func (t *transStream) AddTrack(stream utils.AVStream) error { } if stream.CodecId() == utils.AVCodecIdH264 { - data := stream.CodecParameters().DecoderConfRecord().ToAnnexB() + data := stream.CodecParameters().AnnexBExtraData() _, err = t.muxer.AddTrack(stream.Type(), stream.CodecId(), data) } else { _, err = t.muxer.AddTrack(stream.Type(), stream.CodecId(), stream.Extra()) @@ -178,6 +178,11 @@ func (t *transStream) createSegment() error { func (t *transStream) Close() error { var err error + if t.muxer != nil { + t.muxer.Close() + t.muxer = nil + } + if t.context.file != nil { err = t.flushSegment() err = t.context.file.Close() diff --git a/main.go b/main.go index 171330c..ec3205e 100644 --- a/main.go +++ b/main.go @@ -79,7 +79,7 @@ func NewDefaultAppConfig() stream.AppConfig_ { }, Hook: stream.HookConfig{ - Enable: true, + Enable: false, Timeout: int64(60 * time.Second), OnPublishUrl: "http://localhost:9000/api/v1/hook/on_publish", OnPublishDoneUrl: "http://localhost:9000/api/v1/hook/on_publish_done", diff --git a/rtc/rtc_stream.go b/rtc/rtc_stream.go index 55199b1..96f3f1c 100644 --- a/rtc/rtc_stream.go +++ b/rtc/rtc_stream.go @@ -31,7 +31,7 @@ func (t *transStream) Input(packet utils.AVPacket) error { } if packet.KeyFrame() { - extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().DecoderConfRecord().ToAnnexB() + extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().AnnexBExtraData() sink_.input(packet.Index(), extra, 0) } diff --git a/rtmp/rtmp_publisher.go b/rtmp/rtmp_publisher.go index edab375..2544599 100644 --- a/rtmp/rtmp_publisher.go +++ b/rtmp/rtmp_publisher.go @@ -56,3 +56,8 @@ func (p *Publisher) OnPartPacket(index int, mediaType utils.AVMediaType, data [] buffer.Write(data) } + +func (p *Publisher) Close() { + p.PublishSource.Close() + p.stack = nil +} diff --git a/rtmp/rtmp_server.go b/rtmp/rtmp_server.go index 79a7b41..183780f 100644 --- a/rtmp/rtmp_server.go +++ b/rtmp/rtmp_server.go @@ -56,10 +56,7 @@ func (s *server) OnPacket(conn net.Conn, data []byte) []byte { if err != nil { log.Sugar.Errorf("处理rtmp包失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String()) - _ = conn.Close() - t.Data.(*Session).Close() - t.Data = nil } if session.isPublisher { diff --git a/rtmp/rtmp_session.go b/rtmp/rtmp_session.go index 38c54f4..90a3f44 100644 --- a/rtmp/rtmp_session.go +++ b/rtmp/rtmp_session.go @@ -10,10 +10,8 @@ import ( // Session 负责除连接和断开以外的所有RTMP生命周期处理 type Session struct { - //解析rtmp协议栈 - stack *librtmp.Stack - //Publisher/sink, 在publish或play成功后赋值 - handle interface{} + stack *librtmp.Stack //rtmp协议栈 + handle interface{} //Publisher/sink, 在publish或play成功后赋值 isPublisher bool conn net.Conn @@ -89,9 +87,12 @@ func (s *Session) Input(conn net.Conn, data []byte) error { } func (s *Session) Close() { + //session/conn/stack相关引用, go释放不了...手动赋值为nil + s.conn = nil //释放协议栈 if s.stack != nil { s.stack.Close() + s.stack = nil } //还没到publish/play @@ -105,6 +106,7 @@ func (s *Session) Close() { if s.isPublisher { s.handle.(*Publisher).Close() + s.receiveBuffer = nil } } else { sink := s.handle.(stream.Sink) diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go index 056071a..6ea1398 100644 --- a/rtmp/rtmp_stream.go +++ b/rtmp/rtmp_stream.go @@ -158,7 +158,7 @@ func (t *transStream) WriteHeader() error { if videoStream != nil { tmp := n n += t.muxer.WriteVideoData(t.header[n+12:], 0, false, true) - extra := videoStream.CodecParameters().DecoderConfRecord().ToMP4VC() + extra := videoStream.CodecParameters().MP4ExtraData() copy(t.header[n+12:], extra) n += len(extra) @@ -179,6 +179,7 @@ func (t *transStream) Close() error { if len(segment) > 0 { t.SendPacket(segment) } + return nil } diff --git a/rtsp/rtsp_stream.go b/rtsp/rtsp_stream.go index 799e0ac..3c91779 100644 --- a/rtsp/rtsp_stream.go +++ b/rtsp/rtsp_stream.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "fmt" "github.com/yangjiechina/avformat/libavc" - "github.com/yangjiechina/avformat/libhevc" "github.com/yangjiechina/avformat/librtp" "github.com/yangjiechina/avformat/librtsp/sdp" "github.com/yangjiechina/avformat/utils" @@ -109,12 +108,12 @@ func (t *tranStream) Input(packet utils.AVPacket) error { parameters := t.BaseTransStream.Tracks[packet.Index()].CodecParameters() if utils.AVCodecIdH265 == packet.CodecId() { - bytes := parameters.DecoderConfRecord().(*libhevc.HEVCDecoderConfRecord).VPS + bytes := parameters.(*utils.HEVCCodecData).VPS() stream_.muxer.Input(bytes[0], uint32(packet.ConvertPts(stream_.rate))) } - spsBytes := parameters.DecoderConfRecord().SPSBytes() - ppsBytes := parameters.DecoderConfRecord().PPSBytes() + spsBytes := parameters.SPS() + ppsBytes := parameters.PPS() stream_.muxer.Input(spsBytes[0], uint32(packet.ConvertPts(stream_.rate))) stream_.muxer.Input(ppsBytes[0], uint32(packet.ConvertPts(stream_.rate))) diff --git a/stream/gop_buffer.go b/stream/gop_buffer.go index 514beee..ffb41ed 100644 --- a/stream/gop_buffer.go +++ b/stream/gop_buffer.go @@ -18,6 +18,8 @@ type GOPBuffer interface { Size() int Clear() + + Close() } type streamBuffer struct { @@ -107,3 +109,7 @@ func (s *streamBuffer) Size() int { func (s *streamBuffer) Clear() { s.discard() } + +func (s *streamBuffer) Close() { + s.discardHandler = nil +} diff --git a/stream/memory_pool.go b/stream/memory_pool.go index e86ca19..057007c 100644 --- a/stream/memory_pool.go +++ b/stream/memory_pool.go @@ -25,41 +25,36 @@ type MemoryPool interface { // Fetch 获取当前内存块,必须先调用Mark函数 Fetch() []byte - // Reset 清空本次写入的数据,本次缓存的数据无效 + // Reset 清空本次流程写入的还未生效内存块 Reset() - // Reserve 预留指定大小的内存空间 + // Reserve 预留指定大小的内存块 //主要是为了和实现和Write相似功能,但是不拷贝, 所以使用流程和Write一样. Reserve(size int) - // FreeHead 从头部释放一块内存 + // FreeHead 释放头部一块内存 FreeHead() - // FreeTail 从尾部释放一块内存 + // FreeTail 释放尾部一块内存 FreeTail() + // Data 返回头尾已使用的内存块 Data() ([]byte, []byte) // Clear 清空所有内存块 Clear() - - Empty() bool - - Capacity() int - - Size() int } type memoryPool struct { data []byte capacity int //实际的可用容量,当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用. - head int - tail int + head int //起始索引 + tail int //末尾索引, 当形成回环时, 会小于起始索引 - markIndex int //保存开始索引 + markIndex int //分配内存块的起始索引, 一定小于末尾索引, data[markIndex:tail]此次分配的内存块 marked bool blockQueue *Queue - discardBlockCount int + discardBlockCount int //扩容时, 丢弃之前的内存块数量 recopy bool //扩容时,是否拷贝旧数据. 缓存AVPacket时, 内存已经被Data引用,所以不需要再拷贝旧数据. 用作合并写缓存时, 流还没有发送使用, 需要拷贝旧数据. isFull func(int) bool } @@ -144,11 +139,19 @@ func (m *memoryPool) Reset() { m.tail = m.markIndex } -func (m *memoryPool) FreeHead() { +func (m *memoryPool) freeOldBlocks() bool { utils.Assert(!m.marked) if m.discardBlockCount > 0 { m.discardBlockCount-- + return true + } + + return false +} + +func (m *memoryPool) FreeHead() { + if m.freeOldBlocks() { return } @@ -156,35 +159,29 @@ func (m *memoryPool) FreeHead() { size := m.blockQueue.Pop().(int) m.head += size - if m.head == m.tail { - m.head = 0 - m.tail = 0 - } else if m.head >= m.capacity { - m.head = 0 - } - if m.blockQueue.IsEmpty() { - m.markIndex = 0 + m.Clear() + } else if m.head >= m.capacity { + //清空末尾, 从头开始 + m.head = 0 } } func (m *memoryPool) FreeTail() { - utils.Assert(!m.marked) - - if m.discardBlockCount > 0 { - m.discardBlockCount-- + if m.freeOldBlocks() { return } utils.Assert(!m.blockQueue.IsEmpty()) size := m.blockQueue.PopBack().(int) m.tail -= size - if m.tail == 0 && !m.blockQueue.IsEmpty() { - m.tail = m.capacity - } if m.blockQueue.IsEmpty() { - m.markIndex = 0 + m.Clear() + } else if m.tail == 0 { + //回环回到线性 + m.tail = m.capacity + m.capacity = cap(m.data) } } @@ -207,17 +204,3 @@ func (m *memoryPool) Clear() { m.blockQueue.Clear() m.discardBlockCount = 0 } - -func (m *memoryPool) Empty() bool { - utils.Assert(!m.marked) - return m.blockQueue.Size() < 1 -} - -func (m *memoryPool) Capacity() int { - return m.capacity -} - -func (m *memoryPool) Size() int { - head, tail := m.Data() - return len(head) + len(tail) -} diff --git a/stream/memorypool_direct.go b/stream/memorypool_direct.go index bf1edcc..4e84ce7 100644 --- a/stream/memorypool_direct.go +++ b/stream/memorypool_direct.go @@ -14,7 +14,7 @@ func NewDirectMemoryPool(capacity int) MemoryPool { pool.memoryPool = &memoryPool{ data: make([]byte, capacity), capacity: capacity, - blockQueue: NewQueue(capacity), + blockQueue: NewQueue(2048), recopy: true, isFull: pool.isFull, } diff --git a/stream/memorypool_rb.go b/stream/memorypool_rb.go index f5dc158..a221ef3 100644 --- a/stream/memorypool_rb.go +++ b/stream/memorypool_rb.go @@ -11,6 +11,16 @@ func (m *rbMemoryPool) isFull(size int) bool { //头部有大小合适的内存空间 } else if !over && m.capacity-m.tail >= size { //尾部有大小合适的内存空间 + } else if !over && m.head > size { + //形成回环 + + //修改有效内存容量大小 + m.capacity = m.markIndex + //拷贝之前的数据 + incompleteBlockSize := m.tail - m.markIndex + copy(m.data, m.data[m.markIndex:m.tail]) + m.markIndex = 0 + m.tail = incompleteBlockSize } else { return true } @@ -23,7 +33,7 @@ func NewRbMemoryPool(capacity int) MemoryPool { pool.memoryPool = &memoryPool{ data: make([]byte, capacity), capacity: capacity, - blockQueue: NewQueue(capacity), + blockQueue: NewQueue(2048), recopy: false, isFull: pool.isFull, } diff --git a/stream/sink.go b/stream/sink.go index 8c57ba8..420654a 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -213,6 +213,7 @@ func (s *BaseSink) Close() { go HookPlayDoneEvent(s) } } + func (s *BaseSink) PrintInfo() string { return fmt.Sprintf("%s-%v source:%s", s.Protocol().ToString(), s.Id_, s.SourceId_) } @@ -228,6 +229,7 @@ func (s *BaseSink) RemoteAddr() string { func (s *BaseSink) UrlValues() url.Values { return s.urlValues } + func (s *BaseSink) SetUrlValues(values url.Values) { s.urlValues = values } diff --git a/stream/source.go b/stream/source.go index 3641ff7..2e63f9b 100644 --- a/stream/source.go +++ b/stream/source.go @@ -441,6 +441,12 @@ func (s *PublishSource) doClose() { if s.Conn != nil { s.Conn.Close() + s.Conn = nil + } + + if s.TransDeMuxer != nil { + s.TransDeMuxer.Close() + s.TransDeMuxer = nil } //清空未写完的buffer @@ -453,13 +459,18 @@ func (s *PublishSource) doClose() { //释放GOP缓存 if s.gopBuffer != nil { s.gopBuffer.Clear() + s.gopBuffer.Close() + s.gopBuffer = nil } + if s.probeTimer != nil { s.probeTimer.Stop() } + if s.receiveDataTimer != nil { s.receiveDataTimer.Stop() } + if s.idleTimer != nil { s.idleTimer.Stop() }