use a single TCP outgoing buffer each client / session (#665)

this saves memory.
This commit is contained in:
Alessandro Ros
2024-12-24 10:24:24 +01:00
committed by GitHub
parent 6750427282
commit 5506eb2f7f
8 changed files with 78 additions and 87 deletions

View File

@@ -339,6 +339,8 @@ type Client struct {
reader *clientReader
timeDecoder *rtptime.GlobalDecoder2
mustClose bool
tcpFrame *base.InterleavedFrame
tcpBuffer []byte
// in
chOptions chan optionsReq
@@ -856,6 +858,11 @@ func (c *Client) startTransportRoutines() {
cm.start()
}
if *c.effectiveTransport == TransportTCP {
c.tcpFrame = &base.InterleavedFrame{}
c.tcpBuffer = make([]byte, c.MaxPacketSize+4)
}
if c.state == clientStatePlay && c.stdChannelSetupped {
c.keepaliveTimer = time.NewTimer(c.keepalivePeriod)
@@ -1902,8 +1909,18 @@ func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet,
}
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return ct.writePacketRTP(byts, pkt, ntp)
cf := cm.formats[pkt.PayloadType]
cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt))
ok := c.writer.push(func() error {
return cm.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}
return nil
}
// WritePacketRTCP writes a RTCP packet to the server.
@@ -1920,7 +1937,15 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error
}
cm := c.medias[medi]
return cm.writePacketRTCP(byts)
ok := c.writer.push(func() error {
return cm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}
return nil
}
// PacketPTS returns the PTS of an incoming RTP packet.

View File

