reuse UDP buffers when possible (#639)

This commit is contained in:
Alessandro Ros
2024-10-24 14:28:53 +02:00
committed by GitHub
parent 21e06b0f68
commit 49ee03f0b1
5 changed files with 91 additions and 46 deletions

View File

@@ -187,7 +187,7 @@ func (cm *clientMedia) writePacketRTCP(byts []byte) error {
return nil return nil
} }
func (cm *clientMedia) readRTPTCPPlay(payload []byte) { func (cm *clientMedia) readRTPTCPPlay(payload []byte) bool {
now := cm.c.timeNow() now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
@@ -195,31 +195,33 @@ func (cm *clientMedia) readRTPTCPPlay(payload []byte) {
err := pkt.Unmarshal(payload) err := pkt.Unmarshal(payload)
if err != nil { if err != nil {
cm.c.OnDecodeError(err) cm.c.OnDecodeError(err)
return return false
} }
forma, ok := cm.formats[pkt.PayloadType] forma, ok := cm.formats[pkt.PayloadType]
if !ok { if !ok {
cm.c.OnDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) cm.c.OnDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return return false
} }
forma.readRTPTCP(pkt) forma.readRTPTCP(pkt)
return true
} }
func (cm *clientMedia) readRTCPTCPPlay(payload []byte) { func (cm *clientMedia) readRTCPTCPPlay(payload []byte) bool {
now := cm.c.timeNow() now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
if len(payload) > udpMaxPayloadSize { if len(payload) > udpMaxPayloadSize {
cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return return false
} }
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
if err != nil { if err != nil {
cm.c.OnDecodeError(err) cm.c.OnDecodeError(err)
return return false
} }
for _, pkt := range packets { for _, pkt := range packets {
@@ -232,55 +234,62 @@ func (cm *clientMedia) readRTCPTCPPlay(payload []byte) {
cm.onPacketRTCP(pkt) cm.onPacketRTCP(pkt)
} }
return true
} }
func (cm *clientMedia) readRTPTCPRecord(_ []byte) { func (cm *clientMedia) readRTPTCPRecord(_ []byte) bool {
return false
} }
func (cm *clientMedia) readRTCPTCPRecord(payload []byte) { func (cm *clientMedia) readRTCPTCPRecord(payload []byte) bool {
if len(payload) > udpMaxPayloadSize { if len(payload) > udpMaxPayloadSize {
cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return return false
} }
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
if err != nil { if err != nil {
cm.c.OnDecodeError(err) cm.c.OnDecodeError(err)
return return false
} }
for _, pkt := range packets { for _, pkt := range packets {
cm.onPacketRTCP(pkt) cm.onPacketRTCP(pkt)
} }
return true
} }
func (cm *clientMedia) readRTPUDPPlay(payload []byte) { func (cm *clientMedia) readRTPUDPPlay(payload []byte) bool {
plen := len(payload) plen := len(payload)
atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) atomic.AddUint64(cm.c.BytesReceived, uint64(plen))
if plen == (udpMaxPayloadSize + 1) { if plen == (udpMaxPayloadSize + 1) {
cm.c.OnDecodeError(liberrors.ErrClientRTPPacketTooBigUDP{}) cm.c.OnDecodeError(liberrors.ErrClientRTPPacketTooBigUDP{})
return return false
} }
pkt := &rtp.Packet{} pkt := &rtp.Packet{}
err := pkt.Unmarshal(payload) err := pkt.Unmarshal(payload)
if err != nil { if err != nil {
cm.c.OnDecodeError(err) cm.c.OnDecodeError(err)
return return false
} }
forma, ok := cm.formats[pkt.PayloadType] forma, ok := cm.formats[pkt.PayloadType]
if !ok { if !ok {
cm.c.OnDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) cm.c.OnDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return return false
} }
forma.readRTPUDP(pkt) forma.readRTPUDP(pkt)
return true
} }
func (cm *clientMedia) readRTCPUDPPlay(payload []byte) { func (cm *clientMedia) readRTCPUDPPlay(payload []byte) bool {
now := cm.c.timeNow() now := cm.c.timeNow()
plen := len(payload) plen := len(payload)
@@ -288,13 +297,13 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) {
if plen == (udpMaxPayloadSize + 1) { if plen == (udpMaxPayloadSize + 1) {
cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{}) cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{})
return return false
} }
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
if err != nil { if err != nil {
cm.c.OnDecodeError(err) cm.c.OnDecodeError(err)
return return false
} }
for _, pkt := range packets { for _, pkt := range packets {
@@ -307,28 +316,33 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) {
cm.onPacketRTCP(pkt) cm.onPacketRTCP(pkt)
} }
return true
} }
func (cm *clientMedia) readRTPUDPRecord(_ []byte) { func (cm *clientMedia) readRTPUDPRecord(_ []byte) bool {
return false
} }
func (cm *clientMedia) readRTCPUDPRecord(payload []byte) { func (cm *clientMedia) readRTCPUDPRecord(payload []byte) bool {
plen := len(payload) plen := len(payload)
atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) atomic.AddUint64(cm.c.BytesReceived, uint64(plen))
if plen == (udpMaxPayloadSize + 1) { if plen == (udpMaxPayloadSize + 1) {
cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{}) cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{})
return return false
} }
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
if err != nil { if err != nil {
cm.c.OnDecodeError(err) cm.c.OnDecodeError(err)
return return false
} }
for _, pkt := range packets { for _, pkt := range packets {
cm.onPacketRTCP(pkt) cm.onPacketRTCP(pkt)
} }
return true
} }

