diff --git a/client_read_test.go b/client_read_test.go index b340c1a4..88eb405b 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -250,7 +250,7 @@ func TestClientRead(t *testing.T) { Media: "application", Payloads: []TrackGenericPayload{{ Type: 97, - RTPMap: "97 private/90000", + RTPMap: "private/90000", }}, } err = track.Init() @@ -2721,6 +2721,7 @@ func TestClientReadDecodeErrors(t *testing.T) { "packets lost", "rtp too big", "rtcp too big", + "cleaner error", } { t.Run(ca, func(t *testing.T) { errorRecv := make(chan struct{}) @@ -2761,11 +2762,23 @@ func TestClientReadDecodeErrors(t *testing.T) { require.Equal(t, base.Describe, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/stream"), req.URL) - tracks := Tracks{&TrackH264{ - PayloadType: 96, - SPS: []byte{0x01, 0x02, 0x03, 0x04}, - PPS: []byte{0x01, 0x02, 0x03, 0x04}, - }} + var track Track + if ca != "cleaner error" { + track = &TrackGeneric{ + Media: "application", + Payloads: []TrackGenericPayload{{ + Type: 97, + RTPMap: "private/90000", + }}, + } + } else { + track = &TrackH264{ + PayloadType: 96, + SPS: []byte{0x01, 0x02, 0x03, 0x04}, + PPS: []byte{0x01, 0x02, 0x03, 0x04}, + } + } + tracks := Tracks{track} tracks.setControls() err = conn.WriteResponse(&base.Response{ @@ -2868,6 +2881,18 @@ func TestClientReadDecodeErrors(t *testing.T) { IP: net.ParseIP("127.0.0.1"), Port: th.ClientPorts[1], }) + + case "cleaner error": + byts, _ := rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 100, + }, + Payload: []byte{0x99}, + }.Marshal() + l1.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ClientPorts[0], + }) } req, err = conn.ReadRequest() @@ -2898,6 +2923,8 @@ func TestClientReadDecodeErrors(t *testing.T) { require.EqualError(t, err, "RTP packet is too big to be read with UDP") case "rtcp too big": require.EqualError(t, err, "RTCP packet is too big to be read with UDP") + case "cleaner error": + require.EqualError(t, err, "packet type not supported (STAP-B)") } close(errorRecv) }, diff --git a/clientudpl.go b/clientudpl.go index b27a5b34..38e5a9b6 100644 --- a/clientudpl.go +++ b/clientudpl.go @@ -204,29 +204,31 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) { } packets, missing := u.ct.reorderer.Process(pkt) - if missing != 0 { u.c.OnDecodeError(fmt.Errorf("%d RTP packet(s) lost", missing)) + // do not return } for _, pkt := range packets { out, err := u.ct.cleaner.Process(pkt) if err != nil { u.c.OnDecodeError(err) - continue + // do not return } - out0 := out[0] + if out != nil { + out0 := out[0] - u.ct.udpRTCPReceiver.ProcessPacketRTP(time.Now(), pkt, out0.PTSEqualsDTS) + u.ct.udpRTCPReceiver.ProcessPacketRTP(time.Now(), pkt, out0.PTSEqualsDTS) - u.c.OnPacketRTP(&ClientOnPacketRTPCtx{ - TrackID: u.ct.id, - Packet: out0.Packet, - PTSEqualsDTS: out0.PTSEqualsDTS, - H264NALUs: out0.H264NALUs, - H264PTS: out0.H264PTS, - }) + u.c.OnPacketRTP(&ClientOnPacketRTPCtx{ + TrackID: u.ct.id, + Packet: out0.Packet, + PTSEqualsDTS: out0.PTSEqualsDTS, + H264NALUs: out0.H264NALUs, + H264PTS: out0.H264PTS, + }) + } } } diff --git a/pkg/rtpcleaner/cleaner.go b/pkg/rtpcleaner/cleaner.go index 4cf9ae73..bbae4702 100644 --- a/pkg/rtpcleaner/cleaner.go +++ b/pkg/rtpcleaner/cleaner.go @@ -65,35 +65,25 @@ func (p *Cleaner) processH264(pkt *rtp.Packet) ([]*Output, error) { p.h264Encoder.Init() } - // decode - nalus, pts, err := p.h264Decoder.DecodeUntilMarker(pkt) - if err != nil { - // ignore decode errors, except for the case in which the - // encoder is active - if p.h264Encoder == nil { - return []*Output{{ - Packet: pkt, - PTSEqualsDTS: false, - }}, nil - } - - if err == rtph264.ErrNonStartingPacketAndNoPrevious || - err == rtph264.ErrMorePacketsNeeded { - return nil, nil - } - - return nil, err - } - - ptsEqualsDTS := h264.IDRPresent(nalus) - // re-encode if p.h264Encoder != nil { - packets, err := p.h264Encoder.Encode(nalus, pts) + // decode + nalus, pts, err := p.h264Decoder.DecodeUntilMarker(pkt) if err != nil { - return nil, err + if err == rtph264.ErrNonStartingPacketAndNoPrevious || + err == rtph264.ErrMorePacketsNeeded { // hide standard errors + err = nil + } + + return nil, err // original packets are oversized, do not return them } + packets, err := p.h264Encoder.Encode(nalus, pts) + if err != nil { + return nil, err // original packets are oversized, do not return them + } + + ptsEqualsDTS := h264.IDRPresent(nalus) output := make([]*Output, len(packets)) for i, pkt := range packets { @@ -115,9 +105,23 @@ func (p *Cleaner) processH264(pkt *rtp.Packet) ([]*Output, error) { return output, nil } + // decode + nalus, pts, err := p.h264Decoder.DecodeUntilMarker(pkt) + if err != nil { + if err == rtph264.ErrNonStartingPacketAndNoPrevious || + err == rtph264.ErrMorePacketsNeeded { // hide standard errors + err = nil + } + + return []*Output{{ + Packet: pkt, + PTSEqualsDTS: false, + }}, err + } + return []*Output{{ Packet: pkt, - PTSEqualsDTS: ptsEqualsDTS, + PTSEqualsDTS: h264.IDRPresent(nalus), H264NALUs: nalus, H264PTS: pts, }}, nil diff --git a/pkg/rtpcleaner/cleaner_test.go b/pkg/rtpcleaner/cleaner_test.go index 91eae3cb..5c580aa3 100644 --- a/pkg/rtpcleaner/cleaner_test.go +++ b/pkg/rtpcleaner/cleaner_test.go @@ -154,7 +154,7 @@ func TestH264ProcessEvenIfInvalid(t *testing.T) { }, Payload: []byte{25}, }) - require.NoError(t, err) + require.Error(t, err) require.Equal(t, []*Output{{ Packet: &rtp.Packet{ Header: rtp.Header{ @@ -167,3 +167,48 @@ func TestH264ProcessEvenIfInvalid(t *testing.T) { }, }}, out) } + +func TestH264RandomAccess(t *testing.T) { + for _, ca := range []string{ + "standard", + "oversized", + } { + t.Run(ca, func(t *testing.T) { + cleaner := New(true, true) + + var payload []byte + if ca == "standard" { + payload = append([]byte{0x1C, 1 << 6}, + bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 10/5)...) + } else { + payload = append([]byte{0x1C, 1 << 6}, + bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 2048/5)...) + } + + out, err := cleaner.Process(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + SequenceNumber: 34572, + }, + Payload: payload, + }) + require.NoError(t, err) + + if ca == "standard" { + require.Equal(t, []*Output{{ + Packet: &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + SequenceNumber: 34572, + }, + Payload: payload, + }, + }}, out) + } else { + require.Equal(t, []*Output(nil), out) + } + }) + } +} diff --git a/server_publish_test.go b/server_publish_test.go index 2abc8557..f079b763 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -1483,6 +1483,7 @@ func TestServerPublishDecodeErrors(t *testing.T) { "packets lost", "rtp too big", "rtcp too big", + "cleaner error", } { t.Run(ca, func(t *testing.T) { errorRecv := make(chan struct{}) @@ -1516,6 +1517,8 @@ func TestServerPublishDecodeErrors(t *testing.T) { require.EqualError(t, ctx.Error, "RTP packet is too big to be read with UDP") case "rtcp too big": require.EqualError(t, ctx.Error, "RTCP packet is too big to be read with UDP") + case "cleaner error": + require.EqualError(t, ctx.Error, "packet type not supported (STAP-B)") } close(errorRecv) }, @@ -1534,11 +1537,23 @@ func TestServerPublishDecodeErrors(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - tracks := Tracks{&TrackH264{ - PayloadType: 96, - SPS: []byte{0x01, 0x02, 0x03, 0x04}, - PPS: []byte{0x01, 0x02, 0x03, 0x04}, - }} + var track Track + if ca != "cleaner error" { + track = &TrackGeneric{ + Media: "application", + Payloads: []TrackGenericPayload{{ + Type: 97, + RTPMap: "private/90000", + }}, + } + } else { + track = &TrackH264{ + PayloadType: 96, + SPS: []byte{0x01, 0x02, 0x03, 0x04}, + PPS: []byte{0x01, 0x02, 0x03, 0x04}, + } + } + tracks := Tracks{track} tracks.setControls() res, err := writeReqReadRes(conn, base.Request{ @@ -1649,6 +1664,18 @@ func TestServerPublishDecodeErrors(t *testing.T) { IP: net.ParseIP("127.0.0.1"), Port: resTH.ServerPorts[1], }) + + case "cleaner error": + byts, _ := rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 100, + }, + Payload: []byte{0x99}, + }.Marshal() + l1.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: resTH.ServerPorts[0], + }) } <-errorRecv diff --git a/serverconn.go b/serverconn.go index 6e052adc..528ee1d4 100644 --- a/serverconn.go +++ b/serverconn.go @@ -257,7 +257,8 @@ func (sc *ServerConn) readFuncTCP(readRequest chan readReq) error { out, err := sc.session.setuppedTracks[trackID].cleaner.Process(pkt) if err != nil { - return err + onDecodeError(sc.session, err) + // do not return } if h, ok := sc.s.Handler.(ServerHandlerOnPacketRTP); ok { diff --git a/serversession.go b/serversession.go index 62c32d0c..d625bd12 100644 --- a/serversession.go +++ b/serversession.go @@ -176,7 +176,7 @@ type ServerSession struct { lastRequestTime time.Time tcpConn *ServerConn announcedTracks Tracks // publish - udpLastFrameTime *int64 // publish + udpLastPacketTime *int64 // publish udpCheckStreamTimer *time.Timer writerRunning bool writeBuffer *ringbuffer.RingBuffer @@ -412,7 +412,7 @@ func (ss *ServerSession) runInner() error { // in case of RECORD, timeout happens when no RTP or RTCP packets are being received if ss.state == ServerSessionStateRecord { - lft := atomic.LoadInt64(ss.udpLastFrameTime) + lft := atomic.LoadInt64(ss.udpLastPacketTime) if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout { return liberrors.ErrServerNoUDPPacketsInAWhile{} } @@ -565,7 +565,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.announcedTracks = tracks v := time.Now().Unix() - ss.udpLastFrameTime = &v + ss.udpLastPacketTime = &v return res, err case base.Setup: diff --git a/serverudpl.go b/serverudpl.go index b2cdff94..42eff354 100644 --- a/serverudpl.go +++ b/serverudpl.go @@ -13,7 +13,7 @@ import ( ) type clientData struct { - ss *ServerSession + session *ServerSession track *ServerSessionSetuppedTrack isPublishing bool } @@ -34,6 +34,15 @@ func (p *clientAddr) fill(ip net.IP, port int) { } } +func onDecodeError(ss *ServerSession, err error) { + if h, ok := ss.s.Handler.(ServerHandlerOnDecodeError); ok { + h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ + Session: ss, + Error: err, + }) + } +} + type serverUDPListener struct { s *Server @@ -193,95 +202,67 @@ func (u *serverUDPListener) runReader() { func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) { if len(payload) == (maxPacketSize + 1) { - if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok { - h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ - Session: clientData.ss, - Error: fmt.Errorf("RTP packet is too big to be read with UDP"), - }) - } + onDecodeError(clientData.session, fmt.Errorf("RTP packet is too big to be read with UDP")) return } pkt := u.s.udpRTPPacketBuffer.next() err := pkt.Unmarshal(payload) if err != nil { - if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok { - h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ - Session: clientData.ss, - Error: err, - }) - } + onDecodeError(clientData.session, err) return } - packets, missing := clientData.track.reorderer.Process(pkt) + now := time.Now() + atomic.StoreInt64(clientData.session.udpLastPacketTime, now.Unix()) + packets, missing := clientData.track.reorderer.Process(pkt) if missing != 0 { - if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok { - h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ - Session: clientData.ss, - Error: fmt.Errorf("%d RTP packet(s) lost", missing), - }) - } + onDecodeError(clientData.session, fmt.Errorf("%d RTP packet(s) lost", missing)) + // do not return } for _, pkt := range packets { - now := time.Now() - atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix()) - out, err := clientData.track.cleaner.Process(pkt) if err != nil { - if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok { - h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ - Session: clientData.ss, - Error: err, - }) - } - continue + onDecodeError(clientData.session, err) + // do not return } - out0 := out[0] + if out != nil { + out0 := out[0] - clientData.track.udpRTCPReceiver.ProcessPacketRTP(now, pkt, out0.PTSEqualsDTS) + clientData.track.udpRTCPReceiver.ProcessPacketRTP(now, pkt, out0.PTSEqualsDTS) - if h, ok := clientData.ss.s.Handler.(ServerHandlerOnPacketRTP); ok { - h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ - Session: clientData.ss, - TrackID: clientData.track.id, - Packet: out0.Packet, - PTSEqualsDTS: out0.PTSEqualsDTS, - H264NALUs: out0.H264NALUs, - H264PTS: out0.H264PTS, - }) + if h, ok := clientData.session.s.Handler.(ServerHandlerOnPacketRTP); ok { + h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ + Session: clientData.session, + TrackID: clientData.track.id, + Packet: out0.Packet, + PTSEqualsDTS: out0.PTSEqualsDTS, + H264NALUs: out0.H264NALUs, + H264PTS: out0.H264PTS, + }) + } } } } func (u *serverUDPListener) processRTCP(clientData *clientData, payload []byte) { if len(payload) == (maxPacketSize + 1) { - if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok { - h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ - Session: clientData.ss, - Error: fmt.Errorf("RTCP packet is too big to be read with UDP"), - }) - } + onDecodeError(clientData.session, fmt.Errorf("RTCP packet is too big to be read with UDP")) return } packets, err := rtcp.Unmarshal(payload) if err != nil { - if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok { - h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ - Session: clientData.ss, - Error: err, - }) - } + onDecodeError(clientData.session, err) return } if clientData.isPublishing { now := time.Now() - atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix()) + atomic.StoreInt64(clientData.session.udpLastPacketTime, now.Unix()) for _, pkt := range packets { clientData.track.udpRTCPReceiver.ProcessPacketRTCP(now, pkt) @@ -289,7 +270,7 @@ func (u *serverUDPListener) processRTCP(clientData *clientData, payload []byte) } for _, pkt := range packets { - clientData.ss.onPacketRTCP(clientData.track.id, pkt) + clientData.session.onPacketRTCP(clientData.track.id, pkt) } } @@ -312,7 +293,7 @@ func (u *serverUDPListener) addClient(ip net.IP, port int, ss *ServerSession, addr.fill(ip, port) u.clients[addr] = &clientData{ - ss: ss, + session: ss, track: track, isPublishing: isPublishing, } @@ -323,7 +304,7 @@ func (u *serverUDPListener) removeClient(ss *ServerSession) { defer u.clientsMutex.Unlock() for addr, data := range u.clients { - if data.ss == ss { + if data.session == ss { delete(u.clients, addr) } }