mirror of
https://github.com/aler9/gortsplib
synced 2025-10-04 23:02:45 +08:00
267 lines
5.9 KiB
Go
267 lines
5.9 KiB
Go
package gortsplib
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/rtp"
|
|
|
|
"github.com/aler9/gortsplib/pkg/liberrors"
|
|
)
|
|
|
|
type trackInfo struct {
|
|
lastSequenceNumber uint32
|
|
lastTimeRTP uint32
|
|
lastTimeNTP int64
|
|
lastSSRC uint32
|
|
}
|
|
|
|
// ServerStream represents a single stream.
|
|
// This is in charge of
|
|
// - distributing the stream to each reader
|
|
// - allocating multicast listeners
|
|
// - gathering infos about the stream to generate SSRC and RTP-Info
|
|
type ServerStream struct {
|
|
s *Server
|
|
tracks Tracks
|
|
|
|
mutex sync.RWMutex
|
|
readersUnicast map[*ServerSession]struct{}
|
|
readers map[*ServerSession]struct{}
|
|
serverMulticastHandlers []*serverMulticastHandler
|
|
trackInfos []*trackInfo
|
|
}
|
|
|
|
// NewServerStream allocates a ServerStream.
|
|
func NewServerStream(tracks Tracks) *ServerStream {
|
|
tracks = tracks.clone()
|
|
tracks.setControls()
|
|
|
|
st := &ServerStream{
|
|
tracks: tracks,
|
|
readersUnicast: make(map[*ServerSession]struct{}),
|
|
readers: make(map[*ServerSession]struct{}),
|
|
}
|
|
|
|
st.trackInfos = make([]*trackInfo, len(tracks))
|
|
for i := range st.trackInfos {
|
|
st.trackInfos[i] = &trackInfo{}
|
|
}
|
|
|
|
return st
|
|
}
|
|
|
|
// Close closes a ServerStream.
|
|
func (st *ServerStream) Close() error {
|
|
st.mutex.Lock()
|
|
defer st.mutex.Unlock()
|
|
|
|
if st.s != nil {
|
|
select {
|
|
case st.s.streamRemove <- st:
|
|
case <-st.s.ctx.Done():
|
|
}
|
|
}
|
|
|
|
for ss := range st.readers {
|
|
ss.Close()
|
|
}
|
|
|
|
if st.serverMulticastHandlers != nil {
|
|
for _, h := range st.serverMulticastHandlers {
|
|
h.close()
|
|
}
|
|
st.serverMulticastHandlers = nil
|
|
}
|
|
|
|
st.readers = nil
|
|
st.readersUnicast = nil
|
|
|
|
return nil
|
|
}
|
|
|
|
// Tracks returns the tracks of the stream.
|
|
func (st *ServerStream) Tracks() Tracks {
|
|
return st.tracks
|
|
}
|
|
|
|
func (st *ServerStream) ssrc(trackID int) uint32 {
|
|
return atomic.LoadUint32(&st.trackInfos[trackID].lastSSRC)
|
|
}
|
|
|
|
func (st *ServerStream) timestamp(trackID int) uint32 {
|
|
lastTimeRTP := atomic.LoadUint32(&st.trackInfos[trackID].lastTimeRTP)
|
|
lastTimeNTP := atomic.LoadInt64(&st.trackInfos[trackID].lastTimeNTP)
|
|
|
|
if lastTimeRTP == 0 || lastTimeNTP == 0 {
|
|
return 0
|
|
}
|
|
|
|
return uint32(uint64(lastTimeRTP) +
|
|
uint64(time.Since(time.Unix(lastTimeNTP, 0)).Seconds()*float64(st.tracks[trackID].ClockRate())))
|
|
}
|
|
|
|
func (st *ServerStream) lastSequenceNumber(trackID int) uint16 {
|
|
return uint16(atomic.LoadUint32(&st.trackInfos[trackID].lastSequenceNumber))
|
|
}
|
|
|
|
func (st *ServerStream) readerAdd(
|
|
ss *ServerSession,
|
|
transport Transport,
|
|
clientPorts *[2]int,
|
|
) error {
|
|
st.mutex.Lock()
|
|
defer st.mutex.Unlock()
|
|
|
|
if st.s == nil {
|
|
st.s = ss.s
|
|
select {
|
|
case st.s.streamAdd <- st:
|
|
case <-st.s.ctx.Done():
|
|
}
|
|
}
|
|
|
|
switch transport {
|
|
case TransportUDP:
|
|
// check whether client ports are already in use by another reader.
|
|
for r := range st.readersUnicast {
|
|
if *r.setuppedTransport == TransportUDP &&
|
|
r.author.ip().Equal(ss.author.ip()) &&
|
|
r.author.zone() == ss.author.zone() {
|
|
for _, rt := range r.setuppedTracks {
|
|
if rt.udpRTPPort == clientPorts[0] {
|
|
return liberrors.ErrServerUDPPortsAlreadyInUse{Port: rt.udpRTPPort}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
case TransportUDPMulticast:
|
|
// allocate multicast listeners
|
|
if st.serverMulticastHandlers == nil {
|
|
st.serverMulticastHandlers = make([]*serverMulticastHandler, len(st.tracks))
|
|
|
|
for i := range st.tracks {
|
|
h, err := newServerMulticastHandler(st.s)
|
|
if err != nil {
|
|
for _, h := range st.serverMulticastHandlers {
|
|
if h != nil {
|
|
h.close()
|
|
}
|
|
}
|
|
st.serverMulticastHandlers = nil
|
|
return err
|
|
}
|
|
|
|
st.serverMulticastHandlers[i] = h
|
|
}
|
|
}
|
|
}
|
|
|
|
st.readers[ss] = struct{}{}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (st *ServerStream) readerRemove(ss *ServerSession) {
|
|
st.mutex.Lock()
|
|
defer st.mutex.Unlock()
|
|
|
|
delete(st.readers, ss)
|
|
|
|
if len(st.readers) == 0 && st.serverMulticastHandlers != nil {
|
|
for _, l := range st.serverMulticastHandlers {
|
|
l.rtpl.close()
|
|
l.rtcpl.close()
|
|
}
|
|
st.serverMulticastHandlers = nil
|
|
}
|
|
}
|
|
|
|
func (st *ServerStream) readerSetActive(ss *ServerSession) {
|
|
st.mutex.Lock()
|
|
defer st.mutex.Unlock()
|
|
|
|
switch *ss.setuppedTransport {
|
|
case TransportUDP, TransportTCP:
|
|
st.readersUnicast[ss] = struct{}{}
|
|
|
|
default: // UDPMulticast
|
|
for trackID := range ss.setuppedTracks {
|
|
st.serverMulticastHandlers[trackID].rtcpl.addClient(
|
|
ss.author.ip(), st.serverMulticastHandlers[trackID].rtcpl.port(), ss, trackID, false)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (st *ServerStream) readerSetInactive(ss *ServerSession) {
|
|
st.mutex.Lock()
|
|
defer st.mutex.Unlock()
|
|
|
|
switch *ss.setuppedTransport {
|
|
case TransportUDP, TransportTCP:
|
|
delete(st.readersUnicast, ss)
|
|
|
|
default: // UDPMulticast
|
|
if st.serverMulticastHandlers != nil {
|
|
for trackID := range ss.setuppedTracks {
|
|
st.serverMulticastHandlers[trackID].rtcpl.removeClient(ss)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// WritePacketRTP writes a RTP packet to all the readers of the stream.
|
|
func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet) {
|
|
byts, err := pkt.Marshal()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
track := st.trackInfos[trackID]
|
|
|
|
atomic.StoreUint32(&track.lastSequenceNumber,
|
|
uint32(pkt.Header.SequenceNumber))
|
|
|
|
atomic.StoreUint32(&track.lastTimeRTP, pkt.Header.Timestamp)
|
|
atomic.StoreInt64(&track.lastTimeNTP, time.Now().Unix())
|
|
|
|
atomic.StoreUint32(&track.lastSSRC, pkt.Header.SSRC)
|
|
|
|
st.mutex.RLock()
|
|
defer st.mutex.RUnlock()
|
|
|
|
// send unicast
|
|
for r := range st.readersUnicast {
|
|
r.writePacketRTP(trackID, byts)
|
|
}
|
|
|
|
// send multicast
|
|
if st.serverMulticastHandlers != nil {
|
|
st.serverMulticastHandlers[trackID].writePacketRTP(byts)
|
|
}
|
|
}
|
|
|
|
// WritePacketRTCP writes a RTCP packet to all the readers of the stream.
|
|
func (st *ServerStream) WritePacketRTCP(trackID int, pkt rtcp.Packet) {
|
|
byts, err := pkt.Marshal()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
st.mutex.RLock()
|
|
defer st.mutex.RUnlock()
|
|
|
|
// send unicast
|
|
for r := range st.readersUnicast {
|
|
r.writePacketRTCP(trackID, byts)
|
|
}
|
|
|
|
// send multicast
|
|
if st.serverMulticastHandlers != nil {
|
|
st.serverMulticastHandlers[trackID].writePacketRTCP(byts)
|
|
}
|
|
}
|