diff --git a/client_media.go b/client_media.go index bc974308..1390d073 100644 --- a/client_media.go +++ b/client_media.go @@ -187,7 +187,7 @@ func (cm *clientMedia) writePacketRTCP(byts []byte) error { return nil } -func (cm *clientMedia) readRTPTCPPlay(payload []byte) { +func (cm *clientMedia) readRTPTCPPlay(payload []byte) bool { now := cm.c.timeNow() atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) @@ -195,31 +195,33 @@ func (cm *clientMedia) readRTPTCPPlay(payload []byte) { err := pkt.Unmarshal(payload) if err != nil { cm.c.OnDecodeError(err) - return + return false } forma, ok := cm.formats[pkt.PayloadType] if !ok { cm.c.OnDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) - return + return false } forma.readRTPTCP(pkt) + + return true } -func (cm *clientMedia) readRTCPTCPPlay(payload []byte) { +func (cm *clientMedia) readRTCPTCPPlay(payload []byte) bool { now := cm.c.timeNow() atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) if len(payload) > udpMaxPayloadSize { cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) - return + return false } packets, err := rtcp.Unmarshal(payload) if err != nil { cm.c.OnDecodeError(err) - return + return false } for _, pkt := range packets { @@ -232,55 +234,62 @@ func (cm *clientMedia) readRTCPTCPPlay(payload []byte) { cm.onPacketRTCP(pkt) } + + return true } -func (cm *clientMedia) readRTPTCPRecord(_ []byte) { +func (cm *clientMedia) readRTPTCPRecord(_ []byte) bool { + return false } -func (cm *clientMedia) readRTCPTCPRecord(payload []byte) { +func (cm *clientMedia) readRTCPTCPRecord(payload []byte) bool { if len(payload) > udpMaxPayloadSize { cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) - return + return false } packets, err := rtcp.Unmarshal(payload) if err != nil { cm.c.OnDecodeError(err) - return + return false } for _, pkt := range packets { cm.onPacketRTCP(pkt) } + + return true } -func (cm *clientMedia) readRTPUDPPlay(payload []byte) { +func (cm *clientMedia) readRTPUDPPlay(payload []byte) bool { plen := len(payload) atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) if plen == (udpMaxPayloadSize + 1) { cm.c.OnDecodeError(liberrors.ErrClientRTPPacketTooBigUDP{}) - return + return false } pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { cm.c.OnDecodeError(err) - return + return false } forma, ok := cm.formats[pkt.PayloadType] if !ok { cm.c.OnDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) - return + return false } forma.readRTPUDP(pkt) + + return true } -func (cm *clientMedia) readRTCPUDPPlay(payload []byte) { +func (cm *clientMedia) readRTCPUDPPlay(payload []byte) bool { now := cm.c.timeNow() plen := len(payload) @@ -288,13 +297,13 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) { if plen == (udpMaxPayloadSize + 1) { cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{}) - return + return false } packets, err := rtcp.Unmarshal(payload) if err != nil { cm.c.OnDecodeError(err) - return + return false } for _, pkt := range packets { @@ -307,28 +316,33 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) { cm.onPacketRTCP(pkt) } + + return true } -func (cm *clientMedia) readRTPUDPRecord(_ []byte) { +func (cm *clientMedia) readRTPUDPRecord(_ []byte) bool { + return false } -func (cm *clientMedia) readRTCPUDPRecord(payload []byte) { +func (cm *clientMedia) readRTCPUDPRecord(payload []byte) bool { plen := len(payload) atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) if plen == (udpMaxPayloadSize + 1) { cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{}) - return + return false } packets, err := rtcp.Unmarshal(payload) if err != nil { cm.c.OnDecodeError(err) - return + return false } for _, pkt := range packets { cm.onPacketRTCP(pkt) } + + return true } diff --git a/client_udp_listener.go b/client_udp_listener.go index fb0ae2f9..c47cabcd 100644 --- a/client_udp_listener.go +++ b/client_udp_listener.go @@ -142,8 +142,15 @@ func (u *clientUDPListener) stop() { func (u *clientUDPListener) run() { defer close(u.done) + var buf []byte + + createNewBuffer := func() { + buf = make([]byte, udpMaxPayloadSize+1) + } + + createNewBuffer() + for { - buf := make([]byte, udpMaxPayloadSize+1) n, addr, err := u.pc.ReadFrom(buf) if err != nil { return @@ -166,7 +173,9 @@ func (u *clientUDPListener) run() { now := u.c.timeNow() atomic.StoreInt64(u.lastPacketTime, now.Unix()) - u.readFunc(buf[:n]) + if u.readFunc(buf[:n]) { + createNewBuffer() + } } } diff --git a/server_session.go b/server_session.go index b48a36d0..12b2157f 100644 --- a/server_session.go +++ b/server_session.go @@ -23,7 +23,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/sdp" ) -type readFunc func([]byte) +type readFunc func([]byte) bool func stringsReverseIndex(s, substr string) int { for i := len(s) - 1 - len(substr); i >= 0; i-- { diff --git a/server_session_media.go b/server_session_media.go index 858d9114..2ee6369f 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -161,20 +161,20 @@ func (sm *serverSessionMedia) writePacketRTCP(payload []byte) error { return nil } -func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) { +func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) bool { plen := len(payload) atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) if plen == (udpMaxPayloadSize + 1) { sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{}) - return + return false } packets, err := rtcp.Unmarshal(payload) if err != nil { sm.ss.onDecodeError(err) - return + return false } now := sm.ss.s.timeNow() @@ -183,51 +183,55 @@ func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) { for _, pkt := range packets { sm.onPacketRTCP(pkt) } + + return true } -func (sm *serverSessionMedia) readRTPUDPRecord(payload []byte) { +func (sm *serverSessionMedia) readRTPUDPRecord(payload []byte) bool { plen := len(payload) atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) if plen == (udpMaxPayloadSize + 1) { sm.ss.onDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{}) - return + return false } pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { sm.ss.onDecodeError(err) - return + return false } forma, ok := sm.formats[pkt.PayloadType] if !ok { sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) - return + return false } now := sm.ss.s.timeNow() atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) forma.readRTPUDP(pkt, now) + + return true } -func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) { +func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) bool { plen := len(payload) atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) if plen == (udpMaxPayloadSize + 1) { sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{}) - return + return false } packets, err := rtcp.Unmarshal(payload) if err != nil { sm.ss.onDecodeError(err) - return + return false } now := sm.ss.s.timeNow() @@ -243,55 +247,62 @@ func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) { sm.onPacketRTCP(pkt) } + + return true } -func (sm *serverSessionMedia) readRTPTCPPlay(_ []byte) { +func (sm *serverSessionMedia) readRTPTCPPlay(_ []byte) bool { + return false } -func (sm *serverSessionMedia) readRTCPTCPPlay(payload []byte) { +func (sm *serverSessionMedia) readRTCPTCPPlay(payload []byte) bool { if len(payload) > udpMaxPayloadSize { sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) - return + return false } packets, err := rtcp.Unmarshal(payload) if err != nil { sm.ss.onDecodeError(err) - return + return false } for _, pkt := range packets { sm.onPacketRTCP(pkt) } + + return true } -func (sm *serverSessionMedia) readRTPTCPRecord(payload []byte) { +func (sm *serverSessionMedia) readRTPTCPRecord(payload []byte) bool { pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { sm.ss.onDecodeError(err) - return + return false } forma, ok := sm.formats[pkt.PayloadType] if !ok { sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) - return + return false } forma.readRTPTCP(pkt) + + return true } -func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) { +func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) bool { if len(payload) > udpMaxPayloadSize { sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) - return + return false } packets, err := rtcp.Unmarshal(payload) if err != nil { sm.ss.onDecodeError(err) - return + return false } now := sm.ss.s.timeNow() @@ -306,4 +317,6 @@ func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) { sm.onPacketRTCP(pkt) } + + return true } diff --git a/server_udp_listener.go b/server_udp_listener.go index 4e333ba6..62e0e64f 100644 --- a/server_udp_listener.go +++ b/server_udp_listener.go @@ -124,8 +124,15 @@ func (u *serverUDPListener) port() int { func (u *serverUDPListener) run() { defer close(u.done) + var buf []byte + + createNewBuffer := func() { + buf = make([]byte, udpMaxPayloadSize+1) + } + + createNewBuffer() + for { - buf := make([]byte, udpMaxPayloadSize+1) n, addr2, err := u.pc.ReadFrom(buf) if err != nil { break @@ -143,7 +150,9 @@ func (u *serverUDPListener) run() { return } - cb(buf[:n]) + if cb(buf[:n]) { + createNewBuffer() + } }() } }