View File

@@ -142,8 +142,15 @@ func (u *clientUDPListener) stop() {
func (u *clientUDPListener) run() { func (u *clientUDPListener) run() {
defer close(u.done) defer close(u.done)
var buf []byte
createNewBuffer := func() {
buf = make([]byte, udpMaxPayloadSize+1)
}
createNewBuffer()
for { for {
buf := make([]byte, udpMaxPayloadSize+1)
n, addr, err := u.pc.ReadFrom(buf) n, addr, err := u.pc.ReadFrom(buf)
if err != nil { if err != nil {
return return
@@ -166,7 +173,9 @@ func (u *clientUDPListener) run() {
now := u.c.timeNow() now := u.c.timeNow()
atomic.StoreInt64(u.lastPacketTime, now.Unix()) atomic.StoreInt64(u.lastPacketTime, now.Unix())
u.readFunc(buf[:n]) if u.readFunc(buf[:n]) {
createNewBuffer()
}
} }
} }

View File

@@ -23,7 +23,7 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/sdp" "github.com/bluenviron/gortsplib/v4/pkg/sdp"
) )
type readFunc func([]byte) type readFunc func([]byte) bool
func stringsReverseIndex(s, substr string) int { func stringsReverseIndex(s, substr string) int {
for i := len(s) - 1 - len(substr); i >= 0; i-- { for i := len(s) - 1 - len(substr); i >= 0; i-- {

View File

@@ -161,20 +161,20 @@ func (sm *serverSessionMedia) writePacketRTCP(payload []byte) error {
return nil return nil
} }
func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) { func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) bool {
plen := len(payload) plen := len(payload)
atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) atomic.AddUint64(sm.ss.bytesReceived, uint64(plen))
if plen == (udpMaxPayloadSize + 1) { if plen == (udpMaxPayloadSize + 1) {
sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{}) sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
return return false
} }
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
if err != nil { if err != nil {
sm.ss.onDecodeError(err) sm.ss.onDecodeError(err)
return return false
} }
now := sm.ss.s.timeNow() now := sm.ss.s.timeNow()
@@ -183,51 +183,55 @@ func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) {
for _, pkt := range packets { for _, pkt := range packets {
sm.onPacketRTCP(pkt) sm.onPacketRTCP(pkt)
} }
return true
} }
func (sm *serverSessionMedia) readRTPUDPRecord(payload []byte) { func (sm *serverSessionMedia) readRTPUDPRecord(payload []byte) bool {
plen := len(payload) plen := len(payload)
atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) atomic.AddUint64(sm.ss.bytesReceived, uint64(plen))
if plen == (udpMaxPayloadSize + 1) { if plen == (udpMaxPayloadSize + 1) {
sm.ss.onDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{}) sm.ss.onDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
return return false
} }
pkt := &rtp.Packet{} pkt := &rtp.Packet{}
err := pkt.Unmarshal(payload) err := pkt.Unmarshal(payload)
if err != nil { if err != nil {
sm.ss.onDecodeError(err) sm.ss.onDecodeError(err)
return return false
} }
forma, ok := sm.formats[pkt.PayloadType] forma, ok := sm.formats[pkt.PayloadType]
if !ok { if !ok {
sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return return false
} }
now := sm.ss.s.timeNow() now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
forma.readRTPUDP(pkt, now) forma.readRTPUDP(pkt, now)
return true
} }
func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) { func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) bool {
plen := len(payload) plen := len(payload)
atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) atomic.AddUint64(sm.ss.bytesReceived, uint64(plen))
if plen == (udpMaxPayloadSize + 1) { if plen == (udpMaxPayloadSize + 1) {
sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{}) sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
return return false
} }
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
if err != nil { if err != nil {
sm.ss.onDecodeError(err) sm.ss.onDecodeError(err)
return return false
} }
now := sm.ss.s.timeNow() now := sm.ss.s.timeNow()
@@ -243,55 +247,62 @@ func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) {
sm.onPacketRTCP(pkt) sm.onPacketRTCP(pkt)
} }
return true
} }
func (sm *serverSessionMedia) readRTPTCPPlay(_ []byte) { func (sm *serverSessionMedia) readRTPTCPPlay(_ []byte) bool {
return false
} }
func (sm *serverSessionMedia) readRTCPTCPPlay(payload []byte) { func (sm *serverSessionMedia) readRTCPTCPPlay(payload []byte) bool {
if len(payload) > udpMaxPayloadSize { if len(payload) > udpMaxPayloadSize {
sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return return false
} }
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
if err != nil { if err != nil {
sm.ss.onDecodeError(err) sm.ss.onDecodeError(err)
return return false
} }
for _, pkt := range packets { for _, pkt := range packets {
sm.onPacketRTCP(pkt) sm.onPacketRTCP(pkt)
} }
return true
} }
func (sm *serverSessionMedia) readRTPTCPRecord(payload []byte) { func (sm *serverSessionMedia) readRTPTCPRecord(payload []byte) bool {
pkt := &rtp.Packet{} pkt := &rtp.Packet{}
err := pkt.Unmarshal(payload) err := pkt.Unmarshal(payload)
if err != nil { if err != nil {
sm.ss.onDecodeError(err) sm.ss.onDecodeError(err)
return return false
} }
forma, ok := sm.formats[pkt.PayloadType] forma, ok := sm.formats[pkt.PayloadType]
if !ok { if !ok {
sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return return false
} }
forma.readRTPTCP(pkt) forma.readRTPTCP(pkt)
return true
} }
func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) { func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) bool {
if len(payload) > udpMaxPayloadSize { if len(payload) > udpMaxPayloadSize {
sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return return false
} }
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
if err != nil { if err != nil {
sm.ss.onDecodeError(err) sm.ss.onDecodeError(err)
return return false
} }
now := sm.ss.s.timeNow() now := sm.ss.s.timeNow()
@@ -306,4 +317,6 @@ func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) {
sm.onPacketRTCP(pkt) sm.onPacketRTCP(pkt)
} }
return true
} }

View File

@@ -124,8 +124,15 @@ func (u *serverUDPListener) port() int {
func (u *serverUDPListener) run() { func (u *serverUDPListener) run() {
defer close(u.done) defer close(u.done)
var buf []byte
createNewBuffer := func() {
buf = make([]byte, udpMaxPayloadSize+1)
}
createNewBuffer()
for { for {
buf := make([]byte, udpMaxPayloadSize+1)
n, addr2, err := u.pc.ReadFrom(buf) n, addr2, err := u.pc.ReadFrom(buf)
if err != nil { if err != nil {
break break
@@ -143,7 +150,9 @@ func (u *serverUDPListener) run() {
return return
} }
cb(buf[:n]) if cb(buf[:n]) {
createNewBuffer()
}
}() }()
} }
} }