From 0e6811a17847cfab4d64707ffa72395769a07eb5 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 28 Mar 2021 21:36:12 +0200 Subject: [PATCH] client: allow calling ReadFrames() when publishing --- clientconn.go | 120 +++++++++++++++++++-- clientconnpublish.go | 184 +++++++++++++++++--------------- clientconnpublish_test.go | 171 ++++++++++++++++++++++++++++++ clientconnread.go | 79 -------------- clientconnread_test.go | 214 ++++++++++++++++++++++---------------- clientconnudpl.go | 46 +++++--- 6 files changed, 543 insertions(+), 271 deletions(-) diff --git a/clientconn.go b/clientconn.go index 1622bfd7..07d4b17a 100644 --- a/clientconn.go +++ b/clientconn.go @@ -88,11 +88,13 @@ type ClientConn struct { writeFrameAllowed bool writeError error backgroundRunning bool + readCB func(int, StreamType, []byte) + + // TCP stream protocol + tcpFrameBuffer *multibuffer.MultiBuffer // read - rtpInfo *headers.RTPInfo - tcpFrameBuffer *multibuffer.MultiBuffer - readCB func(int, StreamType, []byte) + rtpInfo *headers.RTPInfo // in backgroundTerminate chan struct{} @@ -695,15 +697,15 @@ func (cc *ClientConn) Setup(mode headers.TransportMode, track *Track, if mode == headers.TransportModePlay { cc.state = clientConnStatePrePlay - - if *cc.streamProtocol == StreamProtocolTCP && cc.tcpFrameBuffer == nil { - cc.tcpFrameBuffer = multibuffer.New(uint64(cc.conf.ReadBufferCount), uint64(cc.conf.ReadBufferSize)) - } - } else { cc.state = clientConnStatePreRecord } + if *cc.streamProtocol == StreamProtocolTCP && + cc.tcpFrameBuffer == nil { + cc.tcpFrameBuffer = multibuffer.New(uint64(cc.conf.ReadBufferCount), uint64(cc.conf.ReadBufferSize)) + } + return res, nil } @@ -775,3 +777,105 @@ func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []b } return frame.Write(cc.bw) } + +// ReadFrames starts reading frames. +// it returns a channel that is written when the reading stops. +func (cc *ClientConn) ReadFrames(onFrame func(int, StreamType, []byte)) chan error { + // channel is buffered, since listening to it is not mandatory + done := make(chan error, 1) + + err := cc.checkState(map[clientConnState]struct{}{ + clientConnStatePlay: {}, + clientConnStateRecord: {}, + }) + if err != nil { + done <- err + return done + } + + // close previous ReadFrames() + if cc.backgroundRunning { + close(cc.backgroundTerminate) + <-cc.backgroundDone + } + + cc.backgroundRunning = true + cc.backgroundTerminate = make(chan struct{}) + cc.backgroundDone = make(chan struct{}) + cc.readCB = onFrame + cc.writeFrameAllowed = true + + go func() { + done <- func() error { + safeState := cc.state + err := func() error { + if *cc.streamProtocol == StreamProtocolUDP { + if cc.state == clientConnStatePlay { + return cc.backgroundPlayUDP() + } + return cc.backgroundRecordUDP() + } + + if cc.state == clientConnStatePlay { + return cc.backgroundPlayTCP() + } + return cc.backgroundRecordTCP() + }() + + cc.writeError = err + + func() { + cc.writeMutex.Lock() + defer cc.writeMutex.Unlock() + cc.writeFrameAllowed = false + }() + + close(cc.backgroundDone) + + // automatically change protocol in case of timeout + if *cc.streamProtocol == StreamProtocolUDP && + safeState == clientConnStatePlay { + if _, ok := err.(liberrors.ErrClientNoUDPPacketsRecently); ok { + if cc.conf.StreamProtocol == nil { + prevURL := cc.streamURL + prevTracks := cc.tracks + cc.reset() + v := StreamProtocolTCP + cc.streamProtocol = &v + + err := cc.connOpen(prevURL.Scheme, prevURL.Host) + if err != nil { + return err + } + + _, err = cc.Options(prevURL) + if err != nil { + cc.Close() + return err + } + + for _, track := range prevTracks { + _, err := cc.Setup(headers.TransportModePlay, track.track, 0, 0) + if err != nil { + cc.Close() + return err + } + } + + _, err = cc.Play() + if err != nil { + cc.Close() + return err + } + + return <-cc.ReadFrames(onFrame) + } + } + } + + return err + }() + }() + + return done +} diff --git a/clientconnpublish.go b/clientconnpublish.go index 84404b30..85ad71ea 100644 --- a/clientconnpublish.go +++ b/clientconnpublish.go @@ -54,70 +54,6 @@ func (cc *ClientConn) Announce(u *base.URL, tracks Tracks) (*base.Response, erro return res, nil } -func (cc *ClientConn) backgroundRecordUDP() { - // disable deadline - cc.nconn.SetReadDeadline(time.Time{}) - - readerDone := make(chan error) - go func() { - for { - var res base.Response - err := res.Read(cc.br) - if err != nil { - readerDone <- err - return - } - } - }() - - reportTicker := time.NewTicker(cc.conf.senderReportPeriod) - defer reportTicker.Stop() - - for { - select { - case <-cc.backgroundTerminate: - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - cc.writeError = fmt.Errorf("terminated") - return - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range cc.tracks { - sr := cct.rtcpSender.Report(now) - if sr != nil { - cc.WriteFrame(trackID, StreamTypeRTCP, sr) - } - } - - case err := <-readerDone: - cc.writeError = err - return - } - } -} - -func (cc *ClientConn) backgroundRecordTCP() { - reportTicker := time.NewTicker(cc.conf.senderReportPeriod) - defer reportTicker.Stop() - - for { - select { - case <-cc.backgroundTerminate: - return - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range cc.tracks { - sr := cct.rtcpSender.Report(now) - if sr != nil { - cc.WriteFrame(trackID, StreamTypeRTCP, sr) - } - } - } - } -} - // Record writes a RECORD request and reads a Response. // This can be called only after Announce() and Setup(). func (cc *ClientConn) Record() (*base.Response, error) { @@ -142,27 +78,107 @@ func (cc *ClientConn) Record() (*base.Response, error) { } cc.state = clientConnStateRecord - cc.writeFrameAllowed = true - cc.backgroundRunning = true - cc.backgroundTerminate = make(chan struct{}) - cc.backgroundDone = make(chan struct{}) - - go func() { - defer close(cc.backgroundDone) - - defer func() { - cc.writeMutex.Lock() - defer cc.writeMutex.Unlock() - cc.writeFrameAllowed = false - }() - - if *cc.streamProtocol == StreamProtocolUDP { - cc.backgroundRecordUDP() - } else { - cc.backgroundRecordTCP() - } - }() + cc.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { + }) return nil, nil } + +func (cc *ClientConn) backgroundRecordUDP() error { + for _, cct := range cc.tracks { + cct.udpRTPListener.start() + cct.udpRTCPListener.start() + } + + defer func() { + for _, cct := range cc.tracks { + cct.udpRTPListener.stop() + cct.udpRTCPListener.stop() + } + }() + + // disable deadline + cc.nconn.SetReadDeadline(time.Time{}) + + readerDone := make(chan error) + go func() { + for { + var res base.Response + err := res.Read(cc.br) + if err != nil { + readerDone <- err + return + } + } + }() + + reportTicker := time.NewTicker(cc.conf.senderReportPeriod) + defer reportTicker.Stop() + + for { + select { + case <-cc.backgroundTerminate: + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return fmt.Errorf("terminated") + + case <-reportTicker.C: + now := time.Now() + for trackID, cct := range cc.tracks { + sr := cct.rtcpSender.Report(now) + if sr != nil { + cc.WriteFrame(trackID, StreamTypeRTCP, sr) + } + } + + case err := <-readerDone: + return err + } + } +} + +func (cc *ClientConn) backgroundRecordTCP() error { + // disable deadline + cc.nconn.SetReadDeadline(time.Time{}) + + readerDone := make(chan error) + go func() { + for { + frame := base.InterleavedFrame{ + Payload: cc.tcpFrameBuffer.Next(), + } + err := frame.Read(cc.br) + if err != nil { + readerDone <- err + return + } + + cc.readCB(frame.TrackID, frame.StreamType, frame.Payload) + } + }() + + reportTicker := time.NewTicker(cc.conf.senderReportPeriod) + defer reportTicker.Stop() + + for { + select { + case <-cc.backgroundTerminate: + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return fmt.Errorf("terminated") + + case <-reportTicker.C: + now := time.Now() + for trackID, cct := range cc.tracks { + sr := cct.rtcpSender.Report(now) + if sr != nil { + cc.WriteFrame(trackID, StreamTypeRTCP, sr) + } + } + + case err := <-readerDone: + return err + } + } +} diff --git a/clientconnpublish_test.go b/clientconnpublish_test.go index 15b32636..534f8439 100644 --- a/clientconnpublish_test.go +++ b/clientconnpublish_test.go @@ -753,3 +753,174 @@ func TestClientPublishRTCP(t *testing.T) { err = conn.WriteFrame(track.ID, StreamTypeRTP, byts) require.NoError(t, err) } + +func TestClientPublishReadManualRTCP(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + l, err := net.Listen("tcp", "localhost:8554") + require.NoError(t, err) + defer l.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := l.Accept() + require.NoError(t, err) + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + var req base.Request + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Options, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Announce), + string(base.Setup), + string(base.Record), + }, ", ")}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Announce, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Setup, req.Method) + + var inTH headers.Transport + err = inTH.Read(req.Header["Transport"]) + require.NoError(t, err) + + th := headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + } + + var l1 net.PacketConn + if proto == "udp" { + var err error + l1, err = net.ListenPacket("udp", "localhost:34557") + require.NoError(t, err) + defer l1.Close() + + th.Protocol = StreamProtocolUDP + th.ServerPorts = &[2]int{34556, 34557} + th.ClientPorts = inTH.ClientPorts + + } else { + th.Protocol = StreamProtocolTCP + th.InterleavedIDs = inTH.InterleavedIDs + } + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": th.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Record, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + }.Write(bconn.Writer) + require.NoError(t, err) + + if proto == "udp" { + buf := make([]byte, 2048) + n, _, err := l1.ReadFrom(buf) + require.NoError(t, err) + require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, buf[:n]) + + } else { + var f base.InterleavedFrame + f.Payload = make([]byte, 2048) + err = f.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, StreamTypeRTCP, f.StreamType) + require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload) + } + + if proto == "udp" { + l1.WriteTo([]byte{0x01, 0x02, 0x03, 0x04}, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ClientPorts[1], + }) + + } else { + err = base.InterleavedFrame{ + TrackID: 0, + StreamType: StreamTypeRTCP, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, + }.Write(bconn.Writer) + require.NoError(t, err) + } + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Teardown, req.Method) + + base.Response{ + StatusCode: base.StatusOK, + }.Write(bconn.Writer) + + conn.Close() + }() + + conf := ClientConf{ + StreamProtocol: func() *StreamProtocol { + if proto == "udp" { + v := StreamProtocolUDP + return &v + } + v := StreamProtocolTCP + return &v + }(), + } + + track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", + Tracks{track}) + require.NoError(t, err) + + recvDone := make(chan struct{}) + done := conn.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { + require.Equal(t, 0, trackID) + require.Equal(t, StreamTypeRTCP, streamType) + require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) + close(recvDone) + }) + + err = conn.WriteFrame(track.ID, StreamTypeRTCP, + []byte{0x05, 0x06, 0x07, 0x08}) + require.NoError(t, err) + + <-recvDone + conn.Close() + <-done + }) + } +} diff --git a/clientconnread.go b/clientconnread.go index f25f8fed..014839fa 100644 --- a/clientconnread.go +++ b/clientconnread.go @@ -43,7 +43,6 @@ func (cc *ClientConn) Play() (*base.Response, error) { } cc.state = clientConnStatePlay - cc.writeFrameAllowed = true return res, nil } @@ -246,81 +245,3 @@ func (cc *ClientConn) backgroundPlayTCP() error { } } } - -// ReadFrames starts reading frames. -// it returns a channel that is written when the reading stops. -// This can be called only after Play(). -func (cc *ClientConn) ReadFrames(onFrame func(int, StreamType, []byte)) chan error { - // channel is buffered, since listening to it is not mandatory - done := make(chan error, 1) - - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStatePlay: {}, - }) - if err != nil { - done <- err - return done - } - - cc.backgroundRunning = true - cc.backgroundTerminate = make(chan struct{}) - cc.backgroundDone = make(chan struct{}) - cc.readCB = onFrame - - go func() { - if *cc.streamProtocol == StreamProtocolUDP { - err := cc.backgroundPlayUDP() - close(cc.backgroundDone) - - // automatically change protocol in case of timeout - if _, ok := err.(liberrors.ErrClientNoUDPPacketsRecently); ok { - if cc.conf.StreamProtocol == nil { - err := func() error { - prevURL := cc.streamURL - prevTracks := cc.tracks - cc.reset() - v := StreamProtocolTCP - cc.streamProtocol = &v - - err := cc.connOpen(prevURL.Scheme, prevURL.Host) - if err != nil { - return err - } - - _, err = cc.Options(prevURL) - if err != nil { - cc.Close() - return err - } - - for _, track := range prevTracks { - _, err := cc.Setup(headers.TransportModePlay, track.track, 0, 0) - if err != nil { - cc.Close() - return err - } - } - - _, err = cc.Play() - if err != nil { - cc.Close() - return err - } - - return <-cc.ReadFrames(onFrame) - }() - done <- err - return - } - } - - done <- err - - } else { - defer close(cc.backgroundDone) - done <- cc.backgroundPlayTCP() - } - }() - - return done -} diff --git a/clientconnread_test.go b/clientconnread_test.go index 502770cf..4f7abc20 100644 --- a/clientconnread_test.go +++ b/clientconnread_test.go @@ -197,11 +197,6 @@ func TestClientRead(t *testing.T) { <-frameRecv conn.Close() <-done - - done = conn.ReadFrames(func(id int, typ StreamType, payload []byte) { - t.Error("should not happen") - }) - <-done }) } } @@ -1147,105 +1142,150 @@ func TestClientReadRTCP(t *testing.T) { } func TestClientReadWriteManualRTCP(t *testing.T) { - l, err := net.Listen("tcp", "localhost:8554") - require.NoError(t, err) - defer l.Close() + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + l, err := net.Listen("tcp", "localhost:8554") + require.NoError(t, err) + defer l.Close() - serverDone := make(chan struct{}) - defer func() { <-serverDone }() - go func() { - defer close(serverDone) + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) - conn, err := l.Accept() - require.NoError(t, err) - defer conn.Close() - bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + conn, err := l.Accept() + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - var req base.Request - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Options, req.Method) + var req base.Request + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Options, req.Method) - err = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Public": base.HeaderValue{strings.Join([]string{ - string(base.Describe), - string(base.Setup), - string(base.Play), - }, ", ")}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Describe), + string(base.Setup), + string(base.Play), + }, ", ")}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Describe, req.Method) + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Describe, req.Method) - track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) - require.NoError(t, err) + track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) - err = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Content-Type": base.HeaderValue{"application/sdp"}, - }, - Body: Tracks{track}.Write(), - }.Write(bconn.Writer) - require.NoError(t, err) + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: Tracks{track}.Write(), + }.Write(bconn.Writer) + require.NoError(t, err) - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Setup, req.Method) + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Setup, req.Method) - var th headers.Transport - err = th.Read(req.Header["Transport"]) - require.NoError(t, err) + var inTH headers.Transport + err = inTH.Read(req.Header["Transport"]) + require.NoError(t, err) - err = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + th := headers.Transport{ Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v }(), - ClientPorts: th.ClientPorts, - InterleavedIDs: &[2]int{0, 1}, - }.Write(), - }, - }.Write(bconn.Writer) - require.NoError(t, err) + } - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Play, req.Method) + var l1 net.PacketConn + if proto == "udp" { + var err error + l1, err = net.ListenPacket("udp", "localhost:34557") + require.NoError(t, err) + defer l1.Close() - err = base.Response{ - StatusCode: base.StatusOK, - }.Write(bconn.Writer) - require.NoError(t, err) + th.Protocol = StreamProtocolUDP + th.ServerPorts = &[2]int{34556, 34557} + th.ClientPorts = inTH.ClientPorts - var f base.InterleavedFrame - f.Payload = make([]byte, 2048) - err = f.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, StreamTypeRTCP, f.StreamType) - require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload) - }() + } else { + th.Protocol = StreamProtocolTCP + th.InterleavedIDs = inTH.InterleavedIDs + } - conf := ClientConf{ - StreamProtocol: func() *StreamProtocol { - v := StreamProtocolTCP - return &v - }(), + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": th.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Play, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + }.Write(bconn.Writer) + require.NoError(t, err) + + if proto == "udp" { + buf := make([]byte, 2048) + + // skip firewall opening + _, _, err := l1.ReadFrom(buf) + require.NoError(t, err) + + n, _, err := l1.ReadFrom(buf) + require.NoError(t, err) + require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, buf[:n]) + + } else { + var f base.InterleavedFrame + f.Payload = make([]byte, 2048) + err = f.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, 0, f.TrackID) + require.Equal(t, StreamTypeRTCP, f.StreamType) + require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload) + } + }() + + conf := ClientConf{ + StreamProtocol: func() *StreamProtocol { + if proto == "udp" { + v := StreamProtocolUDP + return &v + } + v := StreamProtocolTCP + return &v + }(), + } + + conn, err := conf.DialRead("rtsp://localhost:8554/teststream") + require.NoError(t, err) + defer conn.Close() + + conn.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { + }) + + time.Sleep(500 * time.Millisecond) + + err = conn.WriteFrame(0, StreamTypeRTCP, []byte{0x01, 0x02, 0x03, 0x04}) + require.NoError(t, err) + }) } - - conn, err := conf.DialRead("rtsp://localhost:8554/teststream") - require.NoError(t, err) - defer conn.Close() - - err = conn.WriteFrame(0, StreamTypeRTCP, []byte{0x01, 0x02, 0x03, 0x04}) - require.NoError(t, err) } diff --git a/clientconnudpl.go b/clientconnudpl.go index 49c473fe..ff8b6781 100644 --- a/clientconnudpl.go +++ b/clientconnudpl.go @@ -73,23 +73,43 @@ func (l *clientConnUDPListener) stop() { func (l *clientConnUDPListener) run() { defer close(l.done) - for { - buf := l.frameBuffer.Next() - n, addr, err := l.pc.ReadFrom(buf) - if err != nil { - return + if l.cc.state == clientConnStatePlay { + for { + buf := l.frameBuffer.Next() + n, addr, err := l.pc.ReadFrom(buf) + if err != nil { + return + } + + uaddr := addr.(*net.UDPAddr) + + if !l.remoteIP.Equal(uaddr.IP) || (l.remotePort != 0 && l.remotePort != uaddr.Port) { + continue + } + + now := time.Now() + atomic.StoreInt64(l.lastFrameTime, now.Unix()) + l.cc.tracks[l.trackID].rtcpReceiver.ProcessFrame(now, l.streamType, buf[:n]) + l.cc.readCB(l.trackID, l.streamType, buf[:n]) } + } else { + for { + buf := l.frameBuffer.Next() + n, addr, err := l.pc.ReadFrom(buf) + if err != nil { + return + } - uaddr := addr.(*net.UDPAddr) + uaddr := addr.(*net.UDPAddr) - if !l.remoteIP.Equal(uaddr.IP) || (l.remotePort != 0 && l.remotePort != uaddr.Port) { - continue + if !l.remoteIP.Equal(uaddr.IP) || (l.remotePort != 0 && l.remotePort != uaddr.Port) { + continue + } + + now := time.Now() + atomic.StoreInt64(l.lastFrameTime, now.Unix()) + l.cc.readCB(l.trackID, l.streamType, buf[:n]) } - - now := time.Now() - atomic.StoreInt64(l.lastFrameTime, now.Unix()) - l.cc.tracks[l.trackID].rtcpReceiver.ProcessFrame(now, l.streamType, buf[:n]) - l.cc.readCB(l.trackID, l.streamType, buf[:n]) } }