From 5506eb2f7feeee1094338902948b4b37c58c4429 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Tue, 24 Dec 2024 10:24:24 +0100 Subject: [PATCH] use a single TCP outgoing buffer each client / session (#665) this saves memory. --- client.go | 31 ++++++++++++++++++++++++++--- client_format.go | 15 -------------- client_media.go | 29 ++++++--------------------- server_multicast_writer.go | 8 ++++---- server_session.go | 32 ++++++++++++++++++++++++++++-- server_session_media.go | 40 ++++++-------------------------------- server_stream_format.go | 5 ++--- server_stream_media.go | 5 ++--- 8 files changed, 78 insertions(+), 87 deletions(-) diff --git a/client.go b/client.go index 424da01e..d1be2db2 100644 --- a/client.go +++ b/client.go @@ -339,6 +339,8 @@ type Client struct { reader *clientReader timeDecoder *rtptime.GlobalDecoder2 mustClose bool + tcpFrame *base.InterleavedFrame + tcpBuffer []byte // in chOptions chan optionsReq @@ -856,6 +858,11 @@ func (c *Client) startTransportRoutines() { cm.start() } + if *c.effectiveTransport == TransportTCP { + c.tcpFrame = &base.InterleavedFrame{} + c.tcpBuffer = make([]byte, c.MaxPacketSize+4) + } + if c.state == clientStatePlay && c.stdChannelSetupped { c.keepaliveTimer = time.NewTimer(c.keepalivePeriod) @@ -1902,8 +1909,18 @@ func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, } cm := c.medias[medi] - ct := cm.formats[pkt.PayloadType] - return ct.writePacketRTP(byts, pkt, ntp) + cf := cm.formats[pkt.PayloadType] + + cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) + + ok := c.writer.push(func() error { + return cm.writePacketRTPInQueue(byts) + }) + if !ok { + return liberrors.ErrClientWriteQueueFull{} + } + + return nil } // WritePacketRTCP writes a RTCP packet to the server. @@ -1920,7 +1937,15 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error } cm := c.medias[medi] - return cm.writePacketRTCP(byts) + + ok := c.writer.push(func() error { + return cm.writePacketRTCPInQueue(byts) + }) + if !ok { + return liberrors.ErrClientWriteQueueFull{} + } + + return nil } // PacketPTS returns the PTS of an incoming RTP packet. diff --git a/client_format.go b/client_format.go index 8695b2c2..8f63e5a3 100644 --- a/client_format.go +++ b/client_format.go @@ -1,8 +1,6 @@ package gortsplib import ( - "time" - "github.com/pion/rtcp" "github.com/pion/rtp" @@ -71,19 +69,6 @@ func (cf *clientFormat) stop() { } } -func (cf *clientFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error { - cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) - - ok := cf.cm.c.writer.push(func() error { - return cf.cm.writePacketRTPInQueue(byts) - }) - if !ok { - return liberrors.ErrClientWriteQueueFull{} - } - - return nil -} - func (cf *clientFormat) readRTPUDP(pkt *rtp.Packet) { packets, lost := cf.udpReorderer.Process(pkt) if lost != 0 { diff --git a/client_media.go b/client_media.go index 47f279b5..6aff9b27 100644 --- a/client_media.go +++ b/client_media.go @@ -8,7 +8,6 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" - "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" ) @@ -22,9 +21,6 @@ type clientMedia struct { tcpChannel int udpRTPListener *clientUDPListener udpRTCPListener *clientUDPListener - tcpRTPFrame *base.InterleavedFrame - tcpRTCPFrame *base.InterleavedFrame - tcpBuffer []byte writePacketRTPInQueue func([]byte) error writePacketRTCPInQueue func([]byte) error } @@ -115,10 +111,6 @@ func (cm *clientMedia) start() { cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readRTPTCPPlay cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readRTCPTCPPlay } - - cm.tcpRTPFrame = &base.InterleavedFrame{Channel: cm.tcpChannel} - cm.tcpRTCPFrame = &base.InterleavedFrame{Channel: cm.tcpChannel + 1} - cm.tcpBuffer = make([]byte, cm.c.MaxPacketSize+4) } for _, ct := range cm.formats { @@ -161,26 +153,17 @@ func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error { } func (cm *clientMedia) writePacketRTPInQueueTCP(payload []byte) error { - cm.tcpRTPFrame.Payload = payload + cm.c.tcpFrame.Channel = cm.tcpChannel + cm.c.tcpFrame.Payload = payload cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout)) - return cm.c.conn.WriteInterleavedFrame(cm.tcpRTPFrame, cm.tcpBuffer) + return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer) } func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error { - cm.tcpRTCPFrame.Payload = payload + cm.c.tcpFrame.Channel = cm.tcpChannel + 1 + cm.c.tcpFrame.Payload = payload cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout)) - return cm.c.conn.WriteInterleavedFrame(cm.tcpRTCPFrame, cm.tcpBuffer) -} - -func (cm *clientMedia) writePacketRTCP(byts []byte) error { - ok := cm.c.writer.push(func() error { - return cm.writePacketRTCPInQueue(byts) - }) - if !ok { - return liberrors.ErrClientWriteQueueFull{} - } - - return nil + return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer) } func (cm *clientMedia) readRTPTCPPlay(payload []byte) bool { diff --git a/server_multicast_writer.go b/server_multicast_writer.go index 26806ee0..1d214d71 100644 --- a/server_multicast_writer.go +++ b/server_multicast_writer.go @@ -67,9 +67,9 @@ func (h *serverMulticastWriter) ip() net.IP { return h.rtpl.ip() } -func (h *serverMulticastWriter) writePacketRTP(payload []byte) error { +func (h *serverMulticastWriter) writePacketRTP(byts []byte) error { ok := h.writer.push(func() error { - return h.rtpl.write(payload, h.rtpAddr) + return h.rtpl.write(byts, h.rtpAddr) }) if !ok { return liberrors.ErrServerWriteQueueFull{} @@ -78,9 +78,9 @@ func (h *serverMulticastWriter) writePacketRTP(payload []byte) error { return nil } -func (h *serverMulticastWriter) writePacketRTCP(payload []byte) error { +func (h *serverMulticastWriter) writePacketRTCP(byts []byte) error { ok := h.writer.push(func() error { - return h.rtcpl.write(payload, h.rtcpAddr) + return h.rtcpl.write(byts, h.rtcpAddr) }) if !ok { return liberrors.ErrServerWriteQueueFull{} diff --git a/server_session.go b/server_session.go index 5fb49503..e640d2f2 100644 --- a/server_session.go +++ b/server_session.go @@ -254,6 +254,8 @@ type ServerSession struct { udpCheckStreamTimer *time.Timer writer *asyncProcessor timeDecoder *rtptime.GlobalDecoder2 + tcpFrame *base.InterleavedFrame + tcpBuffer []byte // in chHandleRequest chan sessionRequestReq @@ -978,6 +980,11 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( sm.start() } + if *ss.setuppedTransport == TransportTCP { + ss.tcpFrame = &base.InterleavedFrame{} + ss.tcpBuffer = make([]byte, ss.s.MaxPacketSize+4) + } + switch *ss.setuppedTransport { case TransportUDP: ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) @@ -1067,6 +1074,11 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( sm.start() } + if *ss.setuppedTransport == TransportTCP { + ss.tcpFrame = &base.InterleavedFrame{} + ss.tcpBuffer = make([]byte, ss.s.MaxPacketSize+4) + } + switch *ss.setuppedTransport { case TransportUDP: ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) @@ -1254,7 +1266,15 @@ func (ss *ServerSession) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFu func (ss *ServerSession) writePacketRTP(medi *description.Media, byts []byte) error { sm := ss.setuppedMedias[medi] - return sm.writePacketRTP(byts) + + ok := sm.ss.writer.push(func() error { + return sm.writePacketRTPInQueue(byts) + }) + if !ok { + return liberrors.ErrServerWriteQueueFull{} + } + + return nil } // WritePacketRTP writes a RTP packet to the session. @@ -1271,7 +1291,15 @@ func (ss *ServerSession) WritePacketRTP(medi *description.Media, pkt *rtp.Packet func (ss *ServerSession) writePacketRTCP(medi *description.Media, byts []byte) error { sm := ss.setuppedMedias[medi] - return sm.writePacketRTCP(byts) + + ok := ss.writer.push(func() error { + return sm.writePacketRTCPInQueue(byts) + }) + if !ok { + return liberrors.ErrServerWriteQueueFull{} + } + + return nil } // WritePacketRTCP writes a RTCP packet to the session. diff --git a/server_session_media.go b/server_session_media.go index 5e74ee9d..c558ac55 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -8,7 +8,6 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" - "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" ) @@ -23,9 +22,6 @@ type serverSessionMedia struct { udpRTPWriteAddr *net.UDPAddr udpRTCPReadPort int udpRTCPWriteAddr *net.UDPAddr - tcpRTPFrame *base.InterleavedFrame - tcpRTCPFrame *base.InterleavedFrame - tcpBuffer []byte formats map[uint8]*serverSessionFormat // record only writePacketRTPInQueue func([]byte) error writePacketRTCPInQueue func([]byte) error @@ -87,10 +83,6 @@ func (sm *serverSessionMedia) start() { sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readRTPTCPRecord sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readRTCPTCPRecord } - - sm.tcpRTPFrame = &base.InterleavedFrame{Channel: sm.tcpChannel} - sm.tcpRTCPFrame = &base.InterleavedFrame{Channel: sm.tcpChannel + 1} - sm.tcpBuffer = make([]byte, sm.ss.s.MaxPacketSize+4) } } @@ -127,38 +119,18 @@ func (sm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error { func (sm *serverSessionMedia) writePacketRTPInQueueTCP(payload []byte) error { atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload))) - sm.tcpRTPFrame.Payload = payload + sm.ss.tcpFrame.Channel = sm.tcpChannel + sm.ss.tcpFrame.Payload = payload sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout)) - return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.tcpRTPFrame, sm.tcpBuffer) + return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer) } func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error { atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload))) - sm.tcpRTCPFrame.Payload = payload + sm.ss.tcpFrame.Channel = sm.tcpChannel + 1 + sm.ss.tcpFrame.Payload = payload sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout)) - return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.tcpRTCPFrame, sm.tcpBuffer) -} - -func (sm *serverSessionMedia) writePacketRTP(payload []byte) error { - ok := sm.ss.writer.push(func() error { - return sm.writePacketRTPInQueue(payload) - }) - if !ok { - return liberrors.ErrServerWriteQueueFull{} - } - - return nil -} - -func (sm *serverSessionMedia) writePacketRTCP(payload []byte) error { - ok := sm.ss.writer.push(func() error { - return sm.writePacketRTCPInQueue(payload) - }) - if !ok { - return liberrors.ErrServerWriteQueueFull{} - } - - return nil + return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer) } func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) bool { diff --git a/server_stream_format.go b/server_stream_format.go index 0b21346c..85232558 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -38,9 +38,8 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t // send unicast for r := range sf.sm.st.activeUnicastReaders { - sm, ok := r.setuppedMedias[sf.sm.media] - if ok { - err := sm.writePacketRTP(byts) + if _, ok := r.setuppedMedias[sf.sm.media]; ok { + err := r.writePacketRTP(sf.sm.media, byts) if err != nil { r.onStreamWriteError(err) } else { diff --git a/server_stream_media.go b/server_stream_media.go index c984c6e5..f3253a1a 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -40,9 +40,8 @@ func (sm *serverStreamMedia) close() { func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error { // send unicast for r := range sm.st.activeUnicastReaders { - sm, ok := r.setuppedMedias[sm.media] - if ok { - err := sm.writePacketRTCP(byts) + if _, ok := r.setuppedMedias[sm.media]; ok { + err := r.writePacketRTCP(sm.media, byts) if err != nil { r.onStreamWriteError(err) }