From 592bd7451c4bce03dd99fd4a56754e6171bb719c Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Wed, 23 Sep 2020 22:09:17 +0200 Subject: [PATCH] simplify connClientUDPListener --- connclient.go | 29 +++++++++++++++++++++++------ connclientudpl.go | 14 +------------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/connclient.go b/connclient.go index 930d4da4..ec56fbec 100644 --- a/connclient.go +++ b/connclient.go @@ -164,9 +164,26 @@ func (c *ConnClient) ReadFrame() (*InterleavedFrame, error) { // ReadFrameUDP reads an UDP frame. func (c *ConnClient) ReadFrameUDP(track *Track, streamType StreamType) ([]byte, error) { if streamType == StreamTypeRtp { - return c.udpRtpListeners[track.Id].read() + buf, err := c.udpRtpListeners[track.Id].read() + if err != nil { + return nil, err + } + + atomic.StoreInt64(c.udpLastFrameTimes[track.Id], time.Now().Unix()) + c.rtcpReceivers[track.Id].OnFrame(streamType, buf) + + return buf, nil } - return c.udpRtcpListeners[track.Id].read() + + buf, err := c.udpRtcpListeners[track.Id].read() + if err != nil { + return nil, err + } + + atomic.StoreInt64(c.udpLastFrameTimes[track.Id], time.Now().Unix()) + c.rtcpReceivers[track.Id].OnFrame(streamType, buf) + + return buf, nil } func (c *ConnClient) readFrameOrResponse() (interface{}, error) { @@ -440,12 +457,12 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int, rtpListener, rtcpListener, err := func() (*connClientUDPListener, *connClientUDPListener, error) { if rtpPort != 0 { - rtpListener, err := newConnClientUDPListener(c, rtpPort, track.Id, StreamTypeRtp) + rtpListener, err := newConnClientUDPListener(c, rtpPort) if err != nil { return nil, nil, err } - rtcpListener, err := newConnClientUDPListener(c, rtcpPort, track.Id, StreamTypeRtcp) + rtcpListener, err := newConnClientUDPListener(c, rtcpPort) if err != nil { rtpListener.close() return nil, nil, err @@ -460,12 +477,12 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int, rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000 rtcpPort = rtpPort + 1 - rtpListener, err := newConnClientUDPListener(c, rtpPort, track.Id, StreamTypeRtp) + rtpListener, err := newConnClientUDPListener(c, rtpPort) if err != nil { continue } - rtcpListener, err := newConnClientUDPListener(c, rtcpPort, track.Id, StreamTypeRtcp) + rtcpListener, err := newConnClientUDPListener(c, rtcpPort) if err != nil { rtpListener.close() continue diff --git a/connclientudpl.go b/connclientudpl.go index 6832f08a..e9e914b7 100644 --- a/connclientudpl.go +++ b/connclientudpl.go @@ -3,31 +3,23 @@ package gortsplib import ( "net" "strconv" - "sync/atomic" - "time" ) type connClientUDPListener struct { - c *ConnClient pc net.PacketConn - trackId int - streamType StreamType publisherIp net.IP publisherPort int udpFrameReadBuf *MultiBuffer } -func newConnClientUDPListener(c *ConnClient, port int, trackId int, streamType StreamType) (*connClientUDPListener, error) { +func newConnClientUDPListener(c *ConnClient, port int) (*connClientUDPListener, error) { pc, err := c.conf.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10)) if err != nil { return nil, err } return &connClientUDPListener{ - c: c, pc: pc, - trackId: trackId, - streamType: streamType, udpFrameReadBuf: NewMultiBuffer(c.conf.ReadBufferCount, 2048), }, nil } @@ -50,10 +42,6 @@ func (l *connClientUDPListener) read() ([]byte, error) { continue } - atomic.StoreInt64(l.c.udpLastFrameTimes[l.trackId], time.Now().Unix()) - - l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) - return buf[:n], nil } }