call OnDecodeError when rtpcleaner returns an error

This commit is contained in:
aler9
2022-10-31 18:51:29 +01:00
parent b1f72f9392
commit 0e6a0b8b25
8 changed files with 196 additions and 109 deletions

View File

@@ -250,7 +250,7 @@ func TestClientRead(t *testing.T) {
Media: "application",
Payloads: []TrackGenericPayload{{
Type: 97,
RTPMap: "97 private/90000",
RTPMap: "private/90000",
}},
}
err = track.Init()
@@ -2721,6 +2721,7 @@ func TestClientReadDecodeErrors(t *testing.T) {
"packets lost",
"rtp too big",
"rtcp too big",
"cleaner error",
} {
t.Run(ca, func(t *testing.T) {
errorRecv := make(chan struct{})
@@ -2761,11 +2762,23 @@ func TestClientReadDecodeErrors(t *testing.T) {
require.Equal(t, base.Describe, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/stream"), req.URL)
tracks := Tracks{&TrackH264{
var track Track
if ca != "cleaner error" {
track = &TrackGeneric{
Media: "application",
Payloads: []TrackGenericPayload{{
Type: 97,
RTPMap: "private/90000",
}},
}
} else {
track = &TrackH264{
PayloadType: 96,
SPS: []byte{0x01, 0x02, 0x03, 0x04},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
}}
}
}
tracks := Tracks{track}
tracks.setControls()
err = conn.WriteResponse(&base.Response{
@@ -2868,6 +2881,18 @@ func TestClientReadDecodeErrors(t *testing.T) {
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[1],
})
case "cleaner error":
byts, _ := rtp.Packet{
Header: rtp.Header{
SequenceNumber: 100,
},
Payload: []byte{0x99},
}.Marshal()
l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[0],
})
}
req, err = conn.ReadRequest()
@@ -2898,6 +2923,8 @@ func TestClientReadDecodeErrors(t *testing.T) {
require.EqualError(t, err, "RTP packet is too big to be read with UDP")
case "rtcp too big":
require.EqualError(t, err, "RTCP packet is too big to be read with UDP")
case "cleaner error":
require.EqualError(t, err, "packet type not supported (STAP-B)")
}
close(errorRecv)
},

View File

@@ -204,18 +204,19 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) {
}
packets, missing := u.ct.reorderer.Process(pkt)
if missing != 0 {
u.c.OnDecodeError(fmt.Errorf("%d RTP packet(s) lost", missing))
// do not return
}
for _, pkt := range packets {
out, err := u.ct.cleaner.Process(pkt)
if err != nil {
u.c.OnDecodeError(err)
continue
// do not return
}
if out != nil {
out0 := out[0]
u.ct.udpRTCPReceiver.ProcessPacketRTP(time.Now(), pkt, out0.PTSEqualsDTS)
@@ -228,6 +229,7 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) {
H264PTS: out0.H264PTS,
})
}
}
}
func (u *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) {

View File

@@ -65,35 +65,25 @@ func (p *Cleaner) processH264(pkt *rtp.Packet) ([]*Output, error) {
p.h264Encoder.Init()
}
// re-encode
if p.h264Encoder != nil {
// decode
nalus, pts, err := p.h264Decoder.DecodeUntilMarker(pkt)
if err != nil {
// ignore decode errors, except for the case in which the
// encoder is active
if p.h264Encoder == nil {
return []*Output{{
Packet: pkt,
PTSEqualsDTS: false,
}}, nil
}
if err == rtph264.ErrNonStartingPacketAndNoPrevious ||
err == rtph264.ErrMorePacketsNeeded {
return nil, nil
err == rtph264.ErrMorePacketsNeeded { // hide standard errors
err = nil
}
return nil, err
return nil, err // original packets are oversized, do not return them
}
packets, err := p.h264Encoder.Encode(nalus, pts)
if err != nil {
return nil, err // original packets are oversized, do not return them
}
ptsEqualsDTS := h264.IDRPresent(nalus)
// re-encode
if p.h264Encoder != nil {
packets, err := p.h264Encoder.Encode(nalus, pts)
if err != nil {
return nil, err
}
output := make([]*Output, len(packets))
for i, pkt := range packets {
@@ -115,9 +105,23 @@ func (p *Cleaner) processH264(pkt *rtp.Packet) ([]*Output, error) {
return output, nil
}
// decode
nalus, pts, err := p.h264Decoder.DecodeUntilMarker(pkt)
if err != nil {
if err == rtph264.ErrNonStartingPacketAndNoPrevious ||
err == rtph264.ErrMorePacketsNeeded { // hide standard errors
err = nil
}
return []*Output{{
Packet: pkt,
PTSEqualsDTS: ptsEqualsDTS,
PTSEqualsDTS: false,
}}, err
}
return []*Output{{
Packet: pkt,
PTSEqualsDTS: h264.IDRPresent(nalus),
H264NALUs: nalus,
H264PTS: pts,
}}, nil

View File

@@ -154,7 +154,7 @@ func TestH264ProcessEvenIfInvalid(t *testing.T) {
},
Payload: []byte{25},
})
require.NoError(t, err)
require.Error(t, err)
require.Equal(t, []*Output{{
Packet: &rtp.Packet{
Header: rtp.Header{
@@ -167,3 +167,48 @@ func TestH264ProcessEvenIfInvalid(t *testing.T) {
},
}}, out)
}
func TestH264RandomAccess(t *testing.T) {
for _, ca := range []string{
"standard",
"oversized",
} {
t.Run(ca, func(t *testing.T) {
cleaner := New(true, true)
var payload []byte
if ca == "standard" {
payload = append([]byte{0x1C, 1 << 6},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 10/5)...)
} else {
payload = append([]byte{0x1C, 1 << 6},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 2048/5)...)
}
out, err := cleaner.Process(&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SequenceNumber: 34572,
},
Payload: payload,
})
require.NoError(t, err)
if ca == "standard" {
require.Equal(t, []*Output{{
Packet: &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SequenceNumber: 34572,
},
Payload: payload,
},
}}, out)
} else {
require.Equal(t, []*Output(nil), out)
}
})
}
}

