From bca6756cd61cd308b49e4405ca38b418e8c56901 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 17 Feb 2022 22:25:01 +0100 Subject: [PATCH] improve performance --- client.go | 8 ++++--- serverconn.go | 28 +++++++++++----------- serversession.go | 1 + serverudpl.go | 60 +++++++++++++++++++++++++++++++++--------------- 4 files changed, 61 insertions(+), 36 deletions(-) diff --git a/client.go b/client.go index 9b7e399e..8691d9c9 100644 --- a/client.go +++ b/client.go @@ -214,6 +214,7 @@ type Client struct { tracksByChannel map[int]int lastRange *headers.Range tcpReadBuffer *multibuffer.MultiBuffer + tcpRTPPacketBuffer *rtpPacketMultiBuffer writeMutex sync.RWMutex // publish writeFrameAllowed bool // publish udpReportTimer *time.Timer @@ -760,14 +761,14 @@ func (c *Client) runReader() { atomic.StoreInt64(c.tcpLastFrameTime, now.Unix()) if isRTP { - var pkt rtp.Packet + pkt := c.tcpRTPPacketBuffer.next() err := pkt.Unmarshal(payload) if err != nil { return } - c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, &pkt) - c.OnPacketRTP(trackID, &pkt) + c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, pkt) + c.OnPacketRTP(trackID, pkt) } else { packets, err := rtcp.Unmarshal(payload) if err != nil { @@ -1536,6 +1537,7 @@ func (c *Client) doSetup( case TransportTCP: if c.tcpReadBuffer == nil { c.tcpReadBuffer = multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)) + c.tcpRTPPacketBuffer = newRTPPacketMultiBuffer(uint64(c.ReadBufferCount)) } if c.tracksByChannel == nil { diff --git a/serverconn.go b/serverconn.go index 8b587815..b4b0867b 100644 --- a/serverconn.go +++ b/serverconn.go @@ -11,7 +11,6 @@ import ( "time" "github.com/pion/rtcp" - "github.com/pion/rtp" "github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/liberrors" @@ -35,17 +34,18 @@ type ServerConn struct { s *Server conn net.Conn - ctx context.Context - ctxCancel func() - remoteAddr *net.TCPAddr - br *bufio.Reader - sessions map[string]*ServerSession - tcpFrameEnabled bool - tcpSession *ServerSession - tcpFrameTimeout bool - tcpReadBuffer *multibuffer.MultiBuffer - tcpProcessFunc func(int, bool, []byte) - tcpWriterRunning bool + ctx context.Context + ctxCancel func() + remoteAddr *net.TCPAddr + br *bufio.Reader + sessions map[string]*ServerSession + tcpFrameEnabled bool + tcpSession *ServerSession + tcpFrameTimeout bool + tcpReadBuffer *multibuffer.MultiBuffer + tcpRTPPacketBuffer *rtpPacketMultiBuffer + tcpProcessFunc func(int, bool, []byte) + tcpWriterRunning bool // in sessionRemove chan *ServerSession @@ -260,7 +260,7 @@ func (sc *ServerConn) tcpProcessPlay(trackID int, isRTP bool, payload []byte) { func (sc *ServerConn) tcpProcessRecord(trackID int, isRTP bool, payload []byte) { if isRTP { - var pkt rtp.Packet + pkt := sc.tcpRTPPacketBuffer.next() err := pkt.Unmarshal(payload) if err != nil { return @@ -270,7 +270,7 @@ func (sc *ServerConn) tcpProcessRecord(trackID int, isRTP bool, payload []byte) h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ Session: sc.tcpSession, TrackID: trackID, - Packet: &pkt, + Packet: pkt, }) } } else { diff --git a/serversession.go b/serversession.go index b24e1e94..270a1ff0 100644 --- a/serversession.go +++ b/serversession.go @@ -1019,6 +1019,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.tcpConn.tcpFrameEnabled = true ss.tcpConn.tcpFrameTimeout = true ss.tcpConn.tcpReadBuffer = multibuffer.New(uint64(sc.s.ReadBufferCount), uint64(sc.s.ReadBufferSize)) + ss.tcpConn.tcpRTPPacketBuffer = newRTPPacketMultiBuffer(uint64(sc.s.ReadBufferCount)) ss.tcpConn.tcpProcessFunc = sc.tcpProcessRecord // when recording, writeBuffer is only used to send RTCP receiver reports, diff --git a/serverudpl.go b/serverudpl.go index f662b671..3531647d 100644 --- a/serverudpl.go +++ b/serverudpl.go @@ -15,6 +15,26 @@ import ( "github.com/aler9/gortsplib/pkg/multibuffer" ) +type rtpPacketMultiBuffer struct { + count uint64 + buffers []rtp.Packet + cur uint64 +} + +func newRTPPacketMultiBuffer(count uint64) *rtpPacketMultiBuffer { + buffers := make([]rtp.Packet, count) + return &rtpPacketMultiBuffer{ + count: count, + buffers: buffers, + } +} + +func (mb *rtpPacketMultiBuffer) next() *rtp.Packet { + ret := &mb.buffers[mb.cur%mb.count] + mb.cur++ + return ret +} + type clientData struct { ss *ServerSession trackID int @@ -40,14 +60,15 @@ func (p *clientAddr) fill(ip net.IP, port int) { type serverUDPListener struct { s *Server - pc *net.UDPConn - listenIP net.IP - isRTP bool - writeTimeout time.Duration - readBuffer *multibuffer.MultiBuffer - clientsMutex sync.RWMutex - clients map[clientAddr]*clientData - processFunc func(*clientData, []byte) + pc *net.UDPConn + listenIP net.IP + isRTP bool + writeTimeout time.Duration + readBuffer *multibuffer.MultiBuffer + rtpPacketBuffer *rtpPacketMultiBuffer + clientsMutex sync.RWMutex + clients map[clientAddr]*clientData + processFunc func(*clientData, []byte) readerDone chan struct{} } @@ -135,14 +156,15 @@ func newServerUDPListener( } u := &serverUDPListener{ - s: s, - pc: pc, - listenIP: listenIP, - clients: make(map[clientAddr]*clientData), - isRTP: isRTP, - writeTimeout: s.WriteTimeout, - readBuffer: multibuffer.New(uint64(s.ReadBufferCount), uint64(s.ReadBufferSize)), - readerDone: make(chan struct{}), + s: s, + pc: pc, + listenIP: listenIP, + clients: make(map[clientAddr]*clientData), + isRTP: isRTP, + writeTimeout: s.WriteTimeout, + readBuffer: multibuffer.New(uint64(s.ReadBufferCount), uint64(s.ReadBufferSize)), + rtpPacketBuffer: newRTPPacketMultiBuffer(uint64(s.ReadBufferCount)), + readerDone: make(chan struct{}), } if isRTP { @@ -196,7 +218,7 @@ func (u *serverUDPListener) runReader() { } func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) { - var pkt rtp.Packet + pkt := u.rtpPacketBuffer.next() err := pkt.Unmarshal(payload) if err != nil { return @@ -204,13 +226,13 @@ func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) { now := time.Now() atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix()) - clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTP(now, &pkt) + clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTP(now, pkt) if h, ok := u.s.Handler.(ServerHandlerOnPacketRTP); ok { h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ Session: clientData.ss, TrackID: clientData.trackID, - Packet: &pkt, + Packet: pkt, }) } }