@@ -1,8 +1,6 @@
package gortsplib
import (
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
@@ -71,19 +69,6 @@ func (cf *clientFormat) stop() {
}
}
func (cf *clientFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {
cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt))
ok := cf.cm.c.writer.push(func() error {
return cf.cm.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}
return nil
}
func (cf *clientFormat) readRTPUDP(pkt *rtp.Packet) {
packets, lost := cf.udpReorderer.Process(pkt)
if lost != 0 {

View File

@@ -8,7 +8,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
)
@@ -22,9 +21,6 @@ type clientMedia struct {
tcpChannel int
udpRTPListener *clientUDPListener
udpRTCPListener *clientUDPListener
tcpRTPFrame *base.InterleavedFrame
tcpRTCPFrame *base.InterleavedFrame
tcpBuffer []byte
writePacketRTPInQueue func([]byte) error
writePacketRTCPInQueue func([]byte) error
}
@@ -115,10 +111,6 @@ func (cm *clientMedia) start() {
cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readRTPTCPPlay
cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readRTCPTCPPlay
}
cm.tcpRTPFrame = &base.InterleavedFrame{Channel: cm.tcpChannel}
cm.tcpRTCPFrame = &base.InterleavedFrame{Channel: cm.tcpChannel + 1}
cm.tcpBuffer = make([]byte, cm.c.MaxPacketSize+4)
}
for _, ct := range cm.formats {
@@ -161,26 +153,17 @@ func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error {
}
func (cm *clientMedia) writePacketRTPInQueueTCP(payload []byte) error {
cm.tcpRTPFrame.Payload = payload
cm.c.tcpFrame.Channel = cm.tcpChannel
cm.c.tcpFrame.Payload = payload
cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout))
return cm.c.conn.WriteInterleavedFrame(cm.tcpRTPFrame, cm.tcpBuffer)
return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
}
func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error {
cm.tcpRTCPFrame.Payload = payload
cm.c.tcpFrame.Channel = cm.tcpChannel + 1
cm.c.tcpFrame.Payload = payload
cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout))
return cm.c.conn.WriteInterleavedFrame(cm.tcpRTCPFrame, cm.tcpBuffer)
}
func (cm *clientMedia) writePacketRTCP(byts []byte) error {
ok := cm.c.writer.push(func() error {
return cm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}
return nil
return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
}
func (cm *clientMedia) readRTPTCPPlay(payload []byte) bool {

View File

@@ -67,9 +67,9 @@ func (h *serverMulticastWriter) ip() net.IP {
return h.rtpl.ip()
}
func (h *serverMulticastWriter) writePacketRTP(payload []byte) error {
func (h *serverMulticastWriter) writePacketRTP(byts []byte) error {
ok := h.writer.push(func() error {
return h.rtpl.write(payload, h.rtpAddr)
return h.rtpl.write(byts, h.rtpAddr)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
@@ -78,9 +78,9 @@ func (h *serverMulticastWriter) writePacketRTP(payload []byte) error {
return nil
}
func (h *serverMulticastWriter) writePacketRTCP(payload []byte) error {
func (h *serverMulticastWriter) writePacketRTCP(byts []byte) error {
ok := h.writer.push(func() error {
return h.rtcpl.write(payload, h.rtcpAddr)
return h.rtcpl.write(byts, h.rtcpAddr)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}

View File

@@ -254,6 +254,8 @@ type ServerSession struct {
udpCheckStreamTimer *time.Timer
writer *asyncProcessor
timeDecoder *rtptime.GlobalDecoder2
tcpFrame *base.InterleavedFrame
tcpBuffer []byte
// in
chHandleRequest chan sessionRequestReq
@@ -978,6 +980,11 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
sm.start()
}
if *ss.setuppedTransport == TransportTCP {
ss.tcpFrame = &base.InterleavedFrame{}
ss.tcpBuffer = make([]byte, ss.s.MaxPacketSize+4)
}
switch *ss.setuppedTransport {
case TransportUDP:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
@@ -1067,6 +1074,11 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
sm.start()
}
if *ss.setuppedTransport == TransportTCP {
ss.tcpFrame = &base.InterleavedFrame{}
ss.tcpBuffer = make([]byte, ss.s.MaxPacketSize+4)
}
switch *ss.setuppedTransport {
case TransportUDP:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
@@ -1254,7 +1266,15 @@ func (ss *ServerSession) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFu
func (ss *ServerSession) writePacketRTP(medi *description.Media, byts []byte) error {
sm := ss.setuppedMedias[medi]
return sm.writePacketRTP(byts)
ok := sm.ss.writer.push(func() error {
return sm.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}
return nil
}
// WritePacketRTP writes a RTP packet to the session.
@@ -1271,7 +1291,15 @@ func (ss *ServerSession) WritePacketRTP(medi *description.Media, pkt *rtp.Packet
func (ss *ServerSession) writePacketRTCP(medi *description.Media, byts []byte) error {
sm := ss.setuppedMedias[medi]
return sm.writePacketRTCP(byts)
ok := ss.writer.push(func() error {
return sm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}
return nil
}
// WritePacketRTCP writes a RTCP packet to the session.

View File

@@ -8,7 +8,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
)
@@ -23,9 +22,6 @@ type serverSessionMedia struct {
udpRTPWriteAddr *net.UDPAddr
udpRTCPReadPort int
udpRTCPWriteAddr *net.UDPAddr
tcpRTPFrame *base.InterleavedFrame
tcpRTCPFrame *base.InterleavedFrame
tcpBuffer []byte
formats map[uint8]*serverSessionFormat // record only
writePacketRTPInQueue func([]byte) error
writePacketRTCPInQueue func([]byte) error
@@ -87,10 +83,6 @@ func (sm *serverSessionMedia) start() {
sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readRTPTCPRecord
sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readRTCPTCPRecord
}
sm.tcpRTPFrame = &base.InterleavedFrame{Channel: sm.tcpChannel}
sm.tcpRTCPFrame = &base.InterleavedFrame{Channel: sm.tcpChannel + 1}
sm.tcpBuffer = make([]byte, sm.ss.s.MaxPacketSize+4)
}
}
@@ -127,38 +119,18 @@ func (sm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error {
func (sm *serverSessionMedia) writePacketRTPInQueueTCP(payload []byte) error {
atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload)))
sm.tcpRTPFrame.Payload = payload
sm.ss.tcpFrame.Channel = sm.tcpChannel
sm.ss.tcpFrame.Payload = payload
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.tcpRTPFrame, sm.tcpBuffer)
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
}
func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error {
atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload)))
sm.tcpRTCPFrame.Payload = payload
sm.ss.tcpFrame.Channel = sm.tcpChannel + 1
sm.ss.tcpFrame.Payload = payload
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.tcpRTCPFrame, sm.tcpBuffer)
}
func (sm *serverSessionMedia) writePacketRTP(payload []byte) error {
ok := sm.ss.writer.push(func() error {
return sm.writePacketRTPInQueue(payload)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}
return nil
}
func (sm *serverSessionMedia) writePacketRTCP(payload []byte) error {
ok := sm.ss.writer.push(func() error {
return sm.writePacketRTCPInQueue(payload)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}
return nil
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
}
func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) bool {

View File

@@ -38,9 +38,8 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t
// send unicast
for r := range sf.sm.st.activeUnicastReaders {
sm, ok := r.setuppedMedias[sf.sm.media]
if ok {
err := sm.writePacketRTP(byts)
if _, ok := r.setuppedMedias[sf.sm.media]; ok {
err := r.writePacketRTP(sf.sm.media, byts)
if err != nil {
r.onStreamWriteError(err)
} else {

View File

@@ -40,9 +40,8 @@ func (sm *serverStreamMedia) close() {
func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error {
// send unicast
for r := range sm.st.activeUnicastReaders {
sm, ok := r.setuppedMedias[sm.media]
if ok {
err := sm.writePacketRTCP(byts)
if _, ok := r.setuppedMedias[sm.media]; ok {
err := r.writePacketRTCP(sm.media, byts)
if err != nil {
r.onStreamWriteError(err)
}