server: move serverMulticastHandler inside serverStreamTrack

commit c51fddc784
parent 0b75c240c7
Author: aler9
Date: 2022-10-28 14:56:03 +02:00

2 changed files with 58 additions and 73 deletions
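Before the per-file diffs, a minimal sketch of the ownership change, assuming simplified stand-in types (multicastHandler, streamTrack and stream below are illustrative, not gortsplib's real definitions): each per-track struct now owns its multicast handler, allocated lazily when a multicast reader appears and released when the last reader leaves, instead of the stream keeping a separate serverMulticastHandlers slice.

package main

import "fmt"

// multicastHandler is a simplified stand-in for gortsplib's internal
// serverMulticastHandler, which wraps UDP multicast listeners.
type multicastHandler struct{ closed bool }

func newMulticastHandler() *multicastHandler { return &multicastHandler{} }

func (h *multicastHandler) close() { h.closed = true }

// streamTrack mirrors the new serverStreamTrack layout: the handler lives
// on the track and stays nil until a multicast reader needs it.
type streamTrack struct {
    multicastHandler *multicastHandler
}

// stream mirrors ServerStream after the change: one slice of per-track
// state, no parallel slice of handlers.
type stream struct {
    streamTracks []*streamTrack
}

// allocateMulticast follows the readerAdd logic: create handlers only for
// tracks that do not have one yet.
func (s *stream) allocateMulticast() {
    for _, t := range s.streamTracks {
        if t.multicastHandler == nil {
            t.multicastHandler = newMulticastHandler()
        }
    }
}

// releaseMulticast follows the readerRemove logic: when the last reader
// leaves, each track closes and drops its handler.
func (s *stream) releaseMulticast() {
    for _, t := range s.streamTracks {
        if t.multicastHandler != nil {
            t.multicastHandler.close()
            t.multicastHandler = nil
        }
    }
}

func main() {
    s := &stream{streamTracks: []*streamTrack{{}, {}}}
    s.allocateMulticast()
    fmt.Println(s.streamTracks[0].multicastHandler != nil) // true
    s.releaseMulticast()
    fmt.Println(s.streamTracks[1].multicastHandler == nil) // true
}

Tying the handler to the track removes the nil-check on a parallel slice; each call site only has to check track.multicastHandler, as the diffs below show.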


@@ -771,7 +771,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
         th.Delivery = &de
         v := uint(127)
         th.TTL = &v
-        d := stream.serverMulticastHandlers[trackID].ip()
+        d := stream.streamTracks[trackID].multicastHandler.ip()
         th.Destination = &d
         th.Ports = &[2]int{ss.s.MulticastRTPPort, ss.s.MulticastRTCPPort}


@@ -18,23 +18,23 @@ type serverStreamTrack struct {
     lastSSRC uint32
     lastTimeRTP uint32
     lastTimeNTP time.Time
-    udpRTCPSender *rtcpsender.RTCPSender
+    rtcpSender *rtcpsender.RTCPSender
+    multicastHandler *serverMulticastHandler
 }
 
 // ServerStream represents a data stream.
 // This is in charge of
 // - distributing the stream to each reader
 // - allocating multicast listeners
-// - gathering infos about the stream to generate SSRC and RTP-Info
+// - gathering infos about the stream in order to generate SSRC and RTP-Info
 type ServerStream struct {
     tracks Tracks
 
     mutex sync.RWMutex
     s *Server
     activeUnicastReaders map[*ServerSession]struct{}
     readers map[*ServerSession]struct{}
-    serverMulticastHandlers []*serverMulticastHandler
-    ssTracks []*serverStreamTrack
+    streamTracks []*serverStreamTrack
 }
 
 // NewServerStream allocates a ServerStream.
@@ -48,9 +48,9 @@ func NewServerStream(tracks Tracks) *ServerStream {
         readers: make(map[*ServerSession]struct{}),
     }
 
-    st.ssTracks = make([]*serverStreamTrack, len(tracks))
-    for i := range st.ssTracks {
-        st.ssTracks[i] = &serverStreamTrack{}
+    st.streamTracks = make([]*serverStreamTrack, len(tracks))
+    for i := range st.streamTracks {
+        st.streamTracks[i] = &serverStreamTrack{}
     }
 
     return st
@@ -65,15 +65,13 @@ func (st *ServerStream) Close() error {
         ss.Close()
     }
 
-    if st.serverMulticastHandlers != nil {
-        for _, h := range st.serverMulticastHandlers {
-            h.close()
+    for _, track := range st.streamTracks {
+        if track.multicastHandler != nil {
+            track.multicastHandler.close()
         }
-        st.serverMulticastHandlers = nil
     }
 
     st.readers = nil
-    st.activeUnicastReaders = nil
 
     return nil
 }
@@ -86,14 +84,14 @@ func (st *ServerStream) Tracks() Tracks {
 func (st *ServerStream) ssrc(trackID int) uint32 {
     st.mutex.Lock()
     defer st.mutex.Unlock()
-    return st.ssTracks[trackID].lastSSRC
+    return st.streamTracks[trackID].lastSSRC
 }
 
 func (st *ServerStream) rtpInfo(trackID int, now time.Time) (uint16, uint32, bool) {
     st.mutex.Lock()
     defer st.mutex.Unlock()
 
-    track := st.ssTracks[trackID]
+    track := st.streamTracks[trackID]
 
     if !track.firstPacketSent {
         return 0, 0, false
@@ -125,9 +123,8 @@ func (st *ServerStream) readerAdd(
         return fmt.Errorf("stream is closed")
     }
 
-    switch transport {
-    case TransportUDP:
-        // check if client ports are already in use by another reader
+    // check whether UDP ports are already in use by another reader
+    if transport == TransportUDP {
         for r := range st.readers {
             if *r.setuppedTransport == TransportUDP &&
                 r.author.ip().Equal(ss.author.ip()) &&
@@ -139,33 +136,12 @@ func (st *ServerStream) readerAdd(
                 }
             }
         }
-
-    case TransportUDPMulticast:
-        // allocate multicast listeners
-        if st.serverMulticastHandlers == nil {
-            st.serverMulticastHandlers = make([]*serverMulticastHandler, len(st.tracks))
-
-            for i := range st.tracks {
-                h, err := newServerMulticastHandler(st.s)
-                if err != nil {
-                    for _, h := range st.serverMulticastHandlers {
-                        if h != nil {
-                            h.close()
-                        }
-                    }
-                    st.serverMulticastHandlers = nil
-                    return err
-                }
-
-                st.serverMulticastHandlers[i] = h
-            }
-        }
     }
 
     if st.s == nil {
         st.s = ss.s
 
-        for trackID, track := range st.ssTracks {
+        for trackID, track := range st.streamTracks {
             cTrackID := trackID
 
             // always generate RTCP sender reports.
@@ -173,7 +149,7 @@ func (st *ServerStream) readerAdd(
             // they're also needed when transport protocol is TCP and client is Nvidia Deepstream
             // since they're used to compute NTP timestamp of frames:
             // https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_NTP_Timestamp.html
-            track.udpRTCPSender = rtcpsender.New(
+            track.rtcpSender = rtcpsender.New(
                 st.s.udpSenderReportPeriod,
                 st.tracks[trackID].ClockRate(),
                 func(pkt rtcp.Packet) {
@@ -183,6 +159,19 @@ func (st *ServerStream) readerAdd(
         }
     }
 
+    // allocate multicast listeners
+    if transport == TransportUDPMulticast {
+        for _, track := range st.streamTracks {
+            if track.multicastHandler == nil {
+                mh, err := newServerMulticastHandler(st.s)
+                if err != nil {
+                    return err
+                }
+                track.multicastHandler = mh
+            }
+        }
+    }
+
     st.readers[ss] = struct{}{}
 
     return nil
@@ -194,12 +183,13 @@ func (st *ServerStream) readerRemove(ss *ServerSession) {
     delete(st.readers, ss)
 
-    if len(st.readers) == 0 && st.serverMulticastHandlers != nil {
-        for _, l := range st.serverMulticastHandlers {
-            l.rtpl.close()
-            l.rtcpl.close()
+    if len(st.readers) == 0 {
+        for _, track := range st.streamTracks {
+            if track.multicastHandler != nil {
+                track.multicastHandler.close()
+                track.multicastHandler = nil
+            }
         }
-        st.serverMulticastHandlers = nil
     }
 }
@@ -207,15 +197,13 @@ func (st *ServerStream) readerSetActive(ss *ServerSession) {
     st.mutex.Lock()
     defer st.mutex.Unlock()
 
-    switch *ss.setuppedTransport {
-    case TransportUDP, TransportTCP:
-        st.activeUnicastReaders[ss] = struct{}{}
-
-    default: // UDPMulticast
+    if *ss.setuppedTransport == TransportUDPMulticast {
         for trackID, track := range ss.setuppedTracks {
-            st.serverMulticastHandlers[trackID].rtcpl.addClient(
-                ss.author.ip(), st.serverMulticastHandlers[trackID].rtcpl.port(), ss, track, false)
+            st.streamTracks[trackID].multicastHandler.rtcpl.addClient(
+                ss.author.ip(), st.streamTracks[trackID].multicastHandler.rtcpl.port(), ss, track, false)
         }
+    } else {
+        st.activeUnicastReaders[ss] = struct{}{}
     }
 }
@@ -223,16 +211,12 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
     st.mutex.Lock()
     defer st.mutex.Unlock()
 
-    switch *ss.setuppedTransport {
-    case TransportUDP, TransportTCP:
-        delete(st.activeUnicastReaders, ss)
-
-    default: // UDPMulticast
-        if st.serverMulticastHandlers != nil {
-            for trackID := range ss.setuppedTracks {
-                st.serverMulticastHandlers[trackID].rtcpl.removeClient(ss)
-            }
+    if *ss.setuppedTransport == TransportUDPMulticast {
+        for trackID := range ss.setuppedTracks {
+            st.streamTracks[trackID].multicastHandler.rtcpl.removeClient(ss)
         }
+    } else {
+        delete(st.activeUnicastReaders, ss)
     }
 }
@@ -248,7 +232,7 @@ func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDT
     st.mutex.RLock()
    defer st.mutex.RUnlock()
 
-    track := st.ssTracks[trackID]
+    track := st.streamTracks[trackID]
 
     now := time.Now()
     if !track.firstPacketSent || ptsEqualsDTS {
@@ -260,8 +244,8 @@ func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDT
         track.lastSequenceNumber = pkt.Header.SequenceNumber
         track.lastSSRC = pkt.Header.SSRC
     }
 
-    if track.udpRTCPSender != nil {
-        track.udpRTCPSender.ProcessPacketRTP(now, pkt, ptsEqualsDTS)
+    if track.rtcpSender != nil {
+        track.rtcpSender.ProcessPacketRTP(now, pkt, ptsEqualsDTS)
     }
 
     // send unicast
@@ -270,8 +254,8 @@ func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDT
     }
 
     // send multicast
-    if st.serverMulticastHandlers != nil {
-        st.serverMulticastHandlers[trackID].writePacketRTP(byts)
+    if track.multicastHandler != nil {
+        track.multicastHandler.writePacketRTP(byts)
     }
 }
@@ -291,7 +275,8 @@ func (st *ServerStream) WritePacketRTCP(trackID int, pkt rtcp.Packet) {
     }
 
     // send multicast
-    if st.serverMulticastHandlers != nil {
-        st.serverMulticastHandlers[trackID].writePacketRTCP(byts)
+    track := st.streamTracks[trackID]
+    if track.multicastHandler != nil {
+        track.multicastHandler.writePacketRTCP(byts)
     }
 }