mirror of
				https://github.com/aler9/gortsplib
				synced 2025-10-31 18:42:40 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			349 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			349 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package gortsplib
 | |
| 
 | |
| import (
 | |
| 	"crypto/rand"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/pion/rtcp"
 | |
| 	"github.com/pion/rtp"
 | |
| 
 | |
| 	"github.com/bluenviron/gortsplib/v4/pkg/description"
 | |
| 	"github.com/bluenviron/gortsplib/v4/pkg/format"
 | |
| 	"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
 | |
| )
 | |
| 
 | |
| func serverStreamExtractExistingSSRCs(medias map[*description.Media]*serverStreamMedia) []uint32 {
 | |
| 	var ret []uint32
 | |
| 	for _, media := range medias {
 | |
| 		for _, forma := range media.formats {
 | |
| 			ret = append(ret, forma.localSSRC)
 | |
| 		}
 | |
| 	}
 | |
| 	return ret
 | |
| }
 | |
| 
 | |
| // NewServerStream allocates a ServerStream.
 | |
| //
 | |
| // Deprecated: replaced by ServerStream.Initialize().
 | |
| func NewServerStream(s *Server, desc *description.Session) *ServerStream {
 | |
| 	st := &ServerStream{
 | |
| 		Server: s,
 | |
| 		Desc:   desc,
 | |
| 	}
 | |
| 	err := st.Initialize()
 | |
| 	if err != nil {
 | |
| 		panic(err)
 | |
| 	}
 | |
| 	return st
 | |
| }
 | |
| 
 | |
| // ServerStream represents a data stream.
 | |
| // This is in charge of
 | |
| // - storing stream description and statistics
 | |
| // - distributing the stream to each reader
 | |
| // - allocating multicast listeners
 | |
| type ServerStream struct {
 | |
| 	Server *Server
 | |
| 	Desc   *description.Session
 | |
| 
 | |
| 	mutex                sync.RWMutex
 | |
| 	readers              map[*ServerSession]struct{}
 | |
| 	multicastReaderCount int
 | |
| 	activeUnicastReaders map[*ServerSession]struct{}
 | |
| 	medias               map[*description.Media]*serverStreamMedia
 | |
| 	closed               bool
 | |
| }
 | |
| 
 | |
| // Initialize initializes a ServerStream.
 | |
