diff --git a/client.go b/client.go index 98c9f1b9..1c6d4628 100644 --- a/client.go +++ b/client.go @@ -736,66 +736,52 @@ func (c *Client) runReader() error { } } } else { + var processFunc func(int, bool, []byte) + if c.state == clientStatePlay { - for { - frame := base.InterleavedFrame{ - Payload: c.tcpFrameBuffer.Next(), - } - err := frame.Read(c.br) - if err != nil { - return err - } - - channel := frame.Channel - isRTP := true - if (channel % 2) != 0 { - channel-- - isRTP = false - } - - trackID, ok := c.tracksByChannel[channel] - if !ok { - continue - } - + processFunc = func(trackID int, isRTP bool, payload []byte) { now := time.Now() atomic.StoreInt64(&c.tcpLastFrameTime, now.Unix()) if isRTP { - c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, frame.Payload) - c.OnPacketRTP(trackID, frame.Payload) + c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, payload) + c.OnPacketRTP(trackID, payload) } else { - c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, frame.Payload) - c.OnPacketRTCP(trackID, frame.Payload) + c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, payload) + c.OnPacketRTCP(trackID, payload) } } - } else { // Record - for { - frame := base.InterleavedFrame{ - Payload: c.tcpFrameBuffer.Next(), - } - err := frame.Read(c.br) - if err != nil { - return err - } - - channel := frame.Channel - isRTP := true - if (channel % 2) != 0 { - channel-- - isRTP = false - } - - trackID, ok := c.tracksByChannel[channel] - if !ok { - continue - } - + } else { + processFunc = func(trackID int, isRTP bool, payload []byte) { if !isRTP { - c.OnPacketRTCP(trackID, frame.Payload) + c.OnPacketRTCP(trackID, payload) } } } + + for { + frame := base.InterleavedFrame{ + Payload: c.tcpFrameBuffer.Next(), + } + err := frame.Read(c.br) + if err != nil { + return err + } + + channel := frame.Channel + isRTP := true + if (channel % 2) != 0 { + channel-- + isRTP = false + } + + trackID, ok := c.tracksByChannel[channel] + if !ok { + continue + } + + processFunc(trackID, isRTP, frame.Payload) + } } } diff --git a/clientudpl.go b/clientudpl.go index 26962d6d..2651b694 100644 --- a/clientudpl.go +++ b/clientudpl.go @@ -41,6 +41,7 @@ type clientUDPListener struct { frameBuffer *multibuffer.MultiBuffer lastFrameTime *int64 writeMutex sync.Mutex + processFunc func(time.Time, []byte) // out done chan struct{} @@ -136,6 +137,16 @@ func (l *clientUDPListener) port() int { } func (l *clientUDPListener) start() { + if l.c.state == clientStatePlay { + if l.isRTP { + l.processFunc = l.processPlayRTP + } else { + l.processFunc = l.processPlayRTCP + } + } else { + l.processFunc = l.processRecord + } + l.running = true l.pc.SetReadDeadline(time.Time{}) l.done = make(chan struct{}) @@ -150,52 +161,40 @@ func (l *clientUDPListener) stop() { func (l *clientUDPListener) run() { defer close(l.done) - if l.c.state == clientStatePlay { - for { - buf := l.frameBuffer.Next() - n, addr, err := l.pc.ReadFrom(buf) - if err != nil { - return - } - - uaddr := addr.(*net.UDPAddr) - - if !l.remoteReadIP.Equal(uaddr.IP) || (!isAnyPort(l.remotePort) && l.remotePort != uaddr.Port) { - continue - } - - now := time.Now() - atomic.StoreInt64(l.lastFrameTime, now.Unix()) - - if l.isRTP { - l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTP(now, buf[:n]) - l.c.OnPacketRTP(l.trackID, buf[:n]) - } else { - l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTCP(now, buf[:n]) - l.c.OnPacketRTCP(l.trackID, buf[:n]) - } + for { + buf := l.frameBuffer.Next() + n, addr, err := l.pc.ReadFrom(buf) + if err != nil { + return } - } else { // record - for { - buf := l.frameBuffer.Next() - n, addr, err := l.pc.ReadFrom(buf) - if err != nil { - return - } - uaddr := addr.(*net.UDPAddr) + uaddr := addr.(*net.UDPAddr) - if !l.remoteReadIP.Equal(uaddr.IP) || (!isAnyPort(l.remotePort) && l.remotePort != uaddr.Port) { - continue - } - - now := time.Now() - atomic.StoreInt64(l.lastFrameTime, now.Unix()) - l.c.OnPacketRTCP(l.trackID, buf[:n]) + if !l.remoteReadIP.Equal(uaddr.IP) || (!isAnyPort(l.remotePort) && l.remotePort != uaddr.Port) { + continue } + + now := time.Now() + atomic.StoreInt64(l.lastFrameTime, now.Unix()) + + l.processFunc(now, buf[:n]) } } +func (l *clientUDPListener) processPlayRTP(now time.Time, payload []byte) { + l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTP(now, payload) + l.c.OnPacketRTP(l.trackID, payload) +} + +func (l *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) { + l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTCP(now, payload) + l.c.OnPacketRTCP(l.trackID, payload) +} + +func (l *clientUDPListener) processRecord(now time.Time, payload []byte) { + l.c.OnPacketRTCP(l.trackID, payload) +} + func (l *clientUDPListener) write(buf []byte) error { l.writeMutex.Lock() defer l.writeMutex.Unlock() diff --git a/serverudpl.go b/serverudpl.go index 4feeefb5..ee9a6067 100644 --- a/serverudpl.go +++ b/serverudpl.go @@ -60,6 +60,7 @@ type serverUDPListener struct { clientsMutex sync.RWMutex clients map[clientAddr]*clientData ringBuffer *ringbuffer.RingBuffer + processFunc func(time.Time, *clientData, []byte) } func newServerUDPListenerMulticastPair(s *Server) (*serverUDPListener, *serverUDPListener, error) { @@ -157,6 +158,12 @@ func newServerUDPListener( ringBuffer: ringbuffer.New(uint64(s.ReadBufferCount)), } + if isRTP { + u.processFunc = u.processRTP + } else { + u.processFunc = u.processRTCP + } + u.wg.Add(1) go u.run() @@ -206,29 +213,7 @@ func (u *serverUDPListener) run() { atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix()) } - if u.isRTP { - clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTP(now, buf[:n]) - - if h, ok := u.s.Handler.(ServerHandlerOnPacketRTP); ok { - h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ - Session: clientData.ss, - TrackID: clientData.trackID, - Payload: buf[:n], - }) - } - } else { - if clientData.isPublishing { - clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTCP(now, buf[:n]) - } - - if h, ok := u.s.Handler.(ServerHandlerOnPacketRTCP); ok { - h.OnPacketRTCP(&ServerHandlerOnPacketRTCPCtx{ - Session: clientData.ss, - TrackID: clientData.trackID, - Payload: buf[:n], - }) - } - } + u.processFunc(now, clientData, buf[:n]) }() } }() @@ -255,6 +240,32 @@ func (u *serverUDPListener) run() { u.ringBuffer.Close() } +func (u *serverUDPListener) processRTP(now time.Time, clientData *clientData, payload []byte) { + clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTP(now, payload) + + if h, ok := u.s.Handler.(ServerHandlerOnPacketRTP); ok { + h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ + Session: clientData.ss, + TrackID: clientData.trackID, + Payload: payload, + }) + } +} + +func (u *serverUDPListener) processRTCP(now time.Time, clientData *clientData, payload []byte) { + if clientData.isPublishing { + clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTCP(now, payload) + } + + if h, ok := u.s.Handler.(ServerHandlerOnPacketRTCP); ok { + h.OnPacketRTCP(&ServerHandlerOnPacketRTCPCtx{ + Session: clientData.ss, + TrackID: clientData.trackID, + Payload: payload, + }) + } +} + func (u *serverUDPListener) write(buf []byte, addr *net.UDPAddr) { u.ringBuffer.Push(bufAddrPair{buf, addr}) }