From e2b52d0b8410a86c0422076e9a348564468ddd05 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 31 Oct 2021 23:27:28 +0100 Subject: [PATCH] server: do not use StreamType --- server.go | 4 ++-- serverconn.go | 32 ++++++++++++++++---------------- serverudpl.go | 45 ++++++++++++++++++++++----------------------- 3 files changed, 40 insertions(+), 41 deletions(-) diff --git a/server.go b/server.go index 5b8aea43..79061269 100644 --- a/server.go +++ b/server.go @@ -222,12 +222,12 @@ func (s *Server) Start() error { return fmt.Errorf("RTP and RTCP ports must be consecutive") } - s.udpRTPListener, err = newServerUDPListener(s, false, s.UDPRTPAddress, StreamTypeRTP) + s.udpRTPListener, err = newServerUDPListener(s, false, s.UDPRTPAddress, true) if err != nil { return err } - s.udpRTCPListener, err = newServerUDPListener(s, false, s.UDPRTCPAddress, StreamTypeRTCP) + s.udpRTCPListener, err = newServerUDPListener(s, false, s.UDPRTCPAddress, false) if err != nil { s.udpRTPListener.close() return err diff --git a/serverconn.go b/serverconn.go index a640c1cd..855a0948 100644 --- a/serverconn.go +++ b/serverconn.go @@ -146,34 +146,34 @@ func (sc *ServerConn) run() { switch what.(type) { case *base.InterleavedFrame: channel := frame.Channel - streamType := StreamTypeRTP + isRTP := true if (channel % 2) != 0 { channel-- - streamType = StreamTypeRTCP + isRTP = false } // forward frame only if it has been set up if trackID, ok := sc.tcpSession.setuppedTracksByChannel[channel]; ok { - if sc.tcpFrameIsRecording { - if streamType == StreamTypeRTP { + if isRTP { + if sc.tcpFrameIsRecording { sc.tcpSession.announcedTracks[trackID].rtcpReceiver.ProcessPacketRTP( time.Now(), frame.Payload) - } else { - sc.tcpSession.announcedTracks[trackID].rtcpReceiver.ProcessPacketRTCP( - time.Now(), frame.Payload) - } - } - if streamType == StreamTypeRTP { - if h, ok := sc.s.Handler.(ServerHandlerOnPacketRTP); ok { - h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ - Session: sc.tcpSession, - TrackID: trackID, - Payload: frame.Payload, - }) + if h, ok := sc.s.Handler.(ServerHandlerOnPacketRTP); ok { + h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ + Session: sc.tcpSession, + TrackID: trackID, + Payload: frame.Payload, + }) + } } } else { if h, ok := sc.s.Handler.(ServerHandlerOnPacketRTCP); ok { + if sc.tcpFrameIsRecording { + sc.tcpSession.announcedTracks[trackID].rtcpReceiver.ProcessPacketRTCP( + time.Now(), frame.Payload) + } + h.OnPacketRTCP(&ServerHandlerOnPacketRTCPCtx{ Session: sc.tcpSession, TrackID: trackID, diff --git a/serverudpl.go b/serverudpl.go index f5834b2b..4feeefb5 100644 --- a/serverudpl.go +++ b/serverudpl.go @@ -54,7 +54,7 @@ type serverUDPListener struct { wg sync.WaitGroup pc *net.UDPConn listenIP net.IP - streamType StreamType + isRTP bool writeTimeout time.Duration readBuf *multibuffer.MultiBuffer clientsMutex sync.RWMutex @@ -72,13 +72,13 @@ func newServerUDPListenerMulticastPair(s *Server) (*serverUDPListener, *serverUD ip := <-res rtpListener, err := newServerUDPListener(s, true, - ip.String()+":"+strconv.FormatInt(int64(s.MulticastRTPPort), 10), StreamTypeRTP) + ip.String()+":"+strconv.FormatInt(int64(s.MulticastRTPPort), 10), true) if err != nil { return nil, nil, err } rtcpListener, err := newServerUDPListener(s, true, - ip.String()+":"+strconv.FormatInt(int64(s.MulticastRTCPPort), 10), StreamTypeRTCP) + ip.String()+":"+strconv.FormatInt(int64(s.MulticastRTCPPort), 10), false) if err != nil { rtpListener.close() return nil, nil, err @@ -91,7 +91,7 @@ func newServerUDPListener( s *Server, multicast bool, address string, - streamType StreamType) (*serverUDPListener, error) { + isRTP bool) (*serverUDPListener, error) { var pc *net.UDPConn var listenIP net.IP if multicast { @@ -145,19 +145,18 @@ func newServerUDPListener( ctx, ctxCancel := context.WithCancel(context.Background()) u := &serverUDPListener{ - s: s, - ctx: ctx, - ctxCancel: ctxCancel, - pc: pc, - listenIP: listenIP, - clients: make(map[clientAddr]*clientData), + s: s, + ctx: ctx, + ctxCancel: ctxCancel, + pc: pc, + listenIP: listenIP, + clients: make(map[clientAddr]*clientData), + isRTP: isRTP, + writeTimeout: s.WriteTimeout, + readBuf: multibuffer.New(uint64(s.ReadBufferCount), uint64(s.ReadBufferSize)), + ringBuffer: ringbuffer.New(uint64(s.ReadBufferCount)), } - u.streamType = streamType - u.writeTimeout = s.WriteTimeout - u.readBuf = multibuffer.New(uint64(s.ReadBufferCount), uint64(s.ReadBufferSize)) - u.ringBuffer = ringbuffer.New(uint64(s.ReadBufferCount)) - u.wg.Add(1) go u.run() @@ -202,18 +201,14 @@ func (u *serverUDPListener) run() { return } + now := time.Now() if clientData.isPublishing { - now := time.Now() atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix()) - - if u.streamType == StreamTypeRTP { - clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTP(now, buf[:n]) - } else { - clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTCP(now, buf[:n]) - } } - if u.streamType == StreamTypeRTP { + if u.isRTP { + clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTP(now, buf[:n]) + if h, ok := u.s.Handler.(ServerHandlerOnPacketRTP); ok { h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ Session: clientData.ss, @@ -222,6 +217,10 @@ func (u *serverUDPListener) run() { }) } } else { + if clientData.isPublishing { + clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTCP(now, buf[:n]) + } + if h, ok := u.s.Handler.(ServerHandlerOnPacketRTCP); ok { h.OnPacketRTCP(&ServerHandlerOnPacketRTCPCtx{ Session: clientData.ss,