| func (st *ServerStream) Initialize() error {
 | |
| 	if st.Server == nil || st.Server.sessions == nil {
 | |
| 		return fmt.Errorf("server not present or not initialized")
 | |
| 	}
 | |
| 
 | |
| 	st.readers = make(map[*ServerSession]struct{})
 | |
| 	st.activeUnicastReaders = make(map[*ServerSession]struct{})
 | |
| 
 | |
| 	st.medias = make(map[*description.Media]*serverStreamMedia, len(st.Desc.Medias))
 | |
| 
 | |
| 	for i, medi := range st.Desc.Medias {
 | |
| 		localSSRCs, err := generateLocalSSRCs(
 | |
| 			serverStreamExtractExistingSSRCs(st.medias),
 | |
| 			medi.Formats,
 | |
| 		)
 | |
| 		if err != nil {
 | |
| 			for _, sm := range st.medias {
 | |
| 				sm.close()
 | |
| 			}
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		var srtpOutCtx *wrappedSRTPContext
 | |
| 
 | |
| 		if st.Server.TLSConfig != nil {
 | |
| 			srtpOutKey := make([]byte, srtpKeyLength)
 | |
| 			_, err = rand.Read(srtpOutKey)
 | |
| 			if err != nil {
 | |
| 				for _, sm := range st.medias {
 | |
| 					sm.close()
 | |
| 				}
 | |
| 				return err
 | |
| 			}
 | |
| 
 | |
| 			srtpOutCtx = &wrappedSRTPContext{
 | |
| 				key:   srtpOutKey,
 | |
| 				ssrcs: ssrcsMapToList(localSSRCs),
 | |
| 			}
 | |
| 			err = srtpOutCtx.initialize()
 | |
| 			if err != nil {
 | |
| 				for _, sm := range st.medias {
 | |
| 					sm.close()
 | |
| 				}
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		sm := &serverStreamMedia{
 | |
| 			st:         st,
 | |
| 			media:      medi,
 | |
| 			trackID:    i,
 | |
| 			localSSRCs: localSSRCs,
 | |
| 			srtpOutCtx: srtpOutCtx,
 | |
| 		}
 | |
| 		sm.initialize()
 | |
| 
 | |
| 		st.medias[medi] = sm
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Close closes a ServerStream.
 | |
| func (st *ServerStream) Close() {
 | |
| 	st.mutex.Lock()
 | |
| 	st.closed = true
 | |
| 	st.mutex.Unlock()
 | |
| 
 | |
| 	for ss := range st.readers {
 | |
| 		ss.Close()
 | |
| 	}
 | |
| 
 | |
| 	for _, sm := range st.medias {
 | |
| 		sm.close()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // BytesSent returns the number of written bytes.
 | |
| //
 | |
| // Deprecated: replaced by Stats()
 | |
| func (st *ServerStream) BytesSent() uint64 {
 | |
| 	v := uint64(0)
 | |
| 	for _, me := range st.medias {
 | |
| 		v += atomic.LoadUint64(me.bytesSent)
 | |
| 	}
 | |
| 	return v
 | |
| }
 | |
| 
 | |
| // Description returns the description of the stream.
 | |
| //
 | |
| // Deprecated: use ServerStream.Desc.
 | |
| func (st *ServerStream) Description() *description.Session {
 | |
| 	return st.Desc
 | |
| }
 | |
| 
 | |
| // Stats returns stream statistics.
 | |
| func (st *ServerStream) Stats() *ServerStreamStats {
 | |
| 	mediaStats := func() map[*description.Media]ServerStreamStatsMedia {
 | |
| 		ret := make(map[*description.Media]ServerStreamStatsMedia, len(st.medias))
 | |
| 
 | |
| 		for med, sm := range st.medias {
 | |
| 			ret[med] = ServerStreamStatsMedia{
 | |
| 				BytesSent:       atomic.LoadUint64(sm.bytesSent),
 | |
| 				RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
 | |
| 				Formats: func() map[format.Format]ServerStreamStatsFormat {
 | |
| 					ret := make(map[format.Format]ServerStreamStatsFormat)
 | |
| 
 | |
| 					for _, fo := range sm.formats {
 | |
| 						ret[fo.format] = ServerStreamStatsFormat{
 | |
| 							RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
 | |
| 							LocalSSRC:      fo.localSSRC,
 | |
| 						}
 | |
| 					}
 | |
| 
 | |
| 					return ret
 | |
| 				}(),
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		return ret
 | |
| 	}()
 | |
| 
 | |
| 	return &ServerStreamStats{
 | |
| 		BytesSent: func() uint64 {
 | |
| 			v := uint64(0)
 | |
| 			for _, ms := range mediaStats {
 | |
| 				v += ms.BytesSent
 | |
| 			}
 | |
| 			return v
 | |
| 		}(),
 | |
| 		RTPPacketsSent: func() uint64 {
 | |
| 			v := uint64(0)
 | |
| 			for _, ms := range mediaStats {
 | |
| 				for _, f := range ms.Formats {
 | |
| 					v += f.RTPPacketsSent
 | |
| 				}
 | |
| 			}
 | |
| 			return v
 | |
| 		}(),
 | |
| 		RTCPPacketsSent: func() uint64 {
 | |
| 			v := uint64(0)
 | |
| 			for _, ms := range mediaStats {
 | |
| 				v += ms.RTCPPacketsSent
 | |
| 			}
 | |
| 			return v
 | |
| 		}(),
 | |
| 		Medias: mediaStats,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (st *ServerStream) readerAdd(
 | |
| 	ss *ServerSession,
 | |
| 	clientPorts *[2]int,
 | |
| 	protocol TransportProtocol,
 | |
| ) error {
 | |
| 	st.mutex.Lock()
 | |
| 	defer st.mutex.Unlock()
 | |
| 
 | |
| 	if st.closed {
 | |
| 		return liberrors.ErrServerStreamClosed{}
 | |
| 	}
 | |
| 
 | |
| 	switch protocol {
 | |
| 	case TransportUDP:
 | |
| 		// check whether UDP ports and IP are already assigned to another reader
 | |
| 		for r := range st.readers {
 | |
| 			if protocol == TransportUDP &&
 | |
| 				r.author.ip().Equal(ss.author.ip()) &&
 | |
| 				r.author.zone() == ss.author.zone() {
 | |
| 				for _, rt := range r.setuppedMedias {
 | |
| 					if rt.udpRTPReadPort == clientPorts[0] {
 | |
| 						return liberrors.ErrServerUDPPortsAlreadyInUse{Port: rt.udpRTPReadPort}
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 	case TransportUDPMulticast:
 | |
| 		if st.multicastReaderCount == 0 {
 | |
| 			for _, media := range st.medias {
 | |
| 				mw := &serverMulticastWriter{
 | |
| 					s: st.Server,
 | |
| 				}
 | |
| 				err := mw.initialize()
 | |
| 				if err != nil {
 | |
| 					return err
 | |
| 				}
 | |
| 				media.multicastWriter = mw
 | |
| 			}
 | |
| 		}
 | |
| 		st.multicastReaderCount++
 | |
| 	}
 | |
| 
 | |
| 	st.readers[ss] = struct{}{}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (st *ServerStream) readerRemove(ss *ServerSession) {
 | |
| 	st.mutex.Lock()
 | |
| 	defer st.mutex.Unlock()
 | |
| 
 | |
| 	if st.closed {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	delete(st.readers, ss)
 | |
| 
 | |
| 	if ss.setuppedTransport.Protocol == TransportUDPMulticast {
 | |
| 		st.multicastReaderCount--
 | |
| 		if st.multicastReaderCount == 0 {
 | |
| 			for _, media := range st.medias {
 | |
| 				media.multicastWriter.close()
 | |
| 				media.multicastWriter = nil
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (st *ServerStream) readerSetActive(ss *ServerSession) {
 | |
| 	st.mutex.Lock()
 | |
| 	defer st.mutex.Unlock()
 | |
| 
 | |
| 	if st.closed {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if ss.setuppedTransport.Protocol == TransportUDPMulticast {
 | |
| 		for medi, sm := range ss.setuppedMedias {
 | |
| 			streamMedia := st.medias[medi]
 | |
| 			streamMedia.multicastWriter.rtcpl.addClient(
 | |
| 				ss.author.ip(), streamMedia.multicastWriter.rtcpl.port(), sm.readPacketRTCPUDPPlay)
 | |
| 		}
 | |
| 	} else {
 | |
| 		st.activeUnicastReaders[ss] = struct{}{}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (st *ServerStream) readerSetInactive(ss *ServerSession) {
 | |
| 	st.mutex.Lock()
 | |
| 	defer st.mutex.Unlock()
 | |
| 
 | |
| 	if st.closed {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if ss.setuppedTransport.Protocol == TransportUDPMulticast {
 | |
| 		for medi := range ss.setuppedMedias {
 | |
| 			streamMedia := st.medias[medi]
 | |
| 			streamMedia.multicastWriter.rtcpl.removeClient(ss.author.ip(), streamMedia.multicastWriter.rtcpl.port())
 | |
| 		}
 | |
| 	} else {
 | |
| 		delete(st.activeUnicastReaders, ss)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // WritePacketRTP writes a RTP packet to all the readers of the stream.
 | |
| func (st *ServerStream) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error {
 | |
| 	return st.WritePacketRTPWithNTP(medi, pkt, st.Server.timeNow())
 | |
| }
 | |
| 
 | |
| // WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream.
 | |
| // ntp is the absolute timestamp of the packet, and is sent with periodic RTCP sender reports.
 | |
| func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error {
 | |
| 	st.mutex.RLock()
 | |
| 	defer st.mutex.RUnlock()
 | |
| 
 | |
| 	if st.closed {
 | |
| 		return liberrors.ErrServerStreamClosed{}
 | |
| 	}
 | |
| 
 | |
| 	sm := st.medias[medi]
 | |
| 	sf := sm.formats[pkt.PayloadType]
 | |
| 	return sf.writePacketRTP(pkt, ntp)
 | |
| }
 | |
| 
 | |
| // WritePacketRTCP writes a RTCP packet to all the readers of the stream.
 | |
| func (st *ServerStream) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error {
 | |
| 	st.mutex.RLock()
 | |
| 	defer st.mutex.RUnlock()
 | |
| 
 | |
| 	if st.closed {
 | |
| 		return liberrors.ErrServerStreamClosed{}
 | |
| 	}
 | |
| 
 | |
| 	sm := st.medias[medi]
 | |
| 	return sm.writePacketRTCP(pkt)
 | |
| }
 | 
