mirror of
				https://github.com/aler9/gortsplib
				synced 2025-11-01 02:52:36 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			195 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			195 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package gortsplib
 | |
| 
 | |
| import (
 | |
| 	"math/rand"
 | |
| 	"net"
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"golang.org/x/net/ipv4"
 | |
| 
 | |
| 	"github.com/aler9/gortsplib/pkg/multibuffer"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	// use the same buffer size as gstreamer's rtspsrc
 | |
| 	clientConnUDPKernelReadBufferSize = 0x80000
 | |
| )
 | |
| 
 | |
| type clientConnUDPListener struct {
 | |
| 	cc            *ClientConn
 | |
| 	pc            *net.UDPConn
 | |
| 	remoteReadIP  net.IP
 | |
| 	remoteWriteIP net.IP
 | |
| 	remoteZone    string
 | |
| 	remotePort    int
 | |
| 	trackID       int
 | |
| 	streamType    StreamType
 | |
| 	running       bool
 | |
| 	frameBuffer   *multibuffer.MultiBuffer
 | |
| 	lastFrameTime *int64
 | |
| 	writeMutex    sync.Mutex
 | |
| 
 | |
| 	// out
 | |
| 	done chan struct{}
 | |
| }
 | |
| 
 | |
| func newClientConnUDPListenerPair(cc *ClientConn) (*clientConnUDPListener, *clientConnUDPListener) {
 | |
| 	// choose two consecutive ports in range 65535-10000
 | |
| 	// rtp must be even and rtcp odd
 | |
| 	for {
 | |
| 		rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000
 | |
| 		rtpListener, err := newClientConnUDPListener(cc, false, ":"+strconv.FormatInt(int64(rtpPort), 10))
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		rtcpPort := rtpPort + 1
 | |
| 		rtcpListener, err := newClientConnUDPListener(cc, false, ":"+strconv.FormatInt(int64(rtcpPort), 10))
 | |
| 		if err != nil {
 | |
| 			rtpListener.close()
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		return rtpListener, rtcpListener
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func newClientConnUDPListener(cc *ClientConn, multicast bool, address string) (*clientConnUDPListener, error) {
 | |
| 	var pc *net.UDPConn
 | |
| 	if multicast {
 | |
| 		host, port, err := net.SplitHostPort(address)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		tmp, err := cc.c.ListenPacket("udp", "224.0.0.0:"+port)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		p := ipv4.NewPacketConn(tmp)
 | |
| 
 | |
| 		err = p.SetTTL(127)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		intfs, err := net.Interfaces()
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		for _, intf := range intfs {
 | |
| 			err := p.JoinGroup(&intf, &net.UDPAddr{IP: net.ParseIP(host)})
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		pc = tmp.(*net.UDPConn)
 | |
| 	} else {
 | |
| 		tmp, err := cc.c.ListenPacket("udp", address)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		pc = tmp.(*net.UDPConn)
 | |
| 	}
 | |
| 
 | |
| 	err := pc.SetReadBuffer(clientConnUDPKernelReadBufferSize)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &clientConnUDPListener{
 | |
| 		cc:          cc,
 | |
| 		pc:          pc,
 | |
| 		frameBuffer: multibuffer.New(uint64(cc.c.ReadBufferCount), uint64(cc.c.ReadBufferSize)),
 | |
| 		lastFrameTime: func() *int64 {
 | |
| 			v := int64(0)
 | |
| 			return &v
 | |
| 		}(),
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (l *clientConnUDPListener) close() {
 | |
| 	if l.running {
 | |
| 		l.stop()
 | |
| 	}
 | |
| 	l.pc.Close()
 | |
| }
 | |
| 
 | |
| func (l *clientConnUDPListener) port() int {
 | |
| 	return l.pc.LocalAddr().(*net.UDPAddr).Port
 | |
| }
 | |
| 
 | |
| func (l *clientConnUDPListener) start() {
 | |
| 	l.running = true
 | |
| 	l.pc.SetReadDeadline(time.Time{})
 | |
| 	l.done = make(chan struct{})
 | |
| 	go l.run()
 | |
| }
 | |
| 
 | |
| func (l *clientConnUDPListener) stop() {
 | |
| 	l.pc.SetReadDeadline(time.Now())
 | |
| 	<-l.done
 | |
| }
 | |
| 
 | |
| func (l *clientConnUDPListener) run() {
 | |
| 	defer close(l.done)
 | |
| 
 | |
| 	if l.cc.state == clientConnStatePlay {
 | |
| 		for {
 | |
| 			buf := l.frameBuffer.Next()
 | |
| 			n, addr, err := l.pc.ReadFrom(buf)
 | |
| 			if err != nil {
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			uaddr := addr.(*net.UDPAddr)
 | |
| 
 | |
| 			if !l.remoteReadIP.Equal(uaddr.IP) || (!isAnyPort(l.remotePort) && l.remotePort != uaddr.Port) {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			now := time.Now()
 | |
| 			atomic.StoreInt64(l.lastFrameTime, now.Unix())
 | |
| 			l.cc.tracks[l.trackID].rtcpReceiver.ProcessFrame(now, l.streamType, buf[:n])
 | |
| 			l.cc.pullReadCB()(l.trackID, l.streamType, buf[:n])
 | |
| 		}
 | |
| 	} else { // record
 | |
| 		for {
 | |
| 			buf := l.frameBuffer.Next()
 | |
| 			n, addr, err := l.pc.ReadFrom(buf)
 | |
| 			if err != nil {
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			uaddr := addr.(*net.UDPAddr)
 | |
| 
 | |
| 			if !l.remoteReadIP.Equal(uaddr.IP) || (!isAnyPort(l.remotePort) && l.remotePort != uaddr.Port) {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			now := time.Now()
 | |
| 			atomic.StoreInt64(l.lastFrameTime, now.Unix())
 | |
| 			l.cc.pullReadCB()(l.trackID, l.streamType, buf[:n])
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (l *clientConnUDPListener) write(buf []byte) error {
 | |
| 	l.writeMutex.Lock()
 | |
| 	defer l.writeMutex.Unlock()
 | |
| 
 | |
| 	l.pc.SetWriteDeadline(time.Now().Add(l.cc.c.WriteTimeout))
 | |
| 	_, err := l.pc.WriteTo(buf, &net.UDPAddr{
 | |
| 		IP:   l.remoteWriteIP,
 | |
| 		Zone: l.remoteZone,
 | |
| 		Port: l.remotePort,
 | |
| 	})
 | |
| 	return err
 | |
| }
 | 