View File

@@ -1483,6 +1483,7 @@ func TestServerPublishDecodeErrors(t *testing.T) {
"packets lost",
"rtp too big",
"rtcp too big",
"cleaner error",
} {
t.Run(ca, func(t *testing.T) {
errorRecv := make(chan struct{})
@@ -1516,6 +1517,8 @@ func TestServerPublishDecodeErrors(t *testing.T) {
require.EqualError(t, ctx.Error, "RTP packet is too big to be read with UDP")
case "rtcp too big":
require.EqualError(t, ctx.Error, "RTCP packet is too big to be read with UDP")
case "cleaner error":
require.EqualError(t, ctx.Error, "packet type not supported (STAP-B)")
}
close(errorRecv)
},
@@ -1534,11 +1537,23 @@ func TestServerPublishDecodeErrors(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
tracks := Tracks{&TrackH264{
var track Track
if ca != "cleaner error" {
track = &TrackGeneric{
Media: "application",
Payloads: []TrackGenericPayload{{
Type: 97,
RTPMap: "private/90000",
}},
}
} else {
track = &TrackH264{
PayloadType: 96,
SPS: []byte{0x01, 0x02, 0x03, 0x04},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
}}
}
}
tracks := Tracks{track}
tracks.setControls()
res, err := writeReqReadRes(conn, base.Request{
@@ -1649,6 +1664,18 @@ func TestServerPublishDecodeErrors(t *testing.T) {
IP: net.ParseIP("127.0.0.1"),
Port: resTH.ServerPorts[1],
})
case "cleaner error":
byts, _ := rtp.Packet{
Header: rtp.Header{
SequenceNumber: 100,
},
Payload: []byte{0x99},
}.Marshal()
l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: resTH.ServerPorts[0],
})
}
<-errorRecv

View File

@@ -257,7 +257,8 @@ func (sc *ServerConn) readFuncTCP(readRequest chan readReq) error {
out, err := sc.session.setuppedTracks[trackID].cleaner.Process(pkt)
if err != nil {
return err
onDecodeError(sc.session, err)
// do not return
}
if h, ok := sc.s.Handler.(ServerHandlerOnPacketRTP); ok {

View File

@@ -176,7 +176,7 @@ type ServerSession struct {
lastRequestTime time.Time
tcpConn *ServerConn
announcedTracks Tracks // publish
udpLastFrameTime *int64 // publish
udpLastPacketTime *int64 // publish
udpCheckStreamTimer *time.Timer
writerRunning bool
writeBuffer *ringbuffer.RingBuffer
@@ -412,7 +412,7 @@ func (ss *ServerSession) runInner() error {
// in case of RECORD, timeout happens when no RTP or RTCP packets are being received
if ss.state == ServerSessionStateRecord {
lft := atomic.LoadInt64(ss.udpLastFrameTime)
lft := atomic.LoadInt64(ss.udpLastPacketTime)
if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout {
return liberrors.ErrServerNoUDPPacketsInAWhile{}
}
@@ -565,7 +565,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.announcedTracks = tracks
v := time.Now().Unix()
ss.udpLastFrameTime = &v
ss.udpLastPacketTime = &v
return res, err
case base.Setup:

View File

@@ -13,7 +13,7 @@ import (
)
type clientData struct {
ss *ServerSession
session *ServerSession
track *ServerSessionSetuppedTrack
isPublishing bool
}
@@ -34,6 +34,15 @@ func (p *clientAddr) fill(ip net.IP, port int) {
}
}
func onDecodeError(ss *ServerSession, err error) {
if h, ok := ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: ss,
Error: err,
})
}
}
type serverUDPListener struct {
s *Server
@@ -193,60 +202,41 @@ func (u *serverUDPListener) runReader() {
func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) {
if len(payload) == (maxPacketSize + 1) {
if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: clientData.ss,
Error: fmt.Errorf("RTP packet is too big to be read with UDP"),
})
}
onDecodeError(clientData.session, fmt.Errorf("RTP packet is too big to be read with UDP"))
return
}
pkt := u.s.udpRTPPacketBuffer.next()
err := pkt.Unmarshal(payload)
if err != nil {
if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: clientData.ss,
Error: err,
})
}
onDecodeError(clientData.session, err)
return
}
packets, missing := clientData.track.reorderer.Process(pkt)
now := time.Now()
atomic.StoreInt64(clientData.session.udpLastPacketTime, now.Unix())
packets, missing := clientData.track.reorderer.Process(pkt)
if missing != 0 {
if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: clientData.ss,
Error: fmt.Errorf("%d RTP packet(s) lost", missing),
})
}
onDecodeError(clientData.session, fmt.Errorf("%d RTP packet(s) lost", missing))
// do not return
}
for _, pkt := range packets {
now := time.Now()
atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix())
out, err := clientData.track.cleaner.Process(pkt)
if err != nil {
if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: clientData.ss,
Error: err,
})
}
continue
onDecodeError(clientData.session, err)
// do not return
}
if out != nil {
out0 := out[0]
clientData.track.udpRTCPReceiver.ProcessPacketRTP(now, pkt, out0.PTSEqualsDTS)
if h, ok := clientData.ss.s.Handler.(ServerHandlerOnPacketRTP); ok {
if h, ok := clientData.session.s.Handler.(ServerHandlerOnPacketRTP); ok {
h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{
Session: clientData.ss,
Session: clientData.session,
TrackID: clientData.track.id,
Packet: out0.Packet,
PTSEqualsDTS: out0.PTSEqualsDTS,
@@ -255,33 +245,24 @@ func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) {
})
}
}
}
}
func (u *serverUDPListener) processRTCP(clientData *clientData, payload []byte) {
if len(payload) == (maxPacketSize + 1) {
if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: clientData.ss,
Error: fmt.Errorf("RTCP packet is too big to be read with UDP"),
})
}
onDecodeError(clientData.session, fmt.Errorf("RTCP packet is too big to be read with UDP"))
return
}
packets, err := rtcp.Unmarshal(payload)
if err != nil {
if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: clientData.ss,
Error: err,
})
}
onDecodeError(clientData.session, err)
return
}
if clientData.isPublishing {
now := time.Now()
atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix())
atomic.StoreInt64(clientData.session.udpLastPacketTime, now.Unix())
for _, pkt := range packets {
clientData.track.udpRTCPReceiver.ProcessPacketRTCP(now, pkt)
@@ -289,7 +270,7 @@ func (u *serverUDPListener) processRTCP(clientData *clientData, payload []byte)
}
for _, pkt := range packets {
clientData.ss.onPacketRTCP(clientData.track.id, pkt)
clientData.session.onPacketRTCP(clientData.track.id, pkt)
}
}
@@ -312,7 +293,7 @@ func (u *serverUDPListener) addClient(ip net.IP, port int, ss *ServerSession,
addr.fill(ip, port)
u.clients[addr] = &clientData{
ss: ss,
session: ss,
track: track,
isPublishing: isPublishing,
}
@@ -323,7 +304,7 @@ func (u *serverUDPListener) removeClient(ss *ServerSession) {
defer u.clientsMutex.Unlock()
for addr, data := range u.clients {
if data.ss == ss {
if data.session == ss {
delete(u.clients, addr)
}
}