mirror of
				https://github.com/aler9/gortsplib
				synced 2025-10-31 18:42:40 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			191 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			191 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package gortsplib
 | |
| 
 | |
| import (
 | |
| 	"log"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/pion/rtcp"
 | |
| 	"github.com/pion/rtp"
 | |
| 
 | |
| 	"github.com/bluenviron/gortsplib/v4/pkg/format"
 | |
| 	"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
 | |
| 	"github.com/bluenviron/gortsplib/v4/pkg/rtpreceiver"
 | |
| )
 | |
| 
 | |
| type serverSessionFormat struct {
 | |
| 	sm          *serverSessionMedia
 | |
| 	format      format.Format
 | |
| 	localSSRC   uint32
 | |
| 	onPacketRTP OnPacketRTPFunc
 | |
| 
 | |
| 	rtpReceiver           *rtpreceiver.Receiver
 | |
| 	writePacketRTPInQueue func([]byte) error
 | |
| 	rtpPacketsReceived    *uint64
 | |
| 	rtpPacketsSent        *uint64
 | |
| 	rtpPacketsLost        *uint64
 | |
| }
 | |
| 
 | |
| func (sf *serverSessionFormat) initialize() {
 | |
| 	sf.rtpPacketsReceived = new(uint64)
 | |
| 	sf.rtpPacketsSent = new(uint64)
 | |
| 	sf.rtpPacketsLost = new(uint64)
 | |
| 
 | |
| 	udp := sf.sm.ss.setuppedTransport.Protocol == TransportUDP ||
 | |
| 		sf.sm.ss.setuppedTransport.Protocol == TransportUDPMulticast
 | |
| 
 | |
| 	if udp {
 | |
| 		sf.writePacketRTPInQueue = sf.writePacketRTPInQueueUDP
 | |
| 	} else {
 | |
| 		sf.writePacketRTPInQueue = sf.writePacketRTPInQueueTCP
 | |
| 	}
 | |
| 
 | |
| 	if sf.sm.ss.state == ServerSessionStatePreRecord || sf.sm.media.IsBackChannel {
 | |
| 		sf.rtpReceiver = &rtpreceiver.Receiver{
 | |
| 			ClockRate:            sf.format.ClockRate(),
 | |
| 			LocalSSRC:            &sf.localSSRC,
 | |
| 			UnrealiableTransport: udp,
 | |
| 			Period:               sf.sm.ss.s.receiverReportPeriod,
 | |
| 			TimeNow:              sf.sm.ss.s.timeNow,
 | |
| 			WritePacketRTCP: func(pkt rtcp.Packet) {
 | |
| 				if udp {
 | |
| 					sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck
 | |
| 				}
 | |
| 			},
 | |
| 		}
 | |
| 		err := sf.rtpReceiver.Initialize()
 | |
| 		if err != nil {
 | |
| 			panic(err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (sf *serverSessionFormat) close() {
 | |
| 	if sf.rtpReceiver != nil {
 | |
| 		sf.rtpReceiver.Close()
 | |
| 		sf.rtpReceiver = nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (sf *serverSessionFormat) remoteSSRC() (uint32, bool) {
 | |
| 	if sf.rtpReceiver != nil {
 | |
| 		stats := sf.rtpReceiver.Stats()
 | |
| 		if stats != nil {
 | |
| 			return stats.RemoteSSRC, true
 | |
| 		}
 | |
| 	}
 | |
| 	return 0, false
 | |
| }
 | |
| 
 | |
| func (sf *serverSessionFormat) readPacketRTP(pkt *rtp.Packet, now time.Time) {
 | |
| 	pkts, lost, err := sf.rtpReceiver.ProcessPacket2(pkt, now, sf.format.PTSEqualsDTS(pkt))
 | |
| 	if err != nil {
 | |
| 		sf.sm.onPacketRTPDecodeError(err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if lost != 0 {
 | |
| 		atomic.AddUint64(sf.rtpPacketsLost, lost)
 | |
| 
 | |
| 		if h, ok := sf.sm.ss.s.Handler.(ServerHandlerOnPacketsLost); ok {
 | |
| 			h.OnPacketsLost(&ServerHandlerOnPacketsLostCtx{
 | |
| 				Session: sf.sm.ss,
 | |
| 				Lost:    lost,
 | |
| 			})
 | |
| 		} else if h, ok2 := sf.sm.ss.s.Handler.(ServerHandlerOnPacketLost); ok2 {
 | |
| 			h.OnPacketLost(&ServerHandlerOnPacketLostCtx{
 | |
| 				Session: sf.sm.ss,
 | |
| 				Error:   liberrors.ErrServerRTPPacketsLost{Lost: uint(lost)}, //nolint:staticcheck
 | |
| 			})
 | |
| 		} else {
 | |
| 			log.Printf("%d RTP %s lost",
 | |
| 				lost,
 | |
| 				func() string {
 | |
| 					if lost == 1 {
 | |
| 						return "packet"
 | |
| 					}
 | |
| 					return "packets"
 | |
| 				}())
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	atomic.AddUint64(sf.rtpPacketsReceived, uint64(len(pkts)))
 | |
| 
 | |
| 	for _, pkt := range pkts {
 | |
| 		sf.onPacketRTP(pkt)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (sf *serverSessionFormat) writePacketRTP(pkt *rtp.Packet) error {
 | |
| 	pkt.SSRC = sf.localSSRC
 | |
| 
 | |
| 	maxPlainPacketSize := sf.sm.ss.s.MaxPacketSize
 | |
| 	if isSecure(sf.sm.ss.setuppedTransport.Profile) {
 | |
| 		maxPlainPacketSize -= srtpOverhead
 | |
| 	}
 | |
| 
 | |
| 	plain := make([]byte, maxPlainPacketSize)
 | |
| 	n, err := pkt.MarshalTo(plain)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	plain = plain[:n]
 | |
| 
 | |
| 	var encr []byte
 | |
| 	if isSecure(sf.sm.ss.setuppedTransport.Profile) {
 | |
| 		encr = make([]byte, sf.sm.ss.s.MaxPacketSize)
 | |
| 		encr, err = sf.sm.srtpOutCtx.encryptRTP(encr, plain, &pkt.Header)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if isSecure(sf.sm.ss.setuppedTransport.Profile) {
 | |
| 		return sf.writePacketRTPEncoded(encr)
 | |
| 	}
 | |
| 	return sf.writePacketRTPEncoded(plain)
 | |
| }
 | |
| 
 | |
| func (sf *serverSessionFormat) writePacketRTPEncoded(payload []byte) error {
 | |
| 	sf.sm.ss.writerMutex.RLock()
 | |
| 	defer sf.sm.ss.writerMutex.RUnlock()
 | |
| 
 | |
| 	if sf.sm.ss.writer == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	ok := sf.sm.ss.writer.Push(func() error {
 | |
| 		return sf.writePacketRTPInQueue(payload)
 | |
| 	})
 | |
| 	if !ok {
 | |
| 		return liberrors.ErrServerWriteQueueFull{}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (sf *serverSessionFormat) writePacketRTPInQueueUDP(payload []byte) error {
 | |
| 	err := sf.sm.ss.s.udpRTPListener.write(payload, sf.sm.udpRTPWriteAddr)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	atomic.AddUint64(sf.sm.bytesSent, uint64(len(payload)))
 | |
| 	atomic.AddUint64(sf.rtpPacketsSent, 1)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (sf *serverSessionFormat) writePacketRTPInQueueTCP(payload []byte) error {
 | |
| 	sf.sm.ss.tcpFrame.Channel = sf.sm.tcpChannel
 | |
| 	sf.sm.ss.tcpFrame.Payload = payload
 | |
| 	sf.sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sf.sm.ss.s.WriteTimeout))
 | |
| 	err := sf.sm.ss.tcpConn.reader.conn.WriteInterleavedFrame(sf.sm.ss.tcpFrame, sf.sm.ss.tcpBuffer)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	atomic.AddUint64(sf.sm.bytesSent, uint64(len(payload)))
 | |
| 	atomic.AddUint64(sf.rtpPacketsSent, 1)
 | |
| 	return nil
 | |
| }
 | 
